Archive for September, 2014

Creating a Storm Topology Latency Metric

Tuesday, September 30th, 2014

We have been tracking and reporting the end-to-end latency for my main data-pipe, and I felt that was the most useful metric to look at. But I was overruled. Management wanted a metric that looked at the internal latency of just the Storm topology and the kafka cluster. Sadly, that's not really possible without seriously hacking away at Storm, and on that point they relented: I was allowed to grab a timestamp at the first possible moment the message is in the topology, and base the latency on that.

The solution I came up with was to make a new Storm bolt, connected directly to the kafka-reader spout, that tags each tuple with a timestamp. The timestamp is grabbed before the tuple is decoded, so the latency includes the decoding time:

  ;;
  ;; onboard - a simple Bolt to take the first string in the incoming
  ;;           tuple and assume it's JSON and parse it into a standard
  ;;           clojure data container, tag it with the current time in
  ;;           milliseconds, and emit it. The keys will be converted to
  ;;           symbols for consistency in most of the project. This will
  ;;           ONLY emit the object if it's not-nil. It will ALWAYS ack the
  ;;           tuple. The arguments to this bolt are the keyword of the
  ;;           tag that will be used to augment the message with the
  ;;           timestamp, and a predicate function that will return 'true'
  ;;           if the parsed message is to be processed.
  ;;
  (defbolt onboard ["object"] {:params [tag pfn?]}
    [tuple collector]
    (let [ts (System/currentTimeMillis)]
      (doseq [p (.getValues tuple)
              :let [obj (nil-if-empty (parse-string p))]
              :when (and obj (if pfn? (pfn? obj) true))]
        (emit-bolt! collector [(assoc obj tag ts)] :anchor tuple)))
    (ack! collector tuple))

The idea is that the predicate function can be included so that I can pre-reject those messages that I know are going to cause me grief. What are these messages, you ask? They are JSON messages that have URLs as keys. As long as the keys stay strings, that's fine, but I'm decoding the JSON and converting the keys to clojure keywords, and URL-shaped keywords are killing the serialization in Storm.

Thankfully, it's very simple to write a predicate function to reject these messages - I don't need them anyway - so the rest of the topology never wastes time processing a useless message.
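
A minimal sketch of such a predicate - assuming the parsed message is a map and that any top-level key containing "://" marks one of these URL-keyed messages - might look like this:

  (defn no-url-keys?
    "Hypothetical predicate for onboard: true only when none of the
    top-level keys of the parsed message look like URLs. Messages with
    URL-shaped keys get dropped before the rest of the topology sees
    them."
    [obj]
    (not-any? #(.contains (name %) "://") (keys obj)))

  ;; wired in when building the topology, e.g. (onboard :in-time no-url-keys?)
  ;; where :in-time is just an illustrative tag keyword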

After this, I needed to expand the latency work to have an additional key for this topology latency, and then I could pretty much do everything else as planned.

Tuning Storm Topologies – Again

Monday, September 29th, 2014

Unified Click

I spent the majority of the day tuning and scaling the primary data-pipe Storm topology. We have been adding and then removing features, and we had to get it to a size that handled the load without burdening the system with workers that weren't needed. Not every little change requires a re-tune of the topology, but when I'm adding or removing bursts of 100k msgs/sec, it's really important to make sure that the new system responds and is as efficient as reasonable.
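
For context, these are the kinds of knobs being turned - standard Storm topology settings - with purely illustrative numbers, not the real production values:

  ;; illustrative numbers only - not the production values
  (def tuning-conf
    {"topology.workers"           12      ; JVMs spread across the cluster
     "topology.acker.executors"   12      ; ackers typically scale with the workers
     "topology.max.spout.pending" 2000})  ; throttle on the kafka spout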

The problem is that it takes a considerable amount of time to change something and then watch it develop. It isn't a simple success/failure - it's a matter of degree - and it takes between 5 and 30 mins to know if a change has helped or hurt the overall cluster.

So that was my day.

Expanded Latency Detection for Graphing

Thursday, September 25th, 2014

Detective.jpg

We have been detecting and calculating the latency of the main data stream we're creating, but we haven't really made it generally available for graphing via the tools built inside The Shop, and today was the time to make that change.

The extension was really pretty simple - we had everything we needed in place. We just needed to add some data to redis, and then have a process that would read this data out of redis and clear it, so the counting could continue.

We picked the redis structure:

  dd|current-latency|lt10 => long (count)
  dd|current-latency|lt60 => long (count)
  dd|current-latency|lt180 => long (count)
  dd|current-latency|lt600 => long (count)
  dd|current-latency|gt600 => long (count)

where the keys are for less than 10 sec, less than 60 sec, less than 180 sec, less than 600 sec, and more than 600 sec. These are not meant to be particularly fine-grained, but they are more than adequate to see if there is a trend, and at less than 10 sec, we're more than fine.
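
The counting side isn't shown here, but a minimal sketch of bumping one of these buckets with carmine - reusing the :detector connection and *master* prefix from the read/reset code below - might look like:

  (defn record-latency!
    "Hypothetical sketch: increment the current-latency bucket for one
    message, given its measured latency in seconds. Uses the same
    *master* prefix and :detector connection as pull-current below."
    [secs]
    (let [bucket (cond (< secs 10)  "lt10"
                       (< secs 60)  "lt60"
                       (< secs 180) "lt180"
                       (< secs 600) "lt600"
                       :else        "gt600")]
      (wcar :detector
        (car/incr (str *master* "|current-latency|" bucket)))))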

The code to read/reset the values is amazingly simple:

  (defn pull-current
    "Function to extract and reset the 'current-latency' counts from
    redis and return them to the caller. This is the way to
    monitor the latency as it develops so that differences don't
    have to be taken. Simply call this function on a regular interval,
    and graph - it's pretty simple."
    []
    (let [lk (keys @_ls_counts)
          rlk (map #(str *master* "|current-latency|" (name %)) lk)]
      (into {}
        (map vector lk (wcar :detector
                         (mapv #(or (car/getset % 0) 0) rlk))))))

I do love the way that carmine makes working with redis in clojure a breeze.

We then put this into one of the monitord scripts, and we had everything we needed. Nice.

Speeding up Experiment Processors

Thursday, September 25th, 2014

Finch Experiments

This morning has been a big push on speeding up the experiment reporting engines - the first part was speeding up the SQL they used, and then we had to intelligently cache certain values that are calculated from the stability of the time-series data. These calculations are very expensive - even with the speed-ups in the SQL pulling the data from the database, we needed to make these workers far faster at what they are doing.

The first thing was to realize that these stability metrics are really slow-moving values, and we only need to update them every 6 hours or so. Recomputing them more often than that is a waste of time because they don't move that much in an hour, and the caching saves us another factor of 100x or more.
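
As a rough illustration (the names here are mine, not the actual code), a 6-hour cache of a slow-moving value can be as simple as an atom keyed by experiment, holding the value and the time it was computed:

  (def ^:private stability-cache (atom {}))   ; hypothetical in-memory cache

  (defn cached-stability
    "Hypothetical sketch: return the cached stability metric for an
    experiment if it's less than 6 hours old, otherwise recompute it
    with compute-fn and cache the new value."
    [exp-name compute-fn]
    (let [now     (System/currentTimeMillis)
          max-age (* 6 60 60 1000)
          {:keys [at val]} (get @stability-cache exp-name)]
      (if (and at (< (- now at) max-age))
        val
        (let [v (compute-fn exp-name)]
          (swap! stability-cache assoc exp-name {:at now :val v})
          v))))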

Then we needed to look at the monitoring we had on the queues that feed these workers to make sure they had good data coming in - we base a lot of our monitoring metrics on these graphs, and an empty graph is just bad news.

In the end, we have something that is exactly what we need - fast in the database, fast on the workers, and very responsive to the needs we are placing on it. We didn't need this kind of performance in the initial deployment, but it's nice to know it didn't take me more than a day or two to get it once we needed it.

Optimizing SQL for Execution Speed

Wednesday, September 24th, 2014

PostgreSQL.jpg

Today I spent a good bit of my day working on speeding up a few postgres stored procedures (functions) to get them fast enough to do what we need to do in the time we have allotted. The original implementation was to create a temp table, populate it with the complete history of the time-series, and then filter it down with a clever use of row_number() to pick just one value in each repeating sequence.

This was fine as long as the time-series didn't get too big. Then the temp tables were crushing us, and the run time was insanely long. What really helps is to not make the temp table at all - just get everything ready for a single SELECT where sub-queries do the caching for you.
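
As a hypothetical illustration of the shape of that change (the table and column names here are invented, not the real schema), the row_number() filtering moves into a sub-query of a single SELECT instead of staging everything in a temp table:

  -- hypothetical sketch: one statement, no temp table
  SELECT series_id, observed_at, value
    FROM (SELECT series_id, observed_at, value,
                 row_number() OVER (PARTITION BY series_id
                                    ORDER BY observed_at) AS rn
            FROM timeseries_history) h
   WHERE rn % 10 = 1;   -- keep one value out of each repeating run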

Like I said, it took most of the day, but the speed-up was more like 50x to 100x - well worth the time.

This is Why I’m Not a Java Fan

Monday, September 22nd, 2014

java-logo-thumb.png

Normally, I like working in clojure - I really do. I don't mind that it's really executing as Java code because the authors have gone to great lengths to make clojure a better experience than Java - and it is. But every now and then, a Java-ism pops up, and it's impossible to figure out what's going on. Case in point - one of my Storm workers today.

I started getting some odd Nagios alerts on one of my boxes, and I didn't think much of it because we've added a lot of traffic. If we're in one of those pulses, it's going to saturate a box - there's just no way around that other than buying far more hardware than we need, just so we can handle the maximum peak traffic times without saturating a box.

Sure... we could, but it's a lot of money for something that should just be a natural, self-regulating part of the cluster's operation.

Then I look at the graphs, and see this for the memory usage on one box:

Memory Usage

Clearly, something happened around 12:00 UTC that started the memory climbing, but I have no idea what it was. There are no exceptions, and the Storm logs are clean, so I have very little visibility into the process. It's difficult to know what caused it - and therefore how to prevent it from happening again.

Even worse, this isn't an often-repeating problem. In fact, this is the first time I've seen it happen in all the time I've been working with Storm. It could have been the communication buffers, or the JVM memory in combination with the communication buffers - but at 100GB, at least half of it had to be the communication buffers, as the JVMs are configured to max out at 48GB.

So I'm not really thrilled about this. Then again, I've seen a lot of network issues in the last 24 hours, and if a box couldn't send to another box, maybe it got backed up and that caused the problem. Hard to say, but it's not a fun thought.

I'll just have to monitor this and do my best to prevent this problem from hitting production.

Significant Re-Org at The Shop

Monday, September 22nd, 2014

cubeLifeView.gif

Well... today the other shoe dropped and it was announced that we're in the midst of another re-org. This one affects me and the group I'm in, and I'll just say it's not something that has me thrilled with the prospects of what's about to happen. But at the same time, they clearly don't care about including me in any of the conversations or plans, so what I think clearly doesn't matter.

So there's no use saying anything else. It is what it is.

Cloning Redis Servers for a Roll-Out

Monday, September 22nd, 2014

Redis Database

This morning I was getting ready for a roll-out of some new code, and I didn't want to have to re-populate the production cache with a lot of data that I already had in my UAT cache - which has been running for more than a week, populating data as it goes. The script isn't all that hard - it just runs through all the servers that need to be cloned, stops each one, copies over the data file, and then restarts the redis server.

  #!/bin/bash
 
  for b in {1..8}; do
    echo "working on redis_${b}..."
 
    echo "  stopping redis${b} server..."
    pid=`top -Unobody -c -b | grep redis_${b}.conf | awk '{ print $1 }'`
    sudo kill -9 $pid
 
    echo "  copying data from caliban over..."
    scp opt-analytics-redis4-uat.snc1:/var/groupon/redis_${b}/dump.rdb .
    sudo cp dump.rdb /var/groupon/redis_${b}/
    sudo chown nobody:nobody /var/groupon/redis_${b}/dump.rdb
    rm dump.rdb
 
    echo "  restarting redis_${b} server..."
    sudo /usr/local/etc/init.d/redis_${b} start
  done

Yeah, it's nothing special, but I'm trying to be a little more diligent on the posts, and this was kinda fun to write, and it works perfectly.

The Amazing Speed of Clojure Development

Friday, September 19th, 2014

Finch Experiments

This morning I didn't have anything special to do, so I made two new endpoints for an existing service in one of my projects. They were really specced out as independent services, but as I looked at them, I knew each was a dozen lines of clojure - tops - and with the compojure library I could easily add the routes to the existing service and get everything I needed with a minimal level of effort.

The first endpoint was all about adding historical retrieval to the analytics project that I've been working on. Because I've already been storing these experiment reports in a database, it was easy to write a simple function:

  (defn load-nearest-report
    "Function to load the _nearest_ copy of the Panopticon-formatted
    report from the historical storage for the provided experiment name.
    This is for those times that you want the report for an experiment
    at a specific time."
    [expr-name gen]
    (let [row (dis/query
                ["select id
                    from finch_experiments
                   where experiment=?
                     and generated_at < ?
                   order by generated_at desc
                    limit 1" expr-name (to-timestamp gen)]
                  :result-set-fn first)
          exp-id (:id row)]
      (if exp-id (load-report exp-id))))

Now I've used HoneySQL, and it's nice, but I've found that it's often just plain faster for me to write SQL, as that's what I think in. For others there are a lot of tools, but for me this is about as easy as it gets - write a SQL statement, add in the arguments, set a result-set function, and then load the report (code we already had).

Done. Easy.

The endpoint was just as easy:

  (GET "/experiments/:exp-name/as-of" [exp-name & opts]
       (let [asof (from-long (or (nil-if-zero (parse-long (:timestamp opts)))
                                 (to-long (now))))
             rpt (load-nearest-report exp-name asof)]
         (return-json (if (is-empty? rpt)
                        (-> (select-keys rpt [:timestamp :experiment
                                              :name :updated_at :version
                                              :days_live :origin])
                          (assoc :experiment exp-name :state "stopped")
                          (remove-nil-keys))
                        rpt))))

Again, we had a lot of the functions written as part of the underlying support library, or as part of the analytics library - which is the beauty of the clojure style: make it all functions, make them simple and composable, and then it's easy to use them over and over again.

The second service is really a server-side mimicking of the internals of a client library at the core of The Shop's A/B testing suite. The idea is to be able to reliably, and quickly, decide if a user should be exposed to the control or one of the experiment variants - and then make sure that assignment follows them time after time, so the experience is consistent.

The code for this is a little bigger, mostly because we're parsing the config data and mapping the persistent UUID into a bucket.

It's probably more convoluted than it needs to be - the structure of the experiment configuration data isn't really great, but it's not bad either. Still... this works and gives us a beautiful endpoint to use.
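
The actual code isn't included here, but the core of the bucketing idea - hashing the persistent UUID into a stable bucket - can be sketched roughly like this (an illustrative sketch, not the real implementation):

  (defn uuid->bucket
    "Hypothetical sketch: hash a user's persistent UUID into one of n
    stable buckets, so the same user always lands in the same place.
    The real endpoint also parses the experiment config to map the
    bucket onto the control/variant weights."
    [^String uuid n]
    (mod (.hashCode (java.util.UUID/fromString uuid)) n))

  ;; e.g. (uuid->bucket "de305d54-75b4-431b-adb2-eb6b9e546014" 100) => a bucket in 0-99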

It's amazing what you can build in a morning with clojure and some good tools.

Crazy Update to Mac OS X 10.9.5

Friday, September 19th, 2014

Software Update

This morning I updated to the latest version of Mac OS X on my primary Retina MacBook Pro. I've done this dozens of times, so I knew the biggest issue would be getting everything back up and going after the restart. But today was different. Wow.

So I started the update and everything was fine, then it goes to the reboot phase, and it starts to come back up, and the gray screen with the white spinner shows up, and it stays there... and it stays there... and it stays there. It was there a good 15 mins, and at that point I thought it was locked up.

So I shut it down, tried again. Same thing.

Oh crud. Yes, I have a complete Time Machine backup, but this is annoying.

So I reset the NVRAM. No difference.

Then I did a safe boot and showed the console. This time it was doing an fsck - makes sense. I said "Let it go...", and I started working. I have to admit I was getting really worried after about 30 mins - but then it was up!

Sweet! It was back. I did a reboot and it was fine. So I'll let my other machine take its time - an hour at least - to make sure it's done. Apple makes good things - I just wish I'd had a little more faith in them this morning.

[9/21] UPDATE: the update on my work laptop went fine. Better than expected, and I have no idea why. But I'm sure glad it went smoothly.