Empty result after left-joining two streaming DataFrames and then aggregating them in PySpark
Data description
I have two CSV files with logs - one contains Span logs and the other contains Error logs (Elastic APM logs, for those who are acquainted). The relevant span fields are timestamp, id, name and duration, while the relevant error fields are error_timestamp, error_id and span_id. The span_id field in error logs is a foreign key referencing the id field in span logs, thus defining a one-to-many relationship between spans and errors.
NOTE: I know I shouldn't use the Streaming API if I already have CSV files, but my task is to research Spark Structured Streaming. Connecting to the actual (production) streaming source is not an option during the research phase, so I created a Python script that simulates a streaming source by writing span logs to port 9999 and error logs to port 9998, in mutually chronological order.
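For context, initializing spans_df and errors_df from those sockets looks roughly like this. This is a simplified sketch, not my exact code - the schemas, the localhost host name and the CSV column order are assumptions based on the field descriptions above:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

spark = SparkSession.builder.appName("apm-streaming-research").getOrCreate()

# Schemas assumed from the field descriptions above
span_schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("id", StringType()),
    StructField("name", StringType()),
    StructField("duration", DoubleType()),
])
error_schema = StructType([
    StructField("error_timestamp", TimestampType()),
    StructField("error_id", StringType()),
    StructField("span_id", StringType()),
])

def socket_csv_stream(port, schema):
    # The socket source yields one string column named 'value' per line;
    # split each CSV line and cast the parts to the target schema
    lines = (spark.readStream
                  .format("socket")
                  .option("host", "localhost")
                  .option("port", port)
                  .load())
    parts = F.split(lines["value"], ",")
    return lines.select([
        parts.getItem(i).cast(field.dataType).alias(field.name)
        for i, field in enumerate(schema.fields)
    ])

spans_df = socket_csv_stream(9999, span_schema)    # span logs
errors_df = socket_csv_stream(9998, error_schema)  # error logs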
The task
I want to do the following:
- Join the span and error streaming DataFrames using a left outer join
- Group the resulting DataFrame by:
  a. a non-overlapping 30-second time window over the span.timestamp column
  b. the span name
- Aggregate each group, calculating:
  a. the average value of span.duration
  b. the number of distinct spans in the group (span.id)
  c. the number of distinct errors in the group (error.error_id)
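To make the intended semantics concrete, here is what the same query would look like on static (batch) DataFrames, where no watermarks are needed. This is just a sketch using the column names from the data description, assuming spans_df and errors_df were loaded with spark.read.csv for the sake of the example:

from pyspark.sql import functions as F

# Batch-mode sketch of the intended join + windowed aggregation
joined = spans_df.join(
    errors_df,
    (spans_df["id"] == errors_df["span_id"]) &
    (errors_df["error_timestamp"] >= spans_df["timestamp"]) &
    (errors_df["error_timestamp"] <= spans_df["timestamp"] + F.expr("interval 30 seconds")),
    "left_outer",
)

result = (joined
    .groupBy(
        F.window(spans_df["timestamp"], "30 seconds"),  # non-overlapping 30s windows
        spans_df["name"],
    )
    .agg(
        F.avg("duration").alias("davg"),
        F.countDistinct("id").alias("nSps"),
        F.countDistinct("error_id").alias("nErr"),
    ))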
What I tried
The server simulating a streaming source certainly works correctly - I tried processing both streams without joining them, and everything worked great.
I've read the official documentation for Spark Structured Streaming, and I think I have mostly understood the principles behind stream-stream outer joins, the purpose of watermarking, as well as the aggregation rules. This is my code so far:
from pyspark.sql import functions as F
# ...
# initialization of spans_df and errors_df dataframes
# ...
wm_spans = spans_df.withWatermark("timestamp", "60 seconds")
wm_errors = errors_df.withWatermark("timestamp", "60 seconds")
aggregation = wm_spans.join(
    wm_errors,
    F.expr(
        """
        id = span_id AND
        error_timestamp >= timestamp AND
        error_timestamp <= timestamp + interval 30 seconds
        """
    ),
    "leftOuter"
).groupBy(
    # non-overlapping 30-second windows over the span timestamp
    F.window("timestamp", "30 seconds", "30 seconds"),
    "name"
).agg(
    F.avg(F.col('duration')).alias('davg'),
    F.approx_count_distinct(F.col('id')).alias('nSps'),
    F.approx_count_distinct(F.col('error_id')).alias('nErr'),
)

query = aggregation.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# keep the driver alive while the streaming query runs
query.awaitTermination()
Expected output
Based on the chapter about watermarks in windowed aggregations and the chapter about outer joins with watermarking in the official Spark Structured Streaming documentation, I would expect this code to produce a row for each (30s time window, distinct span name)
pair. The number of errors for each such row could be 0 (if no error was linked to any of the spans in the given time window).
The problem
When I ran this for the first time, Spark gave me the following error:
ERROR UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global
watermark. The query contains stateful operation which can emit rows older than the current
watermark plus allowed late record delay, which are "late rows" in downstream stateful operations
and these rows can be discarded. Please refer the programming guide doc for more details.
If you understand the possible risk of correctness issue and still need to run the query,
you can disable this check by setting the config
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.
So I set the specified config to false, which turned this ERROR message into a WARN message. However, the resulting tables are empty. I am aware this is most likely due to the correctness issue mentioned in the warning above, but I don't have any idea how I could fix/avoid the issue while still accomplishing my goal.
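For reference, the check can be disabled either via --conf when submitting the job or on the active SparkSession, e.g.:

# Downgrades the correctness check from an error to a warning, as described above
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")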
I tried many different combinations of:
- the watermarking period (currently 60 seconds for both DataFrames)
- the join condition period (currently 30 seconds: error_timestamp <= timestamp + interval 30 seconds)
- the windowing period (currently a 30-second window length with a 30-second slide interval)
NOTE: Not every batch gives an empty result. The starting timestamp is 2022-01-31T22:00:00. There are thousands of span logs between 22:00:00 and 22:05:00, but only 9 error logs, with all 9 of them between 22:04:22 and 22:04:43. Thus, they fall into two 30s time windows: 22:04:00-22:04:30 and 22:04:30-22:05:00. The batches that are processing this part of the dataset actually result in a seemingly successful aggregation:
-------------------------------------------------
Batch: 26
-------------------------------------------------
+-----------------------+------------------------+-------...
|start |name |dmin ...
+-----------------------+------------------------+-------...
|2022-01-30 22:04:00 |<one of the span names> |3251.0 ...
+-----------------------+------------------------+-------...
I find this result surprising - using a left outer join should ensure that all rows from the left table are kept, with the right table's columns filled with NULLs when no match is found. However, I get non-empty results only when there is a match in the right table.
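That is the behaviour I would expect, illustrated here on tiny static DataFrames with made-up rows (not my real data):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spans = spark.createDataFrame([("s1", "GET /a"), ("s2", "GET /b")], ["id", "name"])
errors = spark.createDataFrame([("e1", "s1")], ["error_id", "span_id"])

# In batch mode, the left outer join keeps "s2" even though it has no matching
# error row: error_id and span_id simply come back as null for it.
spans.join(errors, spans["id"] == errors["span_id"], "left_outer").show()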
Any help will be appreciated! If I forgot to state some relevant information, please do tell and I'll add it ASAP - I am not exactly accustomed to writing SO questions, so please keep that in mind.
Related questions
- How to join two streaming datasets when one dataset involves aggregation - this is somewhat similar, but not helpful because the OP didn't even use watermarking, which is not optional when outer-joining two streaming datasets.
- Outer join two Datasets (not DataFrames) in Spark Structured Streaming - the main difference is that the OP is not performing grouped aggregations after joining the two datasets. Instead, they are just joining them and using them as-is. Furthermore, they are using the Dataset API, whereas I am using the DataFrame API.
- Calculating a moving average column using pyspark structured streaming - again, the OP here didn't use watermarking, but also, they tried to use windowing over a non-time-based column.
Tags: pyspark, left-join, aggregate, spark-streaming, watermark