1 year ago
#388171
Pankaj
SparkSQL query using "PARTITION by" giving wrong output
I have a bunch of CSV files that I am processing with PySpark for speed. However, I am a total noob with Spark (PySpark). So far I have been able to create an RDD, a DataFrame from it, and a temporary view (country_name) so I can easily query the data.
Input Data
+---+--------------------------+-------+--------------------------+-------------------+
|ID |NAME |COUNTRY|ADDRESS |DESCRIPTION |
+---+--------------------------+-------+--------------------------+-------------------+
|1 | |QAT | |INTERIOR DECORATING|
|2 |S&T |QAT |AL WAAB STREET |INTERIOR DECORATING|
|3 | |QAT | |INTERIOR DECORATING|
|4 |THE ROSA BERNAL COLLECTION|QAT | |INTERIOR DECORATING|
|5 | |QAT |AL SADD STREET |INTERIOR DECORATING|
|6 |AL MANA |QAT |SALWA ROAD |INTERIOR DECORATING|
|7 | |QAT |SUHAIM BIN HAMAD STREET |INTERIOR DECORATING|
|8 |INTERTEC |QAT |AL MIRQAB AL JADEED STREET|INTERIOR DECORATING|
|9 | |EGY | |HOTELS |
|10 | |EGY |QASIM STREET |HOTELS |
|11 |AIRPORT HOTEL |EGY | |HOTELS |
|12 | |EGY |AL SOUQ |HOTELS |
+---+--------------------------+-------+--------------------------+-------------------+
I am stuck trying to convert this particular PostgreSQL query into SparkSQL.
select country,
name as 'col_name',
description,
ct,
ct_desc,
(ct*100/ct_desc)
from
(select description,
country,
count(name) over (PARTITION by description) as ct,
count(description) over (PARTITION by description) as ct_desc
from country_table
) x
group by 1,2,3,4,5,6
Correct output from PostgreSQL -
+-------+--------+-------------------+--+-------+----------------+
|country|col_name|description |ct|ct_desc|(ct*100/ct_desc)|
+-------+--------+-------------------+--+-------+----------------+
|QAT |name |INTERIOR DECORATING|7 |14 |50.0 |
+-------+--------+-------------------+--+-------+----------------+
Here is the SparkSQL query I am using -
df_fill_by_col = spark.sql("""select country,
       name as 'col_name',
       description,
       ct,
       ct_desc,
       (ct*100/ct_desc)
from
    (select description,
            country,
            count(name) over (PARTITION by description) as ct,
            count(description) over (PARTITION by description) as ct_desc
     from country_name
    ) x
group by 1,2,3,4,5,6""")
df_fill_by_col.show()
From SparkSQL -
+-------+--------+-------------------+--+-------+----------------+
|country|col_name|description |ct|ct_desc|(ct*100/ct_desc)|
+-------+--------+-------------------+--+-------+----------------+
|QAT |name |INTERIOR DECORATING|14|14 |100.0 |
+-------+--------+-------------------+--+-------+----------------+
The SparkSQL query gives odd output, especially where some values are null in the dataframe. For the same file and the same record, the ct column comes back double what it should be: 14 instead of 7.
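One way to narrow this down (a minimal diagnostic sketch, using the df_schema1 DataFrame / country_name view from the code below) would be to check whether NAME actually contains NULLs or just empty strings after the load, since count(name) only skips NULLs:
from pyspark.sql.functions import col

# Hypothetical check: count NULLs vs empty strings in NAME
total   = df_schema1.count()
nulls   = df_schema1.filter(col("NAME").isNull()).count()
empties = df_schema1.filter(col("NAME") == "").count()
print(total, nulls, empties)  # if nulls is 0 but empties is not, count(name) counts every row in the partition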
Below is the entire code, from reading the CSV files to creating the dataframe and querying the data.
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
import csv, copy, os, sys, unicodedata, string, time, glob
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PythonSQL").config("spark.some.config.option", "some-value").getOrCreate()
    sc = spark.sparkContext

    # Read the pipe-delimited files as plain text and split each line into fields
    lines = sc.textFile("path_to_csvfiles")
    parts = lines.map(lambda l: l.split("|"))
    country_name = parts.map(lambda p: (p[0], p[1], p[2], p[3], p[4].strip()))

    # Build a string-typed schema, create the DataFrame and register it as a temp view
    schemaString = "ID NAME COUNTRY ADDRESS DESCRIPTION"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    df_schema = StructType(fields)
    df_schema1 = spark.createDataFrame(country_name, df_schema)
    df_schema1.createOrReplaceTempView("country_name")
    df_schema1.cache()

    # Same query as above, run against the country_name view
    df_fill_by_col = spark.sql("select country, name as 'col_name', description, ct, ct_desc, (ct*100/ct_desc) from ( Select description, country, count(name) over (PARTITION by description) as ct, count(description) over (PARTITION by description) as ct_desc from country_name )x group by 1,2,3,4,5,6 ")
    df_fill_by_col.show()
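A possible cross-check (just a sketch, assuming the files are pipe-delimited with no header row; path_to_csvfiles and df_schema are the same as above) would be to load the data with the DataFrame CSV reader instead of textFile + split, since that reader, as far as I can tell, turns empty fields into NULLs by default, and then query the refreshed view:
# Hypothetical alternative load: empty fields become NULL instead of ""
df_csv = spark.read.csv("path_to_csvfiles", sep="|", schema=df_schema)
df_csv.createOrReplaceTempView("country_name")
spark.sql("select count(name) as ct, count(description) as ct_desc from country_name where description = 'INTERIOR DECORATING'").show()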
Please let me know if there is a way of getting the SparkSQL query to work.
Thanks, Pankaj
Edit - This code will run on multiple countries and columns
apache-spark
pyspark
apache-spark-sql
postgresql-14