Adding Queueing to CouchDB Interface
Friday, November 9th, 2012

I've been fighting a problem with our CouchDB installation using the CouchRest interface for ruby/jruby. What I'm seeing is that when we send a lot of updates to CouchDB through CouchRest's REST API, we start to get socket connection errors to the CouchDB server. I've gone through a lot of different configurations of open file handles on both boxes, and nothing seems to really 'fix' the problem.
So what I wanted to do was to make a queueing database class. Currently, we have something that abstracts away the CouchDB-centric features and adds in the necessary metadata so that we can more easily track all the data held in CouchDB. This is a nice start, in that all I really needed to add to the bulk of the code was a simple flush method:
Database.flush
where I initially started with it being implemented as:
def flush
  nil
end
At this point, I was free to do just about anything with the Database class - as long as I didn't change the public API. What I came up with is this:
# encoding: utf-8
require 'quantum_lead/application'
require 'singleton'
require 'logger'
require 'version'
require 'couchrest'
require 'java'

java_import 'java.util.concurrent.ConcurrentLinkedQueue'

class Database
  include Singleton

  # On large bulk store operations, we still need to keep some kind of
  # limit on how many documents we'll send up to CouchDB at once. This
  # will be that limit.
  MAX_DOCS_PER_SEND = 2000

  # This will be sent to the unspoolers to tell them to stop what they
  # are doing and shut down.
  QUEUE_SHUTDOWN = '__queue_shutdown__'

  def self.store(label, message = '')
    return nil unless use_db?
    doc = block_given? ? yield : {}
    # make the tagging metadata for this save operation
    metadata = {
      :created => Time.now,
      :label => label,
      :message => message,
      :environment => QuantumLead::Application.environment,
      :division => QuantumLead::Application.division,
      :execution_tag => QuantumLead::Application.execution_tag,
      :jar_name => QuantumLead::Application.jar_name,
      :pid => Process.pid,
      :version => { :version => Version.version,
                    :branch => Version.branch }
    }
    if doc.is_a?(Array)
      spool(doc.map { |elem| elem.to_hash.merge(:meta => metadata) })
    else
      # make it a hash - a document, if it's not already
      doc = { :data => doc } unless doc.is_a?(Hash)
      spool(doc.to_hash.merge(:meta => metadata))
    end
  end

  def self.view(path, opts = {})
    return nil unless use_db?
    retryable { database.view(path, opts) }
  end

  def self.get(id, opts = {})
    return nil unless use_db?
    retryable { database.get(id, opts) }
  end

  def self.save_doc(doc, opts = {})
    return nil unless use_db?
    meta = doc["meta"] || {}
    payload = doc.merge("meta" => meta.merge("updated" => Time.now))
    retryable { database.save_doc(payload, opts) }
  end

  def self.use_db?
    QuantumLead::Application.config.application.data_to == 'database'
  end

  def self.start(thread_count = 2)
    thread_count.times do
      workers << Thread.new { self.unspool() }
    end
  end

  def self.flush
    unless workers.empty?
      # tell all the workers it's time to stop, and then wait until they are done
      workers.each { spool(QUEUE_SHUTDOWN) }
      QuantumLead::Application.logger.info("Database.flush[#{Thread.current.object_id}]") { "waiting for queue to empty..." }
      workers.each { |t| t.join unless t.nil? }
    end
  end

  private

  def self.queue
    @queue ||= ConcurrentLinkedQueue.new
  end

  def self.workers
    @workers ||= []
  end

  def self.spool(doc)
    return nil unless use_db? && !doc.nil?
    workers.empty? ? shoot(doc) : queue.offer(doc)
  end

  def self.unspool
    keep_going = true
    QuantumLead::Application.logger.info("Database.unspool[#{Thread.current.object_id}]") { "starting unspooler" }
    begin
      doc = queue.poll
      if doc.nil?
        sleep(0.5)
      elsif doc.is_a?(String)
        # this is a control message to this processing code
        keep_going = false if doc == QUEUE_SHUTDOWN
      else
        shoot(doc)
      end
    end while keep_going
    QuantumLead::Application.logger.info("Database.unspool[#{Thread.current.object_id}]") { "shutting down unspooler" }
  end

  def self.shoot(doc)
    return nil if doc.nil?
    if doc.is_a?(Array)
      # if it's an array - handle it as a bulk_save
      doc.each_slice(MAX_DOCS_PER_SEND) do |bunch|
        retryable { database.bulk_save(bunch) }
      end
    else
      retryable { database.save_doc(doc) }
    end
  end

  def self.database(config = QuantumLead::Application.config)
    CouchRest.proxy(config.database.proxy_uri) if config.database.use_proxy?
    Thread.current[:couch_db] ||= CouchRest.database(config.database.uri)
  end

  # We need to have a general retry processor for all the database calls,
  # and this is it. We have a block, and this will retry it some number of
  # times - logging problems, and giving up if it can't eventually get it
  # right. This makes it a lot easier to deal with intermittent issues with
  # the database.
  def self.retryable(retries = 3)
    return (block_given? ? yield : nil)
  rescue Errno::EADDRNOTAVAIL => se
    if retries > 0
      QuantumLead::Application.logger.warn('Database.retryable') { se }
      QuantumLead::Application.logger.warn('Database.retryable') { "problem sending data to CouchDB... retrying..." }
      sleep(0.5)
      retries -= 1
      retry
    else
      QuantumLead::Application.logger.error('Database.retryable') { se }
      QuantumLead::Application.logger.error('Database.retryable') { "problem sending data to CouchDB... giving up..." }
      raise
    end
  rescue RestClient::RequestTimeout, Exception => e
    if retries > 0
      QuantumLead::Application.logger.warn('Database.retryable') { e }
      QuantumLead::Application.logger.warn('Database.retryable') { "problem dealing with CouchDB... retrying..." }
      retries -= 1
      retry
    else
      QuantumLead::Application.logger.error('Database.retryable') { e }
      QuantumLead::Application.logger.error('Database.retryable') { "problem dealing with CouchDB... giving up..." }
      raise
    end
  end
end
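The spool/unspool/flush pattern itself doesn't depend on JRuby. Here's a minimal sketch of the same idea in plain Ruby, using the stdlib's thread-safe Queue in place of Java's ConcurrentLinkedQueue - the MiniSpooler name and the stand-in for the CouchDB writes are illustrative, not the production code:

```ruby
# Minimal sketch of the spool/unspool/flush pattern: producers enqueue
# documents, worker threads drain the queue, and a sentinel string tells
# each worker to shut down.
class MiniSpooler
  SHUTDOWN = '__queue_shutdown__'

  def initialize
    @queue   = Queue.new   # thread-safe FIFO from the stdlib
    @workers = []
    @written = Queue.new   # stands in for the actual CouchDB writes
  end

  def start(thread_count = 2)
    thread_count.times do
      @workers << Thread.new do
        loop do
          doc = @queue.pop            # blocks until something arrives
          break if doc == SHUTDOWN    # control message: stop this worker
          @written << doc             # "shoot" the document
        end
      end
    end
  end

  def spool(doc)
    if @workers.empty?
      @written << doc   # no workers: straight pass-through, like the real class
    else
      @queue << doc
    end
  end

  def flush
    return if @workers.empty?
    # the sentinels land behind every queued document, so each worker
    # drains real work before it sees its shutdown message
    @workers.each { @queue << SHUTDOWN }
    @workers.each(&:join)
  end

  def written
    Array.new(@written.size) { @written.pop }
  end
end
```

With no call to start, spool writes immediately; once workers are running, documents queue up and flush drains everything before returning - the same two modes the Database class supports.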
What I really like about this is that I can use it either way - as a caching/flushing endpoint, or as a straight pass-through with thread-local database connections. This means that I can start with a simple config file setting:
data_to: 'database'
writers: 3
which will give me three writer threads, as specified in the start method argument. Then, when I fix the CouchDB issues, I can switch back to the simpler, thread-local connection storage just by dropping writers to 0, since the startup code guards the call:
Database.start(config.writers) if config.writers > 0
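One piece of the class above that stands well on its own is the retryable helper. Here's a stripped-down sketch of the same shape in plain Ruby - no logging, and a single generic rescue, since the CouchDB-specific error classes aren't needed to show the mechanics:

```ruby
# A stripped-down retry wrapper: run the block, and on failure try it
# again a limited number of times before re-raising the last error.
def retryable(retries = 3, pause = 0.0)
  return (block_given? ? yield : nil)
rescue StandardError
  raise if retries <= 0     # out of attempts - let the caller see the error
  sleep(pause) if pause > 0 # optional pause between attempts
  retries -= 1
  retry
end

# illustrative use: succeed on the third attempt
attempts = 0
result = retryable(3) do
  attempts += 1
  raise 'flaky' if attempts < 3
  :ok
end
```

The production version above splits out Errno::EADDRNOTAVAIL so it can sleep before retrying socket-level failures; the shape of the rescue/retry/raise flow is the same.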
The call to flush is a no-op if we didn't start anything, so there's no harm in always calling it at the end of the process. Pretty nice. I've verified that this gets me what I need; now it's just a matter of how much throttling I want on the writes to CouchDB. But I tell you this… I'm closer to MySQL than ever before - just because of this. There's no reason in the world to put up with this kind of software if it can't do the job.