SOA - using AMQP
Wed, Sep 3, 2014
In the introductory article we talked about why we might implement a Service Oriented Architecture in order to:
- scale to meet the demands of our users
- scale to meet the demands of our dev team
We also talked a little bit of theory on what an SOA looks like, how to define the boundaries for our services, and the practical implications of building a distributed system.
In the previous article we showed how to build an SOA using HTTP that implements the 3 main communication patterns of a distributed system:
- Synchronous Request/Response using Faraday, Unicorn, and Sinatra.
- Asynchronous Worker Queue using Redis and Resque.
- Asynchronous Publish/Subscribe using Redis.
This article will show how to implement the same 3 communication patterns using a single technology, an AMQP message broker such as RabbitMQ.
Table of Contents
- About RabbitMQ
- Installing RabbitMQ
- Synchronous Request/Response
- Asynchronous Worker Queue
- Asynchronous Publish/Subscribe
About RabbitMQ
Using the RabbitMQ broker requires the use of a language-specific client library. For Ruby we have a few options:
- Bunny - the most common Ruby client
- Ruby AMQP - an EventMachine based client
- March Hare - a JRuby client
For most purposes, the Bunny gem is the simplest option. Indeed, the RabbitMQ tutorials themselves are implemented using the Bunny gem.
Installing RabbitMQ
Install a RabbitMQ broker (more details):
$ sudo apt-get install rabbitmq-server
… along with a Ruby client library (more details):
$ sudo gem install bunny
Synchronous Request/Response
Just like HTTP we need to host a service and make requests from a client…
Hosting the Service
For a simple service we can implement a RabbitMQ consumer that subscribes to a named queue to receive incoming requests and then publishes the response to an exclusive reply queue for that client.
For example, consider a service that returns the current (random) weather for a given location, implemented in weather_service.rb:
#!/usr/bin/env ruby
require 'bunny'

module WeatherService

  #-------------------------------------------

  QUEUE = "weather"

  #-------------------------------------------

  def self.run
    conn = Bunny.new
    conn.start
    channel  = conn.create_channel
    exchange = channel.default_exchange
    queue    = channel.queue(QUEUE)
    queue.subscribe(:block => true) do |delivery, properties, location|
      puts "Weather requested for #{location}"
      response = WeatherService.forecast(location)
      exchange.publish(response, {
        :routing_key    => properties.reply_to,
        :correlation_id => properties.correlation_id
      })
    end
  end

  #-------------------------------------------

  def self.forecast(location)
    "It's #{['hot', 'warm', 'cold', 'freezing'].sample} in #{location}"
  end

  #-------------------------------------------

end

WeatherService.run
We can run the service from the command line:
$ ruby weather_service.rb
NOTE: In the previous article we discussed using Unicorn and Sinatra to host our HTTP services in order to provide a robust production environment, including daemonization, clustering, and logfile/pidfile management. If we choose AMQP instead of HTTP as our SOA transport, there are not (yet) equally robust service hosting applications. For a sneak preview of one that is in development (and will be introduced in the next article in this series), you can check out the RackRabbit project.
Making Client Requests
Where HTTP is designed for request/response, a message broker is designed primarily for asynchronous messaging, so request/response over a broker is a little more complex. In order to wait for a response, the client must subscribe to an exclusive reply queue. Because the subscription handler runs in a different thread, the client also needs thread synchronization to block the primary thread while it waits for the response.
We can implement a weather.rb client executable script as follows:
#!/usr/bin/env ruby
require "bunny"
require "securerandom"

QUEUE = "weather"

conn = Bunny.new
conn.start

location    = ARGV[0] || "London"
channel     = conn.create_channel
exchange    = channel.default_exchange
reply_queue = channel.queue("", :exclusive => true)
message_id  = SecureRandom.uuid
lock        = Mutex.new
condition   = ConditionVariable.new
response    = nil

# PREPARE the response handler (it runs in a consumer thread, not the main thread)
reply_queue.subscribe do |delivery_info, properties, body|
  if properties[:correlation_id] == message_id
    lock.synchronize do
      response = body
      condition.signal
    end
  end
end

# PUBLISH the request
exchange.publish(location, {
  :routing_key    => QUEUE,
  :correlation_id => message_id,
  :reply_to       => reply_queue.name
})

# WAIT for the response; re-checking `response` avoids blocking forever
# if the reply arrived before we started waiting
lock.synchronize { condition.wait(lock) while response.nil? }

puts response
conn.close
… and test it from the command line:
$ ruby weather.rb Seattle
It's hot in Seattle
$ ruby weather.rb Boston
It's cold in Boston
NOTE: This request/response dance using an exclusive reply queue and a thread synchronization mechanism is fairly complex, and is boilerplate framework code that should be extracted out into a separate library. For a sneak preview of a library that is in development (and will be introduced in the next article in this series), you can check out the RackRabbit client library.
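As a rough idea of what extracting that boilerplate might look like, here is a minimal sketch of the synchronization part only. The `PendingReplies` class and its method names are hypothetical (they are not part of Bunny or RackRabbit), and the timeout is an added assumption so that a lost reply does not block the caller forever.

```ruby
# Hypothetical helper: tracks replies by correlation id and lets the
# requesting thread block until its reply arrives or a timeout expires.
class PendingReplies
  def initialize
    @lock      = Mutex.new
    @condition = ConditionVariable.new
    @replies   = {}
  end

  # Called from the subscription (consumer) thread when a reply is delivered.
  def deliver(correlation_id, body)
    @lock.synchronize do
      @replies[correlation_id] = body
      @condition.broadcast
    end
  end

  # Called from the requesting thread.
  # Returns the reply body, or nil if nothing arrived within `timeout` seconds.
  def wait_for(correlation_id, timeout = 5)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      # Loop on the stored reply rather than on the signal alone, so a reply
      # delivered before we started waiting is not missed.
      until @replies.key?(correlation_id)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @condition.wait(@lock, remaining)
      end
      @replies.delete(correlation_id)
    end
  end
end

# Usage, with a plain thread standing in for the consumer thread:
pending = PendingReplies.new
Thread.new { sleep 0.1; pending.deliver("abc", "It's warm in London") }
puts pending.wait_for("abc")
```

In the weather.rb client, the subscribe handler would call `deliver` with `properties[:correlation_id]` and the message body, and the main thread would call `wait_for(message_id)` after publishing the request.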
Asynchronous Worker Queue
To implement a worker queue with RabbitMQ we create a consumer process that subscribes to a named queue. We can run any number of instances of the process and RabbitMQ will distribute the requests among the active consumers.
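To make that distribution concrete: with several consumers on one queue, RabbitMQ by default hands messages out in round-robin order. The snippet below is an in-process illustration only. It involves no broker, and the `round_robin` helper and worker names are hypothetical.

```ruby
# Illustration only: mimics round-robin dispatch of tasks to the consumers
# of a single queue. Nothing here talks to RabbitMQ.
def round_robin(tasks, consumers)
  assignments = consumers.each_with_object({}) { |consumer, hash| hash[consumer] = [] }
  tasks.each_with_index do |task, index|
    # Each task goes to the next consumer in turn, wrapping around at the end.
    assignments[consumers[index % consumers.size]] << task
  end
  assignments
end

round_robin(%w[reading sleeping eating], %w[worker-1 worker-2]).each do |consumer, tasks|
  puts "#{consumer}: #{tasks.join(', ')}"
end
```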
The following example shows a worker process subscribing to a named queue and extracting its arguments from serialized JSON in the message body:
#!/usr/bin/env ruby
require "bunny"
require "json"

module MyWorker

  #----------------------------------------------

  QUEUE = "tasks"

  #----------------------------------------------

  def self.run
    conn = Bunny.new
    conn.start
    channel = conn.create_channel
    queue   = channel.queue(QUEUE)
    queue.subscribe(:block => true) do |delivery, properties, body|
      perform(JSON.parse(body))
    end
  end

  #----------------------------------------------

  def self.perform(args)
    action = args["action"]
    amount = args["amount"]
    amount.times do |i|
      puts "#{action} for #{i} seconds"
      sleep 1
    end
  end

  #----------------------------------------------

end

MyWorker.run
We can run the worker from the command line:
$ ruby worker.rb
A client process can enqueue tasks for the worker by publishing a serialized JSON message into the RabbitMQ queue. The following executable script, enqueue.rb, shows an example:
#!/usr/bin/env ruby
require "bunny"
require "json"

QUEUE = "tasks"

conn = Bunny.new
conn.start

channel = conn.create_channel
queue   = channel.queue(QUEUE)
action  = ARGV[0] || "waiting"
amount  = (ARGV[1] || 5).to_i
args    = { :action => action, :amount => amount }

channel.default_exchange.publish(args.to_json, {
  :routing_key => queue.name
})

conn.close
Tasks can now be enqueued from the command line:
$ ruby enqueue.rb reading 2
$ ruby enqueue.rb sleeping 3
$ ruby enqueue.rb eating 4
… and we should see output in our other terminal where the worker process is running:
$ ruby worker.rb
reading for 0 seconds
reading for 1 seconds
sleeping for 0 seconds
sleeping for 1 seconds
sleeping for 2 seconds
eating for 0 seconds
eating for 1 seconds
eating for 2 seconds
eating for 3 seconds
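One caveat with the worker above: a Bunny subscription acknowledges messages automatically unless told otherwise, so a task may be lost if the worker dies part-way through it. The following is a hedged sketch of a subscription that acknowledges only after the task completes. The `subscribe_reliably` method name is hypothetical; `channel` and `queue` are assumed to be a `Bunny::Channel` and `Bunny::Queue`, and the `:manual_ack` option is the spelling used by recent Bunny releases (older versions may name it differently).

```ruby
require "json"

# Hypothetical wrapper around a Bunny subscription. It yields each parsed
# task to the caller's block and acknowledges the message afterwards.
def subscribe_reliably(channel, queue)
  # Ask the broker to send this worker at most one unacknowledged message
  # at a time, so slow tasks do not pile up behind a busy worker.
  channel.prefetch(1)

  queue.subscribe(:manual_ack => true, :block => true) do |delivery_info, _properties, body|
    yield JSON.parse(body)
    # Reached only if the task did not raise; an unacknowledged message is
    # requeued by the broker once the channel or connection closes.
    channel.ack(delivery_info.delivery_tag)
  end
end
```

Inside `MyWorker.run`, the call might look like `subscribe_reliably(channel, queue) { |args| perform(args) }`.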
Asynchronous Publish/Subscribe
To implement publish/subscribe in RabbitMQ the client publishes to an exchange instead of directly to a specific named queue. Subscribers can bind to the exchange in a variety of ways. The simplest solution is to use a fanout exchange. RabbitMQ will route the message to all subscribers of a fanout exchange.
More selective routing can be performed by using a topic exchange instead of a fanout exchange. See AMQP concepts for more information on the different types of exchange, and RabbitMQ Tutorials for details on how to use them.
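As a brief illustration of what "more selective" means: a topic exchange compares each message's dot-separated routing key against the binding patterns of its queues, where `*` stands for exactly one word and `#` for zero or more words. The matcher below is a plain-Ruby sketch of those rules for experimentation only. The broker does this matching itself, and `topic_match?` and the example keys are hypothetical.

```ruby
# Illustration only: re-implements topic pattern matching in plain Ruby.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?
  head, *rest = pattern
  case head
  when "#"
    # "#" may absorb zero or more words, so try every possible split.
    (0..words.size).any? { |count| match_words(rest, words.drop(count)) }
  when "*"
    # "*" must consume exactly one word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

puts topic_match?("weather.*", "weather.london")    # one word after "weather"
puts topic_match?("weather.*", "weather.uk.london") # two words, so "*" fails
puts topic_match?("weather.#", "weather.uk.london") # "#" absorbs both words
```

With Bunny, such a pattern would be passed as the `:routing_key` option to `queue.bind`, against an exchange declared with `channel.topic`.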
Using a fanout exchange, we can implement a subscriber in subscribe.rb:
#!/usr/bin/env ruby
require "bunny"

module Subscriber

  EXCHANGE = "events"

  def self.run(name)
    conn = Bunny.new
    conn.start
    channel  = conn.create_channel
    exchange = channel.fanout(EXCHANGE)
    queue    = channel.queue("", :exclusive => true)
    queue.bind(exchange)
    queue.subscribe(:block => true) do |delivery_info, properties, event|
      puts "#{name} saw #{event}"
    end
  end

end

Subscriber.run(ARGV[0] || "A Subscriber")
We can run multiple subscribers (in different terminals):
$ ruby subscribe.rb FOO
$ ruby subscribe.rb BAR
We can publish an event with a publish.rb script as follows:
#!/usr/bin/env ruby
require "bunny"

EXCHANGE = "events"

conn = Bunny.new
conn.start

event    = ARGV[0] || "An Event"
channel  = conn.create_channel
exchange = channel.fanout(EXCHANGE)
exchange.publish(event)

conn.close
Finally, we can test the system from the command line:
$ ruby publish.rb eggs
$ ruby publish.rb sausages
$ ruby publish.rb pancakes
… and we should see output in our other terminals where the subscriber processes are running:
$ ruby subscribe.rb FOO
FOO saw eggs
FOO saw sausages
FOO saw pancakes
$ ruby subscribe.rb BAR
BAR saw eggs
BAR saw sausages
BAR saw pancakes
Up Next…
This article has shown how to implement the 3 primary communication patterns of a distributed system using RabbitMQ and the Bunny client library.
If you are new to RabbitMQ and Bunny then the examples in this article may seem complex, especially compared to better-known tools such as HTTParty, Rack, Redis and Resque. However, much of the RabbitMQ/Bunny code is boilerplate that could easily be extracted into a common library and exposed through a much simpler interface.
We could imagine exposing a simpler interface in our client side code, something like:
rabbit.request("queue", "message") # synchronous request/response
rabbit.enqueue("queue", "message") # asynchronous worker queue
rabbit.publish("exchange", "message") # asynchronous pub/sub
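To give a feel for how thin such a wrapper could be, here is a minimal sketch of the two asynchronous calls. The `Rabbit` class is purely hypothetical and is not the RackRabbit API; `channel` is assumed to behave like a `Bunny::Channel`, and `request` is left out because it also needs the reply-queue handling shown earlier.

```ruby
require "json"

# Hypothetical facade over a Bunny-style channel (illustrative only).
class Rabbit
  def initialize(channel)
    @channel = channel
  end

  # Asynchronous worker queue: publish to a named queue via the default exchange.
  def enqueue(queue_name, message)
    # Declare the queue first so the message is not dropped if no worker
    # has created it yet.
    @channel.queue(queue_name)
    @channel.default_exchange.publish(serialize(message), :routing_key => queue_name)
  end

  # Asynchronous pub/sub: publish to a fanout exchange.
  def publish(exchange_name, message)
    @channel.fanout(exchange_name).publish(serialize(message))
  end

  private

  # Strings are sent as-is; anything else is serialized to JSON.
  def serialize(message)
    message.is_a?(String) ? message : JSON.generate(message)
  end
end
```

Given an open Bunny connection `conn`, usage might look like `Rabbit.new(conn.create_channel).enqueue("tasks", :action => "reading", :amount => 2)`.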
We could also imagine exposing a simpler mechanism for building and hosting consumer services, since ultimately they all simply subscribe to a queue. Perhaps a Unicorn-style server that subscribes to a queue instead of listening on a socket… perhaps it could automatically wrap the inbound message in a Rack environment and hand it off to any Rack application… perhaps one built with Sinatra…
… well in fact there is such a project in development, it’s called RackRabbit, and it will be introduced in the next article in this series.
- SOA Overview
- SOA using HTTP
- SOA using AMQP (this article)
- Introducing RackRabbit