MGDC for SharePoint FAQ: How do I process Deltas?
This is a follow-up to the blog post about delta datasets. If you haven’t read it yet, take a look at MGDC for SharePoint FAQ: How can I use Delta State Datasets?
Our team got some follow-up questions on this, so I thought it would make sense to write a little more and make things clear.
First of all, based on some conversations with Copilot, the basic SQL code for merging a delta would be something like this:
-- Start a transaction
BEGIN TRANSACTION;
-- Assuming the Users table has a primary key constraint on user_id
-- and the UserChanges table has the same columns plus an operation column
-- First, delete the users that have operation = 'Deleted' in UserChanges
DELETE FROM Users
WHERE user_id IN
    (SELECT user_id
     FROM UserChanges
     WHERE operation = 'Deleted');
-- Next, update the users that have operation = 'Updated' in UserChanges
UPDATE Users
SET user_name = UC.user_name,
    user_age = UC.user_age
FROM Users U
JOIN UserChanges UC ON U.user_id = UC.user_id
WHERE UC.operation = 'Updated';
-- Finally, insert the users that have operation = 'Created' in UserChanges
INSERT INTO Users (user_id, user_name, user_age)
SELECT user_id, user_name, user_age
FROM UserChanges
WHERE operation = 'Created';
-- Commit the transaction
COMMIT TRANSACTION;
Note that the column names used (shown here as user_id, user_name and user_age) need to be updated for each dataset, but the structure will be the same.
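Since only the table and column names change from one dataset to the next, the script can be generated rather than edited by hand. Here is a minimal sketch of that idea in Python. The function name build_merge_sql and its parameters are hypothetical, made up for this illustration, and the names are pasted into the SQL text without any escaping, so it should only be used with names you control.

```python
def build_merge_sql(table, changes, key, columns):
    """Return the delete/update/insert script for one dataset.

    Hypothetical helper for illustration:
    table   - the full dataset table, e.g. "Users"
    changes - the delta table with an "operation" column, e.g. "UserChanges"
    key     - the primary key column, e.g. "user_id"
    columns - the non-key columns to copy, e.g. ["user_name", "user_age"]
    """
    # One "col = UC.col" assignment per non-key column
    set_clause = ",\n    ".join(f"{c} = UC.{c}" for c in columns)
    # Key first, then the remaining columns, for the INSERT and SELECT lists
    all_columns = ", ".join([key] + list(columns))
    return f"""BEGIN TRANSACTION;
DELETE FROM {table}
WHERE {key} IN
    (SELECT {key} FROM {changes} WHERE operation = 'Deleted');
UPDATE {table}
SET {set_clause}
FROM {table} U
JOIN {changes} UC ON U.{key} = UC.{key}
WHERE UC.operation = 'Updated';
INSERT INTO {table} ({all_columns})
SELECT {all_columns}
FROM {changes}
WHERE operation = 'Created';
COMMIT TRANSACTION;"""


# Regenerate the script shown above from its table and column names
sql = build_merge_sql("Users", "UserChanges", "user_id", ["user_name", "user_age"])
print(sql)
```

Calling the function with a different table, key and column list produces the same three-step structure for another dataset.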
I also asked Copilot to translate the SQL merge code to PySpark and it suggested the code below (with a few minor manual touches):
# Import SparkSession and functions
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create SparkSession
spark = SparkSession.builder.appName("Delta dataset").getOrCreate()
# Assuming the Users and UserChanges tables are already loaded as DataFrames
users = spark.table("Users")
user_changes = spark.table("UserChanges")
# First, delete the users that have operation = 'Deleted' in UserChanges
deleted = user_changes.filter(F.col("operation") == "Deleted")
users = users.join(deleted, "user_id", "left_anti")
# Next, update the users that have operation = 'Updated' in UserChanges
# (aliases keep the old and new columns apart after the join)
updated = user_changes.filter(F.col("operation") == "Updated").alias("uc")
users = (users.alias("u")
    .join(updated, "user_id", "left_outer")
    .select("user_id",
            F.coalesce(F.col("uc.user_name"), F.col("u.user_name")).alias("user_name"),
            F.coalesce(F.col("uc.user_age"), F.col("u.user_age")).alias("user_age")))
# Finally, insert the users that have operation = 'Created' in UserChanges
# (unionByName matches columns by name rather than by position)
created = user_changes.filter(F.col("operation") == "Created")
users = users.unionByName(created.select("user_id", "user_name", "user_age"))
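Before running the merge on a full dataset, it can help to check the expected result on a handful of rows. The sketch below applies the same three steps (delete, update, insert) in plain Python, with no Spark required. The function name apply_delta and the sample rows are made up for this illustration. Like the coalesce calls above, it keeps the old value when an updated row carries an empty (None) value, and it assumes that rows marked Created are not already in the dataset.

```python
def apply_delta(current, changes, key="user_id"):
    """Apply change rows to current rows: delete, then update, then insert."""
    rows = {row[key]: dict(row) for row in current}

    # Step 1: drop every row whose key is marked 'Deleted'
    for change in changes:
        if change["operation"] == "Deleted":
            rows.pop(change[key], None)

    # Step 2: overwrite columns for keys marked 'Updated',
    # keeping the old value when the new one is None
    for change in changes:
        if change["operation"] == "Updated" and change[key] in rows:
            for column, value in change.items():
                if column != "operation" and value is not None:
                    rows[change[key]][column] = value

    # Step 3: add the rows marked 'Created' (without the operation column)
    for change in changes:
        if change["operation"] == "Created":
            rows[change[key]] = {c: v for c, v in change.items() if c != "operation"}

    return sorted(rows.values(), key=lambda row: row[key])


# Made-up sample data, for illustration only
users = [
    {"user_id": 1, "user_name": "Ann", "user_age": 30},
    {"user_id": 2, "user_name": "Bob", "user_age": 41},
    {"user_id": 3, "user_name": "Cy", "user_age": 25},
]
user_changes = [
    {"user_id": 2, "user_name": "Bob", "user_age": 42, "operation": "Updated"},
    {"user_id": 3, "user_name": None, "user_age": None, "operation": "Deleted"},
    {"user_id": 4, "user_name": "Dee", "user_age": 19, "operation": "Created"},
]

merged = apply_delta(users, user_changes)
for row in merged:
    print(row)
```

With this sample, user 3 is removed, user 2 gets the new age, and user 4 is added, which is the result the PySpark version should also produce for the same rows.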
After that, there’s the question of how to run this in Azure Data Factory or Azure Synapse.
I would suggest going with Azure Synapse. You could get some inspiration from the template that we published at https://go.microsoft.com/fwlink/?linkid=2207816. It includes examples of how to get the data and run a notebook to produce a new dataset.
Another good resource is this guide on “How to transform data by running a Synapse Notebook”. The link is at https://learn.microsoft.com/en-us/azure/data-factory/transform-data-synapse-notebook.
The most notable part missing from the code above is how to read the data from ADLS v2. For that, there is a Stack Overflow article on how to bring the data in and out of ADLS v2 using Linked Services. There is also an article specifically on that at https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/tutorial-spark-pool-filesystem-spec.
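As a rough illustration of the reading step, here is a minimal sketch that builds an ADLS v2 path and hands it to Spark. It makes several assumptions: the storage account, container and folder names are hypothetical, the files are assumed to be JSON with one object per line, and the notebook is assumed to already have access to the storage account. It reads the path directly rather than going through a Linked Service, which the article above covers.

```python
def build_abfss_uri(account, container, path):
    """Build an ADLS v2 URI: abfss://<container>@<account>.dfs.core.windows.net/<path>."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


def load_dataset(spark, uri):
    """Read a dataset folder into a DataFrame.

    Assumes JSON files with one object per line; adjust the reader
    if your data is stored in another format.
    """
    return spark.read.json(uri)


# Hypothetical account, container and folder names, for illustration only
users_uri = build_abfss_uri("mystorageaccount", "mgdc", "/sharepoint/users/full")
changes_uri = build_abfss_uri("mystorageaccount", "mgdc", "/sharepoint/users/delta")
print(users_uri)
print(changes_uri)

# In a Synapse notebook, where the spark session is already defined:
# users = load_dataset(spark, users_uri)
# user_changes = load_dataset(spark, changes_uri)
```

The two DataFrames loaded this way would take the place of the spark.table calls in the PySpark code above.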
That’s it! For more general information about MGDC for SharePoint, visit the main blog at Links about SharePoint on MGDC.