Single merge to perform update, delete and insert #602

Open
himanshujindal opened this issue Feb 19, 2021 · 7 comments
Labels: acknowledged (This issue has been read and acknowledged by Delta admins), need author feedback (Issue is waiting for the author to respond)


@himanshujindal

Context: I am performing a merge command (https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge&language-java) on data stored in S3 using Delta Lake, and I have changes that consist of updates, inserts and deletes. Currently, I am using two merge commands to apply those changes. Here is what my merge commands look like:

// deltaTable is of type https://docs.delta.io/latest/api/java/io/delta/tables/DeltaTable.html
// insertDf contains all the inserts and updates as a Spark DataFrame
// conditionString is olddata.<columnName> = newdata.<columnName>, where columnName is the primary key (my data has a single primary-key column)
// deleteDf contains all the deletes as a Spark DataFrame
deltaTable!!.alias("olddata").merge(
    insertDf.alias("newdata"),
    conditionString
).whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
deltaTable!!.alias("olddata").merge(
    deleteDf.alias("newdata"),
    conditionString
).whenMatched().delete()
    .execute()

Problem: This results in the system running two merges, which reduces the efficiency of my system. I am trying to find a way to apply all the changes in a single merge. However, the merge command only takes one source DataFrame and one condition. Unless I build a condition from the row values themselves (applying inserts if a row's key is in my insert DataFrame and deletes if it is in my delete DataFrame), I end up having to write two different merges. Am I missing something? Is there a feature request here that would help simplify applying changes to Delta Lake?

Note that I want to avoid building queries from row data, because the data comes from customers and could be prone to SQL injection. Also, the condition string in that case could be extremely large, since the changes I am applying can be ~1-2 GB in size.

@JassAbidi
Contributor

JassAbidi commented Feb 24, 2021

I'm not sure I have the full picture, but the first thing that comes to mind is to add a new column to your insertDf and deleteDf and use it in your whenMatched clauses:

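import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.lit

// Tag every change row with the operation it represents
val inserts = insertDf.withColumn("operation", lit("UPDATE"))
val deletes = deleteDf.withColumn("operation", lit("DELETE"))
// union matches columns by position, so both DataFrames need the same schema
val changes = inserts.union(deletes)

deltaTable
  .as("oldData")
  .merge(changes.as("newData"), conditionString)
  .whenMatched("newData.operation = 'UPDATE'").updateAll()
  .whenMatched("newData.operation = 'DELETE'").delete()
  // only insert unmatched UPDATE rows, so unmatched DELETE rows are not inserted
  .whenNotMatched("newData.operation = 'UPDATE'").insertAll()
  .execute()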
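The clause logic of this single MERGE can be sketched as a toy in-memory model in Kotlin. This is not Delta's implementation. Rows are reduced to (key, value) pairs, and `Op`, `Change` and `mergeChanges` are hypothetical names used only for illustration. The model assumes each key appears at most once among the changes. It also applies the not-matched insert only to `UPDATE` rows, so delete rows with no target match are dropped.

```kotlin
// Toy in-memory model of a single MERGE driven by an "operation" column.
// Hypothetical sketch, not Delta's implementation: rows are (key, value) pairs.

enum class Op { UPDATE, DELETE }

data class Change(val key: Int, val value: String, val op: Op)

/**
 * Mirrors:
 *   whenMatched(op = UPDATE).updateAll()
 *   whenMatched(op = DELETE).delete()
 *   whenNotMatched(op = UPDATE).insertAll()
 */
fun mergeChanges(target: Map<Int, String>, changes: List<Change>): Map<Int, String> {
    // Simplifying assumption: at most one change per key
    require(changes.map { it.key }.toSet().size == changes.size) {
        "assumption violated: more than one change for the same key"
    }
    val result = target.toMutableMap()
    for (c in changes) {
        val matched = c.key in target
        when {
            matched && c.op == Op.UPDATE -> result[c.key] = c.value   // updateAll
            matched && c.op == Op.DELETE -> result.remove(c.key)      // delete
            !matched && c.op == Op.UPDATE -> result[c.key] = c.value  // insertAll
            // unmatched DELETE rows fall through: nothing to delete, nothing inserted
        }
    }
    return result
}
```

For example, with target `{1=a, 2=b, 3=c}` and changes `UPDATE 1`, `DELETE 2`, `UPDATE 4` and `DELETE 5`, the result is `{1=a2, 3=c, 4=d}`. With an unconditional `whenNotMatched().insertAll()`, key 5 would be inserted as a new row instead of being ignored.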

@vkorukanti added the acknowledged and need author feedback labels on Oct 7, 2021
@bennetryan

Hi all, I need your expertise here. I have a similar problem.
I have two DataFrames, fullDF and snapDF, and the idea is to merge snapDF into fullDF using a substring of id.
I need to merge these two DFs on substring(full.id, 0, 15) = substring(snap.id, 0, 15).
When there is a match, I need to first delete the matching data from fullDF and then insert the data from snapDF.
If there is no match, it's a straightforward insert.
Please let me know whether this can be accomplished with a single merge statement.

@nkarpov
Collaborator

nkarpov commented Aug 10, 2022

Hi @himanshujindal - based on your description, I think @JassAbidi has shared a good solution (it is similar to how you would consume and apply CDF changes in Delta 2.0, for example). Can you please confirm whether that works for you, whether you solved it another way, or whether you're still blocked on this? Thanks!

Hi @bennetryan I think your goal is covered by normal MERGE semantics with a single statement.

> I need to merge these two DFs on substring(full.id, 0, 15) = substring(snap.id, 0, 15).
> When it's a match I need to first delete the matching data from fullDF and then insert the data from snapDF.

You can use the UPDATE statement to overwrite the entire matching row instead of explicitly deleting it and inserting a new one.

MERGE INTO fullDF full
USING snapDF snap
ON substring(full.id, 0, 15) = substring(snap.id, 0, 15)
WHEN MATCHED THEN
     UPDATE SET *
WHEN NOT MATCHED THEN 
     INSERT *

Please let me know if this doesn't address your case.

@bennetryan

Hi @nkarpov, thank you for your reply. The problem in my case is that the full id value may not match, which is why I'm using a substring. Also, the table may contain more than one row for a given id prefix. That is why, when the partial id matches, I want to first delete the matched rows and then insert the new rows.
Currently, I'm performing two separate transactions, one for delete and the other for the insert part.
I just wanted to know if there was a possibility of performing it in a single transaction.
Thanks in advance!
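The delete-then-insert behavior described in this comment can be modeled in a few lines. The rule is: drop every fullDF row whose 15-character id prefix appears in snapDF, then append all snapDF rows. This is a hypothetical in-memory Kotlin sketch, not a Delta API. `Row`, `prefixOf`, `PREFIX_LEN` and `replaceByPrefix` are illustrative names, and `take(15)` assumes that `substring(id, 0, 15)` returns the first 15 characters of the id.

```kotlin
// Hypothetical toy model of "delete rows matching an id prefix, then insert".
// Not a Delta API; plain Kotlin lists stand in for fullDF and snapDF.

data class Row(val id: String, val payload: String)

// Assumption: mirrors substring(id, 0, 15) returning the first 15 characters
const val PREFIX_LEN = 15

fun prefixOf(id: String): String = id.take(PREFIX_LEN)

/** Removes every `full` row whose id prefix occurs in `snap`, then appends all `snap` rows. */
fun replaceByPrefix(full: List<Row>, snap: List<Row>): List<Row> {
    val snapPrefixes = snap.map { prefixOf(it.id) }.toSet()
    val kept = full.filterNot { prefixOf(it.id) in snapPrefixes }
    return kept + snap
}
```

The number of rows per prefix can change. Two old rows sharing a prefix can be replaced by one new row, or by three. A row-for-row UPDATE cannot express that.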

@nkarpov
Collaborator

nkarpov commented Aug 12, 2022

> I just wanted to know if there was a possibility of performing it in a single transaction.

Strictly speaking, no, but there is an existing request for this (#832) if you'd like to add a +1.

I think replaceWhere can get you some mileage toward an atomic delete + insert. Otherwise, MERGE today will not allow multiple source (snapDF) rows to match, and replace, the same target rows.
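A simplified model shows why the prefix-keyed `UPDATE SET *` / `INSERT *` MERGE does not give delete-then-insert behavior:

- An `UPDATE SET *` overwrites each matched target row with its single matching source row.
- A target row that matches more than one source row is rejected as ambiguous.

This is a hypothetical Kotlin sketch of that rule, not Delta's actual check. `Row`, `prefixOf` and `mergeUpdateByPrefix` are illustrative names.

```kotlin
// Hypothetical toy model of a prefix-keyed MERGE with
//   WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *
// Simplified: not Delta's real duplicate-match check.

data class Row(val id: String, val payload: String)

fun prefixOf(id: String): String = id.take(15)

fun mergeUpdateByPrefix(full: List<Row>, snap: List<Row>): List<Row> {
    val bySnapPrefix = snap.groupBy { prefixOf(it.id) }
    val updated = full.map { t ->
        val matches = bySnapPrefix[prefixOf(t.id)].orEmpty()
        // A target row matched by several source rows is ambiguous, so fail
        check(matches.size <= 1) { "Target row ${t.id} matched ${matches.size} source rows" }
        matches.firstOrNull() ?: t // UPDATE SET * copies the matching source row
    }
    // INSERT * for source rows whose prefix matched no target row
    val fullPrefixes = full.map { prefixOf(it.id) }.toSet()
    val inserted = snap.filter { prefixOf(it.id) !in fullPrefixes }
    return updated + inserted
}
```

With one snapDF row for a prefix, two matching fullDF rows become two identical copies of that row rather than being replaced by it. With two snapDF rows for the same prefix, the ambiguity check fires.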

I think it's a good use case though. Would love to work together if you're open to contributing.

@bennetryan

Hi @nkarpov, I would be happy to work and contribute to it.

@nkarpov
Collaborator

nkarpov commented Aug 25, 2022

There are many approaches...

The least intrusive approach would be to roll your own custom transaction using OptimisticTransaction (take a look at the existing command implementations). This would require no change to the core Delta code, but you'd be fully responsible for correctness, maintaining version compatibility, etc. You could use Delta Standalone to access the transactional APIs. None of the built-in features like CDC and schema evolution would be available unless you took the time to implement them. That may be acceptable for your use case.

Going further, you could modify the existing MERGE itself. This would be quite complex but not impossible. There's a lot to consider; for example, the existing check that rejects duplicate matches from the source table would have to be removed in this case. If you'd like to start down that path, please create and share a design doc similar to https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit


7 participants