Pushing down aggregation to Cassandra when querying from Spark
I've got a Cassandra table looking like this:
my_keyspace.my_table (
    part_key_col_1 text,
    clust_key_col_1 int,
    clust_key_col_2 text,
    value_col_1 text,
    PRIMARY KEY (part_key_col_1, clust_key_col_1, clust_key_col_2, value_col_1)
)
I'm looking to retrieve the maximum value of clust_key_col_1 for each part_key_col_1, where I also want a filter on clust_key_col_1. In CQL I can achieve this with:
SELECT
    part_key_col_1,
    max(clust_key_col_1)
FROM my_table
WHERE clust_key_col_1 < 123
GROUP BY part_key_col_1
ALLOW FILTERING
Even though I need to use ALLOW FILTERING, the query is very fast: I have roughly 1 000 000 unique part_key_col_1 values, and for each part_key_col_1 there are fewer than 5 000 unique clust_key_col_1 values.
My problem comes when I try to get the same data in Spark using the Spark Cassandra Connector. I've tried the following in Spark:
from pyspark.sql import functions as f

cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table='my_table', keyspace='my_keyspace')
    .load()
    .filter(f.col('clust_key_col_1') < 123)
    .groupBy(f.col('part_key_col_1'))
    .agg(
        f.max('clust_key_col_1')
    )
)
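To see what actually gets pushed down to Cassandra, I check the query plan (just using the DataFrame's explain method):

# Print the physical plan for the DataFrame defined above
cassandra_df.explain()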
The physical plan ends up being:
== Physical Plan ==
*(2) HashAggregate(keys=[part_key_col_1#144], functions=[max(clust_key_col_1#145)])
+- Exchange hashpartitioning(part_key_col_1#144, 20)
+- *(1) HashAggregate(keys=[part_key_col_1#144], functions=[partial_max(clust_key_col_1#145)])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [part_key_col_1#144,clust_key_col_1#145] PushedFilters: [*LessThan(clust_key_col_1,123)], ReadSchema: struct<part_key_col_1:string,clust_key_col_1:int>
Meaning the filter on clust_key_col_1 gets pushed down to Cassandra, but the grouping and the aggregation do not. Instead, all the data (with clust_key_col_1 < 123) gets loaded into Spark and aggregated there. Can I somehow "push down" the grouping/aggregation to Cassandra and only load max(clust_key_col_1) for each part_key_col_1, to reduce the load on Spark and the network? Right now Spark will load up to 1 000 000 * 5 000 rows instead of 1 000 000 rows.
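The only fallback I can think of is to bypass the connector, run the aggregating CQL directly from the driver program, and hand the (already aggregated) result to Spark. A rough sketch of that idea is below (assuming the DataStax Python driver; the contact point is a placeholder and max_clust_key_col_1 is just an alias I chose), but I'd much rather stay within the connector:

from cassandra.cluster import Cluster

# Placeholder contact point; in reality this would come from configuration
cluster = Cluster(['cassandra-host'])
session = cluster.connect('my_keyspace')

# Let Cassandra do the grouping/aggregation, exactly like the CQL above
rows = session.execute(
    """
    SELECT part_key_col_1, max(clust_key_col_1) AS max_clust_key_col_1
    FROM my_table
    WHERE clust_key_col_1 < 123
    GROUP BY part_key_col_1
    ALLOW FILTERING
    """
)

# Turn the small aggregated result set into a Spark DataFrame
max_df = spark.createDataFrame(
    [(r.part_key_col_1, r.max_clust_key_col_1) for r in rows],
    ['part_key_col_1', 'max_clust_key_col_1'],
)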
apache-spark
pyspark
cassandra
spark-cassandra-connector