Archive for the ‘Clojure Coding’ Category

Added Security Checks on Messages

Thursday, October 30th, 2014

Crazy Lemon the Coder

Today one of the users of my data feed emailed me saying that we were sending bad values in a few fields for a few messages. Turns out, we were getting some data that was clearly either the internal Security Team trying to find weaknesses, or someone stuffing messages in a vain attempt to get data. Where I'd be expecting a UUID, I was getting "cat /etc/passwd" - no joke.

So I had to take the time to expand the incoming message filters to cover these fields so that we filter these values out as opposed to treating them as possible users. It's not a huge deal, but it's just another thing I didn't expect to have to do today, and it really needed to go out because of the security implications.
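
The checks themselves aren't anything exotic - conceptually it's just making sure that the fields that are supposed to be UUIDs actually look like UUIDs before we treat the message as a possible user. A rough sketch of the idea (the field and function names here are mine, not the production filter):

  ;; Reject any message whose user id doesn't look like a real UUID.
  (def uuid-re
    #"(?i)^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")

  (defn valid-user-id?
    "Does the :user-id field look like a UUID and not injected junk?"
    [msg]
    (boolean (re-matches uuid-re (str (:user-id msg)))))

  (valid-user-id? {:user-id "f81d4fae-7dec-11d0-a765-00a0c91e6bf6"})  ;; => true
  (valid-user-id? {:user-id "cat /etc/passwd"})                       ;; => false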

Updating the Topology and Library Configs

Wednesday, October 29th, 2014

Unified Click

Today I've done quite a bit of reconfiguration in my endpoints library. It's the clojure library I created to make the Storm work I was doing so much easier and faster by isolating all the code for dealing with the external endpoints - redis, databases, RESTful services, etc. - so that all the topologies and projects built on this code can start from a given baseline.
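
The idea behind the library, in very rough form, is just to keep the connection details for each data center in one place so that every topology asks for a cluster by name instead of hard-coding hosts. A sketch of that shape (all the names and hosts here are made up):

  (def kafka-clusters
    "One map, one place to change when a shared cluster moves or gets reconfigured."
    {:dc-east {:zookeeper "zk-east.example.com:2181"
               :brokers   "kafka-east.example.com:9092"}
     :dc-west {:zookeeper "zk-west.example.com:2181"
               :brokers   "kafka-west.example.com:9092"}})

  (defn cluster-config
    "Look up the connection settings for a named Kafka cluster."
    [cluster-key]
    (get kafka-clusters cluster-key))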

Well... today was a lot of re-configuration because the shared Kafka clusters in the different data centers were undergoing some change, and I had to change how I was connecting to them to stay up to date. Normally, this is just the cost of being in an organization, but when I had to change the same service two and three times, well... then it kinda became work.

In the end, I was able to get all the new versions up on nexus and deployed everywhere they were needed, so that we stayed up, but it was a challenging day of communication given the cost of all the changes.

It’s Hard to Stay Quiet Sometimes

Wednesday, October 29th, 2014

Unified Click

I know I'm not typical - not in the least. I can get carried away at times, my passion can get the best of me... but it's because I'm driven to give the best I can in all that I choose to do, and it really bothers me when I can't do that. I mean really bothers me when I can't do that.

This morning I came into work to see that things weren't looking all that good for the latency on my data stream. In fact, it was worse than I've ever seen:

Latency Graphs

Twelve solid hours of off-the-chart latencies. And when I dug into the Capacity numbers it was clear what was going on... we are getting lots of back-pressure on publishing to the shared Kafka cluster, and that backs up the bolts feeding those publishers, and so on, and so on...

My concern is that we're only retaining data on the source kafka cluster for 72 hours, so if this continues for too long, we're going to be in real trouble because we're soon going to be hitting the retention limit on the kafka cluster, and then we'll start losing data.

Not good.

Yet the solution is simple - give me three more boxes, put them on my existing kafka cluster, and put all the traffic back to mine. That's where it used to be - and until the number of clients got too high - it was perfectly fine. Add a few boxes for the additional client load, and it'll be fine again.

And that's what makes staying quiet very hard... It's politics. They are playing politics with the quality of a service that has my name on it. And that really irks me. I have the ability to deliver a great product, but it's politics that is causing me to be associated with a bad one.

Excellent Code for JSONP Responses

Tuesday, October 28th, 2014

Clojure Logo

Today I needed to do some real updating to my JSONP code in one of my services. Up to now, it's been OK for all the clients, but it's clear that we're getting to the limits of the simple approach we initially took to dealing with JSONP responses:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (fn [out]
                 (->> out
                      (OutputStreamWriter.)
                      (BufferedWriter.)
                      (json/generate-stream ob))))}))

which didn't really allow for any errors or issues that might crop up.

Sadly, we're starting to see things crop up, and so with help from Gary - the original author of this code, who has since updated it to handle a great many more things - we now have:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (bound-fn [out]
                 (with-open [osw (OutputStreamWriter. out)
                             bw (BufferedWriter. osw)]
                   (let [error-streaming
                         (fn [e]
                           ;; Since the HTTP headers have already been sent,
                           ;; at this point it is too late to report the error
                           ;; as a 500. The best we can do is abruptly print
                           ;; an error and quit.
                           (.write bw "\n\n---ERROR WHILE STREAMING JSON---\n")
                           (.write bw (str e "\n\n"))
                           (warnf "Streaming exception for JSONP: %s"
                                  (.getMessage e)))]
                     (try
                       (json/generate-stream ob bw)
                       ;; Handle "pipe closed" errors
                       (catch IOException e
                         (if (re-find #"Pipe closed" (.getMessage e))
                           (info "Pipe closed exception: %s" (.getMessage e))
                           (error-streaming e)))
                       (catch Throwable t
                         (error-streaming t)))))))}))

In all the tests I've run, this looks to do everything I need - it works - whew! And with the trapping, we should be able to have a lot better control when things go wrong.
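
For context, both versions assume requires along these lines - the namespace name is my own placeholder, and now is presumably a local helper for the Last-Modified timestamp:

  (ns shop.jsonp
    (:require [cheshire.core :as json]
              [clojure.tools.logging :refer [infof warnf]]
              [ring.util.io :refer [piped-input-stream]])
    (:import [java.io BufferedWriter IOException OutputStreamWriter]))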

Overhead of Storm Communication

Monday, October 27th, 2014

Storm Logo

Tuning Storm topologies isn't easy. Sure... if you have a topology that just does simple calculations, like my Disruption Detector - then it's easy. Almost deceptively simple. But if you have a topology whose bolts actually have to do something, then it gets a lot more complex. And the more they do, the more complex it becomes.

And one of the insidious things about Storm is the communication buffers. These use memory outside the heap, so they have the ability to crush a box even if you are careful with the -Xmx4G settings to make sure you don't overload it. No... these are tricky little beasts, and the conflicts raised by improper use of Kafka are really challenging.

The tradeoff is really one of connectivity versus balance. If the Kafka data is well-balanced across all boxes and all partitions, then you can simply default to using the :local-or-shuffle connectivity directive in your Storm topology, and you should be in good shape. This scheme says: If I have a bolt in the same worker, use it. If not, then look to others and balance accordingly.

And it's that stickiness to the local worker that's the kicker.

If we have an unbalanced kafka cluster, then some of the readers will have more data to process than others, but with this affinity for the local bolts, the excess data will be queued up on the local bolts in the worker, and not get spread out in the cluster. This makes for an uneven distribution of work, and that's very bad.

But if we use the :shuffle directive, then every message will look to balance out the load by checking all the similar bolts in the topology, and messages will be moved from worker to worker, box to box, without regard for the real benefit of that movement. It's not like you can put in a Cost Function for the movement of n bytes - oh... that would be sweet.

So what happens is that you end up having multi-GB communication buffers on all boxes, and that can crush a system. So you have to be careful - you can't remove all the :shuffle directives - especially if you have an unknown - and potentially unbalanced - kafka source. But you can't have too many of them, either.
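
To make the tradeoff concrete, here's roughly what the two choices look like in Storm's Clojure DSL - a sketch only, with made-up component names and a trivial pass-through bolt standing in for the real work:

  (ns example.grouping-sketch
    (:require [backtype.storm.clojure
               :refer [defbolt topology spout-spec bolt-spec emit-bolt! ack!]]))

  ;; Placeholder bolt so the sketch compiles - it just passes values along.
  (defbolt pass-through ["value"] [tuple collector]
    (emit-bolt! collector [(.getValue tuple 0)] :anchor tuple)
    (ack! collector tuple))

  ;; kafka-spout is assumed to come from elsewhere (e.g. storm-kafka); the
  ;; only point here is the grouping keyword on each bolt's input.
  (defn mk-topology
    [kafka-spout]
    (topology
      {"reader" (spout-spec kafka-spout :p 8)}
      {"enrich"  (bolt-spec {"reader" :local-or-shuffle}  ; stick to bolts in the same worker
                            pass-through :p 16)
       "publish" (bolt-spec {"enrich" :shuffle}           ; spread tuples across all workers
                            pass-through :p 16)}))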

Finding the right topology when you're doing real work is not trivial.

Cleaning Up Parsing Errors

Friday, October 24th, 2014

Unified Click

From time to time, I've been asked to add features to my data stream by groups, and one of these groups at The Shop deals with affiliates directing traffic - and hopefully sales - to the site. If so, they get paid, which is good for everyone. One of the things that's important about this is to accurately track who came from where, and all that. To that end, I've had to implement some rather complex if-then-else logic in the code to match what the existing attribution code does.

Well... it seems I probably made a mistake in being a little too inclusive with some of the data. I was including a URL in the data I'm being sent when I really didn't want to include the domain in that URL - just the parameters.

When this was pointed out, I realized that the instructions I'd received from the group about this feature were relatively vague, and after I really dug into what they were asking and compared it to the code, it was likely the case that I wanted the parameters, but not the domain, in the URL.

Not at all clear, but hey... I can admit that I didn't get it right - easy fix.
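
The fix itself amounts to keeping only the query string from the URL and dropping the scheme and domain. A small sketch of the idea (the function name and the use of java.net.URI are my own illustration, not the production code):

  (defn query-params-only
    "Given a full URL string, return just its query string (or nil)."
    [url]
    (try
      (.getQuery (java.net.URI. url))
      (catch Exception _ nil)))

  (query-params-only "http://www.example.com/deal?aff_id=42&src=email")
  ;; => "aff_id=42&src=email"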

Moving Sharded Redis Servers

Friday, October 24th, 2014

Redis Database

This morning I found that I really needed to move some of my sharded redis servers. Due to the needed bandwidth, I've got eight redis servers on one 196GB RAM box, but this morning, I saw that the total RAM in use was over 170GB, and it was causing issues when redis tried to fork and save the data.

This is what I love about redis and this sharding - I can simply shut things down, move the redis dump.rdb files, fire up the servers on the new machines, and everything will load back up and be ready to go. A simple change in the sharding logic to point to the new machine for those moved servers, and everything is back up and running. Very nice.
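
For the curious, the sharding logic really is about as simple as it sounds - something along these lines (with made-up hosts), where moving a server is just editing its entry:

  (def redis-shards
    "Where each shard lives - moving a server means changing its entry here."
    [{:host "redis-a.example.com" :port 6379}
     {:host "redis-a.example.com" :port 6380}
     {:host "redis-b.example.com" :port 6379}])

  (defn shard-for
    "Pick the redis server for a key by hashing the key over the shard list."
    [k]
    (nth redis-shards (mod (hash k) (count redis-shards))))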

Yes, it's very manual as well, but sometimes, the costs of that manual configuration are really worth it. Today it paid off. Big.

Finding the Joy in Life Again

Wednesday, October 22nd, 2014

Great News

I honestly would have put money on the fact that this would not have happened today. Big money.

I'm sitting on the bus riding to work, and I realize that I'm pretty happy without a pain-causing personal relationship in my life. That was a wow! moment. I've been separated for about 2 years, and the divorce is in the works, but I would have bet real money I'd feel horrible for the rest of my natural life. But today... on the bus... for a few minutes... I didn't.

That was huge for me. Huge.

Then I'm in work, updating a few postings with the results of the tests I'd done overnight, and I'm back into the swing of posting like I used to. It's been a long two years, but I'm back to writing about what I'm doing, and it's really helping. I'm feeling like I'm enjoying myself again.

This, too, was huge for me.

I don't expect this to last all day... but the fact that I have felt this way tells me that I need to keep doing what I'm doing - keep moving forward, and then maybe this will come again. And maybe when it comes again, it'll last longer. Maybe.

Struggling with Storm and Garbage Collection

Tuesday, October 21st, 2014

Finch Experiments

OK, this has to be one of the hardest topologies to balance I've had. Yeah, it's only been a few days, but Holy Cow! this is a nasty one. The problem was that it was seemingly impossible to find a smoking gun for the jumps in the Capacity of the bolts for the topology. Nothing in the logs. Nothing to be found on any of the boxes.

It has been a pain for several days, and I was really starting to get frustrated with this guy. And then I started to think more about the data I had already collected, and where that data - measured over and over again - was really leading me. I have come to the conclusion that it's all about Garbage Collection.

The final tip-off was the emitted counts from the bolts during these problems:

Bolt Counts

where the corresponding capacity graph looks like:

Capacity Graph

The tip-off was that there was a drop in the emitted tuples, and then a quick spike up, and then back to the pre-incident levels. This tells me that something caused the flow of tuples to nearly stop, and then the system caught back up again, and the integral over that interval was the same as the average flow.

What led me to this discovery is that all the spikes in the Capacity graph were 10 min long. Always. That was too regular to be an event, and as I dug into the code for Storm, it was clear it was using a 10 min average for the Capacity calculation, and that explains the duration - it took 10 mins for the event to be wiped from the memory of the calculation, and for things to return to normal.
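
For reference, my understanding of the Storm UI's number is that capacity is roughly the fraction of the sampling window a bolt spends executing, computed over that rolling 10 min window - which is exactly why a short impulse takes 10 mins to wash out. A sketch of the arithmetic (mine, not Storm's source):

  (defn capacity
    "Approximate bolt capacity: fraction of the window spent executing tuples."
    [executed-count avg-execute-latency-ms window-ms]
    (/ (* executed-count avg-execute-latency-ms) (double window-ms)))

  ;; 600,000 tuples at 1 ms each over a 10 min (600,000 ms) window => 1.0
  (capacity 600000 1.0 600000)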

Given that, I wasn't looking for a long-term situation - I was looking for an event, and with that I was able to start scanning other data sources for an impulse event that would have a 10 min effect on the capacity.

While I'm not 100% positive - yet - I am pretty sure that this is the culprit, so I've taken steps to spread out the load of the bolts in the topology to give the overall topology more memory, and less work per worker. This should have a two-fold effect on the Garbage Collection, and I'm hoping it'll stay under control.

Only time will tell...

UPDATE: HA! It's not the Garbage Collection - it's the redis box! It appears that the redis servers have hit the limit on forking and writing to disk, and the redis start-up log even says that you should add the system setting:

  vm.overcommit_memory = 1

to /etc/sysctl.conf and then reboot or run the command:

  sysctl vm.overcommit_memory=1

for this to take effect immediately. I did both on all the redis boxes. I'm thinking this is the problem after all.

Best results I could have hoped for:

Bolt Counts

Everything is looking much better!

Changing the Topology to Get Stability (cont.)

Monday, October 20th, 2014

Storm Logo

I think I've finally cracked the nut of the stability issues with the experiment analysis topology (for as long as it's going to last) - it's the work it's doing. And this isn't a simple topology - so the work it's doing is not at all obvious. But it's all there. In short - I think it's all about Garbage Collection and what we are doing differently in the Thumbtack version compared to the Third-Rock version.

For example, we had a change to the data schema in redis, so we had the following in the code to make sure that we didn't read any bad data:

  (defn get-trips
    "The experiment name and variant name visited for a specific browserID are held
    in redis in the following manner:
 
      finch|<browserID> => <expr-name>|<browser>|<t-src>|<variant>|<country> => 0
 
    and this method will return a sequence of the:
 
      <expr-name>|<browser>|<t-src>|<variant>|<country>
 
    tuples for a given browserID. This is just a convenience function to look at all
    the keys in the finch|<browserID> hash, and keep only the ones with five values
    in them. Pretty simple."
    [browserID]
    (if browserID
      (let [all (fhkeys (str *master* "|" browserID))]
        (filter #(= 4 (count (filter #{\|} %))) all))))

where we needed to filter out the bad tuples. This is no longer necessary, so we can save a lot of time - and GC - by simply using:

  (defn get-trips
    "The experiment name and variant name visited for a specific browserID are held
    in redis in the following manner:
 
      finch|<browserID> => <expr-name>|<browser>|<t-src>|<variant>|<country> => 0
 
    and this method will return a sequence of the:
 
      <expr-name>|<browser>|<t-src>|<variant>|<country>
 
    tuples for a given browserID. This is just a convenience function to look at all
    the keys in the finch|<browserID> hash. Pretty simple."
    [browserID]
    (if browserID
      (fhkeys (str *master* "|" browserID))))

At the same time, it's clear that in the code I was including far too much data in the analysis. For example, I'm looking at a decay function that weights the most recent experiment experience more than the next most distant, etc. This linear weighting is pretty simple and pretty fast to compute. It basically says to weight each event differently - with a simple linear decay.

I wish I had better tools for drawing this, but I don't.

This was just a little too computationally intensive, and it made sense to hard-code these values for different values of n. As I looked at the data, when n was 20, the most important event was about 10%, and the least was about 0.5%. That's small. So I decided to only consider the first 20 events - any more than that and we are really spreading out the effect too much.
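
Since I can't draw it, here's a small sketch of the weighting I mean (my own illustration, not the topology code): each of the n most recent events gets a weight proportional to its recency, and for n = 20 the newest works out to roughly 10% and the oldest to roughly 0.5% - the numbers above.

  (defn linear-decay-weights
    "Weights for the n most recent events, newest first, summing to 1."
    [n]
    (let [total (/ (* n (inc n)) 2)]
      (map #(/ (- n %) (double total)) (range n))))

  (first (linear-decay-weights 20))  ;; => 0.0952... (~10%)
  (last  (linear-decay-weights 20))  ;; => 0.00476... (~0.5%)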

Then there was the redis issues...

Redis is the bread-n-butter of these storm topologies. Without it, we would have a lot more work in the topologies to maintain state - and then get it back out. Not fun. But it seems that redis has been having some issues this morning, and I needed to shut down all the topologies, and then restart all the redis servers (all 48 of them).

I'm hoping that the restart settles things down - from the initial observations, it's looking a lot better, and that's a really good sign.