I recently needed to use our ActiveMQ message queue service to scale up CouchDB write performance. However, a bug under JRuby appeared to kill the STOMP subscriber every 5 seconds. Digging into the STOMP library source, I found a workaround: remove the 5-second timeout that wraps message parsing.
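For context, the timeout in question is a `Timeout::timeout(5, Stomp::Error::PacketParsingTimeout)` call in the library's `_receive` method. Here is a minimal sketch of how that kind of call behaves in plain Ruby. The `PacketParsingTimeout` class and the `read_with_deadline` helper are hypothetical stand-ins for illustration, not part of the STOMP library.

```ruby
require 'timeout'

# Hypothetical stand-in for Stomp::Error::PacketParsingTimeout.
class PacketParsingTimeout < StandardError; end

# Runs the block and raises PacketParsingTimeout if it takes longer than `seconds`.
def read_with_deadline(seconds)
  Timeout.timeout(seconds, PacketParsingTimeout) { yield }
end

# A read that finishes in time simply returns its value.
fast = read_with_deadline(1) { :frame }

# A read that blocks past the deadline is aborted with the custom error.
slow = begin
  read_with_deadline(0.1) { sleep 1; :frame }
rescue PacketParsingTimeout
  :timed_out
end

p fast
p slow
```

If nothing rescues that error inside a subscriber's receive loop, the loop ends. That would be consistent with the subscriber dying at a fixed interval, though I have not confirmed that this is exactly what happens under JRuby.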

ActiveMQ let me scale CouchDB writes from 10 req/sec to 128 req/sec: a huge performance win for very little effort.

Ticket to the bug.

STOMP Library Monkey Patch:

# for stomp subscriber
if defined?(JRUBY_VERSION)
  module Stomp
    class Connection
      def _receive( read_socket )
        @read_semaphore.synchronize do
           line = read_socket.gets
           return nil if line.nil?

           # The library aborts parsing if the read hangs for more than 5 seconds.
           # That timeout is disabled here because it kills the subscriber under JRuby.
           #Timeout::timeout(5, Stomp::Error::PacketParsingTimeout) do
             # Reads the beginning of the message until it runs into an empty line
             message_header = ''
             begin
               message_header += line
               begin
                 line = read_socket.gets
               rescue
                  p read_socket
               end
             end until line =~ /^\s?\n$/

             # Checks if it includes content_length header
             content_length = message_header.match(/content-length\s?:\s?(\d+)\s?\n/)
             message_body = ''

             # If it does, reads the specified amount of bytes
             char = ''
             if content_length
               message_body = read_socket.read content_length[1].to_i
               raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
             # Otherwise, reads the rest of the message until the first \0
             else
               message_body += char while read_socket.ready? && (char = parse_char(read_socket.getc)) != "\0"
             end

             # If the buffer isn't empty, reads the next char and returns it to the buffer
             # unless it's a \n
             if read_socket.ready?
               last_char = read_socket.getc
               read_socket.ungetc(last_char) if parse_char(last_char) != "\n"
             end

             # Adds the excluded \n and \0 and tries to create a new message with it
             Message.new(message_header + "\n" + message_body + "\0")
           #end
        end
      end
    end
  end
end
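
The framing logic in the patch is easier to follow in isolation: header lines up to the first blank line, then either a body of `content-length` bytes or everything up to the first NUL. Below is a simplified sketch run against in-memory `StringIO` objects. The `read_frame` helper is hypothetical and is not the library's implementation. It skips the `ready?` and `parse_char` handling that the real socket code needs.

```ruby
require 'stringio'

# Hypothetical helper: reads one STOMP-style frame from an IO.
# Returns [header, body], or nil when the stream is already at EOF.
def read_frame(io)
  line = io.gets
  return nil if line.nil?

  # Header: every line up to (not including) the first blank line.
  header = ''
  until line.nil? || line =~ /^\s?\n$/
    header += line
    line = io.gets
  end

  length = header[/content-length\s?:\s?(\d+)\s?\n/, 1]
  if length
    # Body of a known size, which must be followed by a NUL terminator.
    body = io.read(length.to_i)
    raise 'invalid message length' unless io.getc == "\0"
  else
    # Body of unknown size: read up to the first NUL and drop the terminator.
    body = io.gets("\0").to_s.chomp("\0")
  end

  [header, body]
end

with_length = StringIO.new("MESSAGE\ncontent-length:5\n\nhello\0")
without_length = StringIO.new("MESSAGE\ndestination:/queue/a\n\nhi there\0")

p read_frame(with_length)
p read_frame(without_length)
```

Nothing in this sketch depends on a deadline. A blocking `gets` just waits for the next frame, which is the behavior the monkey patch relies on once the timeout is removed.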
