1 year ago

#388171

Pankaj

SparkSQL query using "PARTITION by" giving wrong output

I have a bunch of CSV files that I am processing with PySpark for speed. However, I am completely new to Spark (PySpark). So far I have been able to create an RDD, a DataFrame from it, and a temporary view (country_name) to query the data easily.

Input Data

+---+--------------------------+-------+--------------------------+-------------------+
|ID |NAME                      |COUNTRY|ADDRESS                   |DESCRIPTION        |
+---+--------------------------+-------+--------------------------+-------------------+
|1  |                          |QAT    |                          |INTERIOR DECORATING|
|2  |S&T                       |QAT    |AL WAAB STREET            |INTERIOR DECORATING|
|3  |                          |QAT    |                          |INTERIOR DECORATING|
|4  |THE ROSA BERNAL COLLECTION|QAT    |                          |INTERIOR DECORATING|
|5  |                          |QAT    |AL SADD STREET            |INTERIOR DECORATING|
|6  |AL MANA                   |QAT    |SALWA ROAD                |INTERIOR DECORATING|
|7  |                          |QAT    |SUHAIM BIN HAMAD STREET   |INTERIOR DECORATING|
|8  |INTERTEC                  |QAT    |AL MIRQAB AL JADEED STREET|INTERIOR DECORATING|
|9  |                          |EGY    |                          |HOTELS             |
|10 |                          |EGY    |QASIM STREET              |HOTELS             |
|11 |AIRPORT HOTEL             |EGY    |                          |HOTELS             |
|12 |                          |EGY    |AL SOUQ                   |HOTELS             |
+---+--------------------------+-------+--------------------------+-------------------+

I am stuck trying to convert this particular PostgreSQL query to Spark SQL.

select country,
       name as 'col_name',
       description,
       ct, 
       ct_desc, 
      (ct*100/ct_desc) 
from 
   (select description,
           country,
           count(name) over (PARTITION by description) as ct, 
           count(description) over (PARTITION by description) as ct_desc 
     from country_table
   ) x 
group by 1,2,3,4,5,6

Correct output from PostgreSQL -

+-------+--------+-------------------+--+-------+----------------+
|country|col_name|description        |ct|ct_desc|(ct*100/ct_desc)|
+-------+--------+-------------------+--+-------+----------------+
|QAT    |name    |INTERIOR DECORATING|7 |14     |50.0            |
+-------+--------+-------------------+--+-------+----------------+ 

Here is the Spark SQL query I am using -

df_fill_by_col = spark.sql("""select country, 
                                   name as 'col_name',
                                   description, 
                                   ct, 
                                   ct_desc, 
                                  (ct*100/ct_desc) 
                            from 
                               ( Select description, 
                                        country,
                                        count(name) over (PARTITION by description) as ct, 
                                        count(description) over (PARTITION by description) as ct_desc 
                                 from country_name 
                                )x 
                            group by 1,2,3,4,5,6 """)

df_fill_by_col.show()

Output from Spark SQL -

+-------+--------+-------------------+--+-------+----------------+
|country|col_name|description        |ct|ct_desc|(ct*100/ct_desc)|
+-------+--------+-------------------+--+-------+----------------+
|QAT    |name    |INTERIOR DECORATING|14|14     |100.0           |
+-------+--------+-------------------+--+-------+----------------+ 

The Spark SQL query gives unexpected results, especially where some values in the DataFrame are null. For the same file and the same record, the ct column comes out doubled: 14 from Spark SQL versus 7 from PostgreSQL.
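One possible explanation, offered as an assumption rather than a confirmed diagnosis: SQL's count(column) counts only non-NULL values, and splitting a text line on "|" produces empty strings for blank fields, never NULL. If PostgreSQL loaded the blank names as NULL while the RDD kept them as empty strings, count(name) would match count(description) in Spark, which is what the 14 vs. 14 output shows. The plain-Python sketch below illustrates the idea without Spark. The helper names (parse_line, sql_count) and the sample lines are hypothetical, and the pipe-delimited layout is assumed from the split("|") call in the code further down.

```python
# Illustrative only: plain Python, no Spark required.
# SAMPLE_LINES, parse_line and sql_count are hypothetical names; the
# pipe-delimited layout is an assumption based on the split("|") call.

SAMPLE_LINES = [
    "1||QAT||INTERIOR DECORATING",
    "2|S&T|QAT|AL WAAB STREET|INTERIOR DECORATING",
    "3||QAT||INTERIOR DECORATING",
    "4|THE ROSA BERNAL COLLECTION|QAT||INTERIOR DECORATING",
]


def parse_line(line, blank_to_none=False):
    """Split one line on '|'; optionally turn blank fields into None."""
    fields = [field.strip() for field in line.split("|")]
    if blank_to_none:
        fields = [field if field else None for field in fields]
    return tuple(fields)


def sql_count(values):
    """Mimic SQL COUNT(col): count only values that are not NULL (None)."""
    return sum(1 for value in values if value is not None)


# Column index 1 is NAME in the assumed layout.
names_raw = [parse_line(line)[1] for line in SAMPLE_LINES]
names_clean = [parse_line(line, blank_to_none=True)[1] for line in SAMPLE_LINES]

count_raw = sql_count(names_raw)      # blanks are "" here, so every row is counted
count_clean = sql_count(names_clean)  # blanks are None here, so only named rows are counted
print(count_raw, count_clean)
```

If this assumption holds, the same conversion could be applied in the RDD map step before the DataFrame is created, for example by passing `p[1].strip() or None` instead of `p[1]`, so that count(name) skips the blank rows.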

Below is the entire code, from reading the CSV files to creating the DataFrame and querying the data.

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
import csv, copy, os, sys, unicodedata, string, time, glob
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PythonSQL").config("spark.some.config.option", "some-value").getOrCreate()
    sc = spark.sparkContext

    lines = sc.textFile("path_to_csvfiles")
    parts = lines.map(lambda l: l.split("|"))
    country_name = parts.map(lambda p: (p[0],   p[1],   p[2],   p[3],   p[4].strip()))

    schemaString = "ID NAME COUNTRY ADDRESS DESCRIPTION"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    df_schema = StructType(fields)

    df_schema1 = spark.createDataFrame(country_name, df_schema)
    df_schema1.createOrReplaceTempView("country_name")
    df_schema1.cache()  
    
    df_fill_by_col = spark.sql("select country, name as 'col_name', description, ct, ct_desc, (ct*100/ct_desc) from ( Select description, country, count(name) over (PARTITION by description) as ct, count(description) over (PARTITION by description) as ct_desc from country_name )x group by 1,2,3,4,5,6 ")
    df_fill_by_col.show()

Please let me know if there is a way to get the Spark SQL query to return the same result as PostgreSQL.

Thanks, Pankaj

Edit - This code will be run on multiple countries and columns.

apache-spark

pyspark

apache-spark-sql

postgresql-14

0 Answers
