| Class | Stomp::Connection |
| In: |
lib/stomp/connection.rb
|
| Parent: | Object |
Low level connection which maps commands and supports synchronous receives
| connection_frame | [R] | |
| disconnect_receipt | [R] |
A new Connection object accepts the following parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)
reconnect_delay (Integer, default : 5)
connect_headers (Hash, default : {})
e.g. c = Connection.new("username", "password", "localhost", 61613, true)
Hash:
hash = {
:hosts => [
{:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
{:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
],
:initial_reconnect_delay => 0.01,
:max_reconnect_delay => 30.0,
:use_exponential_back_off => true,
:back_off_multiplier => 2,
:max_reconnect_attempts => 0,
:randomize => false,
:backup => false,
:timeout => -1,
:connect_headers => {},
:parse_timeout => 5,
:logger => nil,
}
e.g. c = Connection.new(hash)
TODO Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port, stomp://host.domain.tld:port, stomp://user:pass@host:port, or stomp://user:pass@host.domain.tld:port
# File lib/stomp/connection.rb, line 60
# Build a connection from positional arguments, or from a single parameters
# Hash passed in place of +login+, and then obtain a socket via +socket+.
#
# login           - login String, or a Hash handed to hashed_initialize
# passcode        - passcode String
# host            - host name String
# port            - port number
# reliable        - when true, failures are retried instead of raised
# reconnect_delay - delay passed to sleep between connection attempts
# connect_headers - Hash of extra headers for the CONNECT frame
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @received_messages = []

  case login
  when Hash
    hashed_initialize(login)
  else
    @login, @passcode = login, passcode
    @host, @port = host, port
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parameters = nil
    # These two can only be overridden through the Hash form.
    @parse_timeout = 5
    @logger = nil
  end

  # Separate mutexes for transmitting, reading and (re)opening the socket.
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new

  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0

  socket
end
Syntactic sugar for ‘Connection.new’. See ‘initialize’ for usage.
# File lib/stomp/connection.rb, line 108
# Convenience constructor: forwards every argument, unchanged, to
# Connection.new and returns the new connection.
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  constructor_args = [login, passcode, host, port, reliable, reconnect_delay, connect_headers]
  Connection.new(*constructor_args)
end
Receive a frame, block until the frame is received
# File lib/stomp/connection.rb, line 340
# Receive a frame, blocking until one has been read.
#
# Obtains the current socket via +socket+ and reads one frame from it with
# +_receive+, returning whatever +_receive+ returns.
#
# Raises Stomp::Error::MaxReconnectAttempts unchanged (same policy as
# +transmit+). Any other error is stored in @failure; it is re-raised
# unless @reliable is set, in which case it is reported (to the logger's
# on_miscerr hook when available, otherwise to $stderr) and the read is
# attempted again.
def __old_receive
  # The receive may fail, so we may need to retry.
  # `while true` rather than the TRUE constant, which Ruby 3.2 removed.
  while true
    begin
      return _receive(socket)
    rescue Stomp::Error::MaxReconnectAttempts
      # Retrying cannot help once the reconnect budget is exhausted.
      raise
    rescue
      @failure = $!
      raise unless @reliable
      errstr = "receive failed: #{$!}\n"
      if @logger && @logger.respond_to?(:on_miscerr)
        @logger.on_miscerr(log_params, errstr)
      else
        $stderr.print errstr
      end
    end
  end
end
Abort a transaction by name
# File lib/stomp/connection.rb, line 234
# Abort the transaction called +name+.
#
# Stores +name+ under :transaction in +headers+ (the Hash passed in is
# modified) and sends an ABORT frame through +transmit+.
def abort(name, headers = {})
  headers.store(:transaction, name)
  transmit("ABORT", headers)
end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client’ ).
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/connection.rb, line 222
# Acknowledge the message identified by +message_id+.
#
# Stores the id under the 'message-id' String key in +headers+ (the Hash
# passed in is modified) and sends an ACK frame through +transmit+.
def ack(message_id, headers = {})
  headers.store('message-id', message_id)
  transmit("ACK", headers)
end
Begin a transaction, requires a name for the transaction
# File lib/stomp/connection.rb, line 213
# Begin a transaction called +name+.
#
# Stores +name+ under :transaction in +headers+ (the Hash passed in is
# modified) and sends a BEGIN frame through +transmit+.
def begin(name, headers = {})
  headers.store(:transaction, name)
  transmit("BEGIN", headers)
end
# File lib/stomp/connection.rb, line 175
# Select the next host from @parameters[:hosts] and copy its settings into
# @ssl, @host, @port, @login and @passcode.
#
# When @parameters[:randomize] is set the host list is first replaced by a
# randomly ordered copy. The first entry is then moved to the end of the
# list and becomes the current host.
def change_host
  if @parameters[:randomize]
    @parameters[:hosts] = @parameters[:hosts].sort_by { rand }
  end

  # Take the head of the list as the current host and rotate it to the tail.
  host_list = @parameters[:hosts]
  chosen = host_list.shift
  host_list.push(chosen)

  @ssl = chosen[:ssl]
  @host = chosen[:host]
  # A missing port falls back to the default for the chosen transport.
  @port = chosen[:port] || Connection::default_port(@ssl)
  @login = chosen[:login] || ""
  @passcode = chosen[:passcode] || ""
end
# File lib/stomp/connection.rb, line 314
# True when the subscription recorded under the message's :destination
# header was stored with :ack set to "client"; false when no subscription
# is recorded under that key.
def client_ack?(message)
  subscription_headers = @subscriptions[message.headers[:destination]]
  return false if subscription_headers.nil?

  subscription_headers[:ack] == "client"
end
Commit a transaction by name
# File lib/stomp/connection.rb, line 228
# Commit the transaction called +name+.
#
# Stores +name+ under :transaction in +headers+ (the Hash passed in is
# modified) and sends a COMMIT frame through +transmit+.
def commit(name, headers = {})
  headers.store(:transaction, name)
  transmit("COMMIT", headers)
end
Close this connection
# File lib/stomp/connection.rb, line 320
# Close this connection.
#
# Sends a DISCONNECT frame. If the headers contain a :receipt entry (after
# symbolizing the keys) one frame is read with +receive+ and kept in
# @disconnect_receipt. The logger's on_disconnect hook is called when the
# logger provides one, and finally the socket is closed.
def disconnect(headers = {})
  transmit("DISCONNECT", headers)

  symbolized = headers.symbolize_keys
  @disconnect_receipt = receive if symbolized[:receipt]

  @logger.on_disconnect(log_params) if @logger && @logger.respond_to?(:on_disconnect)

  close_socket
end
# File lib/stomp/connection.rb, line 92
# Configure the connection from a parameters Hash.
#
# The Hash is normalized by +refine_params+; a Hash-configured connection
# is always reliable. The first host is selected with +change_host+ and
# the logger's on_connecting hook is called when the logger provides one.
def hashed_initialize(params)
  @parameters = refine_params(params)
  @reliable = true

  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout = @parameters[:parse_timeout]
  @logger = @parameters[:logger]

  # Pick the first host to connect to.
  change_host

  @logger.on_connecting(log_params) if @logger && @logger.respond_to?(:on_connecting)
end
# File lib/stomp/connection.rb, line 194
# Grow @reconnect_delay for the next connection attempt and return it.
#
# The delay is multiplied by @parameters[:back_off_multiplier] when
# @parameters[:use_exponential_back_off] is set, and is then capped at
# @parameters[:max_reconnect_delay].
def increase_reconnect_delay
  if @parameters[:use_exponential_back_off]
    @reconnect_delay *= @parameters[:back_off_multiplier]
  end

  ceiling = @parameters[:max_reconnect_delay]
  @reconnect_delay = ceiling if @reconnect_delay > ceiling

  @reconnect_delay
end
# File lib/stomp/connection.rb, line 190
# True when a non-zero @parameters[:max_reconnect_attempts] is configured
# and @connection_attempts has reached it. Always false when there are no
# parameters, or when the limit is nil or 0.
def max_reconnect_attempts?
  return false if @parameters.nil?

  limit = @parameters[:max_reconnect_attempts]
  return false if limit.nil?
  return false unless limit != 0

  @connection_attempts >= limit
end
Return a pending message if one is available, otherwise return nil
# File lib/stomp/connection.rb, line 332
# Return a pending message if one is available, otherwise nil.
#
# No read lock is taken here; +receive+ ends up taking it.
def poll
  return nil if @socket.nil?
  return nil unless @socket.ready?

  receive
end
Publish message to destination
To disable the content-length header, pass ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => ‘some_transaction_id’ ).
# File lib/stomp/connection.rb, line 265
# Publish +message+ to +destination+.
#
# Stores the destination under :destination in +headers+ (the Hash passed
# in is modified) and sends a SEND frame with +message+ as its body.
def publish(destination, message, headers = {})
  headers.store(:destination, destination)
  transmit("SEND", headers, message)
