001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.io.OutputStream;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.Map;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.CopyOnWriteArrayList;
029 import java.util.concurrent.CountDownLatch;
030 import java.util.concurrent.LinkedBlockingQueue;
031 import java.util.concurrent.ThreadFactory;
032 import java.util.concurrent.ThreadPoolExecutor;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.atomic.AtomicBoolean;
035 import java.util.concurrent.atomic.AtomicInteger;
036 import javax.jms.Connection;
037 import javax.jms.ConnectionConsumer;
038 import javax.jms.ConnectionMetaData;
039 import javax.jms.DeliveryMode;
040 import javax.jms.Destination;
041 import javax.jms.ExceptionListener;
042 import javax.jms.IllegalStateException;
043 import javax.jms.InvalidDestinationException;
044 import javax.jms.JMSException;
045 import javax.jms.Queue;
046 import javax.jms.QueueConnection;
047 import javax.jms.QueueSession;
048 import javax.jms.ServerSessionPool;
049 import javax.jms.Session;
050 import javax.jms.Topic;
051 import javax.jms.TopicConnection;
052 import javax.jms.TopicSession;
053 import javax.jms.XAConnection;
054 import org.apache.activemq.advisory.DestinationSource;
055 import org.apache.activemq.blob.BlobTransferPolicy;
056 import org.apache.activemq.command.ActiveMQDestination;
057 import org.apache.activemq.command.ActiveMQMessage;
058 import org.apache.activemq.command.ActiveMQTempDestination;
059 import org.apache.activemq.command.ActiveMQTempQueue;
060 import org.apache.activemq.command.ActiveMQTempTopic;
061 import org.apache.activemq.command.BrokerInfo;
062 import org.apache.activemq.command.Command;
063 import org.apache.activemq.command.CommandTypes;
064 import org.apache.activemq.command.ConnectionControl;
065 import org.apache.activemq.command.ConnectionError;
066 import org.apache.activemq.command.ConnectionId;
067 import org.apache.activemq.command.ConnectionInfo;
068 import org.apache.activemq.command.ConsumerControl;
069 import org.apache.activemq.command.ConsumerId;
070 import org.apache.activemq.command.ConsumerInfo;
071 import org.apache.activemq.command.ControlCommand;
072 import org.apache.activemq.command.DestinationInfo;
073 import org.apache.activemq.command.ExceptionResponse;
074 import org.apache.activemq.command.Message;
075 import org.apache.activemq.command.MessageDispatch;
076 import org.apache.activemq.command.MessageId;
077 import org.apache.activemq.command.ProducerAck;
078 import org.apache.activemq.command.ProducerId;
079 import org.apache.activemq.command.RemoveInfo;
080 import org.apache.activemq.command.RemoveSubscriptionInfo;
081 import org.apache.activemq.command.Response;
082 import org.apache.activemq.command.SessionId;
083 import org.apache.activemq.command.ShutdownInfo;
084 import org.apache.activemq.command.WireFormatInfo;
085 import org.apache.activemq.management.JMSConnectionStatsImpl;
086 import org.apache.activemq.management.JMSStatsImpl;
087 import org.apache.activemq.management.StatsCapable;
088 import org.apache.activemq.management.StatsImpl;
089 import org.apache.activemq.state.CommandVisitorAdapter;
090 import org.apache.activemq.thread.Scheduler;
091 import org.apache.activemq.thread.TaskRunnerFactory;
092 import org.apache.activemq.transport.Transport;
093 import org.apache.activemq.transport.TransportListener;
094 import org.apache.activemq.transport.failover.FailoverTransport;
095 import org.apache.activemq.util.IdGenerator;
096 import org.apache.activemq.util.IntrospectionSupport;
097 import org.apache.activemq.util.JMSExceptionSupport;
098 import org.apache.activemq.util.LongSequenceGenerator;
099 import org.apache.activemq.util.ServiceSupport;
100 import org.slf4j.Logger;
101 import org.slf4j.LoggerFactory;
102
103 public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
104
105 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
106 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
107 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
108
109 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
110 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
111
112 public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
113
114 protected boolean dispatchAsync=true;
115 protected boolean alwaysSessionAsync = true;
116
117 private TaskRunnerFactory sessionTaskRunner;
118 private final ThreadPoolExecutor executor;
119
120 // Connection state variables
121 private final ConnectionInfo info;
122 private ExceptionListener exceptionListener;
123 private ClientInternalExceptionListener clientInternalExceptionListener;
124 private boolean clientIDSet;
125 private boolean isConnectionInfoSentToBroker;
126 private boolean userSpecifiedClientID;
127
128 // Configuration options variables
129 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
130 private BlobTransferPolicy blobTransferPolicy;
131 private RedeliveryPolicy redeliveryPolicy;
132 private MessageTransformer transformer;
133
134 private boolean disableTimeStampsByDefault;
135 private boolean optimizedMessageDispatch = true;
136 private boolean copyMessageOnSend = true;
137 private boolean useCompression;
138 private boolean objectMessageSerializationDefered;
139 private boolean useAsyncSend;
140 private boolean optimizeAcknowledge;
141 private boolean nestedMapAndListEnabled = true;
142 private boolean useRetroactiveConsumer;
143 private boolean exclusiveConsumer;
144 private boolean alwaysSyncSend;
145 private int closeTimeout = 15000;
146 private boolean watchTopicAdvisories = true;
147 private long warnAboutUnstartedConnectionTimeout = 500L;
148 private int sendTimeout =0;
149 private boolean sendAcksAsync=true;
150 private boolean checkForDuplicates = true;
151
152 private final Transport transport;
153 private final IdGenerator clientIdGenerator;
154 private final JMSStatsImpl factoryStats;
155 private final JMSConnectionStatsImpl stats;
156
157 private final AtomicBoolean started = new AtomicBoolean(false);
158 private final AtomicBoolean closing = new AtomicBoolean(false);
159 private final AtomicBoolean closed = new AtomicBoolean(false);
160 private final AtomicBoolean transportFailed = new AtomicBoolean(false);
161 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
162 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
163 private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
164 private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
165 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
166
167 // Maps ConsumerIds to ActiveMQConsumer objects
168 private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
169 private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
170 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
171 private final SessionId connectionSessionId;
172 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
173 private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
174 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
175 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
176
177 private AdvisoryConsumer advisoryConsumer;
178 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
179 private BrokerInfo brokerInfo;
180 private IOException firstFailureError;
181 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
182
183 // Assume that protocol is the latest. Change to the actual protocol
184 // version when a WireFormatInfo is received.
185 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
186 private final long timeCreated;
187 private final ConnectionAudit connectionAudit = new ConnectionAudit();
188 private DestinationSource destinationSource;
189 private final Object ensureConnectionInfoSentMutex = new Object();
190 private boolean useDedicatedTaskRunner;
191 protected volatile CountDownLatch transportInterruptionProcessingComplete;
192 private long consumerFailoverRedeliveryWaitPeriod;
193 private final Scheduler scheduler;
194 private boolean messagePrioritySupported=true;
195
196 /**
197 * Construct an <code>ActiveMQConnection</code>
198 *
199 * @param transport
200 * @param factoryStats
201 * @throws Exception
202 */
203 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204
205 this.transport = transport;
206 this.clientIdGenerator = clientIdGenerator;
207 this.factoryStats = factoryStats;
208
209 // Configure a single threaded executor who's core thread can timeout if
210 // idle
211 executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212 public Thread newThread(Runnable r) {
213 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214 thread.setDaemon(true);
215 return thread;
216 }
217 });
218 // asyncConnectionThread.allowCoreThreadTimeOut(true);
219 String uniqueId = CONNECTION_ID_GENERATOR.generateId();
220 this.info = new ConnectionInfo(new ConnectionId(uniqueId));
221 this.info.setManageable(true);
222 this.info.setFaultTolerant(transport.isFaultTolerant());
223 this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
224
225 this.transport.setTransportListener(this);
226
227 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
228 this.factoryStats.addConnection(this);
229 this.timeCreated = System.currentTimeMillis();
230 this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
231 this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
232 this.scheduler.start();
233 }
234
235 protected void setUserName(String userName) {
236 this.info.setUserName(userName);
237 }
238
239 protected void setPassword(String password) {
240 this.info.setPassword(password);
241 }
242
243 /**
244 * A static helper method to create a new connection
245 *
246 * @return an ActiveMQConnection
247 * @throws JMSException
248 */
249 public static ActiveMQConnection makeConnection() throws JMSException {
250 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
251 return (ActiveMQConnection)factory.createConnection();
252 }
253
254 /**
255 * A static helper method to create a new connection
256 *
257 * @param uri
258 * @return and ActiveMQConnection
259 * @throws JMSException
260 */
261 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
262 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
263 return (ActiveMQConnection)factory.createConnection();
264 }
265
266 /**
267 * A static helper method to create a new connection
268 *
269 * @param user
270 * @param password
271 * @param uri
272 * @return an ActiveMQConnection
273 * @throws JMSException
274 */
275 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
276 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
277 return (ActiveMQConnection)factory.createConnection();
278 }
279
280 /**
281 * @return a number unique for this connection
282 */
283 public JMSConnectionStatsImpl getConnectionStats() {
284 return stats;
285 }
286
287 /**
288 * Creates a <CODE>Session</CODE> object.
289 *
290 * @param transacted indicates whether the session is transacted
291 * @param acknowledgeMode indicates whether the consumer or the client will
292 * acknowledge any messages it receives; ignored if the
293 * session is transacted. Legal values are
294 * <code>Session.AUTO_ACKNOWLEDGE</code>,
295 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
296 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
297 * @return a newly created session
298 * @throws JMSException if the <CODE>Connection</CODE> object fails to
299 * create a session due to some internal error or lack of
300 * support for the specific transaction and acknowledgement
301 * mode.
302 * @see Session#AUTO_ACKNOWLEDGE
303 * @see Session#CLIENT_ACKNOWLEDGE
304 * @see Session#DUPS_OK_ACKNOWLEDGE
305 * @since 1.1
306 */
307 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
308 checkClosedOrFailed();
309 ensureConnectionInfoSent();
310 if(!transacted) {
311 if (acknowledgeMode==Session.SESSION_TRANSACTED) {
312 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
313 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
314 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
315 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
316 }
317 }
318 return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
319 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
320 }
321
322 /**
323 * @return sessionId
324 */
325 protected SessionId getNextSessionId() {
326 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
327 }
328
329 /**
330 * Gets the client identifier for this connection.
331 * <P>
332 * This value is specific to the JMS provider. It is either preconfigured by
333 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
334 * dynamically by the application by calling the <code>setClientID</code>
335 * method.
336 *
337 * @return the unique client identifier
338 * @throws JMSException if the JMS provider fails to return the client ID
339 * for this connection due to some internal error.
340 */
341 public String getClientID() throws JMSException {
342 checkClosedOrFailed();
343 return this.info.getClientId();
344 }
345
346 /**
347 * Sets the client identifier for this connection.
348 * <P>
349 * The preferred way to assign a JMS client's client identifier is for it to
350 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
351 * object and transparently assigned to the <CODE>Connection</CODE> object
352 * it creates.
353 * <P>
354 * Alternatively, a client can set a connection's client identifier using a
355 * provider-specific value. The facility to set a connection's client
356 * identifier explicitly is not a mechanism for overriding the identifier
357 * that has been administratively configured. It is provided for the case
358 * where no administratively specified identifier exists. If one does exist,
359 * an attempt to change it by setting it must throw an
360 * <CODE>IllegalStateException</CODE>. If a client sets the client
361 * identifier explicitly, it must do so immediately after it creates the
362 * connection and before any other action on the connection is taken. After
363 * this point, setting the client identifier is a programming error that
364 * should throw an <CODE>IllegalStateException</CODE>.
365 * <P>
366 * The purpose of the client identifier is to associate a connection and its
367 * objects with a state maintained on behalf of the client by a provider.
368 * The only such state identified by the JMS API is that required to support
369 * durable subscriptions.
370 * <P>
371 * If another connection with the same <code>clientID</code> is already
372 * running when this method is called, the JMS provider should detect the
373 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
374 *
375 * @param newClientID the unique client identifier
376 * @throws JMSException if the JMS provider fails to set the client ID for
377 * this connection due to some internal error.
378 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
379 * invalid or duplicate client ID.
380 * @throws javax.jms.IllegalStateException if the JMS client attempts to set
381 * a connection's client ID at the wrong time or when it has
382 * been administratively configured.
383 */
384 public void setClientID(String newClientID) throws JMSException {
385 checkClosedOrFailed();
386
387 if (this.clientIDSet) {
388 throw new IllegalStateException("The clientID has already been set");
389 }
390
391 if (this.isConnectionInfoSentToBroker) {
392 throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
393 }
394
395 this.info.setClientId(newClientID);
396 this.userSpecifiedClientID = true;
397 ensureConnectionInfoSent();
398 }
399
400 /**
401 * Sets the default client id that the connection will use if explicitly not
402 * set with the setClientId() call.
403 */
404 public void setDefaultClientID(String clientID) throws JMSException {
405 this.info.setClientId(clientID);
406 this.userSpecifiedClientID = true;
407 }
408
409 /**
410 * Gets the metadata for this connection.
411 *
412 * @return the connection metadata
413 * @throws JMSException if the JMS provider fails to get the connection
414 * metadata for this connection.
415 * @see javax.jms.ConnectionMetaData
416 */
417 public ConnectionMetaData getMetaData() throws JMSException {
418 checkClosedOrFailed();
419 return ActiveMQConnectionMetaData.INSTANCE;
420 }
421
422 /**
423 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
424 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
425 * associated with it.
426 *
427 * @return the <CODE>ExceptionListener</CODE> for this connection, or
428 * null, if no <CODE>ExceptionListener</CODE> is associated with
429 * this connection.
430 * @throws JMSException if the JMS provider fails to get the
431 * <CODE>ExceptionListener</CODE> for this connection.
432 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
433 */
434 public ExceptionListener getExceptionListener() throws JMSException {
435 checkClosedOrFailed();
436 return this.exceptionListener;
437 }
438
439 /**
440 * Sets an exception listener for this connection.
441 * <P>
442 * If a JMS provider detects a serious problem with a connection, it informs
443 * the connection's <CODE> ExceptionListener</CODE>, if one has been
444 * registered. It does this by calling the listener's <CODE>onException
445 * </CODE>
446 * method, passing it a <CODE>JMSException</CODE> object describing the
447 * problem.
448 * <P>
449 * An exception listener allows a client to be notified of a problem
450 * asynchronously. Some connections only consume messages, so they would
451 * have no other way to learn their connection has failed.
452 * <P>
453 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
454 * <P>
455 * A JMS provider should attempt to resolve connection problems itself
456 * before it notifies the client of them.
457 *
458 * @param listener the exception listener
459 * @throws JMSException if the JMS provider fails to set the exception
460 * listener for this connection.
461 */
462 public void setExceptionListener(ExceptionListener listener) throws JMSException {
463 checkClosedOrFailed();
464 this.exceptionListener = listener;
465 }
466
467 /**
468 * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
469 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
470 * associated with it.
471 *
472 * @return the listener or <code>null</code> if no listener is registered with the connection.
473 */
474 public ClientInternalExceptionListener getClientInternalExceptionListener()
475 {
476 return clientInternalExceptionListener;
477 }
478
479 /**
480 * Sets a client internal exception listener for this connection.
481 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
482 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
483 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
484 * describing the problem.
485 *
486 * @param listener the exception listener
487 */
488 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
489 {
490 this.clientInternalExceptionListener = listener;
491 }
492
493 /**
494 * Starts (or restarts) a connection's delivery of incoming messages. A call
495 * to <CODE>start</CODE> on a connection that has already been started is
496 * ignored.
497 *
498 * @throws JMSException if the JMS provider fails to start message delivery
499 * due to some internal error.
500 * @see javax.jms.Connection#stop()
501 */
502 public void start() throws JMSException {
503 checkClosedOrFailed();
504 ensureConnectionInfoSent();
505 if (started.compareAndSet(false, true)) {
506 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
507 ActiveMQSession session = i.next();
508 session.start();
509 }
510 }
511 }
512
513 /**
514 * Temporarily stops a connection's delivery of incoming messages. Delivery
515 * can be restarted using the connection's <CODE>start</CODE> method. When
516 * the connection is stopped, delivery to all the connection's message
517 * consumers is inhibited: synchronous receives block, and messages are not
518 * delivered to message listeners.
519 * <P>
520 * This call blocks until receives and/or message listeners in progress have
521 * completed.
522 * <P>
523 * Stopping a connection has no effect on its ability to send messages. A
524 * call to <CODE>stop</CODE> on a connection that has already been stopped
525 * is ignored.
526 * <P>
527 * A call to <CODE>stop</CODE> must not return until delivery of messages
528 * has paused. This means that a client can rely on the fact that none of
529 * its message listeners will be called and that all threads of control
530 * waiting for <CODE>receive</CODE> calls to return will not return with a
531 * message until the connection is restarted. The receive timers for a
532 * stopped connection continue to advance, so receives may time out while
533 * the connection is stopped.
534 * <P>
535 * If message listeners are running when <CODE>stop</CODE> is invoked, the
536 * <CODE>stop</CODE> call must wait until all of them have returned before
537 * it may return. While these message listeners are completing, they must
538 * have the full services of the connection available to them.
539 *
540 * @throws JMSException if the JMS provider fails to stop message delivery
541 * due to some internal error.
542 * @see javax.jms.Connection#start()
543 */
544 public void stop() throws JMSException {
545 checkClosedOrFailed();
546 if (started.compareAndSet(true, false)) {
547 synchronized(sessions) {
548 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
549 ActiveMQSession s = i.next();
550 s.stop();
551 }
552 }
553 }
554 }
555
556 /**
557 * Closes the connection.
558 * <P>
559 * Since a provider typically allocates significant resources outside the
560 * JVM on behalf of a connection, clients should close these resources when
561 * they are not needed. Relying on garbage collection to eventually reclaim
562 * these resources may not be timely enough.
563 * <P>
564 * There is no need to close the sessions, producers, and consumers of a
565 * closed connection.
566 * <P>
567 * Closing a connection causes all temporary destinations to be deleted.
568 * <P>
569 * When this method is invoked, it should not return until message
570 * processing has been shut down in an orderly fashion. This means that all
571 * message listeners that may have been running have returned, and that all
572 * pending receives have returned. A close terminates all pending message
573 * receives on the connection's sessions' consumers. The receives may return
574 * with a message or with null, depending on whether there was a message
575 * available at the time of the close. If one or more of the connection's
576 * sessions' message listeners is processing a message at the time when
577 * connection <CODE>close</CODE> is invoked, all the facilities of the
578 * connection and its sessions must remain available to those listeners
579 * until they return control to the JMS provider.
580 * <P>
581 * Closing a connection causes any of its sessions' transactions in progress
582 * to be rolled back. In the case where a session's work is coordinated by
583 * an external transaction manager, a session's <CODE>commit</CODE> and
584 * <CODE> rollback</CODE> methods are not used and the result of a closed
585 * session's work is determined later by the transaction manager. Closing a
586 * connection does NOT force an acknowledgment of client-acknowledged
587 * sessions.
588 * <P>
589 * Invoking the <CODE>acknowledge</CODE> method of a received message from
590 * a closed connection's session must throw an
591 * <CODE>IllegalStateException</CODE>. Closing a closed connection must
592 * NOT throw an exception.
593 *
594 * @throws JMSException if the JMS provider fails to close the connection
595 * due to some internal error. For example, a failure to
596 * release resources or to close a socket connection can
597 * cause this exception to be thrown.
598 */
599 public void close() throws JMSException {
600 try {
601 // If we were running, lets stop first.
602 if (!closed.get() && !transportFailed.get()) {
603 stop();
604 }
605
606 synchronized (this) {
607 if (!closed.get()) {
608 closing.set(true);
609
610 if (destinationSource != null) {
611 destinationSource.stop();
612 destinationSource = null;
613 }
614 if (advisoryConsumer != null) {
615 advisoryConsumer.dispose();
616 advisoryConsumer = null;
617 }
618 if (this.scheduler != null) {
619 try {
620 this.scheduler.stop();
621 } catch (Exception e) {
622 JMSException ex = JMSExceptionSupport.create(e);
623 throw ex;
624 }
625 }
626
627 long lastDeliveredSequenceId = 0;
628 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
629 ActiveMQSession s = i.next();
630 s.dispose();
631 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
632 }
633 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
634 ActiveMQConnectionConsumer c = i.next();
635 c.dispose();
636 }
637 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
638 ActiveMQInputStream c = i.next();
639 c.dispose();
640 }
641 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
642 ActiveMQOutputStream c = i.next();
643 c.dispose();
644 }
645
646 // As TemporaryQueue and TemporaryTopic instances are bound
647 // to a connection we should just delete them after the connection
648 // is closed to free up memory
649 for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
650 ActiveMQTempDestination c = i.next();
651 c.delete();
652 }
653
654 if (isConnectionInfoSentToBroker) {
655 // If we announced ourselfs to the broker.. Try to let
656 // the broker
657 // know that the connection is being shutdown.
658 RemoveInfo removeCommand = info.createRemoveCommand();
659 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
660 doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
661 doAsyncSendPacket(new ShutdownInfo());
662 }
663
664 ServiceSupport.dispose(this.transport);
665
666 started.set(false);
667
668 // TODO if we move the TaskRunnerFactory to the connection
669 // factory
670 // then we may need to call
671 // factory.onConnectionClose(this);
672 if (sessionTaskRunner != null) {
673 sessionTaskRunner.shutdown();
674 }
675 closed.set(true);
676 closing.set(false);
677 }
678 }
679 } finally {
680 try {
681 if (executor != null){
682 executor.shutdown();
683 }
684 }catch(Throwable e) {
685 LOG.error("Error shutting down thread pool " + e,e);
686 }
687 factoryStats.removeConnection(this);
688 }
689 }
690
691 /**
692 * Tells the broker to terminate its VM. This can be used to cleanly
693 * terminate a broker running in a standalone java process. Server must have
694 * property enable.vm.shutdown=true defined to allow this to work.
695 */
696 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
697 // implemented.
698 /*
699 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
700 * command = new BrokerAdminCommand();
701 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
702 * asyncSendPacket(command); }
703 */
704
705 /**
706 * Create a durable connection consumer for this connection (optional
707 * operation). This is an expert facility not used by regular JMS clients.
708 *
709 * @param topic topic to access
710 * @param subscriptionName durable subscription name
711 * @param messageSelector only messages with properties matching the message
712 * selector expression are delivered. A value of null or an
713 * empty string indicates that there is no message selector
714 * for the message consumer.
715 * @param sessionPool the server session pool to associate with this durable
716 * connection consumer
717 * @param maxMessages the maximum number of messages that can be assigned to
718 * a server session at one time
719 * @return the durable connection consumer
720 * @throws JMSException if the <CODE>Connection</CODE> object fails to
721 * create a connection consumer due to some internal error
722 * or invalid arguments for <CODE>sessionPool</CODE> and
723 * <CODE>messageSelector</CODE>.
724 * @throws javax.jms.InvalidDestinationException if an invalid destination
725 * is specified.
726 * @throws javax.jms.InvalidSelectorException if the message selector is
727 * invalid.
728 * @see javax.jms.ConnectionConsumer
729 * @since 1.1
730 */
731 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
732 throws JMSException {
733 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
734 }
735
736 /**
737 * Create a durable connection consumer for this connection (optional
738 * operation). This is an expert facility not used by regular JMS clients.
739 *
740 * @param topic topic to access
741 * @param subscriptionName durable subscription name
742 * @param messageSelector only messages with properties matching the message
743 * selector expression are delivered. A value of null or an
744 * empty string indicates that there is no message selector
745 * for the message consumer.
746 * @param sessionPool the server session pool to associate with this durable
747 * connection consumer
748 * @param maxMessages the maximum number of messages that can be assigned to
749 * a server session at one time
750 * @param noLocal set true if you want to filter out messages published
751 * locally
752 * @return the durable connection consumer
753 * @throws JMSException if the <CODE>Connection</CODE> object fails to
754 * create a connection consumer due to some internal error
755 * or invalid arguments for <CODE>sessionPool</CODE> and
756 * <CODE>messageSelector</CODE>.
757 * @throws javax.jms.InvalidDestinationException if an invalid destination
758 * is specified.
759 * @throws javax.jms.InvalidSelectorException if the message selector is
760 * invalid.
761 * @see javax.jms.ConnectionConsumer
762 * @since 1.1
763 */
764 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
765 boolean noLocal) throws JMSException {
766 checkClosedOrFailed();
767 ensureConnectionInfoSent();
768 SessionId sessionId = new SessionId(info.getConnectionId(), -1);
769 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
770 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
771 info.setSubscriptionName(subscriptionName);
772 info.setSelector(messageSelector);
773 info.setPrefetchSize(maxMessages);
774 info.setDispatchAsync(isDispatchAsync());
775
776 // Allows the options on the destination to configure the consumerInfo
777 if (info.getDestination().getOptions() != null) {
778 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
779 IntrospectionSupport.setProperties(this.info, options, "consumer.");
780 }
781
782 return new ActiveMQConnectionConsumer(this, sessionPool, info);
783 }
784
785 // Properties
786 // -------------------------------------------------------------------------
787
788 /**
789 * Returns true if this connection has been started
790 *
791 * @return true if this Connection is started
792 */
793 public boolean isStarted() {
794 return started.get();
795 }
796
797 /**
798 * Returns true if the connection is closed
799 */
800 public boolean isClosed() {
801 return closed.get();
802 }
803
804 /**
805 * Returns true if the connection is in the process of being closed
806 */
807 public boolean isClosing() {
808 return closing.get();
809 }
810
811 /**
812 * Returns true if the underlying transport has failed
813 */
814 public boolean isTransportFailed() {
815 return transportFailed.get();
816 }
817
818 /**
819 * @return Returns the prefetchPolicy.
820 */
821 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
822 return prefetchPolicy;
823 }
824
825 /**
826 * Sets the <a
827 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
828 * policy</a> for consumers created by this connection.
829 */
830 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
831 this.prefetchPolicy = prefetchPolicy;
832 }
833
834 /**
835 */
836 public Transport getTransportChannel() {
837 return transport;
838 }
839
840 /**
841 * @return Returns the clientID of the connection, forcing one to be
842 * generated if one has not yet been configured.
843 */
844 public String getInitializedClientID() throws JMSException {
845 ensureConnectionInfoSent();
846 return info.getClientId();
847 }
848
849 /**
850 * @return Returns the timeStampsDisableByDefault.
851 */
852 public boolean isDisableTimeStampsByDefault() {
853 return disableTimeStampsByDefault;
854 }
855
856 /**
857 * Sets whether or not timestamps on messages should be disabled or not. If
858 * you disable them it adds a small performance boost.
859 */
860 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
861 this.disableTimeStampsByDefault = timeStampsDisableByDefault;
862 }
863
864 /**
865 * @return Returns the dispatchOptimizedMessage.
866 */
867 public boolean isOptimizedMessageDispatch() {
868 return optimizedMessageDispatch;
869 }
870
871 /**
872 * If this flag is set then an larger prefetch limit is used - only
873 * applicable for durable topic subscribers.
874 */
875 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
876 this.optimizedMessageDispatch = dispatchOptimizedMessage;
877 }
878
879 /**
880 * @return Returns the closeTimeout.
881 */
882 public int getCloseTimeout() {
883 return closeTimeout;
884 }
885
886 /**
887 * Sets the timeout before a close is considered complete. Normally a
888 * close() on a connection waits for confirmation from the broker; this
889 * allows that operation to timeout to save the client hanging if there is
890 * no broker
891 */
892 public void setCloseTimeout(int closeTimeout) {
893 this.closeTimeout = closeTimeout;
894 }
895
896 /**
897 * @return ConnectionInfo
898 */
899 public ConnectionInfo getConnectionInfo() {
900 return this.info;
901 }
902
903 public boolean isUseRetroactiveConsumer() {
904 return useRetroactiveConsumer;
905 }
906
907 /**
908 * Sets whether or not retroactive consumers are enabled. Retroactive
909 * consumers allow non-durable topic subscribers to receive old messages
910 * that were published before the non-durable subscriber started.
911 */
912 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
913 this.useRetroactiveConsumer = useRetroactiveConsumer;
914 }
915
916 public boolean isNestedMapAndListEnabled() {
917 return nestedMapAndListEnabled;
918 }
919
920 /**
921 * Enables/disables whether or not Message properties and MapMessage entries
922 * support <a
923 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
924 * Structures</a> of Map and List objects
925 */
926 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
927 this.nestedMapAndListEnabled = structuredMapsEnabled;
928 }
929
930 public boolean isExclusiveConsumer() {
931 return exclusiveConsumer;
932 }
933
934 /**
935 * Enables or disables whether or not queue consumers should be exclusive or
936 * not for example to preserve ordering when not using <a
937 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
938 *
939 * @param exclusiveConsumer
940 */
941 public void setExclusiveConsumer(boolean exclusiveConsumer) {
942 this.exclusiveConsumer = exclusiveConsumer;
943 }
944
945 /**
946 * Adds a transport listener so that a client can be notified of events in
947 * the underlying transport
948 */
949 public void addTransportListener(TransportListener transportListener) {
950 transportListeners.add(transportListener);
951 }
952
953 public void removeTransportListener(TransportListener transportListener) {
954 transportListeners.remove(transportListener);
955 }
956
957 public boolean isUseDedicatedTaskRunner() {
958 return useDedicatedTaskRunner;
959 }
960
961 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
962 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
963 }
964
965 public TaskRunnerFactory getSessionTaskRunner() {
966 synchronized (this) {
967 if (sessionTaskRunner == null) {
968 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
969 }
970 }
971 return sessionTaskRunner;
972 }
973
974 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
975 this.sessionTaskRunner = sessionTaskRunner;
976 }
977
978 public MessageTransformer getTransformer() {
979 return transformer;
980 }
981
982 /**
983 * Sets the transformer used to transform messages before they are sent on
984 * to the JMS bus or when they are received from the bus but before they are
985 * delivered to the JMS client
986 */
987 public void setTransformer(MessageTransformer transformer) {
988 this.transformer = transformer;
989 }
990
991 /**
992 * @return the statsEnabled
993 */
994 public boolean isStatsEnabled() {
995 return this.stats.isEnabled();
996 }
997
998 /**
999 * @param statsEnabled the statsEnabled to set
1000 */
1001 public void setStatsEnabled(boolean statsEnabled) {
1002 this.stats.setEnabled(statsEnabled);
1003 }
1004
1005 /**
1006 * Returns the {@link DestinationSource} object which can be used to listen to destinations
1007 * being created or destroyed or to enquire about the current destinations available on the broker
1008 *
1009 * @return a lazily created destination source
1010 * @throws JMSException
1011 */
1012 public DestinationSource getDestinationSource() throws JMSException {
1013 if (destinationSource == null) {
1014 destinationSource = new DestinationSource(this);
1015 destinationSource.start();
1016 }
1017 return destinationSource;
1018 }
1019
1020 // Implementation methods
1021 // -------------------------------------------------------------------------
1022
1023 /**
1024 * Used internally for adding Sessions to the Connection
1025 *
1026 * @param session
1027 * @throws JMSException
1028 * @throws JMSException
1029 */
1030 protected void addSession(ActiveMQSession session) throws JMSException {
1031 this.sessions.add(session);
1032 if (sessions.size() > 1 || session.isTransacted()) {
1033 optimizedMessageDispatch = false;
1034 }
1035 }
1036
1037 /**
1038 * Used interanlly for removing Sessions from a Connection
1039 *
1040 * @param session
1041 */
1042 protected void removeSession(ActiveMQSession session) {
1043 this.sessions.remove(session);
1044 this.removeDispatcher(session);
1045 }
1046
1047 /**
1048 * Add a ConnectionConsumer
1049 *
1050 * @param connectionConsumer
1051 * @throws JMSException
1052 */
1053 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1054 this.connectionConsumers.add(connectionConsumer);
1055 }
1056
1057 /**
1058 * Remove a ConnectionConsumer
1059 *
1060 * @param connectionConsumer
1061 */
1062 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1063 this.connectionConsumers.remove(connectionConsumer);
1064 this.removeDispatcher(connectionConsumer);
1065 }
1066
1067 /**
1068 * Creates a <CODE>TopicSession</CODE> object.
1069 *
1070 * @param transacted indicates whether the session is transacted
1071 * @param acknowledgeMode indicates whether the consumer or the client will
1072 * acknowledge any messages it receives; ignored if the
1073 * session is transacted. Legal values are
1074 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1075 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1076 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1077 * @return a newly created topic session
1078 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1079 * to create a session due to some internal error or lack of
1080 * support for the specific transaction and acknowledgement
1081 * mode.
1082 * @see Session#AUTO_ACKNOWLEDGE
1083 * @see Session#CLIENT_ACKNOWLEDGE
1084 * @see Session#DUPS_OK_ACKNOWLEDGE
1085 */
1086 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1087 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1088 }
1089
1090 /**
1091 * Creates a connection consumer for this connection (optional operation).
1092 * This is an expert facility not used by regular JMS clients.
1093 *
1094 * @param topic the topic to access
1095 * @param messageSelector only messages with properties matching the message
1096 * selector expression are delivered. A value of null or an
1097 * empty string indicates that there is no message selector
1098 * for the message consumer.
1099 * @param sessionPool the server session pool to associate with this
1100 * connection consumer
1101 * @param maxMessages the maximum number of messages that can be assigned to
1102 * a server session at one time
1103 * @return the connection consumer
1104 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1105 * to create a connection consumer due to some internal
1106 * error or invalid arguments for <CODE>sessionPool</CODE>
1107 * and <CODE>messageSelector</CODE>.
1108 * @throws javax.jms.InvalidDestinationException if an invalid topic is
1109 * specified.
1110 * @throws javax.jms.InvalidSelectorException if the message selector is
1111 * invalid.
1112 * @see javax.jms.ConnectionConsumer
1113 */
1114 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1115 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1116 }
1117
1118 /**
1119 * Creates a connection consumer for this connection (optional operation).
1120 * This is an expert facility not used by regular JMS clients.
1121 *
1122 * @param queue the queue to access
1123 * @param messageSelector only messages with properties matching the message
1124 * selector expression are delivered. A value of null or an
1125 * empty string indicates that there is no message selector
1126 * for the message consumer.
1127 * @param sessionPool the server session pool to associate with this
1128 * connection consumer
1129 * @param maxMessages the maximum number of messages that can be assigned to
1130 * a server session at one time
1131 * @return the connection consumer
1132 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1133 * to create a connection consumer due to some internal
1134 * error or invalid arguments for <CODE>sessionPool</CODE>
1135 * and <CODE>messageSelector</CODE>.
1136 * @throws javax.jms.InvalidDestinationException if an invalid queue is
1137 * specified.
1138 * @throws javax.jms.InvalidSelectorException if the message selector is
1139 * invalid.
1140 * @see javax.jms.ConnectionConsumer
1141 */
1142 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1143 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1144 }
1145
1146 /**
1147 * Creates a connection consumer for this connection (optional operation).
1148 * This is an expert facility not used by regular JMS clients.
1149 *
1150 * @param destination the destination to access
1151 * @param messageSelector only messages with properties matching the message
1152 * selector expression are delivered. A value of null or an
1153 * empty string indicates that there is no message selector
1154 * for the message consumer.
1155 * @param sessionPool the server session pool to associate with this
1156 * connection consumer
1157 * @param maxMessages the maximum number of messages that can be assigned to
1158 * a server session at one time
1159 * @return the connection consumer
1160 * @throws JMSException if the <CODE>Connection</CODE> object fails to
1161 * create a connection consumer due to some internal error
1162 * or invalid arguments for <CODE>sessionPool</CODE> and
1163 * <CODE>messageSelector</CODE>.
1164 * @throws javax.jms.InvalidDestinationException if an invalid destination
1165 * is specified.
1166 * @throws javax.jms.InvalidSelectorException if the message selector is
1167 * invalid.
1168 * @see javax.jms.ConnectionConsumer
1169 * @since 1.1
1170 */
1171 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1172 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1173 }
1174
1175 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1176 throws JMSException {
1177
1178 checkClosedOrFailed();
1179 ensureConnectionInfoSent();
1180
1181 ConsumerId consumerId = createConsumerId();
1182 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1183 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1184 consumerInfo.setSelector(messageSelector);
1185 consumerInfo.setPrefetchSize(maxMessages);
1186 consumerInfo.setNoLocal(noLocal);
1187 consumerInfo.setDispatchAsync(isDispatchAsync());
1188
1189 // Allows the options on the destination to configure the consumerInfo
1190 if (consumerInfo.getDestination().getOptions() != null) {
1191 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1192 IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1193 }
1194
1195 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1196 }
1197
1198 /**
1199 * @return
1200 */
1201 private ConsumerId createConsumerId() {
1202 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1203 }
1204
1205 /**
1206 * @return
1207 */
1208 private ProducerId createProducerId() {
1209 return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1210 }
1211
1212 /**
1213 * Creates a <CODE>QueueSession</CODE> object.
1214 *
1215 * @param transacted indicates whether the session is transacted
1216 * @param acknowledgeMode indicates whether the consumer or the client will
1217 * acknowledge any messages it receives; ignored if the
1218 * session is transacted. Legal values are
1219 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1220 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1221 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1222 * @return a newly created queue session
1223 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1224 * to create a session due to some internal error or lack of
1225 * support for the specific transaction and acknowledgement
1226 * mode.
1227 * @see Session#AUTO_ACKNOWLEDGE
1228 * @see Session#CLIENT_ACKNOWLEDGE
1229 * @see Session#DUPS_OK_ACKNOWLEDGE
1230 */
1231 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1232 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1233 }
1234
1235 /**
1236 * Ensures that the clientID was manually specified and not auto-generated.
1237 * If the clientID was not specified this method will throw an exception.
1238 * This method is used to ensure that the clientID + durableSubscriber name
1239 * are used correctly.
1240 *
1241 * @throws JMSException
1242 */
1243 public void checkClientIDWasManuallySpecified() throws JMSException {
1244 if (!userSpecifiedClientID) {
1245 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1246 }
1247 }
1248
1249 /**
1250 * send a Packet through the Connection - for internal use only
1251 *
1252 * @param command
1253 * @throws JMSException
1254 */
1255 public void asyncSendPacket(Command command) throws JMSException {
1256 if (isClosed()) {
1257 throw new ConnectionClosedException();
1258 } else {
1259 doAsyncSendPacket(command);
1260 }
1261 }
1262
1263 private void doAsyncSendPacket(Command command) throws JMSException {
1264 try {
1265 this.transport.oneway(command);
1266 } catch (IOException e) {
1267 throw JMSExceptionSupport.create(e);
1268 }
1269 }
1270
1271 /**
1272 * Send a packet through a Connection - for internal use only
1273 *
1274 * @param command
1275 * @return
1276 * @throws JMSException
1277 */
1278 public Response syncSendPacket(Command command) throws JMSException {
1279 if (isClosed()) {
1280 throw new ConnectionClosedException();
1281 } else {
1282
1283 try {
1284 Response response = (Response)this.transport.request(command);
1285 if (response.isException()) {
1286 ExceptionResponse er = (ExceptionResponse)response;
1287 if (er.getException() instanceof JMSException) {
1288 throw (JMSException)er.getException();
1289 } else {
1290 if (isClosed()||closing.get()) {
1291 LOG.debug("Received an exception but connection is closing");
1292 }
1293 JMSException jmsEx = null;
1294 try {
1295 jmsEx = JMSExceptionSupport.create(er.getException());
1296 }catch(Throwable e) {
1297 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1298 }
1299 if(jmsEx !=null) {
1300 throw jmsEx;
1301 }
1302 }
1303 }
1304 return response;
1305 } catch (IOException e) {
1306 throw JMSExceptionSupport.create(e);
1307 }
1308 }
1309 }
1310
1311 /**
1312 * Send a packet through a Connection - for internal use only
1313 *
1314 * @param command
1315 * @return
1316 * @throws JMSException
1317 */
1318 public Response syncSendPacket(Command command, int timeout) throws JMSException {
1319 if (isClosed() || closing.get()) {
1320 throw new ConnectionClosedException();
1321 } else {
1322 return doSyncSendPacket(command, timeout);
1323 }
1324 }
1325
1326 private Response doSyncSendPacket(Command command, int timeout)
1327 throws JMSException {
1328 try {
1329 Response response = (Response) (timeout > 0
1330 ? this.transport.request(command, timeout)
1331 : this.transport.request(command));
1332 if (response != null && response.isException()) {
1333 ExceptionResponse er = (ExceptionResponse)response;
1334 if (er.getException() instanceof JMSException) {
1335 throw (JMSException)er.getException();
1336 } else {
1337 throw JMSExceptionSupport.create(er.getException());
1338 }
1339 }
1340 return response;
1341 } catch (IOException e) {
1342 throw JMSExceptionSupport.create(e);
1343 }
1344 }
1345
1346 /**
1347 * @return statistics for this Connection
1348 */
1349 public StatsImpl getStats() {
1350 return stats;
1351 }
1352
1353 /**
1354 * simply throws an exception if the Connection is already closed or the
1355 * Transport has failed
1356 *
1357 * @throws JMSException
1358 */
1359 protected synchronized void checkClosedOrFailed() throws JMSException {
1360 checkClosed();
1361 if (transportFailed.get()) {
1362 throw new ConnectionFailedException(firstFailureError);
1363 }
1364 }
1365
1366 /**
1367 * simply throws an exception if the Connection is already closed
1368 *
1369 * @throws JMSException
1370 */
1371 protected synchronized void checkClosed() throws JMSException {
1372 if (closed.get()) {
1373 throw new ConnectionClosedException();
1374 }
1375 }
1376
1377 /**
1378 * Send the ConnectionInfo to the Broker
1379 *
1380 * @throws JMSException
1381 */
1382 protected void ensureConnectionInfoSent() throws JMSException {
1383 synchronized(this.ensureConnectionInfoSentMutex) {
1384 // Can we skip sending the ConnectionInfo packet??
1385 if (isConnectionInfoSentToBroker || closed.get()) {
1386 return;
1387 }
1388 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1389 if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1390 info.setClientId(clientIdGenerator.generateId());
1391 }
1392 syncSendPacket(info.copy());
1393
1394 this.isConnectionInfoSentToBroker = true;
1395 // Add a temp destination advisory consumer so that
1396 // We know what the valid temporary destinations are on the
1397 // broker without having to do an RPC to the broker.
1398
1399 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1400 if (watchTopicAdvisories) {
1401 advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1402 }
1403 }
1404 }
1405
1406 public synchronized boolean isWatchTopicAdvisories() {
1407 return watchTopicAdvisories;
1408 }
1409
1410 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1411 this.watchTopicAdvisories = watchTopicAdvisories;
1412 }
1413
1414 /**
1415 * @return Returns the useAsyncSend.
1416 */
1417 public boolean isUseAsyncSend() {
1418 return useAsyncSend;
1419 }
1420
1421 /**
1422 * Forces the use of <a
1423 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1424 * adds a massive performance boost; but means that the send() method will
1425 * return immediately whether the message has been sent or not which could
1426 * lead to message loss.
1427 */
1428 public void setUseAsyncSend(boolean useAsyncSend) {
1429 this.useAsyncSend = useAsyncSend;
1430 }
1431
1432 /**
1433 * @return true if always sync send messages
1434 */
1435 public boolean isAlwaysSyncSend() {
1436 return this.alwaysSyncSend;
1437 }
1438
1439 /**
1440 * Set true if always require messages to be sync sent
1441 *
1442 * @param alwaysSyncSend
1443 */
1444 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1445 this.alwaysSyncSend = alwaysSyncSend;
1446 }
1447
1448 /**
1449 * @return the messagePrioritySupported
1450 */
1451 public boolean isMessagePrioritySupported() {
1452 return this.messagePrioritySupported;
1453 }
1454
1455 /**
1456 * @param messagePrioritySupported the messagePrioritySupported to set
1457 */
1458 public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1459 this.messagePrioritySupported = messagePrioritySupported;
1460 }
1461
1462 /**
1463 * Cleans up this connection so that it's state is as if the connection was
1464 * just created. This allows the Resource Adapter to clean up a connection
1465 * so that it can be reused without having to close and recreate the
1466 * connection.
1467 */
1468 public void cleanup() throws JMSException {
1469
1470 if (advisoryConsumer != null && !isTransportFailed()) {
1471 advisoryConsumer.dispose();
1472 advisoryConsumer = null;
1473 }
1474
1475 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1476 ActiveMQSession s = i.next();
1477 s.dispose();
1478 }
1479 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1480 ActiveMQConnectionConsumer c = i.next();
1481 c.dispose();
1482 }
1483 for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1484 ActiveMQInputStream c = i.next();
1485 c.dispose();
1486 }
1487 for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1488 ActiveMQOutputStream c = i.next();
1489 c.dispose();
1490 }
1491
1492 if (isConnectionInfoSentToBroker) {
1493 if (!transportFailed.get() && !closing.get()) {
1494 syncSendPacket(info.createRemoveCommand());
1495 }
1496 isConnectionInfoSentToBroker = false;
1497 }
1498 if (userSpecifiedClientID) {
1499 info.setClientId(null);
1500 userSpecifiedClientID = false;
1501 }
1502 clientIDSet = false;
1503
1504 started.set(false);
1505 }
1506
1507 /**
1508 * Changes the associated username/password that is associated with this
1509 * connection. If the connection has been used, you must called cleanup()
1510 * before calling this method.
1511 *
1512 * @throws IllegalStateException if the connection is in used.
1513 */
1514 public void changeUserInfo(String userName, String password) throws JMSException {
1515 if (isConnectionInfoSentToBroker) {
1516 throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1517 }
1518 this.info.setUserName(userName);
1519 this.info.setPassword(password);
1520 }
1521
1522 /**
1523 * @return Returns the resourceManagerId.
1524 * @throws JMSException
1525 */
1526 public String getResourceManagerId() throws JMSException {
1527 waitForBrokerInfo();
1528 if (brokerInfo == null) {
1529 throw new JMSException("Connection failed before Broker info was received.");
1530 }
1531 return brokerInfo.getBrokerId().getValue();
1532 }
1533
1534 /**
1535 * Returns the broker name if one is available or null if one is not
1536 * available yet.
1537 */
1538 public String getBrokerName() {
1539 try {
1540 brokerInfoReceived.await(5, TimeUnit.SECONDS);
1541 if (brokerInfo == null) {
1542 return null;
1543 }
1544 return brokerInfo.getBrokerName();
1545 } catch (InterruptedException e) {
1546 Thread.currentThread().interrupt();
1547 return null;
1548 }
1549 }
1550
1551 /**
1552 * Returns the broker information if it is available or null if it is not
1553 * available yet.
1554 */
1555 public BrokerInfo getBrokerInfo() {
1556 return brokerInfo;
1557 }
1558
1559 /**
1560 * @return Returns the RedeliveryPolicy.
1561 * @throws JMSException
1562 */
1563 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1564 return redeliveryPolicy;
1565 }
1566
1567 /**
1568 * Sets the redelivery policy to be used when messages are rolled back
1569 */
1570 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1571 this.redeliveryPolicy = redeliveryPolicy;
1572 }
1573
1574 public BlobTransferPolicy getBlobTransferPolicy() {
1575 if (blobTransferPolicy == null) {
1576 blobTransferPolicy = createBlobTransferPolicy();
1577 }
1578 return blobTransferPolicy;
1579 }
1580
1581 /**
1582 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1583 * OBjects) are transferred from producers to brokers to consumers
1584 */
1585 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1586 this.blobTransferPolicy = blobTransferPolicy;
1587 }
1588
1589 /**
1590 * @return Returns the alwaysSessionAsync.
1591 */
1592 public boolean isAlwaysSessionAsync() {
1593 return alwaysSessionAsync;
1594 }
1595
1596 /**
1597 * If this flag is set then a separate thread is not used for dispatching
1598 * messages for each Session in the Connection. However, a separate thread
1599 * is always used if there is more than one session, or the session isn't in
1600 * auto acknowledge or duplicates ok mode
1601 */
1602 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1603 this.alwaysSessionAsync = alwaysSessionAsync;
1604 }
1605
1606 /**
1607 * @return Returns the optimizeAcknowledge.
1608 */
1609 public boolean isOptimizeAcknowledge() {
1610 return optimizeAcknowledge;
1611 }
1612
1613 /**
1614 * Enables an optimised acknowledgement mode where messages are acknowledged
1615 * in batches rather than individually
1616 *
1617 * @param optimizeAcknowledge The optimizeAcknowledge to set.
1618 */
1619 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1620 this.optimizeAcknowledge = optimizeAcknowledge;
1621 }
1622
1623 public long getWarnAboutUnstartedConnectionTimeout() {
1624 return warnAboutUnstartedConnectionTimeout;
1625 }
1626
1627 /**
1628 * Enables the timeout from a connection creation to when a warning is
1629 * generated if the connection is not properly started via {@link #start()}
1630 * and a message is received by a consumer. It is a very common gotcha to
1631 * forget to <a
1632 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1633 * the connection</a> so this option makes the default case to create a
1634 * warning if the user forgets. To disable the warning just set the value to <
1635 * 0 (say -1).
1636 */
1637 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1638 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1639 }
1640
1641 /**
1642 * @return the sendTimeout
1643 */
1644 public int getSendTimeout() {
1645 return sendTimeout;
1646 }
1647
1648 /**
1649 * @param sendTimeout the sendTimeout to set
1650 */
1651 public void setSendTimeout(int sendTimeout) {
1652 this.sendTimeout = sendTimeout;
1653 }
1654
1655 /**
1656 * @return the sendAcksAsync
1657 */
1658 public boolean isSendAcksAsync() {
1659 return sendAcksAsync;
1660 }
1661
1662 /**
1663 * @param sendAcksAsync the sendAcksAsync to set
1664 */
1665 public void setSendAcksAsync(boolean sendAcksAsync) {
1666 this.sendAcksAsync = sendAcksAsync;
1667 }
1668
1669
1670 /**
1671 * Returns the time this connection was created
1672 */
1673 public long getTimeCreated() {
1674 return timeCreated;
1675 }
1676
1677 private void waitForBrokerInfo() throws JMSException {
1678 try {
1679 brokerInfoReceived.await();
1680 } catch (InterruptedException e) {
1681 Thread.currentThread().interrupt();
1682 throw JMSExceptionSupport.create(e);
1683 }
1684 }
1685
1686 // Package protected so that it can be used in unit tests
1687 public Transport getTransport() {
1688 return transport;
1689 }
1690
1691 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1692 producers.put(producerId, producer);
1693 }
1694
1695 public void removeProducer(ProducerId producerId) {
1696 producers.remove(producerId);
1697 }
1698
1699 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1700 dispatchers.put(consumerId, dispatcher);
1701 }
1702
1703 public void removeDispatcher(ConsumerId consumerId) {
1704 dispatchers.remove(consumerId);
1705 }
1706
1707 /**
1708 * @param o - the command to consume
1709 */
1710 public void onCommand(final Object o) {
1711 final Command command = (Command)o;
1712 if (!closed.get() && command != null) {
1713 try {
1714 command.visit(new CommandVisitorAdapter() {
1715 @Override
1716 public Response processMessageDispatch(MessageDispatch md) throws Exception {
1717 waitForTransportInterruptionProcessingToComplete();
1718 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1719 if (dispatcher != null) {
1720 // Copy in case a embedded broker is dispatching via
1721 // vm://
1722 // md.getMessage() == null to signal end of queue
1723 // browse.
1724 Message msg = md.getMessage();
1725 if (msg != null) {
1726 msg = msg.copy();
1727 msg.setReadOnlyBody(true);
1728 msg.setReadOnlyProperties(true);
1729 msg.setRedeliveryCounter(md.getRedeliveryCounter());
1730 msg.setConnection(ActiveMQConnection.this);
1731 md.setMessage(msg);
1732 }
1733 dispatcher.dispatch(md);
1734 }
1735 return null;
1736 }
1737
1738 @Override
1739 public Response processProducerAck(ProducerAck pa) throws Exception {
1740 if (pa != null && pa.getProducerId() != null) {
1741 ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1742 if (producer != null) {
1743 producer.onProducerAck(pa);
1744 }
1745 }
1746 return null;
1747 }
1748
1749 @Override
1750 public Response processBrokerInfo(BrokerInfo info) throws Exception {
1751 brokerInfo = info;
1752 brokerInfoReceived.countDown();
1753 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1754 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1755 return null;
1756 }
1757
1758 @Override
1759 public Response processConnectionError(final ConnectionError error) throws Exception {
1760 executor.execute(new Runnable() {
1761 public void run() {
1762 onAsyncException(error.getException());
1763 }
1764 });
1765 return null;
1766 }
1767
1768 @Override
1769 public Response processControlCommand(ControlCommand command) throws Exception {
1770 onControlCommand(command);
1771 return null;
1772 }
1773
1774 @Override
1775 public Response processConnectionControl(ConnectionControl control) throws Exception {
1776 onConnectionControl((ConnectionControl)command);
1777 return null;
1778 }
1779
1780 @Override
1781 public Response processConsumerControl(ConsumerControl control) throws Exception {
1782 onConsumerControl((ConsumerControl)command);
1783 return null;
1784 }
1785
1786 @Override
1787 public Response processWireFormat(WireFormatInfo info) throws Exception {
1788 onWireFormatInfo((WireFormatInfo)command);
1789 return null;
1790 }
1791 });
1792 } catch (Exception e) {
1793 onClientInternalException(e);
1794 }
1795
1796 }
1797 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1798 TransportListener listener = iter.next();
1799 listener.onCommand(command);
1800 }
1801 }
1802
1803 protected void onWireFormatInfo(WireFormatInfo info) {
1804 protocolVersion.set(info.getVersion());
1805 }
1806
1807 /**
1808 * Handles async client internal exceptions.
1809 * A client internal exception is usually one that has been thrown
1810 * by a container runtime component during asynchronous processing of a
1811 * message that does not affect the connection itself.
1812 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1813 * its <code>onException</code> method, if one has been registered with this connection.
1814 *
1815 * @param error the exception that the problem
1816 */
1817 public void onClientInternalException(final Throwable error) {
1818 if ( !closed.get() && !closing.get() ) {
1819 if ( this.clientInternalExceptionListener != null ) {
1820 executor.execute(new Runnable() {
1821 public void run() {
1822 ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1823 }
1824 });
1825 } else {
1826 LOG.debug("Async client internal exception occurred with no exception listener registered: "
1827 + error, error);
1828 }
1829 }
1830 }
1831 /**
1832 * Used for handling async exceptions
1833 *
1834 * @param error
1835 */
1836 public void onAsyncException(Throwable error) {
1837 if (!closed.get() && !closing.get()) {
1838 if (this.exceptionListener != null) {
1839
1840 if (!(error instanceof JMSException)) {
1841 error = JMSExceptionSupport.create(error);
1842 }
1843 final JMSException e = (JMSException)error;
1844
1845 executor.execute(new Runnable() {
1846 public void run() {
1847 ActiveMQConnection.this.exceptionListener.onException(e);
1848 }
1849 });
1850
1851 } else {
1852 LOG.debug("Async exception with no exception listener: " + error, error);
1853 }
1854 }
1855 }
1856
1857 public void onException(final IOException error) {
1858 onAsyncException(error);
1859 if (!closing.get() && !closed.get()) {
1860 executor.execute(new Runnable() {
1861 public void run() {
1862 transportFailed(error);
1863 ServiceSupport.dispose(ActiveMQConnection.this.transport);
1864 brokerInfoReceived.countDown();
1865 try {
1866 cleanup();
1867 } catch (JMSException e) {
1868 LOG.warn("Exception during connection cleanup, " + e, e);
1869 }
1870 for (Iterator<TransportListener> iter = transportListeners
1871 .iterator(); iter.hasNext();) {
1872 TransportListener listener = iter.next();
1873 listener.onException(error);
1874 }
1875 }
1876 });
1877 }
1878 }
1879
1880 public void transportInterupted() {
1881 this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1882 if (LOG.isDebugEnabled()) {
1883 LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1884 }
1885 signalInterruptionProcessingNeeded();
1886
1887 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1888 ActiveMQSession s = i.next();
1889 s.clearMessagesInProgress();
1890 }
1891
1892 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1893 connectionConsumer.clearMessagesInProgress();
1894 }
1895
1896 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1897 TransportListener listener = iter.next();
1898 listener.transportInterupted();
1899 }
1900 }
1901
1902 public void transportResumed() {
1903 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1904 TransportListener listener = iter.next();
1905 listener.transportResumed();
1906 }
1907 }
1908
1909 /**
1910 * Create the DestinationInfo object for the temporary destination.
1911 *
1912 * @param topic - if its true topic, else queue.
1913 * @return DestinationInfo
1914 * @throws JMSException
1915 */
1916 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1917
1918 // Check if Destination info is of temporary type.
1919 ActiveMQTempDestination dest;
1920 if (topic) {
1921 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1922 } else {
1923 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1924 }
1925
1926 DestinationInfo info = new DestinationInfo();
1927 info.setConnectionId(this.info.getConnectionId());
1928 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1929 info.setDestination(dest);
1930 syncSendPacket(info);
1931
1932 dest.setConnection(this);
1933 activeTempDestinations.put(dest, dest);
1934 return dest;
1935 }
1936
1937 /**
1938 * @param destination
1939 * @throws JMSException
1940 */
1941 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1942
1943 checkClosedOrFailed();
1944
1945 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1946 ActiveMQSession s = i.next();
1947 if (s.isInUse(destination)) {
1948 throw new JMSException("A consumer is consuming from the temporary destination");
1949 }
1950 }
1951
1952 activeTempDestinations.remove(destination);
1953
1954 DestinationInfo destInfo = new DestinationInfo();
1955 destInfo.setConnectionId(this.info.getConnectionId());
1956 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1957 destInfo.setDestination(destination);
1958 destInfo.setTimeout(0);
1959 syncSendPacket(destInfo);
1960 }
1961
1962 public boolean isDeleted(ActiveMQDestination dest) {
1963
1964 // If we are not watching the advisories.. then
1965 // we will assume that the temp destination does exist.
1966 if (advisoryConsumer == null) {
1967 return false;
1968 }
1969
1970 return !activeTempDestinations.contains(dest);
1971 }
1972
1973 public boolean isCopyMessageOnSend() {
1974 return copyMessageOnSend;
1975 }
1976
1977 public LongSequenceGenerator getLocalTransactionIdGenerator() {
1978 return localTransactionIdGenerator;
1979 }
1980
1981 public boolean isUseCompression() {
1982 return useCompression;
1983 }
1984
1985 /**
1986 * Enables the use of compression of the message bodies
1987 */
1988 public void setUseCompression(boolean useCompression) {
1989 this.useCompression = useCompression;
1990 }
1991
1992 public void destroyDestination(ActiveMQDestination destination) throws JMSException {
1993
1994 checkClosedOrFailed();
1995 ensureConnectionInfoSent();
1996
1997 DestinationInfo info = new DestinationInfo();
1998 info.setConnectionId(this.info.getConnectionId());
1999 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2000 info.setDestination(destination);
2001 info.setTimeout(0);
2002 syncSendPacket(info);
2003
2004 }
2005
2006 public boolean isDispatchAsync() {
2007 return dispatchAsync;
2008 }
2009
2010 /**
2011 * Enables or disables the default setting of whether or not consumers have
2012 * their messages <a
2013 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2014 * synchronously or asynchronously by the broker</a>. For non-durable
2015 * topics for example we typically dispatch synchronously by default to
2016 * minimize context switches which boost performance. However sometimes its
2017 * better to go slower to ensure that a single blocked consumer socket does
2018 * not block delivery to other consumers.
2019 *
2020 * @param asyncDispatch If true then consumers created on this connection
2021 * will default to having their messages dispatched
2022 * asynchronously. The default value is false.
2023 */
2024 public void setDispatchAsync(boolean asyncDispatch) {
2025 this.dispatchAsync = asyncDispatch;
2026 }
2027
2028 public boolean isObjectMessageSerializationDefered() {
2029 return objectMessageSerializationDefered;
2030 }
2031
2032 /**
2033 * When an object is set on an ObjectMessage, the JMS spec requires the
2034 * object to be serialized by that set method. Enabling this flag causes the
2035 * object to not get serialized. The object may subsequently get serialized
2036 * if the message needs to be sent over a socket or stored to disk.
2037 */
2038 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2039 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2040 }
2041
2042 public InputStream createInputStream(Destination dest) throws JMSException {
2043 return createInputStream(dest, null);
2044 }
2045
2046 public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2047 return createInputStream(dest, messageSelector, false);
2048 }
2049
2050 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2051 return createInputStream(dest, messageSelector, noLocal, -1);
2052 }
2053
2054
2055
2056 public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2057 return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2058 }
2059
2060 public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2061 return createInputStream(dest, null, false);
2062 }
2063
2064 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2065 return createDurableInputStream(dest, name, messageSelector, false);
2066 }
2067
2068 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2069 return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2070 }
2071
2072 public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2073 return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2074 }
2075
2076 private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2077 checkClosedOrFailed();
2078 ensureConnectionInfoSent();
2079 return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2080 }
2081
2082 /**
2083 * Creates a persistent output stream; individual messages will be written
2084 * to disk/database by the broker
2085 */
2086 public OutputStream createOutputStream(Destination dest) throws JMSException {
2087 return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2088 }
2089
2090 /**
2091 * Creates a non persistent output stream; messages will not be written to
2092 * disk
2093 */
2094 public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2095 return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2096 }
2097
2098 /**
2099 * Creates an output stream allowing full control over the delivery mode,
2100 * the priority and time to live of the messages and the properties added to
2101 * messages on the stream.
2102 *
2103 * @param streamProperties defines a map of key-value pairs where the keys
2104 * are strings and the values are primitive values (numbers
2105 * and strings) which are appended to the messages similarly
2106 * to using the
2107 * {@link javax.jms.Message#setObjectProperty(String, Object)}
2108 * method
2109 */
2110 public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2111 checkClosedOrFailed();
2112 ensureConnectionInfoSent();
2113 return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2114 }
2115
2116 /**
2117 * Unsubscribes a durable subscription that has been created by a client.
2118 * <P>
2119 * This method deletes the state being maintained on behalf of the
2120 * subscriber by its provider.
2121 * <P>
2122 * It is erroneous for a client to delete a durable subscription while there
2123 * is an active <CODE>MessageConsumer </CODE> or
2124 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2125 * message is part of a pending transaction or has not been acknowledged in
2126 * the session.
2127 *
2128 * @param name the name used to identify this subscription
2129 * @throws JMSException if the session fails to unsubscribe to the durable
2130 * subscription due to some internal error.
2131 * @throws InvalidDestinationException if an invalid subscription name is
2132 * specified.
2133 * @since 1.1
2134 */
2135 public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2136 checkClosedOrFailed();
2137 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2138 rsi.setConnectionId(getConnectionInfo().getConnectionId());
2139 rsi.setSubscriptionName(name);
2140 rsi.setClientId(getConnectionInfo().getClientId());
2141 syncSendPacket(rsi);
2142 }
2143
2144 /**
2145 * Internal send method optimized: - It does not copy the message - It can
2146 * only handle ActiveMQ messages. - You can specify if the send is async or
2147 * sync - Does not allow you to send /w a transaction.
2148 */
2149 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2150 checkClosedOrFailed();
2151
2152 if (destination.isTemporary() && isDeleted(destination)) {
2153 throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2154 }
2155
2156 msg.setJMSDestination(destination);
2157 msg.setJMSDeliveryMode(deliveryMode);
2158 long expiration = 0L;
2159
2160 if (!isDisableTimeStampsByDefault()) {
2161 long timeStamp = System.currentTimeMillis();
2162 msg.setJMSTimestamp(timeStamp);
2163 if (timeToLive > 0) {
2164 expiration = timeToLive + timeStamp;
2165 }
2166 }
2167
2168 msg.setJMSExpiration(expiration);
2169 msg.setJMSPriority(priority);
2170
2171 msg.setJMSRedelivered(false);
2172 msg.setMessageId(messageId);
2173
2174 msg.onSend();
2175
2176 msg.setProducerId(msg.getMessageId().getProducerId());
2177
2178 if (LOG.isDebugEnabled()) {
2179 LOG.debug("Sending message: " + msg);
2180 }
2181
2182 if (async) {
2183 asyncSendPacket(msg);
2184 } else {
2185 syncSendPacket(msg);
2186 }
2187
2188 }
2189
2190 public void addOutputStream(ActiveMQOutputStream stream) {
2191 outputStreams.add(stream);
2192 }
2193
2194 public void removeOutputStream(ActiveMQOutputStream stream) {
2195 outputStreams.remove(stream);
2196 }
2197
2198 public void addInputStream(ActiveMQInputStream stream) {
2199 inputStreams.add(stream);
2200 }
2201
2202 public void removeInputStream(ActiveMQInputStream stream) {
2203 inputStreams.remove(stream);
2204 }
2205
2206 protected void onControlCommand(ControlCommand command) {
2207 String text = command.getCommand();
2208 if (text != null) {
2209 if (text.equals("shutdown")) {
2210 LOG.info("JVM told to shutdown");
2211 System.exit(0);
2212 }
2213 }
2214 }
2215
2216 protected void onConnectionControl(ConnectionControl command) {
2217 if (command.isFaultTolerant()) {
2218 this.optimizeAcknowledge = false;
2219 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2220 ActiveMQSession s = i.next();
2221 s.setOptimizeAcknowledge(false);
2222 }
2223 }
2224 }
2225
2226 protected void onConsumerControl(ConsumerControl command) {
2227 if (command.isClose()) {
2228 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2229 ActiveMQSession s = i.next();
2230 s.close(command.getConsumerId());
2231 }
2232 } else {
2233 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2234 ActiveMQSession s = i.next();
2235 s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2236 }
2237 }
2238 }
2239
2240 protected void transportFailed(IOException error) {
2241 transportFailed.set(true);
2242 if (firstFailureError == null) {
2243 firstFailureError = error;
2244 }
2245 }
2246
2247 /**
2248 * Should a JMS message be copied to a new JMS Message object as part of the
2249 * send() method in JMS. This is enabled by default to be compliant with the
2250 * JMS specification. You can disable it if you do not mutate JMS messages
2251 * after they are sent for a performance boost
2252 */
2253 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2254 this.copyMessageOnSend = copyMessageOnSend;
2255 }
2256
2257 @Override
2258 public String toString() {
2259 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2260 }
2261
2262 protected BlobTransferPolicy createBlobTransferPolicy() {
2263 return new BlobTransferPolicy();
2264 }
2265
2266 public int getProtocolVersion() {
2267 return protocolVersion.get();
2268 }
2269
2270 public int getProducerWindowSize() {
2271 return producerWindowSize;
2272 }
2273
2274 public void setProducerWindowSize(int producerWindowSize) {
2275 this.producerWindowSize = producerWindowSize;
2276 }
2277
2278 public void setAuditDepth(int auditDepth) {
2279 connectionAudit.setAuditDepth(auditDepth);
2280 }
2281
2282 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2283 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2284 }
2285
2286 protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2287 connectionAudit.removeDispatcher(dispatcher);
2288 }
2289
2290 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2291 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2292 }
2293
2294 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2295 connectionAudit.rollbackDuplicate(dispatcher, message);
2296 }
2297
2298 public IOException getFirstFailureError() {
2299 return firstFailureError;
2300 }
2301
2302 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2303 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2304 if (cdl != null) {
2305 if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2306 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2307 cdl.await(10, TimeUnit.SECONDS);
2308 }
2309 signalInterruptionProcessingComplete();
2310 }
2311 }
2312
2313 protected void transportInterruptionProcessingComplete() {
2314 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2315 if (cdl != null) {
2316 cdl.countDown();
2317 try {
2318 signalInterruptionProcessingComplete();
2319 } catch (InterruptedException ignored) {}
2320 }
2321 }
2322
2323 private void signalInterruptionProcessingComplete() throws InterruptedException {
2324 CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2325 if (cdl.getCount()==0) {
2326 if (LOG.isDebugEnabled()) {
2327 LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2328 }
2329 this.transportInterruptionProcessingComplete = null;
2330
2331 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2332 if (failoverTransport != null) {
2333 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2334 if (LOG.isDebugEnabled()) {
2335 LOG.debug("notified failover transport (" + failoverTransport
2336 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2337 }
2338 }
2339
2340 }
2341 }
2342
2343 private void signalInterruptionProcessingNeeded() {
2344 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2345 if (failoverTransport != null) {
2346 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2347 if (LOG.isDebugEnabled()) {
2348 LOG.debug("notified failover transport (" + failoverTransport
2349 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2350 }
2351 }
2352 }
2353
2354 /*
2355 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2356 * will wait to receive re dispatched messages.
2357 * default value is 0 so there is no wait by default.
2358 */
2359 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2360 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2361 }
2362
2363 public long getConsumerFailoverRedeliveryWaitPeriod() {
2364 return consumerFailoverRedeliveryWaitPeriod;
2365 }
2366
2367 protected Scheduler getScheduler() {
2368 return this.scheduler;
2369 }
2370
2371 protected ThreadPoolExecutor getExecutor() {
2372 return this.executor;
2373 }
2374
2375 /**
2376 * @return the checkForDuplicates
2377 */
2378 public boolean isCheckForDuplicates() {
2379 return this.checkForDuplicates;
2380 }
2381
2382 /**
2383 * @param checkForDuplicates the checkForDuplicates to set
2384 */
2385 public void setCheckForDuplicates(boolean checkForDuplicates) {
2386 this.checkForDuplicates = checkForDuplicates;
2387 }
2388
2389 }