I'm tremendously proud to announce the official release of Onyx 0.8.0. In the time since the last release, we've been working hard on solving an inherent complexity in stream processing - state management. Today, we're unveiling a new suite of features to alleviate much of the difficulty in crafting stateful streaming programs. (full changelog here).

Onyx is a scalable, distributed, fault tolerant, high performance data processing platform written in Clojure. It's able to transparently handle both batch and streaming workloads with a technique known as "punctuated streams". Onyx strongly divorces the traditional model of distributed computation into stand-alone sets of behavior and data-driven execution specifications. This style of working flips the normal distributed model on its head, and yields significant increases in ability to understand and reason about large computations.

Follow us on Twitter @OnyxPlatform, or chat with us in Gitter or Slack. We also offer a free self-guided workshop, and commercial support.

Stateful Streaming is Hard

The primary problem that we wanted to solve in this release was managing the state of streaming computations without the application developer needing to worry about correctness. Why is this hard in the first place? Let's consider a seemingly simple scenario. Suppose your job is to process a stream of voting events from polls during a political election. The objective is to determine who won the election, and answer a variety of queries about the status of the candidates throughout election day. What are the factors that make this a difficult problem?

Idempotency

Perhaps the most obvious problem we'll encounter is the need to count every vote exactly once. This is easy in an application with a local run-time, but is a much trickier problem to deal with in a distributed system. Suppose event id 42 is received, and we're just about to increment the count for the candidate that this voter elected - when suddenly, the machine executing the computation fails! Was the candidate's count incremented? Who knows! Using a bash-in-place variable to track candidate votes will not suffice since we have no way of recovering to a correct state after a failure.

Onyx provides fault-tolerance by replaying messages if they do not complete within the default time range of 60 seconds. Clearly, this is going to be a problem for the above scenario. The reader should note that this is a problem anyway for architectures that don't implement fault tolerance via replay. Even in the case of a point-to-point transfer of data between stages of execution, some sort of transactional context needs to be provided to gain atomic completion semantics.

This feature is often called "exactly-once" processing semantics. I dislike this phrase because exactly-once processing can mean different things to different people. Exactly-once side effects are provably impossible for distributed systems. However, updating a state machine exactly once for every piece of data that is received is, in fact, possible.
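To make the idea concrete, here is a minimal sketch of an idempotent state update. It is an illustration only, not Onyx's implementation: the state shape and the names `initial-state`, `apply-vote`, and `votes` are hypothetical, and the set of seen ids lives in memory rather than in a durable store.

```clojure
;; Hypothetical sketch: state is a plain map holding the ids already
;; applied and the running count per candidate.
(def initial-state {:seen #{} :counts {}})

(defn apply-vote
  "Applies a vote to the state unless its :event-id was already applied."
  [state {:keys [event-id candidate]}]
  (if (contains? (:seen state) event-id)
    state ; duplicate delivery: leave the state untouched
    (-> state
        (update :seen conj event-id)
        (update-in [:counts candidate] (fnil inc 0)))))

(def votes
  [{:event-id 41 :candidate "A"}
   {:event-id 42 :candidate "B"}
   {:event-id 42 :candidate "B"}]) ; event 42 replayed after a failure

(:counts (reduce apply-vote initial-state votes))
;; => {"A" 1, "B" 1}
```

Because a replayed message is recognized by its id, delivering event 42 twice changes the count only once.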

Stream Disorder

Other problems beyond basic correctness arise in our hypothetical scenario. Let's further suppose that we want to bucket votes on a minute-by-minute basis to track trends. As the election night progresses towards midnight, a transient network problem from significantly earlier in the day resolves, and we begin receiving messages timestamped from 8 AM. Many stream processing frameworks can only handle bucketing by processing time (i.e. the wall clock on the machine processing the data), so receiving events way out of order is problematic. Often, the events from 8 AM will be bundled in with the events from later in the day. The essence of the query we were trying to answer gets lost.
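The difference is easy to see in a small sketch that buckets by the timestamp carried in each event rather than by arrival time. The `:event-time` key (in milliseconds) and the function names are assumptions made for illustration.

```clojure
(def minute-ms 60000)

(defn minute-bucket
  "Returns the start (in ms) of the one-minute bucket containing `ts`."
  [ts]
  (* minute-ms (quot ts minute-ms)))

(defn count-by-event-minute
  "Counts events per minute using each event's own timestamp."
  [events]
  (frequencies (map (comp minute-bucket :event-time) events)))

(def events
  [{:event-time 120500}
   {:event-time 125000}
   {:event-time 61000}]) ; arrives last, but belongs to an earlier minute

(count-by-event-minute events)
;; => {120000 2, 60000 1}
```

The late event lands in the bucket for its own minute no matter when it arrives.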

Stragglers

One final problem that often bites application developers is stragglers. Stragglers are a small amount of data that arrives late, even though the bulk of the data arrived on time. Certain computations might need to "wait" to see all the data generated for a particular hour. Some programs, however, can deal with not having quite all the data. Some high percentage, say 99.99%, may be enough to proceed. Most stream processors offer all-or-nothing semantics. It's not a straightforward feature to offer because once local state is abandoned, receiving a straggler will put you back in the problem of stream disorder!

Durable, Striped Logging, and De-duplication

We took all of the above challenges into account to build a design that shields you from the mechanics of those problems. The design approach we took stripes messages to a durable log, via Apache BookKeeper, which we use to build a replicated state machine. Aggregations are built via state machines, with individual state transitions written on a per-message basis, and are replicated across a cluster of machines for fault tolerance. If a peer crashes, the state is built up from scratch by replaying the durably replicated log of operations. If you're using Onyx for state management, you'll need to start up a quorum of BookKeeper servers for it to talk to.
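Recovery by replay can be pictured as a fold over the log. The sketch below is a simplification under stated assumptions: the log is an in-memory vector rather than BookKeeper, and the `[:inc candidate]` entry format and the names `transition` and `recover` are hypothetical.

```clojure
(defn transition
  "Applies one logged state transition to the aggregate."
  [state [op arg]]
  (case op
    :inc (update state arg (fnil inc 0))
    state)) ; unknown operations are ignored in this sketch

(defn recover
  "Rebuilds the aggregate from scratch by replaying the whole log."
  [log]
  (reduce transition {} log))

(def log [[:inc "A"] [:inc "B"] [:inc "A"]])

(recover log)
;; => {"A" 2, "B" 1}
```

Since the state is a pure function of the log, any peer that can read the log arrives at the same aggregate.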

To defend against duplicate messages, we employ an embedded key/value store - namely, RocksDB. RocksDB contains an in-memory Bloom Filter that we use to check for, and ignore, previously applied messages. The filter is periodically pruned to keep the total number of serialized keys small. RocksDB is invisible to the application developer since we run it in embedded mode.

You can read more about how state management works in the dedicated section of the User Guide.

Windows, Triggers, and Refinement

With the design in place to support idempotent state management, we need to introduce a strong enough abstraction to deal with the complexities of modern analytics applications.

Windowing

Windows are a new feature that allows you to group and aggregate data into buckets. Windows create aggregations over portions of a data stream, rolling up and compounding data as it arrives. As of today, we offer four different types of windows.

Fixed Windows

Fixed windows, sometimes called Tumbling windows, span a particular range and never overlap with other windows. Consequently, a data point will fall into exactly one instance of a window. Fixed Windows are useful for answering queries such as "How many votes were cast each hour?"
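As a rough sketch of the arithmetic involved (an illustration, not Onyx's internal code), the single extent that a timestamp falls into can be computed from the window range alone. The name `fixed-extent` and the use of millisecond timestamps are assumptions.

```clojure
(def hour-ms 3600000)

(defn fixed-extent
  "Returns the [lower upper] bounds (upper exclusive) of the fixed window
  of width `range-ms` that contains `ts`."
  [range-ms ts]
  (let [lower (* range-ms (quot ts range-ms))]
    [lower (+ lower range-ms)]))

(fixed-extent hour-ms 5400000)
;; => [3600000 7200000]
```

A timestamp of 1.5 hours falls into exactly one extent, the second hour.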

Sliding Windows

Sliding windows, on the other hand, span a particular range and do overlap one another. A slide value denotes how long to wait before spawning a new window after the previous one. This is a more sophisticated style of data aggregation as data is able to fall into more than one window extent. You can service queries with this model such as "How many people voted for this candidate in the last 60 minutes, updating the answer every 15 minutes?"
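The overlap can be sketched by listing every extent that contains a given timestamp. This is an illustration under assumptions, not Onyx's implementation: windows are assumed to start at non-negative multiples of the slide, the units are arbitrary (minutes here), and `sliding-extents` is a hypothetical name.

```clojure
(defn sliding-extents
  "Returns all [lower upper] extents (upper exclusive) of width `range`,
  spaced `slide` apart, that contain timestamp `ts`."
  [range slide ts]
  (let [last-start (* slide (quot ts slide))]
    (->> (iterate #(- % slide) last-start)        ; walk back one slide at a time
         (take-while #(and (>= % 0) (< ts (+ % range))))
         (map (fn [lower] [lower (+ lower range)]))
         reverse
         vec)))

;; A 60 minute range sliding every 15 minutes, for an event at minute 70.
(sliding-extents 60 15 70)
;; => [[15 75] [30 90] [45 105] [60 120]]
```

With a 60 minute range and a 15 minute slide, an event generally belongs to four extents at once.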

Global Windows

Global windows are a simple windowing type that span all of time, forwards and backwards. This is useful for when you want to perform a batch-style computation that is agnostic to any notion of time. You'd pull out global windows when you want to ask "How many people voted in this election?"

Session Windows

Session windows are windows that dynamically resize their upper and lower bounds in reaction to incoming data. Sessions capture a time span of activity for a specific key, such as a user ID. If no activity occurs within a timeout gap, the session closes. If an event occurs within the bounds of a session, the window size is fused with the new event, and the session is extended by its timeout gap either in the forward or backward direction. This is useful for retroactively piecing together a user's actions, such as binding all of the votes from a specific person into a group.
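A batch-style sketch shows how a timeout gap carves activity into sessions. It is a simplification: it sorts all timestamps for one key up front instead of fusing extents incrementally as events arrive, and `sessionize` is a hypothetical name.

```clojure
(defn sessionize
  "Groups timestamps into [lower upper] sessions. A new session starts
  whenever the gap since the previous event exceeds `gap`."
  [gap timestamps]
  (reduce (fn [sessions ts]
            (let [[lower upper] (peek sessions)]
              (if (and upper (<= (- ts upper) gap))
                (conj (pop sessions) [lower ts]) ; extend the current session
                (conj sessions [ts ts]))))       ; open a new session
          []
          (sort timestamps)))

;; Events arrive out of order; timestamp 4 extends the first session.
(sessionize 5 [1 3 20 4 22 40])
;; => [[1 4] [20 22] [40 40]]
```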

Triggers

Windows describe how data is accreted, bucketed, and compacted. Onyx separates the concern of when to take action against the state that the window has built up. These actions, or stimuli, are known as Triggers. When a trigger is fired, a user defined function is invoked with the current window state. Onyx ships a number of triggers out of the box. Multiple triggers can be used against the same window.

Timers

Timers sleep for a period of time, then wake up and automatically fire the trigger. You'd want to use this to sync the state from a window every particular duration.

Message Quantity

You can fire a trigger after every N messages have been encountered.

Punctuation

Punctuation is a term used in the literature to define a message that denotes the "end" of a message stream. A user supplied predicate is tested against every message. When the predicate evaluates to true, this trigger fires.

Watermarks

This trigger fires when a message is encountered with a timestamp greater than the upper bound of the window into which the message falls. This is a shortcut for a punctuation trigger that fires when any piece of data has a time-based window key that is above another extent, effectively declaring that no more data for earlier windows will be arriving.

Percentile Watermarks

Percentile watermarks are like normal watermarks, except they will fire if they see a message with a timestamp that is "close enough" to the end of the upper bound of the window, as defined by a user supplied percentage value. This lets you deal with stragglers.
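One way to read "close enough" is as a fraction of the way through the window extent. The predicate below is a sketch under that assumption, not Onyx's implementation; `past-percentile?` is a hypothetical name and `pct` is assumed to be a fraction between 0.0 and 1.0.

```clojure
(defn past-percentile?
  "True when `ts` lies at or beyond `pct` of the way through the extent
  [lower upper]."
  [[lower upper] pct ts]
  (>= ts (+ lower (* pct (- upper lower)))))

;; For a one hour extent, 95% of the way through is 57 minutes in.
(past-percentile? [0 3600000] 0.95 3400000) ; before the 57 minute mark
(past-percentile? [0 3600000] 0.95 3500000) ; after the 57 minute mark
```

The first call returns false and the second returns true, so a trigger built on this check would fire without waiting for the last few stragglers.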

Refinement

Finally, we present refinements - the feature that ties Windows and Triggers together. When a trigger is fired, the window contents are synced to a user supplied function. The state, then, can be transformed on the machine.

Accumulating

The state of a window extent is maintained exactly as is after the trigger invocation. This is useful if you want an answer to a query to "become more correct over time".

Discarding

The state of a window extent is set back to the value it was initialized with after the trigger invocation. You'd want to use this if the results from one periodic update bear no connection to subsequent updates.
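The two refinement modes can be contrasted in a few lines. This is a hypothetical sketch of the behavior described above, not Onyx's code: `fire-trigger`, its argument order, and the `synced` atom are all invented for illustration.

```clojure
(defn fire-trigger
  "Syncs `state` through `sync-fn`, then returns the state the window
  extent keeps under the given refinement mode."
  [refinement init-state state sync-fn]
  (sync-fn state)
  (case refinement
    :accumulating state         ; keep everything seen so far
    :discarding   init-state))  ; start over from the initial value

;; Stand-in for a user supplied sync function: record what was synced.
(def synced (atom []))

(fire-trigger :accumulating 0 7 #(swap! synced conj %)) ;; => 7
(fire-trigger :discarding 0 7 #(swap! synced conj %))   ;; => 0
```

In both modes the sync function sees the same state; the modes differ only in what the window carries forward afterwards.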

Dealing with the Problem

Now that you have an overview of all the new features that Onyx 0.8.0 offers, let's revisit our vote stream example and see how it handles a few different analytics queries:

  • How many votes were cast in total? Update the answer every 5 seconds.
(def windows
  [{:window/id :count-votes
    :window/task :process-vote
    :window/type :global
    :window/aggregation :onyx.windowing.aggregation/count
    :window/window-key :event-time
    :window/doc "Counts the total number of votes."}])

(def triggers
  [{:trigger/window-id :count-votes
    :trigger/refinement :accumulating
    :trigger/on :timer
    :trigger/period [5 :seconds]
    :trigger/sync :my.ns/sync-to-kv-store
    :trigger/doc "Syncs the state to the K/V store every 5 seconds."}])
  • How many votes were cast each hour? Update the answer every 10 votes.
(def windows
  [{:window/id :count-votes
    :window/task :process-vote
    :window/type :fixed
    :window/aggregation :onyx.windowing.aggregation/count
    :window/window-key :event-time
    :window/range [1 :hour]
    :window/doc "Counts the total number of votes per hour."}])

(def triggers
  [{:trigger/window-id :count-votes
    :trigger/refinement :accumulating
    :trigger/on :segment
    :trigger/threshold [10 :elements]
    :trigger/fire-all-extents? true
    :trigger/sync :my.ns/sync-to-kv-store
    :trigger/doc "Syncs the state to the K/V store every 10 segments."}])
  • How many votes were cast each hour, sliding every 20 minutes? Update the answer after a message that is 95% close to the end of the period. Discard state each time a vote passes the watermark timestamp.
(def windows
  [{:window/id :count-votes
    :window/task :process-vote
    :window/type :sliding
    :window/aggregation :onyx.windowing.aggregation/count
    :window/window-key :event-time
    :window/range [1 :hour]
    :window/slide [20 :minutes]
    :window/doc "Counts the total number of votes per hour, sliding every 20 minutes."}])

(def triggers
  [{:trigger/window-id :count-votes
    :trigger/refinement :discarding
    :trigger/on :percentile-watermark
    :trigger/watermark-percentage 0.95
    :trigger/sync :my.ns/sync-to-kv-store
    :trigger/doc "Syncs the state to the K/V store at 95% event time completion."}])
  • Who is winning the election?
;; Assumed to be in the catalog.
(def task
  {:onyx/name :process-vote
   :onyx/fn :my.ns/count-votes
   :onyx/type :function
   :onyx/group-by-key :candidate ;; <- Implicit group-by operation.
   :onyx/uniqueness-key :voter-id
   :onyx/flux-policy :kill
   :onyx/min-peers 3
   :onyx/batch-size 20})

(def windows
  [{:window/id :count-votes
    :window/task :process-vote
    :window/type :global
    :window/aggregation :onyx.windowing.aggregation/count
    :window/window-key :event-time
    :window/doc "Maintains the total number of votes per candidate"}])

(def triggers
  [{:trigger/window-id :count-votes
    :trigger/refinement :accumulating
    :trigger/on :timer
    :trigger/period [5 :seconds]
    :trigger/sync :my.ns/sync-to-kv-store
    :trigger/doc "Syncs the state to the K/V store every 5 seconds."}])

A Strong Background

We did a heavy amount of research in academia and industry leading up to building our implementation of state management. We'd specifically like to call out the Google Dataflow paper presented at this year's VLDB conference. It was instrumental in our understanding of how to present a comprehensive API to tackle the complex analytics queries that are increasingly commonplace today. We also used an exact implementation of Window ID (see Semantics and Evaluation Techniques for Window Aggregates in Data Streams below). Here is a sample of the papers that we used to prepare for this release:

We've also kept a small amount of our scratch notes in the Design Proposal section of the core repository.

Towards 0.9.0

We take extraordinary pride in building and maintaining Onyx. The next few months will be spent sharpening the performance profile of Onyx, as well as enhancing the usability of the platform with a suite of supporting tools. We look forward to showing you what's in store. Onyx is production-ready at this point, and we're still just getting the party started!