Archive for the ‘Clojure Coding’ Category

It’s Hard to Beat Clojure for Complex Systems

Wednesday, October 8th, 2014

I'm doing a lot of performance work on the Deal Performance Service this morning - trying to handle these 6 million row files and imports into Postgres, and I am constantly struck by the fantastic smile I get on my face when working with clojure in this environment. It's simple, compact, expressive, and actually very readable to me. And I'm a huge fan of extensive comments.

Being able to take a serial process like:

  (doseq [l (take limit (line-seq rdr))
          :let [raw (line-to-ddo l stamp deals taxy)]
          :when raw
          :let [old (get eod (:ddo_key raw))
                ddl (if-not eod raw (merge-daily cfg stamp raw old))]
          :when ddl
          :let [row (gen-csv-row ddl all-fields)]]
    (spit csv-name row :append true))

and with almost no effort, turn it into a parallel one:

  (let [do-it (fn [l] (if-let [raw (line-to-ddo l stamp deals taxy)]
                        (let [old (get eod (:ddo_key raw))]
                          (if-let [ddo (if-not eod
                                         raw
                                         (merge-daily cfg stamp raw old))]
                            (gen-csv-row ddo all-fields)))))]
    (doseq [row (pmap do-it (take limit (line-seq rdr)))]
      (spit csv-name row :append true)))

I simply have to make the body of the doseq into a simple function and then use pmap as the source for the new doseq, and I'm in business. This has made the refactoring of the code so much simpler. It's easy to re-work the code over and over to get the optimal data flow.
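To make the shape of that refactoring concrete, here's a minimal, self-contained sketch. The names line->row, serial-rows and parallel-rows are made up for illustration, and the rows are collected in an atom rather than written with spit, so it's a stand-in for the loader and not the loader itself:

```clojure
;; Hypothetical per-line transform: returns a CSV row string, or nil to skip.
(defn line->row [l]
  (when-not (empty? l)
    (str l "," (count l) "\n")))

;; Serial version: doseq with :let/:when does the work one line at a time.
(defn serial-rows [ls]
  (let [out (atom [])]
    (doseq [l ls
            :let [row (line->row l)]
            :when row]
      (swap! out conj row))
    @out))

;; Parallel version: pmap runs line->row on several threads but still
;; returns results in input order, so the doseq stays a serial consumer.
(defn parallel-rows [ls]
  (let [out (atom [])]
    (doseq [row (pmap line->row ls)
            :when row]
      (swap! out conj row))
    @out))
```

Because pmap keeps the input order, both versions produce the same rows in the same sequence. One thing to remember in a standalone script: pmap runs on the future thread pool, so a call to (shutdown-agents) at the end lets the JVM exit promptly.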

And then there's the memoization... Sure, you can cache in any language, and it's not all that hard. But again, the ease with which it's added after the fact to a clojure function is really why it's so powerful. You can start out with nothing cached and see what needs to be cached after you start doing performance tests. This makes the refactoring so much easier than predicting whether caching is going to be needed, and then trying to make a system of functions or classes ready to move that way, should it be necessary.
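As a small illustration of adding the cache after the fact, here's a sketch using clojure.core's memoize. The lookup-deal function and the calls counter are hypothetical, and only exist to show that the body runs once per distinct argument:

```clojure
;; Hypothetical slow lookup; the counter tracks how often the body really runs.
(def calls (atom 0))

(defn lookup-deal [id]
  (swap! calls inc)
  {:id id :name (str "deal-" id)})

;; Caching added after the fact: callers switch to the memoized function,
;; and lookup-deal itself is left unchanged.
(def lookup-deal* (memoize lookup-deal))
```

Calling (lookup-deal* 42) twice only runs the underlying function once. The cache that memoize builds is unbounded, so this fits best when the set of distinct arguments is known to be modest.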

I've done it in C++ - and it's not horrible, but it means that you have classes for looking everything up, and then within each class, the implementation is either with - or without - caching. It can all be done, but it complicates everything because now there's a class for loading everything.

I'm sure there are tons of other languages that folks like. Heck, I like Obj-C and C++, but I have to look at what clojure is capable of creating and facilitating, and have to marvel at its design. Really quite impressive.

Tinkering with the External Loader for DDO

Tuesday, October 7th, 2014

DealPerf Svc

Since I didn't have anything else to do this afternoon, I decided that I could get a lot done with the tests and checks on the external database loader I've been working on. The main box is still cranking through the back-log, and while it does that, I can get to work on copying over the source files, checking the deployment, running the script a few at a time, and then seeing that everything would be copied over - as planned.

Then it's just a matter of letting the back-log clear, and firing this guy up for good.

Adding Exclusions to Storm Topology Processing

Monday, October 6th, 2014

The Shop now has a group that's continually hitting the different apps and APIs for application security and as a consequence, we're getting a lot of messages that have completely bogus information in them. Like a country field with the value " SLEEP(10) " - as if there's a way to hack the system.

This all makes sense, and I can certainly respect their job, but it does mean that someone needs to filter these out, or we are all going to be dealing with partially corrupted data, and getting wrong answers from it. That someone turned out to be me.

The test wasn't all that hard - we're just looking for a few characters in the field that would be strictly no-good, and then exclude the entire message based on that. The test is really a very simple predicate: a clojure set:

  (def bad-chars (set "()%\"'"))

and then it's used very simply:

  (if (not-any? bad-chars (:field msg))
    ...)

and we can also use some for the opposite logic, if needed.
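Here's a self-contained sketch of both forms of the test. The function names clean-field? and tainted-field? are made up for this example. The key detail is that the characters have to be in a set for this to work, because a set can be called as a function while a plain string can't:

```clojure
;; A set of characters acts as a predicate: it returns the character
;; when it's a member, and nil when it isn't.
(def bad-chars (set "()%\"'"))

;; True when no character of s is in bad-chars (also true for nil or "").
(defn clean-field? [s]
  (not-any? bad-chars s))

;; The opposite logic, built on some; boolean turns the matched
;; character (or nil) into a plain true/false.
(defn tainted-field? [s]
  (boolean (some bad-chars s)))
```

With these, (clean-field? "US") is true, while the " SLEEP(10) " value from the security tests is rejected because of the parentheses.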

There was an additional test - looking at the userAgent to see if it was one of the security group's tests - again, pretty simple, and not too hard to add.

Refactor DDO Loader for Long-Term Usage

Monday, October 6th, 2014

DealPerf Svc

This weekend it was clear that the demand-data files we are loading five times a day were becoming a problem as they were blowing out the 16GB JVM process. When I first wrote the loader, we had files of 1.7 million records, and that fit very nicely in 16GB - with room to spare. Now we're pushing 6 million records a file, and the 16GB isn't able to do the job, and the ensuing garbage collection was eating up more than 10 cores, and bringing the box to its knees.

Very bad.

So I needed to look at how I was loading the data - and break it up so that the bulk of the work can be done on a separate machine, and the only thing that needs to be done on the database server is the COPY of the CSV file into the table in the database. That's by far the fastest way to load 6 million rows and keep the database online and taking requests all the while.

I realized that it wasn't all that hard - I moved the bulk of the processing to a new box, that was easy, and then I just had to change some crontabs and scripts to have the new locations of the files. Then I simply SCP the file from the processor to the database server, and then use SSH to kick off the loader.

Really not that bad. I still want to walk through a complete cycle, but that shouldn't be too bad.

Adding Hadoop Credentials to Storm Library

Monday, October 6th, 2014

Hadoop

Last Friday, we received the credentials for saving data to the shared Hadoop cluster at The Shop. I wasn't close to my machine, so I couldn't add them easily, so I held off until this morning. It was pretty easy to add using a clojure library I've built here for a previous project. It's not perfect, but it's pretty good, and using the clojure function partial it was very easy to add the configuration as a 'hidden' field and let the others "Just Work".
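The 'hidden' field trick looks roughly like this. It's only a sketch: save-to-cluster, shared-cfg and save! are hypothetical names, the credentials are placeholders, and the body just returns a map instead of talking to Hadoop:

```clojure
;; Hypothetical generic saver: takes the cluster config as its first argument.
(defn save-to-cluster [cfg path data]
  {:user (:user cfg) :path path :bytes (count (str data))})

;; Hypothetical credentials map; real values would come from configuration.
(def shared-cfg {:user "svc-account" :password "not-a-real-password"})

;; partial fixes the first argument, so callers never see the config.
(def save! (partial save-to-cluster shared-cfg))
```

Callers then use (save! path data) and never pass, or even know about, the credentials.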

Didn't take me very long, but I know a couple of guys needed this on the team, so I wanted to get it done first thing this morning.

Creating a Storm Topology Latency

Tuesday, September 30th, 2014

We have been tracking and reporting on an end-to-end latency for my main data-pipe, and that was something that I felt was the most useful metric to look at. But I was overruled. Management wanted to have a metric that looked at the internal latency of the storm topology and kafka cluster. Sadly, that's not really possible without really hacking away at Storm. But on this, they relented. I was allowed to grab a timestamp at the first possible moment the message is in the topology, and then base the latency on that.

The solution I came up with was to make a new storm bolt, directly connected to the kafka-reader spout, that tags each tuple exactly once. It grabs the timestamp before the tuple is decoded, so that the latency includes the decoding time:

  ;;
  ;; onboard - a simple Bolt to take the first string in the incoming
  ;;           tuple and assume it's JSON and parse it into a standard
  ;;           clojure data container, tag it with the current time in
  ;;           milliseconds, and emit it. The keys will be converted to
  ;;           keywords for consistency in most of the project. This will
  ;;           ONLY emit the object if it's not-nil. It will ALWAYS ack the
  ;;           tuple. The arguments to this bolt are the keyword of the
  ;;           tag that will be used to augment the message with the
  ;;           timestamp, and a predicate function that will return 'true'
  ;;           if the parsed message is to be processed.
  ;;
  (defbolt onboard ["object"] {:params [tag pfn?]}
    [tuple collector]
    (let [ts (System/currentTimeMillis)]
      (doseq [p (.getValues tuple)
              :let [obj (nil-if-empty (parse-string p))]
              :when (and obj (if pfn? (pfn? obj) true))]
        (emit-bolt! collector [(assoc obj tag ts)] :anchor tuple)))
    (ack! collector tuple))

The idea is that the predicate function can be included so that I can pre-reject those messages that I know are going to cause me grief. What are these messages, you ask? They are JSON messages that have URLs as keys. As long as the keys are strings, that's fine, but I'm decoding the JSON and converting the keys to clojure keywords, and in that case, these URLs are killing the serialization in Storm.

Thankfully, there's a very simple way to make a predicate function to remove these messages, and I don't need them anyway. This way, the message is dropped before it's emitted, and I skip serializing and processing a useless message.
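A predicate along these lines might look like the sketch below. The names url-key? and no-url-keys? are hypothetical, and it assumes that only the top-level keys need checking and that "URL" means a key starting with http:// or https://. Neither assumption comes from the real topology:

```clojure
;; Hypothetical check: does this key look like a URL? Works on strings
;; and on keywords. (str k) is used instead of (name k) because a keyword
;; built from a URL splits at the first slash into namespace and name.
(defn url-key? [k]
  (let [s (if (keyword? k) (subs (str k) 1) (str k))]
    (boolean (re-find #"^https?://" s))))

;; True when the parsed message is a map with no URL-like top-level keys.
;; This is the kind of function that could be passed as the bolt's predicate.
(defn no-url-keys? [obj]
  (and (map? obj)
       (not-any? url-key? (keys obj))))
```

A message like {:country "US"} passes, while one with "http://example.com/a" as a key is rejected.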

After this, I needed to expand the latency work to have an additional key for this topology latency, and then I could pretty much do everything else as planned.

Tuning Storm Topologies – Again

Monday, September 29th, 2014

Unified Click

I spent the majority of the day tuning and scaling the primary data-pipe storm topology as we have been adding then removing features, and we had to get this to a size that handled the load, but didn't burden the system by using workers that weren't needed. Every little change doesn't necessarily require a re-tune of the topology, but when I'm adding or removing bursts of 100k msgs/sec, it's really important to make sure that the new system responds and is as efficient as reasonable.

The problem is that it takes a considerable amount of time to change something, and watch it develop. This isn't like a success/failure, it's a matter of degree, and it takes between 5 and 30 mins to know if the change has helped or hurt the overall cluster.

So that was my day.

Expanded Latency Detection for Graphing

Thursday, September 25th, 2014

We have been detecting and calculating the latency of the main data stream we're creating, but we haven't really made it generally available to graphing via the tools built inside The Shop, and today was the time to make that change.

The extension was really pretty simple. We had everything we needed in place - we just needed to add some data to redis, and then have a process that would read this data out of redis and reset it, so that the counting could continue. Pretty simple.

We picked the redis structure:

  dd|current-latency|lt10 => long (count)
  dd|current-latency|lt60 => long (count)
  dd|current-latency|lt180 => long (count)
  dd|current-latency|lt600 => long (count)
  dd|current-latency|gt600 => long (count)

where the keys are for less than 10 sec, less than 60 sec, less than 180 sec, less than 600 sec, and more than 600 sec. These are not meant to be particularly fine grained, but they are more than adequate to see if there is a trend, and at less than 10 sec, we're more than fine.
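The mapping from a measured latency to one of these keys can be sketched as below. The function names latency-bucket and bucket-key are made up for this example, and putting a latency of exactly 600 sec into gt600 is an assumption about the boundary:

```clojure
;; Pick the bucket for a latency in seconds. Each bucket covers the
;; range from the previous threshold up to (but not including) its own.
(defn latency-bucket [secs]
  (cond
    (< secs 10)  :lt10
    (< secs 60)  :lt60
    (< secs 180) :lt180
    (< secs 600) :lt600
    :else        :gt600))

;; Build the redis key for that bucket, e.g. "dd|current-latency|lt60".
(defn bucket-key [master secs]
  (str master "|current-latency|" (name (latency-bucket secs))))
```

Each incoming message would then increment the counter at the key returned by bucket-key.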

The code to read/reset the values is amazingly simple:

  (defn pull-current
    "Function to extract and reset the 'current-latency' counts from
    redis and return them to the caller. This is the way to
    monitor the latency as it develops so that differences don't
    have to be taken. Simply call this function on a regular interval,
    and graph - it's pretty simple."
    []
    (let [lk (keys @_ls_counts)
          rlk (map #(str *master* "|current-latency|" (name %)) lk)]
      (into {}
        (map vector lk (wcar :detector
                         (mapv #(or (car/getset % 0) 0) rlk))))))

I do love the way that carmine makes working with redis in clojure a breeze.

We then put this into one of the monitord scripts, and we had everything we needed. Nice.

Speeding up Experiment Processors

Thursday, September 25th, 2014

Finch Experiments

This morning has been a big push on speeding up the experiment reporting engines - the first part was speeding up the SQL they used, and then we had to smartly cache the results of certain values that are calculated based on the stability of the time-series data. These calculations are very expensive - even with the speed-ups in the SQL pulling the data from the database, we needed to make these workers far faster at what they are doing.

The first thing was to realize that these stability metrics were really slow-moving values, and we only needed to update them every 6 hours, or so. Any more often than that is a waste of time because they are not moving that much in an hour or so, and that saves us another factor of 100x or more.
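The idea of only recomputing every 6 hours can be sketched with a small time-based cache. This is not the code from the experiment workers: ttl-memo is a hypothetical helper, it only wraps zero-argument functions, and the clock is passed in so it can be tested without waiting:

```clojure
;; Wrap a zero-arg function f so that its result is recomputed at most
;; once per ttl-ms milliseconds. now-fn returns the current time in ms.
;; Note: two threads racing on an expired value may both recompute it.
(defn ttl-memo
  ([f ttl-ms] (ttl-memo f ttl-ms #(System/currentTimeMillis)))
  ([f ttl-ms now-fn]
   (let [state (atom nil)]
     (fn []
       (let [now (now-fn)
             {:keys [at value]} @state]
         (if (and at (< (- now at) ttl-ms))
           value                              ; still fresh, reuse it
           (let [v (f)]                       ; missing or stale, recompute
             (reset! state {:at now :value v})
             v)))))))
```

For a 6 hour refresh, the ttl-ms argument would be (* 6 60 60 1000).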

Then we needed to look at the monitoring we had on the queues that fed these guys to make sure that we had good data feeding them as we base a lot of our monitoring metrics on these graphs, and an empty graph is just bad news.

In the end, we have something that is exactly what we need - fast in the database, fast on the workers, and very responsive to the needs we are placing on it. We didn't need this kind of performance in the initial deployment, but it's nice to know it didn't take me more than a day or two to get it once we needed it.

This is Why I’m Not a Java Fan

Monday, September 22nd, 2014

Normally, I like working in clojure - I really do. I don't mind that it's really executing as Java code because the authors have gone to great lengths to make clojure a better experience than Java - and it is. But every now and then, a Java-ism pops up, and it's impossible to figure out what's going on. Case in point - one of my Storm workers today.

I started getting some odd Nagios alerts on one of my boxes, and I didn't think much of it because we've added a lot of traffic, and if we're in one of those pulses, it's going to saturate a box - there's just no way around this other than to buy far more hardware than we need - just so we can handle the maximum peak traffic times without saturating a box.

Sure... we could, but it's a lot of money for something that should just be a natural regulatory process of the cluster's operation.

Then I look at the graphs, and see this for the memory usage for one box:

Memory Usage

Clearly, something happened around 12:00 UTC that started the memory climbing, but I have no idea what it is. There are no exceptions, and the Storm logs are clear, so I have very little visibility into the process. It's difficult to know what caused it - and so how to prevent it from happening.

Even worse, this isn't an often-repeating problem. In fact, this is the first time I've seen this happen in all the time I've been working with Storm. It could have been the communication buffers, or the JVM memory in combination with the communication buffers, but at 100GB, more than half of it had to be the communication buffers, as the JVMs are configured to max out at 48GB.

So I'm not really thrilled at this, but then again, I've seen a lot of network issues in the last 24 hours, and if a box couldn't send to another box, maybe it got backed-up and that caused the problem. Hard to say, but it's not a fun thought.

I'll just have to monitor this and do my best to prevent this problem from hitting production.