Adding Optional JSON Encoding to a Bolt

I wanted to see what effect it would have on the topology to remove the communication overhead between the JSON encoder bolt and the Kafka sender bolt. Even with the :local-or-shuffle grouping, I was worried that the intermediate objects being created were causing garbage collection or some other overhead. So I modified the defbolt I was using to send Kafka messages so that it takes an optional parameter telling it to JSON-encode the message with cheshire before sending it out.

That way, if the parameter is there the message is encoded, and if it's not, the only added cost is a single case check on each value:

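  (defbolt mk-sender ["message"] {:params [cfg topic & [fmt]] :prepare true}
    [conf context collector]
    ;; Resolve the Kafka destination and build one producer per bolt instance
    (let [dest (ep/config cfg)
          top (get-topic dest topic)
          zk (or (:zkStoreConnect dest) (rand-nth (:zookeepers dest)))
          p (kp/producer {(kafka-tag zk) zk})]
      (bolt
        (execute [tuple]
          ;; Send every non-nil value; when fmt is :json, encode it first and
          ;; skip it if the encoding comes back empty
          (doseq [raw (.getValues tuple)
                  :when raw
                  :let [msg (case fmt
                              :json (nil-if-empty (json/generate-string raw))
                              raw)]
                  :when msg]
            ;; Encode explicitly as UTF-8 rather than the platform default charset
            (kp/send-messages p top [(kp/message (.getBytes msg "UTF-8"))])
            (emit-bolt! collector [msg] :anchor tuple))
          (ack! collector tuple)))))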
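To check the "encode only if asked" behaviour without a Storm cluster or a Kafka broker, the per-value formatting step can be pulled out into a plain function. This is only a sketch, not the code running in the bolt: format-message and its encode-fn argument are hypothetical, and since the post doesn't show nil-if-empty, the version here is an assumption that it returns nil for an empty string.

```clojure
(ns example.format)

(defn nil-if-empty
  "Hypothetical stand-in for the helper used in mk-sender (assumed behaviour):
  returns nil when s is nil or empty, otherwise s itself."
  [s]
  (not-empty s))

(defn format-message
  "Hypothetical helper mirroring the :let in mk-sender. Applies encode-fn to
  raw only when the optional trailing fmt argument is :json; any other value
  of fmt, including none at all, passes raw through untouched."
  [encode-fn raw & [fmt]]
  (case fmt
    :json (nil-if-empty (encode-fn raw))
    raw))
```

With cheshire on the classpath, `(format-message cheshire.core/generate-string {"id" 1} :json)` gives back the JSON string, while `(format-message cheshire.core/generate-string "already-encoded")` returns its input unchanged. Passing the encoder in as a function just keeps the sketch free of the cheshire dependency; in the bolt itself fmt is fixed when the bolt is constructed, so the per-value cost of leaving it off is the one case dispatch.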

It works like a charm. The overhead I was worried about turned out not to be measurable, but I kept the option anyway, in case I ever want to simplify the topology by having a single Kafka bolt that also encodes to JSON.