## Saturday, August 19, 2017

### Cloud fault-tolerance

I had submitted this paper to SSS'17. But it got rejected. So I made it a technical report and am sharing it here. While the paper is titled "Does the cloud need stabilizing?", it is relevant to the more general cloud fault-tolerance topic.

I think we wrote the paper clearly. Maybe too clearly.

Here is the link to our paperhttps://www.cse.buffalo.edu//tech-reports/2017-02.pdf
(Ideally I would have liked to expand on Section 4. I think that is what we will work on now.)

Below is an excerpt from the introduction of our paper, if you want to skim that before downloading the pdf.
-------------------------------------------------------------------------
The last decade has witnessed rapid proliferation of cloud computing. Internet-scale webservices have been developed providing search services over billions of webpages (such as Google and Bing), and providing social network applications to billions of users (such as Facebook and Twitter). While even the smallest distributed programs (with 3-5 actions) can produce many unanticipated error cases due to concurrency involved, it seems short of a miracle that these web-services are able to operate at those vast scales. These services have their share of occasional mishap and downtimes, but overall they hold up really well.

In this paper, we try to answer what factors contribute most to the high-availability of cloud computing services, what type of fault-tolerance and recovery mechanisms are employed by the cloud computing systems, and whether self-stabilization fits anywhere in that picture.
(Stabilization is a type of fault tolerance that advocates dealing with faults in a principled unified manner instead of on a case by case basis: Instead of trying to figure out how much faults can disrupt the system's operation, stabilization assumes arbitrary state corruption, which covers all possible worst-case collusions of faults and program actions. Stabilization then advocates designing recovery actions that takes the program back to invariant states starting from any arbitrary state.)

Self-stabilization had shown a lot of promise early on for being applicable in the cloud computing domain. The CAP theorem seemed to motivate the need for designing eventually-consistent systems for the cloud and self-stabilization has been pointed out by experts as a promising direction towards developing a cloud computing research agenda. On the other hand, there has not been many examples of stabilization in the clouds. For the last 6 years, the first author has been thinking about writing a "Stabilization in the Clouds" position paper, or even a survey when he thought there would surely be plenty of stabilizing design examples in the cloud. However, this proved to be a tricky undertaking. Discounting the design of eventually-consistent key-value stores and application of Conflict-free Replicated Data Types (CRDTs) for replication within key-value stores, the examples of self-stabilization in the cloud computing domain have been overwhelmingly trivial.

We ascribe the reason self-stabilization has not been prominent in the cloud  to our observation that cloud computing systems use infrastructure support to keep things simple and reduce the need for sophisticated design of fault-tolerance mechanisms. In particular, we identify the following cloud design principles to be the most important factors contributing to the high-availability of cloud services.

• Keep the services "stateless" to avoid state corruption. By leveraging on distributed stores for maintaining application data and on ZooKeeper for distributed coordination, the cloud computing systems keep the computing nodes almost stateless. Due to abundance of storage nodes, the key-value stores and databases replicate the data multiple times and achieves high-availability and fault-tolerance.
• Design loosely coupled distributed services where nodes are dispensable/substitutable. The service-oriented architecture, and the RESTful APIs for composing microservices are very prevalent design patterns for cloud computing systems, and they help facilitate the design of loosely-coupled distributed services. This minimizes the footprint and complexity of the global invariants maintained across nodes in the cloud computing systems. Finally, the virtual computing abstractions, such as virtual machines, containers, and lambda computing servers help make computing nodes easily restartable and substitutable for each other.
• Leverage on low level infrastructure and sharding when building applications. The low-level cloud computing infrastructure often contain more interesting/critical invariants and thus they are  designed by experienced engineers, tested rigorously, and sometimes even formally verified. Higher-level applications leverage on the low-level infrastructure, and avoid complicated invariants as they resort to sharding at the object-level and user-level. Sharding reduces the atomicity of updates, but this level of atomicity has been adequate for most webservices, such as social networks.

A common theme among these principles is that they keep the services simple, and trivially "stabilizing", in the informal sense of the term. Does this mean that self-stabilization research is unwarranted for cloud computing systems? To answer this, we point to some silver lining in the clouds for stabilization research. We notice a trend that even at the application-level, the distributed systems software starts to get more complicated/convoluted as services with more ambitious coordination needs are being build.

In particular, we explore the opportunity of applying self-stabilization to tame the complications that arise when composing multiple microservices to provide higher-level services. This is getting more common with the increased consumer demand for higher-level and more sophisticated web services. The higher-level services are in effect implementing distributed transactions over the federated microservices from multiple geodistributed vendors/parties, and that makes them prone to state unsynchronization and corruption due to  incomplete/failed requests at some microservices. At the data processing systems level, we also highlight a need for self-regulating and self-stabilizing design for realtime stream processing systems, as these systems get more ambitious and complicated as well.

Finally, we point out to a rift in the cloud computing fault model and recovery techniques, which motivates the need for more sophisticated recovery techniques. Traditionally the cloud computing model adopted the crash failure model, and managed to confine the faults within this model. In the cloud, it was feasible to use multiple nodes to redundantly store state, and easily substitute a stateless worker with another one as nodes are abundant and dispensable. However, recent surveys on the topic remark that more complex faults are starting to prevail in the clouds, and recovery techniques of restart, checkpoint-reset, and devops involved rollback and recovery are becoming inadequate.

## Friday, August 18, 2017

This summary combines material from "Twitter Heron: Stream Processing at Scale" which appeared at Sigmod 15 and "Twitter Heron: Towards extensible streaming engines" paper which appeared in ICDE'17.

Heron is Twitter's stream processing engine. It replaced Apache Storm use at Twitter, and all production topologies inside Twitter now run on Heron. Heron is API-compatible with Storm, which made it easy for Storm users to migrate to Heron. Reading the two papers, I got the sense that the reason Heron was developed is to improve on the debugability, scalability, and manageability of Storm. While a lot of importance is attributed to performance when comparing systems, these features (debugability, scalability, and manageability) are often more important in real-world use.

## The gripes with Storm

Hard to debug. In Storm, each worker can run disparate tasks. Since logs from multiple tasks are written into a single file, it is hard to identify any errors or exceptions that are associated with a particular task. Moreover, a single host may run multiple worker processes, but each of them could belong to different topologies. Thus, when a topology misbehaves (due to load or faulty code or hardware) it is hard to determine the root-causes for the performance degradation.

Wasteful for scheduling resources at the datacenter. Storm assumes that every worker is homogenous. This results in inefficient utilization of allocated resources, and often leads to overprovisioning to match the memory needs of the worker with the highest requirements at all the other workers.

Nimbus-related problems. The Storm Nimbus scheduler does not support resource reservation and isolation at a granular level for Storm workers. Moreover, the Nimbus component is a single point of failure.

ZooKeeper-related gripes. Storm uses Zookeeper extensively to manage heartbeats from the workers and the supervisors. This use of Zookeeper limits the number of workers per topology, and the total number of topologies in a cluster, since ZooKeeper becomes the bottleneck at higher numbers.

Lack of backpressure. In Storm, the sender does not adopt to the speed/capacity of the receiver, instead the messages are dropped if the receiver can't handle them.

## Heron's modular architecture

As in Storm, a Heron topology is a directed graph of spouts and bolts. (The spouts are sources of streaming input data, whereas the bolts perform computations on the streams they receive from spouts or other bolts.)

When a topology is submitted to Heron, the Resource Manager first determines how many containers should be allocated for the topology. The first container runs the Topology Master which is the process responsible for managing the topology throughout its existence. The remaining containers each run a Stream Manager, a Metrics Manager and a set of Heron Instances which are essentially spouts or bolts that run on their own JVM. The Stream Manager is the process responsible for routing tuples among Heron Instances. The Metrics Manager collects several metrics about the status of the processes in a container.

The Resource Manager then passes this allocation information to the Scheduler which is responsible for actually allocating the required resources from the underlying scheduling framework such as YARN or Aurora. The Scheduler is also responsible for starting all the Heron processes assigned to the container.

Heron is designed to be compositional, so it is possible to plug extensions into it and customize it. Heron allows the developer to create a new implementation for a specific Heron module (such as the scheduler, resource manager, etc) and plug it in the system without disrupting the remaining modules or the communication mechanisms between them. This modularity/compositionality is made a big deal, and the ICDE17 paper is dedicated entirely to this aspect of Heron design.

Note also that Heron's modular architecture also plays nice with the shared infrastructure at the datacenter. The provisioning of resources (e.g. for containers and even the Topology Master) is cleanly abstracted from the duties of the cluster manager.

## Heron Topology Architecture

Having a Topology Master per topology allows each topology to be managed independently of each other (and other systems in the underlying cluster). Also, failure of one topology (which can happen as user-defined code often gets run in the bolts) does not impact the other topologies.

The Stream Manager is another critical component of the system as it manages the routing of tuples among Heron Instances. Each Heron Instance connects to its local Stream Manager to send and receive tuples. All the Stream Managers in a topology connect between themselves to form n*n connections, where n is the number of containers of the topology. (I wonder if this quadratically growing number of connections would cause problems for very large n.)

As you can see in Figure 5, each Heron Instance is executing only a single task (e.g. running a spout or bolt), so it is easy to debug that instance by simply using tools like jstack and heap dump with that process. Since the metrics collection is granular, this makes it transparent as to which component of the topology is failing or slowing down. This small granularity design also provides resource isolation. Incidentally, this reminds me of the ideas argued in "Performance-clarity as a first-class design principle" paper.

## Saturday, August 12, 2017

### On presenting well

I started thinking about this topic recently as my two PhD students Ailidani Ailijiang and Aleksey Charapko had to make their first presentations in ICDCS 2017. I worked with them to prepare their presentations. I listened to 3 drafts of their presentations and helped them revise the presentations. Since the topic of presenting your work was rekindled, I thought I should do another post on this.

## Most talks suck

I told my students that most of the conference presentations are dreadful. The good news is that the bar is not high. If you prepare sufficiently and practice, you will be already ahead of the curve.

But why do most talks suck? It is mainly because the presenters got it wrong about the purpose of the talk and try to cover everything in their paper. As a result their slides are not well thought, and the presentation follows the paper outline and tries to cover the paper. The presenters then go through these slides quickly, as if covering more ground means the audience will learn more. And the audience get dazed by powerpoint bullets.

This is a rookie mistake. My students also made that mistake in the first draft of their talks. I told them that if they just fixed this problem, they will be almost there. Below I will give you a step by step walkthrough of how to fix this problem.

## Know the game

Covering all things in the paper is a losing battle. The audience has seen a dozen presentations that day, things are getting blurry for them, and their brain is getting mushy. Nowadays you are also competing with Twitter and Facebook for the attention of your audience. (This is one of my biggest pet peeves. What is the deal with people who fly over half the world to get to a conference but then keep checking Twitter and Facebook on their laptops? Seriously?)

Moreover, 20 minutes is a shorter time than you think. You don't stand a chance explaining all your results and the subtleties in your work in 20 minutes. Your best chance is to engage them long enough to communicate the heart of your paper.

So the goal of the presentation is to communicate the essence of your paper, instead of covering all the things in your paper. Instead of presenting too many slides in a poor way, it is best to present fewer slides in a memorable way and get people understand those. If you manage to do this, the audience can read your paper for an in-depth description of your work.

One of the best ways to communicate the idea of a paper is to tell it as a story. A story engages the audience better, and will make the lessons stick longer.

This is the most challenging part of your preparation. You need to frame your talk well. You should find the right place to begin, and develop the right story and context to communicate your work. This requires good hard thinking and several iterations.

I helped my students frame their talks. We found stories for their presentations, and restructured their first draft to follow this storyline rather than the paper outline. Then we did a couple more drafts where we revised and refined the flow. Maybe I should do a future post to give a concrete example on finding a story on a presentation. For now, I will just point to this tangentially related post.

The most important thing for framing is to motivate the problem well. Giving concrete examples, albeit simplified, helps for this. 100% of audience should understand the problem, and at least 50% of audience should understand the solution you offer. (This is not to mean you will make the later part of the presentation hard to understand. You will be considered successful if 50% of audience understands despite your best efforts to simplify your explanation.)

When you are trying to communicate your work in simple terms, using analogies help.

Recall that the purpose of the presentation is to communicate the essence of the result and get people interested to dig in more. How do you do it? You iterate over your slides and remove the slides that don't cut right into the matter. You should have as few slides as you can cut down to. When was the last presentation where you wished you had more slides to present?

I always wished I had less slides to present. For a 20 minute presentation I sometimes get lazy and go with 20 slides. Big mistake! With so many slides, I can't pace myself well and start rushing to finish on time. But if I do my preparation well, I go with 10 slides (with ~3 sentences on each) for a 20 minute presentation. When I had less things to say (because I took the time and prioritized what I want to say), I would be able to communicate those things better and actually manage to teach them to the audience. Yes, although I know the basics about presentations, I botch some presentations up because I am lazy and don't prepare sufficiently.

What is that? Are you telling me it is impossible to present your paper in 10 slides? If you are at least a half-competent researcher/writer, you will know that you can and you should explain things at multiple granularities! You can and should have a 1 sentence summary, 5 sentence summary, and 5 paragraph summary of your work. In your presentation, you should refer to your 1 sentence summary multiple times. The audience should really understand that 1 sentence summary.

If you are well prepared with your slides, and practice your talk out loud at least a couple times, that should help calm your nerves, because you know you are prepared and up to the task.

But I won't lie, it is unnerving to be in the spotlight, and have too many eyeballs on you. (Some people argue this fear of public speaking has evolutionary roots. When the early human was standing in the savannah, and felt eyeballs on him, that was bad news: predators are eyeing him and he should be on the run. I don't know how credible this is. Evolutionary psychology has been used for arguing all sorts of things.)

