juretta.com

ActiveMQ, Stomp, JMS and the empty message body

Tags: Flex, Java, ActiveMQ, Ruby, Stomp, BlazeDS

For a project I am currently working on I needed to hook up BlazeDS and ActiveMQ. While everything went fine and without any disruption, I ran into a minor problem after I decided to use a Ruby based script to sent notifications to a topic based message queue.

The setup looks remotely like this:

Activemq/BlazeDS setup

BlazeDS and ActiveMQ are using the Java Message Service (JMS) API for message exchange. The Ruby script is using the Stomp (Streaming Text Orientated Messaging Protocol) interface to ActiveMQ.

Stomp is a really simple interface that looks vaguely like HTTP - it can even be used by using a simple Telnet connection to the message queue.

A simple stomp based message frame looks like this:

SEND
destination:/queue/a

hello queue a
^@

The message frame starts with a single Stomp command name (one of SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, COMMIT, ABORT, ACK, DISCONNECT) followed by a newline, the message header consisting of key: value pairs, a blank line and the message body followed by a null character (NUL).

The Ruby based equivalent to send a simple text message looks like this:

require 'rubygems'
require 'stomp'

conn = Stomp::Connection.open 'USER', 'PASSWORD', 'localhost', 61613, true 
conn.send '/topic/messageTopic', "#{Time.now}", :persistent => 'true'
conn.disconnect

ActiveMQ is responsible for mapping Stomp messages to JMS and this is where the trouble began.

JMS defines five message types: TextMessage, MapMessage, BytesMessage, StreamMessage and ObjectMessage. Stomp does not define distinct message types, the content of the message body can be anything as long as you take care of proper handling of the terminating null character.

The result of sending messages using the Ruby Stomp based message producer was unexpected. All the messages had an empty message body when received by the Flex based clients.

Stepping through the BlazeDS code (the method messageReceived(JMSMessageEvent evt) in flex.messaging.services.messaging.adapters.JMSAdapter is a good candidate for a breakpoint) shows that all the messages sent by the Ruby Stomp client are converted to JMS BytesMessage?

Message Mapping - JMS BytesMessage

ActiveMQ uses the content-length header to determine if a Stomp message should be mapped to a JMS TextMessage or a BytesMessage (http://activemq.apache.org/stomp.html) - if a Stomp message contains a content-length header field it will be converted to a BytesMessage otherwise it will be converted to a TextMessage.

Digging into the Ruby Stomp code shows that every message that is being sent by the Ruby client will contain the content-length header. This is the reason for the conversion to the BytesMessage and the empty message body when the message is received by the BlazeDS clients.

lib/stomp/connection.rb

def _transmit(s, command, headers = {}, body = '')
  @transmit_semaphore.synchronize do
    s.puts command
    headers.each {|k,v| s.puts "#{k}:#{v}" }
    s.puts "content-length: #{body.length}"
    s.puts "content-type: text/plain; charset=UTF-8"
    s.puts
    s.write body
    s.write "\0"
  end
end

The Stomp gem actually follows the Stomp specification.

From the Stomp protocol specification:

It is recommended that SEND frames include a content-length header which is a byte count for the length of the message body. If a content-length header is included, this number of bytes should be read, regardless of whether or not there are null characters in the body. The frame still needs to be terminated with a null byte and if a content-length is not specified, the first null byte encountered signals the end of the frame.

Using the stock Ruby stomp gem and a standard BlazeDS/ActiveMQ setup seems to be a bit of a problem. You either have to take care of the message conversion yourself or you have to change the behaviour of the stomp library. As the official Stomp protocol recommends the inclusion of the 'content-length' header the current implementation seems to be correct. Changing the default behaviour therefore does not seem to be the best option. I decided to add a monkeypatch that can be used with ActiveMQ and changes the method _transmit in order to suppress the content-length header.

module Stomp
  class Connection
    def _transmit(s, command, headers = {}, body = '')
      @transmit_semaphore.synchronize do
        unless headers['suppress_content_length']
          headers['content-length'] = "#{body.length}"
        else
          headers.delete 'suppress_content_length'
        end
        s.puts command
        headers.each {|k,v| s.puts "#{k}:#{v}" }
        s.puts "content-type: text/plain; charset=UTF-8"
        s.puts
        s.write body
        s.write "\0"
      end
    end
  end
endd

conn = Stomp::Connection.open 'USER', \
    'PASSWORD', 'localhost', 61613, true 
conn.send '/topic/messageTopic', "Wellington: (#{Time.now})", \
    'suppress_content_length' => true
conn.disconnect

This fixes the problem and results in the desired behaviour. A bug has been filed and we'll see if there is a good way to enhance the Stomp gem to work properly with ActiveMQ without changing the current behaviour.

Meanwhile a fork of the stomp git repository including the ActiveMQ changes (hopefully without breaking existing client code) can be found here:

http://github.com/juretta/stomp/tree/activemq-jms-mapping


About

This is the defunct blog of Stefan Saasen.