| Class | Jabber::Stream |
| In: | lib/xmpp4r/stream.rb |
| Parent: | Object |
The Stream class manages a connection stream (a file descriptor through which XML messages are read and sent).
| DISCONNECTED | = | 1 |
| CONNECTED | = | 2 |
| fd | [R] | file descriptor used |
| status | [R] | connection status |
Create a new stream (just initializes)
# File lib/xmpp4r/stream.rb, line 34
def initialize(threaded = true)
  @fd = nil
  @status = DISCONNECTED
  @xmlcbs = CallbackList::new
  @stanzacbs = CallbackList::new
  @messagecbs = CallbackList::new
  @iqcbs = CallbackList::new
  @presencecbs = CallbackList::new
  @threaded = threaded
  @StanzaQueue = []
  @StanzaQueueMutex = Mutex::new
  @exception_block = nil
  @threadBlocks = {}
  # @pollCounter = 10
  @waitingThread = nil
  @wakeupThread = nil
  @streamid = nil
end
Adds a callback block/proc to process received Iqs
| priority: | [Integer] The callback’s priority; callbacks with a higher priority run sooner |
| ref: | [String] The callback’s reference |
| proc: | [Proc = nil] The optional proc |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 410
def add_iq_callback(priority = 0, ref = nil, proc=nil, &block)
  block = proc if proc
  @iqcbs.add(priority, ref, block)
end
Adds a callback block/proc to process received Messages
| priority: | [Integer] The callback’s priority; callbacks with a higher priority run sooner |
| ref: | [String] The callback’s reference |
| proc: | [Proc = nil] The optional proc |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 350
def add_message_callback(priority = 0, ref = nil, proc=nil, &block)
  block = proc if proc
  @messagecbs.add(priority, ref, block)
end
Adds a callback block/proc to process received Presences
| priority: | [Integer] The callback’s priority; callbacks with a higher priority run sooner |
| ref: | [String] The callback’s reference |
| proc: | [Proc = nil] The optional proc |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 390
def add_presence_callback(priority = 0, ref = nil, proc=nil, &block)
  block = proc if proc
  @presencecbs.add(priority, ref, block)
end
Adds a callback block/proc to process received Stanzas
| priority: | [Integer] The callback’s priority; callbacks with a higher priority run sooner |
| ref: | [String] The callback’s reference |
| proc: | [Proc = nil] The optional proc |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 370
def add_stanza_callback(priority = 0, ref = nil, proc=nil, &block)
  block = proc if proc
  @stanzacbs.add(priority, ref, block)
end
Adds a callback block/proc to process received XML messages
| priority: | [Integer] The callback’s priority; callbacks with a higher priority run sooner |
| ref: | [String] The callback’s reference |
| proc: | [Proc = nil] The optional proc |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 330
def add_xml_callback(priority = 0, ref = nil, proc=nil, &block)
  block = proc if proc
  @xmlcbs.add(priority, ref, block)
end
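The callback registration methods above all follow the same pattern: pick the proc over the block if both are given, then hand it to a priority-ordered list. The following is a minimal sketch of that idea. `SketchCallbackList` is a hypothetical stand-in written for illustration, not the real `CallbackList`; it assumes that callbacks run from highest to lowest priority and that a callback returning `true` stops further processing.

```ruby
# Hypothetical stand-in for CallbackList; NOT the xmpp4r implementation.
class SketchCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @entries = []
  end

  # Mirrors the add_*_callback signature: an explicit proc wins over the block.
  def add(priority = 0, ref = nil, proc = nil, &block)
    block = proc if proc
    @entries << Entry.new(priority, ref, block)
    # Highest priority first; the index keeps insertion order among equals.
    @entries = @entries.each_with_index
                       .sort_by { |entry, index| [-entry.priority, index] }
                       .map(&:first)
  end

  # Remove every callback registered under the given reference.
  def delete(ref)
    @entries.reject! { |entry| entry.ref == ref }
  end

  # Call blocks in priority order; stop as soon as one returns true.
  def process(element)
    @entries.each { |entry| return true if entry.block.call(element) == true }
    false
  end
end

calls = []
cbs = SketchCallbackList.new
cbs.add(0, 'logger')   { |e| calls << [:logger, e]; false }
cbs.add(10, 'handler') { |e| calls << [:handler, e]; e == 'ping' }

handled_ping  = cbs.process('ping')   # handler runs first and consumes it
handled_other = cbs.process('other')  # handler declines, so logger runs too
```

In this sketch the low-priority logger never sees `'ping'`, because the higher-priority handler claims it first.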
Delete a Stanza callback
| ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 379
def delete_stanza_callback(ref)
  @stanzacbs.delete(ref)
end
Delete an XML-messages callback
| ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 339
def delete_xml_callback(ref)
  @xmlcbs.delete(ref)
end
Returns whether this stream is connected to a Jabber service
| return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 111
def is_connected?
  return @status == CONNECTED
end
Returns whether this stream is NOT connected to a Jabber service
| return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 119
def is_disconnected?
  return @status == DISCONNECTED
end
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 94
def parse_failure
  # A new thread has to be created because close will cause the thread
  # to commit suicide
  if @exception_block
    Thread.new { @exception_block.call($!, self, :parser) }
  else
    puts "Stream#parse_failure was called by XML parser. Dumping " +
      "backtrace...\n" + $!.exception + "\n"
    puts $!.backtrace
    close
    raise
  end
end
Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.
Currently not working!
# File lib/xmpp4r/stream.rb, line 307
def poll
  sleep 10
  while true
    sleep 2
    # @pollCounter = @pollCounter - 1
    # if @pollCounter < 0
    #   begin
    #     send(" \t ")
    #   rescue
    #     Thread.new {@exception_block.call if @exception_block}
    #     break
    #   end
    # end
  end
end
Process |max| XML stanzas and call listeners for all of them.
| max: | [Integer] the number of stanzas to process (nil means process all available) |
# File lib/xmpp4r/stream.rb, line 187
def process(max = nil)
  n = 0
  @StanzaQueueMutex.lock
  while @StanzaQueue.size > 0 and (max == nil or n < max)
    e = @StanzaQueue.shift
    @StanzaQueueMutex.unlock
    process_one(e)
    n += 1
    @StanzaQueueMutex.lock
  end
  @StanzaQueueMutex.unlock
  n
end
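The key point of `process` is that the queue mutex is held only while an item is taken from the queue, never while the listeners run. A minimal sketch of the same drain loop follows. `SketchQueueDrainer` and its handler block are hypothetical names, and the sketch uses `Mutex#synchronize` instead of the explicit `lock`/`unlock` pairs in the listing, which is a substitution made for brevity.

```ruby
# Hypothetical illustration of a bounded queue drain; not part of xmpp4r.
class SketchQueueDrainer
  def initialize(&handler)
    @queue = []
    @mutex = Mutex.new
    @handler = handler
  end

  def push(stanza)
    @mutex.synchronize { @queue.push(stanza) }
  end

  # Handle up to max queued items (nil means all); returns the count handled.
  def process(max = nil)
    n = 0
    while max.nil? || n < max
      # Hold the lock only long enough to take one item.
      stanza = @mutex.synchronize { @queue.shift }
      break if stanza.nil?       # queue is empty
      @handler.call(stanza)      # runs outside the lock
      n += 1
    end
    n
  end
end

seen = []
drainer = SketchQueueDrainer.new { |stanza| seen << stanza }
%w[a b c].each { |stanza| drainer.push(stanza) }

first = drainer.process(2)  # handles 'a' and 'b'
rest  = drainer.process     # handles whatever is left
```

Releasing the lock before calling the handler means a handler can itself push new items without deadlocking.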
Processes a received REXML::Element and executes registered thread blocks and filters against it.
| element: | [REXML::Element] The received element |
# File lib/xmpp4r/stream.rb, line 128
def receive(element)
  Jabber::debuglog("RECEIVED:\n#{element.to_s}")
  case element.name
  when 'stream'
    stanza = element
    i = element.attribute("id")
    @streamid = i.value if i
  when 'message'
    stanza = Message::import(element)
  when 'iq'
    stanza = Iq::import(element)
  when 'presence'
    stanza = Presence::import(element)
  else
    stanza = element
  end
  # Iterate through blocked threads (= waiting for an answer)
  @threadBlocks.each { |thread, proc|
    r = proc.call(stanza)
    if r == true
      @threadBlocks.delete(thread)
      thread.wakeup if thread.alive?
      return
    end
  }
  if @threaded
    process_one(stanza)
  else
    # StanzaQueue will be read when the user calls process
    @StanzaQueueMutex.lock
    @StanzaQueue.push(stanza)
    @StanzaQueueMutex.unlock
    @waitingThread.wakeup if @waitingThread
  end
end
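In `receive`, blocks registered by waiting senders get the first chance at every incoming stanza, and only unclaimed stanzas go on to the regular callbacks. The sketch below illustrates just that ordering. `SketchReceiver`, `expect`, and the symbolic return values are hypothetical; the sketch keys waiters by an arbitrary object rather than by thread and does no thread wakeup.

```ruby
# Hypothetical illustration of "waiters first, then normal dispatch".
class SketchReceiver
  def initialize(&fallback)
    @waiters = {}          # key => proc, loosely like @threadBlocks
    @fallback = fallback   # stands in for process_one / the stanza queue
  end

  def expect(key, &block)
    @waiters[key] = block
  end

  def receive(stanza)
    # Iterate over a copy so deleting inside the loop is safe.
    @waiters.to_a.each do |key, block|
      next unless block.call(stanza) == true
      @waiters.delete(key)
      return :claimed
    end
    @fallback.call(stanza)
    :dispatched
  end
end

dispatched = []
receiver = SketchReceiver.new { |stanza| dispatched << stanza }
receiver.expect(:login) { |stanza| stanza == 'auth-ok' }

a = receiver.receive('presence')  # waiter declines, normal dispatch
b = receiver.receive('auth-ok')   # waiter claims it and is removed
c = receiver.receive('auth-ok')   # no waiter left, normal dispatch
```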
Sends XML data to the socket and (optionally) waits to process received data.
| xml: | [String] The xml data to send |
| proc: | [Proc = nil] The optional proc |
| &block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 243
def send(xml, proc=nil, &block)
  Jabber::debuglog("SENDING:\n#{ xml.kind_of?(String) ? xml : xml.to_s }")
  xml = xml.to_s if not xml.kind_of? String
  block = proc if proc
  @threadBlocks[Thread.current]=block if block
  Thread.critical = true # we don't want to be interrupted before we stop!
  begin
    @fd << xml
    @fd.flush
  rescue
    if @exception_block
      @exception_block.call($!, self, :sending)
    else
      puts "Exception caught while sending!"
      raise
    end
  end
  Thread.critical = false
  # The parser thread might be running this (think of a callback running send())
  # If this is the case, we mustn't stop (or we would cause a deadlock)
  Thread.stop if block and Thread.current != @parserThread
  @pollCounter = 10
end
Send an XMPP stanza with a Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. It must return true to complete this!
Be aware that if a stanza with type=’error’ is received the function does not yield but raises an ErrorException with the corresponding error element.
| xml: | [XMLStanza] |
# File lib/xmpp4r/stream.rb, line 278
def send_with_id(xml, &block)
  if xml.id.nil?
    xml.id = Jabber::IdGenerator.instance.generate_id
  end

  error = nil
  send(xml) do |received|
    if received.id == xml.id
      if received.type == :error
        error = received.error
        true
      else
        yield(received)
      end
    else
      false
    end
  end

  unless error.nil?
    raise ErrorException.new(error)
  end
end
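The id-matching logic of `send_with_id` can be tried in isolation. The sketch below replays a list of already received stanzas instead of using a socket. `SketchStanza`, `SketchError`, and `sketch_send_with_id` are hypothetical names introduced for illustration; only the matching rules (ignore other ids, yield on a match, raise on `type == :error`) are taken from the listing above.

```ruby
# Hypothetical stand-ins; the real code uses XMLStanza and ErrorException.
SketchStanza = Struct.new(:id, :type, :error)
class SketchError < StandardError; end

# Replays `incoming` against the matcher that send_with_id would register.
def sketch_send_with_id(stanza, incoming)
  error = nil
  matcher = lambda do |received|
    if received.id == stanza.id
      if received.type == :error
        error = received.error
        true                     # stop waiting, then raise below
      else
        yield(received)          # caller decides whether this completes it
      end
    else
      false                      # not our reply, keep waiting
    end
  end

  incoming.each { |received| break if matcher.call(received) == true }
  raise SketchError, error unless error.nil?
end

request  = SketchStanza.new('42', :get, nil)
incoming = [SketchStanza.new('7', :result, nil),
            SketchStanza.new('42', :result, nil)]

answers = []
sketch_send_with_id(request, incoming) { |reply| answers << reply.id; true }

failure = begin
  error_reply = SketchStanza.new('42', :error, 'item-not-found')
  sketch_send_with_id(request, [error_reply]) { true }
  nil
rescue SketchError => e
  e.message
end
```

The stanza with id `'7'` is skipped, the matching result is yielded to the block, and the error reply surfaces as an exception rather than a yield.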
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 55
def start(fd)
  @fd = fd
  @parser = StreamParser.new(@fd, self)
  @parserThread = Thread.new do
    begin
      @parser.parse
    rescue
      if @exception_block
        Thread.new { @exception_block.call($!, self, :start) }
      else
        puts "Exception caught in Parser thread!"
        raise
      end
    end
  end
  # @pollThread = Thread.new do
  #   begin
  #     poll
  #   rescue
  #     puts "Exception caught in Poll thread, dumping backtrace and" +
  #       " exiting...\n" + $!.exception + "\n"
  #     puts $!.backtrace
  #     exit
  #   end
  # end
  @status = CONNECTED
end
Process an XML stanza and call the listeners for it. If no stanza is currently available, wait for max |time| seconds before returning.
| time: | [Integer] time to wait in seconds. If nil, wait infinitely. |
# File lib/xmpp4r/stream.rb, line 207
def wait_and_process(time = nil)
  if time == 0
    return process(1)
  end
  @StanzaQueueMutex.lock
  if @StanzaQueue.size > 0
    e = @StanzaQueue.shift
    @StanzaQueueMutex.unlock
    process_one(e)
    return 1
  end

  @waitingThread = Thread.current
  @wakeupThread = Thread.new { sleep time ; @waitingThread.wakeup if @waitingThread }
  @waitingThread.stop
  @wakeupThread.kill if @wakeupThread
  @wakeupThread = nil
  @waitingThread = nil

  @StanzaQueueMutex.lock
  if @StanzaQueue.size > 0
    e = @StanzaQueue.shift
    @StanzaQueueMutex.unlock
    process_one(e)
    return 1
  end
  return 0
end
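The contract of `wait_and_process` (handle one stanza if available, otherwise wait up to `time` seconds, returning 1 or 0) can also be expressed with a condition variable. The sketch below is a different technique from the thread stop/wakeup approach in the listing and is not how xmpp4r does it. `SketchWaitQueue` is a hypothetical name; `ConditionVariable#wait(mutex, timeout)` is standard Ruby. It waits only once, so a spurious wakeup may return 0 early.

```ruby
require 'thread'

# Hypothetical illustration of a timed wait-for-one-item queue.
class SketchWaitQueue
  def initialize
    @queue = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @queue.push(item)
      @cond.signal               # wake a waiting consumer, if any
    end
  end

  # Returns 1 if an item was handled, 0 if none arrived in time.
  # time = nil waits indefinitely; time = 0 does not wait at all.
  def wait_and_process(time = nil)
    item = @mutex.synchronize do
      @cond.wait(@mutex, time) if @queue.empty? && time != 0
      @queue.shift
    end
    return 0 if item.nil?
    yield(item)                  # handler runs outside the lock
    1
  end
end

queue = SketchWaitQueue.new
handled = []

timed_out = queue.wait_and_process(0.05) { |item| handled << item }

producer = Thread.new { sleep 0.05; queue.push('presence') }
got_one = queue.wait_and_process(2) { |item| handled << item }
producer.join
```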
Process |element| until it is consumed. Returns element.consumed?
| element: | The element to process |
# File lib/xmpp4r/stream.rb, line 167
def process_one(stanza)
  Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
  return true if @xmlcbs.process(stanza)
  return true if @stanzacbs.process(stanza)
  case stanza
  when Message
    return true if @messagecbs.process(stanza)
  when Iq
    return true if @iqcbs.process(stanza)
  when Presence
    return true if @presencecbs.process(stanza)
  end
end
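`process_one` tries the generic callback lists first and only then the list that matches the stanza's class. A minimal sketch of that dispatch order follows. `SketchDispatcher`, `SketchMessage`, and `SketchPresence` are hypothetical names; the sketch collapses the XML and stanza lists into one generic list and returns `false` explicitly when nothing handled the stanza, whereas the listing above falls through and returns `nil`.

```ruby
# Hypothetical stanza types and dispatcher; not part of xmpp4r.
SketchMessage  = Struct.new(:body)
SketchPresence = Struct.new(:show)

class SketchDispatcher
  def initialize
    @generic = []    # stands in for the XML and stanza callback lists
    @by_class = {}   # stands in for the message/iq/presence lists
  end

  def on_any(&block)
    @generic << block
  end

  def on(klass, &block)
    (@by_class[klass] ||= []) << block
  end

  # Generic callbacks first, then the class-specific ones.
  def process_one(stanza)
    return true if @generic.any? { |cb| cb.call(stanza) == true }
    typed = @by_class.fetch(stanza.class, [])
    return true if typed.any? { |cb| cb.call(stanza) == true }
    false
  end
end

log = []
dispatcher = SketchDispatcher.new
dispatcher.on_any { |_stanza| log << :any; false }
dispatcher.on(SketchMessage) { |_stanza| log << :message; true }

handled   = dispatcher.process_one(SketchMessage.new('hi'))
unhandled = dispatcher.process_one(SketchPresence.new('away'))
```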