Rigel Group Software Engineering

28 Mar 2010

Tee with Sinatra

OK, so I wanted to take all of the JSON data that we were stuffing into our Riak cluster and send a copy of it to our ElasticSearch cluster as well, so that we could, you know, actually find the data later. We could have done this by modifying one of the Riak client libraries, but then any data uploaded through a different client would be missed. So as an experiment, we turned to our new favorite tool, Sinatra, and hacked up a Rack proxy app that intercepts incoming HTTP requests, sends them on to Riak, and also sends a copy to the ElasticSearch cluster. We used Typhoeus as the HTTP client so that we could execute the two requests concurrently in the interest of speed.

Here is the proof-of-concept:

require 'rubygems'
require 'sinatra'
require 'typhoeus'
 
OPTIONS = {}
OPTIONS[:riak_host] = "localhost"
OPTIONS[:riak_port] = "8098"
OPTIONS[:es_host] = "localhost"
OPTIONS[:es_port] = "9200"
OPTIONS[:riak_timeout] = 5000 # milliseconds
OPTIONS[:es_timeout] = 5000 # milliseconds
 
class Rack::Proxy
 
  def initialize(app)
    @app = app
    @hydra = Typhoeus::Hydra.new
  end
 
  def call(env)
    req = Rack::Request.new(env)
    # We need to use it twice, so read in the stream. This is an obvious problem with large bodies, so beware.
    req_body = req.body.read if req.body 
 
    riak_url = "http://#{OPTIONS[:riak_host]}:#{OPTIONS[:riak_port]}#{req.fullpath}"
 
    opts = {:timeout => OPTIONS[:riak_timeout]}
    opts.merge!(:method => req.request_method.downcase.to_sym)
    opts.merge!(:headers => {"Content-type" => req.content_type}) if req.content_type
    opts.merge!(:body => req_body) if req_body && req_body.length > 0
 
    riak_req = Typhoeus::Request.new(riak_url, opts)
    riak_response = {}
    riak_req.on_complete do |response|
      riak_response[:code] = response.code
      riak_response[:headers] = response.headers_hash
      riak_response[:body] = response.body
    end
    @hydra.queue riak_req
 
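    # If we are putting or posting JSON to /riak/<bucket>/<key>, send a copy to the ElasticSearch index named "riak".
    # media_type ignores parameters such as "; charset=utf-8"; requests whose path does not match are not teed.
    if (req.put? || req.post?) && req.media_type == "application/json" &&
       req.path =~ %r{^/riak/([^/]+)/([^/]+)}
      bucket, key = $1, $2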
      es_url = "http://#{OPTIONS[:es_host]}:#{OPTIONS[:es_port]}/riak/#{bucket}/#{key}"
      opts = {:timeout => OPTIONS[:es_timeout]}
      opts.merge!(:method => req.request_method.downcase.to_sym)
      opts.merge!(:body => req_body) if req_body  && req_body.length > 0
      es_req = Typhoeus::Request.new(es_url, opts)
      es_response = {}
      es_req.on_complete do |response|
        es_response[:code] = response.code
        es_response[:headers] = response.headers_hash
        es_response[:body] = response.body
      end
      @hydra.queue es_req
    end
 
    # Concurrently executes both HTTP requests, blocks until they both finish
    @hydra.run
 
    # If we wrote to ES, add a custom header
    riak_response[:headers].merge!("X-ElasticSearch-ResCode" => es_response[:code].to_s) if es_response && es_response[:code]
 
    # Typhoeus can add nil headers, let's get rid of them
    riak_response[:headers].delete_if {|k,v| v.nil?}
 
    # Return original Riak response to client (Rack expects the body to respond to #each, so wrap it in an array)
    [riak_response[:code], riak_response[:headers], [riak_response[:body].to_s]]
  end
end
 
use Rack::Proxy

(Gist here)
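
The decision of whether to tee a request, and where to send the copy, can also be pulled out into a small pure function, which makes it easy to test without either cluster running. This is only a sketch: `es_url_for` and its arguments are hypothetical names that are not part of the script above, and it assumes the same `/riak/<bucket>/<key>` URL layout and an ElasticSearch index named "riak".

```ruby
# Hypothetical helper (not in the original script): returns the ElasticSearch
# URL a request should be copied to, or nil if the request should not be teed.
# Assumes Riak's /riak/<bucket>/<key> layout and an ES index named "riak".
def es_url_for(method, media_type, path, es_host = "localhost", es_port = "9200")
  # Only writes are copied
  return nil unless %w[PUT POST].include?(method.to_s.upcase)
  # Only JSON documents are copied
  return nil unless media_type == "application/json"

  match = %r{\A/riak/([^/]+)/([^/]+)}.match(path)
  # No bucket/key pair in the path, e.g. a POST to /riak/<bucket>
  return nil unless match

  bucket, key = match[1], match[2]
  "http://#{es_host}:#{es_port}/riak/#{bucket}/#{key}"
end
```

Inside `call`, something like `es_url_for(req.request_method, req.media_type, req.path)` could then replace the inline regex, with the ElasticSearch request queued only when the result is not nil.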

Execute the script and it will listen on port 4567 (Sinatra's default), so point your Riak client of choice there and start PUTting data; JSON PUTs and POSTs will be copied into the ElasticSearch cluster along the way. If we were really going to use this in anger, there is a lot of work yet to be done, but as a skeleton of how to use Sinatra (Rack, really) to quickly whip up custom proxies and tee HTTP requests, I thought it might be useful.
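
To try the proxy without a full Riak client, a plain HTTP PUT is enough. Here is a minimal sketch using Ruby's standard Net::HTTP. The helper names `build_riak_put` and `put_via_proxy`, the bucket and key, and the sample document are all made up for illustration, and it assumes the proxy is running locally on port 4567.

```ruby
require 'net/http'
require 'json'

# Hypothetical helper: builds a JSON PUT aimed at the proxy rather than at
# Riak directly, using the /riak/<bucket>/<key> path the proxy expects.
def build_riak_put(bucket, key, doc)
  req = Net::HTTP::Put.new("/riak/#{bucket}/#{key}")
  req["Content-Type"] = "application/json"
  req.body = JSON.generate(doc)
  req
end

# Hypothetical helper: sends the request to the proxy and returns the status
# code that came back from Riak, plus the ElasticSearch status code the proxy
# reports in its X-ElasticSearch-ResCode header (nil if the request was not teed).
def put_via_proxy(bucket, key, doc, host = "localhost", port = 4567)
  res = Net::HTTP.start(host, port) do |http|
    http.request(build_riak_put(bucket, key, doc))
  end
  [res.code, res["X-ElasticSearch-ResCode"]]
end
```

With the proxy and both clusters up, `put_via_proxy("users", "42", {"name" => "Ann"})` should return the two status codes, which is a quick way to confirm that the copy actually reached ElasticSearch.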

Comments (3)
  1. We perform similar operations with a tool we built called “pypes”. Essentially, it uses chaining. You POST content to its internal HTTP server, it routes the data through a pipeline (which can be a DAG), transforms it in any way you might need, and publishes the output to some other web interface.

    Since it allows you to create a DAG, you can easily publish to multiple endpoints like Riak and ES. I’ve actually got an experimental ElasticSearch publisher that I’ve been using to pump Excel data in with.

    We’ve been using pypes for about a year now in some enterprise settings to do Enterprise Information Integration with respect to enterprise search. It’s 100% Python and scales extremely well.

    It’s built on top of Stackless Python and supports multi-processor hardware for parallelism. You can also scale out by partitioning data across multiple nodes. We generally feed 500 million plus documents at about 10K documents per second (that’s typically with the input being XML).

    You can write custom adapters, transformers, and publishers very easily. All incoming data is “adapted” to (essentially) JSON “packets”. There’s a thin API on top of the data model that provides support for meta-data, multi-value fields, packet cloning, packet splitting, etc.

    Pypes also fits very nicely into the PubSubHubBub (webhooks) paradigm of publishing real time content using HTTP. It’s basically a middleware component for doing data transformation and routing.

    The nice thing is it provides a Web 2.0 UI that allows you to build data flow graphs using drag n drop, very much like Yahoo Pipes.

  2. Eric, that sounds very interesting — I will definitely have to check it out. Thanks for the comment.

  3. I think what you’re looking for is pre-commit hooks (sounds a bit like svn, but these are Riak related): http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-March/000688.html. These are basically bits of code that get triggered when data reaches a bucket.

    Once this nice feature gets implemented, I’m pretty sure I’ll post about it on myNoSQL (http://nosql.mypopescu.com).

