Securing GenAI: Safeguarding LLM-Powered Applications with Aqua
In the rapidly evolving world of artificial intelligence, the rise of Generative AI (GenAI) has sparked a revolution in how we interact with and leverage this technology. GenAI is based on large language models (LLMs) that have demonstrated remarkable capabilities, from generating human-like text to powering conversational interfaces and automating complex tasks.
Setting Sail: Keeping a Weathered Eye on the Horizon of Cloud Security
As I hoist the sail on this new journey with Aqua, I was asked why I joined. Why am I thrilled to be part of this organization, and what is it about Aqua’s approach to safeguarding cloud native systems that resonates with me? For close to 20 years I have built, transformed, and led go-to-market teams within software and SaaS companies, but it was Aqua’s commitment to solving the intricacies of cloud native security that made my decision to join clear.
Discover Cloud Security Issues Faster with Event-based Scanning
In today’s cloud security landscape, the challenge of keeping pace with evolving threats is daunting for security practitioners. Meanwhile, malicious actors operate at lightning speed, often breaching organizations and extracting valuable data within minutes, if not seconds. Imagine what they could accomplish in 24 hours — can any modern organization afford such a gap in their cloud defenses?
Elevating Security: Protecting Containerized Workloads on Mainframes
Mainframes and containers represent two distinct yet impactful elements in today’s computing architecture. Mainframes are powerful, centralized computing systems known for their reliability, scalability, and ability to handle massive workloads. Containers, on the other hand, have emerged as a lightweight and flexible solution for deploying, scaling, and managing cloud native applications across various environments.
Aqua: Leading the Charge in Container Security Innovation
Containerization has revolutionized application development and deployment, providing enterprises with enhanced speed, agility, and consistency across different computing environments. However, this transformation introduces complexities, particularly regarding security. With the increasing adoption of containerized applications, there’s a growing demand for specialized security solutions tailored to address these unique challenges.
Cloud Workload Security: Aqua Shines in GigaOm’s Radar Report
Cloud workload security (CWS) plays a critical role in safeguarding the integrity and resilience of cloud-native workloads. Defined through the lens of GigaOm, CWS primarily revolves around fortifying the dynamic and continuously evolving workloads within cloud environments. Its significance lies in bolstering security measures by pinpointing vulnerabilities, ensuring adherence to compliance standards, and swiftly mitigating potential threats.
CVE-2024-3094: Newly Discovered Backdoor in XZ tools
xz-utils is a popular compression tool used widely across Linux systems, which makes it a critical part of the software ecosystem. The xz-utils backdoor, discovered on March 29, 2024, exposes systems to unauthorized access and remote code execution. It specifically affects versions 5.6.0 and 5.6.1 of xz-utils on systems using glibc, systemd, and patched OpenSSH. Users are urged to immediately stop usage and downgrade to xz-5.4.x.
Empowering Security: Bridging Talk to Action at RSA 2024
As we gear up for another exciting RSA Conference, it’s time to take stock of what’s making waves in the world of cybersecurity. Sure, we all know that RSA is the go-to event for all things security, but what’s the buzz this year? What are the hot topics? What’s really new?
Improving Efficiency and Reducing Runtime Using S3 Read Optimization
By Bhalchandra Pandit | Software Engineer
Overview
We describe a novel approach we took to improving S3 read throughput and how we used it to improve the efficiency of our production jobs. The results have been very encouraging. A standalone benchmark showed a 12x improvement in S3 read throughput (from 21 MB/s to 269 MB/s). Increased throughput allowed our production jobs to finish sooner. As a result, we saw a 22% reduction in vcore-hours, a 23% reduction in memory-hours, and a similar reduction in the run time of a typical production job. Although we are happy with the results, we are exploring additional enhancements. They are briefly described at the end of this blog.
Motivation
We process petabytes of data stored on Amazon S3 every day. If we inspect the relevant metrics of our MapReduce/Cascading/Scalding jobs, one thing stands out: slower than expected mapper speed. In most cases, the observed mapper speed is around 5–7 MB/sec. That speed is orders of magnitude slower compared to the observed throughput of commands such as aws s3 cp, where speeds of around 200+ MB/sec are common (observed on a c5.4xlarge instance in EC2). If we can increase the speed at which our jobs read data, our jobs will finish sooner and save us considerable time and money in the process. Given that processing is costly, these savings can add up quickly to a substantial amount.
S3 read optimization
The Problem: Throughput bottleneck in S3A
If we inspect the implementation of S3AInputStream, it is easy to notice the following potential areas of improvement:
Single threaded reads: Data is read synchronously on a single thread which results in jobs spending most of the time waiting for data to be read over the network.
Multiple unnecessary reopens: The S3 input stream is not seekable. A split has to be closed and reopened repeatedly each time one performs a seek or encounters a read error. The larger the split, the greater the chance of it happening. Each such reopening further slows down the overall throughput.
The Solution: Improving read throughput
Architecture
Figure 1: Components of a prefetching+caching S3 reader
Our approach to addressing the above-mentioned drawbacks includes the following:
We treat a split as being made up of fixed-size blocks. The block size defaults to 8 MB but is configurable.
Each block is read asynchronously into memory before it can be accessed by a caller. The size of the prefetch cache (in terms of number of blocks) is configurable.
A caller can only access a block that has already been prefetched into memory. That delinks a client from network flakiness and allows us to have an additional retry layer to increase the overall resiliency.
Each time we encounter a seek outside of the current block, we cache the prefetched blocks in the local file system.
We further enhanced the implementation to make it a mostly lock-free producer-consumer interaction. This enhancement improves read throughput from 21 MB/sec to 269 MB/sec as measured by a standalone benchmark (see details below in Figure 2).
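The block-oriented prefetching described in the list above can be sketched roughly as follows. This is a minimal illustration and not the S3E implementation: the class name PrefetchingReader, the fetch_range callable (standing in for a ranged S3 read), and the prefetch_blocks parameter are all hypothetical, and the sketch uses a plain thread pool rather than the mostly lock-free design mentioned above.

```python
from concurrent.futures import ThreadPoolExecutor


class PrefetchingReader:
    """Reads a byte source in fixed-size blocks, fetching upcoming blocks in the background."""

    def __init__(self, fetch_range, total_size, block_size=8 * 1024 * 1024, prefetch_blocks=4):
        self._fetch_range = fetch_range  # hypothetical callable(start, end) -> bytes
        self._total_size = total_size
        self._block_size = block_size
        self._prefetch_blocks = prefetch_blocks
        self._pool = ThreadPoolExecutor(max_workers=prefetch_blocks)
        self._pending = {}  # block index -> Future holding that block's bytes

    def num_blocks(self):
        # Ceiling division: the last block may be shorter than block_size.
        return (self._total_size + self._block_size - 1) // self._block_size

    def _schedule(self, index):
        if 0 <= index < self.num_blocks() and index not in self._pending:
            start = index * self._block_size
            end = min(start + self._block_size, self._total_size)
            self._pending[index] = self._pool.submit(self._fetch_range, start, end)

    def read_block(self, index):
        if not 0 <= index < self.num_blocks():
            raise IndexError(f"block {index} out of range")
        # Request this block and the next few so they download while the caller works.
        for i in range(index, index + 1 + self._prefetch_blocks):
            self._schedule(i)
        # The caller only ever sees a block once it is fully in memory.
        return self._pending.pop(index).result()

    def close(self):
        self._pool.shutdown(wait=True)
```

A sequential consumer would call read_block(0), read_block(1), and so on; while it processes one block, the following blocks are already being fetched on the pool's threads.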
Sequential reads
Any data consumer that processes data sequentially (for example, a mapper) greatly benefits from this approach. While a mapper is processing currently retrieved data, data next in sequence is being prefetched asynchronously. Most of the time, data has already been pre-fetched by the time the mapper is ready for the next block. That results in a mapper spending more time doing useful work and less time waiting for data, thereby effectively increasing CPU utilization.
More efficient Parquet reads
Parquet files require non-sequential access as dictated by their on-disk format. Our initial implementation did not use a local cache. Each time there was a seek outside of the current block, we had to discard any prefetched data. That resulted in worse performance compared to the stock reader when it came to reading from Parquet files.
We observed significant improvement in the read throughput for Parquet files once we introduced the local caching of prefetched data. Currently, our implementation increases Parquet file reading throughput by 5x compared to the stock reader.
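The effect of the local cache on non-sequential reads can be illustrated with a small sketch. It is a simplification under stated assumptions: LocalBlockCache, read_block_cached, and the fetch_block callable are hypothetical names, and the sketch caches every fetched block, whereas the text only says that prefetched blocks are cached when a seek leaves the current block.

```python
import os
import tempfile


class LocalBlockCache:
    """Keeps already-fetched blocks on local disk so a seek does not discard them."""

    def __init__(self):
        self._dir = tempfile.mkdtemp(prefix="block-cache-")
        self._paths = {}  # block index -> path of the cached file

    def put(self, index, data):
        path = os.path.join(self._dir, f"block-{index}.bin")
        with open(path, "wb") as f:
            f.write(data)
        self._paths[index] = path

    def get(self, index):
        """Return the cached bytes for a block, or None on a cache miss."""
        path = self._paths.get(index)
        if path is None:
            return None
        with open(path, "rb") as f:
            return f.read()

    def close(self):
        # Remove the cached files and the temporary directory.
        for path in self._paths.values():
            os.remove(path)
        self._paths.clear()
        os.rmdir(self._dir)


def read_block_cached(index, cache, fetch_block):
    """Serve a block from the local cache, fetching and caching it on a miss."""
    data = cache.get(index)
    if data is None:
        data = fetch_block(index)  # hypothetical callable(index) -> bytes
        cache.put(index, data)
    return data
```

With a reader like this, jumping from one block to another and back again costs one remote fetch per block rather than one per visit.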
Improvement in production jobs
Improved read throughput leads to a number of efficiency improvements in production jobs.
Reduced job runtime
The overall runtime of a job is reduced because mappers spend less time waiting for data and finish sooner.
Potentially reduced number of mappers
If mappers take sufficiently less time to finish, we are able to reduce the number of mappers by increasing the split size. Such reduction in the number of mappers leads to reduced CPU wastage associated with fixed overhead of each mapper. More importantly, it can be done without increasing the run time of a job.
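The relationship between split size and mapper count is simple arithmetic, sketched below. The input size, split sizes, and the 10-second per-mapper overhead are illustrative assumptions and not measurements from our jobs.

```python
def mapper_count(input_bytes, split_bytes):
    """Number of mappers when each mapper handles one split (ceiling division)."""
    return -(-input_bytes // split_bytes)


def total_fixed_overhead(input_bytes, split_bytes, overhead_seconds_per_mapper):
    """CPU-seconds spent on per-mapper fixed overhead across the whole job."""
    return mapper_count(input_bytes, split_bytes) * overhead_seconds_per_mapper


MIB = 1024 ** 2
TIB = 1024 ** 4

# Assumed example: 1 TiB of input with 128 MiB splits versus 256 MiB splits.
small_splits = mapper_count(1 * TIB, 128 * MIB)  # 8192 mappers
large_splits = mapper_count(1 * TIB, 256 * MIB)  # 4096 mappers

# Assumed 10 seconds of fixed overhead per mapper.
overhead_small = total_fixed_overhead(1 * TIB, 128 * MIB, 10)  # 81920 CPU-seconds
overhead_large = total_fixed_overhead(1 * TIB, 256 * MIB, 10)  # 40960 CPU-seconds
```

Doubling the split size halves the number of mappers and therefore the total fixed overhead, which only pays off when each mapper reads fast enough that the larger split does not lengthen the job.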
Improved CPU utilization
The overall CPU utilization increases because the mappers are doing the same work in less time.
Results
For now, our implementation (S3E) is in a separate git repository to allow faster iterations over enhancements. We will eventually contribute it back to the community by merging it back into S3A.
Standalone benchmark
Figure 2: Throughput of S3A vs S3E
In each case, we read a 3.5 GB S3 file sequentially and wrote it locally to a temp file. The latter part simulates the IO overlap that takes place during a mapper operation. The benchmark was run on a c5.9xlarge instance in EC2. We measured the total time taken to read the file and computed the effective throughput of each method.
Production run
We tested many large production jobs with the S3E implementation. Those jobs typically use tens of thousands of vcores per run. In Figure 3, we present a summary of comparison between metrics obtained with and without S3E enabled.
Measuring resource savings
We use the following method to compute resource savings resulting from this optimization.
Observed results
Figure 3: Comparison of MapReduce job resource consumption
Given the variation in the workload characteristics across production jobs, we saw vcore reduction anywhere between 6% and 45% across 30 of our most expensive jobs. The average saving was a 16% reduction in vcore days.
One thing that is attractive about our approach is that it can be enabled for a job without requiring any change to a job’s code.
Future direction
At present, we have added the enhanced implementation to a separate git repository. In the future, we would likely update the existing S3A implementation and contribute back to the community.
We are in the process of rolling out this optimization across a number of our clusters. We will publish the results in a future blog.
Given that the core implementation of the S3E input stream does not depend on any Hadoop code, we can use it in any other system where large amounts of S3 data are accessed. Currently we are using this optimization to target MapReduce, Cascading, and Scalding jobs. However, we have also seen very encouraging results with Spark and Spark SQL in our preliminary evaluation.
The current implementation can use further tuning to improve its efficiency. It is also worth exploring if we can use past execution data to automatically tune the block size and the prefetch cache size used for each job.
To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.
StackShare | Tech stack deep dives from top startups and engineering teams
Faster Flink Adoption with Self-Service Diagnosis Tool at Pinterest
Fanshu Jiang & Lu Niu | Software Engineers, Stream Processing Platform Team
At Pinterest, stream data processing powers a wide range of real-time use cases. In recent years, the platform powered by Flink has proven to be of great value to the business by providing near real-time content activation and metrics reporting, with the potential to unlock more use cases in the future. However, to take advantage of that potential, we needed to address the issue of developer velocity.
It can take weeks to go from writing the first line of code to a stable data flow in production. Troubleshooting and tuning Flink jobs can be particularly time-consuming, due to the number of logs and metrics to investigate and the variety of configs available to tune. Sometimes, it requires a deep understanding of Flink internals to find the root cause of issues during development. This not only affects developer velocity and creates a subpar Flink onboarding experience, but also requires significant platform support, which restricts the scalability of streaming use cases.
To make investigation easier and faster, we built a Flink diagnosis tool, Dr. Squirrel, to surface and aggregate job symptoms, provide insights into the root cause, and suggest a solution with actionable steps. The tool has resulted in significant productivity gains for developers and the platform team since its release.
What is challenging about Flink job troubleshooting?
Massive pool of scattered logs and metrics, only a few of which matter
For troubleshooting, engineers usually:
scroll through a wall of JM/TM logs from YARN UI
check dozens of job/server metric dashboards
search and verify job configs
click through the Flink Web UI job DAG to find details like checkpoint alignment, data skew and backpressure
However, 90% of the stats we spend time on are either benign or simply unrelated to the root cause. Having a one-stop shop that aggregates only useful information and surfaces only what matters to troubleshooting saves enormous amounts of time.
Here are the bad metrics, now what?
This is a commonly asked question once stakeholders identify bad metrics, because more reasoning is required to get to the root cause. For example, a checkpoint timeout could mean an incorrect timeout configuration, but it could also be a consequence of backpressure, slow S3 upload, bad GC, or data skew. Lost TaskManager logs could mean a bad node, but oftentimes they are the result of either heap or RocksDB state backend OOM. It takes time to understand all that reasoning and thoroughly verify each possible cause. However, 80% of issue fixing follows a pattern. This made us wonder: as a platform team, should we analyze the stats programmatically and tell stakeholders what to tune without having them do the reasoning?
Troubleshooting doc is far from enough
We provide a troubleshooting doc to customers. However, with the growing number of troubleshooting use cases, the doc is getting too long to quickly spot the relevant diagnosis and instructions for an issue. Engineers also have to manually apply if-else diagnosis logic to determine the root cause. This has added much friction to self-serve diagnosis, and the reliance on the platform team for troubleshooting remains. Besides, the doc is not great at call-to-action whenever the platform pushes a new job health requirement. We realize that a better tool is needed to efficiently share troubleshooting takeaways and enforce cluster-wise job health requirements.
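The if-else reasoning that engineers apply by hand can be pictured as a small rule table. The sketch below only encodes the two examples given in this section; the function name diagnose, the signal names, and the fallback behavior are hypothetical and do not describe Dr. Squirrel's actual logic.

```python
# Candidate root causes per symptom, taken from the examples in the text.
CANDIDATE_CAUSES = {
    "checkpoint_timeout": [
        "backpressure", "slow_s3_upload", "bad_gc", "data_skew", "incorrect_timeout_config",
    ],
    "taskmanager_lost": ["heap_oom", "rocksdb_oom", "bad_node"],
}


def diagnose(symptom, observed_signals):
    """Return the candidate causes of a symptom that the observed signals support.

    If no candidate is supported, return every candidate so a person can verify them.
    """
    candidates = CANDIDATE_CAUSES.get(symptom, [])
    supported = [cause for cause in candidates if cause in observed_signals]
    return supported or list(candidates)
```

For instance, a checkpoint timeout seen together with backpressure and heavy GC narrows five possible causes down to two, which is the kind of reasoning a doc cannot do for the reader.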
Dr. Squirrel, a self-service diagnosis tool for troubleshooting
Given the above challenges, we built Dr. Squirrel, a diagnosis tool for fast issue detection and troubleshooting guidance, designed to:
cut down the troubleshooting time from hours to minutes
reduce the tools developers need for investigations from many to one
lower the required Flink internal knowledge for troubleshooting from intermediate to little
In a nutshell, we aggregate useful information in one place, perform job health checks, flag unhealthy ones explicitly, and provide root cause analysis and actionable steps to help fix the issues. Let’s take a look at some feature highlights.
More efficient ways to view logs
For each job run, Dr. Squirrel highlights exceptions that directly trigger restarts (e.g. TaskManager lost, OOM) to help quickly find the relevant exceptions to focus on from a massive pool of logs. It also collects all warnings, errors, and info logs that contain a stack trace in separate sections. For each log, Dr. Squirrel checks the content to see if an error keyword can be found, then provides a link to our step-by-step solution in the troubleshooting guide.
Dr. Squirrel suggestion
All logs are searchable using the search bar. On top of that, Dr. Squirrel provides two ways to view logs more efficiently — Timeline view and Unique exception view. As shown below, the Timeline view allows you to view logs chronologically with class name and pre-populated ElasticSearch link if more details are needed.
Timeline view of logs
With one click, we can switch to the Unique Exception view, where the same exceptions are grouped in one row with metadata such as first, last, and total occurrence. This simplifies the process of identifying the most frequent exceptions.
Unique exception view
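The grouping behind the Unique Exception view can be sketched in a few lines. The input shape, a list of (timestamp, exception name) pairs, and the function name unique_exception_view are assumptions made for illustration.

```python
def unique_exception_view(entries):
    """Group (timestamp, exception_name) pairs into one row per exception."""
    rows = {}
    for timestamp, name in entries:
        row = rows.get(name)
        if row is None:
            rows[name] = {"exception": name, "first": timestamp, "last": timestamp, "total": 1}
        else:
            row["first"] = min(row["first"], timestamp)
            row["last"] = max(row["last"], timestamp)
            row["total"] += 1
    # Most frequent exceptions first.
    return sorted(rows.values(), key=lambda row: row["total"], reverse=True)
```

Sorting by total occurrence puts the most frequent exception at the top, which is usually the first place to look.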
Job health at a glance
Dr. Squirrel provides a health check page that enables engineers, whether beginners or experts, to tell confidently whether the job is healthy. Instead of showing plain metric dashboards, Dr. Squirrel monitors each metric for 1 hour and explicitly flags whether it passes our platform stability requirements. This is an efficient and scalable way for the platform team to communicate and enforce what is considered stable.
The health check page consists of multiple sections, each focusing on a different aspect of job health. A quick browse through these sections is all that is needed to get a good idea of the overall job health:
Basic Job Stats section monitors basic stats such as throughput, rate of full restarts, checkpoint size/duration, consecutive checkpoint failure, max parallelism over the past 1h. When metrics fail the health check, they are marked as Failed and ranked at the top.
Basic Job stats section
Backpressured Tasks tracks the backpressure situation of each operator at fine granularity. A minute with no backpressure is visualized as a green square; otherwise, the square is red. Each operator has 60 squares, representing its backpressure situation over the past 1 hour. This makes it easy to identify how frequently backpressure happens and which operator starts the earliest.
Backpressured Task section
GC Old Gen Time section has the same visualization as backpressure to provide an overview of whether the GC is occurring too often and could potentially affect throughput or checkpoint. With the same visualization, it becomes obvious whether GC and backpressure happen at the same time and whether GC may potentially cause backpressure.
GC old gen section
JobManager/TaskManager Memory Usage tracks the YARN container memory usage, which is the resident set size (RSS) memory of the Flink Java process, collected through a daemon running on the worker nodes. RSS memory is more accurate because it includes all sections in the Flink memory model as well as memory that’s not tracked by Flink, such as JVM process stack, threads metadata, or memory allocated from user code through JNI. We mark the configured max JM/TM memory in the graph, as well as a 90% usage threshold, to help users quickly spot which containers are close to OOM.
JM/TM memory graph
CPU% Usage section surfaces the containers that use more CPU capacity than the vcores they are assigned to. This helps monitor and avoid “Noisy neighbor” issues in the multi-tenant Hadoop cluster. Very high CPU% usage could result in one user’s workload impacting the performance and stability of another user’s workload.
CPU% usage section
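The per-minute square visualization used by the Backpressured Tasks and GC Old Gen Time sections can be sketched as follows. The text rendering ('G' and 'R' characters), the function names, and the input shape (a set of minute indexes per operator, with 0 as the oldest minute) are assumptions for illustration only.

```python
def render_squares(backpressured_minutes, window=60):
    """One character per minute, oldest first: 'G' for no backpressure, 'R' otherwise."""
    return "".join("R" if minute in backpressured_minutes else "G" for minute in range(window))


def earliest_backpressured(operators):
    """Name of the operator whose first backpressured minute is the earliest, or None."""
    first_minute = {name: min(minutes) for name, minutes in operators.items() if minutes}
    if not first_minute:
        return None
    return min(first_minute, key=first_minute.get)
```

Lining up one such row per operator makes it visible at a glance which operator turned red first, and the same rows for GC time can be compared minute by minute.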
Effective configurations
Flink jobs can be configured at different levels, such as in-code configurations at execution level, job properties file, command line arguments at client level and flink-conf.yaml at system level. It’s not uncommon for engineers to configure the same parameter at different levels for testing or hotfixing. With the override hierarchy, it is not obvious what value is eventually taking effect. To address this issue, we built a configuration library that figures out effective configuration values that the job is running with and surfaces these configurations to Dr. Squirrel.
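Resolving an effective value across levels amounts to walking the levels in precedence order and keeping the first value found. The sketch below is not our configuration library: the precedence order in LEVELS is an assumption made for illustration, and the values are made up.

```python
# Assumed precedence, highest first. The real override hierarchy may differ.
LEVELS = ["in_code", "command_line", "job_properties", "flink_conf"]


def effective_config(layers):
    """Map each key to (effective value, level that supplied it)."""
    resolved = {}
    for level in LEVELS:
        for key, value in layers.get(level, {}).items():
            # setdefault keeps the value from the highest-precedence level seen so far.
            resolved.setdefault(key, (value, level))
    return resolved


layers = {
    "flink_conf": {"parallelism.default": "1", "taskmanager.memory.process.size": "4g"},
    "job_properties": {"parallelism.default": "8"},
    "in_code": {"parallelism.default": "16"},
}
resolved = effective_config(layers)
```

Reporting the supplying level alongside the value is what makes an unexpected override easy to spot.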
Queryable cluster-wise job healthiness
Provided with abundant job stats, Dr. Squirrel becomes a resource center for learning about cluster-wise job healthiness and finding insights into platform improvements, for example, the top 10 restart root causes or the percentage of jobs that run into memory issues or backpressure.
Architecture
As seen in the features above, metrics and logs are gathered all into one place. To collect them in a scalable way, we added a MetricReporter and KafkaLog4jAppender to our Flink custom build to continuously send metrics and logs to Kafka topics. The KafkaLog4jAppender also filters for the logs that matter to us: warnings, errors, and info logs that come with a stack trace. Following that is FlinkJobWatcher, a Flink job that joins metrics and logs that come from the same job after a series of parsing and transformation. FlinkJobWatcher then creates a snapshot of job health every 5 min and sends it to the JobSnapshot Kafka topic.
The growing number of Flink use cases has introduced massive amounts of logs and metrics. As a Flink job itself, FlinkJobWatcher handles the increasing data scale well and keeps its throughput on par with the number of use cases through easy parallelism tuning.
Our Flink custom build
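The join that FlinkJobWatcher performs can be pictured as bucketing records by job and 5-minute window. The sketch below is plain Python and not Flink code; the record shapes and the function name build_snapshots are hypothetical.

```python
from collections import defaultdict

SNAPSHOT_SECONDS = 5 * 60


def build_snapshots(metrics, logs):
    """Join metrics and logs that share a job id and a 5-minute window.

    metrics: iterable of (job_id, epoch_seconds, metric_name, value)
    logs:    iterable of (job_id, epoch_seconds, message)
    """
    snapshots = defaultdict(lambda: {"metrics": {}, "logs": []})
    for job_id, ts, name, value in metrics:
        window_start = ts - ts % SNAPSHOT_SECONDS
        # The last value seen for a metric within a window wins.
        snapshots[(job_id, window_start)]["metrics"][name] = value
    for job_id, ts, message in logs:
        window_start = ts - ts % SNAPSHOT_SECONDS
        snapshots[(job_id, window_start)]["logs"].append(message)
    return dict(snapshots)
```

Each resulting entry corresponds to one job health snapshot for one window, ready to be enriched with data from other sources.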
Once the JobSnapshot is available, more data needs to be fetched and merged into the JobSnapshot. For this purpose, we built a RESTful service using dropwizard that keeps reading from the JobSnapshot topic and pulls external data via RPC. The external data sources include YARN ResourceManager to get static data such as username and launch time, Flink REST API to get configurations, an internal tool called Automated Canary Analysis(ACA) to compare time series metrics against a threshold with fine-grained criteria, and a couple of other internal tools that allow us to surface custom metrics like RSS memory and CPU% usage, which are collected from a daemon running on the worker nodes. A nice UI is also built out with React to make job health easy to explore.
Dr. Squirrel web service
Future Work
We will continue improving Dr. Squirrel with better job diagnosis capability to help us move one step closer to fully self-serve onboarding:
Capacity planning: monitor and evaluate throughput, usage of memory and vcores to find the most efficient resource settings.
Integration with CICD: we are running a CICD pipeline to automatically verify and push changes from dev to prod. Dr. Squirrel will be integrated with CICD to provide more confidence about the job health situation as CICD pushes out new changes.
Alert & notification: notify job owner or platform team with a health report summary.
Per-job cost estimate: show cost estimate of each job based on resource usage for budget planning and awareness.
Acknowledgment
Shoutout to Hannah Chen, Nishant More, and Bo Sun for their contributions to this project. Many thanks to Ping-Min Lin for setting up the initial UI work and Teja Thotapalli for the infra setup on the SRE side. We also want to thank Ang Zhang, Chunyan Wang, Dave Burgess for their support and all our customer teams for providing valuable feedback and troubleshooting scenarios to help us make the tool powerful.
Efficient Resource Management at Pinterest’s Batch Processing Platform
Yongjun Zhang | Software Engineer, Ang Zhang | Engineering Manager, Shaowen Wang | Software Engineer, Batch Processing Platform Team
Pinterest’s Batch Processing Platform, Monarch, runs most of the batch processing workflows of the company. At the scale shown in Table 1, it is important to manage the platform resources to provide quality of service (QoS) while achieving cost efficiency. This article shares how we do that and future work.
Table 1: Scale of Monarch Batch Processing Platform
Introduction of Monarch
Figure 1 shows what Pinterest’s data system looks like at a high level. When users are using Pinterest applications on their mobile or desktop devices, they generate various logs that are ingested to our system via Singer + Kafka (see Scalable and reliable data ingestion at Pinterest) and the resulting data is stored to S3. Then the data is processed and analyzed by various workflows like sanitization, analytics, and machine learning data preparation. The results of the workflows are typically stored back to S3. There are essentially two types of processing platforms: batch and streaming. This blog is about the batch processing platform named Monarch. See this blog for more information about the streaming platform.
As an in-house big data platform, Monarch provides the infrastructure, services, and tools to help users develop, build, deploy, and troubleshoot their batch processing applications (mostly in the form of workflows) at scale. Monarch consists of more than 20 Hadoop YARN clusters built entirely in the Cloud utilizing AWS EC2, and we use many different instance types offered by EC2. The actual EC2 instance type we employ at a cluster depends on its workload; some clusters are more optimized for computing, while others have more memory or disk capacity.
User workflows can be submitted to Monarch from Spinner (an internal workflow platform built on top of Airflow) and other UI based workflow orchestration tools via Job Submission Service, or JSS (see Figure 2). The user workflow source code typically specifies the cluster and queue in which the workflow should run.
Figure 1. Pinterest Data System and the Batch Processing Platform (Monarch).
Figure 2. Pinterest Job Submission Service. See more description in the text.
Resource Management Challenges
Hadoop YARN is used to manage the cluster resources and task scheduling. The cluster resources are represented as a tree of queues. All the resources of the cluster, or all the EC2 instances the cluster has, are represented as the “root” of the tree, and the leaf nodes of the tree are where applications run. The weight configuration of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the resources allocated to the parent. How much resource a child gets is based on the ratio of this child’s weight over the sum of the weights of all sibling nodes. By setting the node weight, we can control how many EC2 instances are assigned to any given queue. YARN supports multiple schedulers, and the Fair Scheduler is used in Monarch.
Figure 3. YARN’s resource allocation: Tree of Queues of Adhoc Structure.
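The weight-based split described above can be worked through with a short sketch. The queue names, weights, and the 1,000-instance total are made up, and the calculation ignores demand: it shows only the static ratio of a child's weight to the sum of its siblings' weights, not how the Fair Scheduler redistributes unused capacity.

```python
def allocate(queue, resources, path=""):
    """Split resources down a queue tree in proportion to sibling weights.

    queue: {"name": str, "weight": number, "children": [queue, ...]}
    Returns {queue path: resources} for the leaf queues.
    """
    full_path = f"{path}.{queue['name']}" if path else queue["name"]
    children = queue.get("children", [])
    if not children:
        return {full_path: resources}
    weight_sum = sum(child["weight"] for child in children)
    shares = {}
    for child in children:
        child_resources = resources * child["weight"] / weight_sum
        shares.update(allocate(child, child_resources, full_path))
    return shares


# Hypothetical tree: org_a gets 3/4 of the cluster, split 1:2 between its projects.
tree = {
    "name": "root",
    "children": [
        {"name": "org_a", "weight": 3, "children": [
            {"name": "proj_x", "weight": 1},
            {"name": "proj_y", "weight": 2},
        ]},
        {"name": "org_b", "weight": 1},
    ],
}
shares = allocate(tree, 1000)
```

Here org_a receives 750 of the 1,000 instances, of which proj_x gets 250 and proj_y gets 500, while org_b receives the remaining 250.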
The goal of using a tree of queues to represent resource allocation is to achieve resource isolation between workflows that run in different queues. However, Monarch initially didn’t have a consistent queue structure, as shown in Figure 3. Some queues were allocated to specific projects, some were for organizations, and others for workflows of a certain priority. As a result, there was severe interference between different workflows running in the same queue — more critical workflows were often impacted by non-critical ones.
There were mainly two reasons for interference:
Workflows running in the same queue are treated the same. With no notion of priority, the scheduler has no way to give more resources to more critical workflows.
There is a parameter, maxRunningApps, that controls how many applications can run concurrently in a given queue. This prevents too many applications from competing for resources, a situation in which no application can make good progress. However, if lower priority workflows are submitted first and saturate maxRunningApps, then critical workflows submitted later can be stuck for a long time without being scheduled.
To address these issues, we introduced workflow tiering and changed the resource allocation queues to be tiered accordingly.
Workflow Tiering and Hierarchical Queue Structure
The workloads on Monarch are typically in the form of workflows. A workflow is represented as a Directed Acyclic Graph (DAG) of multiple jobs that process input data and generate output. The jobs in the same workflow run in parallel or sequentially depending on whether they depend on each other. We took two main steps to provide QoS for workflows while achieving cost efficiency.
Firstly, we added tiering to distinguish critical workflows from non-critical ones. The critical workflows typically have higher requirements on the finishing time. We decided to classify workflows into three tiers: tier1, tier2, and tier3 (tier1 has the highest importance). Then we worked with user teams to define the tiering and runtime service level objective (SLO) of all workflows that run on the Monarch platform.
Secondly, we changed the resource queue structure across all clusters to have the notion of tiering, project, and organization. Given that each workflow is associated with a project, each project belongs to a team, and each team belongs to a larger organization, we decided to create a three level hierarchical queue structure: organization, project, and tier. See Figure 4 for an example (“default” is used in place of tier3, for historical reasons).
Figure 4. Hierarchical Queues with Organization, Project and Tiering.
Some of the most important configurations of the queues are:
Weight: The weight of a queue determines the amount of resources allocated to it. Child nodes of the same parent node share the parent’s resources based on the relative ratio of their weights.
MaxRunningApps: The maximum number of applications that can run concurrently within the queue. This prevents too many applications from running in the same queue with limited resources, a situation in which no application can make good progress.
Preemption:
preemption: whether to enable preemption
fairSharePreemptionTimeout: number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues.
fairSharePreemptionThreshold: the fair share preemption threshold for the queue. If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues.
allowPreemptionFrom: determines whether the scheduler is allowed to preempt resources from the queue.
We configure tier1 queues to not allow preemption and also configure the other two parameters (fairSharePreemptionTimeout and fairSharePreemptionThreshold) to smaller values than for tier2 and tier3 queues. This allows tier1 queues to acquire resources faster when they are not getting their fair share of resources.
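The weight and preemption settings above can be illustrated with a minimal sketch. It is not the YARN Fair Scheduler implementation: the class, the function names, and all numeric values are hypothetical, and only the parameter meanings come from the descriptions above.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class QueueConfig:
    """Hypothetical subset of the queue settings described above."""
    name: str
    weight: float
    fair_share_preemption_timeout_s: int    # fairSharePreemptionTimeout
    fair_share_preemption_threshold: float  # fairSharePreemptionThreshold
    allow_preemption_from: bool             # allowPreemptionFrom


def split_by_weight(parent_resources: float, children: List[QueueConfig]) -> Dict[str, float]:
    """Share a parent's resources among its children in proportion to weight."""
    total_weight = sum(child.weight for child in children)
    return {c.name: parent_resources * c.weight / total_weight for c in children}


def may_preempt(queue: QueueConfig, fair_share: float,
                allocated: float, seconds_below_threshold: float) -> bool:
    """True once the queue has stayed below threshold * fairShare for the timeout."""
    below = allocated < queue.fair_share_preemption_threshold * fair_share
    return below and seconds_below_threshold >= queue.fair_share_preemption_timeout_s


# Illustrative values only: tier1 gets a shorter timeout than the lower tiers.
tiers = [
    QueueConfig("tier1", 5.0, 30, 0.8, allow_preemption_from=False),
    QueueConfig("tier2", 3.0, 120, 0.8, allow_preemption_from=True),
    QueueConfig("default", 2.0, 300, 0.8, allow_preemption_from=True),
]
shares = split_by_weight(100.0, tiers)
```

With these made-up numbers, a shorter timeout lets the tier1 queue become eligible to reclaim resources sooner than tier2 or default under the same shortfall.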
Because Monarch has many clusters, and the workflows running on different clusters could change from time to time, it’s not practical or efficient to manually create the queue structures. We developed a tool that analyzes the historical data of the workflows on the clusters, generates the queue structure, and updates the settings automatically and periodically.
Besides the preemption configuration described above, two of the most important configurations are the queue weight and maxRunningApps. In the next section, we will share more details on the algorithm we use to generate these settings.
Resource Allocation Algorithm
The workflows running in a queue have different requirements at different times. To ensure QoS of the critical workflows, we designed an algorithm to assign queue weight based on historical run data, namely, the Percentile Algorithm.
Figure 5. The Percentile Resource Allocation Algorithm.
The algorithm looks at the historical run data within the most recent time window, such as 30 days, to see how much resource is needed for a given queue. Below is what it does:
Step 1: The queue may be used at some times and may be vacant at other times. When the queue is being used, sometimes X EC2 instances are being used and sometimes Y EC2 instances are being used. The algorithm divides the time window into time units; each unit is a timespan that the same number of EC2 instances are used. The time unit is represented as <timeLength, instanceUsed>. (See the left side of Figure 5)
Step 2: Excluding the time units in which the queue is vacant, sort the time units by the number of instances used in the time unit (see the right side of Figure 5) from smallest to largest.
Step 3: Determine the minimum number of instances to allocate to the queue so that a pre-specified time length percentage threshold is met. Given the total length of time units (TTIU) that the queue is in use, the allocated resources need to be enough to cover that percentage of the TTIU. For example, for a tier1 queue that is used for 240 hours in total within a 30-day window (vacant at other times), we’d like to guarantee the resources for 95% of the time, which is 228 hours. The algorithm walks the sorted results from Step 2 and accumulates time lengths until the threshold is reached. For example, if tu0 + tu4 + tu7 + tu2 is about 95% of the whole time length in use, then the number of instances used in tu2 is the number of instances to be allocated to this queue. If we were to allocate the number of instances used in tu5, which is larger than that used in tu2, it would potentially cause waste because tu5 accounts for only about 5% of the whole time the queue is in use.
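The three steps above can be sketched roughly as follows. This is a simplified reading of the description, not the production tool: the function name and the sample data are made up, and time units are plain (timeLength, instanceUsed) tuples.

```python
from typing import List, Tuple


def instances_for_percentile(time_units: List[Tuple[float, int]], threshold: float) -> int:
    """Return the instance count that covers `threshold` of the in-use time.

    time_units: (time_length_hours, instances_used) pairs from Step 1.
    threshold:  fraction of in-use time to cover, e.g. 0.95.
    """
    # Step 2: drop vacant units and sort by instances used, smallest first.
    in_use = sorted((u for u in time_units if u[1] > 0), key=lambda u: u[1])
    if not in_use:
        return 0

    # Step 3: accumulate time until the threshold share of in-use time is covered.
    target = threshold * sum(length for length, _ in in_use)
    covered = 0.0
    for length, instances in in_use:
        covered += length
        if covered >= target:
            return instances
    return in_use[-1][1]


# Hypothetical 30-day window: 240 hours in use, 480 hours vacant.
units = [(100, 10), (480, 0), (60, 20), (10, 80), (50, 30), (20, 40)]
allocation = instances_for_percentile(units, 0.95)
```

In this made-up window the peak of 80 instances lasts only 10 of the 240 in-use hours, so a 95% threshold settles on a smaller allocation than the peak.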
The 95% threshold above is just an example. We evaluated the resource usage of different tiers and came up with different thresholds based on the size of the clusters and resources used by those workflows. The thresholds are also adjusted from time to time when the percentages of resources used by different tiers change.
There are several reasons we don’t have to guarantee 100% of the resources required at the peak usage time of a given tier1/tier2 queue, thus avoiding waste.
The workflow tiering has a rough distribution such that ~10% workflows are tier1, 20–30% tier2, and 60–70% tier3.
Not all queues are busy at the same time, and the YARN scheduler allows workflows to use resources available at other queues.
Higher tier queues can preempt resources faster.
We measure the resource headroom of a queue by a metric called usage/capacity ratio. The capacity of a queue is the number of instances allocated to the queue times the length of the time window being measured. The usage is measured in instance-hours. E.g., if the queue uses X instances for Y hours, the resource usage is X * Y instance-hours. In addition, we also measure the vcore-hours and memory-hours usage/capacity ratios in a similar fashion to see how balanced the vcore and memory resource usage is. Note that YARN reports vcore-hours and memory-hours, and we use the dominant-resource (DR) method to calculate the instance-hours from them.
The algorithm ensures the percentage threshold is set in a decreasing order from tier1 to tier3 queues, while it also ensures that the usage/capacity ratio is in an increasing order. This means the headroom is the largest for tier1, second for tier2, and smallest for tier3.
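As a rough illustration of the usage/capacity ratio, the sketch below converts vcore-hours and memory-hours into instance-hours. The post does not spell out the dominant-resource calculation, so this assumes it means taking the larger of the two usages after normalizing each by the per-instance capacity. The function names and instance sizes are hypothetical.

```python
def instance_hours_dominant(vcore_hours: float, memory_gb_hours: float,
                            vcores_per_instance: int, memory_gb_per_instance: int) -> float:
    """Assumed DR method: the scarcer (dominant) resource determines instance-hours."""
    by_vcore = vcore_hours / vcores_per_instance
    by_memory = memory_gb_hours / memory_gb_per_instance
    return max(by_vcore, by_memory)


def usage_capacity_ratio(used_instance_hours: float,
                         allocated_instances: int, window_hours: float) -> float:
    """Usage divided by capacity, where capacity = allocated instances * window length."""
    return used_instance_hours / (allocated_instances * window_hours)


# Hypothetical queue on 16-vcore / 64 GB instances, measured over 100 hours.
used = instance_hours_dominant(9600, 51200, vcores_per_instance=16, memory_gb_per_instance=64)
ratio = usage_capacity_ratio(used, allocated_instances=10, window_hours=100)
```

Here memory is the dominant resource, so the instance-hours figure follows memory usage rather than vcore usage.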
The resource allocation algorithm also looks at historical run data to determine the maxRunningApps setting and sets this configuration with some headroom for each queue.
Comparing with Autoscaling
Autoscaling is another common approach to save cost in the Cloud, scaling up the cluster when needed and scaling down when peak demand has passed. Because Cloud providers normally charge much higher rates for on-demand capacity than reserved instances, users normally reserve the capacity that is always required and use on-demand instances for the autoscaling.
Autoscaling works well for online services at Pinterest, but we found it is not as cost efficient for batch processing for the following reasons:
Tasks from large scale batch processing can run for hours, and the two options to scale down the cluster are wasteful. Scaling down gracefully and waiting for running tasks to finish (i.e. draining the instances before terminating them) potentially wastes a significant amount of resources because the instances may not be fully utilized. Scaling down by terminating instances forcefully even when tasks are still running on them means unfinished computing is wasted (and longer runtime for the involved jobs) and extra resources are needed to rerun the terminated tasks.
In order for autoscaling using on-demand instances to make economic sense when compared with reserved instances, we estimated that, for certain instance types, the percentage of time the cluster spends at peak consumption on on-demand instances needs to be less than 30%. Considering the time it takes to scale down, the percentage would be a lot smaller. However, it’s hard to control this percentage, and resources can easily be wasted if the percentage goes higher.
At Pinterest’s big data processing scale, using autoscaling would require getting hundreds or more instances of desired instance types during peak hours, which is not always possible. Not getting enough resources to run critical workflows could affect the business in a significant way.
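The break-even reasoning in the second point can be made concrete with a back-of-the-envelope sketch. It assumes a simple model in which a reserved instance is billed for every hour, while an on-demand instance is billed only while it is running (peak time plus drain time). The prices are placeholders, not actual AWS rates.

```python
def break_even_peak_fraction(reserved_hourly: float, on_demand_hourly: float) -> float:
    """Fraction of time below which on-demand capacity is cheaper than reserved."""
    return reserved_hourly / on_demand_hourly


def on_demand_is_cheaper(peak_fraction: float, drain_fraction: float,
                         reserved_hourly: float, on_demand_hourly: float) -> bool:
    """Compare costs per wall-clock hour; drain time is billed like peak time."""
    billed_fraction = peak_fraction + drain_fraction
    return billed_fraction * on_demand_hourly < reserved_hourly


# Placeholder prices where the effective reserved rate is 30% of the on-demand rate.
threshold = break_even_peak_fraction(reserved_hourly=0.30, on_demand_hourly=1.00)
```

Under this model, any time spent draining instances eats directly into the break-even budget, which is why the usable peak percentage ends up well below the nominal threshold.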
By utilizing the resource allocation algorithm described above and workflow tiering, we were able to utilize good reserved instance pricing while still guaranteeing enough resources for critical workflows when needed.
Please note that in this blog, we focus on production workflows, not ad hoc workloads like Spark SQL queries from Querybook or PySpark jobs from Jupyter notebooks. On ad hoc clusters, we do utilize autoscaling with Spot instances because the peak usage only lasts 2–3 hours on business days.
Workflow Performance Monitoring
When allocating resources for a workflow, the runtime SLO is an important factor to consider. For example, if the workflow uses X instance-hours of resources, and the runtime SLO is 12 hours, then the number of instances needed to run this workflow is X / 12.
With the resource allocation in effect, we need a way to monitor the overall workflow runtime performance. We developed a dashboard to show how workflows of each tier are performing in various clusters.
Within a time window of a certain size, for any given workflow, if it runs X times and Y of those runs meet the SLO, its SLO success ratio is defined as Y/X. It’s ideal if this ratio is 100% for any given workflow, but it’s not feasible for many reasons. As a compromise, we define a workflow as SLO-successful if its SLO success ratio is no less than 90%.
As mentioned earlier, we classified workflows into three tiers. For workflows of each tier, we measure the percentage of workflows that are SLO-successful. Our goal is to have this percentage higher than 90%.
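The two metrics defined above can be computed along these lines. This is a minimal sketch with hypothetical function names and sample data; each workflow is represented by its run durations and its runtime SLO, both in hours.

```python
from typing import List, Tuple


def slo_success_ratio(run_durations_hours: List[float], slo_hours: float) -> float:
    """Y / X: the share of runs in the window that finished within the SLO."""
    met = sum(1 for duration in run_durations_hours if duration <= slo_hours)
    return met / len(run_durations_hours)


def tier_success_percentage(workflows: List[Tuple[List[float], float]],
                            min_ratio: float = 0.9) -> float:
    """Percentage of workflows in a tier whose SLO success ratio is at least min_ratio."""
    successful = sum(
        1 for runs, slo in workflows if slo_success_ratio(runs, slo) >= min_ratio
    )
    return 100.0 * successful / len(workflows)


# Hypothetical tier with two workflows, each run 10 times against a 12-hour SLO.
workflow_a = ([10.0] * 9 + [13.0], 12.0)      # 9 of 10 runs meet the SLO
workflow_b = ([10.0] * 8 + [13.0, 14.0], 12.0)  # 8 of 10 runs meet the SLO
tier_percentage = tier_success_percentage([workflow_a, workflow_b])
```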
Figure 6 is a snapshot of the dashboard that measures the performance of the 30-day time window. Before the project, the tier1 workflows’ success percentage was around 70%. It has since improved to, and stabilized around, 90%. While we try to make most tier1 workflows successful, the same metrics of other tiers are not sacrificed too much because they have less stringent SLO requirements.
Figure 6. Workflow performance monitoring: runtime SLO success ratio of each tier.
Cluster Resource Usage Monitoring
Workflow requirements are not static and may change from time to time. A daily report is generated for each cluster on the following metrics:
Total, tier1, tier2, and tier3 usage/capacity ratio (including instance, vcore, memory)
Number of all tier1, tier2, and tier3 workflows running in the cluster (there may be new workflows onboarded, or re-tiering and SLO change of existing workflows)
Based on these metrics, we determine if the cluster is over- or underutilized and take action by either adding more resources to the cluster (organic growth), downsizing the cluster to save cost, or keeping it as is.
Cross-Cluster Routing And Load Balancing
As mentioned earlier, different workflows have different resource needs: some require more memory, some more CPU, and others more disk IO or storage. Their needs may change over time. Additionally, some clusters may become full while others are underutilized over time. Through monitoring resource consumption, we may find better home clusters for the workflows than their current ones. Asking users to change their source code to move a workflow is tedious, and we also have to adjust the resource allocation whenever we move a workflow.
We developed a cross-cluster routing (CCR) capability to change the target cluster of the workflows without requiring users to change settings. To implement this, we added instrumentation logic in the JSS component that can redirect jobs to another cluster as needed.
We also developed a workflow to periodically analyze the cluster usage and choose candidate workflows to move to other clusters to keep improving the load balancing and cost efficiency.
To enable redirecting jobs, we need to change the resource allocation on the target cluster using the algorithm described above. To achieve this, we automated the resource allocation process so that a single button click (which triggers a workflow) performs both the resource allocation and the job redirection configuration in one step.
Current and Future Work
At the time of writing, our metrics indicate the vcore and memory usage of a fairly big cluster is not balanced, and a lot of vcores are wasted as a result. We are working on splitting this cluster into two clusters of different instance types with CCR support and migrating the workflows running on the original cluster into one of the resulting clusters. We expect with this change we will be able to not only run the applications more reliably, but also save a lot of cost.
Our clusters are located at different availability zones. When one zone has an issue, we can leverage the CCR feature to move critical workflows to another cluster in a different zone. We are working on making this process smoother.
We are also looking into dynamically routing jobs at runtime to different clusters when the target cluster is at full load.
Acknowledgement
Thanks to Hengzhe Guo, Bogdan Pisica, Sandeep Kumar from the Batch Processing Platform team who helped further improve the implementations. Thanks to Soam Acharya, Jooseong Kim and Hannah Chen for driving the workflow tiering. Thanks to Jooseong Kim, William Tom, Soam Acharya, Chunyan Wang for the discussions and support along the way. Thanks to the workflow team, our platform user teams for their feedback and support.
StackShare | Tech stack deep dives from top startups and engineering teams
MemQ: An Efficient, Scalable Cloud Native PubSub System
By Ambud Sharma | Tech Lead and Engineering Manager, Logging Platform
The Logging Platform powers all data ingestion and transportation at Pinterest. At the heart of the Pinterest Logging Platform are Distributed PubSub systems that help our customers transport / buffer data and consume asynchronously.
In this blog we introduce MemQ (pronounced mem — queue), an efficient, scalable PubSub system developed for the cloud at Pinterest that has been powering Near Real-Time data transportation use cases for us since mid-2020 and complements Kafka while being up to 90% more cost efficient.
History
For nearly a decade, Pinterest has relied on Apache Kafka as the sole PubSub system. As Pinterest grew, so did the amount of data and the challenges around operating a very large scale distributed PubSub platform. Operating Apache Kafka at Scale gave us a great deal of insight on how to build a scalable PubSub system. Upon deep investigation of the operational and scalability challenges of our PubSub environment, we arrived at the following key takeaways:
Not every dataset needs sub-second latency service; latency and cost should be inversely proportional (lower latency should cost more).
Storage and Serving components of a PubSub system need to be separated to enable independent scalability based on resources.
Ordering on read instead of write provides the required flexibility for specific consumer use cases (different applications can have different ordering requirements for the same dataset).
Strict partition ordering is not necessary at Pinterest in most cases and often leads to scalability challenges.
Rebalancing in Kafka is expensive, often results in performance degradation, and has a negative impact for customers on a saturated cluster.
Running custom replication in a cloud environment is expensive.
In 2018, we experimented with a new type of PubSub system that would natively leverage the cloud. In 2019, we started formally exploring options on how to solve our PubSub scalability challenges and evaluated multiple PubSub technologies based on cost of operations as well as reengineering cost for existing technologies to meet the demands of Pinterest. We finally landed at the conclusion that we needed a PubSub technology that built on the learnings of Apache Kafka, Apache Pulsar, and Facebook LogDevice, and was built for the cloud.
MemQ is a new PubSub system that augments Kafka at Pinterest. It uses a decoupled storage and serving architecture similar to Apache Pulsar and Facebook LogDevice; however, it relies on a pluggable replicated storage layer, i.e. Object Store / DFS / NFS, for storing data. The net result is a PubSub system that:
Handles GB/s traffic
Independently scales writes and reads
Doesn’t require expensive rebalancing to handle traffic growth
Is up to 90% more cost effective than our Kafka footprint
Secret Sauce
The secret of MemQ is that it leverages micro-batching and immutable writes to create an architecture where the number of Input/Output Operations Per Second (IOPS) necessary on the storage layer is dramatically reduced, allowing the cost-effective use of a cloud native Object Store like Amazon S3. This approach is analogous to packet switching for networks (vs. circuit switching, i.e. a single large continuous store of data such as a Kafka partition).
MemQ breaks the continuous stream of logs into blocks (objects), similar to ledgers in Pulsar but different in that they are written as objects and are immutable. The size of these “packets” / “objects,” known internally in MemQ as a Batch, plays a role in determining the End-to-End (E2E) latency. The smaller the packets, the faster they can be written, at the cost of more IOPS. MemQ therefore allows tunable E2E latency at the cost of higher IOPS. A key performance benefit of this architecture is that it enables separation of read and write hardware, depending on the underlying storage layer, allowing writes and reads to scale independently because packets can be spread across the storage layer.
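The trade-off between Batch size, latency, and IOPS can be seen with some simple arithmetic. The sketch below is a deliberately simplified model, not MemQ code: it assumes a single steady stream, ignores upload time and time-based flushes, and uses made-up traffic numbers.

```python
from typing import Tuple


def batch_tradeoff(throughput_mb_per_s: float, batch_size_mb: float) -> Tuple[float, float]:
    """Return (storage writes per second, seconds needed to fill one Batch)."""
    writes_per_second = throughput_mb_per_s / batch_size_mb
    fill_time_s = batch_size_mb / throughput_mb_per_s
    return writes_per_second, fill_time_s


# Hypothetical topic receiving 100 MB/s, with a small and a large Batch size.
small_batches = batch_tradeoff(100.0, 1.0)
large_batches = batch_tradeoff(100.0, 50.0)
```

With these numbers, the larger Batch needs far fewer writes to the storage layer but each message waits longer before its Batch is complete, which is the latency/IOPS dial described above.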
This also eliminated the constraints experienced in Kafka where in order to recover a replica, a partition must be re-replicated from the beginning. In the case of MemQ, the underlying replicated storage only needs to recover the specific Batch whose replica counts were reduced due to faults in case of storage failures. However, since MemQ at Pinterest runs on Amazon S3, the recovery, sharding, and scaling of storage is handled by AWS without any manual intervention from Pinterest.
Components of MemQ
Client
The MemQ client discovers the cluster using a seed node and then connects to the seed node to discover metadata and the Brokers hosting the TopicProcessors for a given Topic or, in the case of the consumer, the address of the notification queue.
Broker
Similar to other PubSub systems, MemQ has the concept of a Broker. A MemQ Broker is a part of the cluster and is primarily responsible for handling metadata and write requests.
Note: read requests in MemQ can be handled directly by the Storage layer unless the read Brokers are used.
Cluster Governor
The Governor is a leader in the MemQ cluster and is responsible for automated rebalancing and TopicProcessor assignments. Any Broker in the cluster can be elected a Governor, and it communicates with Brokers using Zookeeper, which is also used for Governor election.
The Governor makes assignment decisions using a pluggable assignment algorithm. The default one evaluates available capacity on a Broker to make allocation decisions. Governor also uses this capability to handle Broker failures and restore capacity for topics.
Topic & TopicProcessor
MemQ, similar to other PubSub systems, uses the logical concept of Topic. MemQ topics on a Broker are handled by a module called TopicProcessor. A Broker can host one or more TopicProcessors, where each TopicProcessor instance handles one topic. Topics have write and read partitions. The write partitions are used to create TopicProcessors (1:1 relation), and the read partitions are used to determine the level of parallelism needed by the consumer to process the data. The read partition count is equal to the number of partitions of the notification queue.
Storage
MemQ storage is made of two parts:
Replicated Storage (Object Store / DFS)
Notification Queue (Kafka, Pulsar, etc.)
1. Replicated Storage
MemQ allows for pluggable storage handlers. At present, we have implemented a storage handler for Amazon S3. Amazon S3 offers a cost effective solution for fault-tolerant, on-demand storage. The following prefix format on S3 is used by MemQ to create the high throughput and scalable storage layer:
s3://<bucketname>/<(a) 2 byte hash of first client request id in batch>/<(b) cluster>/topics/<topicname>
(a) = used for partitioning inside S3 to handle higher request rates if needed
(b) = name of MemQ cluster
Availability & Fault Tolerance
Since S3 is a highly available web scale object store, MemQ relies on its availability as the first line of defense. To accommodate for future S3 re-partitioning, MemQ adds a two-digit hex hash at the first level of prefix, creating 256 base prefixes that can, in theory, be handled by independent S3 partitions just to make it future proof.
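To show how such a prefix could be assembled, here is a small sketch. The post does not say which hash function MemQ uses, so SHA-256 is an arbitrary stand-in, and the function name and sample values are hypothetical. Only the prefix layout and the two hex digits (16 * 16 = 256 possible values) follow the text above.

```python
import hashlib


def batch_prefix(bucket: str, cluster: str, topic: str, first_request_id: int) -> str:
    """Build the storage prefix for a Batch using a two-hex-digit hash shard."""
    # Assumption: any stable hash works here; SHA-256 is used only for illustration.
    digest = hashlib.sha256(str(first_request_id).encode("utf-8")).hexdigest()
    shard = digest[:2]  # two hex digits give 256 possible base prefixes
    return f"s3://{bucket}/{shard}/{cluster}/topics/{topic}"


prefix = batch_prefix("my-bucket", "cluster1", "events", first_request_id=12345)
```

Because the shard is derived from a request id rather than from the topic name, Batches of a single busy topic spread across all base prefixes.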
Consistency
The consistency of the underlying storage layer determines the consistency characteristics of MemQ. In case of S3, every write (PUT) to S3 Standard is guaranteed to be replicated to at least three Availability Zones (AZs) before being acknowledged.
2. Notification Queue
The notification system is used by MemQ for delivering pointers to the consumer for the location of data. Currently, we use an external notification queue in the form of Kafka. Once data is written to the storage layer, the Storage handler generates a notification message recording the attributes of the write including its location, size, topic, etc. This information is used by the consumer to retrieve data (Batch) from the Storage layer. It’s also possible to enable MemQ Brokers to proxy Batches for consumers at the expense of efficiency. The notification queue also provides clustering / load balancing for the consumers.
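A notification of this kind might look like the sketch below. The field names and the JSON encoding are assumptions made for illustration; the post only states that the message records attributes such as location, size, and topic.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class BatchNotification:
    """Hypothetical pointer to one uploaded Batch."""
    topic: str
    location: str    # where the Batch lives in the storage layer
    size_bytes: int


def encode(notification: BatchNotification) -> bytes:
    """Serialize the notification for publishing to the notification queue."""
    return json.dumps(asdict(notification)).encode("utf-8")


def decode(payload: bytes) -> BatchNotification:
    """Rebuild the notification on the consumer side."""
    return BatchNotification(**json.loads(payload.decode("utf-8")))


sent = BatchNotification("events", "s3://my-bucket/ab/cluster1/topics/events/batch-1", 4096)
received = decode(encode(sent))
```

The consumer only needs this small pointer from the queue; the Batch itself is fetched from the storage layer using the location.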
MemQ Data Format
MemQ uses a custom storage / network transmission format for Messages and Batches.
The lowest unit of transmission in MemQ is called a LogMessage. This is similar to a Pulsar Message or Kafka ProducerRecord.
The wrappers on the LogMessage allow for the different levels of batching that MemQ does. Hierarchy of units:
Batch (unit of persistence)
Message (unit of producer upload)
LogMessage (unit the application interacts with)
Producing Data
A MemQ producer is responsible for sending data to Brokers. It uses an async dispatch model allowing for non-blocking sends to happen without the need to wait on acknowledgements.
This model was critical in order to hide the upload latencies of the underlying storage layers while maintaining storage-level acknowledgements. This led to the implementation of a custom MemQ protocol and client, as we couldn’t use existing PubSub protocols, which relied on synchronous acknowledgements. MemQ supports three types of acks: ack=0 (producer fire & forget), ack=1 (Broker received), and ack=all (storage received). With ack=all, the replication factor (RF) is determined by the underlying storage layer (e.g. in S3 Standard RF=3 [across three AZs]). In case an acknowledgement fails, the MemQ producers can explicitly or implicitly trigger retries.
Storing Data
The MemQ Topic Processor is conceptually a RingBuffer. This virtual ring is subdivided into Batches, which allows simplified writes. Messages are enqueued into the currently available Batch as they arrive over the network until either the Batch is filled or a time-based trigger happens. Once a Batch is finalized, it is handed to the StorageHandler for upload to the Storage layer (like S3). If the upload is successful, a notification is sent via the Notification Queue along with the acknowledgements (ack) for the individual Messages in the Batch to their respective Producers using the AckHandler if the producers requested acks.
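The size-based and time-based triggers can be sketched with a much simpler structure than the RingBuffer described above. The class below is a single-threaded illustration with hypothetical names; it leaves out the ring, notifications, and acks, and takes the upload function and the clock as parameters so the behavior is easy to follow.

```python
import time
from typing import Callable, List


class BatchBuffer:
    """Collects messages and hands a Batch to `upload` when it is full or too old."""

    def __init__(self, max_bytes: int, max_age_s: float,
                 upload: Callable[[List[bytes]], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_bytes = max_bytes
        self.max_age_s = max_age_s
        self.upload = upload
        self.clock = clock
        self._messages: List[bytes] = []
        self._size = 0
        self._opened_at = 0.0

    def append(self, message: bytes) -> None:
        """Enqueue a message into the current Batch, flushing if a trigger fires."""
        if not self._messages:
            self._opened_at = self.clock()  # the Batch opens with its first message
        self._messages.append(message)
        self._size += len(message)
        self.maybe_flush()

    def maybe_flush(self) -> bool:
        """Finalize the Batch if the size or the time trigger has been reached."""
        if not self._messages:
            return False
        full = self._size >= self.max_bytes
        expired = self.clock() - self._opened_at >= self.max_age_s
        if full or expired:
            self.upload(self._messages)
            self._messages, self._size = [], 0
            return True
        return False
```

In a real deployment the time trigger would be driven by a timer rather than by callers invoking maybe_flush, and the upload would happen asynchronously.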
Consuming Data
The MemQ consumer allows applications to read data from MemQ. The consumer uses the Broker metadata APIs to discover pointers to the Notification Queue. We expose a poll-based interface to the application where each poll request returns an Iterator of LogMessages to allow reading all LogMessages in a Batch. These Batches are discovered using the Notification Queue and retrieved directly from the Storage layer.
Other Features
Data Loss Detection: Migrating workloads from Kafka to MemQ required strict validation on data loss. As a result, MemQ has a built in auditing system that enables efficiently tracking E2E delivery of each Message and publishing metrics in near real-time.
Batch & Streaming Unification: Since MemQ uses an externalized storage system, it enables the opportunity to provide support for running direct batch processing on raw MemQ data without needing to transform it to other formats. This allows users to perform ad hoc inspection on MemQ without major concerns around seek performance as long as the storage layer can separately scale reads and writes. Depending on the storage engine, MemQ consumers can perform concurrent fetches to enable much faster backfills for certain streaming cases.
Performance
Latency
MemQ supports both size and time-based flushes to the storage layer, enabling a hard limit on max tail latencies in addition to several optimizations to curb the jitter. So far we are able to achieve a p99 E2E latency of 30s with AWS S3 Storage and are actively working on improving MemQ latencies, which increases the number of use cases that can be migrated from Kafka to MemQ.
Cost
MemQ on S3 Standard has proven to be up to 90% cheaper (avg ~80%) than an equivalent Kafka deployment with three replicas across three AZs using i3 instances. These savings come from several factors like:
reduction in IOPS
removal of ordering constraints
decoupling of compute and storage
reduced replication cost due to elimination of compute hardware
relaxation of latency constraints
Scalability
MemQ with S3 scales on-demand depending on write and read throughput requirements. The MemQ Governor performs real-time rebalancing to ensure sufficient write capacity is available as long as compute can be provisioned. The Brokers scale linearly by adding additional Brokers and updating traffic capacity requirements. The read partitions are manually updated if the consumer requires additional parallelism to process the data.
At Pinterest, we run MemQ directly on EC2 and scale clusters depending on traffic and new use case requirements.
Future Work
We are actively working on the following areas:
Reducing E2E latencies (<5s) for MemQ to power more use cases
Enabling native integrations with Streaming & Batch systems
Key ordering on read
Conclusion
MemQ provides a flexible, low cost, cloud native approach to PubSub. MemQ today powers collection and transport of all ML training data at Pinterest. We are actively researching expanding it to other datasets and further optimizing latencies. In addition to solving PubSub, MemQ storage can expose the ability to use PubSub data for batch processing without major performance impacts, enabling low latency batch processing.
Stay tuned for additional blogs about how we optimized MemQ internals to handle scalability challenges and the open source release of MemQ.
Acknowledgements
Building MemQ would not have been possible without the unwavering support of Dave Burgess and Chunyan Wang. Also a huge thanks to Ping-Min Lin who has been a key driver of bug fixes and performance optimizations in MemQ that enabled large scale production rollout.
Lastly thanks to Saurabh Joshi, Se Won Jang, Chen Chen, Divye Kapoor, Yiran Zhao, Shu Zhang and the Logging team for enabling MemQ rollouts.
Pinterest Druid Holiday Load Testing
By Isabel Tallam | Senior Software Engineer; Jian Wang | Senior Software Engineer; Jiaqi Gu | Senior Software Engineer; Yi Yang | Senior Software Engineer; and Kapil Bajaj | Engineering Manager, Real-time Analytics team
Like many companies, Pinterest sees an increase in traffic in the last three months of the year. We need to make sure our systems are ready for this increase in traffic so we don’t run into any unexpected problems. This is especially important as Pinners come to Pinterest at this time for holiday planning and shopping. Therefore, we do a yearly exercise of testing our systems with additional load. During this time, we verify that our systems are able to handle the expected traffic increase. On Druid we look at several checks to verify:
Queries: We make sure the service is able to handle the expected increase in QPS while at the same time supporting the P99 Latency SLA our clients need.
Ingestion: We verify that the real-time ingestion is able to handle the increase in data.
Increase in Data size: We confirm that the storage system has sufficient capacity to handle the increased data volume.
In this post, we’ll provide details about how we run the holiday load test and verify Druid is able to handle the expected increases mentioned above.
Pinterest traffic increases as users look for inspiration for holidays.
How We Run Load Tests
As mentioned above, the areas our teams focus on are:
Can the system handle increased query traffic?
Can the system handle the increase in data ingestion?
Can the system handle the increase in data volume?
Can the System Handle Increased Query Traffic?
Testing query traffic and SLA is a main goal during holiday load testing. We have two different options for load testing in our Druid system. The first option generates queries based on the current data set in Druid and then runs these queries in Druid. The other option captures real production queries and re-runs these queries in Druid. Both of these options have their advantages and disadvantages.
Sample Versus Production Queries
The first option — using generated queries — is fairly simple to run anytime and does not require preparation like capturing queries. However, this type of testing may not accurately show how the system will behave in production scenarios. A real production query may look different and touch different data, query types, and timeframes than what is tested using generated queries. Additionally, any corner cases would be ignored in this type of testing.
The second option has the advantage of using real production queries that would be very similar to what we expect to see in future traffic. The disadvantage, however, is that setting up the tests is more involved, as production queries need to be captured and potentially updated to match the new timeline when holiday testing is performed. In Druid, running the same query today versus one week from today may give different latency results, because data moves through different host stages over time: faster high-memory hosts serve the data in its first days or weeks, while slower disk-based stages serve older data.
We decided to move ahead with real production queries because one of our priorities was to replicate production use cases as closely as possible. We made use of a Druid native feature that automatically logs any query that is being sent to a Druid broker host (broker hosts handle all the query work in a Druid cluster).
Test Environment Setup
Holiday testing is not done in the production environment, as this could adversely impact the production traffic. However, the test needs an environment setup as similar to the production environment as possible. Therefore, we created a copy of the production environment that is short-lived and solely used for testing. To test query traffic, the only stages required are brokers, historical stages, and coordinators. We have several tiers of historical stages in the production environment and we replicated the same setup in the test environment as well. We also made sure to use the same host machine types, configurations, pool size, etc.
The data we used for testing was copied over from production. We used a simple MySQL dump to create a copy of all the segments stored in the production environment. Once the dump is added to the MySQL instance in the test environment, the coordinator will automatically trigger the data to be replicated in the historical stages of the test environment.
Before initiating the copy, however, we needed to identify what data is required. This will depend on the client team and on the timeframe their queries request. In some cases, it may not be necessary to copy all data, but only the most recent days, weeks, or months.
Test environment is set up with the same configuration and hosts as Prod environment.
Our test system first connects to the broker hosts on the test environment, then loads the queries from the log file and sends them to the broker hosts. We use a multi-threaded implementation to increase the QPS being sent to the broker nodes. First, we run tests to identify how many threads are needed as a baseline that matches production traffic — for example, 300 QPS. Based on that, we can define how many threads we need to use for testing expected holiday traffic (two, three, or more times the standard traffic).
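A minimal sketch of the thread sizing and replay loop described above. It assumes each worker thread sends queries back to back, so one thread sustains roughly 1000 / (average latency in ms) queries per second. The function names, the `send` callable, and the 50 ms latency are illustrative assumptions, not details of our actual test system.

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def threads_for_qps(target_qps: int, avg_latency_ms: int) -> int:
    """Estimate threads needed when each thread sends queries back to back."""
    if target_qps <= 0 or avg_latency_ms <= 0:
        raise ValueError("target_qps and avg_latency_ms must be positive")
    return math.ceil(target_qps * avg_latency_ms / 1000)


def replay(queries: Iterable[str], send: Callable[[str], None], threads: int) -> List[float]:
    """Send every query through `send` from a thread pool.

    Returns the per-query latency in seconds, in the order of `queries`.
    """

    def timed(query: str) -> float:
        start = time.perf_counter()
        send(query)  # hypothetical: e.g. an HTTP POST to a test broker
        return time.perf_counter() - start

    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(timed, queries))


baseline_threads = threads_for_qps(300, 50)  # baseline traffic
holiday_threads = threads_for_qps(600, 50)   # 2x the baseline
```

In practice the estimate is only a starting point; the baseline run against the test brokers is what confirms the thread count that actually reproduces production QPS.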
In our use case, we had loaded the data received up to a specific date (e.g. October 1st). At this point, we were re-running the captured log files on the same date or the day before, to match production behavior. Our test script also was able to update the time frame in a query to match either the current time or a predefined time to allow running any log file and translating it to the data available on the test environment.
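The time frame rewrite can be sketched as follows. Druid native queries carry their time range in an `intervals` field of ISO 8601 `start/end` strings; the helper below shifts each bound by the gap between capture time and replay time. The function names are hypothetical, and the sketch assumes explicit timestamps only (it does not handle period-style intervals such as `P1D`).

```python
import copy
from datetime import datetime, timezone


def _parse(ts: str) -> datetime:
    # Accept a trailing "Z", which older fromisoformat() versions reject.
    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    # Assumption: timestamps without an offset are treated as UTC.
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


def _format(dt: datetime) -> str:
    text = dt.astimezone(timezone.utc).isoformat(timespec="milliseconds")
    return text.replace("+00:00", "Z")


def shift_intervals(query: dict, captured_at: str, replay_at: str) -> dict:
    """Return a copy of `query` with every interval moved by (replay_at - captured_at)."""
    delta = _parse(replay_at) - _parse(captured_at)
    shifted = copy.deepcopy(query)
    shifted["intervals"] = [
        "/".join(_format(_parse(bound) + delta) for bound in interval.split("/"))
        for interval in query["intervals"]
    ]
    return shifted
```

Shifting by a fixed offset keeps the width of each query window unchanged, so a "last 24 hours" query captured a week ago still asks for 24 hours of data in the test environment.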
Evaluating the Results
To determine the health of our system, we used our existing metrics to compare QPS and P99 latency on brokers and historical nodes, as well as determining system health via indicators like CPU usage of the brokers. These metrics help us identify any bottlenecks.
Query response time with normal traffic and 2x increase on basic system setup.
Typical bottlenecks can include the historical nodes or the broker nodes.
The historical nodes may show a higher latency for increased QPS, which will in turn increase the overall latency. To resolve this, we would add mirror hosts and increase the number of replicas of the data to support better latency under higher load. This step is something that will take time to implement, as hosts need to be added and data needs to be loaded, which can take several hours depending on the data size. Therefore, this is something that should be completed before traffic increases on the production system.
If the broker nodes are no longer able to handle the incoming query traffic, the size of the broker pool needs to be increased. If this is seen in the test environment, or even the production environment, it is much faster to increase the pool size and can potentially be done ad-hoc as well.
Testing with an increased data size on the test environment helps us determine which steps are needed to support the expected holiday traffic changes. We can make these configuration changes in advance, and we can make the support team aware of changes and of the maximum traffic the system is able to handle within the specified SLA (QPS and P99 latency requirements from the client teams).
Can the system handle the increase in data ingestion?
Testing the capacity for real-time data ingestion is similar to testing query performance. It is possible to start with making an estimate of the supported ingestion rate based on the dimensions/cardinality of the ingested data. However, this is only a guideline, and for some high-priority use cases it is a good idea to test early on.
We set up a test environment that has the same capacity, configuration, etc. as the production environment. However, in this step, some help from client teams may be required, as we also need to test with increased data from the ingestion source, such as a Kafka topic.
When reviewing the ingestion test, we focused on several key metrics. The ingestion lag should be low, and the number of both successful and rejected events (due to rejection window exceeded) should be closely similar to comparable values in the production environment. We also include validation of ingested data and general system health of overlord and middle manager stages — the stages handling ingestion of real time data.
Sample metrics for successfully ingested events, rejected events, and Kafka ingestion lag.
Can the system handle the increase in data volume?
Evaluating if the system can handle the increase in data volume is probably the simplest and quickest check, though just as important as the previous steps. For this, we take a look at the coordinator UI: here we can see all historical stages, the pool size, and at what capacity they are currently running. Once clients provide details on the expected increase in data volume, it is a fairly simple process to calculate the amount of additional data that needs to be stored over the holiday period and potentially some period after that.
The space is at a healthy percentage (~70%) allowing for some growth.
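The storage check boils down to simple arithmetic, sketched below. All numbers in the example are made up for illustration (only the roughly 70% starting point echoes the figure above), and the function name is hypothetical.

```python
def projected_utilization(
    capacity_tb: float,
    used_tb: float,
    daily_ingest_tb: float,
    growth_factor: float,
    days: int,
) -> float:
    """Fraction of capacity in use after `days` of ingestion at `growth_factor` times the normal rate."""
    extra_tb = daily_ingest_tb * growth_factor * days
    return (used_tb + extra_tb) / capacity_tb


# Illustrative only: 100 TB pool at 70% usage, 0.5 TB/day doubling for 10 days.
after_holiday = projected_utilization(100, 70, 0.5, 2, 10)
```

If the projected value gets close to 1.0, hosts need to be added to the historical pool before the holiday period starts.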
Results
In the tests we ran this year, we found that our historical stages are in a very good state and are able to handle the additional traffic expected during the holiday time. We did see, however, that the broker pool may need some additional hosts if traffic meets a certain threshold. We have been sure to keep this communication visible with the client teams and support teams so team members are aware and know that the pool size may need to be increased.
Learnings
Timing is critical with holiday testing. This project has a fixed end date by which all changes need to be completed in the systems before any traffic increases, and the teams need to make sure to have all the pieces in place before results are due. As with many projects, we need to leave additional buffer time for unexpected changes in timeline and requirements.
Druid is a backend service, which is not always top of mind for many client teams as long as it is performing well. Therefore, it is a good idea to reach out to client teams before testing starts to get their estimates of expected holiday traffic increases. Some of our clients reached out to us on their own; however, by then the due date for capacity increase requests to governance teams had already passed. In these cases, or where client teams are not sure yet, it is good practice to make a general estimate of the traffic increase and start testing with those numbers.
Keeping track of holiday planning and applied changes for each year is also a good practice. Having a history of changes every year and keeping track of the actual increase versus the original estimates made beforehand will help to make educated estimates on what traffic increases may be expected in the following year.
Knowing the capacity of the brokers and historical stages before the holiday updates makes it easier for teams to decide how far to scale the clusters back down after the holidays, while accounting for organic month-over-month growth.
Future Work
In this year’s use case, we chose the option of capturing broker logs to retrieve the queries we wanted to replay against Druid. This option worked for us at this time, though we are planning to look into other options for capturing queries going forward. The log files option works well for a one-off need, but it would be useful to have continuous logging of queries and to store these in Druid. This can help with debugging issues and identifying high-latency queries that may need some tweaking to get performance improvements.
StackShare | Tech stack deep dives from top startups and engineering teams
Cost Reduction in Goku
By Monil Mukesh Sanghavi | Software Engineer, Real Time Analytics Team; Rui Zhang | Software Engineer, Real Time Analytics Team; Hao Jiang | Software Engineer, Real Time Analytics Team; Miao Wang | Software Engineer, Real Time Analytics Team
In 2018, we launched Goku, a scalable and highly performant time series database system, which served as the storage and query serving engine for short term metrics (less than one day old). In early 2020, we launched GokuL (Goku long term), which extended Goku’s capability by supporting long term metrics data (i.e. data older than a day and up to a year). Both of these completely replaced OpenTSDB. For GokuL, we used 3 clusters of i3.4xlarge SSD backed EC2 instances which, over time, we realized are very costly. Reducing this cost was one of our primary aims going into 2021. This blog post will cover the approach we took to achieve that goal.
Background
We use a tiered approach to segregate the long term data and store it in the form of buckets.
Table 1: table of a tiered approach
Tiers 1–5 contain the data stored on the GokuL (long term) clusters. GokuL uses RocksDB to store its long term data, and the data is ingested in the form of SST files.
Query Analysis
We analyzed the queries going to the long term cluster and observed the following:
There are very few metrics (approximately 6K out of a total of 10B) for which data points older than three months were queried from GokuL.
More than half of the GokuL queries had specified rollup intervals of one day or more.
Tier 5 Data Analysis
We randomly selected a few shards in GokuL and analyzed the data. We observed the memory consumption of tier 5 data was much more than all the other tiers (1–4) combined. This was despite the fact that tier 5 contains only data rolled up to one-hour intervals, whereas the other tiers contain a mix of raw and 15-minute rolled up data.
Table 2: SST File size for each bucket in MiB
Solutions
It was inferred from the query and tier 5 analysis that tier 5 data (which holds six buckets of 64 days of data each) was the least queried as well as the most disk consuming. We planned our solutions to target this tier as it would give us the most benefits. Mentioned below are some of the solutions which were discussed.
Namespace
Implementing a feature called namespace would let us store configurations like ttl, rollup interval, and tier configurations for the set of metrics in that namespace. Uber’s M3 also has a similar solution. This would help us set appropriate configurations for a select set of metrics (e.g. set a lower ttl for metrics that do not require longer retention). The time to production for this project was longer, so we decided to make it a separate future project. It is being actively worked on.
Rollup Interval Adjust for Tier 5 Data
We experimented with changing the rollup interval of tier 5 data from one hour to one day and observed the change in the final SST file(s) size for the tier 5 bucket.
Table 3
The savings that came out of this solution were not strong enough to support putting this into production.
On Demand Loading of Tier 5 Data
GokuL clusters would only store data from tiers 1–4 on startup and would load the tier 5 buckets as necessary (based on queries). The cons of this solution were:
Users would have to wait and retry the query once the corresponding tier 5 bucket from s3 had been ingested by the GokuL host.
Once ingested, the bucket would remain in GokuL unless thrown away by an eviction algorithm.
We decided not to go with this solution because it was not user friendly.
Tiered Storage
We decided to move tier 5 data into a separate HDD based cluster. While there was a notable difference in query latency, it could be ignored because far fewer queries hit this tier. We calculated that tier 5 was consuming approximately 1 TB on each of the 650 hosts in the GokuL cluster. We decided to use the d2.2xlarge instance to store and serve the tier 5 data in GokuL.
Table 4
The cost savings from this solution were substantial: we replaced around 325 i3.4xlarge instances with 111 d2.2xlarge instances, which cut our costs by nearly 30–35%.
To support this, we had to design and implement tier-based routing in the goku root cluster, which routes the queries to short term and long term leaf clusters.
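As a rough illustration of tier-based routing, the sketch below picks the leaf clusters a query must touch based on the age of the requested time range. The one-day short term boundary comes from this post; the 90-day tier 5 boundary and all cluster names are placeholders assumed for the example, not the real configuration.

```python
from datetime import datetime, timedelta
from typing import List

SHORT_TERM_MAX_AGE = timedelta(days=1)  # short term data is less than a day old
TIER5_MIN_AGE = timedelta(days=90)      # assumption: placeholder tier 5 boundary


def clusters_for_range(start: datetime, end: datetime, now: datetime) -> List[str]:
    """Return the (hypothetical) leaf clusters whose data overlaps [start, end)."""
    if start >= end:
        raise ValueError("start must be before end")
    oldest_age = now - start  # age of the oldest requested data point
    newest_age = now - end    # age of the newest requested data point
    clusters = []
    if newest_age < SHORT_TERM_MAX_AGE:
        clusters.append("short_term")
    if oldest_age > SHORT_TERM_MAX_AGE and newest_age < TIER5_MIN_AGE:
        clusters.append("long_term_ssd")
    if oldest_age > TIER5_MIN_AGE:
        clusters.append("long_term_hdd_tier5")
    return clusters
```

With this shape, the common case of recent-data queries never reaches the slower HDD cluster, which is why the added latency on tier 5 affects so few requests.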
In the future, we can evaluate whether to reduce the number of replicas, trading some availability for lower cost given the low number of queries.
RocksDB Tuning
As mentioned above, GokuL uses RocksDB to store the long term data. We observed that the RocksDB options we were using were not optimal for Goku’s data that has high volume and low QPS.
We experimented with using a stronger compression algorithm (ZSTD with level 5), and this reduced the disk usage by 40%. In addition to this, we enabled the partitioned index filter wherein only the top level index is loaded into memory. On top of this, we enabled caching with higher priority for filter and index blocks so that they use the same cache as the data blocks and also minimize the performance impact.
With both the above changes, we noticed that the latency difference was not large and the reduction in data space usage was approximately 50%. We immediately put this into production and shrunk the size and cost of our GokuL clusters by another half.
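For readers who want to map the description above onto RocksDB settings, the listing below names the options that most plausibly correspond to it. The option names exist in RocksDB's C++ option structs, but pairing them with this post is an assumption; the exact settings used in GokuL are not listed here, and this is not a loadable options file.

```python
# Assumed mapping from the prose to RocksDB option names (C++ struct.field).
ROCKSDB_TUNING = {
    "ColumnFamilyOptions": {
        "compression": "kZSTD",       # stronger compression algorithm
        "compression_opts.level": 5,  # ZSTD level 5, as stated in the post
    },
    "BlockBasedTableOptions": {
        "index_type": "kTwoLevelIndexSearch",   # partitioned (two-level) index
        "partition_filters": True,              # partition filters alongside the index
        "cache_index_and_filter_blocks": True,  # index/filter blocks share the block cache
        "cache_index_and_filter_blocks_with_high_priority": True,
    },
}


def describe(options: dict) -> list:
    """Flatten the nested mapping into 'Struct.option = value' lines for review."""
    return [
        f"{section}.{name} = {value}"
        for section, section_options in options.items()
        for name, value in section_options.items()
    ]
```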
What’s Next
Namespace
As mentioned, we are actively working on the implementation of the namespace feature, which will help us reduce the long term cluster costs even further by reducing the ttl for most of the current metrics that do not need the high retention anyway.
Acknowledgments
Huge thanks to Brian Overstreet, Wei Zhu, and the observability team for providing and supporting solutions on the table.
3 Innovations While Unifying Pinterest’s Key-Value Storage
By Jessica Chan | Engineering Manager, MySQL & Key-Value Storage
Engineers hate migrations. What do engineers hate more than migrations? Data migrations. Especially critical, terabyte-scale, online serving migrations which, if done badly, could bring down the site, enrage customers, or cripple hundreds of critical internal services.
So why did the Key-Value Systems Team at Pinterest embark on a two-year realtime migration of all our online key-value serving data to a single unified storage system? Because the cost of not migrating was too high. In 2019, Pinterest had four separate key-value systems owned by different teams with different APIs and featuresets. This resulted in duplicated development effort, high operational overhead and incident counts, and confusion among engineering customers.
In unifying all of Pinterest’s 500+ key-value use cases (over 4PB of unique data serving 100Ms of QPS) onto one single interface, not only did we make huge gains in reducing system complexity and lowering operational overhead, we achieved a 40–90% performance improvement by moving to the most efficient storage engine, and we saved the company a significant amount in costs per year by moving to the most optimal replication and versioning architecture.
In this blog post, we selected three (out of many more) innovations to dive into that helped us notch all these wins.
But first, some background
Before this effort, Pinterest used to have four key-value storage systems:
Terrapin: a read-only, batch-load, key-value storage built at Pinterest on top of HDFS and featured in Designing Data-Intensive Applications
Rockstore: a multi-mode (readonly, read-write, streaming-write) key-value storage also built at Pinterest, based on the open-source Rocksplicator framework, written in C++, and using RocksDB as a storage engine
UserMetaStore: a read-write key-value storage with a simplified thrift API on top of HBase
Rocksandra: a read-write, key-value storage based on a version of Cassandra, which used RocksDB under the hood
One of the biggest challenges when consolidating to a single system is assessing the feasibility of both achieving feature parity across all systems and integrating those features well into a single platform. Another challenge is to determine which system to consolidate to, and whether to go with an existing system or to consider something that doesn’t already exist at Pinterest. And a final, nontrivial challenge is to convince leadership and hundreds of engineers that migrating in the first place is a good idea.
Before embarking on such a large undertaking, we had to step back. A working group dedicated a few months to deep-dive on requirements and technologies, analyze tradeoffs and benefits, and come up with a final proposal that was ultimately approved. Rockstore, which was the most cost-efficient and performant, simplest to operate and extend, and provided the lowest migration cost, was chosen as the one storage system to rule them all.
We won’t describe the entire migration project in this post, but we’ll highlight some of the best parts.
Innovation 1: API abstractions allow us to seamlessly migrate customer data
We know that in code, strong abstractions lead to cleaner interfaces and more flexibility to make changes “under the hood” without disruption. This is true of organizations as well. While each of the four storage systems had its own thrift API abstractions, the fact that there were four interfaces, and that some of them, like Terrapin, still required customers to know internal details about the architecture in order to use them (a leaky abstraction), made life difficult for both customers and platform owners.
A diagram might be helpful to illustrate the complexity of maintaining four separate, key-value storage systems. If you were a customer, which would you choose?
Figure 1: Four separate Key Value Systems at Pinterest, each with their own APIs, set of unique features and underlying architectures, and varying degrees of performance and cost.
We introduced a new API, aptly called the KVStore API, to be the new unified thrift interface that would absorb the rest. Once everyone is on a single unified API that is built with the intention to be general, the platform team can have the flexibility to make changes, even change storage engines, under the hood without involving customers. This is the ideal state:
Figure 2: The ideal state is a single unified Key-Value interface, reducing the complexity both for customers and for platform owners. When we can consolidate our resources as a company and invest in a single platform, we can move faster and build better.
The migration to get from four systems to the ideal one above was split into two phases: the first, targeting read-only data, and the second, targeting read-write data. Each phase required its own unique migration strategy to be the least disruptive to customers.
Phase 1: Read-only data migration (totally seamless)
The read-only phase was first because it was simpler (immutable data is easier to migrate than mutable data receiving live writes) and because it targeted the majority of customers (about 70% were using Terrapin). Because Terrapin was so prolific and established in our code base, having everyone migrate their APIs to access KVStore would have taken a ton of time and effort with very little incremental value.
We decided to instead migrate most Terrapin customers seamlessly: no changes were required of users calling Terrapin APIs, but unbeknownst to callers, the Terrapin API service was augmented with an embedded KVStore API library to retrieve data from Rockstore. And because Terrapin is a batch-loaded system, we also found a central base class and rerouted workflows to double-load data into Rockstore instead of Terrapin (and then eventually we cut Terrapin off).
Figure 3: By introducing a routing layer between the Terrapin APIs and the Terrapin leaf storage, we can achieve a data migration and eliminate the costly and less stable Terrapin storage system for immediate business impact, all without asking customers to take any action. The tradeoff here is the tech debt and layer of indirection: we are now asking customers to clean up their usage of the Terrapin API in order to directly call KVStore API.
Because Rockstore was more performant and cost-efficient than Terrapin, users saw a 30–90% decrease in latency. When we decommissioned the storage infrastructure of Terrapin, the company also saw $7M of annualized savings, all without users needing to lift a finger (with just a few exceptions). The tradeoff is that we now have some tech debt of ensuring that users clean up their code by moving off of deprecated Terrapin APIs and onto KVStore API so that we no longer have a layer of indirection.
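The routing idea behind the seamless phase can be sketched in a few lines: the legacy read call keeps its signature, and a per-dataset switch decides which backend serves it. The class and method names below are hypothetical stand-ins, not the real Terrapin or KVStore interfaces.

```python
from typing import Dict, Optional, Set


class InMemoryBackend:
    """Stand-in for a storage cluster: dataset name -> {key: value}."""

    def __init__(self, data: Dict[str, Dict[str, bytes]]) -> None:
        self._data = data

    def get(self, dataset: str, key: str) -> Optional[bytes]:
        return self._data.get(dataset, {}).get(key)


class LegacyReadService:
    """Keeps the legacy read signature while choosing the backend per dataset."""

    def __init__(self, legacy: InMemoryBackend, unified: InMemoryBackend) -> None:
        self._legacy = legacy
        self._unified = unified
        self._migrated: Set[str] = set()

    def mark_migrated(self, dataset: str) -> None:
        # Flip routing once the dataset is fully loaded into the new store.
        self._migrated.add(dataset)

    def get(self, dataset: str, key: str) -> Optional[bytes]:
        backend = self._unified if dataset in self._migrated else self._legacy
        return backend.get(dataset, key)
```

Callers never see the switch, which is what makes the migration seamless; the cost is the extra layer of indirection that later has to be cleaned up.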
Phase 2: Read-write data migration (partially seamless)
The read-write side presented a different picture: there were fewer than 200 use cases to tackle, and the number of call sites was less extreme, but building feature parity for a read-write system as opposed to read-only involved some serious development. In order to be on par with UserMetaStore (essentially HBase), Rockstore needed a brand new wide-column format, increased consistency modes, offline snapshot support, and higher durability guarantees.
While the team took the time to develop these features, we decided to “bite the bullet” and ask all users to migrate from UserMetaStore’s API to KVStore API from the get-go. The benefit of doing this is it’s a low-risk, low-effort move. Thanks again to the power of abstraction, we implemented a reverse proxy so that customers moving to KVStore API were actually still calling UserMetaStore under the hood. By making this small change now, customers were buying a lasting contract that wouldn’t require such changes again for the foreseeable future.
Figure 4: Instead of taking the same approach as we did with Terrapin in Figure 3, we decided asking customers to migrate their APIs up front made more sense for unifying the read-write storage systems. Once customers moved to our KVStore API abstraction layer, we were free to move their data from UserMetaStore to Rockstore under the hood.
Some of the biggest challenges were actually not technical. Finding owners of the data was an archeological exercise, and holding hundreds of owners accountable for completing their part was difficult due to competing priorities. But when it was done, and when the Rockstore platform was ready, the team was completely unblocked to backfill the data from UserMetaStore to Rockstore without any customer involvement. We also vowed to make sure all data was attributed to owners going forward.
Innovation 2: A wide-column format eliminated both CPU and network load for large payloads
Some of the most popular Terrapin workloads had an interesting property: use cases would store values consisting of large blobs of thrift structures but only need to retrieve a very small piece of that data when read.
At first, these callers would download the huge values that they stored, deserialize them on the client side, and read the property they needed. This very quickly revealed itself to be inefficient in terms of unnecessary network load, throughput degradation, and wasteful client CPU utilization.
The Terrapin solution to this was to introduce an API feature called “trimmer,” where you could specify a Thrift struct and the fields you wanted from it in the request itself. Terrapin would not only retrieve the object, it would also deserialize it and return only the fields requested. This was better in that the network bandwidth was reduced, important especially for reducing cross-AZ traffic costs, but it was worse in terms of both platform cost and leaky abstractions. More CPU utilization meant more machines were needed, and business logic in the platform meant that Terrapin needed to know about required thrift structures. Performance also takes a hit since clients are waiting for this increased processing time.
To solve this in Rockstore and unblock the migration, the team decided against simply re-implementing the trimmer. Instead, we introduced a new file format that accommodated a wide-column access pattern. This means that instead of storing a binary blob of data that can be deserialized into a thrift structure, you can actually store and encode your data structure in a native format that can be retrieved like a key-value pair using a combination of primary keys and local keys. For example, if you have a struct UserData that is a mapping of 30 fields keyed to a user id, instead of storing a key-value pair of (key: user id, value: UserData), you can store (key: user id, (local key: UserData field 1, local value: UserData value 1), (local key: UserData field 2, local value: UserData value 2)), etc.
The API is then designed to allow you to either access the entire row (all columns associated with user id) or only certain properties (UserData field 3 and 12 of user id). Under the hood, Rockstore is performing a blazing fast range scan or single-point key-value lookup. This accounted for some of the more extreme performance improvements that we ultimately observed. Goodbye network and CPU costs!
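A toy model of the wide-column access pattern: each column is stored under a composite (primary key, local key), kept in sorted order so that a whole row is a range scan and a single column is a point lookup. This is an in-memory illustration with assumed names, not Rockstore's file format or API.

```python
import bisect
from typing import Dict, List, Optional, Tuple


class WideColumnStore:
    def __init__(self) -> None:
        self._keys: List[Tuple[str, str]] = []  # sorted (primary key, local key) pairs
        self._values: Dict[Tuple[str, str], bytes] = {}

    def put(self, primary: str, local: str, value: bytes) -> None:
        composite = (primary, local)
        if composite not in self._values:
            bisect.insort(self._keys, composite)
        self._values[composite] = value

    def get_column(self, primary: str, local: str) -> Optional[bytes]:
        """Single-point lookup of one column."""
        return self._values.get((primary, local))

    def get_row(self, primary: str) -> Dict[str, bytes]:
        """Range scan over all composite keys that share the primary key."""
        start = bisect.bisect_left(self._keys, (primary, ""))
        row: Dict[str, bytes] = {}
        for pk, lk in self._keys[start:]:
            if pk != primary:
                break
            row[lk] = self._values[(pk, lk)]
        return row
```

Reading one field of a 30-field struct then transfers only that field's bytes, and the server never has to deserialize the rest.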
Innovation 3: A versioning system for batch-loaded, read-only data unblocked instant data migrations between clusters
One of the biggest pain points of the read-only mode of Rockstore was the inability to move data once it was loaded onto a cluster. If customer data grew beyond what was provisioned for it, or if a certain cluster became unstable, it took two weeks and two or three teams to coordinate changes to workflows, reconfigure thrift call sites, and budget time to double-upload, monitor, and read data to and from the new location.
Another pain point of Rockstore’s read-only mode was that it supported exactly two versions due to how it implemented versioning. This was incompatible with the requirements of Terrapin, which supported fewer than two versions for cost savings and more than two for critical datasets that require on-disk instant rollback.
The solution to this is what we call “timestamp-based versioning.” Rockstore read-only used to have “round-robin versioning,” where each new version uploaded into the system would either be version One or version Two. Once all the partitions of an uploaded version were online, the version map would simply flip. This created the exactly-two version constraint. Another constraint that bound customers to a specific cluster was the fact that customers needed to specify a serverset address that corresponded to the cluster on which their data lived. Another leaky abstraction! When the data moved, customers needed to make changes to follow it.
In timestamp-based versioning, every upload is attributed a timestamp and registered to a central metastore called Key-Value Store Manager (KVSM), which was used to coordinate cluster map configurations. Once more, the power of abstraction comes in: by calling KVStore APIs, as a customer you no longer need to know on which cluster your data lives. KVStore figures that out for you using the cluster map configuration.
Not only does this abstraction allow for as few as one version or as many as 10 to be stored on disk or in S3 (to trade off cost savings and rollback safety), but moving a dataset from one cluster to another is as simple as a single API call to change the cluster metadata in KVSM and kicking off a new upload. Once the metadata is updated, the new upload will automatically be loaded to the new cluster. And once online, all serving maps will point requests to that location. Thanks to timestamp-based versioning, two weeks of effort has been reduced to a single API call.
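The mechanics of timestamp-based versioning can be sketched with a small registry: every upload is recorded with its timestamp and the cluster currently assigned to the dataset, old versions beyond a configurable limit are evicted, and reads follow the newest version. The class below is a simplified stand-in with assumed names, not the KVSM API.

```python
from typing import Dict, List, Optional, Tuple

Version = Tuple[int, str]  # (upload timestamp, cluster that holds it)


class VersionRegistry:
    def __init__(self, max_versions: int = 2) -> None:
        if max_versions < 1:
            raise ValueError("max_versions must be at least 1")
        self._max = max_versions
        self._target_cluster: Dict[str, str] = {}
        self._versions: Dict[str, List[Version]] = {}

    def set_cluster(self, dataset: str, cluster: str) -> None:
        """Change where future uploads of `dataset` land."""
        self._target_cluster[dataset] = cluster

    def register_upload(self, dataset: str, timestamp: int) -> List[Version]:
        """Record a new version; return the versions evicted to stay within the limit."""
        cluster = self._target_cluster[dataset]
        versions = sorted(self._versions.get(dataset, []) + [(timestamp, cluster)])
        self._versions[dataset] = versions[-self._max:]
        return versions[: -self._max]

    def serving(self, dataset: str) -> Optional[Version]:
        """Newest registered version, which is where reads are pointed."""
        versions = self._versions.get(dataset)
        return versions[-1] if versions else None
```

Moving a dataset is then a `set_cluster` call followed by the next upload: existing versions keep serving from the old cluster until the new version is online.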
Thank you for reading about our journey to a single, abstracted, key-value storage at Pinterest. I’d like to acknowledge all the people that contributed to this critical and technically challenging project: Rajath Prasad, Kangnan Li, Indy Prentice, Harold Cabalic, Madeline Nguyen, Jia Zhan, Neil Enriquez, Ramesh Kalluri, Tim Jones, Gopal Rajpurohit, Guodong Han, Prem Thangamani, Lianghong Xu, Alberto Ordonez Pereira, Kevin Lin, all our partners in SRE, security, and Eng Productivity, and all of our engineering customers at Pinterest which span teams from ads to homefeed, machine learning to signal platform. None of this would be possible without the teamwork and collaboration from everyone here.
99% to 99.9% SLO: High Performance Kubernetes Control Plane at Pinterest
By Shunyao Li | Software Engineer, Cloud Runtime
Over the past three years, the Cloud Runtime team’s journey has gone from “Why Kubernetes?” to “How to scale?”. There is no doubt that the Kubernetes-based compute platform has achieved huge success at Pinterest. We support big data processing, machine learning, distributed training, workflow engines, CI/CD, and internal tools, backing every engineer at Pinterest.
Why Control Plane Latency Matters
As more and more business-critical workloads onboard onto Kubernetes, it is increasingly important to have a high-performance control plane that efficiently orchestrates every workload. Critical workloads such as content model training and ads reporting pipelines will be delayed if it takes too long to translate user workloads into Kubernetes native pods.
To measure control plane performance, we introduced top line business metrics through Service Level Indicator and Objective (SLI/SLO) in early 2021. We measure control plane SLI by reconcile latency, defined as the time from when a user change is received to when it propagates out of the control plane. For example, one of the reconcile latency measurements for batch jobs is the delay between workload creation and Pod creation.
The initial SLO was set to 99%. At the time of writing this post, we are proudly serving a control plane SLO of 99.9%. This post is about how we improved the control plane to achieve high performance.
Control Plane in a Nutshell
The control plane is the nerve center of the Kubernetes platform and is responsible for workload orchestration. It listens to changes from the Kubernetes API, compares the desired state of resources with their actual status, and takes actions to make sure the actual resource status matches the desired status (reconciliation). Workload orchestration also includes making scheduling decisions about where to place workloads.
The Kubernetes control plane consists of a set of resource controllers. Our resource controllers are written in the controller framework, which has an informer-reflector-cache architecture. Informers use the List-Watch mechanism to fetch and monitor resource changes from the Kubernetes API. The reflector updates the cache with resource changes and dispatches events for handling. The cache stores resource objects and serves List and Get calls. The controller framework follows the producer-consumer pattern: the event handler is the producer, responsible for queuing reconcile requests, while the controller worker pool is the consumer, pulling items from the workqueue to run the reconciliation logic.
Figure 1: Kubernetes Controller Framework
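The producer-consumer pattern can be illustrated with a small, self-contained sketch: an event handler puts reconcile requests on a queue and a pool of workers drains it. This is a Python illustration of the pattern only, with assumed names; it is not the controller framework itself.

```python
import queue
import threading
from typing import Callable, Iterable, List, Optional


def run_workers(keys: Iterable[str], reconcile: Callable[[str], None], workers: int = 2) -> List[str]:
    """Enqueue reconcile requests and drain them with a worker pool.

    Returns the keys whose reconcile call raised, so a caller could retry them.
    """
    workqueue: "queue.Queue[Optional[str]]" = queue.Queue()
    failed: List[str] = []
    lock = threading.Lock()

    def worker() -> None:
        while True:
            key = workqueue.get()
            try:
                if key is None:  # sentinel: no more work for this worker
                    return
                reconcile(key)
            except Exception:
                with lock:
                    failed.append(key)
            finally:
                workqueue.task_done()

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for key in keys:  # producer side: the event handler queues requests
        workqueue.put(key)
    workqueue.join()  # wait until every queued request is processed
    for _ in threads:
        workqueue.put(None)
    for thread in threads:
        thread.join()
    return failed
```

Because every item passes through the same queue, a burst of low-value items delays everything behind it, which is the head-of-line blocking discussed in the next section.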
Challenge 1: Worker Pool Efficiency
The controller worker pool is where the actual status to desired status reconciliation occurs. We leveraged the metrics provided by the workqueue package to gain a deep insight into the worker pool efficiency. These metrics are:
Work duration: how long it takes to process an item from workqueue
Queue duration: how long an item stays in workqueue before being processed
Enqueue rate: how often an item gets enqueued
Retry rate: how often an item gets retried
Queue depth: current depth of workqueue
Among these metrics, queue depth draws our attention as its spikes highly correlate with control plane performance degradation. Spikes in queue depth indicate head-of-line blocking. This usually happens when a large number of irrelevant items are enqueued in a short period of time. Items that really need to be reconciled then wait in the queue longer, causing SLI dips.
Figure 2: Correlation between control plane queue depth spikes and control plane instant SLI dips.
To resolve the head-of-line blocking, we categorize informer events and handle them with different priorities. User-triggered events have a high priority and need to be reconciled immediately, e.g., Create events triggered by users creating workloads or Update events triggered by users updating the labels of workloads. On the other hand, some system-triggered events are low priority, e.g., a Create event during informer initialization, or an Update event during informer periodic resync. They don’t affect our SLI and are not as time-sensitive as user-triggered events. They can be delayed so they don’t pile up in the queue and block urgent events. The following section is about how to identify and delay these system-triggered events.
Create Events During Informer Initialization
Each time we update the controller, the informer initializes its List-Watch mechanism by issuing a List call to the API server. It then stores the returned results in its cache and triggers a Create event for each result. This results in a spike in the queue depth. The solution is to delay any subsequent Create events for existing objects; an object cannot be created twice by the user, and any subsequent Create events must come from informer initialization.
Figure 3: Control plane queue depth spikes to 10k during an informer initialization, resulting in a dip in control plane instant SLI.
Update Events During Informer Periodic Resync
Periodically, the informer goes over all items remaining in its cache, triggering an Update event for each item. These events are enqueued at the same time and result in a queue depth spike. As shown in Figure 2, the queue depth spike aligns with the informer periodic resync interval we configured.
Update events triggered by periodic resync are easy to identify: the old and new objects are always the same, since both come from the informer cache. The solution is to delay Update events whose old and new objects are deeply equal. The delay is randomized so that queue depth spikes are smoothed out by scattering resync requests over a period of time.
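The two delay rules above can be sketched as a single classification step in front of the workqueue. This is a minimal Python illustration, not the controllers' actual code: `Event`, `enqueue_delay`, and the 60-second `max_delay_s` are hypothetical, and treating an object that already carries a status as pre-existing is an assumption of this sketch (the post does not say how existing objects are recognized, nor whether the Create delay is randomized).

```python
import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:
    kind: str                   # "create" or "update"
    new: dict                   # object as delivered by the informer
    old: Optional[dict] = None  # previous object (Update events only)


def enqueue_delay(event: Event, rng: random.Random,
                  max_delay_s: float = 60.0) -> float:
    """Seconds to wait before enqueuing; 0.0 means enqueue immediately."""
    if event.kind == "create" and event.new.get("status"):
        # Assumption: an object that already carries a status existed before
        # this Create event, so the event came from informer initialization.
        return rng.uniform(0.0, max_delay_s)
    if event.kind == "update" and event.old == event.new:
        # Deep-equal old and new objects: periodic resync, not a real change.
        # The random delay scatters these requests across the window.
        return rng.uniform(0.0, max_delay_s)
    return 0.0  # treated as user-triggered
```

A label change produces an Update whose old and new objects differ, so it is enqueued with zero delay, while a resync of the same object is pushed to a random point in the window.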
Result
The above optimizations solved the head-of-line blocking problem caused by inefficient worker pools. As a result, there are no longer recurring spikes in control plane queue depth. The average queue depth during informer periodic resync has been reduced by 97%, from 1k to 30. The instant SLI dips caused by the control plane queue depth spikes have been eliminated.
Figure 4: Improvement on workqueue efficiency
Challenge 2: Leadership Switch
Only the leader in the controller fleet does the actual reconciliation work, and leadership switch happens pretty often during deployments or controller pod evictions. A prolonged leadership switch can have a considerable negative impact on the control plane instant SLI.
Figure 5: Control plane leadership switches result in instant SLI dips.
Leader Election Mechanisms
There are two common leader election mechanisms for the Kubernetes control plane.
Leader-with-lease: the leader pod periodically renews a lease and gives up leadership when it cannot renew the lease. Kubernetes native components including cluster-autoscaler, kube-controller-manager, and kube-scheduler are using leader-with-lease in client-go.
Leader-for-life: the leader pod only gives up leadership when it is deleted and its dependent configmap is garbage collected. The configmap is used as a source of truth for leadership, so it is impossible to have two leaders at the same time (a.k.a. split brain). All resource controllers in our control plane are using the leader-for-life leader election mechanism from the operator framework to ensure we have at most one leader at a time.
In this post, we focus on the optimization of the leader-for-life approach to reduce control plane leadership switch time and improve control plane performance.
Monitoring
To monitor the leadership switch time, we implemented fine-grained leadership switch metrics with the following phases:
Leaderless: when there is no leader
Leader ramp-up: the time from a controller pod becoming leader to its first reconciliation. The new leader pod cannot begin to reconcile as soon as it becomes the leader; instead, it must wait until all relevant informers are synchronized.
Figure 6: Diagram of the leadership switch procedure
Figure 7: Control plane leadership switch monitored by the proposed leadership switch metrics
As shown in Figure 7, the control plane leadership switch usually takes more than one minute to complete, which is unacceptable for a high-performance control plane. We proposed the following solutions to reduce the leadership switch time.
Reduce Leaderless Time
The leader-for-life package hardcoded the exponential backoff interval between attempts to become the leader, starting at 1s and growing to a maximum of 16s. When a container requires some time to initialize, it always hits the maximum of 16s. We made the backoff interval configurable and reduced it to fit our situation. We also contributed our solution back to the operator framework community.
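To make the effect of the cap concrete, here is a small Python sketch of a capped exponential backoff. The function name and parameters are hypothetical, and the doubling factor is an assumption; only the 1s start and 16s maximum come from the text.

```python
from itertools import islice
from typing import Iterator


def backoff_intervals(initial_s: float = 1.0, maximum_s: float = 16.0,
                      factor: float = 2.0) -> Iterator[float]:
    """Yield wait times between leadership attempts, capped at maximum_s."""
    interval = initial_s
    while True:
        yield min(interval, maximum_s)
        interval = min(interval * factor, maximum_s)


# With the original limits, the fifth and later attempts wait the full 16s.
default_waits = list(islice(backoff_intervals(), 6))

# A lower, configurable cap keeps a slow-starting pod from waiting that long.
tuned_waits = list(islice(backoff_intervals(maximum_s=2.0), 4))
```

Under these assumptions, a pod that needs more than about 15 seconds to initialize has already reached the maximum interval, which is why lowering the cap shortens the leaderless phase.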
Reduce Leader Ramp-up Time
During the leader ramp-up time, each resource informer in each cluster initiates a List call to the API server and synchronizes its cache with the returned results. The leader will only start reconciliation when all informer caches are synchronized.
Preload Informer Cache
One way to reduce the leader ramp-up time is to have standby controller pods preload their informer cache. In other words, the initialization of the informer cache is no longer exclusive to the leader but applies to every controller pod upon its creation. Note that registering event handlers is still exclusive to the leader, otherwise we will suffer from a split brain.
Use Readiness Probe to Ensure Graceful Rolling Upgrade
The informer cache preload procedure runs in the background and does not block a standby pod from becoming the leader. To enforce the blocking, we define an HTTP GET readiness probe that periodically checks whether all informer caches are synchronized. With a rolling upgrade strategy, the old leader pod is killed only after the new standby pod is ready, which ensures the new pod is always warmed up when it becomes the leader.
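A readiness endpoint of this kind can be sketched in a few lines. The Python below is only an illustration of the idea, not the controllers' implementation: the `/readyz` path, `start_readiness_server`, and the `synced_checks` callables (one per informer cache) are all hypothetical.

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Callable, Iterable


def start_readiness_server(synced_checks: Iterable[Callable[[], bool]],
                           port: int = 0) -> HTTPServer:
    """Serve GET /readyz in a background thread; port 0 picks a free port."""
    checks = list(synced_checks)

    class ReadinessHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            if self.path != "/readyz":
                status = 404
            elif all(check() for check in checks):
                status = 200  # every informer cache reports synced
            else:
                status = 503  # still warming up: keep the pod unready
            self.send_response(status)
            self.send_header("Content-Length", "0")
            self.end_headers()

        def log_message(self, format, *args) -> None:
            pass  # keep probe traffic out of stderr

    server = HTTPServer(("127.0.0.1", port), ReadinessHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

The probe returns 503 until every check passes, so a rolling upgrade that waits for readiness will not remove the old leader before the standby's caches are warm.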
Result
Table 1: Improvement on control plane leadership switch monitored by the proposed leadership switch metrics (4 observations before and after)
Table 1 shows the improvement in control plane leadership switch time: the average decreased from 64s to 10s, an improvement of roughly 85%.
What’s Next
With these efforts, we revamp the control plane performance and redefine its SLO from 99% to 99.9%. This is a huge milestone for the Kubernetes-based compute platform, demonstrating unprecedented reliability and availability. We are working on achieving higher SLOs and have identified the following areas where the control plane performance can be further improved.
Proactive leadership handover: The leadership handover in leader-for-life is passive because it depends on observation from external components to release resource lock. The time spent on garbage collection accounts for 50% of our current leadership handover time. Proactive leadership handover is performed by the leader when it receives SIGTERM and intentionally releases its lock before exiting. This will significantly reduce the leadership switch time.
Reconcile Quality of Service (QoS): In this post, we present our optimization of worker pool efficiency in terms of delayed enqueue vs. immediate enqueue. For future work, we want to introduce reconcile QoS and workqueue tiering (for example, creating different queues for different tiers of workloads to ensure that high tiers are not interfered with and blocked).
Acknowledgement
Shout out to Suli Xu and Harry Zhang for their great contributions in building a high-performance control plane to support business needs. Special thanks to June Liu, Anson Qian, Haniel Martino, Ming Zong, Quentin Miao, Robson Braga and Martin Stankard for their feedback and support.
StackShare | Tech stack deep dives from top startups and engineering teams
Improving Distributed Caching Performance and Efficiency at Pinterest
Kevin Lin | Software Engineer, Storage and Caching
Introduction
Pinterest’s distributed caching system, built on top of open source technologies memcached and mcrouter, is a critical component of the production infrastructure stack. Pinterest’s cache-as-a-service platform is responsible for driving down application latency across the board, reducing the overall cloud cost footprint, and ensuring adherence to strict sitewide availability targets.
Today, Pinterest’s memcached fleet spans over 5000 EC2 instances across a variety of instance types optimized along compute, memory, and storage dimensions. Collectively, the fleet serves up to ~180 million requests per second and ~220 GB/s of network throughput over a ~460 TB active in-memory and on-disk dataset, partitioned among ~70 distinct clusters.
As a core driver of reduced sitewide latency, the distributed caching tier is subject to stringent performance and latency requirements. Additionally, a key consequence of the sheer size of the fleet is that even small efficiency optimizations have an outsized impact on the total service cost footprint. Several years of operational experience running memcached at scale in production have provided unique insight into practical optimizations for driving improved performance and efficiency across the entire caching stack.
In this article, we will share some context on the observability and performance testing tools that enable optimization exploration work, followed by a deep dive into practical optimizations currently running in our production environment along dimensions of hardware selection strategy, compute efficiency, and networking performance.
High-level description of the available surface area for performance optimization for memcached running on virtual machines in public cloud environments
Monitoring, observability, and evaluation
All performance optimization efforts start with precise quantitative measurement and a structured, reproducible mechanism for generating workloads for isolated evaluation.
Critical monitoring prerequisites for all the performance evaluation conducted over the years include:
Server-side metrics for request throughput, network throughput, resource utilization, and hardware-level parameters (NIC statistics like per-queue packet throughput and EC2 allowance exhaustion, disk response times and in-flight I/O requests, etc.)
Client-side metrics for cache request percentile latency, timeout and error rates, and per-server availability (SLIs), as well as top-level application performance indicators like service RPC P99 response time
Pinterest leverages both synthetic load generation and production shadow traffic to evaluate the impact of tuning and optimizations. Historically, synthetic benchmarking has been useful for detecting performance regressions or improvements under maximum load, while shadow traffic evaluation has been more reflective of server resource utilization and overall performance under a real workload at scale.
Synthetic load generation: memtier_benchmark is an open source tool capable of generating a synthetic load against a memcached cluster with configurable parameters for the number of concurrent clients and threads, read/write ratio, data sizes, and transport mechanism (plaintext or TLS).
Production shadow traffic: mcrouter is an open source memcache-protocol routing proxy deployed as a client-side sidecar in the Pinterest fleet. It provides building blocks to design transparent shadow traffic routing policies with configurable traffic percentages and source/target cluster(s), allowing for flexible dark traffic experimentation across a variety of workload classes.
Together, these tools permit high-signal performance evaluation with zero or minimal impact to critical-path production traffic.
Performance and efficiency
Cloud hardware
Distributed caching at Pinterest serves a diverse array of workloads. In general, each class of workload can be categorized along the following high-level dimensions:
Throughput (compute)
Data volume (memory and/or disk capacity)
Data bandwidth (network and compute)
Latency requirement (compute)
While memcached can be arbitrarily horizontally scaled in and out to address a particular cluster’s bottleneck, vertically scaling individual hardware dimensions allows for greater cost efficiency for specific workloads. In practice, this entails standardization on a fixed pool of EC2 instance types optimized for each workload class.
Workload profile: Moderate throughput, moderate data volume
EC2 instance family: r5
Rationale: r5 family instances offer a vCPU-DRAM ratio that works well for most vanilla cache use cases at Pinterest. This instance type is considered the “baseline” against which others are evaluated.
Workload profile: High throughput, low data volume
EC2 instance family: c5
Rationale: c5 family instances are more cost efficient for use cases that would otherwise slot into the r5 type but hold significantly less memory. Maintaining the same vCPU count as its r5 counterpart allows it to serve the same throughput volume at a lower overall cost.
Workload profile: High data volume, relaxed latency requirement
EC2 instance family: r5d
Rationale: r5d family instances are functionally equivalent to r5 family instances but with an instance-colocated NVMe SSD used by extstore for secondary storage. r5d is cost efficient for clusters with high data volume such that there are tangible improvements to hit rate as data is written to disk. Due to the slower disk (relative to i3 family instances), higher tail latency is expected.
Workload profile: Massive data volume, relaxed latency requirement
EC2 instance family: i3 and i3en
Rationale: i3 and i3en family instances ship with a fast and sizable instance-colocated disk, which tangibly increases extstore performance for workloads with a very high ratio of working data on disk relative to DRAM. Additionally, they offer comparable memory capacity to r5 series instances, which reduces extstore thrashing by maintaining a reasonable DRAM to disk usage ratio.
EC2 instance type distribution for the Pinterest memcached fleet
In particular, using extstore to expand storage capacity beyond DRAM into a local NVMe flash disk tier increases per-instance storage efficiency by up to several orders of magnitude, and it reduces the associated cluster cost footprint proportionally. EC2’s storage-optimized instance types provide locally attached solid state drives capable of high random IOPS and R/W throughput, allowing onboarding of extstore use cases with massive data volumes and high request throughput without compromising tail latency.
The introduction of different shapes of storage-optimized EC2 instance types to the fleet (in particular, the lower-tier variants of the i3en instance family containing multiple independent disks per instance) further drives down costs while offering improvements in I/O performance and cost efficiency. Pinterest configures these instances with Linux software RAID at level RAID0 to combine multiple hardware block devices into a single, logical disk for userspace consumption. By striping reads and writes fairly across two disks, RAID0 doubles maximum theoretical I/O throughput with a best-case two-fold reduction in effective disk response time at the cost of a halved MTTF. This increased hardware performance for extstore at the expense of an increased theoretical failure rate is a highly worthwhile tradeoff. Operating workloads on a public cloud necessitates designing infrastructure to be ephemeral cattle, capable of self-healing in the event of instance failures. A topology control plane for mcrouter automatically and gracefully responds to unexpected changes in server capacity; instance loss is a non-issue.
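The reliability side of the RAID0 tradeoff can be made concrete with a short worked estimate. It assumes the two disks fail independently with the same constant failure rate \lambda, a simplification the post itself does not state:

```latex
\lambda_{\mathrm{RAID0}} = \lambda + \lambda = 2\lambda
\quad\Longrightarrow\quad
\mathrm{MTTF}_{\mathrm{RAID0}} = \frac{1}{2\lambda} = \frac{\mathrm{MTTF}_{\mathrm{disk}}}{2}
```

Under that assumption, losing either disk loses the striped volume, so the logical disk is expected to fail about twice as often as a single disk.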
Compute
Approximately half of all caching workloads at Pinterest are compute-bound (i.e. purely request throughput-bound). Successful optimizations in compute efficiency translate into the ability to downsize clusters without compromising serving capacity.
More precisely, compute efficiency for memcached is defined as the additional rate of requests that can be serviced by a single instance for each percentage point increase in instance CPU usage, without increasing request latency. In simpler terms, an optimization that improves compute efficiency is one that allows memcached to serve a higher request rate at lower CPU usage, without changing request latency characteristics.
At Pinterest, most workloads (including the distributed cache fleet) run on dedicated EC2 virtual machines. Many historical efficiency improvements stem from optimizations in the hardware layer itself, like migrating to different instance families or upgrading to newer generations of existing instance types. However, operating workloads on dedicated (virtualized) machines offers unique opportunities for optimizations at the hardware-software boundary.
Memcached is somewhat unique among stateful data systems at Pinterest in that it is the exclusive primary workload, with a static set of long-lived worker threads, on every EC2 instance on which it is deployed. This is in contrast to database workloads which might have, for example, multiple colocated processes for decoupled storage and serving layers. To this end, one simple but highly effective optimization is tuning process scheduling to request that the kernel prioritize CPU time for memcached at the expense of deliberately withholding CPU time from other processes on the host, like monitoring daemons. This involves running memcached under a real-time scheduling policy, SCHED_FIFO, with a high priority, which effectively instructs the kernel to let memcached monopolize the CPU by preempting (and thus deliberately starving) all non-realtime processes whenever a memcached thread becomes runnable.
$ sudo chrt --fifo <priority> memcached …
Example invocation of memcached under a SCHED_FIFO real-time scheduling policy
This one-line change, after rollout to all compute-bound clusters, drove client-side P99 latency down by anywhere between 10% and 40%, in addition to eliminating spurious spikes in P99 and P999 latency across the board. Additionally, it afforded the ability to raise the steady-state operation CPU usage ceiling by 20% without introducing latency regressions. Ultimately, this shaved close to 10% off memcached’s total fleet-wide cost footprint.
Week-over-week comparison of client-side P99 cache latency for one service while real-time scheduling was rolled out to its corresponding dedicated memcached cluster
Ratio of time spent by memcached waiting for execution by the kernel versus wall clock time, before and after real-time scheduling was enabled (data is collected from schedstat in the /proc filesystem)
Stabilization of spurious latency spikes after real-time scheduling was enabled on a canary host (red-colored series)
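The run-queue wait ratio mentioned in the captions above can be derived from two samples of a task's schedstat file. The Python sketch below assumes the standard three-field layout (time on CPU in nanoseconds, time waiting on a run queue in nanoseconds, number of timeslices); the function names are hypothetical, and reading the file, typically per thread under /proc/<pid>/task/<tid>/schedstat, is left to the caller.

```python
from typing import Tuple


def parse_schedstat(text: str) -> Tuple[int, int, int]:
    """Split a schedstat line into (on_cpu_ns, wait_ns, timeslices)."""
    on_cpu_ns, wait_ns, timeslices = (int(field) for field in text.split())
    return on_cpu_ns, wait_ns, timeslices


def run_queue_wait_ratio(before: str, after: str, wall_clock_ns: int) -> float:
    """Fraction of the sampling interval the task spent runnable but waiting."""
    _, wait_before, _ = parse_schedstat(before)
    _, wait_after, _ = parse_schedstat(after)
    return (wait_after - wait_before) / wall_clock_ns
```

A ratio close to zero means the thread was scheduled almost as soon as it became runnable, which is the effect a real-time policy is expected to produce.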
Networking
There are a few key dimensions when considering networking performance:
Data bandwidth, packet throughput, and TCP connection limits. EC2 imposes hard limits on per-instance PPS, aggregate bandwidth, and TCP connections (though only when deployed in a security group with TCP ingress rules). Excess usage beyond these limits is reported by the Elastic Network Adapter (ENA) driver and accessible via ethtool. Confusingly, EC2 also expresses total NIC bandwidth capabilities in terms of burst loads rather than steady-state loads, thus requiring some degree of trial-and-error to determine the practical bandwidth ceiling for workloads like memcached with predictable network characteristics.
Connection latency and reliability. Is there a way to make initial TCP connections to memcached faster and more reliable, especially under burst scenarios where thousands of clients are simultaneously establishing connections?
Overhead associated with transport-layer features like TLS. Is there a way to reduce the encryption/decryption compute overhead of TLS? Additionally, is there a way to reduce the initial setup cost (i.e. the TLS handshake)?
From a cloud consumer’s perspective, EC2-enforced network limits can and should effectively be considered inherent hardware limitations. Unfortunately, there is no mechanism to work around these limits other than horizontally scaling the fleet to reduce per-instance usage.
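As a rough illustration of how allowance exhaustion can be surfaced from the ENA driver, the Python sketch below filters the text produced by `ethtool -S <interface>` for counters ending in `_allowance_exceeded`. The function name is hypothetical, and the sample output and its values are invented for illustration, not measurements from the fleet.

```python
from typing import Dict


def exceeded_allowances(ethtool_stats: str) -> Dict[str, int]:
    """Return the non-zero *_allowance_exceeded counters from `ethtool -S`."""
    exceeded: Dict[str, int] = {}
    for line in ethtool_stats.splitlines():
        name, sep, value = line.partition(":")
        name = name.strip()
        if sep and name.endswith("_allowance_exceeded"):
            count = int(value.strip())
            if count > 0:
                exceeded[name] = count
    return exceeded


# Illustrative sample only; real output contains many more counters.
SAMPLE_OUTPUT = """NIC statistics:
     tx_timeout: 0
     bw_in_allowance_exceeded: 0
     bw_out_allowance_exceeded: 12
     pps_allowance_exceeded: 340
     conntrack_allowance_exceeded: 0
"""

hot_limits = exceeded_allowances(SAMPLE_OUTPUT)
```

Because these counters are cumulative, a monitoring agent would normally export their rate of change rather than the raw values.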
In Pinterest’s caching architecture, mcrouter is a universal routing proxy and the single application-facing entry point into the distributed caching tier. Each mcrouter instance (effectively, every individual host in a service cluster) creates a statically sized, long-lived TCP connection pool to every individual memcached server in a cluster. Connection pool sizes are deterministically derived from the number of logical cores available on the host system, typically ranging from 8 to 72 for canonical instance types. This results in upwards of tens of thousands of active established TCP connections per server host, and easily over a million total connections per server cluster — necessitating a strategy for maintaining minimal connection latency and connection reliability at scale.
TCP Fast Open (TFO) is a mechanism for reducing the latency overhead of establishing a TCP connection by optimizing away one RTT in an otherwise costly TCP 3WHS (3-way handshake), while also allowing eager transmission of early data during the handshake itself. While originally intended for end users on unreliable home and mobile networks connecting to remote edge servers, TFO has demonstrated tangible improvements in connection reliability in a closed cloud environment as well. Implementing TFO support in memcached reduced average TCP connection durations of successive sessions by ~10%, most prominently in connections established across an Availability Zone boundary.
Packets exchanged between client and server during TFO cookie setup and a subsequent TFO-initiated session with early data
Separately, raising the sysctl parameter value for net.core.somaxconn and associated listen backlog size in the userspace listen(2) callsite in memcached improved burst connection availability for high-throughput clusters. Previously, deploying a new memcached binary would cause spikes in ECONNREFUSED server errors caused by exhausted server-side TCP accept queues driven by thundering herds of simultaneous inbound connections from thousands of client mcrouter instances. A more generous listen backlog threshold reduced per-server downtime and fixed the brief but frequent SLO violations whenever a shared tenancy cluster was deployed.
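The reason both values have to be raised together is that Linux caps the backlog passed to listen(2) at net.core.somaxconn. The Python sketch below shows that relationship; `effective_backlog` and `open_listener` are hypothetical helpers, and memcached itself makes the equivalent call in C.

```python
import socket


def effective_backlog(requested: int, somaxconn: int) -> int:
    """Backlog the kernel actually honors: the request is capped at somaxconn."""
    return min(requested, somaxconn)


def open_listener(port: int, requested_backlog: int) -> socket.socket:
    """Open a TCP listener on localhost, asking for the given accept-queue size."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", port))
    sock.listen(requested_backlog)  # silently truncated to somaxconn on Linux
    return sock
```

For example, requesting a backlog of 4096 while somaxconn is 128 still yields an accept queue of 128, so a burst of reconnecting clients can overflow it.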
Lastly, TLS plays an important role for in-transit data encryption between memcached and mcrouter, and it is enabled for 100% of cache traffic within Pinterest in order to comply with sitewide authentication, authorization, and auditing policies. Even with hardware-accelerated cryptography, TLS adds non-trivial initial and steady-state overhead, due to a post-connect TLS handshake and application-layer encryption/decryption during network I/O, respectively. TLS session resumption, after implementation in memcached, reduced fleet-wide client-side connection timeout rates by allowing reuse of previously cached TLS sessions. One avenue for tackling steady-state overhead is kernel TLS (kTLS) — a mechanism to offload the TLS record layer from userspace to the kernel, implemented either in software or offloaded to supported dedicated NIC hardware for completely transparent inline data encryption/decryption. TLS session resumption was upstreamed by Pinterest to memcached and is available in version 1.6.3 onward; kTLS is an ongoing and relatively experimental optimization area.
Future work
Infrastructure optimization is a critical objective for Pinterest that ultimately drives a more delightful experience for Pinners while reducing our own cloud cost footprint. We look forward to continuing to explore avenues for improving cache performance and efficiency at all layers of the stack, from application clients and routing proxies to the servers themselves. In the near term, we intend to continue evaluation of software kernel TLS, explore compatibility of memcached with newer generations of EC2 instance types for improved price-to-performance characteristics, and pursue application/proxy-side software optimizations like in-flight compression for improved storage efficiency. We hope to additionally build an end-to-end automated performance regression testing framework to track the impact of these optimizations over time.
Thanks to the entire Storage and Caching team at Pinterest for supporting this work, especially Ankita Girish Wagh and Lianghong Xu.
Optimizing Pinterest’s Data Ingestion Stack: Findings and Learnings
By Ping-Min Lin | Software Engineer, Logging Platform
At Pinterest, the Logging Platform team maintains the backbone of data ingestion infrastructure that ingests terabytes of data per day. When building the services powering these pipelines, it is extremely important that we build efficient systems considering how widespread and deep in the stack the systems are. Along our journey of continuous improvement, we’ve figured out basic but useful patterns and learnings that could be applied in general — and hopefully for you as well.
MemQ: Achieving memory-efficient batch data delivery using Netty
MemQ is the next-gen data ingestion platform built in-house and recently open-sourced by the Logging Platform team. When designing the service, we tried hard to maximize the efficiency of our resources; specifically, we focused on reducing GC by using off-heap memory. Netty was chosen as our low-level networking framework due to its great balance between flexibility, performance, and sophisticated out-of-the-box features. For example, we used ByteBuf heavily throughout the project. ByteBufs are the building blocks of data within Netty. They are similar to Java NIO ByteBuffers, but give developers much more control over the lifecycle of the objects by providing a “smart pointer” approach for customized memory management using manual reference counting. By using ByteBufs, we managed to transport messages with a single copy of data by passing off-heap network buffer pointers, further reducing cycles used on garbage collection.
The typical journey of a message in the MemQ broker: Each message received from the network will be reconstructed via a length-encoded protocol that will be allocated into a ByteBuf that is off of the JVM heap (direct memory in Netty terms), and will be the only existing copy of the payload throughout the whole pipeline. This ByteBuf reference will be passed into the topic processor and put into a Batch along with other messages that are also waiting to be uploaded to the storage destination. Once the upload constraints are met, either due to the time threshold or the size threshold, the Batch will be dispatched. In the case of uploading to a remote object store like S3, the whole batch of messages will be kept in a CompositeByteBuf (which is a virtual wrapper ByteBuf consisting of multiple ByteBufs) and uploaded to the destination using the netty-reactor library, allowing us to create no additional copies of data within the processing path. By building on top of ByteBufs and other Netty constructs, we were able to iterate rapidly without sacrificing performance and avoid reinventing the wheel.
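The batching rule described above (dispatch when either a size or a time threshold is met, without copying payloads) can be sketched as follows. This is a Python analogy rather than MemQ's Java code: `Batch` and its parameters are hypothetical, `memoryview` stands in for a ByteBuf reference, and the returned list of views plays the role of a CompositeByteBuf. Python has no manual reference counting, so that part of the design is not modeled.

```python
import time
from typing import Callable, List, Optional


class Batch:
    """Collects payload views until a size or age threshold is reached."""

    def __init__(self, max_bytes: int, max_age_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_bytes = max_bytes
        self._max_age_s = max_age_s
        self._clock = clock
        self._parts: List[memoryview] = []
        self._size = 0
        self._opened_at: Optional[float] = None

    def add(self, payload: memoryview) -> None:
        if self._opened_at is None:
            self._opened_at = self._clock()  # age counts from the first message
        self._parts.append(payload)          # keeps a reference, not a copy
        self._size += payload.nbytes

    def ready(self) -> bool:
        if self._opened_at is None:
            return False
        too_big = self._size >= self._max_bytes
        too_old = self._clock() - self._opened_at >= self._max_age_s
        return too_big or too_old

    def drain(self) -> List[memoryview]:
        """Hand over the collected views for upload and reset the batch."""
        parts, self._parts = self._parts, []
        self._size = 0
        self._opened_at = None
        return parts
```

An uploader would poll `ready()` and pass the result of `drain()` to the storage client, which can write the views in sequence without first concatenating them.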
Singer: Leveraging asynchronous processing to reduce thread overheads
Singer has been around at Pinterest for a long time, reliably delivering messages to PubSub backends. With more and more use cases onboarded to Singer, we’ve started to hit bottlenecks on memory usage that led to frequent OOM issues and incidents. Singer’s memory and CPU resources are constrained on nearly all fleets at Pinterest to avoid impact on the host service, e.g., our API serving layer. After inspecting the code and leveraging debugging tools such as VisualVM, Native Memory Tracking (NMT), and pmap, we found various potential improvements, most notably reducing the number of threads. Analysis of the NMT results drew our attention to the number of threads and the memory used by their stacks (allocated due to the Singer executor and producer thread pools).
Taking a deeper look into the source of these threads, the majority of these threads come from the thread pools for each Kafka cluster Singer publishes to. The threads in these thread pools are used to wait for Kafka to complete writing messages to a partition and then report the status of the writes. While the threads do the job, each thread in the JVM (by default) will allocate 1MB of memory used for the thread’s stack.
A Singer NMT report showing the different memory regions a JVM process allocates. The Thread entry represents the thread stack. Arena contains the off-heap/direct memory portion managed outside of the JVM heap.
Even with lazy allocation of the stack memory by the underlying operating system until the thread is actually used, this still quickly adds up to hundreds of MBs of the process’ memory. When there are a lot of log streams publishing to multiple partitions on different clusters, the memory used by thread stacks can easily become comparable to Singer’s 800MB default heap size and eat into the resources of the application.
Each submission of KafkaWriteTask will occupy a thread. Full code can be found here
Closely examining the usage of these threads quickly makes clear that most of them are doing non-blocking operations such as updating metrics and are perfectly suited to asynchronous processing using CompletableFuture, available since Java 8. CompletableFuture allows us to resolve the blocking calls by chaining stages asynchronously, replacing the threads that had to wait until the results came back from Kafka. By utilizing the callback in the KafkaProducer.send(record, callback) method, we rely on the Kafka producer’s network client to be completely in control of the multiplexing of networking.
A brief example of the result code after using CompletableFutures. Full code can be found here
Once we convert the original logic into several chained non-blocking stages, the obvious choice is to use a single common thread pool to handle them regardless of the logstream, so we use the common ForkJoinPool that is already at our disposal from the JVM. This dramatically reduces the thread usage for Singer, from a couple of hundred threads to virtually no additional threads. This improvement demonstrates the power of asynchronous processing and how network-bound applications can benefit from it.
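The shift from one waiting thread per write to callback-driven completion can be illustrated with a small Python analogy. It is not Singer's Java code: `FakeProducer` is a hypothetical stand-in for a producer whose `send` accepts a completion callback, and `concurrent.futures.Future` plays the role that CompletableFuture plays in Java.

```python
from concurrent.futures import Future
from typing import Callable, List


class FakeProducer:
    """Stand-in for a producer whose send() takes a completion callback."""

    def __init__(self) -> None:
        self._pending: List[Callable] = []

    def send(self, record: bytes, callback: Callable) -> None:
        # A real producer would hand the record to its network client here.
        self._pending.append(callback)

    def complete_all(self) -> None:
        # Simulates the network client acknowledging every in-flight write.
        pending, self._pending = self._pending, []
        for offset, callback in enumerate(pending):
            callback({"offset": offset}, None)


def send_async(producer: FakeProducer, record: bytes) -> Future:
    """Bridge the callback API to a Future instead of parking a thread."""
    future: Future = Future()

    def on_completion(metadata, error) -> None:
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(metadata)

    producer.send(record, on_completion)
    return future


def publish_batch(producer: FakeProducer, records: List[bytes],
                  on_acked: Callable[[bool], None]) -> List[Future]:
    """Send all records; report success or failure as each write completes."""
    futures = [send_async(producer, record) for record in records]
    for future in futures:
        future.add_done_callback(lambda f: on_acked(f.exception() is None))
    return futures
```

No thread blocks while the writes are in flight; the follow-up stage (here, `on_acked`) runs on whichever thread completes the future, which in the real system would be the producer's own I/O thread or a shared pool.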
Kafka and Singer: Balancing performance and efficiency with controllable variance
Operating our Kafka clusters has always been a delicate balance between performance, fault tolerance, and efficiency. Our logging agent Singer, at the front line of publishing messages to Kafka, is a crucial component that plays a heavy role in these factors, especially in routing the traffic by deciding which partitions we deliver data to for a topic.
The Default Partitioner: Evenly Distributed Traffic
In Singer, logs from a machine are picked up, routed to the topic they belong to, and published to that topic in Kafka. In the early days, Singer would publish uniformly to all the partitions of a topic in a round-robin fashion using our default partitioner. For example, if there were 3000 messages on a particular host that needed to be published to a 30-partition topic, each partition would receive roughly 100 messages. This worked well for most use cases and has the nice benefit that all partitions receive the same number of messages, which is great for the consumers of these topics since the workload is evenly distributed amongst them.
DefaultPartitioner: Producers and Partitions are fully connected
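The 3000-message example can be reproduced with a few lines of Python. `RoundRobinPartitioner` here is a hypothetical sketch of the round-robin idea, not Singer's default partitioner class.

```python
from collections import Counter
from itertools import count


class RoundRobinPartitioner:
    """Cycles through every partition of a topic in order."""

    def __init__(self, num_partitions: int, start: int = 0) -> None:
        self._num_partitions = num_partitions
        self._sequence = count(start)

    def partition(self) -> int:
        return next(self._sequence) % self._num_partitions


partitioner = RoundRobinPartitioner(num_partitions=30)
per_partition = Counter(partitioner.partition() for _ in range(3000))
```

Every one of the 30 partitions ends up with exactly 100 messages, but the host also needs a connection to every broker that leads one of those partitions.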
The Single Partition Partitioner: In Favor of the Law of Large Numbers
SinglePartitionPartitioner: Ideal scenario where connections are evenly distributed
As Pinterest grew, we had fleets expanding to thousands of hosts, and this evenly distributed approach started to cause issues for our Kafka brokers: high connection counts and large numbers of produce requests elevated the brokers’ CPU usage, and spreading out the messages meant smaller batch sizes for each partition and therefore lower compression efficiency, resulting in higher aggregate network traffic. To tackle this, we implemented a new partitioner: the SinglePartitionPartitioner. This partitioner solves the issue by forcing Singer to write to only one random partition per topic per host, reducing the fanout from all brokers to a single broker. This partition remains the same throughout the producer’s lifetime until Singer restarts.
For pipelines that had a large producer fleet and relatively uniform message rates across hosts, this was extremely effective: the law of large numbers worked in our favor, and statistically, if the number of producers is significantly larger than the number of partitions, each partition will still receive a similar amount of traffic. Connection count went down from (number of brokers serving the topic) times (number of producers) to only (number of producers), which could be up to a hundredfold reduction for larger topics. Meanwhile, batching up all messages per producer to a single partition improved compression ratios by at least 10% in most use cases.
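The two claims above (fewer connections, and balanced traffic only when producers greatly outnumber partitions) can be checked with a small simulation. This is a minimal sketch, not Singer's code: the function names, fleet sizes, and the spread metric are illustrative assumptions, and it assumes every producer sends the same message rate.

```python
import random
from collections import Counter


def connection_counts(num_producers, brokers_serving_topic):
    """Return (default, single_partition) connection counts for one topic.

    Default partitioner: every producer connects to every broker serving the topic.
    Single-partition partitioner: every producer connects to exactly one broker.
    """
    return num_producers * brokers_serving_topic, num_producers


def single_partition_load(num_producers, num_partitions, seed=0):
    """Each producer picks one random partition; return producers per partition."""
    rng = random.Random(seed)
    picks = Counter(rng.randrange(num_partitions) for _ in range(num_producers))
    return [picks.get(p, 0) for p in range(num_partitions)]


def relative_spread(loads):
    """(max - min) / mean: 0 means perfectly even, larger means more skew."""
    mean = sum(loads) / len(loads)
    return (max(loads) - min(loads)) / mean


if __name__ == "__main__":
    # Hypothetical sizes: a large fleet and a small fleet on a 30 partition topic.
    print(connection_counts(5000, 30))
    print(relative_spread(single_partition_load(30000, 30)))
    print(relative_spread(single_partition_load(20, 30)))
```

With 20 producers on 30 partitions at least ten partitions receive nothing, so the spread is large no matter how the random picks fall; with a fleet far larger than the partition count the spread shrinks toward zero.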
SinglePartitionPartitioner: Skewed scenario where there are too few producers vs. partitions
The Fixed Partitions Partitioner: Configurable variance for adjusting trade-offs
Even with this new solution, some pipelines still fell into a middle ground where both partitioners were subpar, such as when the number of producers is not large enough to outnumber the partitions. In this case, the SinglePartitionPartitioner introduces significant skew between partitions: some partitions have multiple producers writing to them, while others are assigned very few or even none. This skew can cause unbalanced workloads for downstream consumers and also increases the burden on our team to manage the cluster, especially when storage is tight. We therefore recently introduced a new partitioner that can be used in these cases and can even cover the original use cases: the FixedPartitionsPartitioner. Rather than publishing to one fixed partition like the SinglePartitionPartitioner, it publishes randomly across a fixed, configurable number of partitions.
This approach is somewhat similar to the concept of virtual nodes in consistent hashing, where we artificially create more “effective producers” to achieve a more continuous distribution. Since the number of partitions for each host can be configured, we can tune it to the sweet spot where the efficiency and performance are both at desired levels. This partitioner could also help with “hot producers” by spreading traffic out while still maintaining a reasonable connection count. Although a simple concept, it turns out that having the ability to configure the degree of variance could be a powerful tool to manage trade-offs.
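The idea can be sketched in a few lines. This is an illustrative sketch only, not Singer's implementation: the class shape, the `num_fixed` parameter name, and the use of uniform random choice per message are assumptions made for the example.

```python
import random


class FixedPartitionsPartitioner:
    """Pick a fixed random subset of partitions once, then spread messages over it.

    num_fixed=1 behaves like a single-partition partitioner;
    num_fixed=num_partitions approaches the evenly distributed default.
    """

    def __init__(self, num_partitions, num_fixed, rng=None):
        if num_partitions < 1 or num_fixed < 1:
            raise ValueError("num_partitions and num_fixed must be >= 1")
        self._rng = rng or random.Random()
        # The subset is chosen once and stays the same for the producer's lifetime.
        subset_size = min(num_fixed, num_partitions)
        self.fixed = self._rng.sample(range(num_partitions), subset_size)

    def partition(self):
        """Return the partition for the next message."""
        return self._rng.choice(self.fixed)


if __name__ == "__main__":
    partitioner = FixedPartitionsPartitioner(120, 4, random.Random(7))
    print(sorted(partitioner.fixed))
    print([partitioner.partition() for _ in range(8)])
```

Because each host talks only to the brokers that lead its chosen partitions, the connection count grows with `num_fixed` rather than with the total number of brokers, which is the knob the paragraph above describes tuning.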
FixedPartitionsPartitioner: Less skew while still keeping connection count lower than the default
Relative compression ratio and request rate skew with different numbers of fixed partitions on a 120 partition topic on 30 brokers
Conclusion and Acknowledgements
These learnings are just a few examples of improvements the Logging Platform team has been making. Despite their seemingly different nature, the ultimate goal of all these improvements was to achieve better results for our team and our customers. We hope that these findings are inspiring and could spark a few ideas for you.
None of the content in this article could have been delivered without the in-depth discussions and candid feedback from Ambud Sharma, Eric Lopez, Henry Cai, Jeff Xiang, and Vahid Hashemian on the Logging Platform team. We also deeply appreciate the great support from external teams that provided support and input on the various improvements we’ve been working on. As we strive for continuous improvement within our architecture, we hope we will be able to share more interesting findings in our pursuit of perfecting our system.
StackShare | Tech stack deep dives from top startups and engineering teams
Pinterest Druid Holiday Load Testing
By Isabel Tallam | Senior Software Engineer; Jian Wang | Senior Software Engineer; Jiaqi Gu | Senior Software Engineer; Yi Yang | Senior Software Engineer; and Kapil Bajaj | Engineering Manager, Real-time Analytics team
Like many companies, Pinterest sees an increase in traffic in the last three months of the year. We need to make sure our systems are ready for this increase in traffic so we don’t run into any unexpected problems. This is especially important as Pinners come to Pinterest at this time for holiday planning and shopping. Therefore, we do a yearly exercise of testing our systems with additional load. During this time, we verify that our systems are able to handle the expected traffic increase. On Druid we look at several checks to verify:
Queries: We make sure the service is able to handle the expected increase in QPS while at the same time supporting the P99 Latency SLA our clients need.
Ingestion: We verify that the real-time ingestion is able to handle the increase in data.
Increase in Data size: We confirm that the storage system has sufficient capacity to handle the increased data volume.
In this post, we’ll provide details about how we run the holiday load test and verify Druid is able to handle the expected increases mentioned above.
Pinterest traffic increases as users look for inspiration for holidays.
How We Run Load Tests
As mentioned above, the areas our teams focus on are:
Can the system handle increased query traffic?
Can the system handle the increase in data ingestion?
Can the system handle the increase in data volume?
Can the System Handle Increased Query Traffic?
Testing query traffic and SLA is a main goal during holiday load testing. We have two different options for load testing in our Druid system. The first option generates queries based on the data currently stored in Druid and then runs these queries in Druid. The other option captures real production queries and re-runs these queries in Druid. Both of these options have their advantages and disadvantages.
Sample Versus Production Queries
The first option — using generated queries — is fairly simple to run anytime and does not require preparation like capturing queries. However, this type of testing may not accurately show how the system will behave in production scenarios. A real production query may look different and touch different data, query types, and timeframes than what is tested using generated queries. Additionally, any corner cases would be ignored in this type of testing.
The second option has the advantage of having real production queries that would be very similar to what we expect to see during any future traffic. The disadvantage here, however, is that setting up the tests is more involved, as production queries need to be captured and potentially need to be updated to match the new timeline when holiday testing is performed. In Druid, running the same query today versus one week from today may give different latency results, as data will move through different host stages in which data is supported by faster high-memory hosts in the first days/weeks versus slower disk stages for older data.
We decided to move ahead with real production queries because one of our priorities was to replicate production use cases as closely as possible. We made use of a Druid native feature that automatically logs any query that is being sent to a Druid broker host (broker hosts handle all the query work in a Druid cluster).
Test Environment Setup
Holiday testing is not done in the production environment, as this could adversely impact the production traffic. However, the test needs an environment setup as similar to the production environment as possible. Therefore, we created a copy of the production environment that is short-lived and solely used for testing. To test query traffic, the only stages required are brokers, historical stages, and coordinators. We have several tiers of historical stages in the production environment and we replicated the same setup in the test environment as well. We also made sure to use the same host machine types, configurations, pool size, etc.
The data we used for testing was copied over from production. We used a simple MySQL dump to create a copy of all the segments stored in the production environment. Once the dump is added to the MySQL instance in the test environment, the coordinator will automatically trigger the data to be replicated in the historical stages of the test environment.
Before initiating the copy, however, we needed to identify what data is required. This will depend on the client team and on the timeframe their queries request. In some cases, it may not be necessary to copy all data, but only the most recent days, weeks, or months.
Test environment is set up with the same configuration and hosts as Prod environment.
Our test system first connects to the broker hosts on the test environment, then loads the queries from the log file and sends them to the broker hosts. We use a multi-threaded implementation to increase the QPS being sent to the broker nodes. First, we run tests to identify how many threads are needed as a baseline that matches production traffic — for example, 300 QPS. Based on that, we can define how many threads we need to use for testing expected holiday traffic (two, three, or more times the standard traffic).
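A rough sketch of this replay loop is shown below. It is not our actual test tool: the function names are hypothetical, `send` stands in for whatever client call posts a query to a broker host, and the thread estimate assumes that QPS scales roughly linearly with the number of threads, which should be confirmed by measurement.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def threads_for_multiplier(baseline_threads, multiplier):
    """Estimate threads needed for `multiplier` times the baseline traffic.

    Assumes throughput scales linearly with thread count.
    """
    return math.ceil(baseline_threads * multiplier)


def replay(queries, send, num_threads):
    """Send every captured query through `send` using a pool of worker threads.

    `send` is a caller-supplied function (hypothetical here) that issues one
    query to a broker and returns its result, e.g. the observed latency.
    Results are returned in the same order as `queries`.
    """
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(send, queries))


if __name__ == "__main__":
    # Stand-in sender so the sketch runs without a Druid cluster.
    fake_send = lambda query: len(query)
    baseline = 12  # hypothetical: threads found to match production QPS
    print(threads_for_multiplier(baseline, 2))
    print(replay(["query-a", "query-bb"], fake_send, num_threads=2))
```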
In our use case, we had loaded the data received up to a specific date (e.g. October 1st). At this point, we were re-running the captured log files on the same date or the day before, to match production behavior. Our test script also was able to update the time frame in a query to match either the current time or a predefined time to allow running any log file and translating it to the data available on the test environment.
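The time frame rewrite can be sketched as follows. This is an illustration under stated assumptions rather than our test script: it assumes each logged query is a JSON object whose `intervals` field is a list of ISO 8601 `start/end` strings without a time zone suffix, and the function name and anchor argument are hypothetical.

```python
from datetime import datetime


def shift_intervals(query, anchor_end):
    """Return a copy of `query` whose intervals are shifted in time.

    All intervals move by the same offset so that the latest interval end
    lands on `anchor_end` (for example, the newest data loaded in the test
    environment). Interval lengths and relative positions are preserved.
    """
    parsed = []
    for interval in query["intervals"]:
        start_text, end_text = interval.split("/")
        parsed.append((datetime.fromisoformat(start_text),
                       datetime.fromisoformat(end_text)))
    latest_end = max(end for _, end in parsed)
    offset = anchor_end - latest_end
    shifted = dict(query)  # shallow copy; the original query is left untouched
    shifted["intervals"] = [
        f"{(start + offset).isoformat()}/{(end + offset).isoformat()}"
        for start, end in parsed
    ]
    return shifted


if __name__ == "__main__":
    logged = {"queryType": "timeseries",
              "intervals": ["2023-09-24T00:00:00/2023-09-25T00:00:00"]}
    print(shift_intervals(logged, datetime(2023, 10, 1)))
```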
Evaluating the Results
To determine the health of our system, we used our existing metrics to compare QPS and P99 latency on brokers and historical nodes, as well as determining system health via indicators like CPU usage of the brokers. These metrics help us identify any bottlenecks.
Query response time with normal traffic and 2x increase on basic system setup.
Typical bottlenecks can include the historical nodes or the broker nodes.
The historical nodes may show a higher latency for increased QPS, which will in turn increase the overall latency. To resolve this, we would add mirror hosts and increase the number of replicas of the data to support better latency under higher load. This step is something that will take time to implement, as hosts need to be added and data needs to be loaded, which can take several hours depending on the data size. Therefore, this is something that should be completed before traffic increases on the production system.
If the broker nodes are no longer able to handle the incoming query traffic, the size of the broker pool needs to be increased. If this is seen in the test environment, or even the production environment, it is much faster to increase the pool size and can potentially be done ad-hoc as well.
Testing with an increased data size on the test environment helps us determine which steps are needed to support the expected holiday traffic changes. We can make these configuration changes in advance, and we can make the support team aware of changes and of the maximum traffic the system is able to handle within the specified SLA (QPS and P99 latency requirements from the client teams).
Can the System Handle the Increase in Data Ingestion?
Testing the capacity for real-time data ingestion is similar to testing query performance. It is possible to start with making an estimate of the supported ingestion rate based on the dimensions/cardinality of the ingested data. However, this is only a guideline, and for some high-priority use cases it is a good idea to test early on.
We set up a test environment that has the same capacity, configuration, etc. as the production environment. However, in this step, some help from client teams may be required as we also need to test with increased data from the ingestion source like Kafka topic.
When reviewing the ingestion test, we focused on several key metrics. The ingestion lag should be low, and the number of both successful and rejected events (due to rejection window exceeded) should be closely similar to comparable values in the production environment. We also include validation of ingested data and general system health of overlord and middle manager stages — the stages handling ingestion of real time data.
Sample metrics for successfully ingested events, rejected events and kafka ingestion lag.
Can the System Handle the Increase in Data Volume?
Evaluating if the system can handle the increase in data volume is probably the simplest and quickest check, though just as important as the previous steps. For this, we take a look at the coordinator UI: here we can see all historical stages, the pool size, and at what capacity they are currently running. Once clients provide details on the expected increase in data volume, it is a fairly simple process to calculate the amount of additional data that needs to be stored over the holiday period and potentially some period after that.
The space is at a healthy percentage (~70%) allowing for some growth.
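The calculation is simple enough to sketch. The numbers and names below are hypothetical, and the sketch assumes that normal daily ingestion is already balanced by data aging out, so only the traffic above the usual daily volume adds to the stored total.

```python
def projected_utilization(used_tb, capacity_tb, daily_ingest_tb,
                          holiday_multiplier, holiday_days):
    """Estimate storage utilization at the end of the holiday period.

    Only the ingestion above the normal daily volume is counted as
    additional data (an assumption of this sketch).
    """
    extra_tb = daily_ingest_tb * (holiday_multiplier - 1) * holiday_days
    return (used_tb + extra_tb) / capacity_tb


if __name__ == "__main__":
    # Hypothetical: 70% full today, 1.5x ingestion for 60 days.
    utilization = projected_utilization(
        used_tb=70, capacity_tb=100, daily_ingest_tb=0.5,
        holiday_multiplier=1.5, holiday_days=60)
    print(f"{utilization:.0%}")
```

If the projected utilization comes out above the level the team is comfortable with, that is the signal to add historical hosts before the traffic increase rather than during it.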
Results
In the tests we ran this year, we found that our historical stages are in a very good state and are able to handle the additional traffic expected during the holiday time. We did see, however, that the broker pool may need some additional hosts if traffic meets a certain threshold. We have been sure to keep this communication visible with the client teams and support teams so team members are aware and know that the pool size may need to be increased.
Learnings
Timing is critical with holiday testing. This project has a fixed end date by which all changes need to be completed in the systems before any traffic increases, and the teams need to make sure to have all the pieces in place before results are due. As with many projects, we need to leave additional buffer time for unexpected changes in timeline and requirements.
Druid is a backend service, which is not always top of mind for many client teams as long as it is performing well. Therefore, it is a good idea to reach out to client teams before testing starts to get their estimate of expected holiday traffic increases. Some of our clients reached out to us on their own; however, the due date for any capacity increase requests to governance teams would have already passed. In these cases, or where client teams are not sure yet, it is good practice to make a general estimate of the traffic increase and start testing with those numbers.
Keeping track of holiday planning and applied changes for each year is also a good practice. Having a history of changes every year and keeping track of the actual increase versus the original estimates made beforehand will help to make educated estimates on what traffic increases may be expected in the following year.
Knowing the capacity of the brokers and historical stages before the holiday updates makes it easier for teams to decide how far to scale the clusters back down after the holidays, while also accounting for organic growth on a per-month basis.
Future Work
In this year’s use case, we chose the option of capturing broker logs to retrieve the queries we wanted to re-play back to Druid. This option worked for us at this time, though we are planning to look into other options for capturing queries going forward. The log files option works well for a one-off need, but it would be useful to have continuous logging of queries and storing these in Druid. This can help with debugging issues and identifying high-latency queries that may need some tweaking to get performance improvements.
Cost Reduction in Goku
By Monil Mukesh Sanghavi | Software Engineer, Real Time Analytics Team; Rui Zhang | Software Engineer, Real Time Analytics Team; Hao Jiang | Software Engineer, Real Time Analytics Team; Miao Wang | Software Engineer, Real Time Analytics Team
In 2018, we launched Goku, a scalable, high-performance time series database system, which served as the storage and query serving engine for short term metrics (less than one day old). In early 2020, we launched GokuL (Goku long term), which extended Goku’s capability by supporting long term metrics data (i.e. data older than a day and up to a year). Together, these completely replaced OpenTSDB. For GokuL, we used 3 clusters of i3.4xlarge SSD backed EC2 instances, which over time we realized were very costly. Reducing this cost was one of our primary aims going into 2021. This blog post covers the approach we took to achieve that goal.
Background
We use a tiered approach to segregate the long term data and store it in the form of buckets.
Table 1: table of a tiered approach
Tiers 1–5 contain the data stored on the GokuL (long term) clusters. GokuL uses RocksDB to store its long term data, and the data is ingested in the form of SST files.
Query Analysis
We analyzed the queries going to the long term cluster and observed the following:
Very few metrics (approximately 6K out of a total of 10B) had data points older than three months queried from GokuL.
More than half of the GokuL queries had specified rollup intervals of one day or more.
Tier 5 Data Analysis
We randomly selected a few shards in GokuL and analyzed the data. We observed the memory consumption of tier 5 data was much more than all the other tiers (1–4) combined. This was despite the fact that tier 5 contains only one hour of rolled up data, whereas the other tiers contained a mix of raw and 15 minute rolled up data.
Table 2: SST File size for each bucket in MiB
Solutions
It was inferred from the query and tier 5 analysis that tier 5 data (which holds six buckets of 64 days of data each) was the least queried as well as the most disk consuming. We planned our solutions to target this tier as it would give us the most benefits. Mentioned below are some of the solutions which were discussed.
Namespace
Implementing a feature called namespace would let us store configurations such as ttl, rollup interval, and tier settings for the set of metrics belonging to that namespace. Uber’s M3 has a similar solution. This would help us set appropriate configurations for a select set of metrics (e.g., a lower ttl for metrics that do not require longer retention). The time to production for this project was longer, so we decided to make it a separate project, which is now being actively worked on.
Rollup Interval Adjust for Tier 5 Data
We experimented with changing the rollup interval of tier 5 data from one hour to one day and observed the change in the final SST file(s) size for the tier 5 bucket.
Table 3
The savings that came out of this solution were not strong enough to support putting this into production.
On Demand Loading of Tier 5 Data
GokuL clusters would only store data from tiers 1–4 on startup and would load the tier 5 buckets as necessary (based on queries). The cons of this solution were:
Users would have to wait and retry the query once the corresponding tier 5 bucket from s3 had been ingested by the GokuL host.
Once ingested, the bucket would remain in GokuL unless thrown away by an eviction algorithm.
We decided not to go with this solution because it was not user friendly.
Tiered Storage
We decided to move tier 5 data into a separate HDD based cluster. While there was some notable difference observed in the query latency, it could be ignored because the number of queries hitting this tier was much less. We calculated that tier 5 was consuming approximately 1 TB of each of the 650 hosts in the GokuL cluster. We decided to use the d2.2xlarge instance to store and serve the tier 5 data in GokuL.
Table 4
The cost savings from this solution were substantial: we replaced around 325 i3.4xlarge instances with 111 d2.2xlarge instances, which reduced our costs by nearly 30–35%.
To support this, we had to design and implement tier-based routing in the Goku root cluster, which routes queries to the short term and long term leaf clusters.
In the future, we can evaluate whether to reduce the number of replicas for this tier, trading some availability for lower cost given the low number of queries that hit it.
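As a rough illustration of tier-based routing, the sketch below picks leaf clusters from a query's time range. It is not Goku's routing code: the cluster names are made up, the one day boundary follows the short term definition earlier in this post, and the tier 5 age boundary is a placeholder value because the actual tier boundaries are not restated here.

```python
from datetime import datetime, timedelta

SHORT_TERM_MAX_AGE = timedelta(days=1)  # short term metrics: less than one day old
TIER5_MIN_AGE = timedelta(days=80)      # placeholder boundary, not the real tier config


def route(start, end, now):
    """Return the leaf clusters a query over [start, end] must be sent to.

    A query whose time range crosses a boundary is fanned out to every
    cluster that holds part of the requested range.
    """
    oldest_age = now - start
    newest_age = now - end
    clusters = []
    if newest_age < SHORT_TERM_MAX_AGE:
        clusters.append("short_term")
    if oldest_age >= SHORT_TERM_MAX_AGE and newest_age < TIER5_MIN_AGE:
        clusters.append("long_term_ssd")   # tiers 1-4
    if oldest_age >= TIER5_MIN_AGE:
        clusters.append("long_term_hdd")   # tier 5
    return clusters


if __name__ == "__main__":
    now = datetime(2021, 6, 1)
    print(route(now - timedelta(hours=6), now, now))
    print(route(now - timedelta(days=120), now - timedelta(days=100), now))
```

Since only queries reaching back past the tier 5 boundary touch the HDD cluster, the slower disks affect only the small share of queries that ask for the oldest data.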
RocksDB Tuning
As mentioned above, GokuL uses RocksDB to store the long term data. We observed that the RocksDB options we were using were not optimal for Goku’s data that has high volume and low QPS.
We experimented with using a stronger compression algorithm (ZSTD with level 5), and this reduced the disk usage by 40%. In addition to this, we enabled the partitioned index filter wherein only the top level index is loaded into memory. On top of this, we enabled caching with higher priority for filter and index blocks so that they use the same cache as the data blocks and also minimize the performance impact.
With both of the above changes, we noticed that the latency difference was not large and the reduction in data space usage was approximately 50%. We immediately put this into production and shrank the size and cost of our GokuL clusters by another half.
What’s Next
Namespace
As mentioned, we are actively working on the implementation of the namespace feature, which will help us reduce the long term cluster costs even further by reducing the ttl for most of the current metrics that do not need the high retention anyways.
Acknowledgments
Huge thanks to Brian Overstreet, Wei Zhu, and the observability team for providing and supporting solutions on the table.