Azure Functions: Support for HTTP Streams in Python is now in Preview!
HTTP streams let you accept and return data from your HTTP endpoints using the FastAPI request and response APIs enabled in your functions. These APIs let the host process large data in HTTP messages as chunks instead of reading an entire message into memory.
This feature makes it possible to handle large data streams, integrate with OpenAI, deliver dynamic content, and support other core HTTP scenarios that require real-time interactions over HTTP. You can also use FastAPI response types with HTTP streams. Without HTTP streams, the size of your HTTP requests and responses is limited by the memory restrictions you can hit when an entire message payload is processed in memory.
To get started, the following prerequisites are required:
Azure Functions runtime version 4.34.1, or a later version.
Python version 3.8, or a later supported version.
Python v2 programming model.
Then, enable HTTP streaming in your Azure Function app. HTTP streams are disabled by default. You need to enable this feature in your application settings and also update your code to use the FastAPI package.
Add the azurefunctions-extensions-http-fastapi extension package to the requirements.txt file in the project.
Add the following code to the function_app.py file in the project, which imports the FastAPI extension:
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
When deploying, add these application settings:
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
"PYTHON_ENABLE_INIT_INDEXING": "1"
When running locally, you also need to add these same settings to the local.settings.json project file.
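As a rough illustration of the local step, the sketch below merges the two settings into a local.settings.json document held as a Python dictionary. The helper name with_streaming_settings and the starting file contents are hypothetical; only the two setting names and their values come from the steps above.

```python
import json

# The two settings named in the steps above; both are set to the string "1".
STREAMING_SETTINGS = {
    "PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",
    "PYTHON_ENABLE_INIT_INDEXING": "1",
}


def with_streaming_settings(settings: dict) -> dict:
    """Return a copy of a local.settings.json document with the settings added."""
    updated = dict(settings)
    values = dict(updated.get("Values", {}))  # copy so the input is not mutated
    values.update(STREAMING_SETTINGS)
    updated["Values"] = values
    return updated


# Hypothetical starting contents; a real project will have its own values.
example = {
    "IsEncrypted": False,
    "Values": {"FUNCTIONS_WORKER_RUNTIME": "python"},
}

print(json.dumps(with_streaming_settings(example), indent=2))
```

The printed JSON can be compared against the Values section of your own local.settings.json file.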
The following examples show how to use HTTP streams with Azure Functions in Python.
This example is an HTTP triggered function that streams HTTP response data. You might use these capabilities to support scenarios like sending event data through a pipeline for real-time visualization, or detecting anomalies in large sets of data and providing instant notifications.
import azure.functions as func
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


def generate_count():
    """Generate a stream of chronological numbers."""
    count = 0
    while True:
        # Each message ends with a blank line so the client can split the stream.
        yield f"counting, {count}\n\n"
        count += 1


@app.route(route="stream", methods=[func.HttpMethod.GET])
async def stream_count(req: Request) -> StreamingResponse:
    """Endpoint that streams chronological numbers."""
    return StreamingResponse(generate_count(), media_type="text/event-stream")
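The generator in this example counts forever and yields as fast as the client can read. For local experiments, a bounded and paced variant may be easier to work with. The sketch below is an illustration rather than part of the original sample: the limit and delay parameters and the collect helper are hypothetical, and it assumes StreamingResponse accepts an async generator, as the Azure OpenAI example in this post does.

```python
import asyncio
from typing import AsyncIterator, List


async def generate_count(limit: int = 5, delay: float = 0.0) -> AsyncIterator[str]:
    """Yield `limit` counting messages, pausing `delay` seconds after each one."""
    for count in range(limit):
        yield f"counting, {count}\n\n"
        # Awaiting here hands control back to the event loop between messages.
        await asyncio.sleep(delay)


async def collect(stream: AsyncIterator[str]) -> List[str]:
    """Drain an async stream into a list; handy for checking output locally."""
    return [item async for item in stream]


print(asyncio.run(collect(generate_count(limit=3))))
```

Inside the function, the generator would be passed the same way as before, for example StreamingResponse(generate_count(limit=10, delay=1.0), media_type="text/event-stream").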
This example is an HTTP triggered function that receives and processes streaming data from a client in real time. It demonstrates streaming upload capabilities that can be helpful for scenarios like processing continuous data streams and handling event data from IoT devices.
import azure.functions as func
from azurefunctions.extensions.http.fastapi import JSONResponse, Request

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.route(route="streaming_upload", methods=[func.HttpMethod.POST])
async def streaming_upload(req: Request) -> JSONResponse:
    """Handle streaming upload requests."""
    # Process each chunk of data as it arrives
    async for chunk in req.stream():
        process_data_chunk(chunk)

    # Once all data is received, return a JSON response indicating successful processing
    return JSONResponse({"status": "Data uploaded and processed successfully"})


def process_data_chunk(chunk: bytes):
    """Process each data chunk."""
    # Add custom processing logic here
    pass
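To make the placeholder processing step more concrete, here is one possible sketch of chunk-by-chunk work: keeping a running byte count and SHA-256 digest so the full payload never has to sit in memory. The UploadDigest class, fake_stream, and consume are hypothetical names introduced for illustration; fake_stream stands in for req.stream() so the logic can be tried without a running function app.

```python
import asyncio
import hashlib
from typing import AsyncIterator


class UploadDigest:
    """Accumulates a byte count and SHA-256 digest one chunk at a time."""

    def __init__(self) -> None:
        self.total_bytes = 0
        self._sha256 = hashlib.sha256()

    def process_data_chunk(self, chunk: bytes) -> None:
        # Only the running totals are kept; the chunk itself is discarded.
        self.total_bytes += len(chunk)
        self._sha256.update(chunk)

    def hexdigest(self) -> str:
        return self._sha256.hexdigest()


async def fake_stream() -> AsyncIterator[bytes]:
    """Stand-in for req.stream(); yields a few hypothetical chunks."""
    for chunk in (b"hello ", b"streaming ", b"world"):
        yield chunk


async def consume(stream: AsyncIterator[bytes]) -> UploadDigest:
    """Feed every chunk of the stream to a fresh UploadDigest."""
    digest = UploadDigest()
    async for chunk in stream:
        digest.process_data_chunk(chunk)
    return digest


result = asyncio.run(consume(fake_stream()))
print(result.total_bytes, result.hexdigest())
```

In the function itself, the same object could be created at the top of streaming_upload and its totals returned in the JSON response.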
Note that you must use an HTTP client library to make streaming calls to a function's FastAPI endpoints. The client tool or browser you're using might not natively support streaming, or it might return only the first chunk of data. You can use a client script to send streaming data to an HTTP endpoint.
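A client script for streaming uploads might be sketched as follows, using only the Python standard library. The function names, the 2 KB chunk size, and the 60-second timeout are assumptions made for illustration. The sketch relies on http.client sending a generator body with chunked transfer encoding when no Content-Length header is set, and it handles plain http URLs only, which suits a locally running function app.

```python
import http.client
from typing import Iterator, Tuple
from urllib.parse import urlsplit


def iter_file_chunks(path: str, chunk_size: int = 2048) -> Iterator[bytes]:
    """Yield the file at `path` in pieces of at most `chunk_size` bytes."""
    with open(path, "rb") as file:
        while chunk := file.read(chunk_size):
            yield chunk


def stream_upload(url: str, path: str) -> Tuple[int, bytes]:
    """POST a file to `url` chunk by chunk; return the status code and body."""
    parts = urlsplit(url)
    # Assumption: plain HTTP. An https URL would need HTTPSConnection instead.
    conn = http.client.HTTPConnection(parts.hostname, parts.port, timeout=60)
    try:
        # The body is a generator and no Content-Length is given, so the
        # request goes out with Transfer-Encoding: chunked.
        conn.request("POST", parts.path, body=iter_file_chunks(path))
        response = conn.getresponse()
        return response.status, response.read()
    finally:
        conn.close()
```

Against a function app running locally, a call might look like stream_upload("http://localhost:7071/api/streaming_upload", "data.bin"), assuming the default local port and route prefix; data.bin is a placeholder for your own file.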
This example is an HTTP triggered function that streams an Azure OpenAI chat completion back to the caller as it is generated.
import asyncio
import os

import azure.functions as func
import openai
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse

# Azure Function App
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# Azure OpenAI
endpoint = os.environ["AZURE_OPEN_AI_ENDPOINT"]
api_key = os.environ["AZURE_OPEN_AI_API_KEY"]
deployment = os.environ["AZURE_OPEN_AI_DEPLOYMENT_MODEL"]
temperature = 0.7

client = openai.AsyncAzureOpenAI(
    azure_endpoint=endpoint,
    api_key=api_key,
    api_version="2023-09-01-preview"
)


# Get data from Azure OpenAI
async def stream_processor(response):
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:  # Get remaining generated response if applicable
                await asyncio.sleep(0.1)
                yield delta.content


# HTTP streaming Azure Function
@app.route(route="stream-cities", methods=[func.HttpMethod.GET])
async def stream_openai_text(req: Request) -> StreamingResponse:
    prompt = "List the 100 most populous cities in the United States."
    azure_open_ai_response = await client.chat.completions.create(
        model=deployment,
        temperature=temperature,
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")
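The filtering done by stream_processor can be tried without calling Azure OpenAI. The sketch below repeats the same logic, minus the pacing delay, and feeds it hand-built stand-in objects. fake_chunk, fake_response, and collect_text are hypothetical helpers, and the stand-ins carry only the attributes the processor reads (choices, delta, content), so they are not a faithful model of the real response objects.

```python
import asyncio
from types import SimpleNamespace


async def stream_processor(response):
    """Same filtering logic as the example above, without the pacing delay."""
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content


def fake_chunk(content):
    """Build a hypothetical object with the attributes stream_processor reads."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


async def fake_response():
    """Stand-in for the streamed completion; no network call is made."""
    yield SimpleNamespace(choices=[])  # no choices: skipped
    yield fake_chunk("New York")
    yield fake_chunk(None)  # empty delta: skipped
    yield fake_chunk(", Los Angeles")


async def collect_text(response) -> str:
    """Join everything stream_processor yields into one string."""
    return "".join([text async for text in stream_processor(response)])


print(asyncio.run(collect_text(fake_response())))
```

Chunks with no choices or with empty content are dropped, so only the generated text reaches the client.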