I have two Spark DataFrames, say df_core & df_dict:
There are more columns in df_core, but they have nothing to do with the question here.
df_core:
id
1_ghi
2_mno
3_xyz
4_abc
df_dict:
id_1 id_2 cost
1_ghi 1_ghi 12
2_mno 2_rst 86
3_def 3_xyz 105
I want to get the value from df_dict.cost by joining the 2 dfs.
Scenario: join on df_core.id == df_dict.id_1
If there is no match for df_core.id against the key df_dict.id_1 (in the example above: 3_xyz), then the join should fall back to df_dict.id_2.
I am able to achieve the join on the first key, but I'm not sure how to achieve the fallback scenario:
final_df = df_core.alias("df_core_alias").join(df_dict, df_core.id == df_dict.id_1, 'left').select('df_core_alias.*', df_dict.cost)
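One way I'm thinking of extending this (a minimal sketch, untested; column names taken from the samples above) is to left-join on each key separately and then coalesce the two cost columns, preferring the id_1 match:

from pyspark.sql import functions as F

# Sketch (untested): join on id_1 and id_2 separately, then prefer the id_1 match.
join_1 = (df_core.alias("c")
          .join(df_dict.alias("d1"), F.col("c.id") == F.col("d1.id_1"), "left")
          .select("c.*", F.col("d1.cost").alias("cost_1")))

join_2 = (join_1.alias("j")
          .join(df_dict.alias("d2"), F.col("j.id") == F.col("d2.id_2"), "left")
          .select("j.*", F.col("d2.cost").alias("cost_2")))

# cost = the id_1 match if present, otherwise the id_2 match (NULL if neither matches)
final_df = (join_2
            .withColumn("cost", F.coalesce(F.col("cost_1"), F.col("cost_2")))
            .drop("cost_1", "cost_2"))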
The solution need not be a DataFrame operation. I can create temp views out of the DataFrames & then run SQL on them if that's easier and/or better optimized.
I also have a SQL solution in mind (not tested):
SELECT
core.id,
dict.cost
FROM
df_core core LEFT JOIN df_dict dict
ON core.id = dict.id_1
OR core.id = dict.id_2
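To run the SQL version against the two DataFrames, I'd register temp views first; a minimal sketch (the view names are just mine):

df_core.createOrReplaceTempView("df_core")
df_dict.createOrReplaceTempView("df_dict")

# Note: an OR join can return two rows if id_1 and id_2 match different dict rows.
final_df = spark.sql("""
    SELECT core.id, dict.cost
    FROM df_core core
    LEFT JOIN df_dict dict
      ON core.id = dict.id_1
      OR core.id = dict.id_2
""")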
Expected df:
id cost
1_ghi 12
2_mno 86
3_xyz 105
4_abc
Well, the physical plan is too big to add in a comment, so I have to put it in the question here.
Below is the Spark plan for the isin approach:
== Physical Plan ==
*(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
:- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
: +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 = )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
: +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
+- BroadcastExchange IdentityBroadcastMode
+- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>
The Filter in the BroadcastNestedLoopJoin branch comes from earlier df_core transformations, but because of Spark's lazy evaluation we only see it here in the physical plan.
Moreover, I just realized that final_df.show() works fine with any of the solutions I use. What takes seemingly forever to process is the next transformation that I apply on final_df, which produces my actual expected_df. Here's that next transformation:
expected_df = spark.sql("select region_type, cost, core_sector_value, count(core_id) from final_df_view group by region_type, cost, core_sector_value order by region_type, cost, core_sector_value")
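For reference, final_df_view here is just final_df registered as a temp view; a rough DataFrame-API equivalent of the same aggregation (sketch) would be:

from pyspark.sql import functions as F

# final_df_view is final_df registered as a temp view.
final_df.createOrReplaceTempView("final_df_view")

# Rough DataFrame-API equivalent of the SQL above (sketch):
expected_df = (final_df
               .groupBy("region_type", "cost", "core_sector_value")
               .agg(F.count("core_id").alias("core_id_count"))
               .orderBy("region_type", "cost", "core_sector_value"))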
& here's the plan for the expected_df:
== Physical Plan ==
*(5) Sort [region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST, 200)
+- *(4) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[count(core_id#22)])
+- Exchange hashpartitioning(region_type#26, cost#13, core_sector_value#21, 200)
+- *(3) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[partial_count(core_id#22)])
+- *(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
:- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
: +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 = )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
: +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
+- BroadcastExchange IdentityBroadcastMode
+- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>
Seeing the plan, I think the transformations are getting too heavy to run in memory on Spark local. Is it best practice to perform so many separate transformation steps, or should I try to come up with a single query that encompasses all the business logic?
Additionally, could you please point me to any resources for understanding the Spark plans we get from the explain() function? Thanks
The following recommendations will help you in your SQL tuning process.
You'll find two sections below: a suggested index and the query.
ALTER TABLE `df_dict` ADD INDEX `df_dict_idx_id_1_id_2` (`id_1`,`id_2`);
SELECT
core.id,
dict.cost
FROM
df_core core
LEFT JOIN
df_dict dict
ON core.id = dict.id_1
OR core.id = dict.id_2
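If the target is Spark SQL rather than MySQL, note that there is no ALTER TABLE ... ADD INDEX. A sketch of the same lookup using two equi-joins and COALESCE instead of the OR condition (reusing the temp view names registered above) would be:

# Sketch: two left equi-joins instead of an OR condition, so Spark can plan
# hash/sort-merge joins; COALESCE prefers the id_1 match.
final_df = spark.sql("""
    SELECT core.id,
           COALESCE(d1.cost, d2.cost) AS cost
    FROM df_core core
    LEFT JOIN df_dict d1 ON core.id = d1.id_1
    LEFT JOIN df_dict d2 ON core.id = d2.id_2
""")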