Today is a big day. In fact, today is huge. Onyx 0.6.0 is out on Clojars, representing over 5 months of development effort. We have so much to show you, including a performance benchmark.

For those who are new, Onyx is a distributed, masterless, high performance, fault tolerant data processing platform for Clojure. Onyx aggressively uses data structures as its primary language to make computations serializable. Its API exposes a batch and streaming hybrid processing model. Onyx sports a totally ordered log for coordination, and point-to-point communication for guaranteed at-least-once message delivery. Onyx is a good fit for realtime stream processing, batch analytics, and ETL jobs.

At a year and a half old, this product has set a blistering pace for the size of the team. The last two releases have addressed support for deploying cloud-scale clusters. In the previous release, we redesigned coordination to be masterless - utilizing a purely functional log-replication approach. We like to say that Onyx is the "cluster as a value". In this release, we have carefully rearchitected Onyx's message transport layer. The result of our efforts: industry-competitive performance and stability. The goal from the beginning of this project has been to give Clojure a seat at the table of serious distributed systems. I believe we've finally arrived.

A lot of good things have happened since we last released. We got ourselves an official website and Twitter account. We moved off of my personal account to a GitHub organization. We established chatrooms in Gitter and Slack. I was a guest on the Cognicast. We got ourselves some laptop stickers (available to committers!). The growth we've experienced and the embrace of the Clojure community has been beyond anything I anticipated. I'm touched that we've fostered such a wonderful ecosystem together.

If you're eager to dive straight in and skip the news, you can get going with the Starter repository, application template, or 17+ self-contained/run-out-of-the-box examples.

High Performance

The 0.6.0 release had one goal: drastically increase the performance of Onyx without sacrificing reliability. Prior to 0.6.0, Onyx transported messages through HornetQ using a brokered, durable queuing protocol. While this approach got us off the ground, we ran into scaling problems and bugs that were out of our control. High performance messaging is a core competency for Onyx, and we had to own it.

The work we've done for this release completely tears out HornetQ in favor of a pluggable, point-to-point messaging layer. Out of the box, you can choose either the Netty or core.async messaging implementation. Netty is what you'll want for production and multi-node clusters; core.async is great for very lightweight, local development. You can switch between these modes by changing a single keyword in your boot-up configuration. More messaging implementations will ship in the next few months to offer increased performance.
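To make that concrete, here's a minimal sketch of the relevant peer configuration. The :onyx.messaging/impl key and its values are recalled from the documentation of this era, so treat the exact names as an assumption rather than a verbatim excerpt.

;; A minimal, hypothetical peer configuration sketch: flipping one keyword
;; swaps the transport. Key names are assumptions, not a copy of the docs.
(def peer-config
  {:zookeeper/address "127.0.0.1:2181"
   :onyx/id "my-cluster-id"
   :onyx.messaging/impl :netty}) ; or :core.async for local development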

Benchmark

Onyx has always been serious about being serious. We designed a reproducible benchmarking suite to measure how fast we can go. We put this release (Git SHA 84e8a8a5711ad2afacb5eda576608eb15ead1cdc) through a series of tests (benchmark SHA a905f63a42dd8e04b1f79fca48d25c6631301f36) on AWS to measure Onyx's raw throughput independent of the applications that run on top of it. We ran 1 job with 6 tasks and performance-tuned it on three different cluster configurations: 5 * c4.xlarge nodes, 5 * c4.2xlarge nodes, and 5 * c4.4xlarge nodes.

The job in question has 1 input task, 4 processing tasks, and 1 output task. Input messages were fabricated by a special plugin generating a 104-byte payload per message: 4 bytes were allocated for a random integer, and 100 additional bytes were allocated as an opaque payload to simulate a medium-sized message. These two values were put into a single map, which constitutes an Onyx segment. The segments flow through the workflow, and the four processing tasks increment the integer by 1. When the message hits the output task, it is discarded. We do this because we're measuring Onyx's throughput independent of any other I/O. All benchmarks used the Netty transport implementation.
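As a rough, illustrative sketch of what one of those segments and the processing function might look like (the key names here are assumptions for illustration, not the benchmark repository's actual code):

;; Illustrative segment shape: a random integer plus a 100-byte opaque
;; payload, as described above. Key names are assumptions.
(def example-segment
  {:n       (rand-int Integer/MAX_VALUE)
   :payload (byte-array 100)})

;; A sketch of the increment function referenced below in the catalog as
;; :onyx-benchmark.peer/my-inc.
(defn my-inc [segment]
  (update-in segment [:n] inc))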

Each benchmark used a 1 node ZooKeeper cluster (c4.large) and a 1 node metrics server hosting Postgres, Riemann, Graphite, and Grafana (c4.large). Measurements were aggregated locally on the Onyx peer and sent to Riemann every 1,000 milliseconds, after which they were flushed from the peer. Riemann then pushed the further aggregated values out to Graphite.

Measurements taken were segments processed per second by all tasks, number of timed-out retries per second, VM memory usage, and VM processor usage.

All servers ran with JVM opts -server -Xmx10g -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=240s,filename=myrecording.jfr, and with an anti-affinity rule that kept acking daemons from being colocated on the same physical machine.

The job used the following workflow:

[[:in :inc1]
 [:inc1 :inc2]
 [:inc2 :inc3]
 [:inc3 :inc4]
 [:inc4 :no-op]]

And the catalog:

[{:onyx/name :in
  :onyx/ident :generator
  :onyx/type :input
  :onyx/max-pending pending-size
  :onyx/medium :generator
  :onyx/max-peers n-parallel-input-streams
  :onyx/batch-size batch-size
  :onyx/doc "Fabricates messages"}

 {:onyx/name :inc1
  :onyx/fn :onyx-benchmark.peer/my-inc
  :onyx/type :function
  :onyx/batch-size batch-size}

 {:onyx/name :inc2
  :onyx/fn :onyx-benchmark.peer/my-inc
  :onyx/type :function
  :onyx/batch-size batch-size}

 {:onyx/name :inc3
  :onyx/fn :onyx-benchmark.peer/my-inc
  :onyx/type :function
  :onyx/batch-size batch-size}

 {:onyx/name :inc4
  :onyx/fn :onyx-benchmark.peer/my-inc
  :onyx/type :function
  :onyx/batch-size batch-size}

 {:onyx/name :no-op
  :onyx/ident :core.async/write-to-chan
  :onyx/type :output
  :onyx/medium :core.async
  :onyx/max-peers n-parallel-output-streams
  :onyx/batch-size batch-size
  :onyx/doc "Discards messages"}]

With all of that set up, we ran the tests. The results follow.

5*c4.xlarge results

The benchmark ran with the following specification:

Attribute Value
Region us-east-1
Availability Zone us-east-1e
Tenancy shared
Spot pricing yes
Acker percentage 80
CPU Cores 4
Virtual Peers per machine 5
Total number of virtual peers 25
Log level WARN
Clojure 1.7.0-beta2
Flight Recorder enabled yes
Batch size 20
:onyx/max-pending 50,000 segments
:onyx/pending-timeout 60,000 ms
Parallel input streams 4
Duration 20 minutes

Measurements

Figure 1: Onyx tops out around 100,000 segments processed per second per VM.

Figure 2: No messages timed out, meaning that all messages were fully processed within 60,000 milliseconds of latency.

Figure 3: Memory usage is stable and roughly constant across all VMs.

Figure 4: Processor usage is nominally between 75-95%. The gap is likely due to the distribution of somewhat heterogeneous tasks to different machines.

Overall - not bad! There's a little room for improvement in terms of processor usage, but we find 500,000 segments per second in aggregate acceptable.

5*c4.2xlarge results

The benchmark ran with the following specification:

Attribute Value
Region us-east-1
Availability Zone us-east-1e
Tenancy shared
Spot pricing yes
Acker percentage 20
CPU Cores 8
Virtual Peers per machine 10
Total number of virtual peers 50
Log level WARN
Clojure 1.7.0-beta2
Flight Recorder enabled yes
Batch size 20
:onyx/max-pending 300,000 segments
:onyx/pending-timeout 60,000 ms
Parallel input streams 6
Parallel output streams 6
Duration 20 minutes

Measurements

Figure 5: With 2x the number of cores of the previous VM size, we jump up to ~170-190k segments per second per VM. We're not quite able to double our throughput, but we're not too far off either.

Figure 6: All segments are fully processed within 60,000 milliseconds of latency.

Figure 7: Memory is still mostly stable, but grows a little faster than in the previous benchmark.

Figure 8: This time we're only sitting around 75-80% of total processor usage, which would explain why we didn't make the full jump to 200,000 segments per second.

5*c4.4xlarge results

The benchmark ran with the following specification:

Attribute Value
Region us-east-1
Availability Zone us-east-1e
Tenancy shared
Spot pricing yes
Acker percentage 60
CPU Cores 16
Virtual Peers per machine 20
Total number of virtual peers 100
Log level WARN
Clojure 1.7.0-beta2
Flight Recorder enabled yes
Batch size 20
:onyx/max-pending 300,000 segments
:onyx/pending-timeout 60,000 ms
Duration 15 minutes

Measurements

Figure 9: Throughput grows to 280,000 - 310,000 segments per second. If we were increasing throughput proportionally to the number of cores in the VM, we'd expect to see 400,000 segments per second. See Figure 12.

Figure 10: All segments are fully processed within 60,000 milliseconds of latency.

Figure 11: Memory consumption remains mostly flat.

Figure 12: Nominal CPU usage drops to 70-77%, again explaining why we're missing our linearly increasing target of 400,000 segments per second per machine. Note the proportions - 25% unused CPU and 25% less throughput.

Open source. Open benchmarks. Open community.

Experimental Discussion

Onyx does not currently use a scheduler that is intelligent enough to evenly allocate ackers across machines. Therefore, for small EC2 instance types, we started with 100% acker percentages to reduce the chance of ackers being unevenly spread. This has some overhead. As the size of the instances increased, the chance of a very uneven spread of acker peers decreased, and thus we reduced the percentage of acker virtual peers.

CPU usage per instance was generally reported at around 75%. As a result, we increased the number of virtual peers in excess of the number of cores per machine. We're still not exactly sure why we're bottlenecking on a resource other than CPU, but we're confident that we'll be over the hurdle soon and closer to linearly increasing performance. Ultimately we felt that a 5 node cluster pushing in aggregate around 1.5 million messages per second was plenty good enough for now. Performance work will continue in future releases.

Future Work

In the future, we hope to run these benchmarks for longer, with a wider variety of workloads and topologies. We're also interested in measuring quantile latency for the next report. Finally, we'd like to test with significantly larger cluster sizes. Benchmarking is no small job. If you're interested in helping out on this front, please get in touch.

Enhancements

While performance was the main goal of this release, we managed to get a lot of smaller tasks completed as well.

Based on user feedback, Kafka seems to be the primary means by which developers are interested in ingesting data. We rewrote the onyx-kafka plugin from scratch to essentially mirror what Storm's Kafka Spout provides. That is, the Onyx Kafka plugin will dynamically discover brokers from ZooKeeper and automatically reconnect to an In Sync Replica (ISR) if any brokers go down.
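Purely as a hypothetical illustration of what a Kafka input task might look like in a catalog, following the same shape as the benchmark catalog above (the :kafka/* keys and the ident are assumptions; see the onyx-kafka README for the real options):

;; Hypothetical catalog entry for reading from a Kafka topic. The :kafka/*
;; keys are assumptions for illustration, not the plugin's verbatim API.
{:onyx/name :read-from-kafka
 :onyx/ident :kafka/read-messages
 :onyx/type :input
 :onyx/medium :kafka
 :kafka/topic "events"
 :kafka/zookeeper "127.0.0.1:2181"
 :onyx/batch-size batch-size
 :onyx/doc "Reads messages from a Kafka topic"}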

We also took a little detour to create onyx-s3. onyx-s3 is an S3 reader that handles faults by check-pointing to ZooKeeper. We haven't yet implemented an S3 writer, but we'd love some help on this front!

Days before this release, we snuck in a plugin for Factual's durable-queue. durable-queue makes a great alternative to core.async for local development.

Finally, we spent some time sharpening our operations story. We've built a lifecycle plugin called onyx-metrics to automatically capture ad hoc metrics and instrument running workflows. onyx-metrics currently tracks throughput and latency over four quantiles, emitting its output to standard out or to a websocket that connects directly to Onyx's dashboard. Track anything, emit anywhere.

Dashboard

Finally, if you're interested in using Onyx at work, we now offer commercial support and training.

The Road Ahead

So what's next for the Onyx platform? This release has made Onyx industry-ready, but there's still plenty of work to do. Onyx 0.7.0 will have two primary focus points.

The highest priority at this point is for Onyx to have a story for implementing functionality in languages other than Clojure. We'll be working towards bridging into other JVM-based languages, and then out into Python and Ruby. It's critical that we're able to expand beyond Clojure to grow as a community - especially in the data science arena, where Clojure is less popular than in the analytics world.

A close second for our attention is more advanced resource scheduling. In order to increase the predictability of Onyx's performance, we need to start building out hardware- and application-aware resource allocators that can efficiently distribute load based on the number and type of jobs running. We'll likely be pulling something directly out of the literature to advance this effort.

We're also very happy to announce that we'll be at both LambdaJam in Chicago (July) and StrangeLoop in St. Louis (September) conducting interactive training on the Onyx Platform.

Wrap Up

Before I finish up this post, I want to thank all of the incredible people who have been a part of this journey. Onyx is a hugely ambitious undertaking, and it wouldn't be half of what it is without everyone's help. In particular, Lucas Bradstreet has been a full-time developer on this project since February. Onyx genuinely wouldn't have achieved its level of performance without him. This year has been great, and I'm looking forward to what the rest of it brings us!

So, are you ready to dive in?

lein new onyx-app hello-onyx-world

See you there.

-- @MichaelDrogalis