Archive for October, 2014

Adding Optional JSON Encoding to Bolt

Friday, October 31st, 2014

Storm Logo

I wanted to see what effect removing the communication overhead between the JSON encoder bolt and the Kafka sender bolt would have on the topology. Even with the :local-or-shuffle directive, I was worried that the objects being created were causing garbage collection, or some other kind of overhead, so I modified the defbolt I was using to send Kafka messages to take an optional parameter indicating that it should run cheshire's JSON encoding on the message prior to sending it out.

That way, if the parameter is supplied, the message gets encoded, and if it's not, the overhead of the check is very minor:

  (defbolt mk-sender ["message"] {:params [cfg topic & [fmt]] :prepare true}
    [conf context collector]
    (let [dest (ep/config cfg)
          top (get-topic dest topic)
          zk (or (:zkStoreConnect dest) (rand-nth (:zookeepers dest)))
          p (kp/producer {(kafka-tag zk) zk})]
      (bolt
        (execute [tuple]
          (doseq [raw (.getValues tuple)
                  :when raw
                  :let [msg (case fmt
                              :json (nil-if-empty (json/generate-string raw))
                              raw)]
                  :when msg]
            (kp/send-messages p top [(kp/message (.getBytes msg))])
            (emit-bolt! collector [msg] :anchor tuple))
          (ack! collector tuple)))))

It works like a charm, and while the overhead wasn't even measurable, I left it in just in case there comes a time I want to simplify the topology by having a kafka bolt that also encodes to JSON.
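
For reference, wiring this bolt into a topology looks roughly like the sketch below - the trailing :json flag is the only difference between the two modes. The component names and the cfg value here are hypothetical, and this assumes the standard Storm clojure DSL's bolt-spec:

  ;; without the format flag, the bolt just sends whatever it's handed
  ;; (an upstream bolt has already done the JSON encoding)
  (bolt-spec {"json-encoder" :local-or-shuffle}
             (mk-sender cfg "click-stream"))

  ;; with :json, the bolt does the cheshire encoding itself, so the
  ;; separate encoder bolt can be dropped from the topology
  (bolt-spec {"click-parser" :local-or-shuffle}
             (mk-sender cfg "click-stream" :json))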

Added Security Checks on Messages

Thursday, October 30th, 2014

Crazy Lemon the Coder

Today one of the users of my data feed emailed me saying that we were sending bad values in a few fields for a few messages. Turns out, we were getting some data that was clearly either the internal Security Team trying to find weaknesses, or someone stuffing messages in a vain attempt to get at data. Where I'd be expecting a UUID, I was getting "cat /etc/passwd" - no joke.

So I had to take the time to expand the incoming message filters to cover these fields, so that we filter these values out rather than treating them as possible users. It's not a huge deal, but it's another thing I didn't expect to have to do today, and it really needed to go out because of the security implications.
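
The filter itself is nothing fancy - just a sanity check that a field actually looks like a UUID before we treat it as a user. A minimal sketch of that kind of check (the names here are made up, not the actual filter code):

  ;; a hypothetical version of the check added to the incoming message
  ;; filters - anything that doesn't look like a UUID gets dropped
  (def uuid-pattern
    #"(?i)^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")

  (defn plausible-uuid?
    "Returns true only if the value looks like a real UUID - and not an
    injected shell command or other junk."
    [v]
    (boolean (and (string? v) (re-matches uuid-pattern v))))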

Updating the Topology and Library Configs

Wednesday, October 29th, 2014

Unified Click

Today I've done quite a bit of reconfiguration in my endpoints library. It's the clojure library I created to make the Storm work I was doing so much easier and faster by isolating all the code for dealing with the external endpoints - redis, databases, RESTful services, etc. - so that all the topologies and projects built on this code can start from a given baseline.

Well... today was a lot of reconfiguration because the shared Kafka clusters in the different data centers were undergoing some changes, and I had to change how I was connecting to them to stay up to date. Normally, this is just the cost of being in an organization, but when I had to change the same service two and three times, well... then it kinda became work.

In the end, I was able to get all the new versions up on nexus and deployed everywhere they were needed, so that we stayed up, but it was a challenging day of communication due to the cost of the changes.

It’s Hard to Stay Quiet Sometimes

Wednesday, October 29th, 2014

Unified Click

I know I'm not typical - not in the least. I can get carried away at times, my passion can get the best of me... but it's because I'm driven to give the best I can in all that I choose to do, and it really bothers me when I can't do that. I mean really bothers me when I can't do that.

This morning I came into work to see that things weren't looking all that good for the latency on my data stream. In fact, it's worse than I've ever seen:

Latency Graphs

Twelve solid hours of off the chart latencies. And when I dug into the Capacity numbers it was clear what was going on... we are getting lots of back-pressure on publishing to the shared Kafka cluster, and that backed up the bolts feeding those, and so on, and so on...

My concern is that we're only retaining data on the source kafka cluster for 72 hours, so if this continues for too long, we're going to be in real trouble - we'll hit that retention limit and start losing data.

Not good.

Yet the solution is simple - give me three more boxes, put them on my existing kafka cluster, and put all the traffic back to mine. That's where it used to be - and until the number of clients got too high - it was perfectly fine. Add a few boxes for the additional client load, and it'll be fine again.

And that's what makes staying quiet very hard... It's politics. They are playing politics with the quality of a service that has my name on it. And that really irks me. I have the ability to deliver a great product, but it's politics that is causing me to be associated with a bad one.

Excellent Code for JSONP Responses

Tuesday, October 28th, 2014

Clojure.jpg

Today I needed to do some real updating to my JSONP code in one of my services. Up to now, it's been OK for all the clients, but it's clear that we're getting to the limits of the simple approach we initially took to dealing with JSONP responses:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (fn [out]
                 (->> out
                      (OutputStreamWriter.)
                      (BufferedWriter.)
                      (json/generate-stream ob))))}))

which didn't really allow for any errors, or issues that might crop up.

Sadly, we're starting to see things crop up, and so with the help of Gary - the original author of this code, who has since updated it to handle a great many more cases - we now have:

  (defn return-json
    "Creates a ring response for returning the given object as JSON."
    ([ob] (return-json ob (now) 200))
    ([ob lastm] (return-json ob lastm 200))
    ([ob lastm code]
      {:status code
       :headers {"Content-Type" "application/json; charset=UTF-8"
                 "Last-Modified" (str (or lastm (now)))}
       :body (piped-input-stream
               (bound-fn [out]
                 (with-open [osw (OutputStreamWriter. out)
                             bw (BufferedWriter. osw)]
                   (let [error-streaming
                         (fn [e]
                           ;; Since the HTTP headers have already been sent,
                           ;; at this point it is too late to report the error
                           ;; as a 500. The best we can do is abruptly print
                           ;; an error and quit.
                           (.write bw "\n\n---ERROR WHILE STREAMING JSON---\n")
                           (.write bw (str e "\n\n"))
                           (warnf "Streaming exception for JSONP: %s"
                                  (.getMessage e)))]
                     (try
                       (json/generate-stream ob bw)
                       ;; Handle "pipe closed" errors
                       (catch IOException e
                         (if (re-find #"Pipe closed" (.getMessage e))
                           (info "Pipe closed exception: %s" (.getMessage e))
                           (error-streaming e)))
                       (catch Throwable t
                         (error-streaming t)))))))}))

In all the tests I've run, this looks to do everything I need - it works - whew! And with the error trapping, we should have a lot better control when things go wrong.
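
Usage doesn't change at all with the new version - a handler just returns whatever return-json builds. Something like this hypothetical ring handler (the route and the find-user lookup are made up for illustration):

  ;; hypothetical ring handler - callers are untouched, only the
  ;; streaming body inside return-json got more careful
  (defn get-user
    [req]
    (let [uid (get-in req [:params :uuid])]
      (return-json (find-user uid))))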

Flakey Docs… Flakey People… Dangerous Waters

Tuesday, October 28th, 2014

Danger

I've been working with an old friend on a project he brought to me about two years ago. There have been several cycles of work here - some really worked well, others not so much. But the current crop of code is about creating an iPad app for this third-party RESTful API. It's for UK midwives - and it's tracking the health and vital stats of the mothers and the babies - and it's a good thing to do as it helps out the people getting health care to the villages in the UK. Good goal.

The problem is that the API I'm supposed to be writing to is really not well documented at all, it's got a really odd way of doing OAuth, and making subsequent calls is not easy. In all, it looks like it's been built by someone who really has no idea whatsoever how to make a good, solid, reliable API. But maybe it's just the docs - who knows?

Well, today I found out that it's not just the docs. They are bad - incomplete, and the return values I get aren't what they describe, so the API isn't even tracking its own old, horrible docs. Additionally, my friend said he worked out how to make the calls by digging into the FORM values returned by the authentication call (yeah, makes no sense to me either), and when he tried it again a week later, the parameters had changed so that it no longer worked.

So in order to make this work we're scraping a web FORM, and it's so dynamic that it changes within a week? Yikes! That's all kinds of warning flags to me. Now I'm not trying to be a snob, and professionals certainly produce their share of problems, but I can't remember a service with an interface this bad. It's almost like it doesn't exist - as if the web site is all they have, and there are no plans at all for a decent RESTful API.

That makes a lot more sense than the things I'm seeing.

My friend is asking me if we want to fix this for this guy - and I can't honestly remember a time when that really worked. I'm not saying that it can't - I'm saying that in my life, I can't remember ever seeing it actually work. The singular boss would have to give up a lot of control on the tech side of things. He'd also have to give up a lot of the money he's making on this in order to compensate people capable of really making this a professional API.

On the up-side, he could then sell access to the API to people wanting to build apps on it - like he's attempting to do to us. But in its current state, there's no way in the world I'd be willing to pay a dime for this - it isn't professional, and it exudes no trust.

I'm not sure what will happen, but I know sirens when I hear them...

Improper Use of Kafka

Monday, October 27th, 2014

Kafka Broker

Today has been one of those days that I knew was coming, didn't know when it'd arrive, but knew without a shadow of a doubt that it was going to be a defining moment for me at The Shop. Today, I kinda butted heads with the manager of the centralized Kafka cluster about how to publish to his cluster - in fact how to properly publish to any kafka cluster, and what the issues are with Kafka 0.7 and adding new boxes.

First, the set-up. I've been asked to publish to the shared kafka cluster by my new manager because he sees nothing but upside to using shared services. He believes that if the shared services are not up to the task, he can then apply pressure to get them up to the task. I am not so optimistic. I will gladly ask for help from anyone - as long as the product will benefit from it. But if the product suffers, I don't care who it is - they have to shape up or ship out.

So - against my wishes - I started publishing to the shared kafka cluster. We started having a lot of problems, but everyone was happy - save me. They added machines to the cluster, and because the topics I publish to already existed in the cluster, a known bug in Kafka 0.7 prevented the automatic rebalancing of those topics onto the new boxes. You have to publish a message - no matter how small - to the specific boxes under the same topic name, and then they will start picking up traffic - automatically.

I know this because I ran into this problem, had to figure it out, and finally did - after creating a little function to send an empty JSON message to a specific partition on a specific box in the cluster. It worked like a champ, so I knew how this worked for Kafka 0.7.

Today, I had a disagreement with the manager of the shared cluster because he wanted people to write to specific machines, and then use the load balancer to assign different machines to different publishing clients. Sadly, this is not how kafka is meant to be used. It's meant to be used with a single automatic configuration based on the cluster configuration in zookeeper, distributing the load to all the boxes in the cluster in equal shares.

The manager's idea lets the load balancer direct the traffic - but it allows things to become very unbalanced, and therefore complicates all the topologies based on these topics. It's just bad design to use Kafka in this way. But it does get around the problem of adding boxes to the cluster and activating the topics on the new boxes.

But that's trivial with the 4 line clojure function I wrote:

  (defn direct-injection
    "Function to send a single message to a specified topic on the specified
    kafka broker - bypassing all the zookeeper stuff to make sure that this
    one message gets to this one broker. This is essential for bootstrapping
    a new broker box to an existing topic."
    [broker topic msg]
    ;; 'broker' is a single Kafka 0.7 broker.list entry, e.g. "2:kafka-broker3:9092"
    (let [p (kp/producer {"broker.list" broker})]
      (kp/send-messages p topic [(kp/message (.getBytes msg))])))

and it only needs to be run once for each existing topic on each new box. It's trivial.
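
Bootstrapping the new boxes then amounts to one tiny message per topic per new broker - something like this (the broker entries and topic names here are made up):

  ;; one empty JSON message per existing topic, per new broker box -
  ;; after that, the normal zookeeper-based producers pick them up
  (doseq [broker ["6:kafka-broker6:9092" "7:kafka-broker7:9092"]
          topic  ["click-stream" "deal-views"]]
    (direct-injection broker topic "{}"))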

Finally, I got them to see this - and do it - and my publishing automatically picked up the new boxes and started pushing messages to all of them evenly. As you should with Kafka.
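
The difference really comes down to one key in the producer config - point it at zookeeper and let Kafka 0.7 spread the load, rather than pinning a broker behind a load balancer. A sketch of the contrast, using the same kp wrapper as above (the connection strings are made up, and this assumes the wrapper passes the standard Kafka 0.7 "zk.connect" and "broker.list" properties straight through):

  ;; how Kafka 0.7 is meant to be used: discover the brokers via
  ;; zookeeper and let the producer balance across all of them
  (def balanced (kp/producer {"zk.connect" "zk1:2181,zk2:2181,zk3:2181"}))

  ;; pinning a single broker - only appropriate for one-off
  ;; bootstrapping like direct-injection above
  (def pinned (kp/producer {"broker.list" "2:kafka-broker3:9092"}))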

The moral of today's story is that you can use shared tools - and it can appear to save you time and money - but appearances are deceptive, and you can shoot yourself in the foot so fast that careful consideration of all the deployment issues turns out to be the real time - and money - saver.

Nice Singleton in Obj-C

Monday, October 27th, 2014

xcode.jpg

As part of this project I'm working on with a friend, I ran into the fact that in iOS development, Apple really doesn't allow the same level of control with custom objects that you have in OS X development. For instance, it's very common in OS X development to have a controller class written in code and then instantiated in the xib, and wired up there. It saves a lot of needless wiring-up code that's brittle and not really adding to the value of the app. My friends and I always called this Nib-ware, after the original NeXTSTEP Interface Builder files.

But what seems to be far more common in iOS is the idea of singletons. The key to making a good singleton is, of course, making sure that if you want only one, you get only one. And that usually means locking, or Compare-and-Swap - which is illegal in Automatic Reference Counting (ARC) code as it can't know if you got the swap or not. So when I found a simple way to make a singleton in Obj-C, I loved its simplicity and beauty.

The header is simple enough:

  /*!
   This class method is the singleton of the PKBClient for the entire app.
   The first call to this method will create the singleton and initialize
   it for all uses. All state for the connection to the back-end will be
   stored in this one instance.
   */
  + (id) sharedClient;

and the implementation is just lovely:

  + (id) sharedClient {
      static PKBClient*	shared = nil;
      static dispatch_once_t onceToken;
      dispatch_once(&onceToken, ^{
          shared = [[self alloc] init];
      });
      return shared;
  }

So simple... a static value and then a simple block to initialize it, but the magic is in the dispatch_once - that's really exceptional. Sure, clojure has this, and a lot of other languages do, but this is the first time I've seen it in a C/C++/Obj-C language, and it makes so many things so much easier.
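
For comparison, the clojure version of this is just a delay - the body runs exactly once, on the first deref, no matter how many threads ask for it. A tiny sketch, where make-client stands in for whatever expensive initialization is needed:

  ;; the clojure analogue of dispatch_once - the init body runs exactly
  ;; once, the first time anyone derefs the delay
  (defonce shared-client
    (delay (make-client)))

  (defn client [] @shared-client)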

It's nice to work with good tools.

Overhead of Storm Communication

Monday, October 27th, 2014

Storm Logo

Tuning Storm topologies isn't easy. Sure... if you have a topology that just does simple calculations, like my Disruption Detector - then it's easy. Almost deceptively simple. But if you have a topology whose bolts actually have to do something, then it gets a lot more complex. And the more they do, the more complex it becomes.

And one of the insidious things about Storm is the communication buffers. These use memory outside the heap, so they have the ability to crush a box even if you are careful with the -Xmx4G settings to make sure you don't overload it. No... these are tricky little beasts, and the problems raised by improper use of Kafka are really challenging.

The tradeoff is really one of connectivity versus balance. If the Kafka data is well-balanced across all boxes and all partitions, then you can simply default to using the :local-or-shuffle connectivity directive in your Storm topology, and you should be in good shape. This scheme says: If I have a bolt in the same worker, use it. If not, then look to others and balance accordingly.
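
Concretely, the choice shows up as the grouping on a bolt's inputs in the topology definition. A sketch with made-up component names, assuming the standard Storm clojure DSL:

  ;; stay on the local worker when possible - cheap, but work can pile
  ;; up behind a hot kafka reader if the partitions are unbalanced
  (bolt-spec {"kafka-reader" :local-or-shuffle}
             enrich-bolt :p 8)

  ;; always rebalance across every instance of the bolt - even load,
  ;; but every tuple may pay the inter-worker transfer cost
  (bolt-spec {"kafka-reader" :shuffle}
             enrich-bolt :p 8)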

And it's that stickiness to the local worker that's the kicker.

If we have an unbalanced kafka cluster, then some of the readers will have more data to process than others, but with this affinity for the local bolts, the excess data will be queued up on the local bolts in the worker, and not get spread out in the cluster. This makes for an uneven distribution of work, and that's very bad.

But if we use the :shuffle directive, then every message looks to balance out the load across all the similar bolts in the topology, and messages get moved from worker to worker, box to box, without much regard for whether that movement is actually worth it. It's not like you can put in a Cost Function for the movement of n bytes - Oh... that would be sweet.

So what happens is that you end up having multi-GB communication buffers on all boxes, and that can crush a system. So you have to be careful - you can't remove all the :shuffle directives - especially if you have an unknown - and potentially unbalanced - kafka source. But you can't have too many of them, either.

Finding the right topology when you're doing real work, is not trivial.

Cleaning Up Parsing Errors

Friday, October 24th, 2014

Unified Click

From time to time, I've been asked to add features to my data stream by other groups, and one of these groups at The Shop deals with affiliates directing traffic - and hopefully sales - to the site. If so, they get paid, which is good for everyone. One of the things that's important about this is to accurately track who came from where, and all that. To that end, I've had to implement some rather complex if-then-else logic in the code to match what the existing attribution code does.

Well... it seems I probably made a mistake in being a little too inclusive with some of the data. I was including a full URL in the data being sent out when I really didn't want to include the domain from that URL - just the parameters.

When this was pointed out, I realized that the instructions I'd received from the group about this feature were relatively vague, and after I really dug into what they were asking and compared it to the code, it was pretty clearly the case that I wanted the parameters, but not the domain, from the URL.

Not at all clear, but hey... I can admit that I didn't get it right - easy fix.
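
The fix itself is just a matter of keeping the query string and dropping everything in front of it. A minimal sketch of that kind of trimming, assuming plain java.net.URI is enough for the URLs involved:

  ;; keep only the query parameters from a full URL - the domain and
  ;; path get dropped before the value goes into the outgoing data
  (defn url-params
    [url]
    (when (seq url)
      (.getRawQuery (java.net.URI. url))))

  ;; (url-params "http://example.com/deal?utm_source=aff&click_id=42")
  ;; => "utm_source=aff&click_id=42"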