end
# File lib/stomp/connection.rb, line 359
# Receive a frame through +__old_receive+.
#
# On a reliable connection a nil result is treated as EOF: the event is
# reported (logger's on_miscerr hook when available, otherwise $stderr),
# @socket is cleared, and the read is attempted one more time.
def receive
  frame = __old_receive
  return frame unless frame.nil? && @reliable

  errstr = "connection.receive returning EOF as nil - resetting connection.\n"
  if @logger && @logger.respond_to?(:on_miscerr)
    @logger.on_miscerr(log_params, errstr)
  else
    $stderr.print errstr
  end

  # Drop the socket reference before the second read attempt.
  @socket = nil
  __old_receive
end
# File lib/stomp/connection.rb, line 153
# Normalize a parameters Hash: keys are converted with
# uncamelize_and_symbolize_keys, then laid over the defaults below, so
# caller-supplied values win. Returns a new Hash.
def refine_params(params)
  supplied = params.uncamelize_and_symbolize_keys

  defaults = {
    :connect_headers => {},
    # Failover parameters
    :initial_reconnect_delay => 0.01,
    :max_reconnect_delay => 30.0,
    :use_exponential_back_off => true,
    :back_off_multiplier => 2,
    :max_reconnect_attempts => 0,
    :randomize => false,
    :backup => false,
    :timeout => -1,
    # Parse timeout
    :parse_timeout => 5
  }

  defaults.merge(supplied)