Here are some things that can help calm your nerves.

If you focus on the message you will not focus on your self and ego. The message is the important thing for the audience, so nobody cares if you occasionally stutter and mess up. You are doing a service to the audience by delivering a message. There is no point in agonizing over whether you had a flawless delivery, and whether you looked good while presenting. If you deliver your message, then your content will speak for itself.

Another thing that can help is to reframe the situation. Your heart is beating hard not because you are scared, but because you are excited that you got the opportunity to talk about your work. And it is perfectly fine being excited about your talk. If you reframe it this way, this can even give you an energy boost for your talk.

You also need some alone time, a couple hours before you give the talk. Visualizing the talk will help a lot. Visualize yourself standing there, arguing the important points of the paper, slowly and confidently. Simulate your talk giving experience in your brain.

And don't worry if you get nervous and botch up some presentations. I did my fair share of botching up presentations, and found that people don't notice since most conference presentations are bad anyways. Chalk those as experience points. It will get better with time and you will slowly build immunity to stage fright. When I first started presenting, I was very nervous about standing in front of large crowds. It felt terrifying. But now it is just Tuesday afternoon, teaching Distributed Systems class. So start getting yourself exposed to crowds to build immunity.

## A job well done

If you do all that hard work in advance, you will get to enjoy yourself at the presentation. You will speak slowly and confidently. You will make eye contact with the audience, and get visual confirmation that people are following you, which will also energize you. You will get questions after the talk, and that is always a good sign people were engaged.

Both my students nailed their talks. Congratulations to them for a job well done. They got several good questions after the talk, and well earned applauses at the end. I know they will do better in other talks in the future, if they don't forget the lessons from this first presentation: find a good story, explain the heart of your work in as little number of slides as possible.

## Related Posts

How I write
How to write your research paper
Book review: "Good prose" and "How to write a lot"

## Wednesday, August 9, 2017

### Paper summary " Encoding, Fast and Slow: Low-Latency Video Processing Using Thousands of Tiny Threads"

