| Class | Stomp::Client |
| In: |
lib/stomp/client.rb
|
| Parent: | Object |
| host | [R] | |
| login | [R] | |
| parameters | [R] | |
| passcode | [R] | |
| port | [R] | |
| reliable | [R] |
A new Client object can be initialized using two forms:
Standard positional parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)
e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port, stomp://host.domain.tld:port, stomp://login:passcode@host:port, or stomp://login:passcode@host.domain.tld:port
# File lib/stomp/client.rb, line 36
# Build a client from one of four argument forms and connect.
#
# The first argument selects the form:
# * a Hash is used as the connection parameters; its first :hosts entry
#   supplies login, passcode, host and port, and the client is reliable;
# * a 'stomp://' URL, e.g. stomp://login:passcode@host:port or
#   stomp://host:port;
# * a 'failover:' URL listing one or more hosts, optionally followed by
#   '?name=value&name=value' options, e.g.
#   failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
#   The first host listed supplies login, passcode, host and port, and the
#   client is reliable;
# * anything else is treated as the positional login, passcode, host, port
#   and reliable values.
#
# The resolved values are validated with check_arguments! before the
# connection is created and the listener thread is started.
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

  # Parse stomp:// URL's or set params
  if login.is_a?(Hash)
    @parameters = login

    first_host = @parameters[:hosts][0]

    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port] || Connection::default_port(first_host[:ssl])

    @reliable = true

  elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port
    @login = $2 || ""
    @passcode = $3 || ""
    @host = $4
    @port = $5.to_i
    @reliable = false
  elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex})*\)(\?(.*))?$/ # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param

    # Read the query string first: the regex calls below overwrite $~.
    query = $16 || ""

    # Every host between the parentheses is parsed here, so URLs with any
    # number of hosts keep all of them. The capture layout is the same
    # five-group layout of url_regex that the branches above rely on.
    host_list = login[/\(([^)]*)\)/, 1]
    hosts = host_list.scan(/stomp(\+ssl)?:\/\/#{url_regex}/).map do |ssl, _credentials, user, secret, hostname, port_number|
      {
        :ssl      => !ssl.nil?,
        :login    => user || "",
        :passcode => secret || "",
        :host     => hostname,
        :port     => port_number.to_i
      }
    end

    first_host = hosts.first
    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port]

    # Split on '&' and then on the first '=' only, so an option without a
    # value (or with '=' inside its value) no longer raises.
    options = {}
    query.split('&').each do |pair|
      key, value = pair.split('=', 2)
      options[key] = value unless key.nil? || key.empty?
    end

    @parameters = {}
    @parameters[:hosts] = hosts

    @parameters.merge! filter_options(options)

    @reliable = true
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  check_arguments!

  @id_mutex = Mutex.new
  @ids = 1

  if @parameters
    @connection = Connection.new(@parameters)
  else
    @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
  end

  start_listeners

end
Syntactic sugar for 'Client.new'. See 'initialize' for usage.
# File lib/stomp/client.rb, line 102
# Convenience constructor: forwards all five arguments, unchanged and in
# order, to Client.new and returns the new client.
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  client = Client.new(login, passcode, host, port, reliable)
  client
end
Abort a transaction by name
# File lib/stomp/client.rb, line 118
# Abort the transaction +name+ on the connection, then hand every message
# that was acknowledged under that transaction back to its listener.
#
# The replay list is removed from the bookkeeping hash before it is
# replayed, so it is released once the transaction is over, is not
# replayed again if the same name is aborted later, and cannot grow while
# it is being iterated if a listener acknowledges under the same name.
def abort(name, headers = {})
  @connection.abort(name, headers)

  # lets replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn.delete(name)
  if replay_list
    replay_list.each do |message|
      if listener = find_listener(message)
        listener.call(message)
      end
    end
  end
end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 167
# Acknowledge +message+ on the connection, using its 'message-id' header.
#
# When headers[:transaction] is set, the message is also remembered under
# that transaction id so it can be replayed if the transaction is aborted.
# When a block is given, it is registered as a receipt listener and the
# resulting id is sent in the 'receipt' header.
def acknowledge(message, headers = {})
  transaction = headers[:transaction]
  if transaction
    # keep the message around in case this transaction is rolled back
    @replay_messages_by_txn[transaction] = [] if @replay_messages_by_txn[transaction].nil?
    @replay_messages_by_txn[transaction] << message
  end

  headers['receipt'] = register_receipt_listener(lambda { |receipt| yield receipt }) if block_given?

  @connection.ack(message.headers['message-id'], headers)
end
Begin a transaction by name
# File lib/stomp/client.rb, line 113
# Start the transaction called +name+ by delegating to the connection,
# passing +headers+ through unchanged. Returns whatever the connection
# returns.
def begin(name, headers = {})
  @connection.begin name, headers
