Alex Woolford
Perform streaming aggregations on CDC events that contain deletes/updates
A friend reached out and told me that he'd built a data pipeline that captures changes from MySQL using the Debezium connector, and then performs streaming aggregations on the events using ksqlDB.
At first glance, it appeared to work well: he added records to MySQL and could see the totals increase in ksqlDB. That's the very simplest scenario, where records are only ever created. Unfortunately, his ksqlDB aggregation didn't handle updates or deletes: an updated row was counted as if it were another insert, and a deleted row was never subtracted, so the totals drifted.
We thought ksqlDB might be a simpler solution, but COALESCE (and various other functions) don't seem to work with structs. For example:
ksql> CREATE STREAM ORDER_LINE_DEBEZIUM WITH (KAFKA_TOPIC='order_line_debezium', VALUE_FORMAT='AVRO');
ksql> SELECT
    >   BEFORE->ID AS BEFORE_ID,
    >   AFTER->ID AS AFTER_ID,
    >   COALESCE(BEFORE->ID, AFTER->ID) AS ID
    > FROM ORDER_LINE_DEBEZIUM EMIT CHANGES;
+-------------------+-------------------+-------------------+
|BEFORE_ID |AFTER_ID |ID |
+-------------------+-------------------+-------------------+
|null |13363 |null |
|null |13364 |null |
|null |13365 |null |
COALESCE should take the first non-null value. The AFTER->ID property is not null, and yet the COALESCE function returned null.
I wrote a quick & dirty Kafka Streams job instead. Here's the gist of it:
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.woolford.avro.OrderAdjustment;
import io.woolford.avro.OrderId;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class OrderAggregator {

    final Logger LOG = LoggerFactory.getLogger(OrderAggregator.class);

    public static void main(String[] args) throws IOException {
        new OrderAggregator().run();
    }

    void run() throws IOException {

        // load properties
        Properties props = new Properties();
        InputStream input = OrderAggregator.class.getClassLoader().getResourceAsStream("config.properties");
        props.load(input);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<GenericRecord, GenericRecord> orderLineStream = builder.stream("order_line_debezium");

        orderLineStream
                .filter((key, value) -> value != null) // ignore tombstones
                .map((key, value) -> {

                    String op = String.valueOf(value.get("op")); // d, c, u (and r, on snapshot)
                    GenericRecord before = (GenericRecord) value.get("before");
                    GenericRecord after = (GenericRecord) value.get("after");

                    OrderAdjustment orderAdjustment = new OrderAdjustment();
                    Long orderId;

                    if (op.equals("d")) {
                        // delete: subtract the entire quantity
                        orderAdjustment.setId((Long) before.get("id"));
                        orderId = (Long) before.get("order_id");
                        orderAdjustment.setOrderId(orderId);
                        int beforeQuantity = (int) before.get("quantity");
                        orderAdjustment.setQuantity(-beforeQuantity);
                    } else if (op.equals("u")) {
                        // update: emit the difference between the new and old quantities
                        orderAdjustment.setId((Long) before.get("id"));
                        orderId = (Long) before.get("order_id");
                        orderAdjustment.setOrderId(orderId);
                        int beforeQuantity = (int) before.get("quantity");
                        int afterQuantity = (int) after.get("quantity");
                        orderAdjustment.setQuantity(afterQuantity - beforeQuantity);
                    } else {
                        // create (c) or read (r, emitted during snapshots): add the entire quantity
                        orderAdjustment.setId((Long) after.get("id"));
                        orderId = (Long) after.get("order_id");
                        orderAdjustment.setOrderId(orderId);
                        int quantity = (int) after.get("quantity");
                        orderAdjustment.setQuantity(quantity);
                    }

                    LOG.info("orderAdjustment: " + orderAdjustment);

                    // re-key by order ID so adjustments can be summed per order downstream
                    OrderId orderIdAvro = new OrderId();
                    orderIdAvro.setOrderId(orderId);
                    return new KeyValue<>(orderIdAvro, orderAdjustment);
                }).to("order_line_adjustment");

        // run it
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.cleanUp();
        streams.start();

        // add a shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
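The job above only emits per-event adjustments; the running total still has to be summed somewhere, either in ksqlDB (a plain SUM over order_line_adjustment) or in Streams. Here's a minimal sketch of that downstream aggregation step; the OrderTotals class, the buildTotals method, the serde wiring, and the order_totals topic name are my assumptions, not part of the original job:

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.woolford.avro.OrderAdjustment;
import io.woolford.avro.OrderId;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Map;

public class OrderTotals {

    static void buildTotals(StreamsBuilder builder, Map<String, Object> serdeConfig) {
        // specific Avro serdes for the adjustment stream's key/value types;
        // serdeConfig must include schema.registry.url
        SpecificAvroSerde<OrderId> keySerde = new SpecificAvroSerde<>();
        keySerde.configure(serdeConfig, true);
        SpecificAvroSerde<OrderAdjustment> valueSerde = new SpecificAvroSerde<>();
        valueSerde.configure(serdeConfig, false);

        // sum the signed quantity adjustments per order to maintain a running total
        KTable<OrderId, Long> totals = builder
                .stream("order_line_adjustment", Consumed.with(keySerde, valueSerde))
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (orderId, adjustment, total) -> total + adjustment.getQuantity(),
                        Materialized.with(keySerde, Serdes.Long()));

        // hypothetical output topic for the per-order totals
        totals.toStream().to("order_totals", Produced.with(keySerde, Serdes.Long()));
    }
}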
The "full-fidelity" Debezium events (i.e. not unwrapped) contain the before
and after
states. This means each message contains everything we need to calculate the changes. Since the Streams job is stateless, perhaps it'd make more sense to do this transformation in an SMT. If he uses Kafka Streams to do this, the Streams job would need to be monitored and maintained.
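To sketch what that SMT might look like: this is untested pseudocode of mine, not something Debezium ships. The QuantityDeltaTransform class name, the flat output schema, and the field types are all assumptions based on the example table.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class QuantityDeltaTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    // flat output schema for the adjustment record (assumed field types)
    private static final Schema ADJUSTMENT_SCHEMA = SchemaBuilder.struct()
            .name("order_line_adjustment")
            .field("id", Schema.INT64_SCHEMA)
            .field("order_id", Schema.INT64_SCHEMA)
            .field("quantity", Schema.INT32_SCHEMA)
            .build();

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record; // pass tombstones through untouched
        }

        Struct envelope = (Struct) record.value();
        String op = envelope.getString("op");
        Struct before = envelope.getStruct("before");
        Struct after = envelope.getStruct("after");

        // deletes only carry a "before" state; creates/reads only carry "after"
        Struct source = "d".equals(op) ? before : after;

        int delta;
        if ("d".equals(op)) {
            delta = -before.getInt32("quantity");
        } else if ("u".equals(op)) {
            delta = after.getInt32("quantity") - before.getInt32("quantity");
        } else {
            delta = after.getInt32("quantity"); // c, or r during a snapshot
        }

        Struct adjustment = new Struct(ADJUSTMENT_SCHEMA)
                .put("id", source.getInt64("id"))
                .put("order_id", source.getInt64("order_id"))
                .put("quantity", delta);

        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                ADJUSTMENT_SCHEMA, adjustment, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

From the pipeline's point of view an SMT like this would be config-only (just a transforms entry on the connector), though the jar still has to be built and deployed to the Connect workers' plugin path.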
Side note: because he'll be aggregating a stream of changes, it's important to enable exactly-once semantics. I mention this in case someone reads this in the future, forgets to turn it on, and eventually experiences inflated aggregates and doesn't know why. ;)
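For the record, in Kafka Streams that's one line added to the properties set up in the job above (EXACTLY_ONCE_V2 assumes brokers and clients on Kafka 3.0+; older versions use StreamsConfig.EXACTLY_ONCE):

// enable exactly-once processing for the Streams job (Kafka 3.0+)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);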
Capturing CDC events and performing real-time aggregations seems like it'd be a very popular use case. I'm wondering if anyone in the community has examples of this that don't entail writing a custom Kafka Streams application or SMT.
Does anyone have guidance/thoughts/recommendations? A config-only solution would be amazing.
Tags: java, apache-kafka, apache-kafka-streams, debezium, ksqldb