end
# File lib/stomp/connection.rb, line 274
# Deprecated alias for +publish+: emits a deprecation warning, then
# forwards all arguments to +publish+.
def send(*publish_args)
  warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
  publish(*publish_args)
end
# File lib/stomp/connection.rb, line 112
# Return a connected socket, (re)connecting first when necessary.
#
# The whole body runs under @socket_semaphore. The current @socket is
# reused unless it is nil, closed? reports true, or @failure has been set
# by an earlier error; in those cases a new socket is opened with
# open_socket and the STOMP handshake is performed with connect.
#
# On a connection error: the error is stored in @failure and re-raised
# unless @reliable is set. Otherwise the failure is reported, the method
# sleeps for @reconnect_delay, and the loop tries again.
#
# Raises Stomp::Error::MaxReconnectAttempts when max_reconnect_attempts?
# is true after a failed attempt.
112: def socket
113: @socket_semaphore.synchronize do
114: used_socket = @socket
# NOTE(review): closed? is not defined in this listing - presumably it
# reflects the @closed flag; confirm.
115: used_socket = nil if closed?
116:
# Loop until there is a socket and no recorded failure.
117: while used_socket.nil? || !@failure.nil?
118: @failure = nil
119: begin
120: used_socket = open_socket
121: # Open complete
122:
123: connect(used_socket)
124: if @logger && @logger.respond_to?(:on_connected)
125: @logger.on_connected(log_params)
126: end
# A successful connect resets the attempt counter.
127: @connection_attempts = 0
128: rescue
129: @failure = $!
130: used_socket = nil
131: raise unless @reliable
132: if @logger && @logger.respond_to?(:on_connectfail)
133: @logger.on_connectfail(log_params)
134: else
135: $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
136: end
# The limit is checked before sleeping and before the counter is bumped.
137: raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts?
138:
139: sleep(@reconnect_delay)
140:
141: @connection_attempts += 1
142:
# Host rotation and back-off only apply to Hash-configured connections.
143: if @parameters
144: change_host
145: increase_reconnect_delay
146: end
147: end
148: end
# The assignment is the block's value, so it is also what socket returns.
149: @socket = used_socket
150: end
151: end
Subscribe to a destination, must specify a name
# File lib/stomp/connection.rb, line 240
# Subscribe to the destination +name+.
#
# Stores +name+ under :destination in +headers+ (the Hash passed in is
# modified) and sends a SUBSCRIBE frame. On a reliable connection the
# headers are also remembered in @subscriptions, keyed by +subId+ (or by
# +name+ when +subId+ is nil), so they can be replayed after a reconnect.
def subscribe(name, headers = {}, subId = nil)
  headers.store(:destination, name)
  transmit("SUBSCRIBE", headers)

  return unless @reliable

  # Store the sub so that we can replay it if we reconnect.
  subscription_key = subId.nil? ? name : subId
  @subscriptions[subscription_key] = headers
