Creating a Storm Topology Latency Metric

We have been tracking and reporting on end-to-end latency for my main data pipe, and I felt that was the most useful metric to look at. But I was overruled. Management wanted a metric for the internal latency of the Storm topology and the Kafka cluster. Sadly, that's not really possible without seriously hacking away at Storm, and on this point they relented: I was allowed to grab a timestamp at the first possible moment the message is in the topology, and base the latency on that.

The solution I came up with was a new Storm bolt, connected directly to the Kafka spout, that takes one timestamp per tuple before the tuple is decoded, so that the latency includes the decoding time:

  ;;
  ;; onboard - a simple Bolt to take the string(s) in the incoming
  ;;           tuple, assume each is JSON, and parse it into a standard
  ;;           Clojure data container, tag it with the current time in
  ;;           milliseconds, and emit it. The keys will be converted to
  ;;           keywords for consistency with most of the project. This
  ;;           will ONLY emit the object if it's not nil. It will ALWAYS
  ;;           ack the tuple. The arguments to this bolt are the keyword
  ;;           of the tag that will be used to augment the message with
  ;;           the timestamp, and a predicate function that returns true
  ;;           if the parsed message is to be processed (a nil predicate
  ;;           means every parsed message is processed).
  ;;
  (defbolt onboard ["object"] {:params [tag pfn?]}
    [tuple collector]
    ;; one timestamp per tuple, taken before any decoding happens
    (let [ts (System/currentTimeMillis)]
      (doseq [p (.getValues tuple)
              :let [obj (nil-if-empty (parse-string p))]
              :when (and obj (if pfn? (pfn? obj) true))]
        (emit-bolt! collector [(assoc obj tag ts)] :anchor tuple)))
    (ack! collector tuple))

The idea is that the predicate function lets me reject, up front, the messages that I know are going to cause me grief. What are these messages, you ask? They are JSON messages that have URLs as keys. As long as those keys stay strings, that's fine, but I'm decoding the JSON and converting the keys to Clojure keywords, and keywords built from these URLs are killing the serialization in Storm.

Thankfully, there's a very simple way to make a predicate function that removes these messages, and I don't need them anyway. This way, they are dropped right after decoding, and I never emit, serialize, or process a useless message.
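A minimal sketch of such a predicate, assuming that a "URL key" is one starting with a scheme such as `http://`, and that URL keys may appear in nested maps as well as at the top level. The names `key-text`, `url-key?` and `no-url-keys?` are hypothetical, not from the project. It checks both string and keyword keys, because a keyword built from a URL prints back out as the full URL.

```clojure
;; Hypothetical predicate for the onboard bolt: true when the parsed
;; message has no URL-looking keys at any depth.

(defn key-text
  "Full text of a map key. For keywords, (subs (str k) 1) keeps the
  whole URL, whereas (name k) would drop the part before the first '/'."
  [k]
  (if (keyword? k)
    (subs (str k) 1)
    (str k)))

(defn url-key?
  "Assumption: a 'URL key' is one that starts with a scheme like http://"
  [k]
  (boolean (re-find #"^[a-zA-Z][a-zA-Z0-9+.-]*://" (key-text k))))

(defn no-url-keys?
  "True when no map anywhere inside obj has a URL-looking key."
  [obj]
  (->> (tree-seq coll? seq obj)   ; walk maps, vectors, and map entries
       (filter map?)              ; keep only the maps
       (mapcat keys)              ; collect every key of every map
       (not-any? url-key?)))
```

Since `onboard` is a parameterized bolt, the predicate would be passed in when the topology is built, e.g. `(onboard :onboarded-at no-url-keys?)`, where `:onboarded-at` is a hypothetical tag name. URLs that appear as values are left alone; only keys are checked.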

After this, I needed to expand the latency work with an additional key for this topology latency, and then I could do pretty much everything else as planned.
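A sketch of what that additional key could look like at the end of the pipe, assuming the message carries its original creation time under a hypothetical `:created-at` key and that the onboard tag is the hypothetical `:onboarded-at`, both in epoch milliseconds. Only the latencies whose timestamps are present get reported.

```clojure
(defn latencies
  "Latencies in milliseconds for msg as of now-ms. :created-at and
  :onboarded-at are hypothetical key names, both epoch milliseconds."
  [msg now-ms]
  (cond-> {}
    (:created-at msg)   (assoc :end-to-end (- now-ms (:created-at msg)))
    (:onboarded-at msg) (assoc :topology (- now-ms (:onboarded-at msg)))))

;; e.g. in the last bolt of the topology:
;; (latencies obj (System/currentTimeMillis))
```

Both numbers compare `System/currentTimeMillis` readings that may be taken on different worker hosts, so they are only as accurate as the clock sync between those hosts.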