Scale Real-Time Streams to Delta Lakehouse with Apache Flink on Azure HDInsight on AKS
This post is co-authored with Keshav Singh, Principal Engineering Lead, Microsoft Purview Data Governance
In this blog, we turn the page and look at enabling the Delta format as both source and sink for stream processing with Apache Flink. Delta has become a de facto ACID-compliant Lakehouse format for the ecosystem, enabling petabyte-scale processing while serving as a single source of truth, which makes it essential to bring it all together on top of Microsoft Fabric. Data engineering in Delta format unifies diverse data sources into a single model for analytics. Lastly, as technologies such as Fabric endpoints and Synapse Serverless SQL become more efficient by the day, direct-mode Delta access will keep getting cheaper and faster, with no real need for an edge copy for analytics.
Streaming events can now be unified in Delta format as a sink, enabling real-time analytics.
Let us consider a sales event scenario; the event has the structure shown below.
The sales source events are stored in Delta format on ADLS Gen2.
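For reference, each sales event carries five fields, matching the row type used later in the job: SalesId (string), ProductName (string), SalesDateTime (timestamp), SalesAmount (integer), and EventProcessingTime (timestamp).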
HDInsight on AKS Cluster Pool
Create a cluster pool to host a set of clusters; these could be Spark, Trino, or Flink clusters. With the cluster pool concept, and as a platform-as-a-service offering, HDInsight on AKS allows developers to quickly build up a data estate with all their favorite open source workloads, with full configurability and SKU sizing of their choice.
Let’s Provision the Pool.
Next, provision a Flink cluster; we went with a Session cluster.
In a nutshell, a session cluster can share resources amongst multiple jobs, while an application cluster dedicates its resources to a single application.
Once the cluster is provisioned, update the flink-configs to add the Hadoop class path and ensure the cluster's native class loaders are loaded.
Upon applying the changes the cluster will restart. Click on the Flink Dashboard and verify it is available; this is the single point for the DAG, execution logs, and stream processing details.
Application Code
Here is our code for StreamProcessingJob.
This code simply reads the data from a Delta source and stream-processes it into a Delta sink.
package org.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;

public class StreamProcessingJob {

    // Schema of the sales event, shared by the Delta source and sink
    public static final RowType ROW_TYPE = new RowType(Arrays.asList(
            new RowType.RowField("SalesId", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("ProductName", new VarCharType(VarCharType.MAX_LENGTH)),
            new RowType.RowField("SalesDateTime", new TimestampType()),
            new RowType.RowField("SalesAmount", new IntType()),
            new RowType.RowField("EventProcessingTime", new TimestampType())
    ));

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Commit to the Delta sink on every checkpoint (every 10 seconds)
        env.enableCheckpointing(10000);

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSink";
        // Define the source Delta table path
        String deltaTablePath_source = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSource";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
        createDeltaSink(deltaStream, deltaTablePath_sink, ROW_TYPE);

        // Execute the Flink job
        env.execute("FlinkDeltaSourceSinkExample");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "deltaSource");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {

        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();

        stream.sinkTo(deltaSink);
        return stream;
    }
}
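The job above uses a bounded Delta source, which reads the table's current snapshot and then finishes. The delta-flink connector also offers a continuous-mode builder (forContinuousRowData) for long-running pipelines; the following is a minimal sketch of how the source helper might be adapted, shown for illustration rather than as part of the job we ran.

    public static DataStream<RowData> createContinuousDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        // Continuous (unbounded) source: after reading the current snapshot it keeps
        // tailing the table for new Delta commits instead of finishing.
        DeltaSource<RowData> deltaSource = DeltaSource
                .forContinuousRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "deltaSource-continuous");
    }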
Here is the pom.xml for the Java project.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>deltaflinkproject</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.4.0</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
We build a JAR and upload it to a convenient directory on ADLS Gen2.
At this point we are ready to submit the StreamProcessingJob to the Flink cluster. Point to the JAR location on storage and provide the entry class details.
The job processes the streams based on the logic defined in the Java JAR.
Callouts and Validations
Flink periodically commits into Delta based on the configured checkpointing interval (env.enableCheckpointing(10000)); in our case, a commit is issued every 10 seconds.
NOTE: One critical observation to keep in mind for the initial dataset: if your job duration is shorter than the checkpoint interval (say you have only 10 records and the job completes well before the first checkpoint), you will observe only parquet files and no _delta_log directory, because the first Delta commit was never issued. This edge case is worth calling out as a reminder that streaming (unbounded) semantics differ substantially from batch (bounded) processing semantics.
The screenshots below depict the periodic processing of this data.
The initial run creates the parquet files but has yet to issue the first Delta commit, hence we observe only parquet files.
Upon the first commit, the Delta log is initialized, and the job continues to process the streams at the configured checkpoint interval.
Upon completion and subsequent checkpoints, all in-progress files are committed.
Checkpointing is a crucial feature in distributed stream processing frameworks like Flink to ensure fault tolerance and exactly-once semantics.
Relevance of env.enableCheckpointing(10000):
Ensures Fault Tolerance: Checkpointing allows Flink to take snapshots of the state of the streaming application at regular intervals. In case of failures, Flink can use these snapshots to restore the state of the application and continue processing from the last successful checkpoint. This ensures fault tolerance and resilience against failures such as machine crashes, network issues, or software bugs.
Consistent State: Checkpointing helps in maintaining consistent state in the face of failures. By periodically saving the state of the application, Flink guarantees that even if failures occur, the state can be recovered to a consistent point.
Exactly-once Processing: Checkpointing, combined with Flink’s processing model, enables exactly-once semantics. With exactly-once processing, each record in the input streams is processed exactly once, even in the presence of failures and restarts. This is crucial for applications where data correctness is paramount, such as financial transactions or real-time analytics.
Performance Considerations: The checkpointing interval (in this case, 10000 milliseconds or 10 seconds) is a trade-off between fault tolerance and performance. Shorter intervals provide better fault tolerance but can impact performance due to the overhead of taking and managing checkpoints. Longer intervals reduce this overhead but increase the potential amount of data loss in case of failures. Choosing an appropriate interval depends on the specific requirements of the application.
Configuration Flexibility: Flink provides flexibility in configuring checkpointing behavior. Developers can tune various parameters such as checkpointing interval, checkpointing mode (e.g., exactly-once, at-least-once), state backend, and storage options based on the specific needs of their application and the underlying infrastructure.
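As an illustration of that flexibility, the sketch below shows a few of the checkpointing knobs available on the StreamExecutionEnvironment in Flink 1.17. It is not part of the job above, and the values are placeholders rather than recommendations.

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuningExample {
        public static void configureCheckpointing(StreamExecutionEnvironment env) {
            // Take a checkpoint every 10 seconds with exactly-once guarantees
            env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

            CheckpointConfig config = env.getCheckpointConfig();
            // Leave at least 5 seconds between the end of one checkpoint and the start of the next
            config.setMinPauseBetweenCheckpoints(5000);
            // Fail any checkpoint that takes longer than 2 minutes
            config.setCheckpointTimeout(120000);
            // Retain externalized checkpoints on cancellation so the job can be restored from them
            config.setExternalizedCheckpointCleanup(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
    }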
Finally, let's validate the Delta sink for our processed streams.
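One lightweight way to spot-check the sink, sketched below under the assumption that the snippet is placed inside StreamProcessingJob so it can reuse the bounded source helper and the same placeholder path, is to read the sink table back and print the rows.

    // Minimal validation sketch: read the Delta sink back with a bounded source and print the rows
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String deltaTablePath_sink = "abfss://flink@<storage>.dfs.core.windows.net/Streams/SaleSink";

    DataStream<RowData> sinkRows = createBoundedDeltaSourceAllColumns(env, deltaTablePath_sink);
    sinkRows.print();

    env.execute("ValidateDeltaSink");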
Architectural Considerations
In contrast with other streaming offerings, Flink offers built-in support for managing complex stateful computations efficiently. It provides a unified runtime for both batch and stream processing, allowing seamless integration of stateful operations into streaming jobs. Flink's state management capabilities include fault tolerance, exactly-once semantics, and flexible state backend options (e.g., memory, RocksDB), as sketched below.
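For illustration, configuring a RocksDB state backend with checkpoint storage on ADLS Gen2 might look like the following sketch. This is not part of the job above; EmbeddedRocksDBStateBackend requires the flink-statebackend-rocksdb dependency, and the storage path is a placeholder.

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StateBackendExample {
        public static void configureStateBackend(StreamExecutionEnvironment env) {
            // Keep operator state in RocksDB, with incremental checkpoints enabled
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
            // Persist checkpoints to ADLS Gen2 (placeholder path)
            env.getCheckpointConfig()
               .setCheckpointStorage("abfss://flink@<storage>.dfs.core.windows.net/checkpoints");
        }
    }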
Flink provides strong consistency guarantees with exactly-once processing semantics out of the box. It ensures that each event is processed exactly once, even in the presence of failures or restarts, making it suitable for mission-critical applications.
Flink is designed for high throughput and low-latency processing at scale. It supports fine-grained control over resource allocation and dynamic scaling, allowing efficient utilization of cluster resources. Flink’s pipelined execution model and advanced optimizations contribute to its superior performance.
Flink integrates seamlessly with other components of the Apache ecosystem, such as Apache Kafka, Apache Hadoop, and Apache Hive. It also provides connectors for integrating with various cloud platforms and data sources.
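As a hedged sketch of what the Kafka integration might look like (not part of this job; it assumes the flink-connector-kafka dependency and placeholder broker, topic, and group names), a Kafka source can feed the same kind of DataStream that the Delta sink consumes, once the records are parsed into RowData.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaSourceExample {
        public static DataStream<String> createKafkaStream(StreamExecutionEnvironment env) {
            // Placeholder broker and topic names; replace with your own
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("<broker>:9092")
                    .setTopics("sales-events")
                    .setGroupId("flink-delta-demo")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // The resulting stream would still need to be parsed into RowData
            // (matching ROW_TYPE) before it can be written to the Delta sink.
            return env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafkaSource");
        }
    }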
Flink excels in handling complex stateful workloads with its advanced state management, processing guarantees, scalability, and performance optimizations.
Conclusion
We have introduced Azure HDInsight on AKS (Flink) and emphasized the Delta Lakehouse story.
This blog is for those passionate data engineers who are data native and love to design systems that are resilient, frugal, and precise, write those hard lines of code, control their destiny, and are curious to understand what lies under the hood. We dedicate this blog to all of you, and extend a warm welcome to fully managed Apache Flink on Azure, with Azure HDInsight on AKS!
Get started today – Microsoft Azure
Read our documentation – What is Apache Flink® in Azure HDInsight on AKS? (Preview) – Azure HDInsight on AKS | Microsoft Learn
Questions? Please reach out to us on aka.ms/askhdinsight