“Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.” (Doug McIlroy, summarizing the Unix philosophy)
This is considered by some to be the greatest revelation in the history of computer science, and there’s no debate that this philosophy has been instrumental in the success of Unix and its derivatives. Beyond Unix, it’s easy to see how this philosophy has been fundamental to the fantastic success of the Hadoop ecosystem.
Nevertheless, a recurring problem in computer science is latching on to a great idea and applying it indiscriminately, even when it doesn’t make sense. I’m going to argue that while the Unix philosophy works for batch workflows, it is a poor fit for stream processing.
In the Beginning, there was Batch
In 2008, when I started work on what would become VoltDB, there was a tremendous amount of buzz around Hadoop. At the time Hadoop and “MapReduce” were almost synonymous. Seven years later, “MapReduce” is being subsumed by better technologies, but the Hadoop ecosystem is strong.
If MapReduce is waning, what has made the Hadoop ecosystem so successful? I’m probably not going out on a limb to suggest that HDFS is the key innovation of Hadoop. HDFS jump-started the Cambrian explosion of batch-processing we’re now experiencing by doing three things:
- It stores data redundantly across machines, rather than using RAID. This is something most distributed file systems do, but it’s still crucial to protecting data.
- It abstracts access through a library, rather than a local file system driver. This means apps have to be specifically coded to use it, but it also frees the implementers of HDFS from having to appear as a local filesystem, which is a poor model for robust distributed systems.
- It treats datasets and files as immutable blobs. Yes, you can append to files and you can delete files, but HDFS makes it hard enough to make small mutations that data sets are largely treated as immutable.
These three traits are why HDFS enables so many Hadoop ecosystem tools to do so many different things.
Between HDFS replication and data set immutability, no matter how flawed the software or hardware consuming the data may be, the original data is safe. A successful batch job takes one immutable data set as input and produces another as output. If a job fails or produces corrupt output due to hardware or software problems, the job can be killed, the output deleted, the problem addressed, and the job restarted. The input data is still pristine. HDFS and the batch nature of the work allow this robustness. Contrast this with a system that modifies a dataset in place, and you see that immutability largely absolves developers from managing partial failure.
So developers building Hadoop ecosystem tools to run on HDFS are absolved from the most difficult problems of distributed systems. This enables new batch-oriented tools to be developed quickly. End-users are free to adopt new tools aggressively, as they can be run in addition to an already-working system without introducing new risk.
HDFS enables a vast ecosystem of tools that can be combined into billions of different combinations, and adjusted as business needs change.
So What if Latency Matters?
Unsurprisingly, not all apps are a great fit for batch. Many data sets have value that must be extracted immediately or opportunities and insight are lost. We call these apps “Fast Data” apps, as opposed to “Big Data” apps.
Unfortunately, low-latency and HDFS don’t mix. HDFS isn’t magical and, like all things in engineering, comes with tradeoffs. Almost all of the tools built around HDFS as a repository for data accept that the latency between ingestion and the desired end response is measured in minutes or hours, not seconds or milliseconds.
This is typically because input datasets must be loaded before processing, and the output must be fully written before it is accessed. This process can be chunked to reduce latency, but to maintain efficiency, each chunk still represents at least seconds’ worth of work. A second latency issue is that most batch systems favor retry over redundancy when handling faults, which means that in the case of hardware or software failure, users will see significant spikes in latency.
Thus we’ve ruled out HDFS for latency-sensitive processing problems. So what do we use instead? Surely, without HDFS as the unifying foundation, the pattern of cobbling together many special-purpose systems to build a larger system using glue code and luck won’t translate to Fast Data, right?
You may have guessed this was a rhetorical question.
Let’s examine Storm, an exemplar tool for stream processing. Storm offers a framework for processing tuples. A set of processors, each with inputs and outputs, is connected in a topology. The user supplies the per-tuple processing code and the framework manages where that code is run, much like a Hadoop MapReduce job. Storm also manages the optional re-processing of tuples when software or hardware fails. Storm has a real “MapReduce for streams” feel to it, but there is a crucial difference: Storm is responsible for the data it is processing, whereas MapReduce processes data kept safe by HDFS.
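To make the topology idea concrete, here is a minimal sketch of user code for Storm, using the pre-1.0 backtype.storm package names current when this was written; the spout, bolt, and field names are illustrative rather than taken from any real deployment. Note what is missing: the spout must be supplied from outside, because Storm itself provides neither ingestion nor state.

```java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Illustrative names throughout; this is a sketch, not a production topology.
public class StreamSketch {

    // The user-supplied per-tuple code: Storm decides where it runs and can
    // replay tuples on failure, but only if the upstream source still has them.
    public static class ExtractBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            collector.emit(new Values(tuple.getStringByField("appId"),
                                      tuple.getStringByField("deviceId")));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("appId", "deviceId"));
        }
    }

    // Wiring processors into a topology; the spout (e.g., one reading Kafka) is
    // passed in because Storm supplies neither the data source nor the state store.
    public static TopologyBuilder buildTopology(IRichSpout eventSpout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("events", eventSpout);
        builder.setBolt("extract", new ExtractBolt(), 4).shuffleGrouping("events");
        return builder;
    }
}
```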
Keeping with the Unix and Hadoop philosophy of specialized tools, Storm focuses on distributing data and processing to a commodity cluster of Storm workers. It leaves other key functions to other systems.
- It has no built-in state management.
- It relies on ZooKeeper for agreement.
- Its re-process-on-failure feature requires the external data source to have the data available for re-consumption.
A recent talk from a Twitter engineer described a problem his team solves with Storm. They count unique users of mobile apps for given time periods. The tricky part is the volume: reportedly 800,000 messages per second, making this a poor fit for more traditional systems. The stack they use involves Kafka, Storm, Cassandra, and of course, ZooKeeper.
We can assume ZooKeeper, Kafka, Storm, and Cassandra each use at least three nodes to run with reasonable safety. Three is a bit of a magic number in distributed systems; two-node clusters have a harder time agreeing on the state of the cluster in failure scenarios. So now we’re operating and monitoring four systems, at least twelve nodes, and the glue between all of the above. If the network between two Cassandra nodes fails, are the symptoms the same as if the network between Storm nodes failed, or between ZooKeeper nodes? Each of these systems has different failure semantics and can cause different symptoms or cascading failures in other parts of the stack.
While four systems wouldn’t be odd in an HDFS-based batch system, the crucial difference here is that ownership of user data is passed between systems, rather than being wholly the responsibility of HDFS. Sometimes state only has meaning when data from multiple systems is combined. Developing and testing a stack with four systems isn’t 33% harder than a stack with three, either; it can be as much as four times as hard, depending on how the systems connect with each other.
Builders of such systems readily admit this, but some of them have found a “solution”.
Enter Lambda
So batch is too slow, and Fast Data offerings are less than robust and a bit too opaque. What to do?
Nathan Marz, formerly of Twitter and creator of the Apache Storm project, proposed the Lambda Architecture as the solution. Rather than address the flaws directly, you simply run both the batch and streaming systems in parallel. Lambda refers to the two systems as the “Speed Layer” and the “Batch Layer”. The Speed Layer can serve responses in seconds or milliseconds. The Batch Layer serves both as a long-term record of historical data and as a backup and consistency check for the Speed Layer. Proponents also argue that engineering work is easier to divide between teams when there are two discrete data paths. It’s easy to see the appeal.
But there’s no getting around the complexity of a Lambda solution. Running both layers in parallel, doing the same work, may add some redundancy, but it’s also adding more software, more hardware and more places where two systems need to be glued together. It’s this lack of natural integration that makes Lambda systems so difficult. The fact that the work can be divided amongst teams is itself a tacit acknowledgement that Lambda solutions may require “teams,” plural.
So why is the Lambda Architecture gaining acceptance? I believe there are two ways to answer. First, it’s because Lambda has some real advantages. For starters, it can scale. The number one rule of 21st century data management: If a problem can be solved with an instance of MySQL, it’s going to be. With Fast Data, we’re often talking about hundreds of thousands of events per second to process, something well beyond what MySQL can handle. There is a set of Lambda proponents who know that Lambda is complicated and operationally challenging, but are aware of few other options.
The other reason people turn to the Lambda Architecture is habit. People who work with Big Data are used to going to the Apache Software Foundation’s website, picking from a menu of software products, and connecting them up. Some people just want to wire things together from smaller cogs, whether it’s the easiest path or not.
Challenging Lambda
Many voice similar arguments about the complexity and cost of typical Lambda solutions. The best way to back up such an argument is by pointing to an alternative. And the way to build a successful alternative to Lambda seems to be to improve the efficiency and robustness of the speed layer.
LinkedIn (now Apache) has Samza. Spark has a streaming flavor. Google has MillWheel. All of these make the argument that the speed layer can be less burdensome on a developer. Perhaps you’re not aware of VoltDB’s pitch as a Fast Data engine, but we feel that the design of the VoltDB system makes it one of the best possible approaches to this problem.
First, VoltDB offers horizontal scaling and high per-machine throughput. It can easily ingest and process millions of tuples per second with redundancy, while using fewer resources than alternative solutions. We’ve seen examples where VoltDB requires an order of magnitude fewer nodes to achieve the same scale. Substantially smaller clusters aren’t just cheaper to build and run, they’re easier to manage.
Second, VoltDB offers strong consistency, a strong development model, and can be directly queried using industry-standard SQL. These features are all about enabling developers to focus on their business logic and not on the stupid corner cases that we all deal with when building a system at scale. With VoltDB, we don’t ask developers to learn or use non-standard query languages or processing languages. We don’t ask developers to worry about concurrent access or conflicts. If you can express your logic in single-threaded Java code and SQL, you can probably scale that logic to millions of operations per second.
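To make that development model concrete, here is a minimal sketch of what a VoltDB stored procedure looks like. The table, column, and procedure names are invented for illustration and are not from any shipped example; the point is that user code is plain single-threaded Java plus parameterized SQL, with no locking or retry logic.

```java
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

// Minimal VoltDB stored procedure sketch, assuming an invented "events" table
// partitioned on appid. The run() method executes as a single transaction.
public class RecordEvent extends VoltProcedure {

    public final SQLStmt insertEvent = new SQLStmt(
        "INSERT INTO events (appid, deviceid, ts) VALUES (?, ?, ?);");
    public final SQLStmt countForApp = new SQLStmt(
        "SELECT COUNT(*) FROM events WHERE appid = ?;");

    public VoltTable[] run(long appId, long deviceId, long ts) {
        voltQueueSQL(insertEvent, appId, deviceId, ts);
        voltQueueSQL(countForApp, appId);
        return voltExecuteSQL(true);  // run the queued SQL as the final batch
    }
}
```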
Finally, VoltDB is operationally simpler than competing solutions. Within a VoltDB cluster, all nodes are the same; there are no leader nodes, no agreement nodes; there are no tiers or processing graphs. If a node fails, it can be replaced with any available hardware or cloud instance and that new node will assume the role of the failed node. Furthermore, by integrating processing and state, a Fast Data solution based on VoltDB requires fewer total systems to monitor and manage.
This is not theoretical; it is the feedback we get from actual users with Fast Data systems in production. Some systems make easy problems easy and wow developers with trivial tutorial bliss, surprising them later as real-world scale and complexity creep in. We set out to build a system that makes the hardest problems easier and intractable problems tractable. My favorite thing a customer said about VoltDB was, “It does what it says on the tin.”
A Speed Layer Example
We set out to add an example app to the 5.0 release of VoltDB that represented a typical speed layer application. Last fall, I went to a talk given by an engineer at Crashlytics about how they use the Lambda Architecture to track the number of unique users of mobile applications. It was perfect.
To summarize the problem as presented: every time an end-user uses a smartphone app, a message is sent with an app identifier and a unique device ID. This happens 800,000 times per second across thousands of apps. App developers pay to see how many unique users have used their app each day, with per-day history retained for some period of time.
For their solution, Crashlytics has built a very typical Lambda stack. In the speed layer, they have enlisted Kafka for ingestion, Storm for processing, Cassandra for state, and ZooKeeper for distributed agreement. In the batch layer, tuples are loaded in batches into S3, then processed with Cascading and Amazon Elastic MapReduce. To reduce processing and storage load on the system, the HyperLogLog cardinality estimation algorithm has been enlisted as well.
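HyperLogLog is what makes the unique-count problem tractable at this volume: instead of remembering every device ID, it keeps a small, fixed-size sketch that yields an estimate within a few percent. The sketch below uses the open-source stream-lib implementation purely as a stand-in; the article does not say which Java implementation either stack actually uses, and the register size here is an arbitrary choice.

```java
import com.clearspring.analytics.stream.cardinality.HyperLogLog;

// stream-lib is used here only as a stand-in HyperLogLog implementation.
public class HllDemo {
    public static void main(String[] args) {
        // log2m = 12 -> 4096 registers, a few KB of state, roughly 1.6% standard error.
        HyperLogLog hll = new HyperLogLog(12);

        // Offer every observed device id; re-offering an id it has already seen
        // does not change the sketch, which is what makes unique counting cheap.
        for (long deviceId = 0; deviceId < 1_000_000; deviceId++) {
            hll.offer(deviceId);
            hll.offer(deviceId);
        }

        // A fixed-size estimate of the number of distinct ids, regardless of volume.
        System.out.println("estimated uniques: " + hll.cardinality());
    }
}
```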
The VoltDB sample application has several components.
The Sample Data Generator
Generating fake but plausible data is often the hardest part of a sample app. Our generator produces ApplicationID, DeviceID tuples with non-uniform distributions. This component is also easily changed to support more or fewer applications and devices, or different distributions. View the code on Github.
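The real generator is in the linked code; the sketch below only illustrates the idea. The app and device counts and the squaring trick used to skew the draw are invented for this example.

```java
import java.util.Random;

// Simplified sketch of a skewed (ApplicationID, DeviceID) generator; constants are invented.
public class EventGenerator {
    private static final int APP_COUNT = 2_000;
    private static final long DEVICE_COUNT = 100_000_000L;
    private final Random rand = new Random();

    // Squaring a uniform draw biases toward low ids, so a handful of "popular"
    // apps and devices dominate the stream, as they would in real traffic.
    private long skewed(long max) {
        double u = rand.nextDouble();
        return (long) (u * u * max);
    }

    public long[] nextTuple() {
        return new long[] { skewed(APP_COUNT), skewed(DEVICE_COUNT) };
    }

    public static void main(String[] args) {
        EventGenerator gen = new EventGenerator();
        for (int i = 0; i < 5; i++) {
            long[] t = gen.nextTuple();
            System.out.println("appId=" + t[0] + " deviceId=" + t[1]);
        }
    }
}
```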
The Client Code
It’s the job of the client code to take generated data and feed it to VoltDB. The code looks like most VoltDB sample apps: it offers some configuration logic, some performance monitoring, and the connection code. View the code on Github.
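A stripped-down sketch of that client, using the standard VoltDB Java client API: connect, then fire tuples at a stored procedure asynchronously. The procedure name, host, call count, and the inline random data are placeholders, and the real client adds the configuration and performance-monitoring pieces mentioned above.

```java
import java.util.Random;

import org.voltdb.client.Client;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ProcedureCallback;

public class IngestClient {
    public static void main(String[] args) throws Exception {
        Client client = ClientFactory.createClient();
        client.createConnection("localhost");

        // Shared callback: just flag failed invocations.
        ProcedureCallback callback = new ProcedureCallback() {
            @Override
            public void clientCallback(ClientResponse response) {
                if (response.getStatus() != ClientResponse.SUCCESS) {
                    System.err.println(response.getStatusString());
                }
            }
        };

        Random rand = new Random();
        for (int i = 0; i < 100_000; i++) {
            long appId = rand.nextInt(2_000);
            long deviceId = rand.nextInt(100_000_000);
            // Asynchronous call to a (hypothetically named) ingest procedure.
            client.callProcedure(callback, "CountDeviceEstimate", appId, deviceId);
        }

        client.drain();  // wait for outstanding asynchronous calls to finish
        client.close();
    }
}
```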
The Ingest Logic
This is the key part of the application, and it’s what separates VoltDB from other solutions. The idea was to build a stored procedure that handles the incoming ApplicationID, DeviceID tuples and then writes any updated estimate for the given ApplicationID back to the relational state.
First we found a HyperLogLog implementation in Java on Github. Then we simply wrote down the straightforward logic to accomplish the task. I tried a few variations and used the performance monitoring in the client code to pick the best one. The bulk of the logic is about 30 new lines of code, which can easily scale to over a million operations per second with full fault tolerance on a cluster of fewer than 10 commodity nodes. This is a significant win over alternative stacks. View the code on Github.
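A hedged sketch of that ingest logic is below; it is not the shipped procedure. The table, column, and procedure names are invented, stream-lib stands in for whichever HyperLogLog implementation the example actually bundles, and the procedure is assumed to be partitioned on appid, so the read-modify-write on the sketch blob needs no locking in user code.

```java
import java.io.IOException;

import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;

// Illustrative names throughout; a sketch of the idea, not the example's code.
public class CountDeviceEstimate extends VoltProcedure {

    public final SQLStmt getSketch = new SQLStmt(
        "SELECT hll FROM estimates WHERE appid = ?;");
    public final SQLStmt insertSketch = new SQLStmt(
        "INSERT INTO estimates (appid, hll, uniquedevices) VALUES (?, ?, ?);");
    public final SQLStmt updateSketch = new SQLStmt(
        "UPDATE estimates SET hll = ?, uniquedevices = ? WHERE appid = ?;");

    public long run(long appId, long deviceId) {
        voltQueueSQL(getSketch, appId);
        VoltTable existing = voltExecuteSQL()[0];

        try {
            boolean seenBefore = existing.advanceRow();
            HyperLogLog hll = seenBefore
                ? HyperLogLog.Builder.build(existing.getVarbinary(0))  // resume the stored sketch
                : new HyperLogLog(12);                                 // or start a fresh one

            hll.offer(deviceId);
            long estimate = hll.cardinality();

            // Persist the updated sketch and expose the estimate as a plain relational column.
            if (seenBefore) {
                voltQueueSQL(updateSketch, hll.getBytes(), estimate, appId);
            } else {
                voltQueueSQL(insertSketch, appId, hll.getBytes(), estimate);
            }
            voltExecuteSQL(true);
            return estimate;
        } catch (IOException e) {
            throw new VoltAbortException("could not (de)serialize the HyperLogLog sketch");
        }
    }
}
```

Because the sketch and the estimate are updated in the same transaction, a failure can never leave them out of sync.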
Web Dashboard
Finally, the example includes an HTML file with accompanying JavaScript source to query VoltDB using SQL over HTTP and display query results and statistics in a browser. The provided dashboard is very simple; it shows ingestion throughput and a top-ten list of the most popular apps. Still, because the VoltDB ingestion logic hides most implementation complexity – such as the use of HyperLogLog – and boils the processing down to relational tuples, the top-ten query is something any SQL beginner could write. View the code on Github.
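The dashboard itself is JavaScript talking to VoltDB’s HTTP/JSON interface; to stay in one language here, this sketch runs a comparable top-ten query from the Java client through the built-in @AdHoc procedure, against the illustrative schema used in the ingest sketch above.

```java
import org.voltdb.VoltTable;
import org.voltdb.client.Client;
import org.voltdb.client.ClientFactory;

public class TopTenQuery {
    public static void main(String[] args) throws Exception {
        Client client = ClientFactory.createClient();
        client.createConnection("localhost");

        // Plain SQL over the illustrative "estimates" table from the ingest sketch.
        VoltTable topTen = client.callProcedure("@AdHoc",
            "SELECT appid, uniquedevices FROM estimates " +
            "ORDER BY uniquedevices DESC LIMIT 10;").getResults()[0];

        while (topTen.advanceRow()) {
            // Columns by position: 0 = appid, 1 = uniquedevices.
            System.out.println(topTen.getLong(0) + " -> " + topTen.getLong(1));
        }
        client.close();
    }
}
```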
It took a little more than a day to build the app and the data generator; some of that time was spent adding optimizations to the HyperLogLog library we used. We felt validated: this was not hard to do with VoltDB.
Example in hand, we then decided to push our luck. We created versions that don’t use a cardinality estimator but keep exact counts instead. As expected, performance is good while the data set is small, but degrades as it grows, unlike the HyperLogLog version. We also created a hybrid version that keeps exact counts until 1,000 devices have been seen, then switches to the estimating version. You can really see the power of strong consistency when it’s so trivial to add what would be a complex feature in another stack. Furthermore, changing this behavior required changing only one file, the ingestion code; the client code and dashboard code are unaffected. Both of these modifications are bundled with the example.
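The switchover logic is simple enough to sketch in a few lines. The 1,000-device threshold comes from the description above; the class and data structures are invented, and in the real example this state would live in VoltDB tables rather than on the Java heap.

```java
import java.util.HashSet;
import java.util.Set;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;

// Sketch of the hybrid idea: exact counting up to a threshold, estimation after.
public class HybridUniqueCounter {
    private static final int EXACT_LIMIT = 1000;

    private Set<Long> exact = new HashSet<>();
    private HyperLogLog hll = null;  // created lazily at the switchover

    public long add(long deviceId) {
        if (hll == null) {
            exact.add(deviceId);
            if (exact.size() <= EXACT_LIMIT) {
                return exact.size();          // still exact
            }
            // Crossed the threshold: seed the sketch with everything seen so far.
            hll = new HyperLogLog(12);
            for (long id : exact) {
                hll.offer(id);
            }
            exact = null;                     // drop the exact set
        }
        hll.offer(deviceId);
        return hll.cardinality();             // estimated from here on
    }
}
```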
Finally, we made one more modification. We added a history table, and logic in the ingestion code that would check on each call if a time period had rolled over. If so, the code would copy the current estimate into a historical record and then reset the active estimate data. This allows the app to store daily or hourly history within VoltDB. We didn’t include this code in the example, as we decided too many options and the additional schema might muddy the message, but it’s quite doable to add.
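Since that rollover code isn’t included in the example, the following is purely a hypothetical sketch of the check described above. It assumes the illustrative estimates table gains a period_start column and that a matching estimates_history table exists.

```java
import org.voltdb.SQLStmt;
import org.voltdb.VoltProcedure;
import org.voltdb.VoltTable;

// Hypothetical rollover check; in the described design this logic would sit
// inside the ingest procedure rather than stand alone. Schema and names are invented.
public class RolloverCheck extends VoltProcedure {

    public final SQLStmt getCurrentPeriod = new SQLStmt(
        "SELECT period_start, uniquedevices FROM estimates WHERE appid = ?;");
    public final SQLStmt archiveEstimate = new SQLStmt(
        "INSERT INTO estimates_history (appid, period_start, uniquedevices) VALUES (?, ?, ?);");
    public final SQLStmt resetEstimate = new SQLStmt(
        "UPDATE estimates SET period_start = ?, uniquedevices = 0, hll = NULL WHERE appid = ?;");

    private static final long PERIOD_MILLIS = 24L * 60 * 60 * 1000;  // daily history

    public long run(long appId, long eventTimeMillis) {
        voltQueueSQL(getCurrentPeriod, appId);
        VoltTable row = voltExecuteSQL()[0];
        if (!row.advanceRow()) {
            return 0;  // nothing to roll over yet
        }

        long periodStart = row.getLong(0);
        if (eventTimeMillis - periodStart >= PERIOD_MILLIS) {
            // The period rolled over: copy the current estimate to history, then reset.
            voltQueueSQL(archiveEstimate, appId, periodStart, row.getLong(1));
            voltQueueSQL(resetEstimate, eventTimeMillis, appId);
            voltExecuteSQL(true);
            return 1;
        }
        return 0;
    }
}
```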
The last point I’m trying to make here is that VoltDB isn’t just a good platform for static Fast Data apps. The strong consistency, developer-friendly interfaces and standard SQL access make VoltDB apps really easy to enhance and evolve over time. It’s not just easier to start: it’s more nimble when it’s finished.
So When Should You Use Lambda?
I have a pretty easy answer to this question. If your company has a Lambda-based solution in production solving some other problem, and it’s easy to see how that same stack could be used to solve your problem, then leveraging current experience may make a lot of sense. To be specific, the following should be true:
- You have a set of software tools and systems that you have used in production.
- You have pre-existing glue configuration and code to connect these systems.
- You have experience monitoring and supporting these systems.
- The problem under consideration is a reasonable fit for those tools and that integration.
If you find yourself adding another system to bolt “one more thing” onto the existing stack, or you find that you have to make extensive changes to the existing glue code, you should consider alternatives.
If you’re starting from scratch, downloading a list of tarballs from the ASF and trying to figure out how you connect them together to make a Lambda App, please stop. I know this feeling of staring at the recently un-tarred directories, the sense that anything is possible and the world is your ninja rockstar oyster. It doesn’t last.
Conclusion
It’s easy to see the circumstances and history that led us here, but from an engineering point of view, the Lambda Architecture is rather unfortunate. It’s also important to understand that there are few good options when dealing with low-latency requirements at this scale, and these systems have been deployed with some success.
The single largest issue with these systems is the lack of integration. This leads to complex development, complex testing, and complex operational support. The best solutions to Fast Data problems have been conceived with the big picture in mind. How will this system tackle ingestion, agreement, processing and state? Using discrete components to handle these four jobs is more flexible, but at tremendous cost.
For the time being, the only system I’m aware of that can meet the scale and latency requirements of today’s Fast Data applications, and handle ingestion, agreement, processing and state, is VoltDB. It’s worth checking out.