1 year ago
#121852

Murli Krishnan
Apache Beam - Multiple Pcollection - Dataframetransform Issue
I am running a below sample in apache beam
import apache_beam as beam
from apache_beam import Row
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam.dataframe.transforms import DataframeTransform
import logging
import argparse
import sys
import pandas
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(sys.argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
# Just a Dummy Dataframe Transform Function, Ignore the logic
def transformdf(a, b):
a["addr"] = "addr-common"
return a
p = beam.Pipeline(options=pipeline_options)
# Schema Aware Pcollection
data1 = [Row(id=1, name="abc"), Row(id=2, name="def"), Row(id=3, name="ghi")]
pcol1 = (p | "Create1" >> beam.Create(data1))
data2 = [Row(addr="addr1"), Row(addr="addr2"), Row(addr="addr3")]
pcol2 = (p | "Create2" >> beam.Create(data2))
pcol = ({"a":pcol1, "b":pcol2} | "TransformedDF" >> DataframeTransform(transformdf))
# The above throws issue with duplicate label error
pcol | "Map" >> beam.Map(lambda row: {"id":row.id, "name":row.name, "addr":row.addr}) | "Print" >> beam.Map(print)
p.run().wait_until_finish()
The code errors out with the error `
RuntimeError: A transform with label "TransformedDF/BatchElements(pc)" already exists in the pipeline `
The syntax and usage seems correct as per the link https://beam.apache.org/documentation/dsls/dataframes/overview/#embedding-dataframes-in-a-pipeline
output = {"a":pcol1, "b":pcol2"} | DataframeTransform(lambda/function)
I am currently using apache beam 2.35.0 Is this issue with Python SDK?
google-cloud-dataflow
apache-beam
apache-beam-internals
0 Answers
Your Answer