end
Send a message back to the source or to the dead letter queue
Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" ), a limit on the number of redeliveries ( :max_redeliveries => 6 ), and a force client acknowledgement option ( :force_client_ack => true ).
# File lib/stomp/connection.rb, line 284
# Send a message back to its source destination, or to the dead letter
# queue once it has been redelivered too often.
#
# message - object exposing +headers+ (read/write Hash) and +body+
# options - Hash; recognised keys:
#           :dead_letter_queue (default "/queue/DLQ"),
#           :max_redeliveries  (default 6),
#           :force_client_ack  (ack the message even when the subscription
#                               is not recorded as client-ack)
#
# The ack (when applicable) and the re-publish are done inside one
# transaction, which is committed on success. On any exception the
# transaction is aborted and the exception re-raised.
def unreceive(message, options = {})
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options)
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys

  # nil.to_i is 0, so a missing :retry_count starts the count at zero.
  retry_count = message.headers[:retry_count].to_i
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  # Headers were symbolized above, so the id must be removed by Symbol key.
  message_id = message.headers.delete(:'message-id')

  begin
    self.begin transaction_id

    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end

    if retry_count <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    # Deliberately broad: the transaction is aborted whatever went wrong,
    # and the original exception is then re-raised.
    self.abort transaction_id
    raise exception
  end
end
Unsubscribe from a destination, must specify a name
# File lib/stomp/connection.rb, line 252
# Unsubscribe from the destination +name+.
#
# Stores +name+ under :destination in +headers+ (the Hash passed in is
# modified) and sends an UNSUBSCRIBE frame. On a reliable connection the
# remembered subscription, keyed by +subId+ (or by +name+ when +subId+ is
# nil), is removed as well.
def unsubscribe(name, headers = {}, subId = nil)
  headers.store(:destination, name)
  transmit("UNSUBSCRIBE", headers)

  return unless @reliable

  subscription_key = subId.nil? ? name : subId
  @subscriptions.delete(subscription_key)
