
How to fix socket.timeout: timed out error

I have a large dataframe of about 9 million addresses. I am trying to clean the addresses by standardizing common spelling mistakes and shorthand that I have in a dictionary. I have written the following function to perform this task:

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def address_replace(address):
    class Xlator(dict):
        """All-in-one multiple-string-substitution class."""
        def _make_regex(self):
            """Build a regex that matches any key of the current dictionary."""
            return re.compile(r'\b' + r'\b|\b'.join(map(re.escape, self.keys())) + r'\b')

        def __call__(self, match):
            """Handler invoked for each regex match: look up the replacement."""
            return self[match.group(0)]

        def xlat(self, text):
            """Translate text, returning the modified text."""
            return self._make_regex().sub(self, text)

    if (address == '') or (address is None):
        result = ''
    else:
        xlat = Xlator(error_dict)  # error_dict is defined earlier in my script
        result = xlat.xlat(address)

    return result

address_replaceUDF = udf(address_replace, StringType())

I have tested the function (address_replace) with a simple line of text and it produces the desired changes.
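For example, with a small made-up dictionary (my real error_dict has far more entries), the function behaves as I expect:

# Illustrative sample only -- my real error_dict is much larger
error_dict = {'STR': 'STREET', 'RD': 'ROAD', 'AVE': 'AVENUE'}

print(address_replace('12 MAIN STR'))  # prints '12 MAIN STREET'
print(address_replace('45 OAK RD'))    # prints '45 OAK ROAD'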

Then I use my udf to create a new column in my dataframe as follows (first removing blank and null addresses):

from pyspark.sql.functions import col

# Drop blank and null addresses before applying the UDF
df_add = df_address.select('ID', 'RESIDENTIAL_ADDRESS')\
                   .where(~(col('RESIDENTIAL_ADDRESS') == ''))\
                   .where(col('RESIDENTIAL_ADDRESS').isNotNull())
print(df_add.count())

df_add = df_add.withColumn('CLEAN_RESIDENTIAL',
                           address_replaceUDF(col('RESIDENTIAL_ADDRESS')))

df_add.select('RESIDENTIAL_ADDRESS', 'CLEAN_RESIDENTIAL')\
      .where('NOT RESIDENTIAL_ADDRESS = CLEAN_RESIDENTIAL')\
      .show(100, truncate=False)

I then want to inspect the rows where changes have been made to the data. If I run the above code without the final where clause, it produces a result; with the where clause, I get the following error:

[Screenshot: traceback ending in socket.timeout: timed out]

Following the file path in the traceback leads to this line:

[Screenshot: line 707 of socket.py]

I have tried to increase the available memory and buffer sizes with the following code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[4]')\
    .config("spark.executor.memory", "30g")\
    .config("spark.shuffle.file.buffer", "64k")\
    .config("spark.eventLog.buffer.kb", "200k")\
    .getOrCreate()

But this did not resolve the error. This is an important project, so please help me if you can. Thanks.
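In case it is relevant, one variant I am considering (just a sketch, I have not confirmed it avoids the timeout, and the names below are illustrative) is compiling the regex once on the driver instead of rebuilding it inside the UDF for every row:

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Build the pattern once on the driver; Spark serializes it with the
# UDF closure, so each row only pays for the substitution itself.
pattern = re.compile(r'\b' + r'\b|\b'.join(map(re.escape, error_dict.keys())) + r'\b')

def address_replace_fast(address):
    # Treat empty strings and nulls the same way as the original UDF
    if not address:
        return ''
    return pattern.sub(lambda m: error_dict[m.group(0)], address)

address_replace_fastUDF = udf(address_replace_fast, StringType())

Would something like this be expected to make a difference, or is the timeout unrelated to the per-row regex compilation?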

apache-spark

pyspark

apache-spark-sql

socketexception
