Onyx 0.5.0 hit Clojars this morning, and I couldn't be more proud. More than 4 months of hard work shaped this release, featuring some of my most creative work to date. If you're not yet familiar, Onyx is a distributed computation system for Clojure that handles both batch and stream processing. I open sourced it during my talk at StrangeLoop about 6 months ago. Its super-power is its merciless emphasis on describing computations as data structures - structures that can be constructed from remote languages, and that ultimately execute your own plain Clojure functions out on a cluster. You can join our user group to learn more.

At the center of today's release showcase is a profound design change: Onyx has become a masterless computation system. Following directly from this new capability are a few distinctive features, including real-time notification of cluster events via core.async, tunable node allocation/scheduling algorithms, and a host of APIs to enhance usability.

In this post, I'm going to gently take you through how we were able to achieve a compute system with no centralized coordinating entity. For a very in-depth explanation of how everything works, you can check out the Onyx Internal Documentation, which details algorithm specifics, step-by-step examples, and edge cases. Next, I'll describe how to use some of the new features, and how they're easily implemented atop a masterless cluster. I'll finish out the post by unveiling what's next up my sleeve.

If you'd like to skip the design-talk, you can go right for the release notes, 15+ self-contained/run-out-of-the-box examples, or starter repository. Otherwise, put on your distributed systems hat and hold on tight! (Mine is permanently glued on.)

Part 1: We don't need no stinking Nimbus

I'm not sure if I made it look easy to anyone who was watching, but getting the first public release of Onyx ready for StrangeLoop was a challenge to say the least. Cranking out features and documentation each night for months on end amplified the best and worst parts of Onyx's design. At any rate, we made it to the finish line with a respectably powerful platform.

Following that engagement, I had some time to reflect on which pieces of Onyx are the most error-prone and cognitively taxing. Distributed systems veterans won't be surprised that coordination bit me the hardest. Since that realization, I've had a mesmeric focus on eliminating this heap of complexity. But before we talk about that, a bit of background on Onyx 0.4.1 and prior.

The pros of a centralized coordination node

The basic "worker node" in Onyx is called a Peer. Peers do the hard computational work and message processing. Onyx 0.4.1 and prior featured a single coordination node (the "Coordinator") that directed the rest of the peers on which tasks to perform. The coordinator detected failing machines, reassigned tasks in response to changing workloads, and monitored for process timeouts. Coupled with making the coordinator highly available with a hot stand-by and ZooKeeper, this approach has some good characteristics:

  • Most znodes in ZooKeeper will only have one watch that triggers on change - the coordinator
  • Znode contention is very low. Usually only the coordinator and one peer are accessing each znode
  • Coordination decisions happen in a single place, and can be easily traced down to a single location in the code executing on a single box

Both Storm and Hadoop take this approach, with Nimbus and the JobTracker respectively.

The cons of a centralized coordination node

There are undeniable drawbacks to a centralized coordinator. And in my case, I eventually threw my hands in the air because:

  • More bugs have come up inside the coordinator than any other part of Onyx
  • Multiple implementations of the coordinator needed to be supported in parallel (HTTP, memory)
  • Some state in ZooKeeper is mutable, and the coordinator needed an atomic read of some znodes - but not all
  • Most writes and reads to ZooKeeper need to have (application/semantic level) serializable consistency - the burden is on me to do it right
  • There's a burden on the user to stand up a coordinator and its failover instances
  • It's hard to support good scheduling algorithms when a subset of the information in ZooKeeper is mutable
  • Task rotation to support peer failure replacement is really hard with the 0.4.0 design - due to my own poor design decisions

Towards a masterless design

My goal is to support Onyx long-term, and I needed a design that was going to survive. And so, I made the leap and aggressively tore up more than half of the code base. Over the course of this section, we'll introduce new components and ideas, finally arriving at a masterless architecture. The result is heavily influenced by the designs of Datomic, Kafka, and Microsoft's CORFU.

The Log

This design centers around a totally ordered sequence of commands using a log structure. The log acts as an immutable history and arbiter. It's maintained through ZooKeeper using a directory of persistent, sequential znodes. Virtual peers, or "nodes" if you like, act as distributed processes that consume from the log one entry at a time in order. Each peer maintains a local value in memory called a replica. A replica is an immutable value that reflects the functional application of log entries.

Log entries represent deterministic, idempotent functions to be applied to the local replica. These functions update the replica with cluster activity, such as "a new job has been submitted", or "this peer has left the cluster". The peer plays the log from the beginning, applying each log entry to its local replica. The local replica is transformed from one immutable value to another. Since all log entries are numbered from 0 to N, the current log entry that a peer is reading serves as a point in time - say, k - and can be thought of as a strictly monotonically increasing logical clock. At the time a peer starts up, it initializes its local replica to the "origin value" (an empty map). You can find a description of every log command in Onyx in the documentation.
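
To make this concrete, here's a minimal sketch of how a peer might fold log entries into its replica. The apply-log-entry multimethod, the entry shapes, and the replica keys are all invented for illustration - the real commands are described in the internal docs:

    ;; A sketch of replica maintenance. Entry shapes and replica keys are
    ;; illustrative, not Onyx's actual internals.
    (defmulti apply-log-entry
      (fn [entry replica] (:fn entry)))

    (defmethod apply-log-entry :submit-job
      [{:keys [args]} replica]
      (update-in replica [:jobs] (fnil conj []) (:id args)))

    (defmethod apply-log-entry :leave-cluster
      [{:keys [args]} replica]
      (update-in replica [:peers]
                 (fn [peers] (vec (remove #{(:id args)} peers)))))

    ;; Playing the log is a fold from the origin value (the empty map).
    (defn play-log [entries]
      (reduce (fn [replica entry] (apply-log-entry entry replica)) {} entries))

    (play-log [{:fn :submit-job :args {:id :job-1}}
               {:fn :leave-cluster :args {:id :p3}}])
    ;; => {:jobs [:job-1], :peers []}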

Figure 1: The log is an immutable sequence of functional commands, and each peer maintains a local replica reflecting the functional application of those commands

Figure 2: The peer plays the log in order, reading the functional command and updating its replica

The functional commands that are added to the log represent deterministic, side-effect free, idempotent functions in Clojure. That means if 10 peers play the first 10 log entries, they will all end up with the exact same local replica. This is why we say Onyx is "the cluster as a value". We can generalize this notion to the kth log entry. This idea ends up being the foundation that allows peers to process the log against completely independent timelines. The peers do not coordinate directly with one another.

Figure 3: Peers operate on independent timelines, and only coordinate via the log - never directly with one another

As a reaction to each replica transformation, the peer may write more commands to the tail of the log. Reactions are also deterministic and conditional. Each peer is assigned a unique ID, and each time the replica updates, the peer searches it to determine whether it should append reactions to the log or carry out side-effects (via a different protocol than the replica transformation!). This determinism means that any peer can predict how any other peer will react to a particular command. This ends up being an important principle for scheduling jobs and tasks across the cluster. Peers store everything in memory - so if a peer fails, it simply reboots from scratch with a new identifier and plays the log forward from the beginning. Since it has a new identifier, it has no association with the commands it previously issued, which mitigates issues with zombie peers.
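
Here's a sketch of what a deterministic reaction might look like, continuing the invented names from the sketch above (the :pairs key and the :close-ring command are my own illustration). Every peer evaluates the same pure function, so all of them agree on exactly which peer will append the follow-up command:

    ;; Sketch of deterministic reactions. The :pairs map (observer -> subject)
    ;; and the :close-ring command are invented for illustration.
    (defmulti reactions
      (fn [entry old-replica new-replica peer-id] (:fn entry)))

    (defmethod reactions :default
      [entry old-replica new-replica peer-id]
      nil)

    (defmethod reactions :leave-cluster
      [{:keys [args]} old-replica new-replica peer-id]
      ;; Every peer computes the same answer, so all agree on exactly
      ;; who will react to this entry.
      (let [watcher (some (fn [[observer subject]]
                            (when (= subject (:id args)) observer))
                          (:pairs old-replica))]
        (when (= peer-id watcher)
          [{:fn :close-ring :args {:watcher peer-id}}])))

    ;; Peer :p1 was watching :p5, so :p1 (and only :p1) reacts:
    (reactions {:fn :leave-cluster :args {:id :p5}}
               {:pairs {:p1 :p5, :p5 :p4}}
               {}
               :p1)
    ;; => [{:fn :close-ring, :args {:watcher :p1}}]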

Figure 4: Peers may choose to react to a command by adding more commands to the log

Finally, the peer can carry out side effects after a replica transformation. Common side effects include starting execution tasks, talking to ZooKeeper, and writing to core.async channels. Isolating side effects as a distinct part of a replica update means that a large subset of the Onyx test suite can operate on pure functions alone. Again, since each peer is tagged with a unique ID, it knows how to search the replica to determine whether it should carry out side effects. So while all peers hold the same replica after k updates, they will have executed different side effects along the way. This juxtaposition is how work is accomplished in a clustered setting without requiring a single coordinator.
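
Putting the pieces together, one step of a peer's log-processing loop might look like the sketch below, reusing the pure functions sketched earlier. The append-to-log! and fire-side-effects! arguments stand in for ZooKeeper writes and task lifecycle code:

    ;; Sketch of one step of a peer's log-processing loop. Only the last
    ;; two steps touch the outside world; the replica update stays pure.
    (defn process-entry!
      [peer-id replica entry append-to-log! fire-side-effects!]
      (let [new-replica (apply-log-entry entry replica)]
        ;; Deterministic reactions are appended to the tail of the log.
        (doseq [command (reactions entry replica new-replica peer-id)]
          (append-to-log! command))
        ;; Side effects: start tasks, add ZooKeeper watches, write to
        ;; core.async channels, and so on.
        (fire-side-effects! entry replica new-replica peer-id)
        new-replica))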

Detecting peer failure

After reading the above, you're undoubtedly wondering how Onyx can be fault tolerant and detect node failure in the cluster. Machine failure and the way a peer joins the cluster go hand in hand, but I'll start with the former since it lays a better basis for explaining the latter.

One of the ways that Onyx is fault tolerant is its detection of, and reaction to, a peer leaving the cluster or otherwise failing. In prior versions of Onyx, the central coordination node would watch all ephemeral heartbeat nodes in ZooKeeper and react upon disconnection. We have to get more creative when the coordination node is no longer present. I chose a ring-based approach. That is to say, every peer is responsible for watching exactly one other peer, such that the relationship of "who is watching whom" forms a circle. The observer-subject information is stored inside the replica, so all peers know who everyone else is observing.
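
Deriving the ring from the replica is cheap. Given an ordered vector of peer IDs, each peer watches its successor, with the last wrapping around to the first - a sketch, using the same observer -> subject pair shape as the reaction example above:

    ;; Sketch: compute observer -> subject watch pairs from the ordered
    ;; peer list stored in the replica.
    (defn watch-pairs [peers]
      (zipmap peers (rest (cycle peers))))

    (watch-pairs [:p1 :p2 :p3 :p4 :p5])
    ;; => {:p1 :p2, :p2 :p3, :p3 :p4, :p4 :p5, :p5 :p1}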

Figure 5: The peers form a ring. An arrow from A to B indicates that peer A observes peer B for failure

When a machine fails, exactly one peer in the cluster is responsible for reporting its death by appending a command to the tail of the log. When the peer that submitted the entry applies it to its own replica, it "closes the ring" by adding a ZooKeeper watch to the peer that the failed peer was observing. Because the replica is a shared, deterministic value, precisely the right peers know when to act and which side effects to carry out.

Figure 6: Peer 5 crashes. Peer 1 submits an entry to the log indicating that Peer 5 is down

Figure 7: When Peer 1 processes the log entry that it submitted about Peer 5's death, it closes the ring and observes Peer 4. Note the replica change, which all peers see as they process the log entry

Joining the cluster

Now that I've introduced the concept of the observer ring, it becomes much easier to explain how a peer joins the cluster. When a peer wishes to join the cluster, it must engage in a 3-phase protocol. Three phases are required because the peer that is joining needs to coordinate with another peer to change its ZooKeeper watch. I call this process "stitching" a peer into the cluster, and it utilizes the log to safely add the new peer. You can think of this as "growing the ring" - the opposite of how the ring "shrinks" when a peer leaves the cluster. If a peer can't join the cluster (for a few reasons, such as not finding a peer to stitch with; see the official docs for more), it aborts and tries again. This protocol ensures that any participating peer can abort or fail at any stage and nothing "bad" will happen.
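
The three phases each correspond to a command on the log. In sketch form, using Peer 5 joining the 4-node ring from the figures below (the command names are modeled on Onyx's join entries, but the args shown here are simplified):

    [;; Phase 1: Peer 1 volunteers to watch the joining Peer 5.
     {:fn :prepare-join-cluster :args {:joiner :p5}}
     ;; Phase 2: Peer 5 begins watching Peer 4, forming the "mini ring".
     {:fn :notify-join-cluster :args {:observer :p5 :subject :p4}}
     ;; Phase 3: Peer 1's now-extraneous watch on Peer 4 is dropped,
     ;; and the ring is whole again.
     {:fn :accept-join-cluster :args {:observer :p1 :subject :p5}}]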

Figure 8: Peer 5 wants to join the current 4 node cluster

Figure 9: Peer 5 initiates the first phase of the join protocol. Peer 1 prepares to accept Peer 5 into the ring by adding a watch to it

Figure 10: Peer 5 initiates the second phase of the join protocol, and begins watching Peer 4. At this point, a stable "mini ring" has been stitched along the outside of the cluster. Note that the link between Peer 1 and Peer 4 is now extraneous

Figure 11: Peer 5 has been fully stitched into the cluster, and the ring is intact

Garbage collection

One of the primary obstacles that this design imposes is the requirement of seemingly infinite storage. Log entries are only ever appended - never mutated. Left running long enough, ZooKeeper will run out of space. Similarly, if enough jobs are submitted and either completed or killed, the in-memory replica that each peer houses will grow too large. Onyx therefore requires that a garbage collector be invoked periodically.

To this end, Onyx exposes its garbage collector via a public API, and it does two things. First, the caller of gc places an entry onto the log. As each peer processes that entry, it runs a deterministic, pure function to shrink its replica. Second, when the gc caller invokes the side effects for this log entry, it takes the current replica (the functional application of everything from the origin up to this entry) and stores it in an "origin" znode. The caller will have specified a unique ID, making it the only process allowed to trim the log. Any time a peer boots up, it first reads out of the origin location. Finally, the caller deletes every log entry from the start of the log up to, but not including, this entry. This has the dual effect of freeing storage in ZooKeeper and making new peers start up faster, since they have less of the log to play. They begin in a "hot" state.
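
In sketch form, the side-effect half of gc might look like the following. The write-origin! and delete-entries! helpers stand in for ZooKeeper operations, and the :message-id key (the entry's position in the log) is an assumption on my part:

    ;; Sketch of the gc caller's side effects. write-origin! and
    ;; delete-entries! stand in for ZooKeeper operations; :message-id
    ;; (the entry's log position) is assumed for illustration.
    (defn gc-side-effects!
      [entry pruned-replica peer-id write-origin! delete-entries!]
      ;; Only the caller that appended the :gc entry may trim the log.
      (when (= peer-id (get-in entry [:args :id]))
        (write-origin! pruned-replica)            ; new peers boot from here
        (delete-entries! 0 (dec (:message-id entry)))))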

Figure 12: A peer can start by reading out of the origin, and continue directly to a particular log location.

Part 2: The new toys to play with

Realtime event notification

Now that we've talked about Onyx's new log-based design, it's very straightforward to introduce the event notification service. The idea is that, given an Onyx deployment ID and a ZooKeeper address, Onyx will give you back a core.async channel. Every time there's an event written to the log, a map describing what happened is put onto your channel. The channel is preloaded with log entries from the origin onward, meaning you can go back and replay history.
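
Here's a sketch of what a subscription might look like. I'm hedging on the exact signature of subscribe-to-log - check the 0.5.0 API docs for the real one - and the deployment ID and ZooKeeper address below are placeholders:

    ;; Sketch of subscribing to cluster events. subscribe-to-log's exact
    ;; signature may differ from what's shown; see the 0.5.0 API docs.
    (require '[clojure.core.async :as async])
    (require '[onyx.api])

    (def onyx-id "my-deployment-id")         ; placeholder deployment ID
    (def zookeeper-addr "127.0.0.1:2181")    ; placeholder ZooKeeper address

    (def ch (async/chan 100))

    ;; Onyx preloads the channel with entries from the origin onward,
    ;; then streams new entries as they are written.
    (onyx.api/subscribe-to-log onyx-id zookeeper-addr ch)

    ;; React to events - for example, log whenever a peer leaves the cluster.
    (async/go-loop []
      (let [entry (async/<! ch)]
        (when (= (:fn entry) :leave-cluster)
          (println "Peer left the cluster:" (get-in entry [:args :id])))
        (recur)))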

So what can you do when you subscribe to the log? An open set of things:

  • Send an email every time an Onyx job completes
  • Fire off a Riemann or CloudWatch alert when a peer fails and leaves the cluster
  • Automatically spin up more Onyx peers when tasks take an average of more than 5 minutes to complete
  • Programmatically resubmit jobs every 15 minutes, but never allow more than 4 active jobs running at the same time

This is just the beginning. Tapping into the activity of your cluster with the full power of Clojure is going to let us go places that other frameworks can't reach.

Cluster allocation algorithms

In a masterless design, there is no single entity that assigns tasks to peers. The Onyx cluster is self-organizing as it distributes work across all peers. Onyx now ships with a variety of job and task schedulers. Job schedulers define how many peers each particular job receives. Task schedulers define, given N peers received from a job scheduler, how those N peers are distributed across a single job's tasks. Onyx offers Greedy, Round Robin, and Percentage job and task schedulers right out of the box.
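
Here's a sketch of where those knobs live. The keyword names follow the 0.5.0 naming scheme as best I can render it here - verify them against the docs - and the workflow and catalog below are placeholders:

    ;; Sketch of scheduler configuration. Check keyword names against the
    ;; 0.5.0 docs; the workflow and catalog are placeholders.
    ;; The job scheduler is a cluster-wide setting in the peer config:
    (def peer-config
      {:onyx/id "my-deployment-id"
       :zookeeper/address "127.0.0.1:2181"
       :onyx.peer/job-scheduler :onyx.job-scheduler/round-robin})

    ;; Each job chooses its own task scheduler at submit time:
    (def the-job
      {:workflow [[:in :inc] [:inc :out]]
       :catalog []                           ; elided for brevity
       :task-scheduler :onyx.task-scheduler/round-robin})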

As jobs, tasks, and peers are added and removed, Onyx automatically rebalances the workload, taking special care to leave as many peers undisturbed as possible. No slots, no rebooting your cluster, no redeploying with a new configuration. It's all happening at runtime on your behalf. Relax - I got this.

Percentage job and task schedulers

Greedy and round robin schedulers are easy enough to understand. But in this post, I want to call out one particularly cool new scheduler - the percentage scheduler. The job and task percentage schedulers behave more or less the same, so let's consider how the job percentage scheduler works.

The Percentage job scheduler allows jobs to be submitted with a percentage value. The percentage value indicates what percentage of the cluster will be allocated to this job. This scheduler is useful when you have a relatively static number of jobs and a varying number of peers. For example, if you have 2 jobs - A and B - you'd give each of these percentage values - say 70% and 30%, respectively. If you had 100 virtual peers running, 70 would be allocated to A, and 30 to B. If you then added 100 more peers to the cluster, job A would autoscale upward to 140 peers, and job B to 60. Now if someone took a baseball bat to a rack in your data center, and you only had 10 peers left, Onyx would autoscale job A down to 7 peers, and B to 3.
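
The allocation arithmetic from that example, as a sketch (the real scheduler also has to handle rounding and leftover peers, which I'm glossing over):

    ;; Sketch of the percentage allocation arithmetic.
    (defn allocate [job->pct n-peers]
      (into {}
            (map (fn [[job pct]] [job (int (* n-peers (/ pct 100)))]))
            job->pct))

    (allocate {:A 70 :B 30} 100)   ;; => {:A 70, :B 30}
    (allocate {:A 70 :B 30} 200)   ;; => {:A 140, :B 60}
    (allocate {:A 70 :B 30} 10)    ;; => {:A 7, :B 3}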

Part 3: Winning the war

And with that, we've completed the summary of what's changed in Onyx 0.5.0. Rewriting thousands of lines of code for a cleaner design has been a fun experience, but there's an important lesson to be learned. Despite all the change the codebase has been through, the information model remains 100% the same. Clients in other languages, on other machines, in other data centers remain unaffected by these changes. Good systems are built on top of strong information models.

We'll now begin shifting our focus towards 0.6.0. After StrangeLoop, I knew there were two things that needed attention. One of them was coordination; the other was performance. In the coming weeks, you can expect development to begin on dropping HornetQ from the picture in favor of an HttpKit/Netty messaging design. HornetQ was great for getting up and going, but it hasn't lived up to my expectations in a number of ways. I've also run into a few bugs with message grouping that result in occasional dropped data. If you need Onyx for extremely high performance and bullet-proof reliability, wait a few more months before adopting. If you can accept a small amount of alpha behavior, go ahead and give 0.5.0 a whirl.

Looking forward towards performance, we're taking a careful look at how Apache Storm achieves excellent performance characteristics. You can push a screaming million messages/second through a small Storm cluster and observe linear scaling as you add more machines. Onyx will generally follow Storm's strategy of using random bit-sets, xor'ing, and in-memory acking to deliver similar performance. The goal is to be able to declare Onyx 0.6.0 truly industry-ready.

Finally, I wanted to take a minute to thank Lucas Bradstreet (@ghaz). Lucas has been submitting patches, proofreading documentation, and giving design advice since 0.4.0 came out. It's a wicked cool thing to see others take an interest in your project, but it's on a whole new level when someone takes time out of their busy day to contribute to your creation. As you can guess, Onyx's log-based design enables some exceptionally interesting infrastructure and deployment tooling. If you or your company are interested in using Onyx in production, you'll definitely want to have a chat with Lucas and learn about the tooling he's cooking up to make Onyx a first-class player in the big data ecosystem.

You can expect us to be back in a couple of months with the 0.6.0 release, and a benchmark in hand. Until then, happy Onyx'ing!

My name is Michael Drogalis. By day, I'm a developer with the LonoCloud group at ViaSat. By night, I build Onyx. If you'd like to get in touch, you can tweet me @MichaelDrogalis or email me at mjd3089 at rit dot edu.