Leverage Microsoft Fabric Delta Lake tables for reporting over billions of rows
Overview
Believe it or not, not all data is meant for analytics. Sometimes reporting requirements include flat data that is not inherently dimensional. This often includes querying large tables with low cardinality columns. Think of:
Audit trails – understanding who accessed what at a certain point in time or what events occurred when
Financial transaction data – querying financial transactions by invoice id
Web analytics – identifying which pages were accessed at what date, time and location
This type of data can have millions or billions of rows to sift through.
So where does this type of reporting fit into your reporting environment? In Microsoft Fabric Lakehouse and Power BI Report Builder paginated reports!
I wanted to test reporting on a Fabric Delta Table with over 6 billion rows of data. To do this I:
Loaded the Yellow Taxi Data 4 times to create a table with 6.5 billion records (with a year added to the date fields each time the data was loaded)
Compacted and Z-Ordered the table
Created 3 parameter tables by extracting the unique values for each column I want to filter on
This optimizes the paginated report parameter lookup
Created a paginated report in Power BI Report Builder
Published the paginated report to the Microsoft Fabric service
Ran the report and spilled my coffee when the results returned in just a few seconds
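Why does the compaction and Z-Order step matter so much for a filtered report? Delta Lake keeps min/max statistics per data file, and a query can skip any file whose range cannot match the filter. The toy model below is only a sketch of that idea in plain Python; the FileStats class, the file names and the year ranges are made up for illustration and are not taken from the actual table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    """Min/max statistics for one data file (hypothetical toy model)."""
    name: str
    min_year: int
    max_year: int
    min_month: int
    max_month: int


def files_to_scan(files, year, month):
    """Keep only the files whose min/max ranges could contain (year, month)."""
    return [
        f for f in files
        if f.min_year <= year <= f.max_year
        and f.min_month <= month <= f.max_month
    ]


# Unclustered layout: every file spans all years and months, so nothing is skipped.
unclustered = [FileStats(f"part-{i}", 2009, 2014, 1, 12) for i in range(4)]

# Clustered layout: each file covers a narrow range, which is what Z-ordering aims for.
clustered = [
    FileStats("part-0", 2009, 2011, 1, 6),
    FileStats("part-1", 2009, 2011, 7, 12),
    FileStats("part-2", 2012, 2014, 1, 6),
    FileStats("part-3", 2012, 2014, 7, 12),
]

print(len(files_to_scan(unclustered, 2013, 3)))  # 4: every file must be read
print(len(files_to_scan(clustered, 2013, 3)))    # 1: only part-2 can match
```

With the report filtering on year and month, the clustered layout lets the engine read a fraction of the files, which is the effect the Z-Order on puYear and puMonth is meant to produce.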
Create and load Delta Tables in the Fabric Lakehouse
Here’s the PySpark code to create, load and optimize the Delta tables:
# ## 1 – Load Taxi Data
# ##### **Step 1 – Load Data from Taxi Data to create a Delta Table with 6.5 Billion Rows**
# NYC Taxi Data info
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = "r"
# Fabric parameters
delta_path = 'abfss://<yourworkspaceid>@onelake.dfs.fabric.microsoft.com/<yourlakehouseid>/Tables'
# spark and display() are provided by the Fabric notebook session
from pyspark.sql.functions import col, expr
from delta.tables import DeltaTable
# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token)
print('Remote blob path: ' + wasbs_path)
# SPARK read parquet
df = spark.read.parquet(wasbs_path)
print('Register the DataFrame as a SQL temporary view: source')
df.createOrReplaceTempView('source')
# Display top 10 rows
print('Displaying top 10 rows: ')
display(spark.sql('SELECT * FROM source LIMIT 10'))
# get count of records
display(spark.sql('SELECT COUNT(*) FROM source'))
# 1,571,671,152
# only project certain columns and change longitude and latitude columns to string
df = df.select(
    col('vendorID'),
    col('tpepPickupDateTime'),
    col('tpepDropoffDateTime'),
    col('startLon').cast('string').alias('startLongitude'),
    col('startLat').cast('string').alias('startLatitude'),
    col('endLon').cast('string').alias('endLongitude'),
    col('endLat').cast('string').alias('endLatitude'),
    col('paymentType'),
    col('puYear'),
    col('puMonth')
)
# delta_path already ends in /Tables, so append only the table name
table_name = f'{delta_path}/taxitrips'
# write the first 1.5 billion records
df.write.format("delta").mode("overwrite").save(table_name)
# on each of 3 passes, add another year to the date fields and append another 1.5 billion records
for x in range(3):
    df = df.withColumn("tpepPickupDateTime", expr("tpepPickupDateTime + interval 1 year"))
    df = df.withColumn("tpepDropoffDateTime", expr("tpepDropoffDateTime + interval 1 year"))
    df = df.withColumn("puYear", col("puYear") + 1)
    df.write.format("delta").mode("append").save(table_name)
delta_table = DeltaTable.forPath(spark, table_name)
# ##### **Step 2 – Optimize the Taxi data table**
delta_table.optimize().executeCompaction()
delta_table.optimize().executeZOrderBy("puYear", "puMonth")
# ##### **Step 3** – Create Dimension tables
# Create dimension over columns we will want to filter on in our report, vendorID, puYear and puMonth
# read from Delta table to get all 6b rows
df = spark.read.format("delta").load(table_name)
print(df.count())
# create vendor table (sort() returns a new DataFrame, so chain it before the write)
dimdf = df.select("vendorID").distinct().sort("vendorID")
dimdf.write.format("delta").mode("overwrite").save(f'{delta_path}/vendors')
# create year table
dimdf = df.select("puYear").distinct().sort("puYear")
dimdf.write.format("delta").mode("overwrite").save(f'{delta_path}/years')
# create month table
dimdf = df.select("puMonth").distinct().sort("puMonth")
dimdf.write.format("delta").mode("overwrite").save(f'{delta_path}/months')
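All four tables land in the lakehouse Tables area, so the paths are easy to get wrong by repeating or dropping a segment. One way to keep them consistent is a small path helper. This is a sketch only: onelake_table_path is a hypothetical function that is not part of the notebook above, and the workspace and lakehouse ids remain placeholders you must fill in.

```python
def onelake_table_path(workspace_id: str, lakehouse_id: str, table: str) -> str:
    """Build the abfss path of a Delta table under the lakehouse Tables folder."""
    root = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}"
    # The Tables segment is added exactly once, here.
    return f"{root}/Tables/{table.strip('/')}"


# Placeholders are kept as-is; substitute your own ids before running in Fabric.
for name in ("taxitrips", "vendors", "years", "months"):
    print(onelake_table_path("<yourworkspaceid>", "<yourlakehouseid>", name))
```

Passing the result to save() or DeltaTable.forPath() would then replace the hand-built f-strings.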
Create the Paginated Report with Power BI Report Builder
I then created a very simple paginated report with filters over Vendor, Year and Month.
The data source uses the Azure SQL Database connection type and points at the Fabric Lakehouse SQL endpoint, because at this time the Lakehouse connector does not support parameters:
I built a simple table report with the 3 parameters:
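The dataset queries themselves are not shown in this post, so the following is only a guess at their shape, written as Python strings so they are easy to inspect. The table and column names come from the notebook above; the parameter names (@Vendor, @Year, @Month), the selected columns and the helper functions are assumptions for illustration.

```python
# Hypothetical mapping: report parameter -> (lookup table, column).
PARAMETER_TABLES = {
    "Vendor": ("vendors", "vendorID"),
    "Year": ("years", "puYear"),
    "Month": ("months", "puMonth"),
}


def lookup_query(table: str, column: str) -> str:
    """Query that fills a parameter drop-down from a small lookup table."""
    return f"SELECT {column} FROM {table} ORDER BY {column}"


def report_query() -> str:
    """Main dataset query, filtered by the three report parameters."""
    filters = " AND ".join(
        f"{column} = @{name}" for name, (_, column) in PARAMETER_TABLES.items()
    )
    return (
        "SELECT vendorID, tpepPickupDateTime, tpepDropoffDateTime, paymentType "
        f"FROM taxitrips WHERE {filters}"
    )


for name, (table, column) in PARAMETER_TABLES.items():
    print(name, "->", lookup_query(table, column))
print(report_query())
```

Reading the drop-down values from the three small tables avoids a DISTINCT scan over the full fact table every time the report opens, which is the optimization the parameter tables were created for.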
Publish and test in Microsoft Fabric
I published the report to the Fabric workspace and ran it:
OK, the report is not pretty, but the performance sure was! According to my Garmin, the report returned 44 records from over 6 billion rows in 3 seconds.
When run in SQL Script, the equivalent report query ran in less than 2 seconds:
A count over all records returned in less than 6 seconds:
Flat file reporting is not as flashy as Power BI analytical reports and visualizations. However, there are many use cases for it and the speed of reporting over Microsoft Fabric Lakehouse Delta Tables is pretty amazing!
Microsoft Fabric Lakehouse and Delta Lake tables
Delta Lake vs. Parquet Comparison | Delta Lake
Delta Lake Small File Compaction with OPTIMIZE | Delta Lake
Delta Lake Z Order | Delta Lake