[BUG] Delta Lake MERGE INTO will hang Spark session without result (10000 columns). #3118

Open

DevKretov opened this issue May 20, 2024
Labels: bug (Something isn't working)
DevKretov commented May 20, 2024

Bug

Which Delta project/connector is this regarding?

I am working with Delta Lake 3.1.0 and Spark 3.5.1.

Describe the problem

I am experimenting with Delta Lake as the primary storage solution for my tabular data, which is updated daily. I tried to mimic the basic use case: an existing target table is updated with new data that can modify existing rows, i.e. upserts. I am using a MERGE INTO operation, where the target table is my Delta table and the table with updates is simply saved as a Parquet file.

My tables are unusual in their number of columns: up to 10000, most of them binary. One column contains a row identifier, represented as a string hash, which is used in the matching condition of the merge operation.
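
For illustration only, the identifier column is derived along the lines of this sketch (sha2 and concat_ws are standard PySpark functions, but some_df and the hashed columns here are placeholders, not my real schema):

import pyspark.sql.functions as F

# Hypothetical sketch of the string-hash row identifier used as the merge key;
# 'col_a' and 'col_b' are placeholder columns, not the real 10000-column schema
some_df = spark.createDataFrame([('a', 'b')], ['col_a', 'col_b'])
df_with_id = some_df.withColumn(
    'row_id',
    F.sha2(F.concat_ws('|', 'col_a', 'col_b'), 256),
)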

I am experimenting with a small main table of 5000 rows, which is a single 10 MB Parquet file on disk, and the same table stored as plain (non-Delta) Parquet, split into several small 3.5 MB files.

My merge operation takes extremely long and appears to be stuck, with no computation visibly happening. What am I missing? I haven't tried partitioning, since the whole table is just 10 MB (for reference, a sketch of what that would look like follows below). I expected this operation to be extremely fast even on non-optimized tables.
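
For completeness, an untried sketch of a partitioned write ('bucket' is a hypothetical partitioning column derived here from the id prefix, and df_spark is the DataFrame from the repro code below):

# Untried: partitioned Delta write; 'bucket' does not exist in my real schema
df_spark.withColumn('bucket', F.col('_c1').substr(1, 2)) \
    .write.format('delta').mode('overwrite') \
    .partitionBy('bucket') \
    .save('deltalake_features_partitioned')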

I would appreciate your help, thank you!

UPD: added the code to reproduce the issue.

Spark version: 3.5.1. Delta Lake (delta-spark) version: 3.1.0. Instance: r5n.16xlarge

The only output that I get is

24/05/20 14:18:21 WARN DAGScheduler: Broadcasting large task binary with size 1094.0 KiB 
24/05/20 14:18:21 WARN DAGScheduler: Broadcasting large task binary with size 1097.6 KiB

Then the Spark session hangs and does nothing. The code does not finish.

Steps to reproduce

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from delta import *

def get_spark():
    # Local 4-core session with the Delta SQL extension and catalog enabled
    builder = SparkSession.builder.master("local[4]").appName('SparkDelta') \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.driver.memory", "32g") \
        .config("spark.jars.packages",
                "io.delta:delta-spark_2.12:3.1.0,"
                "io.delta:delta-storage:3.1.0")

    spark = builder.getOrCreate()

    return spark
    
    
spark = get_spark()

### GENERATE SOME FAKE DATA

import pandas as pd
import numpy as np 

num_rows = 5000
num_cols = 10000
array = np.random.rand(num_rows, num_cols)

df = pd.DataFrame(array)
df = df.reset_index()
# Note: to_csv also writes the pandas index as an unnamed first column
df.to_csv('features.csv')


### DATA TO DELTA LAKE TABLE

# Headerless CSV read: Spark names the columns _c0, _c1, ... and treats the
# pandas header row as a data row, so every column comes in as a string
df_spark = spark.read.format('csv').load('features.csv')
df_spark.write.format('delta').mode("overwrite").save('deltalake_features')

# Creating data to merge - just first 100 rows
df_slice = df.iloc[:100, 0:2]
df_slice.to_csv('features_slice_100.csv')

df_spark_slice = spark.read.format('csv').load('features_slice_100.csv')


### MERGE INTO code

target_delta_table = DeltaTable.forPath(spark, 'deltalake_features')
df_updates = df_spark_slice
df_target = target_delta_table.toDF()

(
    target_delta_table.alias('target')
    .merge(
        df_updates.alias('updates'),
        (
            # _c1 holds the string row identifier in both tables
            (F.col('target._c1') == F.col('updates._c1'))
        )
    )
    .whenMatchedUpdate(set={
        '_c2': F.lit(42)
    })
    .execute()
)
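
For completeness, a sanity check I would run after the merge, if it ever finished (it never gets this far in my runs; this assumes the integer literal 42 is cast to the string '42' in the all-string schema):

# Read the table back and count rows whose _c2 was rewritten by the merge
updated = spark.read.format('delta').load('deltalake_features')
print(updated.filter(F.col('_c2') == '42').count())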

Observed results

The Spark session hangs indefinitely after the two DAGScheduler warnings shown above, with no further output and no apparent cause.

Expected results

I expect the merge to finish successfully.

Further details

If I do a simple inner join of the same two tables, the operation takes just 1 minute to complete, so I would expect the merge to be comparably fast (a rough sketch of that join follows below).
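
Roughly, that comparison join looks like this sketch (reusing df_target and df_updates from the repro code above; the exact join I timed may have differed slightly):

# Plain inner join on the same _c1 key used in the merge condition;
# this completes in about a minute, while the MERGE INTO above hangs
joined = df_target.alias('target').join(
    df_updates.alias('updates'),
    F.col('target._c1') == F.col('updates._c1'),
    'inner',
)
print(joined.count())  # force execution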

Environment information

  • Delta Lake version: 3.1.0
  • Spark version: 3.5.1
  • Scala version: 2.12.18