1 year ago
#335096
Dhinsha Mahesh
AWS Kinesis Data Analytics: PyFlink with nested JSON data
With a Kinesis Data Analytics SQL application, we have the option to configure an input schema so that the incoming data can be mapped to a flat structure. I'm looking for a similar configuration with a Kinesis Data Analytics Apache Flink (Python) application.
To put it another way, how can we use PyFlink to handle nested JSON data (JSON containing arrays; please see the sample data and required mapping below)?
Here is the sample data that comes into the Analytics application:
"data_json": {
"deviceID": "xxxxx",
"sensor": [{
"information": {
"trigger": false
},
"offsetSec": 4.81,
"data": {
"temperature": 37.97,
"illuminance": 0,
"activity": 0,
"humidity": 31.46
}
}],
"MACAddress": "xxxx",
"parentID": "xxxx",
"type": 1
}
}
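For context, this is roughly how I imagine declaring that nested structure as a source table in PyFlink, using ROW and ARRAY types so the JSON format can parse it directly. This is only a sketch: the stream name, region, and connector options are my own placeholders, and the field types are guesses from the sample data above.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch only: assumes the Kinesis table connector is available to the application.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Declare the nested JSON as ROW/ARRAY types.
# 'my-input-stream' and 'us-east-1' are placeholders.
t_env.execute_sql("""
    CREATE TABLE sensor_source (
        data_json ROW<
            deviceID STRING,
            sensor ARRAY<ROW<
                information ROW<`trigger` BOOLEAN>,
                offsetSec DOUBLE,
                `data` ROW<
                    temperature DOUBLE,
                    illuminance INT,
                    activity INT,
                    humidity DOUBLE
                >
            >>,
            MACAddress STRING,
            parentID STRING,
            `type` INT
        >
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'my-input-stream',
        'aws.region' = 'us-east-1',
        'scan.stream.initpos' = 'LATEST',
        'format' = 'json'
    )
""")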
Expected schema format (this is what the SQL application uses; I need something similar in PyFlink):
RecordColumns:
  - Name: "mac_address"
    SqlType: "VARCHAR(20)"
    Mapping: "data_json.MACAddress"
  - Name: "device_key"
    SqlType: "VARCHAR(20)"
    Mapping: "data_json.deviceID"
  - Name: "type"
    SqlType: "INT"
    Mapping: "data_json.type"
  - Name: "parent_id"
    SqlType: "VARCHAR(20)"
    Mapping: "data_json.parentID"
  - Name: "illuminance"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.illuminance"
  - Name: "activity"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.activity"
  - Name: "humidity"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.humidity"
  - Name: "temperature"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].data.temperature"
  - Name: "offset_sec"
    SqlType: "DOUBLE"
    Mapping: "data_json.sensor[0:].offsetSec"
  - Name: "trigger_info"
    SqlType: "INT"
    Mapping: "data_json.sensor[0:].information.trigger"
RecordFormat:
  RecordFormatType: "JSON"
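In PyFlink terms, I assume the flattening itself would then be a query over that source table, along the lines of the sketch below. The output column names follow the mapping above; exploding the sensor array with CROSS JOIN UNNEST is my guess at the equivalent of the sensor[0:] paths, and the column aliases in the UNNEST clause are names I made up.

# Sketch: one output row per element of data_json.sensor, with the
# top-level fields repeated, mirroring the RecordColumns mapping above.
flattened = t_env.sql_query("""
    SELECT
        data_json.MACAddress        AS mac_address,
        data_json.deviceID          AS device_key,
        data_json.`type`            AS `type`,
        data_json.parentID          AS parent_id,
        s.sensor_data.illuminance   AS illuminance,
        s.sensor_data.activity      AS activity,
        s.sensor_data.humidity      AS humidity,
        s.sensor_data.temperature   AS temperature,
        s.offset_sec                AS offset_sec,
        s.sensor_info.`trigger`     AS trigger_info
    FROM sensor_source
    CROSS JOIN UNNEST(data_json.sensor) AS s (sensor_info, offset_sec, sensor_data)
""")

# For a quick check the result could be printed, or written to a sink table instead:
# flattened.execute().print()

Is this the intended way to reproduce the SQL application's input-schema mapping in PyFlink, or is there a built-in schema configuration for the Flink application that I'm missing?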
amazon-web-services
apache-flink
pyflink
amazon-kinesis-analytics
0 Answers