1 year ago
#365225
Hongbo Miao
How to create an output stream (changelog) based on a table in KSQL correctly?
Step 1: Create table
I currently have a table in KSQL, which was created by:
CREATE TABLE cdc_window_table
WITH (KAFKA_TOPIC='cdc_stream',
VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
This creates a new table, which I can view with:
SELECT *
FROM cdc_window_table
EMIT CHANGES;
which returns data like
+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART |WINDOWEND |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1 |1648767460000 |1648767480000 |1 |
|a1 |1648767460000 |1648767480000 |2 |
|a1 |1648767460000 |1648767480000 |3 |
|a1 |1648767480000 |1648767500000 |1 |
|a1 |1648767740000 |1648767760000 |1 |
Step 2: Create output stream (changelog) - FAILED
I am trying to create an output stream (changelog) based on this table like this image:
(Image source: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)
After reading this, I tried these 4 methods:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
application_id_count INT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='TUMBLING',
WINDOW_SIZE='20 SECONDS');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='SESSION');

CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='SESSION');
When I query the stream with
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
it shows only the table header, and no changelog data arrives:
+------------------+-----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+------------------+-----------------------+
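One possible way to narrow this down, sketched under assumptions rather than offered as a confirmed fix: the table in Step 1 is created with KAFKA_TOPIC='cdc_stream', while each stream above reads KAFKA_TOPIC='cdc_window_table', so it may be worth confirming which topic actually holds the table's changelog records. The statements below are existing ksqlDB commands. The `DESCRIBE ... EXTENDED` form assumes a reasonably recent ksqlDB version, and the `LIMIT 5` is an arbitrary choice.

```sql
-- Show details of the table, including the Kafka topic that backs it.
-- (Assumes a recent ksqlDB; older releases used DESCRIBE EXTENDED <name>.)
DESCRIBE cdc_window_table EXTENDED;

-- List the topics visible to ksqlDB, to see which of the two names exist.
SHOW TOPICS;

-- Peek at the raw records in the topic the changelog stream reads from.
-- LIMIT 5 is arbitrary; the command waits until that many records arrive.
PRINT 'cdc_window_table' FROM BEGINNING LIMIT 5;

-- Have subsequent queries in this session read from the start of a topic
-- instead of only records produced after the query begins.
SET 'auto.offset.reset' = 'earliest';
```

If `PRINT` shows no records for 'cdc_window_table' but does for the topic reported by `DESCRIBE`, that would suggest the streams are pointed at a different topic than the one the table writes to.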
What would be the correct way to create an output stream (changelog) based on a table?
apache-kafka
ksqldb
0 Answers