Unveiling the Generative AI Bulk Processing and Ingestion Pattern
Generative and embedding models have taken the world by storm in recent years, producing high-quality natural language responses for a wide range of tasks and domains. Organizations, start-ups, and innovators across the world have been exploring the applications of this capability through prototyping, small-scale proofs of concept, and shaping text outputs through prompt engineering. As they gain a deeper understanding of generative AI concepts such as context length, tokens, embeddings, and attention, as well as methods to avoid hallucinations, they discover new use cases and opportunities for leveraging this technology. One of the most popular and widely applicable use cases is search-based answer generation, which can enhance user experience and satisfaction in any industry that relies on information retrieval and question answering.
This technical blog describes a pattern for bulk processing and ingestion with generative AI, which can help organizations grow their generative AI solutions from prototype and proof-of-concept phases to pilot and production workloads. The bulk processing and ingestion pattern described here leverages the parallel and distributed computing capabilities of the Azure platform to generate and store large volumes of natural language responses from a given set of PDF documents. The blog also provides a walkthrough of the sample code available in the GitHub repository, which implements this pattern using Azure OpenAI, Azure Functions, Azure Queue Storage, Azure Document Intelligence, Azure Cosmos DB, and optionally Azure AI Search. The code can be easily customized and adapted to different generative AI models and use cases, such as knowledge base population for search-based answer generation.
Architecture Overview
Figure 1: Key components and execution flow
The solution takes advantage of the event-driven processing paradigm. As soon as the input documents are uploaded to a designated Azure Storage location, they trigger an event that invokes Azure Functions written in Python to extract text from the documents. The extracted text is then split into smaller segments of text (based on a configurable token count value) and concatenated into larger segments (also configurable). This approach allows for flexibility in generating text at different levels of granularity, depending on the context length supported by the large language models (LLMs).
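To make the chunking step concrete, here is a minimal sketch of splitting extracted text into token-sized chunks and merging them into larger segments. It assumes the tiktoken library for token counting, and the chunk and merge limits shown are placeholder values rather than the sample code's defaults.

```python
# Minimal sketch of the chunk-and-merge step, assuming tiktoken for token counting.
# CHUNK_TOKENS and MERGE_TOKENS are illustrative placeholders, not the sample code's defaults.
import tiktoken

CHUNK_TOKENS = 512     # size of the smaller segments
MERGE_TOKENS = 3000    # upper bound for a merged (concatenated) segment

encoding = tiktoken.get_encoding("cl100k_base")

def split_into_chunks(text: str, max_tokens: int = CHUNK_TOKENS) -> list[str]:
    """Split extracted text into segments of at most max_tokens tokens."""
    tokens = encoding.encode(text)
    return [
        encoding.decode(tokens[i : i + max_tokens])
        for i in range(0, len(tokens), max_tokens)
    ]

def merge_chunks(chunks: list[str], max_tokens: int = MERGE_TOKENS) -> list[str]:
    """Concatenate small chunks into larger segments that fit the LLM context window."""
    merged, current, current_tokens = [], [], 0
    for chunk in chunks:
        chunk_tokens = len(encoding.encode(chunk))
        if current and current_tokens + chunk_tokens > max_tokens:
            merged.append("".join(current))
            current, current_tokens = [], 0
        current.append(chunk)
        current_tokens += chunk_tokens
    if current:
        merged.append("".join(current))
    return merged
```

Keeping both granularities configurable lets you tune the merged segment size to the context window of the model you deploy.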
The concatenated segments are then processed in parallel, making API calls to one or more Azure OpenAI endpoints to apply a default prompt predefined in the Azure Cosmos DB database. The prompt can be customized at the document level by adding its id to the blob metadata: if the document blob carries a prompt_id metadata tag that points to a user-defined prompt stored in Azure Cosmos DB, that prompt is used; otherwise, the default prompt (also configurable) is applied. All intermediate outputs are stored in Azure Storage for traceability back to the source document. The text can also be vectorized and indexed by an AI Search service by cloning and modifying the Run LLM Prompt function; the vectorization step is not implemented in the sample code.
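The document-level prompt override could be resolved along these lines. This is a sketch under assumptions: the database, container, and property names (for example prompt_text) are illustrative and not taken from the repository.

```python
# Sketch of resolving the prompt for a document: use the blob's prompt_id metadata
# if present, otherwise fall back to the default prompt stored in Cosmos DB.
# Database, container, and property names are illustrative assumptions.
from azure.cosmos import CosmosClient, exceptions
from azure.storage.blob import BlobClient

def resolve_prompt(blob_client: BlobClient, cosmos_client: CosmosClient,
                   default_prompt_id: str = "default") -> str:
    """Return the prompt referenced by the blob's prompt_id metadata, else the default."""
    metadata = blob_client.get_blob_properties().metadata or {}
    prompt_id = metadata.get("prompt_id", default_prompt_id)

    prompts = cosmos_client.get_database_client("genai").get_container_client("prompts")
    try:
        item = prompts.read_item(item=prompt_id, partition_key=prompt_id)
    except exceptions.CosmosResourceNotFoundError:
        # Unknown prompt_id: fall back to the default prompt.
        item = prompts.read_item(item=default_prompt_id, partition_key=default_prompt_id)
    return item["prompt_text"]
```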
This solution reuses code components from publicly available accelerators built by internal teams, and we thank them for their work.
Flow Details
Incremental offload (Step 1)
When processing large sets of documents, it is a best practice to split the work into small batches and process one batch at a time. You can build a simple pipeline that fetches and offloads the documents batch by batch into a designated storage account container, adding the next batch upon successful completion of the previous one. Alternatively, there may be use cases where you offer upload functionality directly to users through a web application.
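For illustration, a batch offload into the landing container might look like the sketch below; the connection string, container name, and folder layout are assumptions.

```python
# Sketch of offloading one batch of PDFs into the designated landing container.
# Connection string, container name, and folder layout are illustrative assumptions.
from pathlib import Path
from azure.storage.blob import ContainerClient

def upload_batch(conn_str: str, container: str, batch_folder: str) -> None:
    container_client = ContainerClient.from_connection_string(conn_str, container)
    for pdf_path in Path(batch_folder).glob("*.pdf"):
        with pdf_path.open("rb") as data:
            # Uploading the blob is what triggers the downstream processing.
            container_client.upload_blob(name=pdf_path.name, data=data, overwrite=True)
```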
Queued for processing (Steps 2 and 3a)
Each arriving document triggers an Azure Function that queues the document to be submitted to the Document Intelligence endpoint. The code implementation of this pattern currently supports processing of PDF files and can be easily extended to handle other file types.
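Using the Python v2 programming model for Azure Functions, the blob trigger and queue hand-off could be wired up roughly as follows. The container, queue, and connection setting names are placeholders, not the ones used in the repository.

```python
# Sketch of the blob-triggered function that queues an arriving PDF for processing.
# Container, queue, and connection setting names are illustrative placeholders.
import json
import azure.functions as func

app = func.FunctionApp()

@app.blob_trigger(arg_name="blob", path="incoming-docs/{name}", connection="StorageConnection")
@app.queue_output(arg_name="msg", queue_name="docintel-submit", connection="StorageConnection")
def queue_document(blob: func.InputStream, msg: func.Out[str]) -> None:
    if not blob.name.lower().endswith(".pdf"):
        return  # only PDFs are handled in this sketch
    # Hand the document off to the next step via a queue message.
    msg.set(json.dumps({"blob_name": blob.name, "length": blob.length}))
```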
Submitted to Document Intelligence (Steps 4, 5 and 6)
As the next step, the blob URL of the document is appended with a SAS token and submitted to the Document Intelligence endpoint. The Document Intelligence API returns a unique identifier upon accepting the document for text extraction. This identifier is passed to the next queue, which keeps polling the Document Intelligence endpoint until the document has been processed.
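A hedged sketch of this submission via the REST API is shown below; the model id and API version are assumptions, so align them with the Document Intelligence version you deploy against.

```python
# Sketch of submitting a blob (with SAS token) to Document Intelligence via REST.
# The model id and API version are assumptions; adjust to your deployed version.
import requests

def submit_to_document_intelligence(endpoint: str, key: str, blob_sas_url: str) -> str:
    url = f"{endpoint}/formrecognizer/documentModels/prebuilt-layout:analyze"
    response = requests.post(
        url,
        params={"api-version": "2023-07-31"},
        headers={"Ocp-Apim-Subscription-Key": key},
        json={"urlSource": blob_sas_url},
        timeout=30,
    )
    response.raise_for_status()
    # The Operation-Location header carries the identifier used to poll for results.
    return response.headers["Operation-Location"]
```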
Verify that Document Intelligence processing is completed and chunk the text (Steps 7, 8 and 9)
The time to analyze a document depends on its size (for example, the number of pages) and the content on each page. This step polls Document Intelligence and, until status code 200 is received, requeues the message to trigger a new instance of the check without looping inside the code, a powerful feature that queues offer. Once status code 200 is received, the text response is chunked, and the details of the merged chunks are passed to the next queue to apply a prompt (or vectorize and ingest into AI Search), or to perform any other task you would like to accomplish as part of the flow.
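The poll-and-requeue behavior could be sketched as follows: check the analyze operation once and, if it is still running, put the message back on the queue with a visibility delay instead of looping in code. Queue handling details and message fields are illustrative assumptions.

```python
# Sketch of the poll-and-requeue step: check the analyze operation once and, if it
# has not finished, requeue the message with a visibility delay instead of looping.
# Message fields and the delay value are illustrative assumptions.
import json
import requests
from azure.storage.queue import QueueClient

def check_analysis(message_text: str, key: str, queue_client: QueueClient) -> dict | None:
    msg = json.loads(message_text)
    response = requests.get(
        msg["operation_location"],
        headers={"Ocp-Apim-Subscription-Key": key},
        timeout=30,
    )
    response.raise_for_status()
    result = response.json()

    if result.get("status") == "succeeded":
        return result["analyzeResult"]          # ready to be chunked downstream
    if result.get("status") == "failed":
        raise RuntimeError(f"Analysis failed for {msg.get('blob_name')}")

    # Still running: requeue the same message so a new instance checks again shortly.
    queue_client.send_message(message_text, visibility_timeout=30)
    return None
```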
Figure 2: Chunks queue message structure
Apply LLM prompt (Steps 12, 13, 14)
Each merged chunk is prefixed with a system message (configurable) and the default (or user-defined) prompt retrieved from Azure Cosmos DB, which acts as the metadata and information logging database. The assembled prompt is passed to the Azure OpenAI chat completion endpoint, and the response is saved to both Azure Storage and Azure Cosmos DB. If your use case requires few-shot examples to be passed with each prompt, you can store them in Azure Cosmos DB and fetch them dynamically along with the prompt given to the large language model.
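A minimal sketch of this step with the OpenAI Python SDK is shown below. The deployment, database, container, and property names are assumptions, and the Cosmos DB document shape is illustrative rather than the schema used by the sample code.

```python
# Sketch of applying the prompt to a merged chunk with Azure OpenAI and persisting
# the result. Deployment, database, container, and property names are assumptions.
import os
import uuid
from openai import AzureOpenAI
from azure.cosmos import CosmosClient

openai_client = AzureOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_KEY"],
    api_version="2024-02-01",
)
cosmos = CosmosClient(os.environ["COSMOS_ENDPOINT"], os.environ["COSMOS_KEY"])
results = cosmos.get_database_client("genai").get_container_client("chunk_results")

def apply_prompt(system_message: str, prompt: str, chunk: str, doc_id: str) -> str:
    completion = openai_client.chat.completions.create(
        model=os.environ["AZURE_OPENAI_DEPLOYMENT"],   # deployment name, not model name
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": f"{prompt}\n\n{chunk}"},
        ],
        temperature=0.2,
    )
    answer = completion.choices[0].message.content
    # Log the output per chunk for traceability back to the source document.
    results.upsert_item({
        "id": str(uuid.uuid4()),
        "doc_id": doc_id,
        "status": "completed",
        "output": answer,
    })
    return answer
```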
(Alternatively) Vectorize and ingest into AI Search (Steps 15, 16, 17, 18)
You can easily clone the Apply LLM prompt function to generate embeddings for each merged chunk and store the chunk as an AI Search document along with a reference to the source document.
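A hedged sketch of that alternative path is shown below; the index name and field names are assumptions, and the target index is expected to have a vector field sized for the embedding model you choose.

```python
# Sketch of the alternative path: embed each merged chunk and upload it to an
# Azure AI Search index. Index and field names are illustrative assumptions.
import os
import uuid
from openai import AzureOpenAI
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient

openai_client = AzureOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_KEY"],
    api_version="2024-02-01",
)
search_client = SearchClient(
    endpoint=os.environ["SEARCH_ENDPOINT"],
    index_name="document-chunks",
    credential=AzureKeyCredential(os.environ["SEARCH_KEY"]),
)

def index_chunk(chunk: str, source_blob: str) -> None:
    embedding = openai_client.embeddings.create(
        model=os.environ["AZURE_OPENAI_EMBEDDING_DEPLOYMENT"],
        input=chunk,
    ).data[0].embedding
    search_client.upload_documents([{
        "id": str(uuid.uuid4()),
        "content": chunk,
        "content_vector": embedding,
        "source_document": source_blob,   # traceability back to the source blob
    }])
```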
Intermediate outputs
Saving the outputs produced by each flow step is a best practice when a flow is broken into multiple steps. These outputs come in handy when you need to re-run part of the flow by rehydrating the storage queue with messages. The provided sample code that implements this pattern stores all intermediate outputs in Azure Storage.
Figure 3: After text is extracted by Document Intelligence API
Figure 4: Chunks are created
Figure 5: Prompt output
Addressing request throttling
The sample code on GitHub demonstrates the use of multiple endpoints (you may also use a single one) to distribute requests across Azure Document Intelligence and Azure OpenAI endpoints. Please see below additional considerations for mitigating request throttling.
Azure Document Intelligence
Implement retry logic in your application (see the sketch after this list).
If you find that you’re being throttled on the number of POST requests, consider adding a delay between the requests.
Increase the workload gradually. Avoid sharp changes.
Create a support request to increase transactions per second (TPS) limit.
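As an example, the retry guidance above could be implemented with exponential backoff around the analyze POST call, as in this sketch that assumes the tenacity library; attempt counts and delays are placeholders.

```python
# Sketch of client-side retry with exponential backoff for Document Intelligence
# submissions, assuming the tenacity library. Limits and delays are placeholders.
import requests
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

def is_throttled(exc: BaseException) -> bool:
    return (
        isinstance(exc, requests.HTTPError)
        and exc.response is not None
        and exc.response.status_code == 429
    )

@retry(retry=retry_if_exception(is_throttled),
       wait=wait_exponential(multiplier=2, max=60),
       stop=stop_after_attempt(6))
def submit_with_retry(url: str, key: str, blob_sas_url: str) -> str:
    response = requests.post(
        url,
        headers={"Ocp-Apim-Subscription-Key": key},
        json={"urlSource": blob_sas_url},
        timeout=30,
    )
    response.raise_for_status()   # raises HTTPError on 429, which triggers a retry
    return response.headers["Operation-Location"]
```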
Azure OpenAI
Implement client-side retry logic to wait the retry-after-ms time and retry (see the sketch after this list).
Consider redirecting the traffic to other models, deployments.
Move quota from another deployment, if necessary.
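A minimal sketch of honoring the retry-after hint with the OpenAI Python SDK follows. Note that the SDK already retries automatically (controlled by its max_retries setting), so this manual loop is only illustrative.

```python
# Sketch of waiting for the service-provided retry interval on 429 responses.
# The openai SDK retries by default (max_retries); this manual loop is illustrative only.
import time
from openai import AzureOpenAI, RateLimitError

def chat_with_backoff(client: AzureOpenAI, deployment: str, messages: list[dict],
                      max_attempts: int = 5):
    for _ in range(max_attempts):
        try:
            return client.chat.completions.create(model=deployment, messages=messages)
        except RateLimitError as err:
            # Prefer the server hint (retry-after-ms or retry-after) when present.
            headers = err.response.headers
            delay_ms = headers.get("retry-after-ms")
            delay = float(delay_ms) / 1000 if delay_ms else float(headers.get("retry-after", 10))
            time.sleep(delay)
    raise RuntimeError("Azure OpenAI request kept getting throttled")
```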
Please review the References section of this article for links to additional product documentation.
Monitoring the progress
A detailed process log is maintained in Azure Cosmos DB at the granularity of each merged chunk. You can review the process logs and stay on top of the processing status by using the NoSQL API for Azure Cosmos DB. Sample queries are shown below; you may write additional queries, for instance to find chunks that encountered an error or were skipped, along with the supporting error message, for further investigation.
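As an illustration, a query for chunks that errored out or were skipped could look like the sketch below; the database, container, and property names are assumptions, so adapt them to the log schema used by the sample code.

```python
# Sketch of querying the chunk-level process log with the Cosmos DB NoSQL API.
# Database, container, and property names are assumptions; adapt to the actual schema.
import os
from azure.cosmos import CosmosClient

cosmos = CosmosClient(os.environ["COSMOS_ENDPOINT"], os.environ["COSMOS_KEY"])
log_container = cosmos.get_database_client("genai").get_container_client("process_log")

query = (
    "SELECT c.doc_id, c.chunk_id, c.status, c.error_message "
    "FROM c WHERE c.status IN ('error', 'skipped')"
)
for row in log_container.query_items(query=query, enable_cross_partition_query=True):
    print(row)
```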
Figure 6: View chunk progress log
Figure 7: LLM output saved in Azure Cosmos DB
Execution Times
Please see below the execution times for the test dataset and setup combinations in our sandbox environment. With each test we added more files and increased the infrastructure configuration to confirm the scaling characteristics of this pattern. These numbers may vary based on your workload and implementation details: whether you are applying an LLM prompt to chunks, generating embeddings, or both; the chunk size; the output token size you have selected; the LLM model; and so on.
References
GitHub Code sample – GenAI-Batch-Ingester
Azure OpenAI Service REST API reference
Azure OpenAI – Staying within rate limits
Azure OpenAI – PTU
Document Intelligence API
Document Intelligence – Mitigating throttling
Azure Queue storage trigger and bindings for Azure Functions