| Class | Jabber::Stream |
| In: | lib/xmpp4r/stream.rb |
| Parent: | Object |
The Stream class manages a connection stream (a file descriptor through which XML messages are read and sent).
You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.
To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.
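The advice about blocking work can be sketched without the library at all. The following is a minimal illustration, not XMPP4R code: `process_in_background` and `on_message` are hypothetical names, and the lambda only stands in for a block that would be passed to add_message_callback. The callback enqueues the work and returns immediately, while a separate worker thread performs the slow part.

```ruby
require 'thread'

# Hypothetical helper, not part of XMPP4R: the callback only enqueues work,
# and a separate worker thread performs the blocking part.
def process_in_background(bodies)
  jobs = Queue.new
  results = Queue.new

  worker = Thread.new do
    while (body = jobs.pop) != :stop
      sleep 0.01                 # placeholder for a blocking operation
      results << body.upcase
    end
  end

  # Stand-in for the block given to add_message_callback. It returns at
  # once, so the thread that invokes it is never held up.
  on_message = lambda do |body|
    jobs << body
    true                         # report the item as handled
  end

  bodies.each { |body| on_message.call(body) }
  jobs << :stop
  worker.join

  # A single worker preserves the order in which items were received.
  Array.new(results.size) { results.pop }
end

p process_in_background(%w[first second third])
```

A single worker thread keeps the received order intact; spawning one thread per stanza would give up that guarantee.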
| DISCONNECTED | = | 1 |
| CONNECTED | = | 2 |
| fd | [R] | file descriptor used |
| status | [R] | connection status |
Creates a new stream. This only initializes internal state; the stream stays DISCONNECTED until start is called.
# File lib/xmpp4r/stream.rb, line 42
42: def initialize(threaded = true)
43: unless threaded
44: raise "Non-threaded mode was removed from XMPP4R."
45: end
46: @fd = nil
47: @status = DISCONNECTED
48: @xmlcbs = CallbackList::new
49: @stanzacbs = CallbackList::new
50: @messagecbs = CallbackList::new
51: @iqcbs = CallbackList::new
52: @presencecbs = CallbackList::new
53: @send_lock = Mutex.new
54: @last_send = Time.now
55: @exception_block = nil
56: @threadblocks = []
57: @wakeup_thread = nil
58: @streamid = nil
59: @streamns = 'jabber:client'
60: @features_sem = Semaphore.new
61: @parser_thread = nil
62: end
Adds a callback block to process received Iqs
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 478
478: def add_iq_callback(priority = 0, ref = nil, &block)
479: @iqcbs.add(priority, ref, block)
480: end
Adds a callback block to process received Messages
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 424
424: def add_message_callback(priority = 0, ref = nil, &block)
425: @messagecbs.add(priority, ref, block)
426: end
Adds a callback block to process received Presences
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 460
460: def add_presence_callback(priority = 0, ref = nil, &block)
461: @presencecbs.add(priority, ref, block)
462: end
Adds a callback block to process received Stanzas
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 442
442: def add_stanza_callback(priority = 0, ref = nil, &block)
443: @stanzacbs.add(priority, ref, block)
444: end
Adds a callback block to process received XML messages
| priority: | [Integer] The callback's priority, the higher, the sooner |
| ref: | [String] The callback's reference |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 406
406: def add_xml_callback(priority = 0, ref = nil, &block)
407: @xmlcbs.add(priority, ref, block)
408: end
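The add_*_callback methods all take a priority and a reference and hand the block to a callback list. The sketch below models how such a list might behave. `TinyCallbackList` is a hypothetical stand-in written for this illustration, not the library's CallbackList. It assumes that higher priorities run first and that processing stops at the first block returning true, which is consistent with how receive uses the result of process but is not confirmed by this page.

```ruby
# Hypothetical stand-in for a prioritised callback list. It is NOT the
# library's CallbackList, only a model of the behaviour described above.
class TinyCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @entries = []
  end

  # Same argument order as the add_*_callback methods.
  def add(priority = 0, ref = nil, &block)
    @entries << Entry.new(priority, ref, block)
    # Higher priority first; insertion order decides among equal priorities.
    @entries = @entries.each_with_index
                       .sort_by { |entry, index| [-entry.priority, index] }
                       .map(&:first)
    self
  end

  # Removes every callback registered under the given reference.
  def delete(ref)
    @entries.reject! { |entry| entry.ref == ref }
    self
  end

  # Calls the blocks in order and stops at the first one returning true.
  def process(stanza)
    @entries.any? { |entry| entry.block.call(stanza) == true }
  end
end

def demo_callback_order
  seen = []
  list = TinyCallbackList.new
  list.add(0, 'low')    { |s| seen << "low:#{s}";    true }
  list.add(10, 'high')  { |s| seen << "high:#{s}";   false }
  list.add(5, 'middle') { |s| seen << "middle:#{s}"; false }
  handled = list.process('ping')
  [seen, handled]
end

p demo_callback_order
```

In this model a callback that returns true consumes the stanza, so lower-priority callbacks never see it.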
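Closes the stream: kills the parser thread, closes the file descriptor if it is still open, and sets the status to DISCONNECTED.
# File lib/xmpp4r/stream.rb, line 496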
496: def close!
497: @parser_thread.kill if @parser_thread
498: @fd.close if @fd and !@fd.closed?
499: @status = DISCONNECTED
500: end
Delete a Stanza callback
| ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 450
450: def delete_stanza_callback(ref)
451: @stanzacbs.delete(ref)
452: end
Delete an XML-messages callback
| ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 414
414: def delete_xml_callback(ref)
415: @xmlcbs.delete(ref)
416: end
Returns whether this stream is connected to a Jabber service
| return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 164
164: def is_connected?
165: return @status == CONNECTED
166: end
Returns whether this stream is NOT connected to a Jabber service
| return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 172
172: def is_disconnected?
173: return @status == DISCONNECTED
174: end
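Registers a block to handle exceptions raised while parsing or sending. This will likely be the first indication that the socket dropped in a Jabber session.
The block has to take three arguments: the exception (nil when the stream simply ended), the Jabber::Stream object (self), and a symbol naming where it happened (:start, :parser, :sending, :close or :disconnected in the code on this page).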
# File lib/xmpp4r/stream.rb, line 120
120: def on_exception(&block)
121: @exception_block = block
122: end
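Judging by the calls to @exception_block on this page, the block receives an exception (or nil), the stream, and a symbol such as :start, :parser, :sending, :close or :disconnected. A handler might branch on that symbol as sketched below. `describe_stream_event` is a hypothetical helper written for this illustration; in real code its body would go into the block passed to on_exception.

```ruby
# Hypothetical handler body; the stream argument is unused in this sketch.
def describe_stream_event(exception, _stream, where)
  case where
  when :close, :disconnected
    # The listings on this page pass nil as the exception in these cases.
    "stream ended (#{where})"
  else
    detail = exception ? "#{exception.class}: #{exception.message}" : 'no exception'
    "failure during #{where}: #{detail}"
  end
end

puts describe_stream_event(IOError.new('closed stream'), nil, :sending)
puts describe_stream_event(nil, nil, :close)
```

Checking for a nil exception matters because an orderly end of stream is reported through the same block as a failure.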
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 126
126: def parse_failure(e)
127: Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
128:
129: # A new thread has to be created because close will cause the thread
130: # to commit suicide(???)
131: if @exception_block
132: # New thread, because close will kill the current thread
133: Thread.new do
134: Thread.current.abort_on_exception = true
135: close
136: @exception_block.call(e, self, :parser)
137: end
138: else
139: puts "Stream#parse_failure was called by XML parser. Dumping " +
140: "backtrace...\n" + e.exception + "\n"
141: puts e.backtrace
142: close
143: raise
144: end
145: end
This method is called by the parser upon receiving </stream:stream>
# File lib/xmpp4r/stream.rb, line 149
149: def parser_end
150: if @exception_block
151: Thread.new do
152: Thread.current.abort_on_exception = true
153: close
154: @exception_block.call(nil, self, :close)
155: end
156: else
157: close
158: end
159: end
Processes a received REXML::Element and executes registered thread blocks and filters against it.
| element: | [REXML::Element] The received element |
# File lib/xmpp4r/stream.rb, line 181
181: def receive(element)
182: Jabber::debuglog("RECEIVED:\n#{element.to_s}")
183:
184: if element.namespace('').to_s == '' # REXML namespaces are always strings
185: element.add_namespace(@streamns)
186: end
187:
188: case element.prefix
189: when 'stream'
190: case element.name
191: when 'stream'
192: stanza = element
193: @streamid = element.attributes['id']
194: @streamns = element.namespace('') if element.namespace('')
195:
196: # Hack: component streams are basically client streams.
197: # Someday we may want to create special stanza classes
198: # for components/s2s deriving from normal stanzas but
199: # possessing these namespaces
200: @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'
201:
202: unless element.attributes['version'] # isn't XMPP compliant, so
203: Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
204: @features_sem.run # don't wait for <stream:features/>
205: end
206: when 'features'
207: stanza = element
208: element.each { |e|
209: if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
210: e.each_element('mechanism') { |mech|
211: @stream_mechanisms.push(mech.text)
212: }
213: else
214: @stream_features[e.name] = e.namespace
215: end
216: }
217: Jabber::debuglog("FEATURES: received")
218: @features_sem.run
219: else
220: stanza = element
221: end
222: else
223: # Any stanza, classes are registered by XMPPElement::name_xmlns
224: begin
225: stanza = XMPPStanza::import(element)
226: rescue NoNameXmlnsRegistered
227: stanza = element
228: end
229: end
230:
231: # Iterate through blocked threads (= waiting for an answer)
232: #
233: # We're dup'ping the @threadblocks here, so that we won't end up in an
234: # endless loop if Stream#send is being nested. That means, the nested
235: # threadblock won't receive the stanza currently processed, but the next
236: # one.
237: threadblocks = @threadblocks.dup
238: threadblocks.each { |threadblock|
239: exception = nil
240: r = false
241: begin
242: r = threadblock.call(stanza)
243: rescue Exception => e
244: exception = e
245: end
246:
247: if r == true
248: @threadblocks.delete(threadblock)
249: threadblock.wakeup
250: return
251: elsif exception
252: @threadblocks.delete(threadblock)
253: threadblock.raise(exception)
254: end
255: }
256:
257: Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
258: return true if @xmlcbs.process(stanza)
259: return true if @stanzacbs.process(stanza)
260: case stanza
261: when Message
262: return true if @messagecbs.process(stanza)
263: when Iq
264: return true if @iqcbs.process(stanza)
265: when Presence
266: return true if @presencecbs.process(stanza)
267: end
268: end
Sends XML data to the socket and (optionally) waits to process received data.
Do not invoke this in a callback but in a separate thread, because the parser thread (in whose context callbacks are executed) must not be suspended.
| xml: | [String] The xml data to send |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 315
315: def send(xml, &block)
316: Jabber::debuglog("SENDING:\n#{xml}")
317: @threadblocks.unshift(threadblock = ThreadBlock.new(block)) if block
318: begin
319: # Temporarily remove stanza's namespace to
320: # reduce bandwidth consumption
321: if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client'
322: xml.delete_namespace
323: send_data(xml.to_s)
324: xml.add_namespace(@streamns)
325: else
326: send_data(xml.to_s)
327: end
328: rescue Exception => e
329: Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
330:
331: if @exception_block
332: Thread.new do
333: Thread.current.abort_on_exception = true
334: close!
335: @exception_block.call(e, self, :sending)
336: end
337: else
338: if Jabber::debug
339: puts "Exception caught while sending! (#{e.class})"
340: puts e.backtrace
341: end
342: close!
343: raise
344: end
345: end
346: # The parser thread might be running this (think of a callback running send())
347: # If this is the case, we mustn't stop (or we would cause a deadlock)
348: if block and Thread.current != @parser_thread
349: threadblock.wait
350: elsif block
351: Jabber::debuglog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
352: end
353: end
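The waiting behaviour of send with a block can be modelled in plain Ruby. This is only a sketch of the idea: `ReplyWaiter` and `demo_wait_for_reply` are hypothetical names, and a Queue is used for the hand-off here, whereas the listing above relies on the library's ThreadBlock. The sending thread blocks until the receiving side offers an item for which the block returns true.

```ruby
require 'thread'

# Hypothetical model of "send, then wait until a block accepts a reply".
class ReplyWaiter
  def initialize(&block)
    @block = block
    @done = Queue.new
  end

  # Called by the receiving side for every incoming item.
  # Returns true once the block has accepted an item.
  def offer(item)
    return false unless @block.call(item) == true
    @done << true
    true
  end

  # Called by the sending thread; blocks until offer has succeeded.
  def wait
    @done.pop
  end
end

def demo_wait_for_reply
  accepted = nil
  waiter = ReplyWaiter.new do |item|
    if item.start_with?('reply')
      accepted = item
      true
    else
      false
    end
  end

  # Stand-in for the parser thread delivering received stanzas.
  receiver = Thread.new do
    %w[presence reply:42 message].each do |item|
      break if waiter.offer(item)
    end
  end

  waiter.wait      # the "sending" thread sleeps here until a reply matches
  receiver.join
  accepted
end

p demo_wait_for_reply
```

If the waiting thread were the same thread that delivers the items, `wait` would never return, which is the deadlock the warning in send guards against.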
Writes raw data to the file descriptor while holding the send lock, then flushes it.
# File lib/xmpp4r/stream.rb, line 297
297: def send_data(data)
298: @send_lock.synchronize do
299: @last_send = Time.now
300: @fd << data
301: @fd.flush
302: end
303: end
Sends an XMPP stanza with a Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.
Be aware that if a stanza with type='error' is received, the function does not yield but raises an ErrorException with the corresponding error element.
See Stream#send for implementation details, and read its note about nesting.
| xml: | [XMPPStanza] |
# File lib/xmpp4r/stream.rb, line 372
372: def send_with_id(xml, &block)
373: if xml.id.nil?
374: xml.id = Jabber::IdGenerator.instance.generate_id
375: end
376:
377: res = nil
378: error = nil
379: send(xml) do |received|
380: if received.kind_of? XMPPStanza and received.id == xml.id
381: if received.type == :error
382: error = (received.error ? received.error : Error.new)
383: true
384: else
385: res = yield(received)
386: true
387: end
388: else
389: false
390: end
391: end
392:
393: unless error.nil?
394: raise ErrorException.new(error)
395: end
396:
397: res
398: end
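The decision made for each received stanza in send_with_id can be reduced to a small sketch. `FakeStanza`, `FakeStanzaError` and `handle_reply` are hypothetical stand-ins, not XMPP4R classes or methods. As a simplification, the sketch raises as soon as it sees the error reply, whereas the listing above records the error and raises after the wait has finished.

```ruby
# Hypothetical stand-ins for illustration only.
FakeStanza = Struct.new(:id, :type, :payload)
class FakeStanzaError < StandardError; end

# Returns [matched, value]: stanzas with another id are skipped, an error
# reply raises, and otherwise the block's return value is passed back.
def handle_reply(request_id, received)
  return [false, nil] unless received.id == request_id
  if received.type == :error
    raise FakeStanzaError, "error reply to #{request_id}"
  end
  [true, yield(received)]
end

incoming = [
  FakeStanza.new('other', :result, 'unrelated'),
  FakeStanza.new('req1', :result, 'pong')
]

incoming.each do |stanza|
  matched, value = handle_reply('req1', stanza) { |s| s.payload.upcase }
  puts(matched ? "answer: #{value}" : "skipped #{stanza.id}")
end
```

Because the block's return value is passed back to the caller, the block does not need to return true itself.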
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 66
66: def start(fd)
67: @stream_mechanisms = []
68: @stream_features = {}
69:
70: @fd = fd
71: @parser = StreamParser.new(@fd, self)
72: @parser_thread = Thread.new do
73: Thread.current.abort_on_exception = true
74: begin
75: @parser.parse
76: Jabber::debuglog("DISCONNECTED\n")
77:
78: if @exception_block
79: Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
80: else
81: close!
82: end
83: rescue Exception => e
84: Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
85:
86: if @exception_block
87: Thread.new do
88: Thread.current.abort_on_exception = true
89: close
90: @exception_block.call(e, self, :start)
91: end
92: else
93: if Jabber::debug
94: puts "Exception caught in Parser thread! (#{e.class})"
95: puts e.backtrace
96: end
97: close!
98: raise
99: end
100: end
101: end
102:
103: @status = CONNECTED
104: end