Things You Should Be Doing When Using Cassandra Drivers

Hi folks, a couple of weeks back I gave a webinar on what I consider to be a pretty important topic, “Things You Should Be Doing When Using Cassandra Drivers”. I was really pleased and surprised by the number of great questions I got following that webinar. I’m writing this blog post to address a lot of those questions, since there are probably others wondering about these as well.

1. Is a prepared statement bound on one session or is it useable on another session?

A prepared statement is derived from a particular session instance. When you prepare a statement, it is sent to the cluster with which that session instance is associated. A session holds a connection to a certain keyspace through a cluster instance, allowing it to be queried. A prepared statement is created through, and carries the characteristics of, the particular session you created it with. The prepared statement does not exist independently from the session, so you cannot “move it” or use it on another session. You would want to share the same cluster and session instance across your application, as you generally have one keyspace per application. This is set at the application level, as part of your cluster configuration, like so:

cluster = Cluster
   .builder()
   .addContactPoint("192.168.0.30")
   .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE))
   .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
   .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy()))
   .build();
session = cluster.connect("demo");

All queries that use this session will follow the characteristics set in the cluster configuration, and will use the connection held by the session. Your session is like the tunnel to your cluster:

PreparedStatement statement = session.prepare(
   "INSERT INTO users (lastname, age, city, email, firstname) VALUES (?,?,?,?,?);");

2. Does the query builder in Java have a way for generating prepared statements?

The query builder has no way of generating prepared statements directly, but you can pass a statement built with the query builder to session.prepare() to make a prepared statement. Like this:

session.prepare(builtByQueryBuilderStatement);
PreparedStatement statement = session.prepare(
   QueryBuilder.select().all().from("demo", "users").where(eq("lastname", bindMarker())));
BoundStatement boundStatement = new BoundStatement(statement);
session.execute(boundStatement.bind("Jones"));

3. How do you get the token ranges using a Cassandra driver?

I would use CQL for this, actually. Use the system keyspace and DESCRIBE the system.local table to see what’s in there. The system keyspace includes a number of tables that contain details about your Cassandra database objects and cluster configuration. The local table (RF=1) holds the information a node has about itself, a superset of what it shares via gossip. Gossip is a peer-to-peer communication protocol for exchanging location and state information between nodes. You can query the local table to get the tokens and host_id of the node you are connected to, and the system.peers table for the same details about the other nodes in the cluster.

cqlsh> USE system;

cqlsh:system> desc table local;

CREATE TABLE local (
   key text,
   bootstrapped text,
   cluster_name text,
   cql_version text,
   data_center text,
   gossip_generation int,
   host_id uuid,
   native_protocol_version text,
   partitioner text,
   rack text,
   release_version text,
   schema_version uuid,
   thrift_version text,
   tokens set<text>,
   truncated_at map<uuid, blob>,
   PRIMARY KEY (key)
);

As you can see, there are columns for host_id (a node identifier) and tokens:

cqlsh:system> select host_id, tokens from local;

will give you the tokens held by the node you are connected to (identified by host_id). The same information for the rest of the cluster can be obtained by querying the system.peers table.
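
If you would rather stay inside the driver, the Java driver’s cluster metadata exposes the same information. A rough sketch, assuming driver 2.1+ and the usual com.datastax.driver.core imports (the "demo" keyspace name is just an example):

Metadata metadata = cluster.getMetadata();
for (TokenRange range : metadata.getTokenRanges()) {
   // which hosts hold replicas for this token range in the given keyspace
   Set<Host> replicas = metadata.getReplicas("demo", range);
   System.out.println(range + " -> " + replicas);
}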

 
4. What is the best practice for writing in batches?
 
Batches are the best way to update multiple tables at once. Keep batches small, under 5 kb of data. Remember that batches are for atomicity, not performance optimization. Check out this blog post for more on best practices when using batches.
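
As a sketch of what that looks like in the Java driver, a logged BatchStatement can group a handful of prepared inserts so they succeed or fail together (insertByEmail and insertByLastname are hypothetical prepared statements for two denormalized tables):

BatchStatement batch = new BatchStatement();
batch.add(insertByEmail.bind("jane@example.com", "Jane", "Doe"));
batch.add(insertByLastname.bind("Doe", "Jane", "jane@example.com"));
session.execute(batch);   // the statements apply atomically: all or nothing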

 5. Is there a convenient way of mapping the futures returned by the async calls to Scala’s Future classes?

 This might become a feature of the Spark connector; until then, check out these potential options.

6. How about batches spanning a keyspace?

 You can create batches that span multiple keyspaces. Here I’ve demonstrated in CQL how to insert rows into different user tables in different keyspaces (demo, demo2, demo3). You need to specify the keyspace and table as part of the query.

BEGIN BATCH
   INSERT INTO demo.users (lastname, age, city, email, firstname)
   VALUES ('Smith', 46, 'Sacramento', 'john@example.com', 'John');
   INSERT INTO demo2.users (lastname, age, city, email, firstname)
   VALUES ('Doe', 36, 'Beverly Hills', 'jane@example.com', 'Jane');
   INSERT INTO demo3.users (lastname, age, city, email, firstname)
   VALUES ('Byrne', 24, 'San Diego', 'rob@example.com', 'Rob');
APPLY BATCH;

 7. Do you gain write performance by using a token aware connection?

Yes, you do. Using a token-aware policy, you avoid the network hops that happen when the client is not aware of the layout of the token ranges associated with each node in the cluster. When the client connects to a node that does not hold the token range for the write, whether as a primary range or a replica range, that node has to coordinate with a replica node and forward the write to it. It’s much more efficient for the client to connect to a replica node from the get-go.

8. Where or when do you set the consistency level in the Java driver? Is it set on just one statement or the whole session?

 It can actually be part of either. It can be set as a Cluster attribute, in which case any queries run with session.execute will have that consistency level:

cluster = Cluster.builder().addContactPoint("192.168.0.30")
   .withQueryOptions(new QueryOptions()
   .setConsistencyLevel(ConsistencyLevel.ONE))
   .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
   .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy()))
   .build();
session = cluster.connect("demo");

 You can also set the consistency level as a part of the session.execute statement itself:

session.execute(new SimpleStatement("INSERT INTO users (lastname, age, city, email, firstname) VALUES ('Jones', 45, 'Austin', 'bob@example.com', 'Bob')")
.setConsistencyLevel(ConsistencyLevel.ALL));
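
The same applies to prepared statements; a bound statement carries its own consistency level too. A small sketch, reusing the prepared statement from question 1:

BoundStatement bound = statement.bind("Jones", 45, "Austin", "bob@example.com", "Bob");
bound.setConsistencyLevel(ConsistencyLevel.QUORUM);
session.execute(bound);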

 9. What can be considered a large batch?

Anything with tens of statements or more than 5 kb of data; over 5 kb is not recommended. Batches hammer one node (the coordinator), so the load is not dispersed across the cluster, and very large batches can therefore slow things down.

10. Sessions are long-lived instances; however, sessions have a close method. Should one ever be closing a session?

It’s good housekeeping to close the connection once you are finished with it. Call the instance method session.close() to shut the session down when you are done; if there are any async calls in flight, it will wait for them to finish. You can also call cluster.close() if you are shutting down the program.
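
In practice that usually looks something like this sketch at the end of the application’s lifecycle:

try {
   // ... run your queries against the session ...
} finally {
   session.close();   // waits for in-flight requests, including async ones, to complete
   cluster.close();   // releases all resources when the program shuts down
}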

 11. What details does a future ResultSet produce? A future isn’t blocking, but when does it block?

A future ResultSet will give you whatever a regular ResultSet will give you; it waits for a query to return and hands back its results. Have a look at the API. The real difference between a ResultSet and a future ResultSet is that the future does not block when it is returned, only when you ask it for its result, so other work can be done while the query runs.
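
A small sketch of the non-blocking style, using executeAsync and the Guava Futures utility that ships with the driver (the query is illustrative):

ResultSetFuture future = session.executeAsync("SELECT * FROM demo.users WHERE lastname = 'Jones'");
Futures.addCallback(future, new FutureCallback<ResultSet>() {
   public void onSuccess(ResultSet rs) {
      for (Row row : rs) {
         System.out.println(row);   // runs when the result arrives; the caller never blocked
      }
   }
   public void onFailure(Throwable t) {
      t.printStackTrace();
   }
});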

12. What about the Spark-Cassandra connector? Can I use it as a Scala driver?

While the Spark-Cassandra connector does a lot of the things a driver would, it is in fact not a Scala driver. DataStax has yet to provide a driver for Scala.

13. Can you comment on “IN” vs. multiple parallel queries?

Using an IN clause can degrade performance, because many nodes usually have to be queried while the single query is sent to one coordinator to handle. If you’re using an IN clause with 60 values, the coordinator has to block and wait for 60 values to be returned, and in the worst-case scenario they could live on 60 different nodes. In addition to high latency, you put a lot of stress on the coordinator’s memory. If you’re using token awareness as part of your cluster configuration on your driver, that’s one more reason to use multiple queries, as those queries can go directly to the nodes holding a replica. The recommendations about when not to use an index also apply to when not to use an IN with a WHERE clause. Also note that with an IN clause, if retrieving one partition fails, the whole operation fails. If you can choose multiple parallel queries over an IN clause, go for it; there’s a sketch below. Check out this blog post on using asynchronous queries as an equivalent to the SELECT…IN clause.
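
Here’s a minimal sketch of the parallel-queries alternative, reusing the prepared statement from question 2, which selects users by lastname (the key values are illustrative):

List<ResultSetFuture> futures = new ArrayList<ResultSetFuture>();
for (String lastname : Arrays.asList("Jones", "Smith", "Doe")) {
   futures.add(session.executeAsync(statement.bind(lastname)));   // one query per key, sent in parallel
}
for (ResultSetFuture future : futures) {
   System.out.println(future.getUninterruptibly().one());   // gather the results as they complete
}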

14. What is the difference between CQL based drivers and Thrift based API?

CQL is the native protocol; Thrift is an RPC layer. While the legacy Thrift API exposes the internal storage structure of Cassandra pretty much directly, CQL3 provides a thin abstraction layer over this internal structure. Apache Cassandra is moving away from Thrift, and in fact as of 3.0 Thrift will be frozen. CQL3 is considered a simpler and overall better API for Cassandra than the Thrift API: it hides a number of distracting and useless implementation details and provides native syntax for common encodings and idioms. New projects and applications are therefore encouraged to use CQL3. All DataStax drivers are CQL based. Learn about the Thrift-to-CQL transition here, here, and here.

15. A prepared statement does not perform well in a loop. What is the best way to batch multiple inserts through a prepared statement?

The prepared statement itself is not put inside the loop, but the portion of code that binds parameters to the prepared statement can be. In the simplified example below, values from an array are bound to the prepared statement in a loop; each iteration of the loop binds one value and executes the statement:

PreparedStatement p = session.prepare("SELECT log_entry FROM log_index WHERE id = ?");
BoundStatement b = new BoundStatement(p);
int[] array = new int[]{1,2,3,4,5,6,7,8,9,10};
for (int i = 0; i < array.length; i++){
   session.execute(b.bind(array[i]));
}

16. Can the async API be used with the .NET Framework 4.0 or higher versions?

 The Task-based async API is available in .NET 4.0 and higher, but you obviously won’t have access to the async/await keywords to use with it unless you’re on 4.5.

17. Does the load balancing policy from the client driver matter if the partition key is used in all queries?

The partition key is used in all queries regardless, unless you’re doing a SELECT * with no WHERE clause. The partition key lets Cassandra pinpoint where the data exists on disk, and a load balancing policy like TokenAwarePolicy makes use of that: it uses the partition key to send the request directly to a node that holds the data. Without it, the coordinator still uses the partition key to discover which node the data in question is located on, but an extra hop is involved. So yes, the load balancing policy still matters; it simply avoids that step while making use of the same partition key.

18. In .NET, is Cassandra’s Row.GetValue<T> the way to retrieve values for each column?

Row.GetValue<T> is the way to get a column’s value. If you want something to map rows to POCOs for you, try the CqlPoco package (separate from the driver) or the LINQ provider that’s built in.

19. Is there any support for retry policy with a backoff?

 Yes, there is the DowngradingConsistencyRetryPolicy, which will retry your read or write at a lower consistency level if it fails the first time. You should use this with caution if you are required to meet a certain level of consistency as part of your business requirements: for example, if your consistency level had been set to QUORUM on a 3-node cluster, it could go down to ONE. Read more about this retry policy here.
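
Enabling it is a one-liner at cluster-build time; a sketch (the contact point is illustrative):

cluster = Cluster.builder()
   .addContactPoint("192.168.0.30")
   .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
   .build();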

20. I have some difficulties with paging through a row set; are there some examples available?

Prior to Cassandra 2.0, tricks were used to page through a large result set. Cassandra 2.0, however, has automatic paging. Instead of using the token function to build paging yourself, it is now a built-in feature. Developers can now iterate over the entire result set without having to worry that its size is larger than the available memory: as the client code iterates over the results, extra rows are fetched in the background while old ones are dropped. Looking at this in Java, note that the SELECT statement returns all matching rows and the fetch size is set to 24. I’ve shown a simple statement here, but the same code can be written with a prepared statement coupled with a bound statement. It is possible to disable automatic paging if it is not desired. It is also important to test various fetch size settings, since you will want to keep the memory footprint small, but not so small that too many round-trips to the database are made. Check out this blog post to see how paging works server side.

Statement stmt = new SimpleStatement("SELECT * FROM raw_weather_data WHERE wsid = '725474:99999' AND year = 2005 AND month = 6");
stmt.setFetchSize(24);
ResultSet rs = session.execute(stmt);
for (Row row : rs) {
   // when only the current page remains in memory, start fetching the next one in the background
   if (rs.getAvailableWithoutFetching() == 24 && !rs.isFullyFetched()) {
      rs.fetchMoreResults();
   }
   System.out.println(row);
}

21. When opening a new connection, should I use one host ip or multiple?

 You should use multiple IPs if you can; giving only one IP address from the cluster could result in a single point of failure if that particular node goes down. It’s good to give the client more options.
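
For example, a sketch with several contact points (the addresses are illustrative):

cluster = Cluster.builder()
   .addContactPoints("192.168.0.30", "192.168.0.31", "192.168.0.32")
   .build();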

22. At what point do parallel statements perform better than an IN clause? And is there a limit on how many keys should be put in the IN clause?

See question 13 for more on IN vs. parallel statements. As for the limit on the number of keys in an IN statement, the most you can have is 65,535, but practically speaking you should only use small numbers of keys in an IN, for the performance reasons given there.

23. Do we have a default retry for connections on a read/write operation?

 There is a default retry policy. This policy retries queries in only two cases:

  • On a read timeout, if enough replica replied but data was not retrieved.
  • On a write timeout, if we timeout while writing the distributed log used by batch statements.

See the API.

24. Are there per-host stats for connections in Cassandra?

Cassandra exposes a number of statistics and management operations via Java Management Extensions (JMX), a Java technology that supplies tools for managing and monitoring Java applications and services. Any statistic or operation that a Java application has exposed as an MBean can then be monitored or manipulated using JMX. Use VisualVM on a running app to see the MBeans; Java VisualVM provides a visual interface for viewing detailed information about Java applications while they are running on a Java Virtual Machine (JVM).

25. How do I get a list of nodes per keyspace?

 The keyspace actually exists within the cluster, and the cluster is made up of a group of nodes. The keyspace’s data is held by the nodes in that cluster, according to the keyspace’s replication settings.
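
If you want to see this from the driver, the cluster metadata can list all hosts, and can tell you which hosts hold the replicas for a given keyspace and partition key. A sketch, where partitionKeyBytes is a placeholder ByteBuffer holding your serialized partition key and "demo" is an example keyspace:

Metadata metadata = cluster.getMetadata();
Set<Host> allHosts = metadata.getAllHosts();                          // every node in the cluster
Set<Host> replicas = metadata.getReplicas("demo", partitionKeyBytes); // nodes holding that key for the "demo" keyspace
System.out.println(allHosts + " / " + replicas);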

26. What should we do to avoid timeouts in Cassandra connections from low-use applications having to reconnect every 5 minutes?

 If you are using AWS or Azure, you may find that your connection gets killed after idling for 1-5 minutes. We are aware of this issue; have a look at these JIRAs here and here to see what we are doing about the problem.

27. Do you suggest a special driver property for a huge data load, like loading a few million rows of a thousand bytes each? Are there any tweaks to make a larger set like this work?

There is no “config option” to enable on your driver to deal with huge data loads; this is more an issue of strategy than anything. Using the features I discussed in my webinar, like prepared statements, token-aware load balancing, and executing async queries, will increase your overall performance and in turn make larger data loads easier to handle. Be careful with things like batches and IN statements: batches are there for atomicity, not performance.

About Rebecca Mills

Biochemist by trade, transitioning to computer engineering.