Multi-Threaded Clojure Code Can be Hard Too

Clojure.jpg

I have had the most interesting time with a clojure and Storm problem today figuring out why the following code - executed in a Storm bolt - doesn't count the events properly. I can start by saying that figuring out that it's in this code was a challenge - but it was one that I could get to by simply looking at all the evidence around me.

The message counts that this topology is meant to classify (by latency) were recorded in the Storm UI. I had to take two snapshots of the UI, compare the times and counts, and verify the the total was in the right range - about 30,000 msgs/sec. Then I had the code I'd written to look at the Nimbus data for the topology, and it was giving me the same ballpark figure. Then I looked at another app I'd created to help with PagerDuty alerts - and it was giving me about the same rate.

Yet this code was saying about 100,000 msgs/sec:

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (< 10000 hits)
        (let [ts (now-hour-ms)
              rrk (str *master* "|latency|")
              irk (str *master* "|current-latency|")
              urk (str *master* "|ucs-latency|")]
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
            (wcar :detector
              (doseq [[k v] ls-cnts
                      :when (pos? v)]
                (car/hincrby (str rrk (name k)) ts v)
                (car/incrby (str irk (name k)) v))
              (doseq [[k v] ucs-cnts
                      :when (pos? v)]
                (car/incrby (str urk (name k)) v))))))))

As I started walking through the codeine my head, I realized that I'd have a lot of threads hitting the same inequality at nearly the same time. Each would be over the 10,000 hit limit, and then each would be adding their combined total to the running total in redis.

This was a problem. We needed to have the equivalent of a try lock in clojure. We needed to allow the first one to get the lock, and then all subsequent ones to fail, and skip over the code to update redis. Sure, this may loose a few counts, but on the scale we're talking this is not a big problem.

Certainly not as big as the multiple aggregations being added to redis.

The solution I came up with was kinda slick - add another atom - this time an integer, and then inc it, and only the first one gets to add their totals to redis. The rest skip it. When the first guy is done, he resets the atom, and you have reset the trap for the next time.

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (and (< 10000 hits) (= 1 (swap! _flushing_counts inc)))
        (do
          ;; first, reset the counts to be as close to atomic as possible
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
          ;; now add in the sub-totals to redis
          (let [ts (now-hour-ms)
                rrk (str *master* "|latency|")
                irk (str *master* "|current-latency|")
                urk (str *master* "|ucs-latency|")]
            (wcar :detector
              (doseq [[k v] ls-cnts
                      :when (pos? v)]
                (car/hincrby (str rrk (name k)) ts v)
                (car/incrby (str irk (name k)) v))
              (doseq [[k v] ucs-cnts
                      :when (pos? v)]
                (car/incrby (str urk (name k)) v))))
          ;; finally, reset the lock on the flushing
          (reset! _flushing_counts 0)))))

Wonderful little solution, and it works like a charm!