Archive for the ‘Clojure Coding’ Category

The Effects of Being Marginalized

Wednesday, November 12th, 2014

Bad Idea

Part of the New Direction that we are embarking on is to re-examine the topology that runs the data feed. Now I had built this, and had done months of experiments to get it to the point it was at. Yet one of the managers talked to another Storm developer in another division, and that developer convinced him - without any knowledge of the specifics of the topology - that what I was doing was "all wrong". So my manager told me to work with this guy to fix it.

So I followed his advice and made the changes.

It didn't help, but it didn't make things worse.

Now today I'm being asked to revisit the topology and "Scale it back" because my manager is convinced there is something else in play here. OK, so I do as I'm told - because they are clearly not interested in my opinions or we'd still be at the topology structure we had before all this.

So I start the experiments with a baseline, and then I start halving things: half the workers, half the bolts - all to get numbers to see if I'm going in the right direction. I don't fine-tune things until I'm really close, and half is easy to work with because we're still doing coarse tuning.

After about three experiments, we're far better, far faster, and have higher peak capacities during times of load. All is looking very good. Then I look at the Capacity numbers in the Storm UI, and I fine-tune a little. This guy is a little high, so add some. This guy is very low, he doesn't need as much. Not a lot, just a little tweaking.

In the end we're looking a lot better from all the metrics. Good.

And the values?

Yup... just the ones I had before all this started.

The Joy of Using GitHub/E

Wednesday, November 12th, 2014

GitHub Source Hosting

I really am amazed at the real joy of using GitHub. It's hard to imagine the vision they had... Let's take git, and then build an entire culture and eco-system around it - all in the browser! That's some vision. Yet this morning I was able to put targeted comments on the lines of a pull-request with syntax-highlighted examples, and see it all in preview mode. It made me smile. This kind of attention to detail is really inspirational to me.

It's more than source control - it's workflow... collaboration - it's a wonderful framework with which to do group development - or personal development, for that matter. I really do enjoy working with it - even on really bad code. And the style... it's not always been perfect, and they have made some changes I might not agree with, but they have done it up to the nines. There is certainly no way someone is going to say they haven't sweated the details.

So even when I have to spend 30 mins making comments on code that should not have been written, I'm happy that I'm doing it in GitHub. What a great tool!

Interesting Ideas with Carl

Friday, November 7th, 2014

Salesforce.com

This morning I was chatting with Carl (that's not his name) - the guy that used to be my manager, but went to the West Coast, and now is looking to move back to Chicago... We were chatting about an idea he had - of using Salesforce.com as the source of data for sales projection algorithms. Then I remembered that Heroku got acquired by Salesforce and there's a specialized connect platform between Heroku and Salesforce for just this kind of scalable application building.

Heroku also offers Postgres as a service, and they support Clojure very nicely. In all, it sounded like a really nice platform to build this on. I can't wait to see what Carl comes up with next.

Fixing Up a Database Mapping

Wednesday, November 5th, 2014

Clojure.jpg

Today I ran into a Legacy Bug - some little bit of code that used to work, but hasn't been used in so long that when someone really did try to use it - well, it kinda worked, but not exactly. It was really a simple database column mapping. What I had was:

  (defn hit-to-array
    "Function to format the 'hit' map of data into a simple sequence of the
    values for the map - in a specific order so that they can be easily
    understood."
    [arg]
    (if arg
      (map arg [:variant :country :browser :t-src :b-cookies])))

and I needed to change it to:

  (defn hit-to-array
    "Function to format the 'hit' map of data into a simple sequence of the
    values for the map - in a specific order so that they can be easily
    understood."
    [arg]
    (if arg
      (map arg [:variant :country :browser :traffic_source :users_count])))

because we had done a database restructure, and the rows coming back were now focused on the business names, and not the internal representation. The old code was returning nil, and the new code was properly finding the fields.
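The reason swapping the keywords is the whole fix is that a Clojure map is itself a function of its keys, so mapping the hit over a vector of keywords pulls the values out in order - and missing keys simply yield nil. A quick demo, with made-up sample values:

```clojure
;; Sample 'hit' with invented values - a Clojure map acts as a function
;; of its keys, so (map hit ks) pulls the values out in key order.
(def hit {:variant "A" :country "US" :browser "Chrome"
          :traffic_source "email" :users_count 42})

;; The new keys line up with the restructured rows:
(map hit [:variant :country :browser :traffic_source :users_count])
;;=> ("A" "US" "Chrome" "email" 42)

;; Keys that no longer exist after the restructure come back nil, which
;; is why the old code "kinda worked" instead of crashing:
(map hit [:variant :country :browser :t-src :b-cookies])
;;=> ("A" "US" "Chrome" nil nil)
```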

Similarly, I had to change:

  (defn sale-to-array
    "Function to format the 'sale' map of data into a simple sequence of the
    values for the map - in a specific order so that they can be easily
    understood."
    [arg]
    (if arg
      (-> arg
        (util/update :billings util/to-2dp)
        (map [:variant :country :browser :t-src :d-chan :orders :qty
              :billings :consumers :est_monthly_billings
              :est_monthly_billings_pct]))))

to:

  (defn sale-to-array
    "Function to format the 'sale' map of data into a simple sequence of the
    values for the map - in a specific order so that they can be easily
    understood."
    [arg]
    (if arg
      (-> arg
        (util/update :billings util/to-2dp)
        (map [:variant :country :browser :traffic_source :demand_channel
              :orders_count :qty :billings_total :buyers_count
              :est_monthly_billings :est_monthly_billings_pct]))))

because we dropped the qty field - no one wanted it - and we again changed the names to the more user-focused ones. It's not a big deal to change, but it makes a big difference to the guys now implementing the feature.
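For reference, util/to-2dp and util/update are in-house helpers at The Shop that aren't shown in the post. Assuming billings comes back as a plain double, minimal stand-ins might look like this (note that clojure.core/update didn't exist yet in the Clojure of 2014, hence the in-house version):

```clojure
;; NOTE: hypothetical stand-ins for The Shop's util/to-2dp and util/update;
;; the real implementations aren't shown in the post.
(defn to-2dp
  "Round a number to two decimal places."
  [x]
  (/ (Math/round (* 100.0 x)) 100.0))

(defn update*
  "Apply f to the value at key k in map m - the shape util/update is used
  with in the code above."
  [m k f]
  (assoc m k (f (get m k))))

(update* {:billings 12.3456} :billings to-2dp)
;;=> {:billings 12.35}
```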

I wrote this code quite a while ago, and it never got called. There wasn't a really horrible error either - it didn't crash when they finally called it - it just didn't return the right data. Now it does.

I love these simple fixes - and it's pretty much all about the functional style of coding.

Grabbing Metadata from the Email Opens

Wednesday, November 5th, 2014

Unified Click

I have to admit when I'm wrong - it's just the only decent thing to do, in my opinion. And today, I was schooled by my boss's boss's boss at The Shop saying that we did, in fact, have the user-agent in the nginx logs for the user email open messages I was just finishing up on. I was sure we didn't, and when he showed me... well... I instantly apologized for my mistake - I was looking right past it. Totally my mistake.

The reason this all came up was that the messages have an app_version field, and typically, for the other user actions, this is the browser and version for the web, or the version of the Android or iOS app - something that lets us know a little bit more about the platform it's coming from. Sadly, without this user-agent, I was stuck looking at the URL of the nginx log, and that didn't have much of anything really useful.

With this, I was able to easily parse it - already had the functions for it - and then drop that in just like all the other messages. It was a very simple fix, but it had a profound effect on the data quality. Much nicer to know this. Much.

Adding Email Opens – Data Can Surprise You

Tuesday, November 4th, 2014

Unified Click

This afternoon, before I leave to go vote, I wanted to add in the code to decode all the email opens that occur in a day. I have actually been working on decoding these messages for a while, but I've had to divert my attention to other, more pressing, needs of late. Finally, this afternoon, I was able to get back to the email opens, and it was nice to close it out.

It was a basic addition to the topology, and while I could have combined it with the other email-based data feed, I have chosen to keep it separate for now - just to be able to monitor the send traffic separately from the open traffic. I will say that I did have one logic error in the code - and that was because the email opens are simple nginx logs, and those aren't formatted as JSON - so I had to parse each line first, and then process it. I had the initial checks running before the parsing, so they always failed, and that was an issue.
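A parse-first approach can be sketched roughly like this. The actual log format at The Shop isn't shown in the post, so this assumes nginx's default "combined" format, and the function name and map keys are made up:

```clojure
;; Hypothetical sketch - assumes nginx's default "combined" log format;
;; the real format at The Shop isn't shown in the post.
(def combined-re
  #"^(\S+) \S+ \S+ \[([^\]]+)\] \"(\S+) (\S+) [^\"]*\" (\d{3}) (?:\d+|-) \"[^\"]*\" \"([^\"]*)\"")

(defn parse-nginx-line
  "Parse one combined-format access-log line into a map, or return nil if
  the line doesn't match - so any sanity checks can run *after* parsing."
  [line]
  (when-let [[_ ip ts method uri status ua] (re-matches combined-re line)]
    {:ip ip
     :timestamp ts
     :method method
     :uri uri
     :status (Long/parseLong status)
     :user-agent ua}))
```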

But a quick logical walk-through, and I found the problem, and we were off to the races. What I was surprised about was the very moderate level of traffic at 2:00 in the afternoon. Now it's probably much heavier when the sends are done, so we'll have to watch it in the morning, but it's nice to see that the addition isn't a torrent that floods all processing - immediately.

I was expecting more load - maybe tomorrow will show it to me.

Storm Doesn’t Always Guarantee a Good Balance

Tuesday, November 4th, 2014

Storm Logo

I trust the developers of Storm way too much. Maybe the truth of the matter is that I have assumptions about how I'd have developed certain aspects of Storm, and they seem perfectly logical to me. When those don't turn out to be the way it's really done in Storm, then I feel like I trust the Storm developers too much. Case in point: Balance.

I had a spout with a parallelization hint of 20. I had a topology with 20 workers. Right away, I'm thinking that no matter how the workers are arranged on the physical hardware of the cluster, there should be one - and only one - spout per worker. That way we have balanced the work load, and assuming that there are sufficient resources on each physical box for the configured number of workers, then we have the start of a nicely balanced topology.

Sadly, that assumption was wrong, and determining the truth took me several hours.

I was looking for problems in the topology related to the latency of it - lots of high capacity numbers in the UI. So I started digging into the spouts as well as the bolts, and what I saw in three of my four topologies was that there were six - out of twenty - spout instances that had zero emitted tuples. Not a one.

I looked at the logs, and they were grabbing messages from Kafka... updating the zookeeper checkpoints... just what they should do. I was worried that Kafka was messed up, so I restarted two of the nodes that seemed to have problems. No difference.

Hours I spent trying to figure this out.

And then I decided to just write down all the machine/partition data for each of the spouts and see if there was something like a missing partition somewhere. And as I wrote them all down I saw there were twenty of them - just like there should be... and yet the spouts reporting zeros had the same machine/partition configurations as another spout that was returning data.

Storm had doubled up some of the spouts, and left others empty, and done so in a very repeatable way. In a sense, I was pretty upset - why log that you're reading messages if you're really "offline"? Why not distribute the spouts evenly? But in the end, I was just glad that things were working. But boy... it was a stressful few hours.

More Tuning of Storm Topology

Monday, November 3rd, 2014

Unified Click

I spent a good chunk of the day trying to get my data stream topology to work smoothly, but it's proving to be very elusive. I've tried all manner of things - including expanding the topology to take up nearly all 20 machines. Crazy.

When I got the first version of this up, I wrote an email to my VP saying this is possible, but it's going to be expensive. Very. The hardware isn't cheap, and we're going to need so much of it - due to the inefficiencies in the JVM, in the messaging, in Clojure... it's just going to take a ton of metal. I'm not sure they understood the magnitude of this cost.

We're bearing down on the Holiday Season, and we have no excess capacity. None. Twenty worker nodes all jammed up. Now if we need even 100% excess (a 2x spike) - which is nothing in the Cyber Monday sense, then we need an additional 20 machines. Amazing. For a capacity of about 100k msgs/sec.

At my last finance job we did 3 million msgs/sec on three boxes. Add in Greeks and what-if analysis and it's 5 boxes. The idea that we need 40 boxes for 100k msgs/sec is just crazy. We are building on a tech stack that is super inefficient.

But we are... and it's not my decision - as hard as I've lobbied for a more efficient solution.

Multi-Threaded Clojure Code Can be Hard Too

Monday, November 3rd, 2014

Clojure.jpg

I have had the most interesting time with a Clojure and Storm problem today, figuring out why the following code - executed in a Storm bolt - doesn't count the events properly. I can start by saying that figuring out that the problem was in this code at all was a challenge - but it was one that I could get to by simply looking at all the evidence around me.

The message counts that this topology is meant to classify (by latency) were recorded in the Storm UI. I had to take two snapshots of the UI, compare the times and counts, and verify that the total was in the right range - about 30,000 msgs/sec. Then I had the code I'd written to look at the Nimbus data for the topology, and it was giving me the same ballpark figure. Then I looked at another app I'd created to help with PagerDuty alerts - and it was giving me about the same rate.
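The rate check itself is simple arithmetic - the change in the cumulative tuple count over the elapsed time between the two snapshots. A tiny sketch (the snapshot map shape here is invented for illustration):

```clojure
(defn msgs-per-sec
  "Estimate throughput from two snapshots of a cumulative tuple count.
  Each snapshot is a map with the running total :count and a :ts in ms."
  [{c1 :count t1 :ts} {c2 :count t2 :ts}]
  (/ (- c2 c1) (/ (- t2 t1) 1000.0)))

;; 300,000 more tuples seen over a 10-second window:
(msgs-per-sec {:count 100000 :ts 0} {:count 400000 :ts 10000})
;;=> 30000.0
```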

Yet this code was saying about 100,000 msgs/sec:

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (< 10000 hits)
        (let [ts (now-hour-ms)
              rrk (str *master* "|latency|")
              irk (str *master* "|current-latency|")
              urk (str *master* "|ucs-latency|")]
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
          (wcar :detector
            (doseq [[k v] ls-cnts
                    :when (pos? v)]
              (car/hincrby (str rrk (name k)) ts v)
              (car/incrby (str irk (name k)) v))
            (doseq [[k v] ucs-cnts
                    :when (pos? v)]
              (car/incrby (str urk (name k)) v))))))))

As I started walking through the code in my head, I realized that I'd have a lot of threads hitting the same inequality at nearly the same time. Each would be over the 10,000 hit limit, and then each would be adding their combined total to the running total in redis.

This was a problem. We needed the equivalent of a try-lock in Clojure: let the first thread get the lock, and have all subsequent ones fail and skip over the code that updates redis. Sure, this may lose a few counts, but on the scale we're talking, that's not a big problem.

Certainly not as big as the multiple aggregations being added to redis.

The solution I came up with was kinda slick - add another atom - this time an integer, and then inc it, and only the first one gets to add their totals to redis. The rest skip it. When the first guy is done, he resets the atom, and you have reset the trap for the next time.

  (defn inc-it!
    "Function to take the key that represents the latency 'bucket' and atomically
    update the count for that bucket - checking to see if it's time to flush the
    counts to redis. If so, then reset the counts and write all the increments to
    redis. This will maintain the counts while still flushing them to redis many
    times a second. The buffer loss will be next to nothing."
    [k-ls k-ucs]
    (let [ls-cnts (swap! _ls_counts util/update k-ls inc)
          ucs-cnts (swap! _ucs_counts util/update k-ucs inc)
          hits (reduce + (vals ls-cnts))]
      (if (and (< 10000 hits) (= 1 (swap! _flushing_counts inc)))
        (do
          ;; first, reset the counts to be as close to atomic as possible
          (reset! _ls_counts { :ls_na 0, :lt10 0, :lt60 0, :lt180 0,
                               :lt600 0, :gt600 0 })
          (reset! _ucs_counts { :ucs_na 0, :lt500m 0, :lt1000m 0,
                                :lt2000m 0, :lt5000m 0, :gt5000m 0 })
          ;; now add in the sub-totals to redis
          (let [ts (now-hour-ms)
                rrk (str *master* "|latency|")
                irk (str *master* "|current-latency|")
                urk (str *master* "|ucs-latency|")]
            (wcar :detector
              (doseq [[k v] ls-cnts
                      :when (pos? v)]
                (car/hincrby (str rrk (name k)) ts v)
                (car/incrby (str irk (name k)) v))
              (doseq [[k v] ucs-cnts
                      :when (pos? v)]
                (car/incrby (str urk (name k)) v))))
          ;; finally, reset the lock on the flushing
          (reset! _flushing_counts 0)))))

Wonderful little solution, and it works like a charm!

Adding Optional JSON Encoding to Bolt

Friday, October 31st, 2014

Storm Logo

I wanted to see what effect removing the communication overhead between the JSON encoder bolt and the Kafka sender bolt would have on the topology. Even with the :local-or-shuffle directive, I was worried that the objects created were causing garbage collection, or some other overhead, so I modified the defbolt I was using to send Kafka messages to take an optional parameter indicating that it should use cheshire's JSON encoding on the message prior to sending it out.

That way, if it's there, then it's encoded, and if it's not, then the overhead of this is very minor:

  (defbolt mk-sender ["message"] {:params [cfg topic & [fmt]] :prepare true}
    [conf context collector]
    (let [dest (ep/config cfg)
          top (get-topic dest topic)
          zk (or (:zkStoreConnect dest) (rand-nth (:zookeepers dest)))
          p (kp/producer {(kafka-tag zk) zk})]
      (bolt
        (execute [tuple]
          (doseq [raw (.getValues tuple)
                  :when raw
                  :let [msg (case fmt
                              :json (nil-if-empty (json/generate-string raw))
                              raw)]
                  :when msg]
            (kp/send-messages p top [(kp/message (.getBytes msg))])
            (emit-bolt! collector [msg] :anchor tuple))
          (ack! collector tuple)))))

It works like a charm, and while the overhead wasn't even measurable, I left it in just in case there comes a time I want to simplify the topology by having a Kafka bolt that also encodes to JSON.