Tee with Sinatra
OK, so I wanted to take all of the JSON data that we were stuffing into our Riak cluster and send a copy of it to our ElasticSearch cluster as well, so that we could, you know, actually find the data later. We could have done this by modifying one of the Riak client libraries, but then any data uploaded through a different client would be missed. So, as an experiment, we turned to our new favorite tool, Sinatra, and hacked up a Rack proxy app that intercepts incoming HTTP requests, sends them on to Riak, and also sends a copy to the ElasticSearch cluster. We used Typhoeus as the HTTP client so that we could execute the two requests concurrently, in the interests of speed.
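The core "tee" idea, independent of Sinatra and Typhoeus, can be sketched with plain Ruby threads. This is a hedged illustration, not the proxy's code. The two backends are hypothetical lambdas standing in for the Riak and ElasticSearch HTTP calls, so it runs without a network. Both calls start at once, the caller waits for both, and a failure in the copy is captured instead of breaking the primary write.

```ruby
# Minimal tee sketch: the backends are hypothetical callables, not real HTTP clients.
# Runs the primary and secondary calls concurrently, waits for both, and returns
# [primary_result, secondary_result_or_error].
def tee(request, primary, secondary)
  primary_thread = Thread.new { primary.call(request) }
  secondary_thread = Thread.new do
    begin
      secondary.call(request)
    rescue StandardError => e
      # A failure in the copy must not break the main write path.
      e
    end
  end
  # Thread#value joins the thread and returns its block's value
  # (re-raising any exception raised by the primary call).
  [primary_thread.value, secondary_thread.value]
end

riak = ->(_req) { { code: 204, body: "" } }
es   = ->(_req) { { code: 201, body: %({"ok":true}) } }

primary, copy = tee({ method: :put, path: "/riak/users/alice" }, riak, es)
puts primary[:code] # prints 204
puts copy[:code]    # prints 201
```

Threads are just one way to get the concurrency; Typhoeus's Hydra gets the same effect through libcurl's multi interface, without spawning Ruby threads.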
Here is the proof-of-concept:
    require 'rubygems'
    require 'sinatra'
    require 'typhoeus'

    OPTIONS = {}
    OPTIONS[:riak_host] = "localhost"
    OPTIONS[:riak_port] = "8098"
    OPTIONS[:es_host] = "localhost"
    OPTIONS[:es_port] = "9200"
    OPTIONS[:riak_timeout] = 5000 # milliseconds
    OPTIONS[:es_timeout] = 5000 # milliseconds

    class Rack::Proxy
      def initialize(app)
        @app = app
        @hydra = Typhoeus::Hydra.new
      end

      def call(env)
        req = Rack::Request.new(env)
        # We need to use the body twice, so read in the whole stream.
        # This is an obvious problem with large bodies, so beware.
        req_body = req.body.read if req.body

        # Forward the request, unchanged, to Riak
        riak_url = "http://#{OPTIONS[:riak_host]}:#{OPTIONS[:riak_port]}#{req.fullpath}"
        opts = {:timeout => OPTIONS[:riak_timeout]}
        opts.merge!(:method => req.request_method.downcase.to_sym)
        opts.merge!(:headers => {"Content-type" => req.content_type}) if req.content_type
        opts.merge!(:body => req_body) if req_body && req_body.length > 0
        riak_req = Typhoeus::Request.new(riak_url, opts)
        riak_response = {}
        riak_req.on_complete do |response|
          riak_response[:code] = response.code
          riak_response[:headers] = response.headers_hash
          riak_response[:body] = response.body
        end
        @hydra.queue riak_req

        # If we are PUTting or POSTing JSON to /riak/<bucket>/<key>, send a copy
        # to the ElasticSearch index named "riak" (type = bucket, id = key).
        # Without the path check, a URL with no key would produce a broken ES URL.
        es_response = nil
        if (req.put? || req.post?) && req.media_type == "application/json" &&
           req.path =~ %r{^/riak/([^/]+)/([^/]+)}
          bucket, key = $1, $2
          es_url = "http://#{OPTIONS[:es_host]}:#{OPTIONS[:es_port]}/riak/#{bucket}/#{key}"
          opts = {:timeout => OPTIONS[:es_timeout]}
          opts.merge!(:method => req.request_method.downcase.to_sym)
          opts.merge!(:body => req_body) if req_body && req_body.length > 0
          es_req = Typhoeus::Request.new(es_url, opts)
          es_response = {}
          es_req.on_complete do |response|
            es_response[:code] = response.code
            es_response[:headers] = response.headers_hash
            es_response[:body] = response.body
          end
          @hydra.queue es_req
        end

        # Concurrently executes both HTTP requests; blocks until they both finish
        @hydra.run

        # If we wrote to ES, add a custom header carrying its response code
        riak_response[:headers].merge!("X-ElasticSearch-ResCode" => es_response[:code].to_s) if es_response && es_response[:code]
        # Typhoeus can add nil headers, so get rid of them
        riak_response[:headers].delete_if { |k, v| v.nil? }
        # Return the original Riak response to the client.
        # A Rack body must respond to #each, so wrap the string in an array.
        [riak_response[:code], riak_response[:headers], [riak_response[:body]]]
      end
    end

    use Rack::Proxy
(Gist here)
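Two pieces of the proxy's logic are pure string and hash manipulation, and pulling them out makes them easy to check without running Riak or ElasticSearch. The sketch below is a hedged refactoring. `es_url_for` and `merge_headers` are hypothetical helper names, not part of Sinatra or Typhoeus, and the `host`/`port` defaults simply mirror the `OPTIONS` values. A path without a key (for example a POST to a bucket URL, where Riak would assign the key) maps to `nil`, meaning no copy is sent.

```ruby
# Same prefix pattern as the proxy: /riak/<bucket>/<key>
RIAK_KEY_PATH = %r{\A/riak/([^/]+)/([^/]+)}

# Maps a Riak object path to the ElasticSearch document URL
# (index "riak", type = bucket, id = key), or nil when there is no key.
def es_url_for(path, host: "localhost", port: "9200")
  match = RIAK_KEY_PATH.match(path)
  return nil unless match
  bucket, key = match[1], match[2]
  "http://#{host}:#{port}/riak/#{bucket}/#{key}"
end

# Adds the ElasticSearch status code (when a copy was made) and drops
# nil-valued headers, as the end of Rack::Proxy#call does.
def merge_headers(riak_headers, es_code)
  headers = riak_headers.dup
  headers["X-ElasticSearch-ResCode"] = es_code.to_s if es_code
  headers.reject { |_name, value| value.nil? }
end

puts es_url_for("/riak/users/alice") # prints http://localhost:9200/riak/users/alice
p es_url_for("/riak/users")          # prints nil
```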
Run the script and it will listen on port 4567 (Sinatra's default), so point your Riak client of choice there and start PUTting data; any JSON documents will be seamlessly copied into the ElasticSearch cluster as well. If we were really going to use this in anger, there is a lot of work yet to be done, but as a skeleton of how to use Sinatra (Rack, really) to quickly whip up custom proxies and tee HTTP requests, I thought it might be useful.
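For a quick manual test without a Riak client library, a plain `Net::HTTP` request works. This is a hedged sketch: the bucket `users`, the key `alice` and the document are made-up example values, and `build_put` is a hypothetical helper. Only requests whose content type is `application/json` are copied to ElasticSearch, so the header matters.

```ruby
require "net/http"
require "json"

# Builds a JSON PUT for /riak/<bucket>/<key> (example values only).
def build_put(bucket, key, doc)
  request = Net::HTTP::Put.new("/riak/#{bucket}/#{key}")
  request["Content-Type"] = "application/json"
  request.body = JSON.generate(doc)
  request
end

request = build_put("users", "alice", { "name" => "Alice", "city" => "Leeds" })

# Uncomment to send it (needs the proxy, Riak and ElasticSearch running):
# response = Net::HTTP.start("localhost", 4567) { |http| http.request(request) }
# puts response.code
# puts response["X-ElasticSearch-ResCode"]
```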
March 29th, 2010 - 06:25
We perform similar operations with a tool we built called “pypes”. Essentially, it uses chaining. You POST content to its internal HTTP server, it routes the data through a pipeline (which can be a DAG), transforms it in any way you might need, and publishes the output to some other web interface.
Since it allows you to create a DAG, you can easily publish to multiple endpoints like Riak and ES. I’ve actually got an experimental ElasticSearch publisher that I’ve been using to pump in Excel data.
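The fan-out described here (one input routed through a graph to several publishers) can be illustrated in a few lines of Ruby. This is not the pypes API, which is Python. The `Pipeline` class, its methods and the node names are hypothetical, and the graph is assumed to be acyclic because there is no cycle check.

```ruby
# Toy data-flow graph: nodes are blocks, edges pass a node's output downstream.
class Pipeline
  def initialize
    @nodes = {}                            # node name => block
    @edges = Hash.new { |h, k| h[k] = [] } # node name => downstream node names
  end

  # Registers a node; returns self so calls can be chained.
  def node(name, &fn)
    @nodes[name] = fn
    self
  end

  # Connects the output of `from` to the input of `to`.
  def edge(from, to)
    @edges[from] << to
    self
  end

  # Runs `start` on the packet, then hands its output to every downstream
  # node, so one input can fan out to several publishers.
  def push(start, packet)
    output = @nodes.fetch(start).call(packet)
    @edges.fetch(start, []).each { |succ| push(succ, output) }
  end
end

# Adapt raw text, transform it, then publish the same packet to two sinks.
published = { riak: [], es: [] }
pipeline = Pipeline.new
  .node(:adapt)     { |raw| { "title" => raw.strip } }
  .node(:transform) { |pkt| pkt.merge("title" => pkt["title"].upcase) }
  .node(:riak)      { |pkt| published[:riak] << pkt }
  .node(:es)        { |pkt| published[:es] << pkt }
  .edge(:adapt, :transform)
  .edge(:transform, :riak)
  .edge(:transform, :es)

pipeline.push(:adapt, "  hello world ")
```

Both sinks receive the same packet object here; a real system would likely clone packets per branch so one publisher cannot mutate another's input.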
We’ve been using pypes for about a year now in some enterprise settings to do Enterprise Information Integration with respect to enterprise search. It’s 100% Python and scales extremely well.
It’s built on top of Stackless Python and supports multi-processor hardware for parallelism. You can also scale out by partitioning data across multiple nodes. We generally feed 500 million plus documents at about 10K documents per second (that’s typically with the input being XML).
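Scaling out by partitioning usually means giving each node a deterministic slice of the input. The sketch below assumes hash partitioning on a document id; the comment does not say which scheme pypes uses, so `partition_for` is a hypothetical illustration. It also works through the quoted numbers: 500 million documents at 10,000 documents per second is 50,000 seconds, a little under 14 hours.

```ruby
require "zlib"

# Assigns a document to one of `node_count` nodes by hashing its id.
# CRC32 gives the same answer in every process (String#hash is seeded per process).
def partition_for(doc_id, node_count)
  Zlib.crc32(doc_id.to_s) % node_count
end

# Spread 1,000 example ids over 4 nodes.
ids = (1..1000).map { |i| "doc-#{i}" }
by_node = ids.group_by { |id| partition_for(id, 4) }
by_node.sort.each { |node, docs| puts "node #{node}: #{docs.size} docs" }

# Back-of-envelope: 500 million documents at 10,000 documents per second.
seconds = 500_000_000 / 10_000
puts "#{seconds} s, about #{(seconds / 3600.0).round(1)} h" # prints "50000 s, about 13.9 h"
```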
You can write custom adapters, transformers, and publishers very easily. All incoming data is “adapted” to (essentially) JSON “packets”. There’s a thin API on top of the data model that provides support for meta-data, multi-value fields, packet cloning, packet splitting, etc.
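A rough idea of such a packet model, with metadata, multi-value fields, cloning and splitting, fits in one small Ruby class. This is a hedged sketch. `Packet`, `add`, `get`, `deep_clone` and `split` are invented names, not pypes' actual API. Fields are stored as name => array of values, and metadata is kept separately from the JSON payload fields.

```ruby
require "json"

class Packet
  attr_reader :fields, :meta

  def initialize(fields = {}, meta = {})
    @fields = fields # name => array of values (multi-value fields)
    @meta = meta     # metadata, kept apart from the payload
  end

  # Appends a value to a (possibly new) multi-value field.
  def add(name, value)
    (@fields[name] ||= []) << value
    self
  end

  # Returns all values of a field, or [] if it is absent.
  def get(name)
    @fields.fetch(name, [])
  end

  # Deep copy via Marshal, so changes to the copy never leak back.
  def deep_clone
    Packet.new(Marshal.load(Marshal.dump(@fields)),
               Marshal.load(Marshal.dump(@meta)))
  end

  # One packet per value of a multi-value field; everything else is cloned.
  def split(name)
    get(name).map do |value|
      part = deep_clone
      part.fields[name] = [value]
      part
    end
  end

  def to_json(*args)
    { "meta" => @meta, "fields" => @fields }.to_json(*args)
  end
end

pkt = Packet.new({}, { "source" => "excel" })
pkt.add("title", "Q1 report").add("author", "ann").add("author", "bob")
parts = pkt.split("author")
puts parts.map(&:to_json)
```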
Pypes also fits very nicely into the PubSubHubbub (webhooks) paradigm of publishing real-time content over HTTP. It’s basically a middleware component for doing data transformation and routing.
The nice thing is that it provides a Web 2.0 UI that lets you build data-flow graphs using drag and drop, very much like Yahoo Pipes.
March 29th, 2010 - 07:24
Eric, that sounds very interesting — I will definitely have to check it out. Thanks for the comment.
March 29th, 2010 - 12:25
I think what you’re looking for is pre-commit hooks (it sounds a bit like svn, but these are Riak hooks): http://lists.basho.com/pipermail/riak-users_lists.basho.com/2010-March/000688.html. These are basically bits of code that are executed when data reaches a bucket.
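To make the hook idea concrete, here is an in-memory Ruby model of per-bucket commit hooks. It is only an illustration of the concept, not Riak's API. In Riak the hooks run inside the database and are configured on the bucket, and `HookedBucket`, `pre_commit` and `post_commit` here are hypothetical names. A pre-commit hook may return the (possibly modified) object or `:reject`. A post-commit style callback is where a copy to a search index would naturally go, which would move the tee from the client side into the store itself.

```ruby
class HookedBucket
  def initialize
    @store = {}
    @pre_commit = []
    @post_commit = []
  end

  def pre_commit(&hook)
    @pre_commit << hook
    self
  end

  def post_commit(&hook)
    @post_commit << hook
    self
  end

  # Returns true if stored, false if a pre-commit hook rejected the write.
  def put(key, object)
    @pre_commit.each do |hook|
      object = hook.call(key, object)
      return false if object == :reject
    end
    @store[key] = object
    @post_commit.each { |hook| hook.call(key, object) }
    true
  end

  def get(key)
    @store[key]
  end
end

# Accept only Hash (JSON-like) objects and mirror accepted writes into an "index".
indexed = {}
bucket = HookedBucket.new
  .pre_commit { |_key, obj| obj.is_a?(Hash) ? obj : :reject }
  .post_commit { |key, obj| indexed[key] = obj }

bucket.put("alice", { "name" => "Alice" })
```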
Once this nice feature gets implemented, I’m pretty sure I’ll post about it on myNoSQL (http://nosql.mypopescu.com).