Processing Cassandra Data Efficiently With Apache Spark
Spark RDDs may carry a component called a Partitioner, which is used in shuffles and co-grouping operations (join, groupBy, reduceBy, etc.). When no partitioner is specified, Spark usually repartitions the data using a HashPartitioner. A repartition requires taking every value and moving it to the partition specified by the given partitioner.
Cassandra also has the notion of a Partitioner. The first column of a Cassandra table's primary key is the partition key. When data is inserted into a Cassandra table, the Cassandra partitioner generates a token from the partition key and places each record on a node in the ring according to the token range the token falls into.
The Spark Cassandra Connector provides a CassandraPartitioner for certain RDDs when they are keyed using keyBy. Querying a Cassandra table produces a CassandraTableScanRDD, and when it is keyed with keyBy on the partition key, the connector attaches a CassandraPartitioner to it.
CREATE KEYSPACE doc_example WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
CREATE TABLE doc_example.purchases (userid int, purchaseid int, objectid text, amount int, PRIMARY KEY (userid, purchaseid, objectid));
CREATE TABLE doc_example.users (userid int PRIMARY KEY, name text,zipcode int);
import com.datastax.spark.connector._
val ks = "doc_example"
val rdd = sc.cassandraTable[(String, Int)](ks, "users")
  .select("name" as "_1", "zipcode" as "_2", "userid")
  .keyBy[Tuple1[Int]]("userid")
rdd.partitioner
//res3: Option[org.apache.spark.Partitioner] = Some(com.datastax.spark.connector.rdd.partitioner.CassandraPartitioner@94515d3e)
In the example above, the keyBy call generated the partitioner itself, so no further shuffle is needed when co-grouping (joining or grouping) with this data set.
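As a quick sanity check (a sketch, assuming the rdd defined above is in scope), a groupByKey on the keyed RDD picks up the existing CassandraPartitioner rather than introducing a HashPartitioner, which is what makes the operation shuffle-free:

```scala
// Spark's default partitioner selection reuses an existing partitioner
// from the parent RDD when one is present, so no ShuffledRDD stage is
// planned for this groupByKey.
val grouped = rdd.groupByKey()
grouped.partitioner  // same CassandraPartitioner as rdd.partitioner
```

Inspecting grouped.toDebugString is another way to confirm that no shuffle stage was added for this step.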
Querying the table without a keyBy call does not produce a partitioner:
import com.datastax.spark.connector._
val ks = "doc_example"
val rdd1 = sc.cassandraTable[(Int,Int)](ks, "users")
rdd1.partitioner
//res8: Option[org.apache.spark.Partitioner] = None
Hence a shuffle will happen when performing co-grouping operations such as join and groupBy. A shuffle is an expensive operation and can easily dominate the total run time of a job.
This optimization can only be applied to RDDs that are keyed on their partition key, so it cannot be used to group or reduce on an arbitrary Cassandra column. As long as you are grouping or reducing within a Cassandra partition, this approach should save significant time.
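For instance, totaling each user's purchases is a reduce within a single Cassandra partition of the purchases table, since userid is its partition key. A sketch of that, assuming the ks and sc values from the earlier examples:

```scala
import com.datastax.spark.connector._

// Key the purchases table on its partition key so the RDD carries a
// CassandraPartitioner; values are (purchaseid, amount) pairs.
val purchases = sc.cassandraTable[(Int, Int)](ks, "purchases")
  .select("purchaseid" as "_1", "amount" as "_2", "userid")
  .keyBy[Tuple1[Int]]("userid")

// mapValues preserves the partitioner, and reduceByKey over an RDD that
// already has the target partitioner requires no shuffle.
val totalsPerUser = purchases.mapValues(_._2).reduceByKey(_ + _)
```

Reducing on a non-key column such as objectid would not qualify: the RDD would have to be re-keyed, discarding the Cassandra partitioner.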
When joining two RDDs, both must be partitioned before the join can occur. Previously this meant that when joining against a Cassandra table, both the Cassandra RDD and the RDD being joined would need to be shuffled. Now, if the Cassandra RDD is keyed on its partition key, the join can proceed without shuffling the Cassandra RDD.
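A sketch of that case, joining a small hypothetical in-memory RDD of (userid, discount) pairs against the keyed users RDD from the first example (assuming rdd is still in scope):

```scala
// A generic RDD keyed by the same Tuple1[Int] key type as the Cassandra RDD.
val discounts = sc.parallelize(Seq((Tuple1(1), 0.10), (Tuple1(2), 0.25)))

// Spark selects the existing CassandraPartitioner for the join, so only
// the small discounts RDD is shuffled; the Cassandra RDD stays in place.
val joined = rdd.join(discounts)
```

The savings grow with the size of the Cassandra table, since the large side of the join is the one that no longer moves.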
One area that benefits greatly from the partitioner is joins between Cassandra tables on a common partition key. This previously required shuffling both RDDs, but now requires no shuffle at all. To use this, both RDDs must have the same partitioner; a helper method, applyPartitionerFrom, has been added to make it easy to share a partitioner between Cassandra RDDs.
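Putting this together for the two example tables, which share userid as their partition key: a sketch (assuming ks and sc as above) that keys both RDDs, copies the partitioner from one to the other, and joins with no shuffle on either side:

```scala
import com.datastax.spark.connector._

val users = sc.cassandraTable[(String, Int)](ks, "users")
  .select("name" as "_1", "zipcode" as "_2", "userid")
  .keyBy[Tuple1[Int]]("userid")

val purchases = sc.cassandraTable[(Int, String, Int)](ks, "purchases")
  .select("purchaseid" as "_1", "objectid" as "_2", "amount" as "_3", "userid")
  .keyBy[Tuple1[Int]]("userid")
  .applyPartitionerFrom(users)  // both RDDs now share one CassandraPartitioner

val joined = users.join(purchases)  // co-partitioned: no shuffle on either side
```

applyPartitionerFrom requires that both RDDs are keyed on the same partition key type; with the shared partitioner in place, Spark treats the two RDDs as co-partitioned and performs the join locally on each partition.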