Thursday, December 28, 2017

Paper summary. Real-Time Machine Learning: The Missing Pieces

This paper, dated March 11, 2017 on arXiv, is from UC Berkeley. Here is Prof. Michael Jordan's Strata Hadoop conference talk on this.

The paper first motivates the need for real-time machine learning. For this it mentions in-situ reinforcement learning (RL) that closes the loop by taking actions that affect the sensed environment. (The second paragraph mentions that such RL can be trained more feasibly by using simulated/virtual environments: by first trying multiple actions [potentially in parallel] to see their effect in simulation before interacting with the real world. Again this requires real-time performance, as the simulation should run faster than real-time interaction.)

Based on this application scenario, here are their desired requirements of the ML platform:
R1: low latency
R2: high throughput
R3: dynamic task creation (RL primitives such as Monte Carlo tree search may generate new tasks during execution)
R4: heterogeneous tasks (tasks would have widely different execution times and resource requirements)
R5: arbitrary dataflow dependencies (BSP doesn't cut it)
R6: transparent fault-tolerance
R7: debuggability and profiling

The platform architecture

The paper does not give a name to the platform, but the talk calls it Ray. Ray allows arbitrary functions to be specified as remotely executable tasks, with dataflow dependencies between them. Ray uses imperative programming and does not support symbolic computation graphs, as far as I can see. The talk mentions that programming is done in Python. So, at this point Ray is more like a set of Python libraries paired with a Redis database for keeping control state and with Spark RDD support for maintaining the object store as shared memory.

Two principal architectural features are a centralized control plane and a hybrid scheduler. The centralized control state is held by Redis, a replicated key-value store. It looks like the control state does not have any control logic in it; it is just passive storage. (So TensorFlow's variables also qualify as control state.) The hybrid scheduler idea aims to help with providing low latency. Workers submit tasks to their local schedulers, which decide either to assign the tasks to other workers on the same physical node or to "spill over" the tasks to a global scheduler. Global schedulers can then assign tasks to local schedulers based on global information about resource availability. Neither the logically centralized control state nor the two-level hierarchical scheduling is a new, innovative concept.

Task creation is left entirely to the application developer. Any task can create new tasks without blocking on their completion, but this creates a dependency from the caller to the callee. Moreover, Ray uses the dataflow execution model, in which tasks become available for execution if and only if their dependencies have finished executing. The combination of this unrestrained task creation with hybrid scheduling gives the developer plenty of rope to hang himself.

Tasks are called with a split-phase asynchronous execution model. When you call a task, it immediately returns a "future", which just denotes acknowledgement; the task will later call you back with the result when its computation is completed. The caller may potentially call "get" on the future to block until the callee finishes execution. Ray also has a "wait" primitive to time out from waiting on straggler tasks. Again, it is the developer's responsibility to figure out how to use this correctly.
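
To make this concrete, here is a minimal sketch of the split-phase style in Ray's Python API (the simulate function and its arguments are made up for illustration):

import time
import ray

ray.init()  # starts the local scheduler, shared object store, and control state

@ray.remote
def simulate(policy, seed):
    # hypothetical rollout task; stands in for a heterogeneous, possibly long-running computation
    time.sleep(seed % 3)
    return seed * 0.1

# Split-phase invocation: each call returns a future immediately, without blocking.
futures = [simulate.remote("policy-v1", s) for s in range(8)]

# wait: block until 6 of the 8 tasks finish; stragglers stay in not_ready and can be ignored.
ready, not_ready = ray.wait(futures, num_returns=6)

# get: block on the ready futures and fetch their results from the object store.
print(ray.get(ready))

What to do with not_ready is left to the application, which is exactly the kind of rope mentioned above.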


I think the platform is weak on "ease of use". Ray is so minimal that it is unclear if we couldn't have gotten the same level of support from using a systems programming language with concurrency primitives and thread safety, such as Rust. Rust's ownership model and concurrency primitives make it very suitable for building dataflow execution applications, as has been demonstrated by the recent rewrite of Naiad (timely dataflow) in Rust.

While Ray aims at real-time machine learning, it doesn't have a way of shedding load. To provide load-shedding support, it is possible to adopt the SEDA architecture, so the system does not grind to a halt when it is presented with too many tasks at once.

Tuesday, December 26, 2017

TensorFlow-Serving: Flexible, High-Performance ML Serving

This paper by Google appeared at NIPS 2017. The paper presents a system/framework to serve machine learning (ML) models.

The paper gives a nice motivation for why there is a need for productizing model-serving using a reusable, flexible, and extendable framework. ML serving infrastructures were mostly ad hoc, non-reusable solutions, e.g., "just put the models in a BigTable, and write a simple server that loads from there and handles RPC requests to the models."
However, those solutions quickly get complicated and intractable as they add support for:
+ model versioning (for model updates with a rollback option)
+ multiple models (for experimentation via A/B testing)
+ ways to prevent latency spikes for other models or versions concurrently serving, and
+ asynchronous batch scheduling with cross-model interleaving (for using GPUs and TPUs).

This work reminded me of the Facebook Configerator. It solves the configuration management/deployment problem but for ML models.
"What is even more surprising than daily Facebook code deployment is this: Facebook's various configurations are changed even more frequently, currently thousands of times a day. And hold fast: every single engineer can make live configuration changes! ... Discipline sets you free. By being disciplined about the deployment process, by having built the configerator, Facebook lowers the risks for deployments and can give freedom to its developers to deploy frequently."
The paper is also related to the "TFX: A TensorFlow-Based Production-Scale Machine Learning Platform" paper. While that one focused on the data management and model training side of things, this one focuses more on model serving.

The model serving framework

The TensorFlow-Serving framework the paper presents can be used in any of these ways:
(1) a C++ library consisting of APIs and modules from which to construct an ML server,
(2) an assemblage of the library modules into a canonical server binary, and
(3) a hosted service.

The framework can serve TensorFlow models as well as other types of ML models. The libraries, and an instance of the binary, are provided as open-source.

The TensorFlow Serving library

The library has two parts: (1) lifecycle management modules that decide which models to load into memory, sequence the loading/unloading of specific versions, and offer reference-counted access to them; (2) modules that service RPC requests to carry out inference using loaded models, with optional cross-request batching.

In the model lifecycle management, the paper mentions "the canary and rollback" usecase. By default the Source aspires the latest (largest numbered) version of each /servable/, but you can override it and go with a canary/rollback policy. Here, after the newest version is deployed, the system continues to send all prediction request traffic to the (now) second-newest version, while also teeing a sample of the traffic to the newest version to enable a comparison of their predictions. Once there is enough confidence in the newest version, the user switches to aspiring only that version and unloads the second-newest one. If a flaw is detected in the current primary serving version (one that was not caught by the canary), the user can request a rollback to aspiring a specific older version.
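
The traffic-teeing part of the canary usecase is easy to picture with a toy sketch. The following is purely illustrative (not TensorFlow-Serving code); primary_model and canary_model stand for the second-newest and newest servable versions:

import random

class CanaryRouter:
    # Toy illustration: the primary version answers every request, while a sample of
    # requests is also sent to the canary version so their predictions can be compared.
    def __init__(self, primary_model, canary_model, sample_rate=0.05):
        self.primary = primary_model
        self.canary = canary_model
        self.sample_rate = sample_rate
        self.comparisons = []   # (request, primary_prediction, canary_prediction)

    def predict(self, request):
        answer = self.primary(request)            # production traffic path
        if random.random() < self.sample_rate:    # tee a sample to the canary
            self.comparisons.append((request, answer, self.canary(request)))
        return answer                             # clients only ever see the primary's answer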

While optimizing for fast inference is mentioned as a theme, the paper does not elaborate on this. It just says that the framework can support TPUs/GPUs by performing inter-request batching, similar to the cross-model batching mentioned above.
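
For intuition on what inter-request batching buys, here is a hypothetical sketch (not the framework's actual batching scheduler): it gathers individual requests until a size or latency bound is hit, then runs the model once on the whole batch, which is what keeps a GPU/TPU busy.

import threading, time

class MicroBatcher:
    # Hypothetical sketch of cross-request batching: collect requests for up to
    # max_latency_ms, then run model_fn once on the whole batch.
    def __init__(self, model_fn, max_batch_size=32, max_latency_ms=10):
        self.model_fn = model_fn
        self.max_batch_size = max_batch_size
        self.max_latency = max_latency_ms / 1000.0
        self.pending = []              # (request, event, result_slot) tuples
        self.lock = threading.Lock()
        threading.Thread(target=self._loop, daemon=True).start()

    def predict(self, request):
        event, slot = threading.Event(), {}
        with self.lock:
            self.pending.append((request, event, slot))
        event.wait()                   # caller blocks until its batch is processed
        return slot["result"]

    def _loop(self):
        while True:
            time.sleep(self.max_latency)
            with self.lock:
                batch, self.pending = self.pending[:self.max_batch_size], self.pending[self.max_batch_size:]
            if not batch:
                continue
            outputs = self.model_fn([req for req, _, _ in batch])   # one accelerator-friendly call
            for (_, event, slot), out in zip(batch, outputs):
                slot["result"] = out
                event.set()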

While the paper mentions that the framework can serve even lookup tables that encode feature transformations, there is no explanation of whether any special optimization is (or can be) employed for improving the performance of serving different types of models. For example, for the nonparametric and sparse models that are popularly used in recommendation systems, would the framework provide optimizations for faster lookup/inference?

The TensorFlow serving hosted service

With the hosted service, Google would like to capture the users who have money and want a no-fuss solution. To use the hosted service, the user just uploads her model to it and it gets served. It looks like the hosted service is integrated with a datacenter resource manager and scheduler as well. The hosted service also offers features such as validating model quality before serving a new version, or logging inferences to catch training/serving skew bugs.

In the figure above, the synchronizer is the master that commands and monitors the workers serving the jobs. The router component is interesting; the paper mentions it uses hedged backup requests to mitigate latency spikes. This means the request is sent to multiple job servers and the earliest response is used; an effective protection against stragglers.
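
Hedged backup requests are simple to sketch. Here is a hypothetical asyncio illustration (not the router's actual code) of fanning a request out to several job servers and taking whichever reply arrives first; send_fn and the fake servers are made up for the example:

import asyncio

async def hedged_request(request, servers, send_fn):
    # Send the request to every server and return the earliest reply; cancel the rest.
    tasks = [asyncio.ensure_future(send_fn(s, request)) for s in servers]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    return next(iter(done)).result()

async def fake_send(server, request):
    await asyncio.sleep(server["latency"])          # stand-in for the actual RPC
    return f"{server['name']} handled {request}"

async def main():
    servers = [{"name": "job-a", "latency": 0.30},
               {"name": "job-b", "latency": 0.05},
               {"name": "job-c", "latency": 0.20}]
    print(await hedged_request("req-1", servers, fake_send))   # job-b, the fastest, wins

asyncio.run(main())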

Sunday, December 24, 2017

WPaxos: a wide area network Paxos protocol (Part 1)

Paxos is great for solving the fault-tolerant coordination problem in a datacenter. But for cross-datacenter coordination (which is needed for distributed databases, filesystems, configuration management, etc.), it hits the WAN latency barrier. The multi-decree Paxos (Multi-Paxos) algorithm, implemented in variants like Raft and Zab, relies on electing a distinguished leader to serialize updates and hence cannot deal with write-intensive scenarios across wide area networks (WANs).

An alternative is leaderless Paxos algorithms. Generalized Paxos and EPaxos employ opportunistic leaders for non-interfering commands, are able to reduce 3 message delays to 2, and allow concurrent leaders to commit. But the fast agreement incurs the cost of a much larger quorum, named the fast-quorum (3/4ths of the nodes), and hits the WAN latency barrier as well.

Another alternative (as employed in Google Spanner) is to use multiple Paxos groups with partitioned data/object space. In order to provide flexibility/versatility, this partitioned approach employs an external service to manage configuration and move data/objects across groups.

Recently we introduced another alternative with our WPaxos work. WPaxos is a simple and pure Paxos protocol that provides flexible, performant, and fault-tolerant coordination over the WAN. WPaxos is lean and obviates the need for another service/protocol for data/object relocation. Since WPaxos provides the same Paxos safety guarantees in the face of concurrency, asynchrony, and faults, its performance can be tuned orthogonally and aggressively by tuning a couple of parameters. In this post, jointly written with Ailidani Ailijiang and Aleksey Charapko, we give an overview of WPaxos. For details and performance evaluation, we refer you to our paper.

Flexible flexible quorums

WPaxos leverages the flexible quorums idea, which weakens the "all quorums should intersect" assertion in Paxos to instead "quorums from different phases should intersect". That means it is possible to use any phase-1 quorum (Q1) that intersects with any phase-2 quorum (Q2), instead of using majority quorums. A clever instantiation of this is the grid quorum system. Let Q1 be a row in the grid, and Q2 be a column. Since any row and column always intersect at one node, any $q1 \in Q1$ is guaranteed to intersect with any $q2 \in Q2$.

Our insight is to notice that if we deploy each column in one of the geo-distributed regions/zones, we can achieve a really fast Paxos phase-2, since the Q2 quorum is within the same LAN. Note that, when failures and leader changes are rare, phase-2 (where the leader tells the acceptors to decide values) occurs much more frequently than phase-1 (where a new leader is elected). So it makes sense to improve the performance by reducing Q2 latency at the expense of making the infrequently used Q1 slower.

Before going further with describing the protocol, let's elaborate on why the grid-shaped deployment is safe for use in Paxos. In majority quorums, any 2 phase-1 quorums intersect, which means 2 nodes cannot be elected as leaders for the same ballot. However, this is not necessary! In the grid quorum, node i can be elected as leader by the first row for some ballot b, while node j is elected by the second row for a b'>b. In order to make a decision, node i has to start phase-2 and satisfy a phase-2 (column) quorum. Since any Q2 always includes one node from the second row, that node rejects node i's phase-2 message, preventing any conflicting decisions from being made by i and j.

Unfortunately, this grid quorum deployment cannot tolerate a single zone failure. The WPaxos default quorum derives from the grid quorum layout, and picks f+1 (a majority) of the 2f+1 nodes in a zone to tolerate f node failures per zone. In addition, to tolerate F zone failures among Z zones, Q1 is selected from Z-F zones and Q2 from F+1 zones. For example, in the following figure of 12 nodes, while a majority quorum may tolerate 5 failures, the WPaxos default quorum can tolerate one row plus one column, a total of 6 failures.

Here is a TLA+ specification for the Q1 and Q2 quorum systems used in WPaxos. Q1 and Q2 do not need to be rigid rows and columns; the first responding node set that satisfies a Q1 or Q2 definition suffices.
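
As a complement to the TLA+ spec, here is a small Python sketch (our own illustration, not code from the paper) that enumerates the default WPaxos Q1 and Q2 quorums for Z zones with 2f+1 nodes each, and checks that every Q1 intersects every Q2:

from itertools import combinations

def wpaxos_quorums(Z, f, F):
    # Nodes are (zone, index) pairs; 2f+1 nodes per zone, Z zones in total.
    # Q1: f+1 nodes from each of Z-F zones.  Q2: f+1 nodes from each of F+1 zones.
    zones = [[(z, i) for i in range(2 * f + 1)] for z in range(Z)]

    def build(zone_count):
        quorums = []
        for zs in combinations(range(Z), zone_count):
            picks = [set()]
            for z in zs:
                picks = [p | set(c) for p in picks for c in combinations(zones[z], f + 1)]
            quorums.extend(picks)
        return quorums

    return build(Z - F), build(F + 1)

Q1s, Q2s = wpaxos_quorums(Z=3, f=1, F=1)    # 3 zones of 3 nodes; tolerate 1 zone plus a minority per zone
assert all(q1 & q2 for q1 in Q1s for q2 in Q2s), "some Q1 does not intersect some Q2"
print(len(Q1s), "Q1 quorums,", len(Q2s), "Q2 quorums, all pairs intersect")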

The multi-leader federation

WPaxos uses multiple leader nodes to concurrently serve a set of objects in the system. The nodes may steal leadership of objects from each other using phase-1 of Paxos executed over a Q1 quorum. Then the node commits the updates to those objects over its corresponding Q2 quorums, and can execute phase-2 multiple times until another node steals those objects.

To mitigate the dueling leaders problem, where two nodes constantly propose a higher ballot number than the opponent, each object gets its own commit log with separate ballot and slot numbers. This also means that WPaxos provides  per-object linearizability.


Since the basic WPaxos is a simple and pure flavor of Paxos, it enjoys Paxos's safety guarantees. Since basic WPaxos is also very flexible and offers knobs for tunability, we can easily extend the protocol to improve its performance.

The locality adaptive object stealing optimization moderates the trigger-happy object stealing in WPaxos based on a migration policy. The intuition behind the policy is to move objects to a zone where the clients will benefit the most, since moving objects frequently is expensive. By using an invited-stealing approach, the permission to steal is handed to the zone that has the most requests for the objects in some period of time.

The replication set optimization allows a configurable replication factor, where a subset of the Q2 quorum is selected to receive phase-2 messages, instead of broadcasting to the entire system. The size of the replication set ranges from F+1 zones up to all Z zones. This provides a trade-off between communication overhead and more predictable latency, since the replication zones may not always be the fastest to reply.

Transactions can be implemented on top of WPaxos entirely within the protocol, avoiding the need to integrate an extra 2-phase-commit service. The node that initiates a transaction first steals all objects needed for that transaction via multiple Q1 accesses. This is done in increasing order of object IDs to avoid deadlock and livelock. Then the node commits the transaction in phase-2 in separate object logs, collating/serializing the logs together by comparing the slot numbers of the objects common to the transactions. (We have not implemented transactions in WPaxos yet!)

Dynamic reconfiguration is achieved similarly to Raft, in two steps, where the current configuration is C = <Q1, Q2> and the new configuration is C' = <Q1', Q2'>. First, a union of the old and new configurations, C+C', is proposed and committed by the combined quorums. Then the leader may propose the new configuration C' and activate it after it commits in Q2'. WPaxos further reduces the two steps to one in the special case where the reconfiguration operation is limited to adding/removing one row or column at a time.

Give this a Go

We model checked our WPaxos specification in TLA+/PlusCal to verify its consistency properties.

We also implemented WPaxos in Go to evaluate its performance. Please give it a whirl, and let us know what you think.

Friday, December 22, 2017

Retroscope: Retrospective cut-monitoring of distributed systems (part 3)

This post continues the discussion on monitoring distributed systems with Retroscope. Here we focus on the cut monitoring approach Retroscope uses. (This post is jointly written with Aleksey Charapko and Ailidani Ailijiang.)

Retroscope is a monitoring system for exploring global/nonlocal state history of a distributed system. It differs from other monitoring tools due to the way it inspects the system state. While request tracers inspect the system by following the trace of a request (i.e. request r in the figure), Retroscope performs cut monitoring and examines the system at consistent global cuts, observing the state across many machines and requests. It moves along the system history and scans a progression of states one cut at a time, checking cut  Ts1 and then Ts2 and so on.

Retroscope’s cut monitoring approach is complementary to the request tracing solutions, and brings a number of advantages. First, by exposing the nonlocal state, Retroscope enables users to examine nonlocal properties of distributed applications. Using Retroscope you can inspect state distributed across many machines and can reason about the execution of a complex distributed application through invariant checking. Furthermore, by sifting through many past nonlocal states, you can perform root-cause analysis and use the across-node context to diagnose race conditions, nonlocal state inconsistencies, and nonlocal invariant violations.

To illustrate some of these benefits, we use Retroscope and the Retroscope Query Language (RQL) to study the data staleness of replica nodes in a ZooKeeper cluster. Staleness is a non-local property that cannot be easily observed by other monitoring techniques. To our surprise, we found that even a normally operating cluster can have a large staleness. In one of our observations in AWS EC2, some ZooKeeper replicas were lagging by as much as 22 versions behind the rest of the cluster as we discuss at the end of this post.

Feasibility of Cut Monitoring

Ok, if cut monitoring is so useful, why was this not done before? The answer is that cut monitoring was not very feasible. A standard way to do cut monitoring is with vector clocks (VC), but VCs do not scale well for large systems due to their O(N) space complexity. Moreover, using VCs results in identifying an excessive number of possible cuts at a given point, many of which are false positives that do not occur in the actual system execution.

Retroscope employs hybrid logical clocks (HLC) and a scalable stream processing architecture to provide a feasible end-to-end solution for cut monitoring. The NTP-synchronized physical clock component of HLC shrinks the number of consistent cuts at a given point to only 1. (It may be argued that this reduces the theoretical coverage compared to VC, but this is a good tradeoff to take to improve performance and avoid the false positives resulting from VC.) Using HLC also allows us to construct consistent cuts without the need to coordinate across nodes. Finally, the HLC size is constant, and this reduces the communication overheads. We talked about these advantages in Part 1.
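
For intuition on why HLC stays constant-size and coordination-free, here is a minimal Python sketch of the HLC update rules (a simplified paraphrase of the algorithm; the physical clock reading pt is passed in explicitly):

class HLC:
    # A hybrid logical clock is an (l, c) pair: l tracks the largest physical time
    # seen so far, and c is a bounded counter that captures causality within the same l.
    def __init__(self):
        self.l, self.c = 0, 0

    def send_or_local(self, pt):
        # Called on a local event or before sending a message; pt is the NTP-synced clock.
        l_old = self.l
        self.l = max(l_old, pt)
        self.c = self.c + 1 if self.l == l_old else 0
        return (self.l, self.c)     # timestamp attached to the event/message

    def receive(self, m_l, m_c, pt):
        # Called when receiving a message that carries timestamp (m_l, m_c).
        l_old = self.l
        self.l = max(l_old, m_l, pt)
        if self.l == l_old and self.l == m_l:
            self.c = max(self.c, m_c) + 1
        elif self.l == l_old:
            self.c += 1
        elif self.l == m_l:
            self.c = m_c + 1
        else:
            self.c = 0
        return (self.l, self.c)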

To achieve a scalable implementation of Retroscope, we leveraged Apache Ignite for stream processing, computation, and storage. We arranged the log ingestion in a way to minimize data movement and to improve data locality and achieve maximal parallelism when searching. We had covered these issues in Part 2. 

In our prototype, Retroscope deployed on a single quad-core server processed over 150,000 consistent cuts per second. Horizontal scalability is one of the strongholds of Retroscope's architecture. Adding more compute power allows Retroscope to redistribute the tasks evenly across all available servers and achieve a nearly perfect speedup (93% going from 4 to 8 servers).

Ok, now back to the ZooKeeper case study to show the advantages of the cut monitoring approach.

The ZooKeeper Case Study 

Users interact with Retroscope via the declarative Retroscope Query Language (RQL). The users only need to specify the nonlocal predicates to search for, and leave the rest for the system to figure out.

To illustrate Retroscope and RQL, we considered replica staleness monitoring in Apache ZooKeeper. In ZooKeeper, a client can read data from any single replica, and if the replica is not fully up-to-date, the client will read stale data. Staleness is a nonlocal property, because it is defined by considering the states of the other replicas at that same point in time. Using a simple RQL query, we can find the cuts that violate the normal (less than 2 versions) staleness behavior of a cluster:
SELECT r1 FROM zklog
WHEN Max(r1) - Min (r1) > 1 ;
In this query, r1 is the version of a node's state. The system retrospectively looks at past application states and searches for the ones that satisfy this staleness predicate.

We observed many cuts having the staleness problem, with a few larger spikes (up to 22 versions stale!) that captured our attention. To investigate the causes of the excessive staleness cases, we need to inspect the message exchange in the system at those points. Here is the query we use for that:
SELECT r1, sentCount, recvCount, diff, staleness
FROM zklog
AND GLOBAL staleness
AND (staleness := Max(r1) - Min (r1))
AND (diff:= NodeSum(sentCount) - NodeSum(recvCount))
AT TIME t1 TO t2

In this query we included another nonlocal property: the number of messages in transit between nodes. The query scans through past cuts around the time of the observed staleness we identified earlier. This allows us to visualize both the staleness and the number of messages in transit between nodes in the cluster. We see that the staleness spikes at the same time as the number of "in-flight" messages increases.

The number of messages "stuck" in the network still tells us only a little about the communication patterns in the cluster. To gain more insight into the message exchanges, we look at the in-flight messages more rigorously and examine the sets of sent and received messages at each node with this query:
SELECT sentM, recvM, inFlight, r1, staleness
FROM zklog
GLOBAL staleness
AND (staleness := Max(r1) - Min(r1))
AND (inFlight := Flatten(sentM) \ Flatten(recvM))

We run this query with a custom query processor that visualizes the results as a "heat map" of message exchange. Here is an example of how messages were flowing in the system right before and at the peak of the staleness event. The deeper blue color represents a greater number of messages being in the network between nodes. We see more messages in flight in both directions between node #3 (the leader) and node #4, suggesting that the staleness is caused by messages being stuck in transit between these nodes for longer than usual. This indicates the possibility of a momentary millibottleneck in the network between node #3 and node #4.

Our Retroscope implementation is available as an open-source project on GitHub. We invite you to use the tool and drop us a note about your use cases.

Wednesday, December 20, 2017

Useful podcasts update

1.5 years ago, I had posted a list of the useful podcasts I subscribe to. This is a good time to update that list with some recent favorites.

Listening to insightful podcasts is a great way to patch/upgrade your personal operating system. So if you have some good ones you can recommend, let me know.

Masters of Scale with Reid Hoffman 

This podcast talks about startups and scaling them. Reid Hoffman is a master of this domain, and he brings on and interviews great people. He has had Mark Zuckerberg, Sheryl Sandberg, Peter Thiel, and Eric Schmidt on the show. Every episode is great. The last one I listened to was Part 1 with Barry Diller. I relate very closely to Barry's approach of learning by deconstructing and understanding from the fundamental elements, and feeling inadequate until good insight and understanding is gained.

DILLER:​ ​By purpose or by temperament, I’m only interested in those things where I haven’t figured it out, and I really do think that however it happened, that when I was presented endlessly with things I didn’t understand, the only thing I could do—because my brain is slower, and therefore is more literal—and therefore my process is, I have to get it down to its tiniest particle, or else…I can’t come in and understand an equation, if you can put it in equation terms, unless I de-equation it—I can’t pick it up. So I’m forced – by a lack of brain matter, I am forced to – no I’m not saying it – it’s true! To break it down as hardly low as I can get it, and only then—and that’s learning. That’s real – that is joyous work to me, is getting through those layers, down to something, and then once I’m down there, once I’m actually at the very, very base of it, I can actually start to do something good.

Trailblazers by Walter Isaacson

I am really fond of this podcast as well. Of course we know Isaacson from his great biographies. Listening to him swiftly cover the history of innovation in a selected area each week is a blast. I learn so much from this podcast. In the most recent episode, "the toy to the world", I learned how important it is to separate the means from the ends. It is best to have a simple elegant vision and pursue that, rather than doing the trendy hip thing. Yep, true not just for toy companies, but for everything else as well.

Revisionist History by Malcolm Gladwell

Some people criticize Malcolm Gladwell's books and writing as being oversimplified. I got that vibe from a couple of his books; I think it is OK if you treat the books as conversation starters/instigators on a topic rather than as the ultimate judgment. An instigator he is, and that is why I think Gladwell excels at the podcast medium. Every episode was interesting. And in the episodes on education inequality, Gladwell really transcends himself.

Some other nice podcasts I listen to on occasion include:
+ Hidden Brain by Shankar Vedantam
+ Recode decode by Kara Swisher
+ This week in startups by Jason Calacanis
+ Design matters by Debbie Millman
+ Futility closet by Greg & Sharon Ross
+ Finding mastery by Michael Gervais
+ The Unmistakable Creative by Srini Rao

Monday, December 18, 2017

Upgrade your personal operating system

It is not hard to make analogies, because all analogies are to some extent inaccurate. When Paul Graham made the analogy between hackers and painters, many people criticized it, saying that many other hacker-and-vocation analogies could be made. But analogies, while inaccurate, can also be useful. Analogies can give you a new perspective, and can help you translate things from one domain to another to check whether there is something to learn from that.

The computer and brain analogy has been made for several decades now. It is not surprising that the trendy technologies of earlier times (hydraulics, mechanics, steam engines, clocks, electricity, telegraph) have historically been used as metaphors for explaining the brain. So the computer metaphor is in all likelihood as laughable as the earlier ones. But again, that doesn't mean it can't be useful.

The Getting Things Done system made the brain and RAM analogy. It stated that unrecorded tasks waste space in RAM, and you need to move them out by recording them externally in an agenda system. That way, the mind is freed from the job of remembering everything that needs to be done, and can concentrate on actually performing those tasks.

I think that was a very useful analogy. Before integrating emacs org-mode into my life, I always had open-loop tasks that caused me worry and ate up cycles in my brain: "Oh, I should remember to do this", "Woe to me, I am procrastinating on this", etc. After successfully adopting emacs org-mode as my to-do list and agenda manager (which took a couple of years and several aborted tries), the benefit I got was clarity of mind and the release of all that background anxiety.

Many people also say that they get the same "RAM garbage-collection" benefits from their daily meditation practice. I think writing a daily journal, or freewriting morning pages also works wonders for "garbage-collecting" your RAM.

A personal operating system?

Let's stretch this analogy further to see if we can get some mileage out of this.

We all have modes of reacting to things and going about our lives. Some of these may be very primal: fight or flight. Some of these can be bad and damaging for you: go for the lazy option, save energy, maximize pleasure, be a couch potato and procrastinate on things until a crisis/deadline looms. These are some examples of the processes or modus operandi of your personal operating system (POS).

It makes sense to be aware of these processes and try to improve them to your advantage. 

I was fascinated to learn about the OODA loop: the observe-orient-decide-act process. Since then I have been creating OODA loop customizations for common tasks I encounter. For writing tasks, my customized OODA loop is the mess up, tidy up, revise loop. (Here is an instantiation of that for writing research papers.)

For studying, writing, or researching, my process is also to take on the task in units of 30 minutes. I use the pomodoro technique and have customized it a lot over time. I am now on the 3rd version of my pomodoro process. I will write about my recent pomodoro setup later on. It involves detaching and going meta for 4 minutes at the beginning and end of each pomodoro. In these slots, I step back and force myself to think meta: Is this the right approach? What else can I try? Am I making good progress? Can this be made transformative?

Upgrading and patching your POS

Going along with this personal operating system (POS) analogy, you should realize that you need to patch, upgrade, and refactor/rewrite your POS regularly. You don't want to get stuck with Windows 98, do you? It doesn't matter how good your hardware is; you will be limited by the OS you use. And a good OS can do wonders for even a low-end machine, as I saw first hand when I installed FreeBSD+xfce on an old laptop.

For patching your POS, you should have a monitoring and bug-tracking system in place. You should monitor yourself, and should also consider feedback from friends/family. Of course, you need idle time to reflect on your processes and identify/consider the bugs in them. Writing these down and ruminating/meditating on them helps. For this, you should set aside idle time and guard it. If I were to identify malware and viruses for the POS, I would put anxiety-inducing news, social media, and mindless Web browsing among the top. Those eat away and infect the precious self-check cycles our POS needs to maintain itself.

For upgrading, and the occasional rewrite, of your POS, I think it is important to seek and embrace new paradigms. A great way of doing this is by reading books (i.e., downloading new apps to your POS). Of course, as you read books, you should apply your own filter to them, and be willing to live for some time with multiple conflicting opinions/views on an issue. As time goes on, you can slowly distill these and come to your own conclusions. I find that most of the time the conclusion is a nuanced compromise perspective. To change the world, you need to start by changing how you view it and redefining your role.
Yesterday I was clever, so I wanted to change the world. Today I am wise, so I am changing myself. -Rumi
A new perspective is worth 80 IQ points. -Alan Kay

Saturday, December 16, 2017

Distributed system exams

Last week was the finals week at UB. I gave my final exam on distributed systems on Monday.

Many factors go into designing an exam. In a written exam, especially when you have a large class (mine was 75 students), you have to consider the ease of reading/grading the questions. So instead of asking open-ended questions, I ask short-answer questions, such that the space provided under each question is enough to give a complete answer. Secondly, since it is impossible to cover everything the course taught, I try to hit the salient points. Finally, I make sure that I ask a sufficient number of straightforward questions so that the students that did a decent job of following the course and studying can get about 70 points out of 100.

This link shows the final with solutions. I solve the final during the exam while my two TAs (and occasionally I) proctor the exam.

And this link shows the midterm I gave earlier in the semester. Note the Paxos question, question #4. I tried to keep the question brief and concrete. A student who understands the Paxos protocol can answer it easily, but a student who memorized some facts about Paxos without fully understanding it will not be able to solve 4.3 and 4.4.

Thursday, December 14, 2017

TLA+/Pluscal solution for modeling of 2-phase commit transactions

I had posted about the second TLA+ project I assigned for the distributed systems class the other day. Here is the solution to the project. The solution is pretty simple and straightforward. (This was built by modifying the /P2TCommit.tla/ at Lamport's site to add a TM and a BTM agent).

Initial definitions

Lines 12-13 set the initial RMstates for each replica and the initial TMstate.

Lines 15-18 define the canCommit and canAbort conditions. If all RMs are "prepared" (i.e., ready for a commit decision), the TM uses this to set tmState="commit". canCommit is also true if there is an RM already in the "committed" state. This is possible if the TM already made the "commit" decision and an RM went ahead with it and transitioned to "committed" from "prepared". Then, if the TM failed and the BTM is to make a decision, the BTM sets tmState="commit" in order to keep Consistency.

If there exists an RM in the "aborted" state or an RM in the "failed" state, canAbort becomes true, provided that there does not exist an RM in the "committed" state. This Line 18 condition is there to prevent inconsistency with the Line 16 condition. If there is a failed node, it is possible to abort, but not if a previous commit was signaled and made some RM "committed".
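
For readers who prefer not to parse TLA+, here is the same canCommit/canAbort logic paraphrased as Python predicates (an illustrative restatement of the conditions above, not the model itself):

def can_commit(rm_states):
    states = list(rm_states.values())
    # All RMs are prepared, or some RM already committed (so the only consistent decision is commit).
    return all(s == "prepared" for s in states) or any(s == "committed" for s in states)

def can_abort(rm_states):
    states = list(rm_states.values())
    # An aborted or failed RM allows abort, but never once some RM has committed.
    return any(s in ("aborted", "failed") for s in states) and not any(s == "committed" for s in states)

assert can_commit({"r1": "prepared", "r2": "prepared"})
assert can_abort({"r1": "failed", "r2": "prepared"})
assert not can_abort({"r1": "failed", "r2": "committed"})    # the Line 18 guard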

RM modeling

Lines 37-38 show the actions an RM takes. Since "working" and "prepared" are nonterminal states, the RM keeps trying an action until a terminal state is reached. The RM may call one of the macros Prepare, Decide, or Fail.

Prepare first checks that the RM is in the "working" state, and if so, transitions it to the "prepared" state. (In 2-phase commit implementations, this generally happens upon a prepare-request sent by the TM, but this model keeps things simple and abstracts that away.)

Decide changes the RM state to "committed" if tmState="commit", and changes it to "aborted" if tmState="abort". Note that when Decide is called, the RM is either in the "working" or the "prepared" state. As the canCommit condition above shows, tmState goes to "commit" only if all RMs are in the "prepared" state, so the allowed transitions for an RM are prepared->committed, prepared->aborted, and working->aborted.

Fail changes the RM state to "failed".
Don't worry about all RMs going to failed; the model checker is going to check all possible scenarios, including all up, one down (at any of the dozens of possible points in the computation), two down (at any of the dozens of possible points in the computation), and so on.

TM modeling

The TM model is simple. The TM checks if it canCommit or canAbort and updates tmState accordingly. The TM can also fail, which sets tmState="hidden". To keep things simple yet interesting, I only made the TM fail after it makes a decision. But if you notice, I put a label before the fail action. That makes these two updates nonatomic: that is, the tmState will be available for the RMs to read for a duration before the TM fails.

BTM modeling

The BTM model is very similar to the TM model. The BTM checks for tmState="hidden" before it interjects, but otherwise it makes the same decision. Of course there will be some complications: maybe an RM changed state after seeing the TM's decision, or maybe an RM failed since the TM's decision.

For simplicity we assume the BTM does not fail.


Termination checks that all processes eventually stop executing. TypeOK is just a sanity check.

Consistency checks that there are no 2 RMs such that one says "committed" and the other says "aborted". That is what 2-phase commit was trying to prevent in the first place.

Model checking

Model checking with TM failure (with the BTM code commented out) led to a Termination violation, as expected. That is, if the TM fails before the tmState is read by all RMs, the prepared RMs will block and won't be able to terminate.

When we uncomment the BTM code, the BTM is able to take over, and Termination is also satisfied. Of course we didn't get to this flawless model all at once. The model checker found some counterexamples to my flawed models, and I fixed them. I guess this post would be more instructive if I had documented some of the mistakes I made instead of showing only the final state of the code. Well, next time.

But that was easy

How were we able to solve this problem? Isn't this supposed to be a complicated problem in the database literature? There were a lot of attempts to make 2-phase commit fault-tolerant by fixing it with a backup TM. The Lamport and Gray paper on transaction commit says that all of those attempts had bugs in corner cases:
A non-blocking commit protocol is one in which the failure of a single process does not prevent the other processes from deciding if the transaction is committed or aborted. They are often called Three-Phase Commit protocols. Several have been proposed, and a few have been implemented [3, 4, 19]. They have usually attempted to “fix” the Two-Phase Commit protocol by choosing another TM if the first TM fails. However, we know of none that provides a complete algorithm proven to satisfy a clearly stated correctness condition. For example, the discussion of non-blocking commit in the classic text of Bernstein, Hadzilacos, and Goodman [3] fails to explain what a process should do if it receives messages from two different processes, both claiming to be the current TM. Guaranteeing that this situation cannot arise is a problem that is as difficult as implementing a transaction commit protocol.

The reason we succeeded easily is because we cheated: we assumed clean failures and perfect failure detectors. If we didn't have perfect failure detectors, this would be a mess: a node may act as if another is dead while the other node is alive and acting as if this one is dead. This asymmetry of information is the root of all evil in distributed systems.

In a future post, I plan to make the failure detectors more fuzzy in this 2PC with backup TM code and show what type of problems can arise.

Before I end this post, let me answer this hanging question: how do we deal with partial failures and imperfect failure detectors? We use Paxos to cut the Gordian knot and act as a definitive judge to get the processes to agree on the current state/configuration of the system. This is exactly what Lamport and Gray show in their "Consensus on Transaction Commit" paper to make 2-phase commit fault-tolerant. I will discuss that work in a future blog post.

Tuesday, December 12, 2017

Paper writing woes

I am busy submitting papers for the ICDCS deadline. Today was a long day filled with just polishing the writing of the papers. At this point all I can think of is the dorodangos.

And this Peugeot commercial. For no particular reason. None at all.

Sunday, December 10, 2017

Reasoning compositionally about security

Prof. Andrew Myers from Cornell visited our department at UB a couple of weeks ago and gave a talk. I had taken some notes during the talk, and figured I should organize them a little and post them here.

I was delighted to see that Andrew has a blog. I just wish he posted more frequently. I especially liked the upgoer-five-editor-composed "What I do" post, the "GashlyCode-Tinies" post, and the "Deserialization considered harmful" post.

I like it when researchers and professors blog. Blogging gives us a different venue with an informal format to present our opinions. This enables us to provide intuitive and simpler explanations, opine open-endedly without needing to provide proof/validation, and even show our humorous and human sides. Blogging is ideal for professing, I think.

Anyways, back to Andrew's talk. The talk was about a couple of recent papers:
"Secure Information Flow Verification with Mutable Dependent Types" and "Verification of a practical hardware security architecture through static information flow analysis".

Compositional security enforcement

An idea that worked for compositional security enforcement in software is to "control the flow of information throughout a computing system". The secret flow shouldn't get into the public flow and leak outside. In other words, as in Ghostbusters, you must not let the secret and public information streams cross inside a component. Components that possess this property compose, and by composing them together you get end-to-end security in your system. (I am not in the security field, but I had heard about this idea applied to achieve Android system security.)

Andrew had worked on this idea for software systems, and recently wondered if the idea could also be used for achieving hardware security. This is because even if you have security at the software level, if the hardware level leaks, you haven't achieved anything. And there have been very interesting exploits in recent years using side-channel and timing attacks to leak information from the hardware (e.g., the data cache, instruction cache, computation unit, memory controller).

Secure HDLs

The idea is to develop a secure hardware description language (HDL) that uses the information-flow type ideas described above to ensure that the hardware is secure at design time. Chip design already uses Verilog as an HDL and synthesizes chips from Verilog programs. (Chip design is still a relatively constrained domain, so synthesis from high-level code is possible.) So Andrew's team added security annotations to Verilog to produce SecVerilog.

SecVerilog is essentially Verilog plus some dependent security labels. The idea is to design a chip system that doesn't leak, by modeling/verifying it in SecVerilog. The security model is that the attacker sees the contents of the public (low, as opposed to secret/high) hardware state at each clock tick.

Using SecVerilog Andrew's team produced a formally verified MIPS processor. The static analysis overhead of SecVerilog was extremely low: it was 2 seconds for the complete MIPS processor.

Friday, December 8, 2017

TLA+/Pluscal modeling of 2-phase commit transactions

For the second project in my distributed systems class in Fall 17, I assigned modeling of the two-phase transaction commit protocol. I asked the students to model what happens when the initiator/transaction manager (TM) fails, how a backup TM would take over, and what types of problems could arise.

Here is the description of the project. I will post the solution later on.

2 phase commit

In a distributed system, a transaction is performed by a collection of processes called resource managers (RMs), each executing on a different node. The transaction ends when the transaction manager (TM) issues a request either to commit or to abort the transaction. For the transaction to be committed, each participating RM must be willing to commit it. Otherwise, the transaction must be aborted. The fundamental requirement is that all RMs must eventually agree on whether the transaction is committed or aborted.

Here is a model of 2-phase commit. (This was built by modifying the /P2TCommit.tla/ at Lamport's site to add a TM agent.) I decided to stay with the shared memory model rather than the message passing model to keep the project simple. The interesting scenarios are still possible under the shared memory model.

Here are the constraints on the shared memory communication. An RM node can only read tmState and read/update its own rmState. It cannot read other RMs' rmState. A TM node can read all RM nodes' rmState and read/update tmState. A BTM node can also read all RM nodes' rmState and read/update tmState.

2 phase commit modeling and validation

If no faults occur, the 2-phase commit algorithm is correct. In the presence of a crash fault, however, problems can arise. In the questions below, we will use TLA+/PlusCal to explore what problems may arise, and how to properly design the protocol to overcome those problems.

Part 1.

  • Fill in the redacted PlusCal code. (The /macro Fail/ models RM failure.)
  • Add /Consistency/ and /Termination/ properties.
  • Model check /Consistency/ and /Termination/ with no failures (RMMAYFAIL=FALSE and TMMAYFAIL=FALSE). You should see no errors.
  • Model check with RM failure (RMMAYFAIL=TRUE and TMMAYFAIL=FALSE). You should see no errors. 

Part 2.

  • Model check with RMMAYFAIL=FALSE and TMMAYFAIL=TRUE. (No need to recover a failed TM.)
  • Write in the comments section, after the "==================" line, your findings/observations. Comment whether the /Termination/ property is violated by a TM failure. 

Part 3.

  • Add a backup TM process to take over if primary crashes. (Assume the BTM cannot fail. TMMAYFAIL can only affect TM not BTM.)
  • Test satisfaction of the /Consistency/ and /Termination/ properties with no TM or RM failures. Make sure BackupTM terminates, so that the /Termination/ property is satisfied as well as the /Consistency/ property. 
  • Model check with both TM and RM failure allowed (RMMAYFAIL=TRUE and TMMAYFAIL=TRUE). Write down your observations. (No need to recover a failed RM or TM.)

Here is the synchronized consensus problem I assigned as the first project in the class.

Here is some previous discussion/context about why I started assigning TLA+/PlusCal modeling projects in distributed systems classes.

Tuesday, December 5, 2017

Paper Summary. The Case for Learned Index Structures

This paper was put on arXiv yesterday and is authored by Tim Kraska, Alex Beutel, Ed Chi, Jeff Dean, and Neoklis Polyzotis.

The paper aims to demonstrate that "machine learned models have the potential to provide significant benefits over state-of-the-art database indexes".

If this research bears more fruit, we may look back and say, the indexes were first to fall, and gradually other database components (sorting algorithms, query optimization, joins) were replaced with neural networks (NNs).

In any case this is a promising direction for research, and the paper is really thought provoking.


Databases started as general, one-size-fits-all blackboxes. Over time, this view got refined into "standardized sizes": OLAP databases and OLTP databases.

Databases use indexes to access data quickly. B-trees and hash maps are common techniques for implementing indexes. But along with the blackbox view, databases treat the data as opaque and apply these indexes blindly, without making any assumptions about the data. However, it is obvious that not knowing the data distribution leaves performance on the table. Consider this thought experiment: if the keys are integers in the range 0 to 500M, it is faster to just use the key itself as the index, rather than using a hash. This observation can be extended to other data distributions if we know the cumulative distribution function (CDF) of the data. We can generalize by saying that "CDF(key) * number-of-records * record-size" gives us the approximate position of the record the key refers to.

Ok, so, by knowing the data distribution, we can achieve performance gains. But we lose reusability if we go full whitebox. We can't afford to go full whitebox, inspecting the data and designing the indexes from scratch every time.

The paper shows that by using NNs to learn the data distribution, we can take a graybox approach to index design and reap performance benefits by making the indexing data-aware.

The case for applying NNs to indexing is shown over the following three index-types:

  • B-trees, which are used for handling range queries
  • hash-maps, which are used for point-lookup queries
  • bloom-filters, which are used for set inclusion checks

I will only summarize the section on how to replace the B-tree structure. For the hash maps, the learned structure is a straightforward function based on CDF of the data.


B-trees provide an efficient hierarchical index.

Why is it even conceivable to replace a B-tree with an NN model? Conceptually, a B-tree maps a key to a page. We can have a model that also performs key-to-position mapping, and for the error range, we can do a variant of binary search (or expanding ring search) to locate the page.

OK, then, how do we know min-error and max-error? We train the model with the data we have. The data is static; after training, the model's prediction errors over that data give the min-error and max-error bounds for the local search. (Even simple linear or logistic regression may work for simple distributions.)
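
Here is a toy sketch of this idea (our own illustration, much simpler than the paper's recursive model index): fit a single linear model from keys to positions in a sorted array, record the worst-case prediction errors, and answer lookups with a binary search restricted to the error-bounded window around the prediction.

import bisect
import numpy as np

class ToyLearnedIndex:
    # A single linear segment stands in for the learned model; the paper uses a
    # hierarchy of models (the recursive model index) for better accuracy.
    def __init__(self, keys):
        self.keys = np.sort(np.asarray(keys, dtype=float))
        positions = np.arange(len(self.keys))
        self.slope, self.intercept = np.polyfit(self.keys, positions, deg=1)
        errors = positions - (self.slope * self.keys + self.intercept)
        self.min_err, self.max_err = int(np.floor(errors.min())), int(np.ceil(errors.max()))

    def lookup(self, key):
        guess = int(self.slope * key + self.intercept)
        lo = max(0, guess + self.min_err)
        hi = min(len(self.keys), guess + self.max_err + 1)
        i = lo + bisect.bisect_left(self.keys[lo:hi], key)    # search only the error window
        return i if i < len(self.keys) and self.keys[i] == key else None

idx = ToyLearnedIndex(np.random.lognormal(mean=0.0, sigma=2.0, size=100000))
k = idx.keys[1234]
assert idx.keys[idx.lookup(k)] == k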

What potential benefits can we reap by replacing B-tree with a model:

  • smaller indexes: less main-memory or L1 cache storage
  • faster lookup: as a result of smaller indexes
  • more parallelism (e.g., on a TPU), instead of the hierarchical if-statements of a B-tree traversal.

The key insight here is to trade computation for memory, banking on the trend that computation is getting cheaper (and if you can do it on TPU/GPU you reap more benefits). The evaluation of the paper doesn't even go into using TPUs for this.

The paper includes several strategies to improve the performance of the learned index, including using a recursive model index, hierarchical models, and hybrid models. For evaluation results, please refer to the paper.

Monday, December 4, 2017

Enabling API Virtualization on Android for Platform Openness

This morning I was at Taeyeon Ki's dissertation proposal. He presented his work on enabling openness (hackability/tinkerability) on the Android platform side.

He defined openness as being able to participate in innovation and distribution, and gave SDN and FUSE as examples of openness.

He argued that mobile vendors control their platforms tightly, and this prevents tinkering with the platform and obstructs innovation. Even though you can innovate on the app side, the OS/platform side is closed to tinkering. He posed the question: how can we enable anyone to easily develop and distribute new platform-level functionality?

To answer this question, he presented his work in two projects: Reptor and Mimic.


Reptor is a bytecode instrumentation tool enabling API virtualization on Android. Reptor does this by intercepting calls and redirecting them to a new implementation of the called method.

Intercepting calls to methods is not straightforward. Doing just method-name substitution cannot handle calls to class-level features: class hierarchy, interfaces, abstract classes, etc. So instead of replacing the method, the Reptor tool replaces the entire class. The class-level replacement approach works well with callbacks in Android, as it ensures that typecasting is correctly handled.

One use case for Reptor is to swap a platform-level component used in an app with a more suitable one depending on the region. China and Korea do not have the Google Play Store; they run their own app stores for Android. So in China and Korea, Google Maps (which is an integral component of the Play ecosystem) is not available. Apps there mostly use a downgraded static web map. But with Reptor, it is possible to wrap those applications and redirect the map calls to Amazon Maps. Taeyeon showed a demo of this on the Airbnb app.

Reptor has been dummy-tested with 1200 popular apps from Google Play, and tested more in depth with 32 benchmark apps, where runtime checks were performed. For these in-depth tests showing runtime equivalence, the Mimic tool described below was used. The tests showed that Reptor achieves minimal overhead in memory and performance.

While run-time testing via Mimic is helpful for checking whether Reptor's substitution of one platform component with another in an application is "correct" with respect to the platform-level classes, a better definition of "correctness" and a better framing/formalization of the problem would help this work.


Mimic is an automated UI behavior comparison testing system that provides a runtime behavioral-compatibility check between the original and the instrumented app. (I think this is in the under-submission or to-appear state.)

When I heard this description, I combined this and Reptor together in my mind, and all I could think of was that the combination can be great for automating click farms for smartphone apps!

Taeyeon had something more useful and honest in mind. There are a dozen different Android versions in use. A typical app posts several updates a month, and the developers struggle to test their apps in all these different environments. Mimic combined with Reptor can help perform these tests in an automated way. (Well, it still sounds a lot like a click farm to me :-)

Taeyeon explained their follow-the-leader testing model, where Mimic replicates the user behavior at the leader to the followers.

Saturday, December 2, 2017


Open Computing Language (OpenCL) is a framework for writing programs that execute across heterogeneous platforms consisting of central processing units (CPUs), graphics processing units (GPUs), digital signal processors (DSPs), or field-programmable gate arrays (FPGAs). Heterogeneous computing refers to systems that use more than one kind of processor or cores for high performance or energy efficiency.

OpenCL views a computing system as consisting of a number of compute devices (GPUs, CPUs, FPGAs) attached to a host processor (a CPU). It defines a C-like language for writing programs. Functions executed on an OpenCL device are called /kernels/. A single compute device typically consists of several compute units, which in turn comprise multiple processing elements (PEs). A single kernel execution can run on all or many of the PEs in parallel.

In addition to its C-like programming language, OpenCL defines an API that allows programs running on the host to launch kernels on the compute devices and manage device memory. Programs in the OpenCL language are intended to be compiled at run-time, so that OpenCL-using applications are portable between implementations for various host devices.
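
As a concrete taste of the host API, here is a minimal vector-add example using the pyopencl bindings (pyopencl is a third-party Python wrapper around OpenCL, not part of the standard itself, and this sketch assumes a working OpenCL driver is installed):

import numpy as np
import pyopencl as cl

a = np.random.rand(50000).astype(np.float32)
b = np.random.rand(50000).astype(np.float32)

ctx = cl.create_some_context()        # pick a compute device (GPU, CPU, ...)
queue = cl.CommandQueue(ctx)

mf = cl.mem_flags
a_buf = cl.Buffer(ctx, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=a)
b_buf = cl.Buffer(ctx, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=b)
c_buf = cl.Buffer(ctx, mf.WRITE_ONLY, a.nbytes)

# The kernel below is written in OpenCL's C-like language and compiled at run time.
program = cl.Program(ctx, """
__kernel void vadd(__global const float *a, __global const float *b, __global float *c) {
    int gid = get_global_id(0);
    c[gid] = a[gid] + b[gid];
}
""").build()

program.vadd(queue, a.shape, None, a_buf, b_buf, c_buf)   # launch one work-item per element

result = np.empty_like(a)
cl.enqueue_copy(queue, result, c_buf)
assert np.allclose(result, a + b)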

OpenCL is an open standard maintained by the non-profit technology consortium Khronos Group. Conformant implementations are available from Altera, AMD, Apple, ARM, Creative, IBM, Imagination, Intel, NVIDIA, Qualcomm, Samsung, Vivante, Xilinx, and ZiiLABS. Although OpenCL provides an alternative to CUDA, it has some support from NVIDIA.

OpenCL is supported on the Android, FreeBSD, Linux, macOS, and Windows operating systems.

CUDA to OpenCL translators

A prototype implementation of a CUDA-to-OpenCL translator exists.

A 2015 paper, "Bridging OpenCL and CUDA: a comparative analysis and translation", also provides CUDA-to-OpenCL translation as well as OpenCL-to-CUDA translation.

These translators, however, do not provide industry-grade and ruggedized implementations.

OpenCL for ML

OpenCL support is still underwhelming for deep learning, but it is getting better.

Recently an OpenCL port of Caffe was made available. This Caffe port was shown/evaluated on AMD chipsets, but it should also apply to ARM platforms that support OpenCL.

Thursday, November 30, 2017

What I learn from teaching

After I teach a class, I am debilitated/incapacitated for about an hour. I can't do anything productive in that time. This is because teaching demands a lot of mental power (at least for me). I am sure being an introvert doesn't help either; I need some recovery time after too much exposure. When I am lecturing, though, I let go (at least after the first 5-10 minutes or so), and I don't think about myself or about being an introvert. After the initial warm-up period, I get excited about what I am teaching, and all I can think of is the message I am trying to convey. This is actually good. When I find myself lost in the moment (or maybe that is getting into the teaching zone), I know I am doing well. I had written about this earlier on presenting: you should center on the message, not on yourself. Oh wow, actually I wrote twice on that topic.

So, in that down hour after teaching, what do I do? I have regret-flashbacks about some parts of the lecture I delivered. I have remorse about how I botched some parts of the lecture and how I couldn't answer a student's question better. The reason I botch a part of the lecture is often that I haven't internalized that part yet. The reason I wasn't able to make a smooth transition from one part to another is often that I didn't have a good enough understanding/insight about how those parts fit together. It is likely I was missing some context, or more likely that I knew the context but hadn't internalized it well enough. There are many different levels of knowing, and the most reliable/resilient/dependable level of knowing is knowing by experience. Teaching is a way to get hands-on training in your research area. You learn through your regrets about how you couldn't explain it better. So "teaching is an integral part of a faculty/researcher's job" is a cliche for a reason: it is very true and important.

Richard Feynman wrote about teaching in several memoirs. This one is especially relevant: "If you're teaching a class, you can think about the elementary things that you know very well. These things are kind of fun and delightful. It doesn't do any harm to think them over again. Is there a better way to present them? Are there any new problems associated with them? Are there any new thoughts you can make about them? The elementary things are easy to think about; if you can't think of a new thought, no harm done; what you thought about it before is good enough for the class. If you do think of something new, you're rather pleased that you have a new way of looking at it."

And here Feynman is saying it is better not to be too methodical when teaching. I can listen to Feynman all day.

Here is a relevant post of mine on teaching.
Here are some of my other posts labeled teaching.

Monday, November 27, 2017

Paper summary. Blazes: Coordination analysis and placement for distributed programs

This paper is by Peter Alvaro, Neil Conway, Joseph M. Hellerstein, and David Maier. It appears in the October 2017 issue of ACM Transactions on Database Systems. A preliminary conference version appeared in ICDE 2014.

This paper builds a theory of dataflow/stream-processing programs, which covers the Spark, Storm, Heron, TensorFlow, Naiad, and timely-dataflow line of work.

The paper introduces a set of labels and compositional operators over them, and shows how to infer the coordination points in dataflow programs. When reading below, pay attention to the labeling section, the labels "CR, CW, OR, OW", and the section on reduction rules for labels.

To figure out these coordination points, the Blazes framework relies on annotations of the dataflow programs to be supplied as an input. This is demonstrated in terms of a Twitter Storm application and a Bloom application.

The paper has many gems. In the conclusion it mentions in passing that, when designing systems, we should pay attention to coordination locality, not just data-access locality. (Maybe the two are often related/correlated, and this leads people to conflate them.) It also offers practical lessons about how to organize your dataflow programs: move replication upstream and caching downstream. I wish the paper elaborated more on these points in the conclusion.

I am not done with the paper yet. It is a 31-page paper, packed with information. I gave it a first pass. I think I will be reading it again and keep thinking about it.

In my summary below I borrow figures and a lot of text from the paper, with small edits.


The key idea in Blazes is that even when components are order-sensitive, it is often possible to avoid the cost of global ordering without sacrificing consistency. In many cases, Blazes can ensure consistent outcomes via *sealing*, which indicates when partitions of a stream have stopped changing. (Is "sealing" like the "watermark" in the Google streaming paper?)

Some contributions of the Blazes work are:

  • The paper shows how to analyze the composition of consistency properties via a term-rewriting technique over dataflow paths.
  • The paper demonstrates how the semantics of components can be modeled using a system of programmer-supplied annotations (graybox approach). 
  • The paper discusses two alternative coordination strategies, ordering and sealing, and shows how to automatically generate application-aware coordination code that uses the cheaper sealing technique in many cases.

Dataflow consistency

Look at these anomalies. Just look at them. This figure should have come with a trigger warning for distributed systems developers.

But it is possible to go back to first principles and cut the Gordian knot. If we could ensure that every component in a dataflow produces a deterministic set of outputs regardless of the order in which its inputs are delivered, then we could prevent all of these anomalies.

There are two approaches to doing that.

*Consistency via Coordination*: A sufficient strategy for preventing the anomalies (for arbitrary components) is to remove the nondeterminism in delivery order by coordinating the (otherwise asynchronous) processing of inputs, for example by using a coordination system such as ZooKeeper. This approach is general (independent of the semantics of the computation) but imposes throughput and latency costs.

*Consistency via Component Properties*: A dataflow component is *confluent* if it produces the same set of outputs for all orderings of its inputs. At any time, the output of a confluent component (and any redundant copies of that component) is a subset of the unique, final output.

In an ideal world, we would build dataflows out of strictly confluent components. Unfortunately, that is often infeasible. When we add an order-sensitive component to an otherwise confluent dataflow, how can we avoid falling back on a global coordination strategy? This is the question the paper investigates.
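As a toy illustration of confluence (my own example, not the paper's), compare a set-union component, which yields the same output set under any delivery order, with a component that emits whichever input it happens to see first:

# A toy illustration of confluent vs. order-sensitive components.
def confluent_union(inputs):
    out = set()
    for x in inputs:          # order of arrival does not matter for the final set
        out.add(x)
    return out

def order_sensitive_first(inputs):
    return next(iter(inputs), None)   # output depends on delivery order

assert confluent_union([1, 2, 3]) == confluent_union([3, 1, 2])
# order_sensitive_first([1, 2, 3]) != order_sensitive_first([3, 2, 1])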

Remark: Note that confluent is a stronger property than convergent. Convergent replicated components are guaranteed to eventually reach the same state, but this final state may not be uniquely determined by component inputs. As Figure 5 indicates, convergent components allow cross-instance nondeterminism, which can occur when reading "snapshots" of the convergent state while it is still changing. Consider what happens when the outputs of a convergent component (e.g., GETs posed to a key/value store)  flow into a replicated stateful component (e.g., a replicated cache). If the caches record different stream contents, then the result is replica divergence.

Annotated dataflow graphs

The *CR* annotation indicates that a path through a component is confluent and stateless; that is, it produces deterministic output regardless of its input order, and the path does not modify the component's state.

*CW* denotes a path that is confluent and stateful. That is, calls to the API may modify the internal state of the component, but the final state is determined only by the input contents and not their order.

The annotations *OR_gate* and *OW_gate* denote non-confluent paths that are stateless or stateful, respectively.

The *Seal_key* annotation means that the stream is punctuated on the subset key of the stream's attributes.

The label *Async* corresponds to streams with deterministic contents whose order may differ on different executions or different stream instances. Streams labeled *NDRead* may exhibit cross-run nondeterminism, having different contents in different runs, or cross-instance nondeterminism within a single run if the system is replicated. Finally, streams labeled *Taint* may exhibit persistent nondeterministic contents, or replica divergence in replicated systems.
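As a hypothetical illustration (the component names and the dict format are mine, not the actual Blazes input), the graybox annotations boil down to tagging each component path with one of the labels above, which the analysis then propagates along the dataflow edges:

# Hypothetical annotation sketch; names are illustrative, not the Blazes format.
component_annotations = {
    "Splitter.words": "CR",   # stateless and confluent: input order does not matter
    "Counter.count": "OW",    # stateful and order-sensitive
}
stream_labels = {"input": "Async"}  # deterministic contents, nondeterministic order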

Reduction relation

Combination rules model what happens when multiple input streams contribute to a single output stream in a dataflow. Taint is the strongest label: if any of the paths contributing to an output stream can exhibit permanent nondeterministic contents, then so can that output stream. Async is the weakest, and will be dominated by any labels with which it combines.
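A rough sketch of the combination rule as I read it, assuming the ordering Async < NDRead < Taint on stream labels (the full lattice in the paper is richer than this):

# Assumed label ordering: Taint dominates everything, Async is dominated by everything.
LABEL_STRENGTH = {"Async": 0, "NDRead": 1, "Taint": 2}

def combine(labels):
    # The output stream gets the strongest label among its contributing paths.
    return max(labels, key=lambda l: LABEL_STRENGTH[l])

assert combine(["Async", "NDRead"]) == "NDRead"
assert combine(["Async", "Taint", "NDRead"]) == "Taint"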

Coordination selection

Blazes will repair nonconvergent and nonconfluent dataflows by constraining how messages are delivered to individual components. When possible, Blazes will recognize the compatibility between sealed streams and component semantics, synthesizing a seal-based strategy that avoids global coordination. Otherwise, it will enforce a total order on message delivery, say using ZooKeeper.

*Sealing Strategies.* To determine that the complete partition contents are available, the consumer must a) participate in a protocol with each producer to ensure that the local per-producer partition is complete, and b) perform a unanimous voting protocol to ensure that it has received partition data from each producer. The voting protocol is a local form of coordination, limited to the "stakeholders" contributing to individual partitions. When there is only one producer instance per partition, Blazes need not synthesize a voting protocol. Once the consumer has determined that the contents of a partition are (henceforth) immutable, it may process the partition without any further synchronization.
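Here is a schematic sketch of the seal-based strategy as described above (my own Python, not the paper's implementation): the consumer buffers tuples per partition and processes a partition only once every producer has punctuated it.

# Schematic sketch of a seal-based consumer; not the Blazes code.
from collections import defaultdict

class SealedConsumer:
    def __init__(self, producers):
        self.producers = set(producers)
        self.buffers = defaultdict(list)    # partition key -> buffered tuples
        self.sealed_by = defaultdict(set)   # partition key -> producers that sealed it

    def on_tuple(self, producer, key, value):
        self.buffers[key].append(value)

    def on_punctuation(self, producer, key):
        # The producer promises it will send no more tuples for this partition.
        self.sealed_by[key].add(producer)
        if self.sealed_by[key] == self.producers:   # unanimous "vote": partition complete
            self.process_partition(key, self.buffers.pop(key))

    def process_partition(self, key, values):
        print(key, len(values))             # partition contents are now immutable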

Blazes framework

Blazes can be directly applied to existing programming platforms based on distributed stream or dataflow processing, including Twitter Storm and Spark Streaming. Programmers of stream processing engines interact with Blazes in a graybox manner: they provide simple semantic annotations to the blackbox components in their dataflows, and Blazes performs the analysis of all dataflow paths through the program.

Case study: Word count example

The Storm word-count dataflow can be reduced to Taint in the following way once we supply an Async label for its input interface.

If, on the other hand, the input stream is sealed on batch, then Blazes instead produces this reduction:

Because a batch is atomic (its contents may be completely determined once a punctuation arrives) and independent (emitting a processed batch never affects any other batches), the topology will produce deterministic contents in its (possibly nondeterministically ordered) outputs under all interleavings, which is a requirement for Storm's replay-based fault tolerance.

Figure 20 shows that the sealed solution significantly outperforms the ordering-based solution as the number of workers increases.


Recently I had summarized a computational model for TensorFlow. It was a preliminary basic attempt at creating an operational semantics for TensorFlow programs. It would be interesting to compare that with the Blazes approach, as they are solving related problems.

The TLA+ approach seems to be finer-grained and can do refinement checks between different models, but it is preliminary so far. The Blazes approach can handle components; it looks coarser-grained but easier to apply.

Saturday, November 25, 2017

On dataflow systems, Naiad and Tensorflow

The below definition for dataflow programming is from Wikipedia (with some small edits):
"Traditionally, a program is modeled as a series of operations happening in a specific order; this may be referred to as sequential, procedural, or imperative programming. The program focuses on commands for programming, where data is normally /at rest/.

In contrast, dataflow programming emphasizes the movement of data; it models a program as a series of connections, with explicitly defined inputs and outputs connecting operations. An operation runs as soon as all of its inputs become available. Thus, dataflow languages are inherently parallel and can work well in large, decentralized systems."

Some examples of dataflow frameworks are map-reduce, Spark, Storm & Heron, GraphX, GraphLab, Naiad (now reimplemented in Rust as timely-dataflow), and TensorFlow.

Map-reduce uses a very simple directed acyclic graph (DAG) with only two operations: map and reduce. Spark extends map-reduce with a couple dozen more operations, and implements data storage/processing in memory. More specifically, in Spark a computation is modeled as a directed acyclic graph (DAG), where each vertex denotes a Resilient Distributed Dataset (RDD) and each edge denotes an operation on an RDD. RDDs are collections of objects divided into logical partitions that are stored and processed in memory, with shuffle/overflow to disk.
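For example, a minimal PySpark word count (assuming a local Spark installation) shows the DAG view in action: each RDD is a vertex, each transformation adds an edge, and nothing executes until an action forces the lineage to run.

# Minimal PySpark sketch of the RDD DAG; assumes a local Spark installation.
from pyspark import SparkContext

sc = SparkContext("local[*]", "rdd-dag-example")
lines = sc.parallelize(["a b a", "b c"])                 # RDD vertex 1
pairs = lines.flatMap(str.split).map(lambda w: (w, 1))   # map edges -> RDD vertex 2
counts = pairs.reduceByKey(lambda x, y: x + y)           # shuffle edge -> RDD vertex 3
print(counts.collect())                                  # action triggers the whole DAG
sc.stop()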

Twitter's Storm and Heron are dataflow stream-processing frameworks. In Storm (and its subsequent modular reimplementation Heron), bolts and spouts correspond to dataflow operations and streams, respectively.

I had written summaries about GraphX and GraphLab approaches to dataflow as well.

In this post, I will focus more on Naiad (and timely-dataflow), and try to compare/contrast that with TensorFlow.

Naiad, timely-dataflow, and differential-dataflow

Naiad introduced a dataflow programming framework that allows cycles in the dataflow graph. Naiad's dataflow model supports structured loops allowing feedback in the dataflow, stateful dataflow vertices capable of consuming and producing records without global coordination, and notifications for vertices once they have received all records for a given round of input or loop iteration. Here is my summary of Naiad from 2014.

Frank McSherry has been singlehandedly (I don't know, maybe he uses both his hands) implementing Naiad in Rust for the last couple of years, and he calls the project timely-dataflow. He also has another project that implements Naiad's differential dataflow on top of timely-dataflow in Rust.

Differential dataflow (i.e., incremental dataflow) means streaming iterative computation and doing incremental computation only in response to changing data. "If you change one input to your computation, you would prefer not to re-evaluate the entire computation. Rather, changes to the input produces some changes in the output of your first dataflow operator, which are then changes to the inputs of subsequent operators. Eventually these changes either evaporate or spill out the end of your dataflow as changed outputs. As changes flow around the dataflow, they happen at various logical times. These times reflect the epoch of streaming input data, and iteration counters for any loops they may be contained in. Rather than eagerly aggregate these changes, we leave them disaggregated, allowing future computation to use arbitrary subsets of the changes. Although we leave ourselves more work to do for each change, we end up with an orders-of-magnitude reduction in the numbers of changes."

There is a technical distinction between differential and incremental. Incremental works with a "sequence of arbitrary updates" whereas differential works with a "partial order of arbitrary updates". The former has been around a while and has lots of prior art. Differential is pretty new and, while more general, is pretty similar in most cases (e.g., streaming join is the same for both of them; streaming join in a loop is only possible in differential).
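To illustrate the incremental idea with a toy example (mine, not the Rust timely/differential-dataflow code): an incremental count operator consumes input deltas and emits only the output changes, instead of recomputing the count from scratch.

# Toy sketch of incremental computation via deltas; not the timely/differential code.
from collections import Counter

class IncrementalCount:
    def __init__(self):
        self.counts = Counter()

    def on_delta(self, key, delta):
        """Apply a change (+1 insert, -1 delete) and return the output change."""
        old = self.counts[key]
        self.counts[key] += delta
        new = self.counts[key]
        # Emit a retraction of the old output and an assertion of the new one.
        return [(key, old, -1), (key, new, +1)]

op = IncrementalCount()
print(op.on_delta("apple", +1))   # [('apple', 0, -1), ('apple', 1, +1)]
print(op.on_delta("apple", +1))   # [('apple', 1, -1), ('apple', 2, +1)]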

Naiad versus TensorFlow

TensorFlow is a special instantiation/application of Naiad's timely-dataflow model of cyclic dataflow processing for the domain of machine learning and specifically deep-learning. It is well integrated with tools, Python, GPUs, etc. Here is my summary of TensorFlow if you need a refresher.

I think TensorFlow traded off generality of Naiad and gained much more in return. This is what some people call "addition by subtraction".

Naiad aimed to satisfy real-time querying. TensorFlow is OK with batch training. TensorFlow can still output a result at the end of each minibatch, so it is not a big deal. The real-time querying requirement in Naiad may be an unrealistic and unnecessary/impractical requirement to shoot for in practical systems. Who knows. Are there practical applications that require tight real-time querying with the most recent updates reflected in the view? (More on this at the end, in the applications discussion.)

Naiad aimed at precisely correct output. If something changes slightly, Naiad will recompute affected things (incrementally, yay) and give you the correct output. On the other hand, if something changes, TensorFlow will consider the changed things as a new minibatch to train with, as a result of which not much may change. Machine learning and deep learning tolerate uncertainty anyhow. Minibatches are how TensorFlow handles differential/incremental dataflow implicitly. No complicated mechanism is required. If things change, that is another minibatch to train on, and most of the previously computed model parameters may remain unaffected.

(Naiad's differential/incremental processing imposes extra memory requirements. It is unclear to me how much state needs to be held at the stateful operations about the input graph so that incremental processing remains possible.)

Finally, Naiad has a join operation, a very strong/capable yet costly operation. TensorFlow does not have a join operation, but that makes TensorFlow operations easier to shard/partition across machines. Maybe TensorFlow doesn't have a join operation because its inputs are assumed to be independent. Naiad assumes inputs can be dependent: like parts of a graph, or updates to an earlier graph, streaming into Naiad. So Naiad has a join operator but pays the cost for that powerful operator.

Naiad's generality should open it up for more potential applications. Differential dataflow is ideal for graph algorithms that update the graph or operate on a dynamic graph. This should be useful for search engines; when the internet graph and links update, you shouldn't recompute the entire result in batch. I don't know what Google uses most recently, but Google has been using an incremental transactional system leveraging BigTable, called Percolator, to solve this problem, without needing the differential dataflow computing power of Naiad.

Other application areas for Naiad's computing power could be social network applications and recommendation systems. But it seems like social network applications do not need to be very precise; approximate answers are OK for them. And recommendation systems do not need very tight real-time constraints yet.