This paper was written earlier than the "PyWren: Occupy the Cloud" paper, and appeared in NSDI'17. In fact this paper has influenced PyWren. The related work says "After the submission of this paper, we sent a preprint to a colleague who then developed PyWren, a framework that executes thousands of Python threads on AWS Lambda. ExCamera’s mu framework differs from PyWren in its focus on heavyweight computation with C++-implemented Linux threads and inter-thread communication."

I had written about AWS Lambda earlier. AWS Lambda is a serverless computing framework (aka a cloud-function framework) designed to execute user-supplied Lambda functions in response to asynchronous events, e.g., message arrivals, file uploads, or API calls made via HTTP requests.

While AWS Lambda is designed for executing asynchronous lightweight tasks for web applications, this paper introduced the "mu" framework to run general-purpose massively parallel heavyweight computations on it. This is done via tricking Lambda workers into executing arbitrary Linux executables (like LinPack written in C++). The mu framework is showcased by deploying an ExCamera application for compute-heavy video encoding. The ExCamera application starts 5000-way parallel jobs with IPC on AWS Lambda.

The paper provides the mu framework as opensource software.

## Going from Lambda to Mu

The Lambda idea is that the user offloads her code to the cloud provider, and the provider provisions the servers and runs her code. It is easy to start 1000s of threads in parallel in  sub-second startup latency (assuming warm Lambda instances). Upon receiving an event, AWS Lambda spawns a worker, which executes in a Linux container with up to two 2.8 GHz virtual CPUs, 1,536 MiB RAM, and about 500 MB of disk space.
So if Lambda uses container technology, what is its difference from just using containers? The difference is that once invoked the container stays warm at some server waiting another invocation and this does not get charged to you since you are not actively using it. Lambda bills in 100ms increments, and only when you call the Lambda instance with a function invocation. In contrast, EC2 deploys in minutes and bills in hours, and the Google Compute Engine bills in 10 minute increments.

I think another significant difference is that Lambda forces you to use the container via function calls and in stateless mode. This is a limitation, but maybe this forced constraint accounts for most of the success/interest in this platform. Stateless and disaggregated way of doing things may be more amenable for scalability.

The mu platform has two important components external to the Lambda platform: the coordinator and the rendezvous center. And of course it has Lambda workers, thousands of them available on command.

## The mu coordinator

The mu platform has an explicit coordinator. (Recall that the PyWren platform design had a thin shim driver at the laptop.) The coordinator is a long-lived server (e.g., an EC2 VM) that launches jobs and controls their execution.   The coordinator is truly the puppeteer. It manipulates the actions of each Lambda workers individually, and the workers comply as drones. The coordinator contains all of the logic associated with a given computation in the form of per-worker finite-state-machine (FSM) descriptions. For each worker, the coordinator maintains an open TLS connection, the worker’s current state, and its state-transition logic. When the coordinator receives a message from a worker, it applies the state-transition logic to that message, producing a new state and sending the next RPC request to the worker using the AWS Lambda API calls. (These HTTP requests are a bottleneck when launching thousands of workers, so the coordinator uses many parallel TCP connections to the HTTP server--one per worker-- and submits all events in parallel.) For computations in which workers depend on outputs from other workers, mu’s coordinator uses dependency-aware scheduling: the coordinator first assigns tasks whose outputs are consumed, then assigns tasks that consume those outputs.

## The Lambda workers

The workers are short-lived Lambda function invocations. All workers in mu use the same generic Lambda function. The user only installs one Lambda function and workers spawn quickly because the function remains warm. The user can include additional executables in the mu worker Lambda function package. The worker can then execute these in response to RPCs from the coordinator. The coordinator can instruct the worker to retrieve from or upload to AWS S3, establish connections to other workers via a rendezvous server, send data to workers over such connections, or run an executable.

## The mu rendezvous server

The mu platform employs a rendezvous server that helps each worker communicate with other workers. Like the coordinator, the rendezvous server is long lived. It stores messages from workers and relays them to their destination. This means that the rendezvous server’s connection to the workers can be a bottleneck, and thus fast network connectivity between workers and the rendezvous is required.

## Using the mu platform

To design a computation, a user specifies each worker’s sequence of RPC requests and responses in the form of a FSM, which the coordinator executes.  The simplest of mu’s state-machine components represents a single exchange of messages: the coordinator waits for a message from the worker, sends an RPC request, and transitions unconditionally to a new state. To encode control-flow constructs like if-then-else and looping, the mu Python library includes state combinator components. An if-then-else combinator might check whether a previous RPC succeeded, only uploading a result to S3 upon success.

## ExCamera video encoding application

The paper implements a video encoder intended for fine-grained parallelism, called ExCamera. ExCamera encodes tiny chunks of the video in independent threads (doing most of the “slow” work in parallel), then stitches those chunks together in a “fast” serial pass, using an encoder written in explicit state-passing style with named intermediate states.

In formats like 4K or virtual reality, an hour of video may take many hours to process as video compression relies on temporal correlations among nearby frames. However, ExCamera cuts this by an order of magnitude by deploying 1000s of Lambda workers in parallel and coordinating them via the mu platform.

As another application of the mu platform, the authors did a live demo for their NSDI17 conference talk. They had recorded 6 hours of video at the conference, and using OpenFace and mu platform on AWS Lambda they selected the frames where George Porter, a coauthor, appeared in the feed. This was done in minutes time and was indeed pretty bold thing to do at a presentation.

## Limitations

The paper has a limitations section. (I always wanted to include such a section in my papers. Maybe by including the limitations section you can preemptively disarm straightforward criticisms.) The limitations lists:
+ This is evaluated only on two videos.
+ If everybody used Lambda this way, would this still work?
+ The pipeline specification that the user should provide is complex.
+ A worker failure kills the entire job.

The paper mentions an experiment that ran 640 jobs, using 520,000 workers in total, each run for about a minute on average. In that experiment three jobs failed. This is a low failing rate, but this doesn't scale well with increasing job sizes.

## Tuesday, August 8, 2017

### Paper summary: Occupy the Cloud: Distributed Computing for the 99%

"We are the 99%!" (Occupy Wall Street Movement, 2011)

The 99% that the title of this paper refers to is the non-cloud-native and non-CS-native programmers. Most scientific and analytics software is written by domain experts like biologists and physicists rather than computer scientists. Writing and deploying at the cloud is hard for these folks. Heck it is even hard for the computer science folk. The paper reports that an informal survey at UC Berkeley found that the majority of machine learning graduate students have never written a cluster computing job due to complexity of setting up cloud platforms.

Yes, cloud computing virtualized a lot of things, and VMs, and recently containers, reduced the friction of deploying at the clouds. However, there are still too many choices to make and things to configure before you can get your code to deploy & run at the cloud. We still don't have a "cloud button", where you can push to get your single machine code deployed and running on the cloud in seconds.

But we are getting there. AWS Lambda and Google Cloud Functions aim to solve this problem by providing infrastructure to run event-driven, stateless functions as microservices. In this "serverless" model, a function is deployed once and is invoked repeatedly whenever new inputs arrive. Thus the serverless model elastically scales with input size. Here is an earlier post from me summarizing a paper on the serverless computation model.

This paper, which appeared on arXiv in February 2017 and revised June 2017, pushes the envelope on the serverless model further in order to implement distributed data processing and analytics applications. The paper is a vision paper, so it is low on details at some parts, however a prototype system, PyWren, developed in Python over AWS Lambda, is made available as opensource.

In order to build a data processing system, the paper dynamically injects code into these stateless AWS Lambda functions to circumvent its limits and extend its capabilities. The model has one simple primitive: users submit functions that are executed in a remote container; the functions are stateless; the state as well as input, and output is relegated to the shared remote storage. (This fits well with the rising trend of the disaggregated storage architecture.) Surprisingly, the paper finds that the performance degradation from using such an approach is negligible for many workloads.

After summarizing the code injection approach, I will mention how PyWren can implement increasingly more sophisticated data processing applications ranging from all Map, to Map and monolithic reduce, and MapReduce, and finally a hint of parameter-server implementation.

## Code injection

