1 year ago

#371994

John

How to specify CREATE TABLE in Flink SQL when receiving data stream of non-primitive types (using PyFlink)?

A Flink SQL application receives messages from an AWS Kinesis Data Stream. The messages are JSON, their schema is expressed in JSON Schema, and the schema contains properties that are not primitive types (nested objects), for example:

{
  "$id": "https://example.com/schemas/customer",
  "$schema": "https://json-schema.org/draft/2020-12/schema",

  "type": "object",
  "properties": {
    "first_name": { "type": "string" },
    "last_name": { "type": "string" },
    "shipping_address": { "$ref": "/schemas/address" },
    "billing_address": { "$ref": "/schemas/address" }
  },
  "required": ["first_name", "last_name", "shipping_address", "billing_address"],

  "$defs": {
    "address": {
      "$id": "/schemas/address",
      "$schema": "http://json-schema.org/draft-07/schema#",

      "type": "object",
      "properties": {
        "street_address": { "type": "string" },
        "city": { "type": "string" },
        "state": { "$ref": "#/definitions/state" }
      },
      "required": ["street_address", "city", "state"],

      "definitions": {
        "state": { "enum": ["CA", "NY", "... etc ..."] }
      }
    }
  }
}

I can see in the documentation that:

Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog or referenced in a CREATE TABLE DDL.

So if I cannot use CREATE TABLE in order to create an input table representing the stream of data my application is receiving, how should I handle the stream of data? Can I even use Flink SQL at all?

NOTE: I need to write my application in Python.
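
For reference, here is a minimal sketch of one possible approach, assuming the nested address objects can be declared with Flink's built-in ROW type instead of a registered structured type (the quoted limitation is about user-defined structured types). The table name, stream name, and region are hypothetical placeholders, and the Kinesis connector option names are an assumption that depends on the connector version in use. The enum-valued "state" property is mapped to STRING here.

```python
# Sketch only: build a CREATE TABLE statement in which each nested JSON
# object becomes a ROW<...> column. All names below are placeholders.

# The address object from the JSON Schema, expressed as a Flink SQL ROW type.
# The "state" enum is treated as a plain STRING (assumption).
ADDRESS_ROW = "ROW<street_address STRING, city STRING, state STRING>"


def build_customer_ddl(table="customers", stream="customer-stream",
                       region="us-east-1"):
    """Return a CREATE TABLE statement for the customer messages."""
    return f"""
CREATE TABLE {table} (
  first_name STRING,
  last_name STRING,
  shipping_address {ADDRESS_ROW},
  billing_address {ADDRESS_ROW}
) WITH (
  'connector' = 'kinesis',
  'stream' = '{stream}',
  'aws.region' = '{region}',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json'
)""".strip()


def register_customer_table():
    """Register the table with PyFlink (needs PyFlink and the connector JAR)."""
    # Imported here so the DDL can be built and inspected without PyFlink.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.execute_sql(build_customer_ddl())
    return t_env


if __name__ == "__main__":
    print(build_customer_ddl())
```

If this works as assumed, nested fields should be reachable in queries with dot notation, for example SELECT shipping_address.city FROM customers.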

python

apache-flink

flink-streaming

flink-sql

pyflink

0 Answers
