[Solved] Left join on a key; if there is no match, join on a different right key to get the value

I have two Spark DataFrames, say df_core and df_dict:

There are more columns in df_core, but they have nothing to do with the question here.

df_core:

id
1_ghi
2_mno
3_xyz
4_abc

df_dict:

id_1      id_2      cost
1_ghi     1_ghi     12
2_mno     2_rst     86
3_def     3_xyz     105

I want to get the value of df_dict.cost by joining the two DataFrames.

Scenario: join on df_core.id == df_dict.id_1

If there is no match for df_core.id on the key df_dict.id_1 (in the example above: 3_xyz), then the join should happen on df_dict.id_2.

I am able to achieve the join on the first key, but I am not sure how to achieve the fallback scenario:

final_df = (
    df_core.alias("df_core_alias")
    .join(df_dict, df_core.id == df_dict.id_1, "left")
    .select("df_core_alias.*", df_dict.cost)
)
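One untested sketch I am considering for the fallback: join on each candidate key separately and coalesce the costs, so both joins stay plain equi-joins (column names as in the toy example above):

    from pyspark.sql import functions as F

    # Untested sketch: left join on each candidate key separately,
    # then prefer the id_1 match and fall back to the id_2 match.
    by_id1 = df_dict.select(F.col("id_1").alias("k"), F.col("cost").alias("cost_1"))
    by_id2 = df_dict.select(F.col("id_2").alias("k"), F.col("cost").alias("cost_2"))

    final_df = (
        df_core
        .join(by_id1, df_core.id == by_id1.k, "left").drop("k")
        .join(by_id2, df_core.id == by_id2.k, "left").drop("k")
        .withColumn("cost", F.coalesce("cost_1", "cost_2"))
        .drop("cost_1", "cost_2")
    )

Since both conditions are plain equalities, Spark can use broadcast hash joins here instead of a nested loop.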

The solution need not be a DataFrame operation: I can create temp views from the DataFrames and run SQL on them if that is easier and/or better optimized.

I also have a SQL solution in mind (not tested):

SELECT
    core.id,
    dict.cost
FROM
    df_core core LEFT JOIN df_dict dict
    ON core.id = dict.id_1
    OR core.id = dict.id_2
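To run that SQL, I would first register the DataFrames as temp views, along these lines (also untested, assuming a SparkSession named spark):

    # Register the DataFrames so the SQL above can run against them.
    df_core.createOrReplaceTempView("df_core")
    df_dict.createOrReplaceTempView("df_dict")

    final_df = spark.sql("""
        SELECT core.id, dict.cost
        FROM df_core core
        LEFT JOIN df_dict dict
            ON core.id = dict.id_1
            OR core.id = dict.id_2
    """)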

Expected df:

id       cost
1_ghi    12
2_mno    86
3_xyz    105
4_abc

Well, the query plan is too big to add in a comment, so I have to post it here in the question.

Below is the Spark plan for the isin approach:

== Physical Plan ==
*(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
+- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
   :- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
   :  +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 =  )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
   :     +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
   +- BroadcastExchange IdentityBroadcastMode
      +- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>

The Filter under the BroadcastNestedLoopJoin comes from earlier df_core transformations; because of Spark's lazy evaluation, it only shows up here in the physical plan.
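For completeness, the isin join that produces a plan like the one above looks roughly like this (a reconstruction using the column names from the plan, not copied from my code):

    # Rough reconstruction: an IN condition over two columns is a
    # non-equi predicate, so Spark falls back to a
    # BroadcastNestedLoopJoin instead of a hash join.
    final_df = df_core.join(
        df_dict,
        df_core.CORE_ID.isin(df_dict.DICT_ID_1, df_dict.DICT_ID_2),
        "left",
    )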

Moreover, I just realized that final_df.show() works fine for any solution I use. What takes seemingly infinite time is the next transformation over final_df, which produces my actual expected_df. Here is that transformation:

expected_df = spark.sql("""
    SELECT region_type, cost, core_sector_value, count(core_id)
    FROM final_df_view
    GROUP BY region_type, cost, core_sector_value
    ORDER BY region_type, cost, core_sector_value
""")
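Equivalently, as a DataFrame operation (a sketch of the same aggregation):

    from pyspark.sql import functions as F

    # The same aggregation expressed with the DataFrame API.
    expected_df = (
        final_df
        .groupBy("region_type", "cost", "core_sector_value")
        .agg(F.count("core_id").alias("core_id_count"))
        .orderBy("region_type", "cost", "core_sector_value")
    )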

And here is the plan for expected_df:

== Physical Plan ==
*(5) Sort [region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(region_type#26 ASC NULLS FIRST, cost#13 ASC NULLS FIRST, core_sector_value#21 ASC NULLS FIRST, 200)
   +- *(4) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[count(core_id#22)])
      +- Exchange hashpartitioning(region_type#26, cost#13, core_sector_value#21, 200)
         +- *(3) HashAggregate(keys=[region_type#26, cost#13, core_sector_value#21], functions=[partial_count(core_id#22)])
            +- *(3) Project [region_type#26, COST#13, CORE_SECTOR_VALUE#21, CORE_ID#22]
               +- BroadcastNestedLoopJoin BuildRight, LeftOuter, CORE_ID#22 IN (DICT_ID_1#10,DICT_ID_2#11)
                  :- *(1) Project [CORE_SECTOR_VALUE#21, CORE_ID#22, region_type#26]
                  :  +- *(1) Filter ((((isnotnull(response_value#23) && isnotnull(error_code#19L)) && (error_code#19L = 0)) && NOT (response_value#23 =  )) && NOT response_value#23 IN (N.A.,N.D.,N.S.))
                  :     +- *(1) FileScan parquet [ERROR_CODE#19L,CORE_SECTOR_VALUE#21,CORE_ID#22,RESPONSE_VALUE#23,source_system#24,fee_type#25,region_type#26,run_date#27] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/outfile/..., PartitionCount: 14, PartitionFilters: [isnotnull(run_date#27), (run_date#27 = 20190905)], PushedFilters: [IsNotNull(RESPONSE_VALUE), IsNotNull(ERROR_CODE), EqualTo(ERROR_CODE,0), Not(EqualTo(RESPONSE_VA..., ReadSchema: struct<ERROR_CODE:bigint,CORE_SECTOR_VALUE:string,CORE_ID:string,RESPONSE_VALUE:string>
                  +- BroadcastExchange IdentityBroadcastMode
                     +- *(2) FileScan csv [DICT_ID_1#10,DICT_ID_2#11,COST#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/XXXXXX/datafiles/client..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DICT_ID_1:string,DICT_ID_2:string,COST:string>

Looking at the plan, I think the transformations are getting too heavy for in-memory processing on local Spark. Is it best practice to perform so many separate transformation steps, or should I try to come up with a single query that encompasses all the business logic?
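One thing I am considering (untested) is persisting final_df so the join is not re-evaluated by every downstream action:

    # Untested idea: cache the joined result so the expensive join is
    # computed once instead of being recomputed by each action.
    final_df.cache()
    final_df.count()  # force materialization of the cache
    final_df.createOrReplaceTempView("final_df_view")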

Additionally, could you please point me to any resource for understanding the Spark plans produced by the explain() function? Thanks.

How to optimize this SQL query?

The following recommendations will help you in your SQL tuning process.
You'll find 3 sections below:

  1. Description of the steps you can take to speed up the query.
  2. The optimal indexes for this query, which you can copy and create in your database.
  3. An automatically re-written query you can copy and execute in your database.
The optimization process and recommendations:
  1. Create Optimal Indexes (modified query below): The recommended indexes are an integral part of this optimization effort and should be created before testing the execution duration of the optimized query.
Optimal indexes for this query:
ALTER TABLE `df_dict` ADD INDEX `df_dict_idx_id_1_id_2` (`id_1`,`id_2`);
The optimized query:
SELECT
        core.id,
        dict.cost 
    FROM
        df_core core 
    LEFT JOIN
        df_dict dict 
            ON core.id = dict.id_1 
            OR core.id = dict.id_2
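If the OR condition still prevents efficient index use, a common manual alternative (not part of the automated rewrite above) is to split it into two single-key left joins and coalesce the results, so each join condition is a plain equality:

    SELECT
        core.id,
        COALESCE(d1.cost, d2.cost) AS cost
    FROM
        df_core core
    LEFT JOIN
        df_dict d1
            ON core.id = d1.id_1
    LEFT JOIN
        df_dict d2
            ON core.id = d2.id_2

This also matches the intended fallback semantics: the id_1 match is preferred, and id_2 is used only when id_1 finds no row.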

* Original question posted on StackOverflow.