1 year ago
#371994
John
How to specify CREATE TABLE in Flink SQL when receiving a data stream with non-primitive types (using PyFlink)?
A Flink SQL application receives data from an AWS Kinesis Data Stream. The messages are JSON, and their schema, expressed in JSON Schema, contains properties that are not primitive types, for example:
{
  "$id": "https://example.com/schemas/customer",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "object",
  "properties": {
    "first_name": { "type": "string" },
    "last_name": { "type": "string" },
    "shipping_address": { "$ref": "/schemas/address" },
    "billing_address": { "$ref": "/schemas/address" }
  },
  "required": ["first_name", "last_name", "shipping_address", "billing_address"],
  "$defs": {
    "address": {
      "$id": "/schemas/address",
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "street_address": { "type": "string" },
        "city": { "type": "string" },
        "state": { "$ref": "#/definitions/state" }
      },
      "required": ["street_address", "city", "state"],
      "definitions": {
        "state": { "enum": ["CA", "NY", "... etc ..."] }
      }
    }
  }
}
I can see in the documentation that:
Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog or referenced in a CREATE TABLE DDL.
So if I cannot use CREATE TABLE to create an input table representing the stream of data my application receives, how should I handle the stream? Can I even use Flink SQL at all?
NOTE: I need to write my application in Python.
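For context, this is roughly what I imagined writing in PyFlink, mapping each nested address object to a ROW type instead of a registered structured type. I am not sure this mapping is what the documentation's limitation refers to, and the stream name, region, and other connector options below are placeholders, not my real configuration:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming table environment (minimal sketch of my setup).
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# My guess: express the nested "address" objects as ROW types in the DDL.
# Stream name, region, and start position are placeholder values.
t_env.execute_sql("""
    CREATE TABLE customers (
        first_name STRING,
        last_name STRING,
        shipping_address ROW<street_address STRING, city STRING, state STRING>,
        billing_address ROW<street_address STRING, city STRING, state STRING>
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'my-input-stream',
        'aws.region' = 'us-east-1',
        'scan.stream.initpos' = 'LATEST',
        'format' = 'json'
    )
""")

Is something along these lines the intended approach, or does the limitation on registered structured types rule this out as well?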
python
apache-flink
flink-streaming
flink-sql
pyflink
0 Answers