end
Close out resources in use by this client
# File lib/stomp/client.rb, line 231
# Release the client's resources: the listener thread is stopped first,
# then the connection is disconnected with +headers+. Returns the result
# of the disconnect call.
def close(headers = {})
  @listener_thread.exit
  @connection.disconnect(headers)
end
Commit a transaction by name
# File lib/stomp/client.rb, line 133
# Commit the transaction +name+ on the connection.
#
# Messages acknowledged inside a transaction are kept for replay under the
# transaction id, and abort looks them up by +name+. Once the transaction
# is committed they are no longer needed, so the list stored under +name+
# is dropped. A list stored under headers[:transaction] is dropped as
# well, which is the key earlier versions of this method used.
def commit(name, headers = {})
  @replay_messages_by_txn.delete(name)

  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id) unless txn_id.nil?

  @connection.commit(name, headers)
end
# File lib/stomp/client.rb, line 212
# Return the connection's connection_frame value.
def connection_frame
  frame = @connection.connection_frame
  frame
end
# File lib/stomp/client.rb, line 216
# Return the connection's disconnect_receipt value.
def disconnect_receipt
  receipt = @connection.disconnect_receipt
  receipt
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp/client.rb, line 108
# Wait on the listener thread. +limit+ is passed straight to the thread's
# join, so nil (the default) means no limit is supplied. Returns whatever
# that join call returns.
def join(limit = nil)
  @listener_thread.join limit
end
Publishes message to destination
If a block is given a receipt will be requested and passed to the block on receipt
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 196
# Publish +message+ to +destination+ through the connection.
#
# When a block is given it is registered as a receipt listener and the id
# returned by the registration is sent in the 'receipt' header, so the
# block is called with the receipt when it arrives.
def publish(destination, message, headers = {})
  headers['receipt'] = register_receipt_listener(lambda { |receipt| yield receipt }) if block_given?
  @connection.publish(destination, message, headers)
end
Check if the thread was created and isn't dead
# File lib/stomp/client.rb, line 237
# Report whether the listener thread exists and has a truthy status.
#
# Returns @listener_thread itself (nil or false) when no thread has been
# set, otherwise true or false according to the thread's status.
def running
  thread = @listener_thread
  return thread unless thread
  thread.status ? true : false
end
# File lib/stomp/client.rb, line 207
# Deprecated alias for publish: prints a deprecation warning and forwards
# all arguments to publish.
#
# A block given to this method is forwarded as well, so a receipt callback
# passed here is registered just as it would be with publish instead of
# being dropped.
def send(*args, &block)
  warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
  publish(*args, &block)
end
Subscribe to a destination, must be passed a block which will be used as a callback listener
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 143
# Subscribe to +destination+; the block is stored as the listener for the
# subscription and is called with each message routed to it.
#
# Messages are correlated to listeners by subscription id, so an id is
# generated and written to headers[:id] when the caller did not supply
# one. Raises when no block is given, or when a listener is already
# registered under the same subscription id.
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?

  set_subscription_id_if_missing(destination, headers)
  subscription_id = headers[:id]
  raise "attempting to subscribe to a queue with a previous subscription" if @listeners[subscription_id]

  @listeners[subscription_id] = lambda { |message| yield message }
  @connection.subscribe(destination, headers)
end
Unreceive a message, sending it back to its queue or to the DLQ
# File lib/stomp/client.rb, line 186
# Hand +message+ and +options+ to the connection's unreceive and return
# its result.
def unreceive(message, options = {})
  @connection.unreceive message, options
end
Unsubscribe from a channel
# File lib/stomp/client.rb, line 157
# Unsubscribe from +name+ on the connection and forget its listener.
#
# The subscription id is taken from the headers, or generated the same way
# subscribe generates it when missing. The listener entry is deleted
# rather than set to nil, so the listener table does not keep one dead key
# for every subscription that has ever existed. Returns nil.
def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)
  @listeners.delete(headers[:id])
  nil
end
# File lib/stomp/client.rb, line 286
# Validate the resolved @host, @port and @reliable values.
#
# Raises ArgumentError, with a message naming the offending setting, when
# the host is nil or empty, when the port is not a number between 1 and
# 65535, or when reliable is neither true nor false. Returns nil otherwise.
def check_arguments!
  if @host.nil? || @host.empty?
    raise ArgumentError, "host must not be nil or empty"
  end

  # The type is checked before the range so a non-numeric port is reported
  # as such instead of failing inside the comparison.
  unless @port.is_a?(Numeric) && @port >= 1 && @port <= 65535
    raise ArgumentError, "port must be a number between 1 and 65535 (got #{@port.inspect})"
  end

  unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass)
    raise ArgumentError, "reliable must be true or false (got #{@reliable.inspect})"
  end
