1 year ago
#382660
adv87
How to fix socket.timeout: timed out error
I have a large dataframe of about 9 million addresses. I am trying to clean the addresses by standardizing common spelling mistakes/shorthand that I have in a dictionary. I have written the following function to perform this task:
import re
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def address_replace(address):
    class Xlator(dict):
        """ All-in-one multiple-string-substitution class """
        def _make_regex(self):
            """ Build re object based on the keys of the current dictionary """
            return re.compile(r'\b' + r'\b|\b'.join(map(re.escape, self.keys())) + r'\b')

        def __call__(self, match):
            """ Handler invoked for each regex match """
            return self[match.group(0)]

        def xlat(self, text):
            """ Translate text, returns the modified text. """
            return self._make_regex().sub(self, text)

    if (address == '') or (address is None):
        result = ''
    else:
        xlat = Xlator(error_dict)
        result = xlat.xlat(address)
    return result

address_replaceUDF = udf(address_replace, StringType())
I have tested the function (address_replace) with a simple line of text and it produces the desired changes.
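For example, with a small, purely illustrative dictionary (my real error_dict is far larger and the entries below are just made up for this post), calling the function directly gives the expected output:

# Hypothetical sample dictionary, for illustration only
error_dict = {'STR': 'STREET', 'RD': 'ROAD', 'AVE': 'AVENUE'}

print(address_replace('12 MAIN STR'))   # 12 MAIN STREET
print(address_replace('45 OAK RD'))     # 45 OAK ROAD
print(address_replace(None))            # '' (nulls mapped to empty string)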
Then I use my udf to create a new column in my dataframe as follows (first removing blank and null addresses):
df_add = df_address.select('ID', 'RESIDENTIAL_ADDRESS')\
    .where(~(col('RESIDENTIAL_ADDRESS') == ''))\
    .where(col('RESIDENTIAL_ADDRESS').isNotNull())
print(df_add.count())

df_add = df_add.withColumn('CLEAN_RESIDENTIAL',
                           address_replaceUDF(col('RESIDENTIAL_ADDRESS')))

df_add.select('RESIDENTIAL_ADDRESS', 'CLEAN_RESIDENTIAL')\
    .where('NOT RESIDENTIAL_ADDRESS = CLEAN_RESIDENTIAL')\
    .show(100, truncate=False)
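For reference, the filter in the last line is a SQL expression string; it can equally be written with column expressions, and the condition is the same either way:

# Equivalent filter using column expressions rather than a SQL string
df_add.select('RESIDENTIAL_ADDRESS', 'CLEAN_RESIDENTIAL')\
    .where(col('RESIDENTIAL_ADDRESS') != col('CLEAN_RESIDENTIAL'))\
    .show(100, truncate=False)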
I then want to inspect the results where changes have been made to the data. If I run the above code without the final where clause it produces a result, but with the where clause I get the following socket.timeout: timed out error:
If you follow the file path referenced in the error, it leads you to this:
I have tried to increase the memory and buffer size available with the following code:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[4]')\
    .config("spark.executor.memory", "30g")\
    .config("spark.shuffle.file.buffer", "64k")\
    .config("spark.eventLog.buffer.kb", "200k")\
    .getOrCreate()
But this did not resolve the error. This is an important project, so please help me if you can, thank you.
apache-spark
pyspark
apache-spark-sql
socketexception