An AWS Lambda function gives you 5 minutes (300secs) of execution time at single core and 1.5 Gb RAM, and also gives you 512 MB in /tmp. PyWren exploits this 512MB tmp space to read Anaconda Python Runtime libraries. (This linked talk clarified PyWren code injection for me.)

PyWren serializes the user submitted  Python function using cloudpickle. PyWren submits the serialized function along with each serialized datum by placing them into globally unique keys in S3, and then invokes a common Lambda function. On the server side, PyWren invokes the relevant function on the relevant datum, both extracted from S3. The result of the function invocation is serialized and placed back into S3 at a pre-specified key, and job completion is signaled by the existence of this key. In this way, PyWren is able to reuse one registered Lambda function to execute different user Python functions and mitigate the high latency for function registration, while executing functions that exceed Lambda’s code size limit.

## Implementing Map, MapReduce, and the Parameter-Server

Map implementation. Many scientific and analytic workloads are embarrassingly parallel. The map primitive provided by PyWren makes addressing these use cases easy. Calling the map launches as many stateless functions as there are elements in the list that one is mapping over.

Map + Monolithic reduce. An easy way to implement MapReduce is to do the Reduce in just one machine. For this one machine to perform reduce, they use a dedicated single r4.16xlarge instance. This machine offers a very large amount of CPU and RAM for $14 an hour. MapReduce via BSP. To perform Reduce over many workers, we can use the bulk synchronous processing (BSP) model. To implement the BSP model and data shuffling across the stages PyWren leverages the high-bandwidth remote storage AWS S3 provides. To showcase this approach, they implemented a word count program in PyWren and found that on 83M items, it is only 17% slower than PySpark running on dedicated servers. The paper does not describe how BSP is implemented. I guess this is the responsibility of the driver program on the scientist's laptop. Eric Jonas, one of the authors of this work, calls this the shim handler, that submits the lambda functions to AWS. So I guess this driver checks the progress on the rounds by polling S3, and prepare/invoke the lambda functions for the next round. The paper also implements a more ambitious application, Terasort, using the PyWren MapReduce. Since this application produces a lot of intermediate files to shuffle in between, they say S3 becomes a bottleneck. So they use AWS elastic cache, a Redis in-memory key-value store. Using this, they show that PyWren can sort 1TB data in 3.4 minutes using 1000 lambda workers. The Parameter-server implementation. The paper claims to also implement Parameter-Server again using Redis inmemory keyvalue store. But there are no details, so it is unclear if the performance of using that is acceptable. ## Discussion They find that it is possible to achieve around 30-40 MB/s write and read performance per core to S3, matching the per-core performance of a single local SSD on typical EC2 nodes. They also show that this scales to 60-80 GB/s to S3 across 2800 simultaneous functions. Using AWS Lambda is only ~2× more expensive than on-demand instances. The paper says that this cost is worthwhile "given substantially finer-grained billing, much greater elasticity, and the fact that many dedicated clusters are often running at 50% utilization". As for limitations, this works best if the workers do not need to coordinate frequently and use most of the 5 minutes (i.e. 300s) of lambda function execution time for computing over the data input to its 1.5GB RAM to produce the output data. So the paper cautions that for applications like particle simulations, which require a lot of coordination between long running processes, the PyWren model of using stateless functions with remote storage might not be a good fit. It looks like beyond the map functionality, ease of programming is still not that great. But this is a step in the right direction. ## Monday, August 7, 2017 ### ICCCN'17 trip notes, days 2 and 3 ## Keynote 2 (Day 2) Bruce Maggs gave the keynote on Day 2 of ICCCN. He is a professor at Duke university and Vice President of Research at Akamai Technologies. He talked about cyber attacks they have seen at Akamai, content delivery network (CDN) and cloud services provider. Akamai has 230K servers, 1300+ networks, 3300 physical locations, 750 cities, and 120 countries. It slipped out of him that Akamai is so big, it can bring down internet, if it went evil, but it would never go evil :-) Hmm, should we say "too big to go evil?". This, of course, came up as a question at the end of the talk: how prepared is the Internet for one of the biggest players, such as Google, Microsoft, Netflix, Yahoo, Akamai, going rouge? Bruce said, the Internet is not prepared at all. If one of these companies turned bad, they can melt internet. I followed up that question with rouge employee and insider threat question. He said that, the Akamai system is so big that it doesn't/can't work with manual instruction. They have a very big operational room, but that is mainly to impress investors. Because at that scale, the human monitoring/supervision does not work. They have autonomous systems in place, and so the risk of screw-up due to manual instruction is very low. (This still doesn't say much of the risk of a computerized screw up.) OK, back to his talk. Akamai has customers in eCommerce, media and entertainment, banks (16/20 of the global banks), and almost all major antivirus software vendors. He gave some daily statistics: 30+ TB/s traffic served, 600 million IPv4 addresses, 3 trillion http requests, and 260 terabytes compressed logs. Then he started talking about DDOS attacks, where the attackers want to do damage to the provider by its overwhelming resources. The attackers often recruit an army of compromised drone/bot machines, and they look for amplification of the requests sent by this drone army. Bruce showed a graph of largest DDOS attacks by year. The attacks were exponentially growing in size in GB/s. 2017 saw the largest attack by a factor of two, where it reached 600Gbps gigabit per second at some point during the attack. WOW! In 2016, 19 attacks exceeded 100 Gbps. The March 12, 2016, DNS reflection attack reached 200 GB/s. Th most popular attacks are the ones with the largest amplification, which is defined as the rate of request to response. DNS reflection attack has 28 to 54 amplification. The mechanism used for blocking this attack was built by "prolexic" IP anycast scrubbing centers. In this setup the origin server had dozens of scrubbing centers/servers that filter the requests first and allow only good ones to go the origin server. It looks like these CDN guys are faring wars with the attackers on the Internet on a daily basis. It turns out attackers generally perform pre-attack reconnaissance using short burst of attacks/probes, and the CDN companies also monitor/account for these tactics. Bruce gave some statistics about DDOS attack frequency. The surprising thing is the gaming industry is the target of majority of attacks at 55%. It is followed by Software technology at 25%, Media at 5%, and Finance at 4%. Why target the gaming houses? A DDOS slows the online game, and upsets the gamers. So the attackers do this to extort the gaming companies. Bruce also talked about the attack on krebsonsecurity.com, the blog for the security researcher Jay Krebs. Akamai hosted this page pro bono. But this site got a huge attack stemming from IOT bots. This was more than twice in volume of any attack they have seen. Akamai held up, but after a couple days of strong attacks, this started costing dear money to Akamai, who was doing this pro bono. After September 26, Google took over hosting the Krebs site pro bono. Bruce talked about many other attacks, including the SQL attack: select * from employees where lname= '' or '1'='1'. The lesson is you should sanitize your SQL input! Akamai scrubs bad looking SQL. Another attack type is bot-based account takeover. The attackers first attack and obtain password dumps. And then they exploit the fact that many people use same username and password across services. The attackers then take the big password file, break it into pieces, send it to compromised routers, and these routers try these combinations with bank accounts, and look for lucky matches. This is not a DDOS attack, in fact they try to do this inconspicuously at rates as slow as couple per hour. My takeaway from the presentation is whatever these CDNs are charging their customer companies is still a good deal. Because the way things are setup currently, it is hard for a small company like a bank, media site, etc. to withstand these attacks alone. On the other hand, I hope these CDN companies stay on the top of their game all the time. Because they have huge responsibility, they are too big/dangerous to fail. It is scary to think that it is Akamai who is serving the https, not the banks. In other words, Akamai has the private keys for the banks, and serve https on their behalf. ## Panel 2 (Day 2) Panel 2 was on "Cloud Scale Big Data Analytics". The panelists were: Pei Zhang(CMU); Vanish Talwar(Nutanix); Indranil Gupta (UIUC); Christopher Stewart (Ohio State University); Jeff Kephart (IBM). Indy Gupta talked about intent-based distributed systems harking back to the "intent-based networking" term coined by Cisco. He cautioned that we are not catering to our real users. We invented the internet, but missed the web. We developed p2p, but missed its applications. He cautioned that we are dangerously close to missing the boat for big data analytics. The typical users of big data analytics are not CS graduates, but rather physics, biology, etc. domain experts. And they don't understand "scheduling", "containers/VMs", "network and traffic virtualization". And neither should they be forced to learn/understand this in an ideal world. They know what performance they need, such as latency, throughput, and deadlines, and we should design our big data systems to be able to serve them based on these end goals/metrics. Jeff Kephard from IBM TJ Watson talked about embodied cognition and symbiotic cognitive computing, but in a twist of fate had to attend the panel as a disembodied Skype participant. Yunqiao Zhang from Facebook talked about disaggregated storage and mapreduce at Facebook. The idea here is to separate the compute and storage resources so that they can evolve/sprawl/and get utilized independently. The two disaggregated systems, i.e., the compute and storage systems, are tethered together by very fast Ethernet. The network speed and capacity today is so good, it is possible and economical to do this without worrying about traffic. This was very interesting indeed. I found a talk on this which I will listen to learn more about the disaggregated MapReduce at Facebook. Pei Zhang from CMU at Silicon Valley talked about collecting IoT data from the physical world. Chris Stewart from The Ohio State University talked about the need for becoming transparent for big data systems from data collection, management, algorithm design, to the data translation/visualization layers. The question and answer session included a question on the gap between the data mining and cloud systems communities. The panel members said that more collaboration is needed, while it is inevitable and even useful to look at the same problems from different perspectives. Couple panel members remarked that today the best place these communities collaborate is inside companies like Facebook and Google. ## Keynote 3 (Day 3) Henning Schulzrinne talked about "Telecom policy: competition, spectrum, access and technology transitions". He has been working at the government at the last 7 years on and off and so was able to give a different perspective than the academic. He talked about opportunities for research that go beyond classical conference topics. He listed the key challenges as: + competition & investment poorly understood + spectrum is no longer just bookkeeping + rural broadband is about finding the right levers + emergency services still stuck in pre-internet He talked at length about network economics. What we as CS guys have been optimizing turned out to be a very small sliver of the network economics: equipment 4%, construction 11%, operations 85%. We the CS researchers have been optimizing only equipment and have been ignoring economics! We should focus more on facilitating operations. Operations is not efficient, if we can figure out how to make networks more easily operable, and require less human resources, we will have larger impact than tweaking protocols. He talked also about rural broadband, and mentioned that the drones/balloons are inapplicable as their capacity is not enough. The cost of deployment in rural is high, and the incentive for companies to deploy is low. But, pretty much everyone has wired telephone service, how did that happen? There was an unspoken bargain: the government said to ATT we'll give you monopoly, and you'll give us universal service. This was never stated but understood. He said to solve the rural broadband problem, policy levers need to pulled. + decrease cost of serving: dig once: bury cable during street repair & construction + provide funding: universal service fund (US$8 billion from tax money).