end
# File lib/stomp/client.rb, line 292
# Translate the string-keyed options parsed from a failover URL into the
# symbol-keyed hash that is merged into the connection parameters.
#
# The two delay options are divided by 1000 (the inline comments on the
# original mark the inputs as milliseconds). Options that are absent fall
# back to the defaults shown below. :backup and :timeout are fixed values.
def filter_options(options)
  initial_delay = options["initialReconnectDelay"] || 10
  max_delay     = options["maxReconnectDelay"] || 30000
  multiplier    = options["backOffMultiplier"] || 2
  max_attempts  = options["maxReconnectAttempts"] || 0

  {
    :initial_reconnect_delay  => initial_delay.to_f / 1000,
    :max_reconnect_delay      => max_delay.to_f / 1000,
    # on unless explicitly switched off with the string "false"
    :use_exponential_back_off => options["useExponentialBackOff"] != "false",
    :back_off_multiplier      => multiplier.to_i,
    :max_reconnect_attempts   => max_attempts.to_i,
    # off unless explicitly switched on with the string "true"
    :randomize                => options["randomize"] == "true",
    :backup                   => false, # Not implemented yet
    :timeout                  => -1     # Not implemented yet
  }
end
# File lib/stomp/client.rb, line 306
# Look up the listener registered for +message+.
#
# The 'subscription' header is used as the listener key when present.
# For backward compatibility, a message without that header gets an id
# synthesized from its 'destination' header. The synthesized id is written
# to headers[:id] (a Symbol key), so it is read back from that same key.
# Returns nil when no listener is registered under the id.
def find_listener(message)
  subscription_id = message.headers['subscription']
  if subscription_id.nil?
    set_subscription_id_if_missing(message.headers['destination'], message.headers)
    subscription_id = message.headers[:id]
  end
  @listeners[subscription_id]
end
# File lib/stomp/client.rb, line 268
# Extract host entries from a failover URL.
#
# Only a host immediately followed by ")" matches the pattern, so the
# result holds the host that closes the parenthesised list and not the
# ones before it. Each entry is a hash with :ssl, :login, :passcode,
# :host and :port; missing credentials become empty strings and the port
# is converted to an integer.
def parse_hosts(url)
  pattern = /stomp(\+ssl)?:\/\/(([\w\.]*):(\w*)@)?([\w\.]+):(\d+)\)/

  url.scan(pattern).map do |ssl, _credentials, user, secret, hostname, port_number|
    {
      :ssl      => !ssl.nil?,
      :login    => user || "",
      :passcode => secret || "",
      :host     => hostname,
      :port     => port_number.to_i
    }
  end
end
# File lib/stomp/client.rb, line 253
# Store +listener+ under a fresh receipt id and return that id.
#
# The id is the current value of the @ids counter as a String; reading it
# and advancing the counter happen together under @id_mutex.
def register_receipt_listener(listener)
  receipt_id = @id_mutex.synchronize do
    current = @ids.to_s
    @ids = @ids.succ
    current
  end
  @receipt_listeners[receipt_id] = listener
  receipt_id
end
Set a subscription id in the headers hash if one does not already exist. For simplicity's sake, all subscriptions have a subscription ID. Setting an id in the SUBSCRIPTION header is described in the stomp protocol docs: stomp.codehaus.org/Protocol
# File lib/stomp/client.rb, line 246
# Make sure headers[:id] holds a subscription id.
#
# An existing headers[:id] wins; otherwise the value under the String key
# 'id' is copied to it. When neither is present, the id becomes the SHA1
# hex digest of +destination+. The headers hash is modified in place.
def set_subscription_id_if_missing(destination, headers)
  headers[:id] = headers[:id] || headers['id']
  return unless headers[:id].nil?
  headers[:id] = Digest::SHA1.hexdigest(destination)
end
# File lib/stomp/client.rb, line 317
# Reset the listener tables and start the thread that dispatches incoming
# frames.
#
# The thread loops forever over @connection.receive. A 'MESSAGE' frame is
# passed to the listener found by find_listener; a 'RECEIPT' frame is
# passed to the receipt listener registered under its 'receipt-id'
# header. Frames with any other command, or with no matching listener,
# are ignored. The thread is stored in @listener_thread.
def start_listeners
  @listeners = {}
  @receipt_listeners = {}
  @replay_messages_by_txn = {}

  @listener_thread = Thread.start do
    while true
      frame = @connection.receive
      command = frame.command

      handler =
        if command == 'MESSAGE'
          find_listener(frame)
        elsif command == 'RECEIPT'
          @receipt_listeners[frame.headers['receipt-id']]
        end

      handler.call(frame) if handler
    end
  end
end