1 year ago

#360524


Alex Woolford

Perform streaming aggregations on CDC events that contain deletes/updates

A friend reached out and told me that he'd built a data pipeline that captures changes from MySQL using the Debezium connector, and then performs streaming aggregations on the events using ksqlDB.

[Image: aggregations in ksqlDB]

At first glance, it appeared to work well. He added records to MySQL and could see the totals increase in ksqlDB. That's the simplest scenario, where records are only ever created. Unfortunately, his ksqlDB aggregation didn't handle updates or deletes.

We thought ksqlDB might still be the simpler way to handle this, but COALESCE (and various other functions) doesn't seem to work with struct fields. For example:

ksql> CREATE STREAM ORDER_LINE_DEBEZIUM WITH (KAFKA_TOPIC='order_line_debezium', VALUE_FORMAT='AVRO');

ksql> SELECT
>  BEFORE->ID AS BEFORE_ID,
>  AFTER->ID AS AFTER_ID,
>  COALESCE(BEFORE->ID, AFTER->ID) AS ID
>FROM ORDER_LINE_DEBEZIUM EMIT CHANGES;
+-------------------+-------------------+-------------------+
|BEFORE_ID          |AFTER_ID           |ID                 |
+-------------------+-------------------+-------------------+
|null               |13363              |null               |
|null               |13364              |null               |
|null               |13365              |null               |

COALESCE should return the first non-null argument. The AFTER->ID field is not null, and yet COALESCE returned null.

So instead I wrote a quick-and-dirty Kafka Streams job that turns each change event into a quantity adjustment:

[Image: aggregations in Kafka Streams]

Here's the gist of it:

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.woolford.avro.OrderAdjustment;
import io.woolford.avro.OrderId;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class OrderAggregator {

    private static final Logger LOG = LoggerFactory.getLogger(OrderAggregator.class);

    void run() throws IOException {
        // load properties
        Properties props = new Properties();
        // try-with-resources closes the stream once the properties are loaded
        try (InputStream input = OrderAggregator.class.getClassLoader().getResourceAsStream("config.properties")) {
            props.load(input);
        }

        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<GenericRecord, GenericRecord> orderLineStream = builder.stream("order_line_debezium");

        orderLineStream
                .filter((key, value) -> value != null)
                .map((key, value) -> {

                    String op = String.valueOf(value.get("op")); // c (create), u (update), d (delete), or r (read, during a snapshot)

                    Object before = value.get("before");
                    Object after = value.get("after");

                    OrderAdjustment orderAdjustment = new OrderAdjustment();

                    Long id = null;
                    Long orderId = null;

                    if (op.equals("d")){
                        // delete
                        id = (Long) ((GenericData.Record) before).get("id");
                        orderAdjustment.setId(id);

                        orderId = (Long) ((GenericData.Record) before).get("order_id");
                        orderAdjustment.setOrderId(orderId);

                        int beforeQuantity = (int) ((GenericData.Record) before).get("quantity");
                        orderAdjustment.setQuantity(-beforeQuantity);
                    } else if (op.equals("u")) {
                        // update
                        id = (Long) ((GenericData.Record) before).get("id");
                        orderAdjustment.setId(id);

                        orderId = (Long) ((GenericData.Record) before).get("order_id");
                        orderAdjustment.setOrderId(orderId);

                        int beforeQuantity = (int) ((GenericData.Record) before).get("quantity");
                        int afterQuantity = (int) ((GenericData.Record) after).get("quantity");
                        orderAdjustment.setQuantity(afterQuantity - beforeQuantity);
                    } else {
                        // create (c) or read (r, which happens during a snapshot)
                        id = (Long) ((GenericData.Record) after).get("id");
                        orderAdjustment.setId(id);

                        orderId = (Long) ((GenericData.Record) after).get("order_id");
                        orderAdjustment.setOrderId(orderId);

                        int quantity = (int) ((GenericData.Record) after).get("quantity");
                        orderAdjustment.setQuantity(quantity);
                    }

                    LOG.info("orderAdjustment: {}", orderAdjustment);

                    OrderId orderIdAvro = new OrderId();
                    orderIdAvro.setOrderId(orderId);

                    return new KeyValue<>(orderIdAvro, orderAdjustment);

                }).to("order_line_adjustment");


        // run it
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.cleanUp();
        streams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    }

}

The "full-fidelity" Debezium events (i.e. not unwrapped) contain both the before and after states, so each message carries everything we need to calculate the change. Since the Streams job is stateless, perhaps it'd make more sense to do this transformation in a single message transform (SMT) in Kafka Connect. If he uses Kafka Streams instead, the Streams job becomes one more application to deploy, monitor, and maintain.
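
To make the rule in that job concrete, here is a minimal sketch of the per-event delta calculation, stripped of Kafka and Avro so it runs on its own. The class name QuantityDelta and the use of plain maps for the before/after rows are assumptions for illustration; only the op codes and the quantity field come from the events above.

```java
import java.util.Map;

// Hypothetical helper, not part of the original job: computes the quantity
// delta for one Debezium change event from its op code and before/after rows.
final class QuantityDelta {

    static int of(String op, Map<String, ?> before, Map<String, ?> after) {
        switch (op) {
            case "d": // delete: take away what was there
                return -quantity(before);
            case "u": // update: apply only the difference
                return quantity(after) - quantity(before);
            case "c": // create
            case "r": // read (snapshot): the row counts in full
                return quantity(after);
            default:
                throw new IllegalArgumentException("unexpected op: " + op);
        }
    }

    private static int quantity(Map<String, ?> row) {
        return ((Number) row.get("quantity")).intValue();
    }

    public static void main(String[] args) {
        // An update from quantity 5 to quantity 2 is an adjustment of -3.
        System.out.println(of("u", Map.of("quantity", 5), Map.of("quantity", 2)));
    }
}
```

Summing these deltas per order_id gives the same total as re-aggregating the current rows, which is why the job itself doesn't need to keep any state.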

Side note: because he'll be aggregating a stream of changes (deltas) rather than absolute values, it's important to enable exactly-once semantics. A delta that gets processed twice is counted twice. I mention this in case someone reads this in the future, forgets to turn it on, eventually sees inflated aggregates, and doesn't know why. ;)
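
A rough sketch of both halves of that side note, using only the JDK. The ExactlyOnceSketch and Adjustment names are made up for illustration. The config key and value are the ones Kafka Streams uses for StreamsConfig.PROCESSING_GUARANTEE_CONFIG; "exactly_once_v2" assumes Kafka Streams 3.0 or later, and older versions use "exactly_once".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

final class ExactlyOnceSketch {

    // Hypothetical stand-in for the OrderAdjustment record: one delta for one order.
    static final class Adjustment {
        final long orderId;
        final int quantityDelta;

        Adjustment(long orderId, int quantityDelta) {
            this.orderId = orderId;
            this.quantityDelta = quantityDelta;
        }
    }

    // Sums the deltas per order id, the way a downstream SUM would.
    static Map<Long, Integer> totals(List<Adjustment> adjustments) {
        Map<Long, Integer> totals = new LinkedHashMap<>();
        for (Adjustment a : adjustments) {
            totals.merge(a.orderId, a.quantityDelta, Integer::sum);
        }
        return totals;
    }

    // Copies the base config and turns on exactly-once processing.
    static Properties withExactlyOnce(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("processing.guarantee", "exactly_once_v2"); // assumes Kafka Streams 3.0+
        return props;
    }

    public static void main(String[] args) {
        List<Adjustment> once = List.of(new Adjustment(1L, 5), new Adjustment(1L, -2));
        List<Adjustment> redelivered = List.of(
                new Adjustment(1L, 5), new Adjustment(1L, 5), new Adjustment(1L, -2));

        System.out.println(totals(once));        // {1=3}
        System.out.println(totals(redelivered)); // {1=8}, inflated by the duplicate
    }
}
```

With absolute values a duplicate is harmless because the latest value simply wins; with deltas every replayed message shifts the total.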

Capturing CDC events and performing real-time aggregations on them seems like it'd be a very popular use case. I'm wondering if anyone in the community has examples of this that don't entail writing custom Kafka Streams applications or SMTs.

Does anyone have guidance/thoughts/recommendations? A config-only solution would be amazing.

java

apache-kafka

apache-kafka-streams

debezium

ksqldb