end
# File lib/stomp/connection.rb, line 376
# Read one frame from +read_socket+ and return it as a Message.
#
# Runs under @read_semaphore. Returns nil when the first gets returns nil.
# Everything after the first line is read inside a Timeout of
# @parse_timeout seconds, which raises Stomp::Error::PacketParsingTimeout
# when it expires.
#
# Raises Stomp::Error::InvalidMessageLength when a content-length header
# is present and the character following the body is not "\0".
376: def _receive( read_socket )
377: @read_semaphore.synchronize do
378: line = read_socket.gets
379:
# This return leaves the method, not just the synchronize block.
380: return nil if line.nil?
381:
382: # If the reading hangs for more than X seconds, abort the parsing process.
383: # X defaults to 5. Override allowed in connection hash parameters.
384: Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do
385: # Reads the beginning of the message until it runs into a empty line
386: message_header = ''
387: begin
388: message_header += line
389: line = read_socket.gets
390: end until line =~ /^\s?\n$/
391:
392: # Checks if it includes content_length header
393: content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/
394: message_body = ''
395:
396: # If it does, reads the specified amount of bytes
397: char = ''
398: if content_length
399: message_body = read_socket.read content_length[1].to_i
400: raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
401: # Else reads, the rest of the message until the first \0
402: else
403: message_body += char while (char = parse_char(read_socket.getc)) != "\0"
404: end
405:
406: # If the buffer isn't empty, reads trailing new lines.
407: # Note: experiments with JRuby seem to show that .ready? never
408: # returns true. This means that this code to drain trailing new
409: # lines never runs using JRuby.
410: while read_socket.ready?
411: last_char = read_socket.getc
412: break unless last_char
# Anything that is not a newline belongs to the next frame: push it back.
413: if parse_char(last_char) != "\n"
414: read_socket.ungetc(last_char)
415: break
416: end
417: end
418: # And so, a JRuby hack. Remove any new lines at the start of the
419: # next buffer.
420: message_header.gsub!(/^\n?/, "")
421:
422: # Adds the excluded \n and \0 and tries to create a new message with it
423: Message.new(message_header + "\n" + message_body + "\0")
424: end
425: end
426: end
# File lib/stomp/connection.rb, line 454
# Write one frame (command line, headers, blank line, body, NUL) to
# +used_socket+ while holding @transmit_semaphore.
#
# used_socket - IO-like object responding to puts and write
# command     - frame command String, e.g. "SEND"
# headers     - Hash of headers; a 'content-length' entry is added to this
#               Hash unless it has a truthy :suppress_content_length entry
# body        - frame body; nil is sent as an empty body
def _transmit(used_socket, command, headers = {}, body = '')
  @transmit_semaphore.synchronize do
    payload = body.nil? ? '' : body

    # content-length is expressed in bytes, which differs from the
    # character count for multi-byte strings; bytesize is used when the
    # String provides it.
    byte_count = payload.respond_to?(:bytesize) ? payload.bytesize : payload.length

    # ActiveMQ interprets every message that carries a content-length
    # header as a BinaryMessage. :suppress_content_length => true leaves
    # the header out so that ActiveMQ treats the message as a TextMessage.
    # See http://juretta.com/log/2009/05/24/activemq-jms-stomp/
    # The header is stored in the Hash itself so it survives unreceive.
    headers['content-length'] = byte_count.to_s unless headers[:suppress_content_length]

    used_socket.puts command
    headers.each_pair { |key, value| used_socket.puts "#{key}:#{value}" }
    used_socket.puts "content-type: text/plain; charset=UTF-8"
    used_socket.puts
    used_socket.write payload
    used_socket.write "\0"
  end
end
# File lib/stomp/connection.rb, line 511
# Close the current socket, if any, and mark the connection as closed.
#
# Errors raised while closing (no socket yet, socket already closed) are
# deliberately ignored; @closed is set to true in every case.
def close_socket
  begin
    @socket.close
  rescue
    # Ignoring if already closed
  end

  @closed = true
end
# File lib/stomp/connection.rb, line 532
# Perform the STOMP handshake on +used_socket+.
#
# Sends CONNECT with a copy of @connect_headers plus :login and :passcode,
# keeps the frame read in reply in @connection_frame, clears
# @disconnect_receipt, and finally re-sends SUBSCRIBE for every entry
# remembered in @subscriptions.
def connect(used_socket)
  handshake_headers = @connect_headers.clone
  handshake_headers[:login] = @login
  handshake_headers[:passcode] = @passcode

  _transmit(used_socket, "CONNECT", handshake_headers)
  @connection_frame = _receive(used_socket)
  @disconnect_receipt = nil

  # Replay any subscriptions.
  @subscriptions.each_value do |subscription_headers|
    _transmit(used_socket, "SUBSCRIBE", subscription_headers)
  end
end
# File lib/stomp/connection.rb, line 543
# Build the Hash handed to logger callbacks: a copy of @parameters plus
# the connection's current state under :cur_* keys.
#
# Connections built from positional arguments have @parameters set to
# nil; in that case the result starts from an empty Hash instead of
# failing. @parameters itself is never modified.
def log_params
  lparms = @parameters ? @parameters.clone : {}
  lparms[:cur_host] = @host
  lparms[:cur_port] = @port
  lparms[:cur_login] = @login
  lparms[:cur_passcode] = @passcode
  lparms[:cur_ssl] = @ssl
  lparms[:cur_recondelay] = @reconnect_delay
  lparms[:cur_parseto] = @parse_timeout
  lparms[:cur_conattempts] = @connection_attempts
  lparms
