Here at OpsClarity, we leverage Apache Storm extensively for business-critical workloads. OpsClarity collects and processes massive numbers of metrics in real time, so real-time streaming is a natural fit for our requirements. Streaming compute technologies can handle rapid-fire events as they arrive and provide continuous updates. For OpsClarity, this means our infrastructure can continuously collect data and process it in real time to update anomaly detection models, compute service and system health, dynamically aggregate services into clusters, evaluate and update service dependencies and connections, and so on. Streaming never stops. It just runs forever, humming along like an update engine.
Stream computing is an area of massive innovation, with many relatively young open source projects trying to solve the different problems that arise in the process. Storm is something of a “grandfather,” yet even Storm isn't at version 1.0, although that release is coming soon. It is natural that these streaming services, Storm included, don't yet have a mature ecosystem of tooling around them. It can be extremely difficult to identify the metrics that need to be collected, collect them, and then use them in a meaningful way to monitor and troubleshoot effectively.
Challenges of Monitoring Storm
Most teams I've seen rely on custom metrics to monitor each job, and spend an inordinate amount of time in the Storm UI assessing how things are going. It is quite hard to do better than this, in part because the metrics one could monitor in a Storm cluster are so dynamic. Every counter belongs to a job, called a topology in Storm lingo, which in turn relates to other components such as Spouts, Bolts, streams and executors.
- Storm Topology Tasks
- Storm Architecture
Just monitoring the number of messages handled by Storm requires working across a multitude of dimensions. It is rare for ops and development teams to monitor time series of messages processed, latency, errors, and so forth, for all Storm jobs. Usually, they resort to looking at the Storm UI to get the current counts. However, the Storm UI doesn’t provide a good overview of how metrics performed over time.
Storm Metrics API
Part of the problem is that most Storm users don't seem to be aware that Storm has a metrics API. Storm has long had a Web UI, and since version 0.91 that same Web UI service has offered another endpoint for JSON data: the same counters seen in the Web UI are exposed in JSON format. Part of the confusion seems to come from missing documentation. One could call this a secret API, although it is left out in broad daylight. There is documentation in a GitHub wiki on the pull request for the metrics API, but apparently not on the official Apache documentation site. It’s tempting to call this post "One Secret API Storm Users Won't Want To Miss" and wait for a BuzzFeed offer letter, but it is probably an innocent oversight and a common problem on projects with lots of innovation.
- Github wiki for Storm metrics API
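As a minimal sketch of reading that JSON endpoint, the snippet below uses only the Python standard library. The endpoint paths (/api/v1/topology/summary and /api/v1/topology/<id>) and the window parameter follow the Storm UI REST API as we understand it; the host name and topology ID are hypothetical, so check the paths against the documentation for your Storm version.

```python
import json
import urllib.parse
import urllib.request


def topology_url(ui_base, topology_id, window="600"):
    """Build the Storm UI REST URL for one topology's stats.

    Assumes the /api/v1/topology/<id> endpoint and its `window` query
    parameter ("600" selects the 10-minute window).
    """
    quoted_id = urllib.parse.quote(topology_id, safe="")
    query = urllib.parse.urlencode({"window": window})
    return f"{ui_base.rstrip('/')}/api/v1/topology/{quoted_id}?{query}"


def fetch_json(url, timeout=10):
    """GET a URL and decode its JSON body (requires a reachable Storm UI)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `fetch_json("http://storm-ui:8080/api/v1/topology/summary")` should list the running topologies, and each topology's "id" can be passed to `topology_url` to fetch its 10-minute stats.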
Key Storm Metrics
So, now that we can get metrics, which ones need the most attention? There are general-purpose Storm metrics that count the mechanical work done by Storm workers. If you imagine each worker sending its internal work counts up to ZooKeeper, and the Web UI pulling them from ZooKeeper and rolling them up, you have a decent picture of what's going on under the hood. This, however, doesn't tell you anything about the “correctness” of the work. It just indicates that something happened, how long it took, and whether it had to be retried.
The key metrics for any stream processing framework are message throughput, errors, and latency. The corresponding Storm metrics are "transferred", "failed", and "completeLatency" or "processLatency", depending on the component.
Throughput can be measured in a few different ways, but if you want to count physical messages going out of a stream, the Storm metric you want is called "transferred." If you don't care that the same message was fanned out to multiple destinations, you can use the logical message counter called "emitted." And if you want the logical count of messages that got a Storm acknowledgment, the count you want is "acked." In simple cases these are all the same, but in the general case "transferred" is closer to a pure throughput measure.
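The relationship between the three counters can be shown with a small helper. This is a sketch assuming a Spout or Bolt stats dict shaped like the Storm UI JSON, with "emitted", "transferred" and "acked" keys whose values may arrive as numbers or strings; the "fan_out" ratio is our own derived figure, not a Storm metric.

```python
def throughput_counts(component):
    """Collect throughput counters from one Spout or Bolt stats dict.

    Missing or empty values are treated as 0; numeric strings are accepted.
    """
    def as_int(key):
        value = component.get(key)
        return int(value) if value not in (None, "") else 0

    emitted = as_int("emitted")
    transferred = as_int("transferred")
    acked = as_int("acked")
    # fan_out > 1 suggests emitted tuples were delivered to several consumers.
    fan_out = transferred / emitted if emitted else 0.0
    return {"emitted": emitted, "transferred": transferred,
            "acked": acked, "fan_out": fan_out}
```

A Spout whose stream is consumed by two Bolts would typically show "transferred" at roughly twice "emitted", which is exactly the fan-out the logical counter hides.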
Errors in Storm are counted by the "failed" metric, which mechanically represents messages that didn't complete, typically because they exceeded the message timeout (a Bolt can also fail a message explicitly). Usually this means the message will be replayed, but either way this is a key metric to watch, as it probably represents a problem to solve in the Storm job or in the cluster's capacity. For example, a Bolt that does a lookup in another service might encounter network issues and time out, causing the message to fail.
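A simple way to watch "failed" is as a share of finished messages in the 10-minute window. The sketch below assumes a stats dict with "acked" and "failed" keys; the 1% threshold and the `should_alert` name are hypothetical choices, not Storm settings.

```python
def failure_ratio(stats):
    """Share of finished messages that failed, from one stats dict.

    With window=600 these counters cover roughly the last 10 minutes.
    """
    acked = int(stats.get("acked") or 0)
    failed = int(stats.get("failed") or 0)
    finished = acked + failed
    return failed / finished if finished else 0.0


def should_alert(stats, max_ratio=0.01):
    """Hypothetical alert rule: flag when more than max_ratio of messages fail."""
    return failure_ratio(stats) > max_ratio
```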
Latency metrics in Storm turn out to be slightly tricky and blunt, but in practice they can reveal a lot about the workload over time. Each stream maintains a running ten-minute window of average latency, but that does not help you track attributes like 99th percentile performance. You can see the average and get a count of "failed" messages that exceeded the timeout limit, but Storm doesn't provide data for the range between those points. For Spouts, the latency metric is called "completeLatency." For Bolts, it is called "processLatency." Bolts have another timer called "executeLatency," which covers only the CPU workload and doesn't include the overhead of message passing.
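Because the latency fields differ by component type, a small selector keeps the lookup in one place. This is a sketch assuming the field names above and "spout"/"bolt" labels supplied by the caller; the JSON may return latencies as strings, so they are converted to floats (milliseconds).

```python
def latency_ms(component_type, stats):
    """Return the average latency fields Storm reports for a component.

    Spouts: completeLatency. Bolts: processLatency and executeLatency.
    Missing values come back as None.
    """
    def as_float(key):
        value = stats.get(key)
        return float(value) if value not in (None, "") else None

    if component_type == "spout":
        return {"completeLatency": as_float("completeLatency")}
    if component_type == "bolt":
        return {"processLatency": as_float("processLatency"),
                "executeLatency": as_float("executeLatency")}
    raise ValueError(f"unknown component type: {component_type!r}")
```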
The other trick here is the rollup. By default, both the Web UI and the JSON metrics endpoint show a lifetime value for all metrics. Since jobs can run for long periods, or conceptually forever, this lifetime average latency is usually not interesting. It would be interesting if you could compare these metrics against the previous run of the same job, to see whether an update to the job reduced performance, but the rollup resets to zero every time the job starts. So we're left to collect raw data and do the best we can. The smallest raw measure is that 10-minute moving average, which we can get by setting the query parameter "window=600."
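To see why a lifetime average hides regressions, here is some illustrative arithmetic with hypothetical numbers: six days at 5 ms followed by one day at 50 ms, with the same message count each day. The message weighting mimics a lifetime rollup; it is not Storm's exact internal computation.

```python
def lifetime_average(periods):
    """Message-weighted average latency over (avg_ms, message_count) periods."""
    total_ms = sum(avg * count for avg, count in periods)
    total_count = sum(count for _, count in periods)
    return total_ms / total_count if total_count else 0.0


# Hypothetical history: six days at 5 ms, then one day at 50 ms,
# with one million messages per day.
history = [(5.0, 1_000_000)] * 6 + [(50.0, 1_000_000)]
lifetime = lifetime_average(history)  # 80 / 7, roughly 11.4 ms
latest_day = history[-1][0]           # 50.0 ms
print(f"lifetime {lifetime:.1f} ms vs latest day {latest_day:.1f} ms")
# prints: lifetime 11.4 ms vs latest day 50.0 ms
```

A tenfold slowdown shows up as barely more than a doubling in the lifetime figure, and the longer the job runs, the smaller that bump gets.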
That default lifetime metric view is great for all the other Storm metrics, because you can plot the deltas and get an accurate view of the underlying data even if data collection is inconsistent or at different intervals. It just doesn't work well for latency because the metric is provided as an average instead of a sum. If Storm provided an ever-increasing totalLatency metric we'd be set with just the lifetime view.
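Plotting deltas from lifetime counters is straightforward; the one wrinkle is the reset to zero when a job restarts. The sketch below computes a per-second rate from two (timestamp, count) samples and treats a decreasing count as a restart. The second function shows how an interval average would fall out of a cumulative latency total; it is purely hypothetical, since Storm does not expose a totalLatency counter.

```python
def counter_rate(prev, curr):
    """Per-second rate between two (timestamp_s, cumulative_count) samples.

    Lifetime counters reset to zero when a topology restarts. If the count
    drops, treat the new value as the count since the restart (this
    undercounts whatever happened before the restart in that interval).
    """
    (t0, c0), (t1, c1) = prev, curr
    elapsed = t1 - t0
    if elapsed <= 0:
        raise ValueError("samples must be in increasing time order")
    delta = c1 - c0 if c1 >= c0 else c1
    return delta / elapsed


def interval_average_latency(prev, curr):
    """Hypothetical: average latency over an interval, given cumulative
    (total_latency_ms, message_count) samples. Storm has no such
    totalLatency counter; this shows how easy averages would become."""
    (total0, n0), (total1, n1) = prev, curr
    count = n1 - n0
    return (total1 - total0) / count if count > 0 else None
```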
Now that we know more about what we want to track, what about those dimensions? How do we add things together to get what we want to monitor? And as we organize a view into the data, let's think about what questions we are trying to answer. Storm provides two high-level views into the metrics: logical and physical. The logical view looks at how the logic of the work is broken up: this job has these Spouts and Bolts, and in turn these Spouts and Bolts have streams. This is the logical view because, for example, the same type of Bolt might be running on lots of machines to parallelize its work. This view is good for answering questions about the performance of an individual job, ignoring the way the work is split up across Storm workers.
The machine, or physical, view looks instead at things like supervisors and executors. An executor on a worker does the work of a Spout or Bolt, so executor metrics also carry a Spout or Bolt dimension. This view might tell you whether a job has a hot spot in the cluster, such as sending work to a slow machine, or whether a worker in the cluster is doing an unbalanced amount of work. In the end both views add up to the same totals, but you have to be careful how you roll them up and avoid double counting by mixing them together.
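The two rollups can be sketched with a handful of executor records. The record shape ("component", "host", "transferred") and the names in it are a hypothetical simplification of per-executor stats; the point is that grouping by component (logical) and by host (physical) slices the same total, so adding the two views together would count every message twice.

```python
from collections import defaultdict


def rollup(executors, key):
    """Sum the 'transferred' counter of executor records by one dimension."""
    totals = defaultdict(int)
    for ex in executors:
        totals[ex[key]] += ex["transferred"]
    return dict(totals)


# Hypothetical executor records for a two-component job on two machines.
executors = [
    {"component": "kafka-spout", "host": "worker-1", "transferred": 500},
    {"component": "kafka-spout", "host": "worker-2", "transferred": 700},
    {"component": "parse-bolt", "host": "worker-1", "transferred": 450},
    {"component": "parse-bolt", "host": "worker-2", "transferred": 650},
]
logical = rollup(executors, "component")  # per Spout/Bolt
physical = rollup(executors, "host")      # per machine
# Both views cover the same messages: pick one, never add them together.
```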
Also, Bolts could have input and output streams, so one message could be counted more than once through a Bolt. It is probably not intuitive or easy to interpret if you add both input and output stream throughput counters together, so we should probably pick an incoming rollup or an outgoing rollup to keep things separated. Intuitively we probably want to know how many messages are going in, and how many messages are going out, rather than adding those together in a grand total of messages. A decent first level monitor for a Storm job is on Spout output, and last-Bolt output, so the ins and outs of the overall job are watched without necessarily paying close attention to what happened in the middle. Then, the middle can be looked at in a detail view if something is going wrong or if some tuning is needed.
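The first-level in/out monitor can be sketched as follows, assuming Spout and Bolt stats dicts with "spoutId"/"boltId", "transferred" and "executed" keys, plus a hypothetical list of (source, target) edges describing the topology graph. Since a final Bolt that writes to an external store may transfer nothing, this sketch counts the tuples the sink Bolts executed as the job's output; substitute whichever counter reflects your last Bolt's output.

```python
def job_ins_and_outs(spouts, bolts, edges):
    """Sum Spout output (messages in) and sink-Bolt work (messages out).

    edges: (source_component, target_component) pairs, a hypothetical
    input shape. Sink Bolts are Bolts that feed no other component.
    """
    sources = {src for src, _ in edges}
    messages_in = sum(int(s.get("transferred") or 0) for s in spouts)
    sinks = [b for b in bolts if b["boltId"] not in sources]
    # A sink that writes to an external store may transfer nothing,
    # so count the tuples it executed instead.
    messages_out = sum(int(b.get("executed") or 0) for b in sinks)
    return messages_in, messages_out
```

Watching only these two numbers keeps the first-level view small; the middle Bolts stay available for the detail view.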
Along with message counts going in and out of each topology, one could roll up latency and error counts to complete the high-level picture. Storm provides a rollup for the total time it takes a message to get through a topology. Error counting could just be a grand total, since you only need to see the error count spike to know that something is going wrong, and then use the second-level view to pinpoint where the errors are happening.
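A high-level view might pull the topology-wide figures for the 10-minute window and apply a crude spike rule to the error count. This sketch assumes the topology JSON has a "topologyStats" list with one entry per window, each carrying "window", "completeLatency" and "failed"; the spike rule and its factor and floor values are hypothetical.

```python
def topology_headline(topology_json, window="600"):
    """Pull one window's completeLatency (ms) and failed count.

    Assumes a 'topologyStats' list with one entry per window.
    """
    for entry in topology_json.get("topologyStats", []):
        if str(entry.get("window")) == window:
            return {
                "completeLatency": float(entry.get("completeLatency") or 0.0),
                "failed": int(entry.get("failed") or 0),
            }
    raise KeyError(f"no topologyStats entry for window={window}")


def error_spike(previous_failed, current_failed, factor=3.0, floor=10):
    """Hypothetical rule: flag when failures in the latest window exceed
    both a minimum count and `factor` times the previous window."""
    return current_failed >= floor and current_failed > factor * previous_failed
```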
Finally, I'd like to mention one last interesting Storm metric called "capacity." Capacity is an aggregation of the "executeLatency" metric mentioned above, and represents the fraction of time a Bolt has spent executing over the past 10 minutes. As this number approaches 1.00, it is like local CPU going to 100%, and it is probably a good clue that the Bolt is working too hard and could use some help. The typical remedy is to add workers and increase parallelism, but of course the cause could also be something going wrong in the Bolt's logic. Also, akin to CPU, this scales by the number of cores, which has to be tuned with the "supervisor.cpu.capacity" setting. This is just a divisor for the capacity metric and doesn't impact anything else. Storm doesn't automatically detect the number of cores for this calculation, so if you see "capacity" metrics going over 1, you likely need to adjust supervisor.cpu.capacity to get the correct basis for this metric.
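As we understand it, the Storm UI derives capacity per executor from the number of tuples executed and the average execute latency over the window, and reports the busiest executor for each Bolt. The sketch below reproduces that approximation; it leaves out any core-count divisor, and the executor record keys ("executed", "executeLatency") and the optional uptime argument are our own assumptions.

```python
def executor_capacity(executed, execute_latency_ms, window_s=600, uptime_s=None):
    """Approximate fraction of the window an executor spent in execute().

    capacity ~= executed * executeLatency(ms) / (elapsed seconds * 1000),
    where elapsed is the window, or the uptime if the executor is younger.
    """
    elapsed = window_s if uptime_s is None else min(window_s, uptime_s)
    if elapsed <= 0:
        raise ValueError("elapsed time must be positive")
    return executed * execute_latency_ms / (elapsed * 1000.0)


def bolt_capacity(executor_stats, window_s=600):
    """A Bolt's capacity, taken as its busiest executor."""
    return max(executor_capacity(e["executed"], e["executeLatency"], window_s)
               for e in executor_stats)
```

For example, an executor that ran 300,000 tuples at 1.5 ms each during the 10-minute window was busy for 450 of 600 seconds, a capacity of 0.75.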
Hopefully, this gives a good intro to Storm metrics and what should be monitored. OpsClarity offers a monitoring service that collects all of these metrics automatically and makes it super easy to monitor not only Storm but also the surrounding systems. Here's an example of applying a latency monitor to a Storm job.
- OpsClarity Statistical Monitors for Storm
OpsClarity provides deep metric collection for Apache Storm along with visibility into key metrics like latency and throughput. As Storm is an extremely complex distributed system, configuring these metrics in a general-purpose tool, though feasible, can take considerable elbow grease. At OpsClarity, we automatically detect your Storm cluster and its components, auto-configure metric collection, and set monitors to generate alerts that matter.
Posted on behalf of David Brinegar, OpsClarity.com.