Archive for the ‘Cube Life’ Category

Storm Doesn’t Always Guarantee a Good Balance

Tuesday, November 4th, 2014

Storm Logo

I trust the developers of Storm way too much. Maybe the truth of the matter is that I have assumptions about how I'd have developed certain aspects of Storm, and they seem perfectly logical to me. When those don't turn out to be the way it's really done in Storm, then I feel like I trust the Storm developers too much. Case in point: Balance.

I had a spout with a parallelization hint of 20. I had a topology with 20 workers. Right away, I'm thinking that no matter how the workers are arranged on the physical hardware of the cluster, there should be one - and only one - spout per worker. That way we have balanced the work load, and assuming that there are sufficient resources on each physical box for the configured number of workers, then we have the start of a nicely balanced topology.
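
To make the setup concrete, here's a minimal sketch of the shape I had in mind, using the standard backtype.storm.clojure DSL - the spout, bolt, and names are just placeholders, not the real topology:

  ;; a minimal sketch - kafka-spout and work-bolt are stand-ins
  (StormSubmitter/submitTopology
    "my-topology"
    {TOPOLOGY-WORKERS 20}                      ; twenty worker processes
    (topology
      {"in"   (spout-spec kafka-spout :p 20)}  ; spout parallelism hint of 20
      {"work" (bolt-spec {"in" :shuffle} work-bolt :p 40)}))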

Sadly, that assumption was wrong, and determining the truth took me several hours.

I was looking for problems in the topology related to its latency - lots of high capacity numbers in the UI. So I started digging into the spouts as well as the bolts, and what I saw in three of my four topologies was that there were six - out of twenty - spout instances that had zero emitted tuples. Not a one.

I looked at the logs, and they were grabbing messages from Kafka... updating the zookeeper checkpoints... just what they should be doing. I was worried that Kafka was messed up, so I restarted two of the nodes that seemed to have problems. No difference.

Hours I spent trying to figure this out.

And then I decided to just write down all the machine/partition data for each of the spouts and see if there was something like a missing partition somewhere. And as I wrote them all down I saw there were twenty of them - just like there should be... and yet the spouts reporting zeros had the same machine/partition configurations as another spout that was returning data.

Storm had doubled-up some of the spouts, and left others empty, and done so in a very repeatable way. In a sense, I was pretty upset - why log that you're reading messages if you're really "offline"? Why don't you distribute the spouts evenly? But in the end, I was just glad that things were working. But boy... it was a stressful hour.

More Tuning of Storm Topology

Monday, November 3rd, 2014

Unified Click

I spent a good chunk of the day trying to get my data stream topology to work smoothly, but it's proving to be very elusive. I've tried all manner of things - including expanding to take up nearly all of the 20 machines for the one topology. Crazy.

When I got the first version of this up, I wrote an email to my VP saying this is possible, but it's going to be expensive. Very. The hardware isn't cheap, and we're going to need so much of it - due to the inefficiencies in the JVM, in the messaging, in clojure... it's just going to take a ton of metal. I'm not sure they understood the magnitude of this cost.

We're bearing down on the Holiday Season, and we have no excess capacity. None. Twenty worker nodes all jammed up. Now if we need even 100% excess (a 2x spike) - which is nothing in the Cyber Monday sense - then we need an additional 20 machines. Amazing. For a capacity of about 100k msgs/sec.

At my last Finance job we did 3 mil msgs/sec on three boxes. Add in greeks and what-if analysis and it's 5 boxes. The idea that we need 40 boxes for 100k msgs/sec is just crazy. We are building on a tech stack that is super inefficient.

But we are... and it's not my decision - as hard as I've lobbied for a more efficient solution.

Multi-Threaded Clojure Code Can be Hard Too

Monday, November 3rd, 2014

Clojure.jpg

I have had the most interesting time with a clojure and Storm problem today, figuring out why the following code - executed in a Storm bolt - doesn't count the events properly. I can start by saying that figuring out that the problem was in this code at all was a challenge - but it was one I could get to by simply looking at all the evidence around me.

The message counts that this topology is meant to classify (by latency) were recorded in the Storm UI. I had to take two snapshots of the UI, compare the times and counts, and verify that the total was in the right range - about 30,000 msgs/sec. Then I had the code I'd written to look at the Nimbus data for the topology, and it was giving me the same ballpark figure. Then I looked at another app I'd created to help with PagerDuty alerts - and it was giving me about the same rate.
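
The check itself is nothing fancy - two readings of a cumulative emitted-tuple count, divided by the elapsed time. Something like this little helper, shown just for illustration:

  (defn msg-rate
    "Compute msgs/sec from two snapshots of a cumulative counter, where each
    snapshot is [count timestamp-in-millis]."
    [[c1 t1] [c2 t2]]
    (/ (- c2 c1) (/ (- t2 t1) 1000.0)))

  ;; e.g. (msg-rate [1200000 1414000000000] [3000000 1414000060000]) => 30000.0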

Yet this code was saying about 100,000 msgs/sec:

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (< 10000 hits)
        (let [ts (now-hour-ms)
              rrk (str *master* "|latency|")
              irk (str *master* "|current-latency|")
              urk (str *master* "|ucs-latency|")]
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
          (wcar :detector
            (doseq [[k v] ls-cnts
                    :when (pos? v)]
              (car/hincrby (str rrk (name k)) ts v)
              (car/incrby (str irk (name k)) v))
            (doseq [[k v] ucs-cnts
                    :when (pos? v)]
              (car/incrby (str urk (name k)) v)))))))

As I started walking through the code in my head, I realized that I'd have a lot of threads hitting the same inequality at nearly the same time. Each would be over the 10,000 hit limit, and then each would be adding the same accumulated counts to the running total in redis.

This was a problem. We needed the equivalent of a try-lock in clojure: allow the first one to get the lock, and have all subsequent ones fail and skip over the code that updates redis. Sure, this may lose a few counts, but on the scale we're talking about, this is not a big problem.

Certainly not as big as the multiple aggregations being added to redis.

The solution I came up with was kinda slick - add another atom - this time an integer - inc it, and only the thread that sees a 1 come back gets to add the totals to redis. The rest skip it. When the first guy is done, he resets the atom, and the trap is set again for the next time.

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (and (< 10000 hits) (= 1 (swap! _flushing_counts inc)))
        (do
          ;; first, reset the counts to be as close to atomic as possible
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
          ;; now add in the sub-totals to redis
          (let [ts (now-hour-ms)
                rrk (str *master* "|latency|")
                irk (str *master* "|current-latency|")
                urk (str *master* "|ucs-latency|")]
            (wcar :detector
              (doseq [[k v] ls-cnts
                      :when (pos? v)]
                (car/hincrby (str rrk (name k)) ts v)
                (car/incrby (str irk (name k)) v))
              (doseq [[k v] ucs-cnts
                      :when (pos? v)]
                (car/incrby (str urk (name k)) v))))
          ;; finally, reset the lock on the flushing
          (reset! _flushing_counts 0)))))

Wonderful little solution, and it works like a charm!

Adding Optional JSON Encoding to Bolt

Friday, October 31st, 2014

Storm Logo

I wanted to see what the effect on the topology would be of removing the communication overhead between the JSON encoder bolt and the Kafka sender bolt. Even with the :local-or-shuffle directive, I was worried that the objects created were causing garbage collection, or some other kind of overhead, so I modified the defbolt I was using to send kafka messages to take an optional parameter indicating that it should use cheshire's JSON encoding on the message prior to sending it out.

That way, if the parameter is there, the message gets encoded, and if it's not, the overhead of the check is very minor:

  (defbolt mk-sender ["message"] {:params [cfg topic & [fmt]] :prepare true}
    [conf context collector]
    (let [dest (ep/config cfg)
          top (get-topic dest topic)
          zk (or (:zkStoreConnect dest) (rand-nth (:zookeepers dest)))
          p (kp/producer {(kafka-tag zk) zk})]
      (bolt
        (execute [tuple]
          (doseq [raw (.getValues tuple)
                  :when raw
                  :let [msg (case fmt
                              :json (nil-if-empty (json/generate-string raw))
                              raw)]
                  :when msg]
            (kp/send-messages p top [(kp/message (.getBytes msg))])
            (emit-bolt! collector [msg] :anchor tuple))
          (ack! collector tuple)))))

It works like a charm, and while the overhead wasn't even measurable, I left it in just in case there comes a time I want to simplify the topology by having a kafka bolt that also encodes to JSON.
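
For reference, wiring it in is just a matter of passing - or omitting - that last parameter. A sketch of the two options, with the bolt names, endpoint config, and topic all made up for the example:

  ;; without the extra arg, the tuples must already be JSON strings:
  "kafka-out" (bolt-spec {"encoder" :local-or-shuffle}
                (mk-sender "prod-kafka" "enriched-events")
                :p 20)

  ;; with :json, the encoder bolt can be dropped and the sender does the
  ;; cheshire encoding itself:
  "kafka-out" (bolt-spec {"enricher" :local-or-shuffle}
                (mk-sender "prod-kafka" "enriched-events" :json)
                :p 20)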

Added Security Checks on Messages

Thursday, October 30th, 2014

Crazy Lemon the Coder

Today one of the users of my data feed emailed me saying that we were sending bad values for a few fields on a few messages. Turns out, we were getting some data that was clearly either the internal Security Team trying to find weaknesses, or someone stuffing messages in a vain attempt to get data. Where I'd be expecting a UUID, I was getting "cat /etc/passwd" - no joke.

So I had to take the time to expand the incoming message filters to cover these fields, so that we filter these values out as opposed to treating them as possible users. It's not a huge deal, but it's just another thing I didn't expect to have to do today - and really needed to get out because of the security implications.
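
The filter itself is nothing exotic - for the fields that are supposed to be UUIDs, anything that doesn't look like one gets dropped. A minimal sketch of the idea, with hypothetical field names:

  (def uuid-re
    #"(?i)^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")

  (defn scrub-ids
    "Nil out any supposedly-UUID field that doesn't actually look like a UUID,
    so injected junk like 'cat /etc/passwd' is never treated as a real user."
    [msg]
    (reduce (fn [m k]
              (if (and (m k) (not (re-matches uuid-re (str (m k)))))
                (assoc m k nil)
                m))
            msg
            [:user-id :session-id]))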

Updating the Topology and Library Configs

Wednesday, October 29th, 2014

Unified Click

Today I've done quite a bit of reconfiguration in my endpoints library. It's the clojure library I created to make the Storm work I was doing so much easier and faster by isolating all the code for dealing with the external endpoints - redis, databases, RESTful services, etc. - so that all the topologies and projects built on this code can start from a given baseline.

Well... today was a lot of re-configuration because the shared Kafka clusters in the different data centers were undergoing some changes, and I had to change how I was connecting to them to stay up to date. Normally, this is just the cost of being in an organization, but when I had to change the same service two or three times, well... then it kinda became work.

In the end, I was able to get all the new versions up on Nexus and deployed wherever they were needed, so that we stayed up, but it was a challenging day of communication given the cost of all the changes.

It’s Hard to Stay Quiet Sometimes

Wednesday, October 29th, 2014

Unified Click

I know I'm not typical - not in the least. I can get carried away at times, my passion can get the best of me... but it's because I'm driven to give the best I can in all that I choose to do, and it really bothers me when I can't do that. I mean really bothers me when I can't do that.

This morning I came into work to see that things weren't looking all that good for the latency on my data stream. In fact, it's worse than I've ever seen:

Latency Graphs

Twelve solid hours of off-the-chart latencies. And when I dug into the Capacity numbers it was clear what was going on... we are getting lots of back-pressure publishing to the shared Kafka cluster, and that backs up the bolts feeding it, and so on, and so on...

My concern is that we're only retaining data on the source kafka cluster for 72 hours, so if this continues much longer we're going to be in real trouble - we'll hit that retention limit, and then we'll start losing data.

Not good.

Yet the solution is simple - give me three more boxes, put them on my existing kafka cluster, and put all the traffic back to mine. That's where it used to be - and until the number of clients got too high - it was perfectly fine. Add a few boxes for the additional client load, and it'll be fine again.

And that's what makes staying quiet very hard... It's politics. They are playing politics with the quality of a service that has my name on it. And that really irks me. I have the ability to deliver a great product, but it's politics that is causing me to be associated with a bad one.

Excellent Code for JSONP Responses

Tuesday, October 28th, 2014

Clojure.jpg

Today I needed to do some real updating to my JSONP code in one of my services. Up to now, it's been OK for all the clients, but it's clear that we're getting to the limits of the simple approach we initially took to dealing with JSONP responses:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (fn [out]
                 (->> out
                      (OutputStreamWriter.)
                      (BufferedWriter.)
                      (json/generate-stream ob))))}))

which didn't really allow for any errors, or issues that might crop up.

Sadly, we're starting to see things crop up, and so with Gary's help - he's the original author of this code, and has since updated it to handle a great many more cases - we have:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (bound-fn [out]
                 (with-open [osw (OutputStreamWriter. out)
                             bw (BufferedWriter. osw)]
                   (let [error-streaming
                         (fn [e]
                           ;; Since the HTTP headers have already been sent,
                           ;; at this point it is too late to report the error
                           ;; as a 500. The best we can do is abruptly print
                           ;; an error and quit.
                           (.write bw "\n\n---ERROR WHILE STREAMING JSON---\n")
                           (.write bw (str e "\n\n"))
                           (warnf "Streaming exception for JSONP: %s"
                                  (.getMessage e)))]
                     (try
                       (json/generate-stream ob bw)
                       ;; Handle "pipe closed" errors
                       (catch IOException e
                         (if (re-find #"Pipe closed" (.getMessage e))
                           (info "Pipe closed exception: %s" (.getMessage e))
                           (error-streaming e)))
                       (catch Throwable t
                         (error-streaming t)))))))}))

In all the tests I've run, this looks to do everything I need - it works - whew! And with the trapping, we should have a lot better control when things go wrong.

Improper Use of Kafka

Monday, October 27th, 2014

Kafka Broker

Today has been one of those days that I knew was coming, didn't know when it'd arrive, but knew without a shadow of a doubt that it was going to be a defining moment for me at The Shop. Today, I kinda butted heads with the manager of the centralized Kafka cluster about how to publish to his cluster - in fact how to properly publish to any kafka cluster, and what the issues are with Kafka 0.7 and adding new boxes.

First, the set-up. I've been asked to publish to the shared kafka cluster by my new manager because he sees nothing but upside to using shared services. He believes that if the shared services are not up to the task, he can then apply pressure to get them up to the task. I am not so optimistic. I will gladly ask for help from anyone - as long as the product will benefit from it. But if the product suffers, I don't care who it is - they have to shape up or ship out.

So - against my wishes - I started publishing to the shared kafka cluster. We started having a lot of problems, but everyone was happy - save me. They added machines to the cluster, and because the topics I publish to already existed in the cluster, the known bug in Kafka 0.7 didn't allow the automatic rebalancing of the topic to the new boxes. You have to publish a message - no matter how small - to the specific boxes under the same topic name, and then they will start picking up traffic - automatically.

I know this because I ran into this problem, had to figure it out, and finally did after creating a little function to send an empty JSON message to a specific partition on a specific box in the cluster. But it worked like a champ, so I knew how this worked for Kafka 0.7.

Today, I had a disagreement with the manager of the shared cluster because he wanted people to write to specific machines, and then use the load balancer to assign different machines to different publishing clients. Sadly, this is not how kafka is meant to be used. It's meant to be used with a single automatic configuration based on the cluster configuration in zookeeper, which distributes the load to all the boxes in the cluster in equal share.
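
To put the difference in code terms, here's a sketch, assuming the Kafka 0.7 producer properties and the same kp/producer wrapper used in these topologies - the connect strings are made up:

  ;; the way it's meant to work: point the producer at zookeeper, and it
  ;; discovers every broker/partition and balances the writes across them
  (def balanced (kp/producer {"zk.connect" "zk1:2181,zk2:2181,zk3:2181"}))

  ;; the load-balancer scheme: each client pinned to whichever single broker
  ;; the VIP happens to hand it
  (def pinned (kp/producer {"broker.list" "1:kafka-broker1:9092"}))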

The manager's idea lets the load balancer direct the traffic - but it allows things to become very unbalanced, which complicates all the topologies based on these topics. It's just bad design to use Kafka in this way. But it does get around the problem of adding boxes to the cluster and activating the topics on the new boxes.

But that's trivial with the 4 line clojure function I wrote:

  (defn direct-injection
    "Function to send a single message to a specified topic on the specified
    kafka broker - bypassing all the zookeeper stuff to make sure that this
    one message gets to this one broker. This is essential for bootstrapping
    a new broker box to an existing topic."
    [broker topic msg]
    ;; broker is the "id:host:port" string, e.g. "2:kafka-broker3:9092"
    (let [p (kp/producer {"broker.list" broker})]
      (kp/send-messages p topic [(kp/message (.getBytes msg))])))

and it only needs to be run once for each existing topic on each new box. It's trivial.
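
Bootstrapping the new boxes is then just a loop over the topics - something like this, with the broker strings and topic names made up for the example:

  ;; one empty JSON message per existing topic, per new broker, is all it
  ;; takes for Kafka 0.7 to start routing real traffic to the new boxes
  (doseq [broker ["9:new-kafka-broker1:9092" "10:new-kafka-broker2:9092"]
          topic  ["page-views" "clicks" "conversions"]]
    (direct-injection broker topic "{}"))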

Finally, I got them to see this, and do this, and my publishing automatically picked this up and started pushing messages to all boxes evenly. As you should with Kafka.

The moral of today's story is that you can use shared tools - and it can appear to save you time and money - but appearances are deceptive, and you can shoot yourself in the foot so fast that you'll find careful consideration of all the deployment issues is really the time - and money - saver.

Overhead of Storm Communication

Monday, October 27th, 2014

Storm Logo

Tuning Storm topologies isn't easy. Sure... if you have a topology that just does simple calculations, like my Disruption Detector - then it's easy. Almost deceptively simple. But if you have a topology whose bolts actually have to do something, then it gets a lot more complex. And the more they do, the more complex it becomes.

And one of the insidious things about Storm is the communication buffers. These use memory outside the heap, so they have the ability to crush a box even if you are careful with the -Xmx4G settings to make sure you don't overload it. No... these are tricky little beasts, and the conflicts raised by improper use of Kafka make them really challenging.
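
The buffer sizes are at least tunable per topology. A sketch of the standard config keys, with placeholder values rather than recommendations:

  ;; Storm's messaging buffers are sized in messages, not bytes, and the
  ;; executor buffer sizes need to be powers of 2
  (def buffer-conf
    {"topology.receiver.buffer.size"         8
     "topology.transfer.buffer.size"         32
     "topology.executor.receive.buffer.size" 16384
     "topology.executor.send.buffer.size"    16384})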

The tradeoff is really one of connectivity versus balance. If the Kafka data is well-balanced across all boxes and all partitions, then you can simply default to using the :local-or-shuffle connectivity directive in your Storm topology, and you should be in good shape. This scheme says: If I have a bolt in the same worker, use it. If not, then look to others and balance accordingly.
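
In the Clojure DSL that choice is just the grouping on each bolt-spec. A small sketch showing both, with invented spout and bolt names:

  (topology
    {"kafka-reader" (spout-spec kafka-spout :p 20)}
    ;; :local-or-shuffle - prefer a bolt instance in the same worker, and only
    ;; go over the wire when there isn't one
    {"enrich" (bolt-spec {"kafka-reader" :local-or-shuffle} enrich-bolt :p 40)
     ;; :shuffle - spread tuples evenly across every instance of the bolt,
     ;; regardless of which worker (or box) it lives in
     "send"   (bolt-spec {"enrich" :shuffle} sender-bolt :p 20)})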

And it's that stickiness to the local worker that's the kicker.

If we have an unbalanced kafka cluster, then some of the readers will have more data to process than others, but with this affinity for the local bolts, the excess data will be queued up on the local bolts in the worker, and not get spread out in the cluster. This makes for an uneven distribution of work, and that's very bad.

But if we use the :shuffle directive, then every message will be looking to balance out the load by checking all the similar bolts in the topology, and messages will be moved from worker to worker, box to box, without regard for the real cost - or benefit - of that movement. It's not like you can put in a Cost Function for the movement of n bytes - oh... that would be sweet.

So what happens is that you end up having multi-GB communication buffers on all boxes, and that can crush a system. So you have to be careful - you can't remove all the :shuffle directives - especially if you have an unknown - and potentially unbalanced - kafka source. But you can't have too many of them, either.

Finding the right topology when you're doing real work is not trivial.