2 years ago
#365225
Hongbo Miao
How to create an output stream (changelog) based on a table in KSQL correctly?
Step 1: Create table
I currently have a table in KSQL that was created by:
CREATE TABLE cdc_window_table
    WITH (KAFKA_TOPIC='cdc_stream',
          VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
       COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
This creates a new table, which I can view with:
SELECT *
FROM cdc_window_table
EMIT CHANGES;
which returns data like
+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART    |WINDOWEND      |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1             |1648767460000  |1648767480000  |1                    |
|a1             |1648767460000  |1648767480000  |2                    |
|a1             |1648767460000  |1648767480000  |3                    |
|a1             |1648767480000  |1648767500000  |1                    |
|a1             |1648767740000  |1648767760000  |1                    |
Step 2: Create output stream (changelog) - FAILED
I am trying to create an output stream (changelog) based on this table, as illustrated in this image:
(Image source: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)
After reading that article, I tried these four approaches:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
                                                 application_id_count INT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON');
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='TUMBLING',
         WINDOW_SIZE='20 SECONDS');
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');
CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');
When I query the stream with
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
it only shows the table header, and no changelog data arrives:
+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+
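As a first diagnostic (a sketch, not a confirmed fix), the statements below could help narrow down why the stream stays empty. They rest on two assumptions that are not verified here: that the table's backing topic may differ from 'cdc_window_table', since Step 1 sets KAFKA_TOPIC='cdc_stream', and that the push query starts from the latest offset and so only sees records produced after it begins.

```sql
-- Diagnostic sketch only; not a confirmed fix.

-- Show the table's metadata, including the Kafka topic it actually writes to.
DESCRIBE cdc_window_table EXTENDED;

-- Peek at the raw records in the topic that the CREATE STREAM attempts read from.
-- Assumption: this topic exists; Step 1 names 'cdc_stream' as the table's topic.
PRINT 'cdc_window_table' FROM BEGINNING LIMIT 5;

-- Make queries started in this session read from the earliest available offset.
SET 'auto.offset.reset' = 'earliest';

-- Re-run the push query; LIMIT ends it after five rows.
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES
LIMIT 5;
```

If PRINT shows no records for 'cdc_window_table', the stream definitions are likely pointing at a topic the table does not write to.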
What would be the correct way to create an output stream (changelog) based on a table?
apache-kafka
ksqldb
0 Answers