He talked about recycling TV broadband spectrums and how this is a very active issue now. He also talked about serving the disabled via the subtitles requirements, text-to-911, voip emergency, and wireless 911 services.

To conclude he asked us to think about the network problem holistically, including economics and policy in the equation. Many of the problems are incentive problems. And there is a need to think in decades not conference cycles! The network performance is rarely the key problem; academics work on things that can be measured, even when they are not that important.

## Panel 3 (Day 3)

The Panel 3 was on "Federal Funding for Research in Networking and Beyond". The panelists were Vipin Chaudhary (US NSF); Richard Brown (US NSF); and Reginald Hobbs (US Army Research Lab).

Rick Brown talked about NSF CISE divisions:
+ CNS: computer network systems
+ CCF: computing and communication foundations
+ IIS: information & intelligent systems
+ OAC: Office of Advanced Cyberinfrastructure

He mentioned that NSF was established by congress in 1950 with the yearly  budget of $3.5 billion, with the post ww2 understanding of importance of science to the country. NSF promotes bottom up basic research culture in contrast to NIH NASA DARPA which tells you what to work on and build. The total NSF 2017 budget 7.8 billion. NSF gets around 50K proposals, funds 10K of them. 95% of budget goes to the grants, only 5% goes to operational costs. Reginal Hobbs talked about the funding & research collaboration opportunities at the Army Research Laboratory. Vipin Chaudhary talked first about NSF broadly and then specifically about the office of advanced cyberinfrastructure at NSF. He said that the CISE budget is approximately 840M, and in computer science 83% of academic research in CS is covered by NSF. (I didn't expect this ratio to be this high.) He described the NSF I-Corps program at the end of his talk, which was very interesting for its support for entrepreneur activities. This program helps you to figure out if you are facing valley of death or black hole in your research commercialization process. Most academic spinouts fail because they develop something no one cares about. I-Corps provides support for you to meet with customers and test your hypothesis about what your product should be based on their feedback. ## Wednesday, August 2, 2017 ### ICCCN'17 trip notes I flew with United airlines, with a connection at Chicago at noon to arrive Vancouver at 3:00pm PST. The second flight took 4.5 hours and was not easy. On the plus side, the flights were on time. I don't want to jinx it but I have not had any flight trouble the last 2-3 years. Chicago O'Hare airport still scares me. For at least 4-5 instances, I spent extra nights at O'Hare airport due to canceled or missed flights. The entry to Vancouver was nice. We used screens to check ourselves in at the border. After answering some questions, and taking a selfie, the screen gave me a printout. The Canadian border agents, only took 5 seconds checking my passport and printout before allowing me in the country. Finally a border check that doesn't suck. It was also pleasant to travel from the airport to the conference hotel, at the Vancouver Waterfront. I bought a ticket for$9, and jumped on the Canada Line train at the airport. The two-car train did not have any operator, and operated smoothly loading/unloading passengers through 12 stations, about 12 miles, in less than 30 minutes. I guess those trains support some form of remote control if something goes unexpected on the rails.

I stayed at the ICCCN conference hotel, to keep things convenient. I missed the sessions on the first day, but made it to the reception at 6pm. At the reception, I met colleagues/friends and we had a nice chat catching up on things, exchanging notes/advice about faculty stuff.

After the reception, I hit the Waterfront for a relaxing stroll. This was around 8pm. I walked along the sea path till it got dark. Vancouver is a laid back place. The fresh smell and the cool breeze from the ocean makes the city a relaxing place. And then there are lush green mountains around the city. I could feel my stress and worries melting away. (But of course, Vancouver is not as laid back as Hawaii, where ICCCN was held the previous year. Yes, last year, I went to Kona Hawaii, and had a great time there, and never even mentioned that on Twitter or in my blog once. I must be a true gentleman!)

I went to bed around 10pm PST, which is 1am EST. I was extremely tired but of course due to jet lag I woke up at 2am and then was definitely up at 5:30am. So I went to the gym at 4th floor. I was hoping there would be a sauna and a pool. Bingo! Sauna, steam room, jacuzzi, and pool. I spent 10-15 minutes at each one. I was there from 6-7am. Only two other people showed up during that time. This hotel has 30 floors, and about 20 rooms at each floor and only 3 people were using the gym in the morning. This is a very low ratio, but I am not surprised. If something is worth doing, a vast majority of people will pass up.

## Day 2

I attended the keynote on the morning of the second day. It was very well delivered by Bruce Maggs, Vice President of Research for Akamai Technologies. I will provide my notes from that in my next post.

After the keynote, I hit the Waterfront again to take on the Stanley park.  The Waterfront seaside path continues around the Stanley park for 10 miles. So to travel around the Stanley park, I rented a bike at the Waterfront at the UrbanWaves shop. The ride was totally worth it for the beautiful scenery.

It seems like Vancouver has some strong runners. A couple of the runners completed the ~10 miles circuit faster than I could bike. Of course, I biked leisurely and stopped many times to enjoy the view and take pictures, but still the stamina and speed of these runners were impressive. They were slim and toned, and ran in perfect form, hitting the ground with front/mid foot first with nice strides.

I attended the panel after lunch. I will talk about the panel in the next post as well. After the panel, I was feeling very tired. Coffee didn't help, and I had to head up to my room to nap. Before the Banquet dinner at 7pm, I went to Waterfront again to write this post.

OK, I shouldn't be having this much fun. I should get back to writing those NSF project reports.

## Monday, July 31, 2017

### A Comparison of Distributed Machine Learning Platforms

This paper surveys the design approaches used in distributed machine learning (ML) platforms and proposes future research directions. This is joint work with my students Kuo Zhang and Salem Alqahtani. We wrote this paper in Fall 2016, and I will be going to ICCCN'17 (Vancouver) to present this paper.

ML, and in particular Deep Learning (DL), has achieved transformative success in speech recognition, image recognition, and natural language processing, and recommendation/search engines recently. These technologies have very promising applications in self-driving cars, digital health systems, CRM, advertising, internet of things, etc. Of course, the money leads/drives the technological progress at an accelerated rate, and we have seen many ML platforms built recently.

Due to the huge dataset and model sizes involved in training, the ML platforms are often distributed ML platforms and employ 10s and 100s of workers in parallel to train the models. It is estimated that an overwhelming majority of the tasks in datacenters will be machine learning tasks in the near future.

My background is in distributed systems, so we decided to study these ML platforms from a distributed systems perspective and analyze the communication and control bottlenecks for these platforms. We also looked at fault-tolerance and ease-of-programming in these platforms.

We categorize the distributed ML platforms under 3 basic design approaches:
1. basic dataflow, 2. parameter-server model, and 3. advanced dataflow.

We talk about each approach in brief, using Apache Spark as an example of the basic dataflow approach, PMLS (Petuum) as an example of the parameter-server model, and TensorFlow and MXNet as examples of the advanced dataflow model. We provide a couple evaluation results comparing their performance. See the paper for more evaluation results. Unfortunately, we were unable to evaluate at scale as a small team from academia.

At the end of this post, I present concluding remarks and recommendation for future work for distributed ML platforms. Skip to the end, if you already have some experience with these distributed ML platforms.

## Spark

In Spark, a computation is modeled as a directed acyclic graph (DAG), where each vertex denotes a Resilient Distributed Dataset(RDD) and each edge denotes an operation on RDD. RDDs are collection of objects divided in logical partitions that are stored and processed as in-memory, with shuffle/overflow to disk.

On a DAG, an edge E from vertex A to vertex B implies that RDD B is a result of performing operation E on RDD A. There are two kinds of operations: transformations and actions. A transformation (e.g., map, filter, join) performs an operation on a RDD and produces a new RDD.

The Spark user models the computation as a DAG which transforms & runs actions on RDDs. The DAG is compiled into stages. Each stage is executed as a series of tasks that run in parallel (one task for each partition). Narrow dependencies are good for efficient execution, whereas wide dependencies introduce bottlenecks since they disrupt pipelining and require communication intensive shuffle operations.

Distributed execution in Spark is performed by partitioning this DAG stages on machines. The figure shows the master-worker architecture clearly. The driver
contains two scheduler components, the DAG scheduler and the task scheduler, and tasks and coordinates the workers.

Spark was designed for general data processing, and not specifically for machine learning. However, using the MLlib for Spark, it is possible to do ML on Spark. In the basic setup, Spark stores the model parameters in the driver node, and the workers communicate with the driver to update the parameters after each iteration. For large scale deployments, the model parameters may not fit into the driver and would be maintained as an RDD. This introduces a lot of overhead because a new RDD will need to be created in each iteration to hold the updated model parameters. Updating the model involves shuffling data across machines/disks, this limits the scalability of Spark. This is where the basic dataflow model (the DAG) in Spark falls short. Spark does not support iterations needed in ML well.

## PMLS

PMLS was designed specifically for ML with a clean slate. It introduced the parameter-server (PS) abstraction for serving the iteration-intensive ML training process.

The PS (shown in the green boxes in the figure) is maintained as distributed in-memory key-value store. It is replicated & sharded: Each node serves as primary for a shard of the model (parameter space), and secondary/replica for other shards. Thus the PS scales well with respect to the number of nodes.

The PS nodes store & update model parameters, and respond to the requests from workers. The workers request up-to-date model parameters from their local PS copy and carry out computation over the partition of dataset assigned to them.

PMLS also adopts the Stale Synchronous Parallelism (SSP) model, which relaxes the Bulk Synchronous Parellelism (BSP) model where workers synchronize at the end of each iteration. SSP cuts some slack to the workers for synchronization, ensures the fastest worker cannot be *s* iteration ahead of the slowest worker. The relaxed consistency model is still OK for ML training due to noise tolerance of the process. I had covered this in an April 2016 blog post.

## TensorFlow

Google had a parameter-server model based distributed ML platform, called DistBelief. (Here is my review of the DistBelief paper.) From what I can tell, the major complaint about DistBelief was that it required messing with low-level code for writing ML applications. Google wanted any of its employees to be able to write ML code without requiring them to be well-versed in distributed execution ---this is the same reason why Google wrote the MapReduce framework for big data processing.

So TensorFlow is designed to enable that goal. TensorFlow adopts the dataflow paradigm, but the advanced version where the computation graph does not need to be a DAG but can include cycles and support mutable state. I think Naiad design might have some influence on TensorFlow design.

TensorFlow denotes computation with a directed graph of nodes and edges. The nodes represent computations, with mutable state. And the edges represent multidimensional data arrays (tensors) communicated between nodes. TensorFlow requires the user to statically declare this symbolic computation graph, and uses rewrite & partitioning of the graph to machines for distributed execution. (MXNet, and particularly DyNet, uses dynamic declaration of the graph, which improves on ease & flexibility of programming.)

The distributed ML training in TensorFlow  uses parameter-server approach as the figure shows. When you use the PS abstraction in TensorFlow, you use a parameter-server and data parallelism. TensorFlow says you can do more complicated stuff, but that requires writing custom code and marching into uncharted territory.

## Some evaluation results

For our evaluations we used Amazon EC2 m4.xlarge instances. Each contains 4 vCPU powered by Intel Xeon E5-2676 v3 processor and 16GiB RAM. EBS Bandwidth is 750Mbps. We used two common machine learning tasks for evaluation: 2-class logistic regression and image classification using multi-layered neural networks. I am only providing couple graphs here, check our paper for more experiments. Our experiments had several limitations: we used small number of machines, and couldn't test to scale. We also limited to CPU computing, and didn't test with GPUs.

This figure shows the speed of platforms for logistic regression. Spark performs good here behind PMLS and MXNet.

This figure shows the speed of platforms for DNNs. Spark sees greater performance loss going to two layers NN compared to single layer logistic regression. This is due to more iterative computation needed. We kept the parameters at the driver in Spark because they could fit, things would have been much worse if we kept the parameters in an RDD and updated after every iteration.

This figure shows the CPU utilization of the platforms. Spark application seems to have significantly high CPU utilization, which comes mainly as serialization overhead. This problem has been pointed out before by earlier work.

## Concluding remarks and future directions

ML/DL applications are embarrassingly parallel, and not very interesting from concurrent algorithms perspective. It is safe to say the parameter-server approach won for training in distributed ML platforms.

As far as bottlenecks is concerned, network still remains as a bottleneck for distributed ML applications. Instead of work on more advanced general purpose dataflow platforms, it is more useful to provide better data/model staging; treat data/model as first class citizen.

However, there can be some surprises and subtleties. In Spark, the CPU overhead was becoming the bottleneck before the network limitations. The programming language used in Spark, i.e., Scala/JVMs, affected its performance significantly. Therefore there is especially a need for better tools for monitoring and/or performance-prediction of distributed ML platforms. Some tools addressing the problem for Spark data processing applications have been proposed recently, such as Ernest and CherryPick.

There are many open questions for distributed systems support for ML runtime, such as resource scheduling and runtime performance improvement. With runtime monitoring/profiling of the application, the next generation distributed ML platforms should provide informed runtime elastic provisioning/scheduling of the computation, memory, network resources for the tasks running atop.

Finally there are open questions for programming & software engineering support. What are suitable [distributed] programming abstractions for ML applications? Also more research needed for verification and validation (testing DNNs with particularly problematic input) of distributed ML applications.

## Thursday, July 27, 2017

### Paper summary: Zorua: A holistic approach to resource virtualization in GPU

This paper recently appeared in MICRO'16 and addresses the problem of ease of managing GPU as a computational resource.

GPU computing today struggles with the following problems:
• Programming ease: The programmer needs to statically allocate GPU resources (registers, scratchpad, threads) to threads and this is hard and non-optimal as tuning is hard.
• Portability: An optimized specification on one GPU may be suboptimal (losing upto 70% performance) on another GPU.
• Performance: Since the programmer allocates resources statically and fixed manner, the performance suffer and dynamic underutilization occur when the program resource utilization vary through execution.

To address the above problems, Zorua (named after the shapeshifting illusionist Pokemon) virtualizes the GPU resources (threads, registers, and scratchpad). Zorua gives the illusion of more GPU resources than physically available, and dynamically manages these resources behind the scenes to co-host multiple applications on the GPU, alleviating the dynamic underutilization problem alluded above.

To create this illusion, Zorua employs a hardware-software codesign that consists of 3 components: (1) the compiler annotates the program to specify the resource needs of each phase of the application; (2) a runtime system, referred to as the coordinator, uses the compiler annotations to dynamically manage the virtualization of the different on-chip resources; and (3) the hardware employs mapping tables to locate a virtual resource in the physically available resources or in the swap space in main memory.

Of course, this illusion will fail when you try to cram more than feasible to the GPU. But the nice thing about Zorua is it fails gracefully: The coordinator component of Zorua schedules threads only when the expected gain in thread-level parallelism outweighs the cost of transferring oversubscribed resources from the swap space in memory.

One thing that I wondered was why Zorua needed the compiler annotation, and why the runtime alone was not sufficient. I think the compiler annotation helps buy us the graceful degradation property. GPU computing is not very nimble; it has a lot of staging overhead. The annotations give a heads up to the coordinator for planning the scheduling in an overhead-aware manner.

The paper does not mention any application of Zorua for machine learning applications. I think Zorua would be useful for making DNN serving/inference applications colocate on the same GPU and preventing the server-sprawl problem, as it alleviates the dynamic underutilization problem.

I wonder if Zorua can provide benefits for machine learning training. Since machine learning training is done in batch, it would utilize GPU pretty consistently, stalling briefly in between rounds/iterations. However, by running two iterations in off-step manner as in Stale-Synchronous Parallelism, it may be possible to get benefit from Zorua GPU virtualization.

A related work for using GPUs efficiently for Deep Learning is the Poseidon work. Poseidon optimizes the pipelining of GPU computing at a very fine granularity, at the sub DNN layer, to eliminate the stalling/idle-waiting of the GPU.

## Wednesday, July 5, 2017

### Paper Summary: Two decades of recommender systems at Amazon.com

This is a short article that appeared as a retrospective piece for the 2003 "Item-to-Item Collaborative filtering" paper as it was awarded a test-of-time award. This article is by Brent Smith and Greg Linden.

I am not a machine-learning/data-mining guy, so initially I was worried I wouldn't understand or enjoy the article. But this was a very fun article to read, so I am writing a summary.

The item-based collaborative filtering is an elegant algorithm that changed the landscape of collaborative filtering which was user-based till then. User-based means "first search across other users to find people with similar interests (such as similar purchase patterns), then look at what items those similar users found that you haven't found yet". Item-based is based on the idea that "people who buy one item are unusually likely to buy the other." So, for every item i1, we want every item i2 that was purchased with unusually high frequency by people who bought i1.

The beauty of the approach is most of the computation is done offline. Once the related items table is built, we can generate recommendations quickly as a series of lookups. Moreover since the number of items sold is less than the users, this scales to better user numbers.

This was implemented for Amazon.com for recommending related products (mostly books at that time). Since 2003, item-based collaborative filtering has been adopted by YouTube and Netflix, among others.

## Defining related items

This section was tricky and fun. Statistics is not a very intuitive area. At least for me. While reading this section I saw proposals to fix things, and thought they would work, and I was wrong. Twice.

To define related, we should define what it means for Y to be unusually-likely to be bought by X buyers. And for figuring this out, we should first figure out the reverse, what is the expected ratio that X buyers would buy Y if the two items were unrelated.

The straightforward way to estimate the number of customers, Nxy, who have bought both X and Y would be to assume X buyers had the same probability, P(Y) = |Y_buyers|/|all_buyers|, of buying Y as the general population and use |X_buyers| * P(Y) as the estimate, Exy, of the expected number of customers who bought both X and Y. In fact, the original 2003 algorithm had used this ratio.

But this ratio is misleading, because  for almost any two items X and Y, customers who bought X will be much more likely to buy Y than the general population. "Heavy buyers" are to blame for this situation. We have a biased sample. For any item X, customers who bought X (this set has many heavy buyers in it by definition) will be likely to have bought Y more than the general population.

Figure 1 shows how to account for this effect.

Now, knowing Exy, we can use it to evaluate whether Nxy, the observed number of customers who bought both X and Y, is higher or lower than randomly would be expected. For example, Nxy-Exy gives an estimate of the number of non-random cooccurrences, and [Nxy-Exy]/Exy gives the percent difference from the expected random co-occurrence.

In another surprise, neither of those work quite well. The first will be biased towards popular Ys, and the second makes it to easy for low-selling items to have high scores. The chi-square score, $[Nxy−Exy]/\sqrt{Exy}$ strikes the balance.

## Extensions

The article talks about tons of extensions possible. Using the feedback data about user clicks on recommendations, it is possible to further tune the recommender. One should also take into account time of purchases, causality of purchases, compatibility of purchases. One should also account for aging the history and aging the recommendation as the user ages.

Worth noting was the observation that some items have more weight. They found that a single book purchase can say a lot about a customer's interests than an arbitrary product, letting them recommend dozens of highly relevant items.

For the future, the article envisions intelligent interactive services where shopping is as easy as a conversation, and the recommender system knows you as well as your spouse or a close friend.

## Thursday, June 15, 2017

### Paper Summary: DeepXplore, Automated Whitebox Testing of Deep Learning Systems

This paper was put on arxiv on May 2017, and is authored by Kexin Pei, Yinzhi Cao, Junfeng Yang, Suman Jana at Columbia and Lehigh Universities.

The paper proposes a framework to automatically generate inputs that trigger/cover different parts of a Deep Neural Network (DNN) for inference and identify incorrect behaviors.

It is easy to see the motivation for high-coverage testing of DNNs. We use DNN inference for safety-critical tasks such as self-driving cars; A DNN gives us results, but we don't know how it works, and how much it works. DNN inference is opaque and we don't have any guarantee that it will not mess up spectacularly in a slightly different input then the ones it succeeded. There are too many corner cases to consider for input based testing, and rote testing will not be able to cover all bases.

DeepXplore goes about DNN inference testing in an intelligent manner. It shows that finding inputs triggering differential behaviors while achieving high neuron coverage for DL algorithms can be represented as a joint optimization problem and solved efficiently using gradient-based optimization techniques. (Gradients of DNNs with respect to inputs can be calculated accurately and can be used to solve this joint optimization problem efficiently.)

DeepXplore also leverages multiple DL systems with similar functionality as cross-referencing oracles and thus avoid manual checking for erroneous behaviors. For example, use Uber, Google, Waymo for the driving video, and compare outputs. Majority voting determines the correct behavior. DeepXplore counts on a majority of the independently trained DNNs not to be susceptible to the bug. This is similar to N-version programming for building resilience against software bugs.

Here is DeepXplore workflow for generating test images. DeepXplore takes unlabeled test inputs as seeds and generates new test inputs that cover a large number of different neurons (i.e., activates them to a value above a customizable threshold) while causing the tested DNNs to behave differently.

Figure 6 shows how "gradient ascent" can be employed in this joint optimization problem. This is a walk up-hill towards less certain scoring, so it is a gradient-ascent, rather than a gradient-descent. Starting from a seed input, DeepXplore performs the guided search by the gradient in the input space of two similar DNNs supposed to handle the same task such that it finally uncovers the test inputs that lie between the decision boundary of these two DNNs. Such test inputs will be classified differently by the two DNNs.

The team implemented DeepXplore using Tensorflow 1.0.1 and Keras 2.0.3 DL frameworks. They used Tensorflow's implementation of gradient computations in the joint optimization process. Tensorflow also supports creating subDNNs by marking any arbitrary neuron's output as the subDNN's output while keeping the input same as the original DNN's input. They used this feature to intercept and record the output of neurons in the intermediate layers of a DNN and compute the corresponding gradients with respect to the DNN’s input. All the experiments were run on a Linux laptop with 16GB RAM. I guess since this is inference rather than training, a laptop sufficed for the experiments.

A criticism to the paper could be this. Yes, DeepXplore catches a bad classification on an image, that is good and useful. But probably the self-driving application already has built-in tolerance to occasional misclassifications. For example, the temporal continuity can help; previous images and next images correctly classify the road, so an interim misclassification would not be very bad. Moreover, application-specific invariants can also act as safety net, e.g., do not steer very sharp, and use a Kalman filter. It would be interesting to do evaluations also in an end-to-end application setting.

UPDATE (6/17/2018): I have received clarification from Yinzhi Cao, one of the authors, about these points. Here are his comments:

First, our light effect (or other changes) can be added constantly over the time domain, and thus DeepXplore should be able to fool the decision engine all the time.  That is, the previous images and next images will also lead to incorrect decisions.

Second, DeepXplore can ask the decision engine to gradually switch the steering so that a Kalman filter may not help.  For example, the switching from left to right or vice versa is not that sudden so that a Kalman filter cannot rule out the decision.

## Wednesday, June 14, 2017

### Scalability, but at what COST

This paper is by Frank McSherry, Michael Isard, Derek G. Murray and appeared in HotOS 2015. The authors are all listed as unaffiliated because this is around the time where Microsoft Research Silicon Valley lab was closed, where they used to work. Michael and Derek are at Google working on TensorFlow framework, but Frank McSherry is still at large and unaffiliated. Frank has a great blog, where you will learn more than you ever wanted to know about dataflow, Rust, differential privacy, and the art of influencing people and making friends.

COST, defined per system for a problem, is the configuration required before the system outperforms a competent single-threaded implementation. They show that many big data systems have surprisingly large COST, often hundreds of cores.

Let's repeat this again: some single threaded implementations were found to be more than an order of magnitude faster than published results (at SOSP/OSDI!) for systems using hundreds of cores.

The paper's goal is to shed light on this issue so that "future research is directed toward distributed systems whose scalability comes from advances in system design rather than poor baselines and low expectations." (That has gotta be one of the snarkiest lines in a computer science paper. Well, discounting those from Dijkstra, that is.)

What does better baselines mean? It means using better graph layout and better algorithms for performance. The paper gives as an example the label propagation algorithm. The paper argues that label propagation is used for graph connectivity not because it is a good algorithm, but because it fits within the "think like a vertex" computational model, whose implementations scale well. The paper claims the appealing scaling properties are largely due to the algorithm's sub-optimality, as label propagation does more work than better algorithms.

Yes, and on the other hand, I can also see the appeal in the Giraph "think like a vertex" approach (or for that matter the MapReduce and Spark approaches). Giraph optimized for simplicity and ease-of-development. If you make it simple and easy to use, people will be happy to use it, adapt it, and throw it cluster resources when needed. One may argue this is a good tradeoff. Instead of letting people think harder and make programming harder for them, make it easy but wasteful on computing resources. After all, humans are much more expensive than computers, and scalability in terms of human cost is also a factor for practical/industrial systems. A similar argument for BlockChains has been made here, arguing social scalability is more important than computational-efficiency or even computational-scalability.

Of course this can be a false dichotomy, there are (and will be) systems/frameworks that provide both scalability in terms of human cost (by being easy-to-develop-with) and also computationally efficient. And we should strive to design such systems.

The analysis and evaluation was given/studied in the context of graph algorithms: pagerank and connected components. For embarrassingly parallel algorithms, such as SGD, this analysis and the results would not apply.

Here are Hacker News discussions on this paper.
https://news.ycombinator.com/item?id=11855594
https://news.ycombinator.com/item?id=8901983

## Tuesday, June 13, 2017

### Paper Summary: Neurosurgeon, collaborative intelligence between the cloud and mobile edge

This paper is by Yiping Kang, Johann Hauswald, Cao Gao, Austin Rovinski, Trevor Mudge, Jason Mars, and Lingjia Tang from University of Michigan, and appeared at ASPLOS 17.

In Deep Learning (DL), you have a long, computation-intensive training phase where you micro-fiddle/fudge the model parameters until you get desired accuracy. Then you deploy this optimized model parameters (i.e., the Deep Neural Network [DNN])for inference with real-world inputs. The paper is about this inference/serving layer of DL.

In the serving layer, the input goes through the DL with the tuned model parameters activating some subset of neurons at each layer and finally activating the correct neuron[s] at the output layer. This can still be a computation intensive process as the model has millions of parameters, and you apply matrix multiplication layer after layer. So this serving layer still has many juicy problems to work on.

A very relevant problem is that executing inference at the mobile can be slow because of the computational and energy limitations of the mobile. Executing at the cloud backend server is fast, but how do you get the input there? Uploading the input to the cloud can be slow, especially if the input is a large image and the connection is slow. So there is a tradeoff.

In Section 4, the paper shows how beneficial it can be to perform a proper DL inference partitioning. For image processing/computer vision (CV), e.g., AlexNet, partitioning at a middle layer is the most optimal for both latency and energy optimization. Since the input image is large (512Mb is used), uploading it to the cloud is both time and energy consuming. However, if you execute the convolutional layers followed by the pooling at the mobile, you reduce the size of the intermediate output data and it is time and energy efficient to upload this to the cloud. The rest of the computation, carried on the cloud, consists of processing fully connected layers, that are computation intensive. If we were to execute them also on the mobile, we would be waiting for the mobile CPU/GPU to finish execution, where as uploading the intermediate output to the cloud and executing the rest of the layers at the cloud finishes earlier.

The paper also finds that, for Automatic Speech Recognition (ASR) and Natural Language Processing (NLP) applications, usually the best approach is to execute everything at the mobile.

## Enter Neurosurgeon

Are we done here then? Why do we need a neurosurgeon tool, if a static lookup can do the trick? At this point, the paper makes another argument. You can't just use this one time static observation per application class (CV, ASR, NLP) and be done with it. The best partition point for a DNN architecture depends on the DNN's topology, which manifests itself in the computation and data size variations of each layer. Moreover, the connectivity conditions are changing, so you need to monitor and adjust your decision with the current network quality.

(The paper also argues that changing cloud backend conditions are a factor, but I am not convinced with the datacenter can get busy/overloaded argument. The evaluation experiments for that part is done synthetically.)

The proposed system to address this problem, Neurosurgeon, consists of a deployment phase and a runtime system that manages the partitioned execution of an inference application. Figure 10 shows the design of Neurosurgeon.

As part of the deployment stage, Neurosurgeon runs once per mobile and server platform for producing performance prediction models. This is application and NN independent. It tries different NN layer types for these mobile and server platforms and estimates regression line wrt changing configuration parameters.

The runtime stage is where the Neurosurgeon uses the layer performance prediction models produced in the deployment stage to dynamically choose the best DNN partition models. Neurosurgeon analyzes the target DNN’s constituent layers, and uses the prediction models to estimate, for each layer, the latency on mobile and cloud, and power consumption on the mobile. As Algorithm 1 shows, this is a simple evaluation of the conditions to choose the partition point.

Figures 11 and 12 show results for latency and energy-efficiency improvements achieved using Neurosurgeon.