| Class | Stomp::Client |
| In: |
lib/stomp.rb
|
| Parent: | Object |
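Accepts a username (default ""), password (default ""), host (default localhost), port (default 61613), and a reliable flag (default false). The first argument may instead be a URL of the form stomp://host:port or stomp://user:pass@host:port, in which case the other arguments are ignored.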
# File lib/stomp.rb, line 263
# Create a client, open its connection and start the listener thread.
#
# +user+ may instead be a connection URL of the form
# <tt>stomp://host:port</tt> or <tt>stomp://user:pass@host:port</tt>. In that
# case login, passcode, host and port are taken from the URL (the port as an
# Integer, like the default) and +reliable+ is forced to false.
#
# The listener thread loops on <tt>@connection.receive</tt>. MESSAGE frames go
# to the listener registered for their 'destination' header, RECEIPT frames to
# the listener registered for their 'receipt-id' header. The loop ends when
# +receive+ returns nil or once @running is false.
def initialize(user = "", pass = "", host = "localhost", port = 61613, reliable = false)
  # The credential form is tried first: tested the other way round, a URL such
  # as stomp://bob:42secret@mq:61613 would be misread as host "bob", port 42.
  if user =~ %r{stomp://(\w+):(\w+)@([\w.\-]+):(\d+)}
    user, pass, host, port = $1, $2, $3, $4.to_i
    reliable = false
  # The lookahead rejects anything that still has an "@" before the next "/",
  # i.e. a credential URL that the pattern above could not parse.
  elsif user =~ %r{stomp://([\w.\-]+):(\d+)(?![^/]*@)}
    user, pass, host, port = "", "", $1, $2.to_i
    reliable = false
  end

  @id_mutex = Mutex.new
  @ids = 1
  @connection = Connection.open user, pass, host, port, reliable
  @listeners = {}
  @receipt_listeners = {}
  @running = true
  @replay_messages_by_txn = Hash.new
  @listener_thread = Thread.start do
    while @running
      message = @connection.receive
      # A nil frame means there is nothing more to read: stop listening.
      break if message.nil?

      listener =
        case message.command
        when 'MESSAGE' then @listeners[message.headers['destination']]
        when 'RECEIPT' then @receipt_listeners[message.headers['receipt-id']]
        end
      listener.call(message) if listener
    end
  end
end
Shortcut for Client.new: accepts a username (default ""), password (default ""), host (default localhost), port (default 61613), and a reliable flag (default false)
# File lib/stomp.rb, line 312
# Build a Client from the given connection settings; every argument is passed
# straight through to Client.new.
def self.open(user = "", pass = "", host = "localhost", port = 61613, reliable = false)
  Client.new(user, pass, host, port, reliable)
end
Abort a transaction by name
# File lib/stomp.rb, line 322
# Abort the transaction +name+ and redeliver the messages acknowledged in it.
#
# After the connection has been told to abort, every message recorded under
# this transaction is handed again to the listener currently registered for
# its 'destination' header. Messages with no registered listener are skipped.
def abort(name, headers = {})
  @connection.abort name, headers

  # Take the list out of the table while fetching it: the transaction is over,
  # so leaving it behind would leak the messages and replay them a second time
  # if the same transaction name were aborted again.
  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  replay_list.each do |message|
    listener = @listeners[message.headers['destination']]
    listener.call(message) if listener
  end
end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 363
# Acknowledge +message+ on the connection, using its 'message-id' header.
#
# When headers[:transaction] is set, the message is also remembered under that
# transaction id so it can be replayed if the transaction is rolled back.
# When a block is given, a receipt listener wrapping the block is registered
# and its id is sent as the 'receipt' header.
def acknowledge(message, headers = {})
  transaction = headers[:transaction]
  (@replay_messages_by_txn[transaction] ||= []) << message if transaction

  if block_given?
    on_receipt = lambda { |r| yield r }
    headers['receipt'] = register_receipt_listener(on_receipt)
  end

  @connection.ack(message.headers['message-id'], headers)
end
Begin a transaction by name
# File lib/stomp.rb, line 317
# Start the transaction +name+ by forwarding the name and +headers+ to the
# connection.
def begin(name, headers = {})
  @connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp.rb, line 399
# Stop the listener loop and disconnect from the server.
def close
  # Clear the flag before disconnecting: the listener loop then cannot begin
  # another receive on a connection that is being torn down, and the flag is
  # cleared even when disconnect raises.
  @running = false
  @connection.disconnect
end
Commit a transaction by name
# File lib/stomp.rb, line 337
# Commit the transaction +name+ and forget the messages kept for replaying it.
def commit(name, headers = {})
  # abort looks the replay list up by transaction name, so that is the key to
  # release here; headers[:transaction] is usually absent on a commit.
  @replay_messages_by_txn.delete(name)

  # Still honour an explicit :transaction header, as earlier versions did.
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id) if txn_id

  @connection.commit name, headers
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp.rb, line 306
# Block the caller until the listener thread has finished.
def join
  listener = @listener_thread
  listener.join
end
Send message to destination
If a block is given a receipt will be requested and passed to the block on receipt
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 386
# Send +message+ to +destination+ through the connection.
#
# When a block is given, a receipt listener wrapping the block is registered
# and its id is sent as the 'receipt' header.
def send(destination, message, headers = {})
  if block_given?
    on_receipt = lambda { |r| yield r }
    headers['receipt'] = register_receipt_listener(on_receipt)
  end

  @connection.send(destination, message, headers)
end
Subscribe to a destination, must be passed a block which will be used as a callback listener
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 347
# Subscribe to +destination+; the given block becomes its listener.
#
# Raises a RuntimeError ("No listener given") when called without a block.
# The listener is stored before the subscription is sent to the connection.
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?

  callback = lambda { |msg| yield msg }
  @listeners[destination] = callback
  @connection.subscribe(destination, headers)
end
Unsubscribe from a destination
# File lib/stomp.rb, line 354
# Cancel the subscription for +name+ on the connection, then clear the
# listener stored for it.
def unsubscribe(name, headers = {})
  @connection.unsubscribe(name, headers)
  @listeners[name] = nil
end