Archive for the ‘Clojure Coding’ Category

Multi-Threaded Clojure Code Can be Hard Too

Monday, November 3rd, 2014

Clojure.jpg

I have had the most interesting time with a clojure and Storm problem today - figuring out why the following code, executed in a Storm bolt, doesn't count the events properly. I can start by saying that figuring out that the problem was in this code at all was a challenge - but it was one I could get to by simply looking at all the evidence around me.

The message counts that this topology is meant to classify (by latency) were recorded in the Storm UI. I had to take two snapshots of the UI, compare the times and counts, and verify that the total was in the right range - about 30,000 msgs/sec. Then I had the code I'd written to look at the Nimbus data for the topology, and it was giving me the same ballpark figure. Then I looked at another app I'd created to help with PagerDuty alerts - and it was giving me about the same rate.

Yet this code was saying about 100,000 msgs/sec:

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (< 10000 hits)
        (let [ts (now-hour-ms)
              rrk (str *master* "|latency|")
              irk (str *master* "|current-latency|")
              urk (str *master* "|ucs-latency|")]
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
          (wcar :detector
            (doseq [[k v] ls-cnts
                    :when (pos? v)]
              (car/hincrby (str rrk (name k)) ts v)
              (car/incrby (str irk (name k)) v))
            (doseq [[k v] ucs-cnts
                    :when (pos? v)]
              (car/incrby (str urk (name k)) v)))))))

As I started walking through the code in my head, I realized that I'd have a lot of threads hitting the same inequality at nearly the same time. Each would be over the 10,000 hit limit, and then each would be adding their combined total to the running total in redis.

This was a problem. We needed the equivalent of a try-lock in clojure. We needed to allow the first one to get the lock, and then all subsequent ones to fail and skip over the code to update redis. Sure, this may lose a few counts, but on the scale we're talking about, this is not a big problem.

Certainly not as big as the multiple aggregations being added to redis.

The solution I came up with was kinda slick - add another atom - this time an integer - inc it on the way in, and only the thread that sees a 1 come back gets to add its totals to redis. The rest skip it. When that first one is done, it resets the atom, and the trap is set again for the next time.

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (and (< 10000 hits) (= 1 (swap! _flushing_counts inc)))
        (do
          ;; first, reset the counts to be as close to atomic as possible
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
          ;; now add in the sub-totals to redis
          (let [ts (now-hour-ms)
                rrk (str *master* "|latency|")
                irk (str *master* "|current-latency|")
                urk (str *master* "|ucs-latency|")]
            (wcar :detector
              (doseq [[k v] ls-cnts
                      :when (pos? v)]
                (car/hincrby (str rrk (name k)) ts v)
                (car/incrby (str irk (name k)) v))
              (doseq [[k v] ucs-cnts
                      :when (pos? v)]
                (car/incrby (str urk (name k)) v))))
          ;; finally, reset the lock on the flushing
          (reset! _flushing_counts 0)))))

Wonderful little solution, and it works like a charm!

Adding Optional JSON Encoding to Bolt

Friday, October 31st, 2014

Storm Logo

I wanted to see what the effect on the topology would be of removing the communication overhead between the JSON encoder bolt and the Kafka sender bolt. Even with the :local-or-shuffle directive, I was worried that the objects created were causing garbage collection, or some other kind of overhead, so I modified the defbolt I was using to send kafka messages to take an optional parameter indicating that it should use cheshire's JSON encoding on the message prior to sending it out.

That way, if the parameter is there, the message gets encoded, and if it's not, the overhead of the check is very minor:

  (defbolt mk-sender ["message"] {:params [cfg topic & [fmt]] :prepare true}
    [conf context collector]
    (let [dest (ep/config cfg)
          top (get-topic dest topic)
          zk (or (:zkStoreConnect dest) (rand-nth (:zookeepers dest)))
          p (kp/producer {(kafka-tag zk) zk})]
      (bolt
        (execute [tuple]
          (doseq [raw (.getValues tuple)
                  :when raw
                  :let [msg (case fmt
                              :json (nil-if-empty (json/generate-string raw))
                              raw)]
                  :when msg]
            (kp/send-messages p top [(kp/message (.getBytes msg))])
            (emit-bolt! collector [msg] :anchor tuple))
          (ack! collector tuple)))))

It works like a charm, and while the overhead wasn't even measurable, I left it in just in case there comes a time I want to simplify the topology by having a kafka bolt that also encodes to JSON.
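
For reference, the optional parameter just gets passed where the bolt is built - a sketch only, since kafka-cfg and the topic name here are invented; defbolt with :params makes mk-sender a function that returns the configured bolt:

  ;; hypothetical usage - kafka-cfg and the topic name are made up for
  ;; illustration; only the mk-sender signature comes from the code above.
  (mk-sender kafka-cfg "events-topic")        ; send payloads that are already strings
  (mk-sender kafka-cfg "events-topic" :json)  ; encode payloads with cheshire first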

Added Security Checks on Messages

Thursday, October 30th, 2014

Crazy Lemon the Coder

Today one of the users of my data feed emailed me saying that we were sending bad values for a few fields in a few messages. Turns out, we were getting some data that was clearly either the internal Security Team trying to find weaknesses, or someone stuffing messages in a vain attempt to get data. Where I'd be expecting a UUID, I was getting "cat /etc/passwd" - no joke.

So I had to take the time to expand the incoming message filters to cover these fields so that we filter these values out rather than treating them as possible users. It's not a huge deal, but it's just another thing I didn't expect to have to do today, and it really needed to go out because of the security implications.
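
As a rough sketch of the kind of check this amounts to - the field name and helper names here are invented, and the real filters cover several fields - the idea is just to reject anything that doesn't look like a UUID before it gets treated as a user:

  ;; minimal sketch - looks-like-uuid? and the :consumer-id key are
  ;; illustrative; the real filters cover several fields like this.
  (def uuid-re
    #"(?i)^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")

  (defn looks-like-uuid?
    "Returns true only if the value is a string shaped like a UUID."
    [v]
    (boolean (and (string? v) (re-matches uuid-re v))))

  (defn scrub-ids
    "Drop suspicious identifier fields rather than passing them downstream."
    [msg]
    (if (looks-like-uuid? (:consumer-id msg))
      msg
      (dissoc msg :consumer-id)))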

Updating the Topology and Library Configs

Wednesday, October 29th, 2014

Unified Click

Today I've done quite a bit of reconfiguration in my endpoints library. It's the clojure library I created to make the Storm work I was doing so much easier and faster by isolating all the code for dealing with the external endpoints - redis, databases, RESTful services, etc. - so that all the topologies and projects built on this code can start from a given baseline.

Well... today was a lot of re-configuration because the shared Kafka clusters in the different data centers were undergoing some change, and I had to change how I was connecting to them to stay up to date. Normally, this is just the cost of being in an organization, but when I had to change the same service two and three times, well... then it kinda became work.

In the end, I was able to get all the new versions up on nexus and deployed everywhere they were needed, so that we stayed up, but it was a challenging day of communication given the cost of the changes.

It’s Hard to Stay Quiet Sometimes

Wednesday, October 29th, 2014

Unified Click

I know I'm not typical - not in the least. I can get carried away at times, my passion can get the best of me... but it's because I'm driven to give the best I can in all that I choose to do, and it really bothers me when I can't do that. I mean really bothers me when I can't do that.

This morning I came into work to see that things weren't looking all that good for the latency on my data stream. In fact, it's worse than I've ever seen:

Latency Graphs

Twelve solid hours of off-the-chart latencies. And when I dug into the Capacity numbers it was clear what was going on... we're getting lots of back-pressure publishing to the shared Kafka cluster, and that backs up the bolts feeding it, and so on, and so on...

My concern is that we're only retaining data on the source kafka cluster for 72 hours, so if this continues for too long, we're going to be in real trouble - we'll hit that retention limit and start losing data.

Not good.

Yet the solution is simple - give me three more boxes, put them on my existing kafka cluster, and put all the traffic back to mine. That's where it used to be - and until the number of clients got too high - it was perfectly fine. Add a few boxes for the additional client load, and it'll be fine again.

And that's what makes staying quiet very hard... It's politics. They are playing politics with the quality of a service that has my name on it. And that really irks me. I have the ability to deliver a great product, but it's politics that is causing me to be associated with a bad one.

Excellent Code for JSONP Responses

Tuesday, October 28th, 2014

Clojure.jpg

Today I needed to do some real updating to my JSONP code in one of my services. Up to now, it's been OK for all the clients, but it's clear that we're getting to the limits of the simple approach we initially took to dealing with JSONP responses:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (fn [out]
                 (->> out
                      (OutputStreamWriter.)
                      (BufferedWriter.)
                      (json/generate-stream ob))))}))

which didn't really allow for any errors or issues that might crop up.

Sadly, we're starting to see things crop up, and so with help from Gary - the author of the original code, who has since updated it to handle a great many more things - we have:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (bound-fn [out]
                 (with-open [osw (OutputStreamWriter. out)
                             bw (BufferedWriter. osw)]
                   (let [error-streaming
                         (fn [e]
                           ;; Since the HTTP headers have already been sent,
                           ;; at this point it is too late to report the error
                           ;; as a 500. The best we can do is abruptly print
                           ;; an error and quit.
                           (.write bw "\n\n---ERROR WHILE STREAMING JSON---\n")
                           (.write bw (str e "\n\n"))
                           (warnf "Streaming exception for JSONP: %s"
                                  (.getMessage e)))]
                     (try
                       (json/generate-stream ob bw)
                       ;; Handle "pipe closed" errors
                       (catch IOException e
                         (if (re-find #"Pipe closed" (.getMessage e))
                           (info "Pipe closed exception: %s" (.getMessage e))
                           (error-streaming e)))
                       (catch Throwable t
                         (error-streaming t)))))))}))

In all the tests I've run, this looks to do everything I need - it works - whew! And with the trapping, we should have a lot better control when things go wrong.

Overhead of Storm Communication

Monday, October 27th, 2014

Storm Logo

Tuning Storm topologies isn't easy. Sure... if you have a topology that just does simple calculations, like my Disruption Detector - then it's easy. Almost deceptively simple. But if you have a topology whose bolts actually have to do something, then it gets a lot more complex. And the more they do, the more complex it becomes.

And one of the insidious things about Storm is the communication buffers. These use memory outside the heap, so they have the ability to crush a box even if you are careful with the -Xmx4G settings to make sure you don't overload it. No... these are tricky little beasts, and the conflicts raised by improper use of Kafka are really challenging.

The tradeoff is really one of connectivity versus balance. If the Kafka data is well-balanced across all boxes and all partitions, then you can simply default to using the :local-or-shuffle connectivity directive in your Storm topology, and you should be in good shape. This scheme says: If I have a bolt in the same worker, use it. If not, then look to others and balance accordingly.

And it's that stickiness to the local worker that's the kicker.

If we have an unbalanced kafka cluster, then some of the readers will have more data to process than others, but with this affinity for the local bolts, the excess data will be queued up on the local bolts in the worker, and not get spread out in the cluster. This makes for an uneven distribution of work, and that's very bad.

But if we use the :shuffle directive, then every message will be looking to balance out the load by checking all the similar bolts in the topology, and messages will be moved from worker to worker, box to box, without regard for the real benefit of that movement. It's not like you can put in a Cost Function for the movement of n bytes - oh... that would be sweet.

So what happens is that you end up having multi-GB communication buffers on all boxes, and that can crush a system. So you have to be careful - you can't remove all the :shuffle directives - especially if you have an unknown - and potentially unbalanced - kafka source. But you can't have too many of them, either.
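
To make the tradeoff concrete, here's roughly how the two directives show up in the Clojure topology DSL - a sketch only, since the spout, bolts, and parallelism hints here are all invented; the grouping keywords are the real knobs:

  ;; hypothetical wiring - kafka-reader-spout, parse-bolt, classify-bolt and
  ;; the :p values are made up for illustration.
  (topology
    {"kafka-reader" (spout-spec kafka-reader-spout :p 8)}
    {"parser"     (bolt-spec {"kafka-reader" :shuffle}       ; spread a possibly unbalanced source
                             parse-bolt :p 16)
     "classifier" (bolt-spec {"parser" :local-or-shuffle}    ; stay local, skip the wire when possible
                             classify-bolt :p 16)})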

Finding the right topology when you're doing real work is not trivial.

Cleaning Up Parsing Errors

Friday, October 24th, 2014

Unified Click

From time to time, I've been asked to add features to my data stream by other groups, and one of these groups at The Shop deals with affiliates directing traffic - and hopefully sales - to the site. If so, they get paid, which is good for everyone. One of the things that's important about this is to accurately track who came from where, and all that. To that end, I've had to implement some rather complex if-then-else logic in the code to match what the existing attribution code does.

Well... it seems I probably made a mistake in being a little too inclusive with some of the data. I was including a URL in the data I'm sending when I really didn't want to include the domain of that URL - just the parameters.

When this was pointed out, I realized that the instructions I'd received from the group about this feature were relatively vague, and after I really dug into what they were asking, and compared it to the code, it was likely the case that I wanted the parameters, but not the domain, from the URL.

Not at all clear, but hey... I can admit that I didn't get it right - easy fix.
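
And the fix really is easy - something along these lines, with the helper name made up here and the real attribution logic having many more branches around it:

  ;; illustrative only - keep the query string of a URL, drop the scheme and domain.
  (import 'java.net.URI)

  (defn url-params
    "Return just the query string of a URL - the parameters without the domain."
    [url]
    (when (seq url)
      (try
        (.getQuery (URI. url))
        (catch Exception _ nil))))

  ;; (url-params "http://example.com/deals?aff_id=42&src=email")
  ;; => "aff_id=42&src=email"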

Moving Sharded Redis Servers

Friday, October 24th, 2014

Redis Database

This morning I found that I really needed to move some of my sharded redis servers. Due to the needed bandwidth, I've got eight redis servers on one 196GB RAM box, but this morning I saw that the total RAM in use was over 170GB, and it was causing issues when redis tried to fork and save the data.

This is what I love about redis and this sharding - I can simply shut things down, move the redis dump.rdb files, fire up the servers on the new machines, and everything will load back up and be ready to go. A simple change in the sharding logic to point to the new machine for the moved servers, and everything is back up and running. Very nice.
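
The sharding logic itself is nothing fancy - roughly a hash-the-key-to-a-server scheme like this sketch, where the host list is invented and is the only thing that has to change when a server moves:

  ;; minimal sketch - the host:port entries are made up; moving a shard is
  ;; just editing this vector to point at the new machine.
  (def redis-shards
    [{:host "redis-a.example.com" :port 6379}
     {:host "redis-a.example.com" :port 6380}
     {:host "redis-b.example.com" :port 6379}
     {:host "redis-b.example.com" :port 6380}])

  (defn shard-for
    "Pick the redis server for a given key - same key, same shard, every time."
    [k]
    (nth redis-shards (mod (hash k) (count redis-shards))))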

Yes, it's very manual as well, but sometimes, the costs of that manual configuration are really worth it. Today it paid off. Big.

Finding the Joy in Life Again

Wednesday, October 22nd, 2014

Great News

I honestly would have put money on the fact that this would not have happened today. Big money.

I'm sitting on the bus riding to work, and I realize that I'm pretty happy without a pain-causing personal relationship in my life. That was a wow! moment. I've been separated for about 2 years, and the divorce is in the works, but I would have bet real money I'd feel horrible for the rest of my natural life. But today... on the bus... for a few minutes... I didn't.

That was huge for me. Huge.

Then I'm in work, updating a few postings with the results of the tests I'd done overnight, and I'm back into the swing of posting like I used to. It's been a long two years, but I'm back to writing about what I'm doing, and it's really helping. I'm feeling like I'm enjoying myself again.

This, too, was huge for me.

I don't expect this to last all day... but the fact that I have felt this way tells me that I need to keep doing what I'm doing - keep moving forward, and then maybe this will come again. And maybe when it comes again, it'll last longer. Maybe.