Step-by-Step Guide: Building and Integrating a Custom Package in ADF Workflow Orchestration Manager
Introduction
The Workflow Orchestration Manager in Azure Data Factory streamlines setting up and managing Apache Airflow environments, enhancing your ability to execute scalable data pipelines efficiently. Apache Airflow, a robust open-source platform, allows for the programming, scheduling, and monitoring of intricate workflows by organizing tasks into data pipelines. This capability is highly valued in data engineering and data science for its adaptability and user-friendliness.
In this guide, I will walk you through a demonstration where we extract insights from GitHub data using the GitHub public API, and run custom operators in a private package within the Workflow Orchestration Manager in Azure Data Factory.
Prerequisites
– Tools and Technologies Needed:
An Azure Data Factory account
Working knowledge of Apache Airflow
Working knowledge of Python
– Initial Setup:
ADF: create a Workflow Orchestration Manager instance.
Airflow (optional): In this blog, I'm primarily focusing on running custom operators in Airflow. However, if you want to trigger Azure Data Factory (ADF) pipelines directly from Airflow, you'll need to establish an ADF connection within the Airflow UI. This setup enables triggering ADF pipelines from Airflow; see the Microsoft documentation linked at the end of this post, and a minimal example task is sketched below.
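As a rough illustration of such a task, the sketch below assumes the apache-airflow-providers-microsoft-azure package is installed and that an Airflow connection named azure_data_factory_default has been created; the pipeline, resource group, and factory names are placeholders, not values from this tutorial.

from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator

with DAG(
    dag_id="trigger_adf_pipeline_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    # Triggers an existing ADF pipeline run through the configured connection.
    run_adf_pipeline = AzureDataFactoryRunPipelineOperator(
        task_id="run_adf_pipeline",
        azure_data_factory_conn_id="azure_data_factory_default",
        pipeline_name="my_adf_pipeline",          # placeholder pipeline name
        resource_group_name="my-resource-group",  # placeholder resource group
        factory_name="my-data-factory",           # placeholder factory name
    )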
Table of Contents:
Designing Your Custom Package
Create the Custom Package
Building the Airflow DAG
Run the DAG in ADF Workflow Orchestration Manager
Logs and Monitoring
Links
Call to Action
Step 1: Designing Your Custom Package
In this tutorial, I am utilizing the GitHub API and have written two Python operators: `GitHubAPIReaderOperator` and `CountLanguagesOperator`. These operators are designed to fetch data from GitHub repositories and count the programming languages used, respectively.
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import requests
import logging
import re


class GitHubAPIReaderOperator(BaseOperator):
    """Fetches paginated repository data from the GitHub API."""

    @apply_defaults
    def __init__(self, api_url, max_pages=20, token=None, *args, **kwargs):
        super(GitHubAPIReaderOperator, self).__init__(*args, **kwargs)
        self.api_url = api_url
        self.max_pages = max_pages
        self.token = token

    def execute(self, context):
        headers = {"Accept": "application/vnd.github.v3+json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        session = requests.Session()
        session.headers.update(headers)

        next_url = self.api_url
        all_data = []
        page_count = 0
        # Follow pagination links until there are no more pages
        # or the page limit is reached.
        while next_url and page_count < self.max_pages:
            response = session.get(next_url)
            response.raise_for_status()
            data = response.json()
            all_data.extend(data)
            next_url = self.get_next_link(response.headers.get('Link'))
            page_count += 1
        return all_data

    def get_next_link(self, link_header):
        # Parse the 'Link' response header to find the URL of the next page.
        if link_header:
            links = link_header.split(',')
            next_link = [link for link in links if 'rel="next"' in link]
            if next_link:
                match = re.search(r'<(.*)>', next_link[0])
                if match:
                    return match.group(1)
        return None


class CountLanguagesOperator(BaseOperator):
    """Counts how many repositories use each programming language."""

    @apply_defaults
    def __init__(self, api_url, token=None, *args, **kwargs):
        super(CountLanguagesOperator, self).__init__(*args, **kwargs)
        self.api_url = api_url
        self.token = token

    def execute(self, context):
        # Pull the repository list produced by the fetch_github_data task.
        repos = context['task_instance'].xcom_pull(task_ids='fetch_github_data')
        headers = {"Accept": "application/vnd.github.v3+json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        session = requests.Session()
        session.headers.update(headers)

        language_counts = {}
        for repo in repos:
            languages_url = repo.get('languages_url')
            if not languages_url:
                continue  # Skip repos without a languages URL
            try:
                response = session.get(languages_url)
                response.raise_for_status()
                languages_data = response.json()
                for language in languages_data.keys():
                    if language in language_counts:
                        language_counts[language] += 1
                    else:
                        language_counts[language] = 1
            except requests.exceptions.HTTPError as error:
                if error.response.status_code == 403:
                    logging.warning(f"Skipping repository due to HTTP 403 Forbidden: {languages_url}")
                    continue
                else:
                    raise
        # Output the results
        for lang, count in language_counts.items():
            logging.info(f"{lang} repositories count: {count}")
        return language_counts
Please check the GitHub API documentation for rate limits and other usage restrictions before running the operators at scale.
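Unauthenticated requests have a much lower hourly quota than authenticated ones, so it can help to check your remaining quota before a run. The snippet below is a minimal, optional sketch that queries the public rate_limit endpoint; it is not part of the custom package above.

import requests

def check_rate_limit(token=None):
    # Query GitHub's rate-limit endpoint; pass a token to see the higher authenticated quota.
    headers = {"Accept": "application/vnd.github.v3+json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    response = requests.get("https://api.github.com/rate_limit", headers=headers)
    response.raise_for_status()
    core = response.json()["resources"]["core"]
    print(f"Remaining requests: {core['remaining']} of {core['limit']}")
    return core

check_rate_limit()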
Step 2: Create the Custom Package
Follow the steps below to create a wheel package in Python.
First, create the folder hierarchy for the package:
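(The original post shows the hierarchy as a screenshot. A layout consistent with the setup.py and DAG import below would look roughly like the following; the module file names under src/ are assumptions.)

custom_operators/
├── setup.py
└── src/
    └── custom_operators/
        ├── __init__.py
        └── github_operators.py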
In setup.py, add the package folder name like so:
from setuptools import setup, find_packages

setup(
    name="custom_operators",
    version="0.1.0",
    package_dir={"": "src"},
    packages=find_packages(where="src"),
    install_requires=[
        # List your dependencies here, e.g., 'numpy', 'pandas'
    ],
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    python_requires='>=3.6',
)
In CMD, run the following commands to create the wheel package:
pip install setuptools wheel
python setup.py sdist bdist_wheel
These commands create a source distribution and a wheel for your package. The wheel file (.whl) will be stored in a newly created dist/ directory under the custom_operators folder.
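If you want to confirm the wheel contains the operator module before uploading it, a quick optional check from Python might look like this (the file name assumes the version set in setup.py):

import zipfile

# List the files packed into the wheel; github_operators.py should appear in the output.
with zipfile.ZipFile("dist/custom_operators-0.1.0-py3-none-any.whl") as whl:
    for name in whl.namelist():
        print(name)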
Step 3: Building the Airflow DAG
Now that we have built our custom operators and created the wheel package, we need to create a DAG that will trigger these operators.
For that, I created two tasks, fetch_github_data and count_languages, each of which calls one of the operators above.
from airflow import DAG
from datetime import datetime, timedelta
from custom_operators.github_operators import GitHubAPIReaderOperator, CountLanguagesOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'github_language_analysis',
    default_args=default_args,
    description='Analyze GitHub repos for language usage',
    schedule_interval=timedelta(days=1),
)

fetch_github_data = GitHubAPIReaderOperator(
    task_id='fetch_github_data',
    api_url='https://api.github.com/repositories',
    max_pages=10,
    token='<YOUR_GITHUB_TOKEN>',  # Replace with your actual token
    dag=dag
)

count_languages = CountLanguagesOperator(
    task_id='count_languages',
    api_url='https://api.github.com',
    token='<YOUR_GITHUB_TOKEN>',  # Replace with your actual token
    dag=dag
)

# fetch_github_data runs first; count_languages consumes its output via XCom.
fetch_github_data >> count_languages
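Before deploying to ADF, it can help to confirm the DAG file parses cleanly on a local Airflow installation that has the wheel installed. A minimal sketch, assuming a working local Airflow environment with the DAG file in its dags folder:

from airflow.models import DagBag

# Parse the local dags folder and report any import errors,
# e.g. a missing custom_operators package.
dag_bag = DagBag()
print("Import errors:", dag_bag.import_errors)
print("Loaded DAGs:", list(dag_bag.dag_ids))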
Step 4: Run the DAG in ADF Workflow Orchestration Manager
Now that we have built our DAG and our custom package, follow these steps to run it in ADF.
1. Create a managed Airflow instance in ADF by following the Microsoft docs.
2. In your ADLS storage account, create the folder hierarchy for the Airflow files (a sample layout is sketched after these steps).
In the requirements file, include the path to the custom package as it will appear inside the Airflow environment:
/opt/airflow/dags/custom_operators-0.1.0-py3-none-any.whl
3. In the ADF workspace, click on "Import files." Navigate to your ADLS storage account, locate the "Airflow" folder, and check the "Import requirements" checkbox.
It will take a few minutes for the Workflow Orchestration Manager to pick up the code and the custom package.
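(The original post shows the ADLS layout as a screenshot. Based on the wheel path above, the imported folder would look roughly like this; apart from the dags folder and the wheel file name, the file and folder names are assumptions, and requirements.txt contains the single wheel path shown in step 2.)

airflow/
├── dags/
│   ├── github_language_analysis_dag.py
│   └── custom_operators-0.1.0-py3-none-any.whl
└── requirements.txt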
Step 5: Logs and Monitoring
After importing the files, click the "Monitor" button in the Workflow Orchestration Manager to view task execution and the Airflow logs. This will open the Airflow UI.
DAG:
Logs in the count_languages task:
P.S.: For a more dynamic workflow, you can save the language counts as a JSON file and store it in your storage account.
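A minimal sketch of that idea, assuming the azure-storage-blob package is installed and you have a storage connection string; the container and blob names below are placeholders, not values from this tutorial:

import json
from azure.storage.blob import BlobServiceClient

def upload_language_counts(language_counts, connection_string):
    # Serialize the counts returned by CountLanguagesOperator and upload them as a blob.
    service = BlobServiceClient.from_connection_string(connection_string)
    blob = service.get_blob_client(container="airflow-output", blob="language_counts.json")
    blob.upload_blob(json.dumps(language_counts, indent=2), overwrite=True)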
Links:
Install a Private package – Azure Data Factory | Microsoft Learn
How does Workflow Orchestration Manager work? – Azure Data Factory | Microsoft Learn
airflow.operators.python — Airflow Documentation (apache.org)
airflow.providers.microsoft.azure — apache-airflow-providers-microsoft-azure Documentation
Call to Action:
– Make sure to establish all connections before starting to work with managed Airflow.
– Check the Microsoft documentation on Workflow Orchestration Manager.
– Please help us improve by sharing your valuable Workflow Orchestration Manager Preview feedback by emailing us at ManagedAirflow@microsoft.com
– Follow me on LinkedIn: Sally Dabbah | LinkedIn