
Spark: last task takes a long time to complete

I have two Parquet files: the first is 10 GB and the second is 800 MB.

I was executing the query below in Azure Databricks on a Spark cluster and storing the result in DBFS.

val interim_nobed_main = spark.read.format("delta")
  .option("header", "true") // "header" is a CSV option; the Delta reader ignores it
  .load(s"${Interimpath}/cdf_hsc_interim_nobed")
  .repartition(8) // full shuffle into 8 roughly equal partitions
interim_nobed_main.createOrReplaceTempView("hsc_interim_nobed")


Row counts per partition:

+--------------------+--------+
|SPARK_PARTITION_ID()|   count|
+--------------------+--------+
|                   1|56426303|
|                   6|56426352|
|                   3|56426283|
|                   5|56426248|
|                   4|56426265|
|                   7|56426335|
|                   2|56426293|
|                   0|56426317|
+--------------------+--------+
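A per-partition count like the one above can be produced with spark_partition_id(); a minimal sketch:

import org.apache.spark.sql.functions.spark_partition_id

// count the rows landing in each of the 8 partitions
interim_nobed_main
  .groupBy(spark_partition_id())
  .count()
  .show()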
val facl_decn_bed_interim_main = spark.read.format("delta")
  .option("header", "true")
  .load(s"${Interimpath}/cdf_hsc_facl_decn_bed_interim")
  .repartition(8)
facl_decn_bed_interim_main.createOrReplaceTempView("h_facl_decn_bed_interim")

Row counts per partition:
+--------------------+--------+
|SPARK_PARTITION_ID()|   count|
+--------------------+--------+
|                   1|14345224|
|                   6|14345233|
|                   3|14345223|
|                   5|14345221|
|                   4|14345222|
|                   7|14345229|
|                   2|14345224|
|                   0|14345226|
+--------------------+--------+

The data is almost equally partitioned.

spark.sql("""
select
nb.*,
facl.h_id as facl_decn_bed_h_id,
facl.bed_day_decn_seq_nbr as facl_decn_bed_seq_nbr,
case when facl.h_id is not null then concat(substr(nb.cse_open_dttm, 1, 10), ' 00:00:00.000') else cast(null as string) end as engage_dt
from hsc_interim_nobed nb
left outer join h_facl_decn_bed_interim facl on
(nb.h_id=facl.h_id and nb.facl_decn_h_id=facl.h_id)
where nb.facl_decn_h_id is not null
union all
select
nb.*,
cast(null as int) as facl_decn_bed_h_id,
cast(null as int) as facl_decn_bed_seq_nbr,
cast(null as string) as engage_dt
from hsc_interim_nobed nb
where nb.facl_decn_h_id is null""").write.format("delta").mode("overwrite").option("header","true").save(s"${Interimpath}/cdf_hsc_set1_interimdemo")
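Since the join itself reshuffles rows by the join key, the even input partitioning shown above does not guarantee even tasks for the join: a single very frequent h_id/facl_decn_h_id value could funnel most rows into one task. A minimal sketch to check the key distribution (assuming the temp views registered above):

// top join-key values by frequency; one dominant key indicates skew
spark.sql("""
  select facl_decn_h_id, count(*) as cnt
  from hsc_interim_nobed
  where facl_decn_h_id is not null
  group by facl_decn_h_id
  order by cnt desc
  limit 20
""").show()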

What could be the reason for this, and how do I solve it?

Tags: scala, apache-spark, azure-databricks, skew
