1 year ago

#355794


DarioB

Why does creating a Beam DataFrame from beam.Row not work, when it does from beam.Select?

I have reduced my problem to the following two cases:

Case 1:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

input_file = "gs://bucket/inputfile.parquet"

p = beam.Pipeline()

updates = (p
           | 'Read updates file %s' % (input_file) >> beam.io.ReadFromParquet(input_file)
           | 'To rows' >> beam.Map(lambda row: beam.Row(**row))
           )

df = to_dataframe(updates)
size = df.groupby("year").size()

p.run()

Case 2:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

input_file = "gs://bucket/inputfile.parquet"

p = beam.Pipeline()

updates = (p
           | 'Read updates file %s' % (input_file) >> beam.io.ReadFromParquet(input_file)
           | 'To rows' >> beam.Select(id=lambda item: item["id"],
                                      area=lambda item: item["area"],
                                      year=lambda item: item["year"])
           )

df = to_dataframe(updates)
size = df.groupby("year").size()

p.run()

The only difference is how I create the rows. Why does the second case work but not the first?
According to this, they should be equivalent.

python

apache-beam

apache-beam-io

0 Answers