end
# File lib/stomp/connection.rb, line 521
# Open a new socket (SSL when @ssl is set, plain TCP otherwise), close the
# previous one, clear the closed flag, enable SO_KEEPALIVE on the new
# socket and return it.
def open_socket
  fresh_socket =
    if @ssl
      open_ssl_socket
    else
      open_tcp_socket
    end

  # Try to close the old connection, if any. This sets @closed to true,
  # so the flag is reset straight afterwards.
  close_socket
  @closed = false

  # Use keepalive
  fresh_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  fresh_socket
end
# File lib/stomp/connection.rb, line 486
# Open a TCP socket via open_tcp_socket, wrap it in an
# OpenSSL::SSL::SSLSocket, perform the SSL handshake and return the SSL
# socket. openssl is required lazily, the first time this is called.
#
# NOTE(review): verify_mode is VERIFY_NONE, so the server certificate is
# not verified. The commented-out lines sketch client and server
# certificate authentication.
486: def open_ssl_socket
487: require 'openssl' unless defined?(OpenSSL)
488: ctx = OpenSSL::SSL::SSLContext.new
489:
490: # For client certificate authentication:
491: # key_path = ENV["STOMP_KEY_PATH"] || "~/stomp_keys"
492: # ctx.cert = OpenSSL::X509::Certificate.new("#{key_path}/client.cer")
493: # ctx.key = OpenSSL::PKey::RSA.new("#{key_path}/client.keystore")
494:
495: # For server certificate authentication:
496: # truststores = OpenSSL::X509::Store.new
497: # truststores.add_file("#{key_path}/client.ts")
498: # ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
499: # ctx.cert_store = truststores
500:
501: ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE
502:
503: ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx)
# Give the SSL socket the ready? predicate that poll and _receive call.
# NOTE(review): reads the @rbuffer and @io instance variables of the
# SSLSocket - presumably OpenSSL internals; confirm against the openssl
# version in use.
504: def ssl.ready?
505: ! @rbuffer.empty? || @io.ready?
506: end
507: ssl.connect
508: ssl
509: end
# File lib/stomp/connection.rb, line 480
# Open and return a plain TCP socket to @host on @port.
def open_tcp_socket
  TCPSocket.open(@host, @port)
end
# File lib/stomp/connection.rb, line 428
# Normalize the result of IO#getc to a one-character String.
#
# Ruby 1.8 returns an Integer byte value from getc; later versions return
# a String. The decision is made on the value itself rather than by
# comparing RUBY_VERSION strings, so nil (getc at end of input) is passed
# through on every Ruby version instead of raising on nil.chr.
def parse_char(char)
  char.is_a?(Integer) ? char.chr : char
end
# File lib/stomp/connection.rb, line 432
# Send one frame, retrying on failure when the connection is reliable.
#
# command - frame command String, e.g. "SEND"
# headers - Hash of frame headers
# body    - frame body
#
# Obtains the current socket via +socket+ and writes the frame with
# +_transmit+. Returns nil once the frame has been written.
#
# Raises Stomp::Error::MaxReconnectAttempts unchanged. Any other error is
# stored in @failure; it is re-raised unless @reliable is set, in which
# case it is reported (logger's on_miscerr hook when available, otherwise
# $stderr) and the transmit is attempted again.
def transmit(command, headers = {}, body = '')
  # The transmit may fail, so we may need to retry.
  # `while true` rather than the TRUE constant, which Ruby 3.2 removed.
  while true
    begin
      _transmit(socket, command, headers, body)
      return
    rescue Stomp::Error::MaxReconnectAttempts
      # Retrying cannot help once the reconnect budget is exhausted.
      raise
    rescue
      @failure = $!
      raise unless @reliable
      errstr = "transmit to #{@host} failed: #{$!}\n"
      if @logger && @logger.respond_to?(:on_miscerr)
        @logger.on_miscerr(log_params, errstr)
      else
        $stderr.print errstr
      end
    end
  end
end