1 year ago

#353173

test-img

Cleared

Pushing down aggregation to Cassandra when querying from Spark

I've got a Cassandra table looking like this:

my_keyspace.my_table (
    part_key_col_1 text,
    clust_key_col_1 int,
    clust_key_col_2 text,
    value_col_1 text
    PRIMARY KEY (part_key_col_1, clust_key_col_1, clust_key_col_2, value_col_1)

I'm looking to retrieve tha maximum value of clust_key_col_1 for each part_key_col_1, where I also want a filter on clust_key_col_1. In CQL I can achieve this using:

SELECT
    part_key_col_1
    max(clust_key_col_1)
FROM my_table
WHERE clust_key_col_1 < 123
GROUP BY part_key_col_1
ALLOW FILTERING

Even though I need to use ALLOW FILTERING the query is super fast, I got roughly 1 000 000 unique part_key_col_1 and for each part_key_col_1 I got less than 5000 unique clust_key_col_1 .

My problem comes when I try to get the same data in Spark using Spark Cassandra Connector. I've tried the following in Spark:

cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table='my_table', keyspace='my_keyspace')
    .load()
    .filter(f.col('clust_key_col_1') < 123)
    .groupBy(f.col('part_key_col_1'))
    .agg(
        f.max('clust_key_col_1')
    )
)

But the Physical-plan ends up being:

== Physical Plan ==
*(2) HashAggregate(keys=[part_key_col_1#144], functions=[max(clust_key_col_1#145)])
+- Exchange hashpartitioning(part_key_col_1#144, 20)
   +- *(1) HashAggregate(keys=[part_key_col_1#144], functions=[partial_max(clust_key_col_1#145)])
      +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [part_key_col_1#144,clust_key_col_1#145] PushedFilters: [*LessThan(clust_key_col_1,123)], ReadSchema: struct<part_key_col_1:string,clust_key_col_1:int>

Meaning the filter for clust_key_col_1 gets pushed down to Cassandra, but the grouping and the aggregation does not. Instead all the data (with clust_key_col_1 < 123) gets loaded into Spark and aggregated in Spark. Can I somehow "push down" the grouping/aggregation to Cassandra and only load the max(clust_key_col_1) for each part_key_col_1) to reduce the load on Spark and the network? Right now Spark will load 1 000 000 * 5000 rows instead of 1 000 000 rows.

apache-spark

pyspark

cassandra

spark-cassandra-connector

0 Answers

Your Answer

Accepted video resources