1 year ago

#372144

Gloripaxis

Empty result after left-joining two streaming DataFrames and then aggregating them in PySpark

Data description

I have two CSV files with logs: one contains span logs and the other contains error logs (Elastic APM logs, for those familiar with them). The relevant span fields are timestamp, id, name and duration, while the relevant error fields are error_timestamp, error_id and span_id. The span_id field in the error logs is a foreign key referencing the id field in the span logs, which defines a one-to-many relationship between spans and errors.

NOTE: I know I shouldn't use the Streaming API if I already have CSV files, but my task is to research Spark Structured Streaming. Connecting to the actual (production) streaming source is not an option for the research phase, so I created a Python script which simulates a streaming source by writing span logs to port 9999 and error logs to port 9998, in a mutually chronological order.
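For readers who want to reproduce the setup, the simulator described above could look roughly like the following. This is a minimal sketch, not the actual script: the function names `interleave` and `serve`, the `delay` parameter, and the `(timestamp, csv_line)` input shape are all assumptions made for illustration. Only the two port numbers come from the question.

```python
import heapq
import socket
import time

SPAN_PORT, ERROR_PORT = 9999, 9998  # ports named in the question


def interleave(spans, errors):
    """Yield (port, csv_line) pairs in chronological order.

    `spans` and `errors` are lists of (iso_timestamp, csv_line) tuples,
    each already sorted by timestamp (an assumption of this sketch).
    On equal timestamps the span is sent before the error.
    """
    tagged_spans = ((ts, 0, SPAN_PORT, line) for ts, line in spans)
    tagged_errors = ((ts, 1, ERROR_PORT, line) for ts, line in errors)
    for _ts, _order, port, line in heapq.merge(tagged_spans, tagged_errors):
        yield port, line


def serve(spans, errors, delay=0.01):
    """Accept one client per port, then replay all records in order."""
    listeners = {}
    for port in (SPAN_PORT, ERROR_PORT):
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("localhost", port))
        srv.listen(1)
        listeners[port] = srv
    # Blocks until a reader (e.g. a Spark socket source) connects to each port.
    conns = {port: srv.accept()[0] for port, srv in listeners.items()}
    try:
        for port, line in interleave(spans, errors):
            conns[port].sendall((line + "\n").encode("utf-8"))
            time.sleep(delay)  # crude pacing between records
    finally:
        for sock in list(conns.values()) + list(listeners.values()):
            sock.close()
```

Keeping the ordering logic in `interleave` separate from the socket handling makes the "mutually chronological" property easy to check without opening any ports.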

The task

I want to do the following:

  1. Join span and error Streaming DataFrames using a left outer join
  2. Group the resulting DataFrame by:
    a. A non-overlapping time-window of 30 seconds, windowing over the span.timestamp column
    b. The span name
  3. Aggregate the groups, calculating:
    a. average value of span.duration
    b. number of distinct spans in group (span.id)
    c. number of distinct errors in group (error.error_id)
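
To pin down the intended semantics independently of Spark, the three steps above can be sketched as a plain-Python batch computation over small in-memory lists. This is only a reference model under assumptions: the dictionary keys mirror the field names from the data description, the 30-second join bound is borrowed from the join condition in the code further down, and the helper names `window_start` and `aggregate` are made up for this sketch.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Floor a datetime to the start of its 30-second tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def aggregate(spans, errors):
    """Left-join spans to errors, then group by (window start, span name).

    spans:  dicts with keys timestamp, id, name, duration
    errors: dicts with keys error_timestamp, error_id, span_id
    Returns {(window_start, name): (avg_duration, n_spans, n_errors)}.
    """
    errors_by_span = defaultdict(list)
    for err in errors:
        errors_by_span[err["span_id"]].append(err)

    groups = defaultdict(
        lambda: {"durations": [], "span_ids": set(), "error_ids": set()}
    )
    for span in spans:
        matches = [
            err
            for err in errors_by_span.get(span["id"], [])
            if span["timestamp"] <= err["error_timestamp"] <= span["timestamp"] + WINDOW
        ]
        # Left outer join: an unmatched span still produces one row.
        for err in matches or [None]:
            group = groups[(window_start(span["timestamp"]), span["name"])]
            # Like avg() over joined rows, a span with two errors counts twice.
            group["durations"].append(span["duration"])
            group["span_ids"].add(span["id"])
            if err is not None:
                group["error_ids"].add(err["error_id"])

    return {
        key: (
            sum(g["durations"]) / len(g["durations"]),
            len(g["span_ids"]),
            len(g["error_ids"]),
        )
        for key, g in groups.items()
    }
```

In this model every (window, name) pair that contains at least one span gets a row, and the error count is simply 0 when nothing matched.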

What I tried

The server simulating a streaming source certainly works correctly - I tried processing both streams without joining them, and everything worked great.

I've read the official documentation for Spark Structured Streaming, and I think I have mostly understood the principles behind stream-stream outer joins, the purpose of watermarking, as well as the aggregation rules. This is my code so far:

from pyspark.sql import functions as F

# ...
# initialization of spans_df and errors_df dataframes
# ...

# Watermark each stream on its own event-time column.
wm_spans = spans_df.withWatermark("timestamp", "60 seconds")
wm_errors = errors_df.withWatermark("error_timestamp", "60 seconds")

aggregation = wm_spans.join(
        wm_errors,
        F.expr(
            """
            id = span_id AND
            error_timestamp >= timestamp AND
            error_timestamp <= timestamp + interval 30 seconds
            """
        ), 
        "leftOuter"
    ).groupBy(
        F.window("timestamp", "30 seconds", "30 seconds"), 
        "name"
    ).agg(
        F.avg(F.col('duration')).alias('davg'),
        F.approx_count_distinct(F.col('id')).alias('nSps'),
        F.approx_count_distinct(F.col('error_id')).alias('nErr'),
    )

query = aggregation.writeStream\
    .outputMode("append")\
    .format("console")\
    .start()

Expected output

Based on the chapter about watermarks in windowed aggregations and the chapter about outer joins with watermarking in the official Spark Structured Streaming documentation, I would expect this code to produce a row for each (30s time window, distinct span name) pair. The number of errors for each such row could be 0 (if no error was linked to any of the spans in the given time window).

The problem

When I ran this for the first time, Spark gave me the following error:

ERROR UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global 
watermark. The query contains stateful operation which can emit rows older than the current 
watermark plus allowed late record delay, which are "late rows" in downstream stateful operations 
and these rows can be discarded. Please refer the programming guide doc for more details. 
If you understand the possible risk of correctness issue and still need to run the query, 
you can disable this check by setting the config
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.

So I set the specified config to false, which turned this ERROR message into a WARN message. However, the resulting tables are empty. I am aware this is most likely due to the correctness issue mentioned in the warning above, but I don't have any idea how I could fix/avoid the issue, while still accomplishing my goal.
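
One possible reading of the warning can be sketched as a toy model in plain Python. This is not Spark code and not a confirmed diagnosis. It assumes a single global watermark shared by the join and the aggregation, that an unmatched span leaves the outer join (with NULL error columns) only once the watermark has passed `timestamp + 30 seconds`, and that the downstream aggregation discards any row whose window has already ended at that watermark. The names `window_end` and `fate_of_span` are hypothetical.

```python
from datetime import datetime, timedelta

JOIN_RANGE = timedelta(seconds=30)  # upper bound in the join condition
WINDOW = timedelta(seconds=30)      # tumbling window length
EPOCH = datetime(1970, 1, 1)


def window_end(ts):
    """End of the 30-second tumbling window that contains ts."""
    return ts - (ts - EPOCH) % WINDOW + WINDOW


def fate_of_span(span_ts, matched, watermark_when_matched=None):
    """Return 'aggregated' or 'dropped as late' under the toy model."""
    if matched:
        # Matched rows leave the join as soon as both sides have arrived,
        # so they see whatever the watermark is at that moment.
        watermark_at_emit = watermark_when_matched
    else:
        # Unmatched rows are released only after no error can match any more.
        watermark_at_emit = span_ts + JOIN_RANGE
    if window_end(span_ts) <= watermark_at_emit:
        return "dropped as late"
    return "aggregated"


# An unmatched span at 22:04:10: its window ends at 22:04:30, but the row
# is released only once the watermark has reached 22:04:40.
print(fate_of_span(datetime(2022, 1, 31, 22, 4, 10), matched=False))
```

Under these assumptions the window end is never later than `timestamp + 30 seconds`, so every unmatched span would reach the aggregation too late, while matched spans would pass through. That would be consistent with non-empty output appearing only where errors exist, but it remains an assumption about the engine's behavior rather than something verified here.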

I tried many different combinations of

  1. Watermarking period (currently 60 seconds for both dataframes)
  2. Join condition period (currently 30 seconds: error_timestamp <= timestamp + interval 30 seconds)
  3. Windowing period (currently a 30-second window length with a 30-second slide interval)

NOTE: Not every batch gives an empty result. The starting timestamp is 2022-01-31T22:00:00. There are thousands of span logs between 22:00:00 and 22:05:00, but only 9 error logs, with all 9 of them between 22:04:22 and 22:04:43. Thus, they fall into two 30s time windows: 22:04:00-22:04:30 and 22:04:30-22:05:00. The batches that are processing this part of the dataset actually result in a seemingly successful aggregation:

-------------------------------------------------
Batch: 26
-------------------------------------------------
+-----------------------+------------------------+-------...
|start                  |name                    |dmin   ...
+-----------------------+------------------------+-------...
|2022-01-30 22:04:00    |<one of the span names> |3251.0 ...
+-----------------------+------------------------+-------...
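
The window assignment of the error logs mentioned in the note can be checked with a few lines of plain Python. This is a small sketch that assumes 30-second tumbling windows aligned to the Unix epoch; `tumbling_window` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta


def tumbling_window(ts, seconds=30):
    """Return (start, end) of the epoch-aligned tumbling window containing ts."""
    size = timedelta(seconds=seconds)
    start = ts - (ts - datetime(1970, 1, 1)) % size
    return start, start + size


# First and last error timestamps from the note above.
first = tumbling_window(datetime(2022, 1, 31, 22, 4, 22))
last = tumbling_window(datetime(2022, 1, 31, 22, 4, 43))
print(first[0].time(), last[0].time())  # 22:04:00 22:04:30
```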

I find this result surprising - using left outer join should ensure that all rows from the left table are kept, possibly being extended by NULL columns if no match is found in the right table. However, I get non-empty results only when there is a match in the right table.

Any help will be appreciated! If I forgot to state some relevant information, please do tell and I'll add it ASAP - I am not exactly accustomed to writing SO questions, so please keep that in mind.

pyspark

left-join

aggregate

spark-streaming

watermark
