Archive for the ‘Clojure Coding’ Category

Struggling with Storm and Garbage Collection

Tuesday, October 21st, 2014

Finch Experiments

OK, this has to be one of the hardest topologies to balance that I've ever had. Yeah, it's only been a few days, but Holy Cow! this is a nasty one. The problem was that it was seemingly impossible to find a smoking gun for the jumps in the Capacity of the bolts in the topology. Nothing in the logs. Nothing to be found on any of the boxes.

It has been a pain for several days, and I was really starting to get frustrated with this guy. And then I started to think more about the data I had already collected, and where that data - measured over and over again - was really leading me. I have come to the conclusion that it's all about Garbage Collection.

The final tip-off has been the emitted counts from the bolts during these problems:

Bolt Counts

where the corresponding capacity graph looks like:

Capacity Graph

The tip-off was that there was a drop in the emitted tuples, then a quick spike up, and then a return to the pre-incident levels. This tells me that something caused the flow of tuples to nearly stop, and that the system then caught back up again - the total emitted over that interval matched what the average flow would have produced.

What led me to this discovery is that all the spikes in the Capacity graph were 10 min long. Always. That was too regular to be the event itself, and as I dug into the code for Storm, it was clear it was using a 10 min window for the Capacity calculation. That explains the duration - it took 10 mins for the event to be aged out of the calculation, and for things to return to normal.
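For reference, here's a rough sketch of that calculation as I read the Storm UI code - Capacity is basically the fraction of the (default 600 sec) window a bolt spent executing tuples. The function name is mine, not Storm's:

  (defn capacity
    "Approximate the Storm UI Capacity metric: the fraction of the sliding
    window a bolt spent executing tuples. executed is the tuple count for the
    window, latency-ms the average execute latency in ms, and window-secs the
    window size (600 sec by default)."
    [executed latency-ms window-secs]
    (/ (* executed latency-ms)
       (* window-secs 1000.0)))

  ;; e.g. a bolt that executed 1.2M tuples at 0.45 ms each over the 10 min window:
  ;; (capacity 1200000 0.45 600)   ;=> 0.9

So a single slow burst of work inside the window keeps the number pinned up until the whole burst ages out.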

Given that, I wasn't looking for a long-term situation - I was looking for an event, and with that, I was able to start looking at other data sources for an impulse event that would have a 10 min effect on the Capacity.

While I'm not 100% positive - yet - I am pretty sure that this is the culprit, so I've taken steps to spread out the load of the bolts in the topology to give the overall topology more memory, and less work per worker. This should have a two-fold effect on the Garbage Collection, and I'm hoping it'll stay under control.

Only time will tell...

UPDATE: HA! It's not the Garbage Collection - it's the redis boxes! It appears that the redis servers have hit the limit on forking and writing to disk, and the redis start-up log itself says to add the system setting:

  vm.overcommit_memory = 1

to /etc/sysctl.conf and then reboot or run the command:

  sysctl vm.overcommit_memory=1

for this to take effect immediately. I did both on all the redis boxes. I'm thinking this is the problem after all.

Best results I could have hoped for:

Bolt Counts

Everything is looking much better!

Changing the Topology to Get Stability (cont.)

Monday, October 20th, 2014

Storm Logo

I think I've finally cracked the nut of the stability issues with the experiment analysis topology (for as long as it's going to last) - it's the work it's doing. And this isn't a simple topology - so the work it's doing is not at all obvious. But it's all there. In short - I think it's all about Garbage Collection and what we are doing differently in the Thumbtack version than in the Third-Rock version.

For example, the data schema in redis changed, so we had the following code to make sure that we didn't read any bad data:

  (defn get-trips
    "The experiment name and variant name visited for a specific browserID are held
    in redis in the following manner:
 
      finch|<browserID> => <expr-name>|<browser>|<t-src>|<variant>|<country> => 0
 
    and this method will return a sequence of the:
 
      <expr-name>|<browser>|<t-src>|<variant>|<country>
 
    tuples for a given browserID. This is just a convenience function to look at all
    the keys in the finch|<browserID> hash, and keep only the ones with five values
    in them. Pretty simple."
    [browserID]
    (if browserID
      (let [all (fhkeys (str *master* "|" browserID))]
        (filter #(= 4 (count (filter #{\|} %))) all))))

where we needed to filter out the bad tuples. This is no longer necessary, so we can save a lot of time - and GC - by simply using:

  (defn get-trips
    "The experiment name and variant name visited for a specific browserID are held
    in redis in the following manner:
 
      finch|<browserID> => <expr-name>|<browser>|<t-src>|<variant>|<country> => 0
 
    and this method will return a sequence of the:
 
      <expr-name>|<browser>|<t-src>|<variant>|<country>
 
    tuples for a given browserID. This is just a convenience function to look at all
    the keys in the finch|<browserID> hash. Pretty simple."
    [browserID]
    (if browserID
      (fhkeys (str *master* "|" browserID))))

At the same time, it's clear that in the code I was including far too much data in the analysis. For example, I'm looking at a decay function that weights the most recent experiment experience more than the next most recent, and so on. This linear weighting is pretty simple and pretty fast to compute. It basically says to weight each event differently - with a simple linear decay.

I wish I had better tools for drawing this, but I don't.

This was just a little too computationally intensive, and it made sense to hard-code these values for different values of n. As I looked at the data, when n was 20, the most important event was weighted at about 10%, and the least at about 0.5%. That's small. So I decided to only consider the first 20 events - any more than that and we're really spreading out the effect too much.
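In lieu of a drawing, here's roughly what that weighting looks like in code - the helper name is mine, not the topology's. Event i of the n most recent gets weight (n+1-i)/(1+2+...+n), so the weights fall off linearly and sum to 1:

  (defn linear-decay-weights
    "Return n weights, most recent event first, decaying linearly to the
    oldest, and summing to 1."
    [n]
    (let [total (/ (* n (inc n)) 2)]
      (map #(/ (- (inc n) %) (double total)) (range 1 (inc n)))))

  ;; (first (linear-decay-weights 20))   ;=> 0.0952...  (~10%)
  ;; (last  (linear-decay-weights 20))   ;=> 0.0047...  (~0.5%)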

Then there were the redis issues...

Redis is the bread-n-butter of these storm topologies. Without it, we would have a lot more work in the topologies to maintain state - and then get it back out. Not fun. But it seems that redis has been having some issues this morning, and I needed to shut down all the topologies, and then restart all the redis servers (all 48 of them).

I'm hoping that the restart settles things down - from the initial observations, it's looking a lot better, and that's a really good sign.

Changing the Topology to Get Stability (cont.)

Friday, October 17th, 2014

Storm Logo

I have been working on the re-configuration of the experimental analysis topology to try and get stability, and it's been a lot more difficult than I had expected. What I'm getting looks a lot like this:

Instability

and that's no good.

Sadly, the problem is not at all easy to solve. What I'm seeing when I dig into the Capacity numbers for the bolts is that only one of the 300+ bolts has a capacity number that exceeds 1.000 - and then by a lot. All the others are less than 0.200 - well under control. So why?

I've looked at the logs, and looked at them again - nothing... I've looked at the tuples moving through the bolts and, interestingly, found that many of the bolts just aren't moving any tuples. Why? No clue, but it's easy enough to scale those back to make a more efficient topology.

What I've come away with is the idea that it might be the different way we're dealing with the data. So this weekend, if I have time, I'll dig in and see what I can do to make this more even between the two. It's not obvious, but then - that's 0.9.0.1 software.

Changing the Topology to Get Stability

Thursday, October 16th, 2014

Storm Logo

OK... it turns out that I needed to change my topology in order to get stability, and it's only taken me a day (or so) to find this out. It's been a really draining day, but I've learned that there is a lot more about Storm that I don't know - than that which I do.

Switching from having one bolt that calls two functions to two bolts, each calling one function, should certainly add more communication overhead. Interestingly, it's how I regained stability. The lesson I've learned - make bolts as small and purposeful as possible.
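As a minimal sketch of the shape of that change - using Storm's Clojure DSL from the 0.9.x line, with invented spout/bolt names and trivial bodies, not the actual Finch code - the two small, single-purpose bolts get wired in series instead of one bolt doing both steps:

  (ns finch.topology-sketch
    (:require [clojure.string :as str])
    (:use [backtype.storm clojure config]))

  (defspout msg-spout ["msg"] [conf context collector]
    (spout
      (nextTuple []
        (Thread/sleep 100)
        (emit-spout! collector ["finch|abc123"]))
      (ack [id])))

  ;; first small bolt: only the first of the two functions
  (defbolt parse-msg ["fields"] [tuple collector]
    (emit-bolt! collector [(str/split (.getString tuple 0) #"\|")] :anchor tuple)
    (ack! collector tuple))

  ;; second small bolt: only the second function
  (defbolt count-fields ["n"] [tuple collector]
    (emit-bolt! collector [(count (.getValue tuple 0))] :anchor tuple)
    (ack! collector tuple))

  (defn mk-topology []
    (topology
      {"msgs"  (spout-spec msg-spout)}
      {"parse" (bolt-spec {"msgs" :shuffle}  parse-msg    :p 8)
       "count" (bolt-spec {"parse" :shuffle} count-fields :p 8)}))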

Amazing...

Re-Tuning Experiment Topology

Wednesday, October 15th, 2014

Finch Experiments

Anytime you add a significant workload to a storm cluster, you really need to re-balance it. This means looking at the work each bolt does, making sure there is the proper balance between the bolts at each phase of the processing, and then that there are enough workers to handle the cumulative throughput. It's not a trivial job - it's a lot of experimentation, then looking for patterns and zeroing in on the solution.

That's what I've been doing for several hours, and I'm nowhere near done. It's getting closer, and I think I have the problem isolated, and it's very odd. Basically, there are a few bolt instances - say 4 out of 160 - that are above 1.0; the rest are at least a factor of ten less. This is my problem. Something is causing these few bolts to take too long, and that skews the metric for all the instances of that bolt.

Refactoring Analytics for Multi-Mode Design (cont.)

Wednesday, October 15th, 2014

Finch Experiments

This morning I finished up the deployment of the code to UAT and then set about updating the docs in the GitHub/E repo for all the changes. This wasn't all that hard, as most of it was already there, but I needed to make sure the docs matched the code in the server.clj namespace, and then there was the big job of adding the docs for the different attribution schemes.

I've got two schemes - the original scheme, which wasn't all that good, and the new one, which looks at the order of the experiment experiences per session and then attributes the weight of the deal to those in a time-decaying fashion. It's not great, but it's a massive improvement over the old scheme.
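A tiny sketch of the idea - the helper name and the numbers are mine, not the production code - the deal's value gets split across the session's experiment experiences, most recent first, using a decaying set of weights that sum to 1:

  (defn attribute-deal
    "Split deal-value across experiences (ordered, most recent first) using
    weights - a decaying sequence that sums to 1."
    [deal-value experiences weights]
    (zipmap experiences (map #(* deal-value %) weights)))

  ;; (attribute-deal 100.0
  ;;                 ["exp-a|chrome|seo|b|US" "exp-b|chrome|sem|a|US"]
  ;;                 [0.67 0.33])
  ;; ;=> {"exp-a|chrome|seo|b|US" 67.0, "exp-b|chrome|sem|a|US" 33.0}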

I wrote all this up, with examples, and it's checked in for the front-end guys to use.

Refactoring Analytics for Multi-Mode Design

Tuesday, October 14th, 2014

Finch Experiments

Today has been a lot of coding on a change that I was asked to do - and in truth, it's a nice feature to have in the experiment analytics. It's basically the request that all attribution schemes be active in the system at once, with a different URL for each version of the code and data.
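The shape of it is something like this hypothetical compojure sketch - the routes and helper are invented, not the actual server.clj - where each attribution scheme is live at its own URL:

  (ns finch.server-sketch
    (:require [compojure.core :refer [defroutes GET]]
              [ring.util.response :as resp]))

  (defn- attribution
    "Placeholder: look up a deal's attribution under the given scheme."
    [scheme deal-id]
    {:deal deal-id :scheme scheme})

  (defroutes attribution-routes
    ;; the original scheme stays where the front-end already expects it
    (GET "/attribution/v1/:deal-id" [deal-id]
      (resp/response (pr-str (attribution :original deal-id))))
    ;; the new time-decaying scheme gets its own URL, so both run side by side
    (GET "/attribution/v2/:deal-id" [deal-id]
      (resp/response (pr-str (attribution :time-decay deal-id)))))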

This is not unlike what I've done in the past, and it's a good way to keep things isolated and roll out new features without having to mess with a long and involved testing process. But the problem here is that we have a finite redis space, and there's only so much I can do at 50k msgs/sec, and while I would love to have all the versions running side by side, I've already had problems getting the first one working and fitting in redis.

I know this doesn't compute to the current Management, and it's sad that the guy doesn't really understand what's going on and won't just admit it. Sadly, that admission seems to be hard for a lot of managers - it would make them seem more human, but at the same time, it forces them to expose their weaknesses.

Anyway... I've been re-writing this code all day, and I think I have it all code complete. In the morning I'll try it out in UAT and see how it goes.

Updating Message Forwarders

Tuesday, October 14th, 2014

Unified Click

Over the last few days I've been refactoring the code to publish not only to my attached kafka cluster, but also to the shared kafka cluster in the data centers. The shared cluster hasn't been the most reliable in the past, but I was getting a lot of pressure from Management to do this when the load on my cluster got to be too much to handle, and while I pushed back for Quality of Service (QoS) reasons, I relented when the clients agreed that I would no longer be responsible for the latency once I handed messages off to the shared cluster.

At the same time, I've been responsible for the forwarding of messages from one data center to the other because the group that was generating the data would not make the move on its end, and a sensitive project needed this data. So once again - because I can, I have to.

The intersection of these two things is what I was working on this morning - forwarding from one kafka cluster to another so that the messages can be seen in both clusters in all data centers. It's not hard work, and I have monitors and such set up to make sure it's working, but the idea that a product team is doing the message shuffling is just... well... crazy.
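Conceptually, the forwarder is nothing more than this - a rough sketch against the Kafka 0.8 high-level consumer and old producer APIs, with invented hostnames, group, and topic:

  (ns unified-click.forwarder-sketch
    (:import [java.util HashMap Properties]
             [kafka.consumer Consumer ConsumerConfig]
             [kafka.javaapi.producer Producer]
             [kafka.producer KeyedMessage ProducerConfig]))

  (defn- consumer [zk group]
    (Consumer/createJavaConsumerConnector
      (ConsumerConfig.
        (doto (Properties.)
          (.put "zookeeper.connect" zk)
          (.put "group.id" group)))))

  (defn- producer [brokers]
    (Producer.
      (ProducerConfig.
        (doto (Properties.)
          (.put "metadata.broker.list" brokers)))))

  (defn forward!
    "Copy every message on topic from the source cluster to the destination."
    [topic]
    (let [src  (consumer "zk-src:2181" "click-forwarder")
          dst  (producer "kafka-dst1:9092,kafka-dst2:9092")
          strm (-> (.createMessageStreams src (HashMap. {topic (int 1)}))
                   (get topic)
                   first)]
      (doseq [msg (iterator-seq (.iterator strm))]
        (.send dst (KeyedMessage. topic (.message msg))))))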

Balancing Kafka Readers to Shared Kafka Cluster

Friday, October 10th, 2014

Kafka Broker

Today has been a bad day. We had just a little too much traffic, and one too many clients, on our kafka cluster in production, and the consequence was that the kafka cluster started to generate back-pressure on the Storm topology bolts writing to it. That, in turn, caused back-pressure up the topology, and that caused way too much latency.

So I had to do something.

I could have asked for three more boxes - and I did - but I knew they weren't going to be available in time. Management said they wanted me to use the shared Kafka cluster operated by the Operations group, but that group's attitude about the data passing through it is that of a Common Carrier - "I don't care what it is" - so they take no responsibility for it.

Still, it was the only option, and when I got the release from the clients that I could not be held responsible for the data once I'd handed it off to the shared kafka cluster, I added the code to do just that - publish to two kafka clusters - mine and theirs.
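The publishing side ends up looking roughly like this - a sketch against the 0.8-era producer API with made-up broker lists, not the exact bolt code:

  (ns click.dual-publish-sketch
    (:import [java.util Properties]
             [kafka.javaapi.producer Producer]
             [kafka.producer KeyedMessage ProducerConfig]))

  (defn- string-producer [brokers]
    (Producer.
      (ProducerConfig.
        (doto (Properties.)
          (.put "metadata.broker.list" brokers)
          (.put "serializer.class" "kafka.serializer.StringEncoder")))))

  (def ^:private my-cluster     (string-producer "kafka-mine-1:9092,kafka-mine-2:9092"))
  (def ^:private shared-cluster (string-producer "kafka-shared-1:9092,kafka-shared-2:9092"))

  (defn publish-both!
    "Send msg to both clusters - latency past the shared hand-off is on them."
    [topic msg]
    (let [km (KeyedMessage. topic msg)]
      (.send my-cluster km)
      (.send shared-cluster km)))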

The load I put on theirs threw them on their heels at first, and I hope they get it all cleared up soon, but I have a feeling that my weekend is going to be filled with monitoring the latency through the system.

Looking at Cassandra for Fast SQL Storage

Thursday, October 9th, 2014

Cassandra

I've got a lot of streaming data in Storm, and I'm doing quite a bit of analytical processing on that data, but I can't store much of it because the cost of storage is so high - latency. So I end up using a lot of redis boxes and having other processes read out of those into more conventional storage - like Postgres. But I'm starting to hear good things about Cassandra and Storm working well together.

My concern is real speed. I have an average message rate of 40k to 50k msgs/sec, with peaks as high as four times that - on the order of 160k to 200k msgs/sec. I need to be able to handle those peaks - they're by no means the once-a-month peak levels of several times that, but they are something I see on a regular basis - and we need to be able to take all this data from the topology and not slow it down.

If I can really do this on, say, eight machines, then I'll be able to have the kind of deep dive we've needed for looking into the analytics we're calculating. This would be a very big win.

I've reached out to a few groups that are doing something very similar to this, and their preliminary results say we can do it. That's really good news. We'll have to see what happens with the real hardware and software.