001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.io.OutputStream;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.LinkedBlockingQueue;
031    import java.util.concurrent.ThreadFactory;
032    import java.util.concurrent.ThreadPoolExecutor;
033    import java.util.concurrent.TimeUnit;
034    import java.util.concurrent.atomic.AtomicBoolean;
035    import java.util.concurrent.atomic.AtomicInteger;
036    import javax.jms.Connection;
037    import javax.jms.ConnectionConsumer;
038    import javax.jms.ConnectionMetaData;
039    import javax.jms.DeliveryMode;
040    import javax.jms.Destination;
041    import javax.jms.ExceptionListener;
042    import javax.jms.IllegalStateException;
043    import javax.jms.InvalidDestinationException;
044    import javax.jms.JMSException;
045    import javax.jms.Queue;
046    import javax.jms.QueueConnection;
047    import javax.jms.QueueSession;
048    import javax.jms.ServerSessionPool;
049    import javax.jms.Session;
050    import javax.jms.Topic;
051    import javax.jms.TopicConnection;
052    import javax.jms.TopicSession;
053    import javax.jms.XAConnection;
054    import org.apache.activemq.advisory.DestinationSource;
055    import org.apache.activemq.blob.BlobTransferPolicy;
056    import org.apache.activemq.command.ActiveMQDestination;
057    import org.apache.activemq.command.ActiveMQMessage;
058    import org.apache.activemq.command.ActiveMQTempDestination;
059    import org.apache.activemq.command.ActiveMQTempQueue;
060    import org.apache.activemq.command.ActiveMQTempTopic;
061    import org.apache.activemq.command.BrokerInfo;
062    import org.apache.activemq.command.Command;
063    import org.apache.activemq.command.CommandTypes;
064    import org.apache.activemq.command.ConnectionControl;
065    import org.apache.activemq.command.ConnectionError;
066    import org.apache.activemq.command.ConnectionId;
067    import org.apache.activemq.command.ConnectionInfo;
068    import org.apache.activemq.command.ConsumerControl;
069    import org.apache.activemq.command.ConsumerId;
070    import org.apache.activemq.command.ConsumerInfo;
071    import org.apache.activemq.command.ControlCommand;
072    import org.apache.activemq.command.DestinationInfo;
073    import org.apache.activemq.command.ExceptionResponse;
074    import org.apache.activemq.command.Message;
075    import org.apache.activemq.command.MessageDispatch;
076    import org.apache.activemq.command.MessageId;
077    import org.apache.activemq.command.ProducerAck;
078    import org.apache.activemq.command.ProducerId;
079    import org.apache.activemq.command.RemoveInfo;
080    import org.apache.activemq.command.RemoveSubscriptionInfo;
081    import org.apache.activemq.command.Response;
082    import org.apache.activemq.command.SessionId;
083    import org.apache.activemq.command.ShutdownInfo;
084    import org.apache.activemq.command.WireFormatInfo;
085    import org.apache.activemq.management.JMSConnectionStatsImpl;
086    import org.apache.activemq.management.JMSStatsImpl;
087    import org.apache.activemq.management.StatsCapable;
088    import org.apache.activemq.management.StatsImpl;
089    import org.apache.activemq.state.CommandVisitorAdapter;
090    import org.apache.activemq.thread.Scheduler;
091    import org.apache.activemq.thread.TaskRunnerFactory;
092    import org.apache.activemq.transport.Transport;
093    import org.apache.activemq.transport.TransportListener;
094    import org.apache.activemq.transport.failover.FailoverTransport;
095    import org.apache.activemq.util.IdGenerator;
096    import org.apache.activemq.util.IntrospectionSupport;
097    import org.apache.activemq.util.JMSExceptionSupport;
098    import org.apache.activemq.util.LongSequenceGenerator;
099    import org.apache.activemq.util.ServiceSupport;
100    import org.slf4j.Logger;
101    import org.slf4j.LoggerFactory;
102    
103    public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
104    
105        public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
106        public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
107        public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
108    
109        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
110        private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
111    
112        public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
113    
114        protected boolean dispatchAsync=true;
115        protected boolean alwaysSessionAsync = true;
116    
117        private TaskRunnerFactory sessionTaskRunner;
118        private final ThreadPoolExecutor executor;
119    
120        // Connection state variables
121        private final ConnectionInfo info;
122        private ExceptionListener exceptionListener;
123        private ClientInternalExceptionListener clientInternalExceptionListener;
124        private boolean clientIDSet;
125        private boolean isConnectionInfoSentToBroker;
126        private boolean userSpecifiedClientID;
127    
128        // Configuration options variables
129        private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
130        private BlobTransferPolicy blobTransferPolicy;
131        private RedeliveryPolicy redeliveryPolicy;
132        private MessageTransformer transformer;
133    
134        private boolean disableTimeStampsByDefault;
135        private boolean optimizedMessageDispatch = true;
136        private boolean copyMessageOnSend = true;
137        private boolean useCompression;
138        private boolean objectMessageSerializationDefered;
139        private boolean useAsyncSend;
140        private boolean optimizeAcknowledge;
141        private boolean nestedMapAndListEnabled = true;
142        private boolean useRetroactiveConsumer;
143        private boolean exclusiveConsumer;
144        private boolean alwaysSyncSend;
145        private int closeTimeout = 15000;
146        private boolean watchTopicAdvisories = true;
147        private long warnAboutUnstartedConnectionTimeout = 500L;
148        private int sendTimeout =0;
149        private boolean sendAcksAsync=true;
150        private boolean checkForDuplicates = true;
151    
152        private final Transport transport;
153        private final IdGenerator clientIdGenerator;
154        private final JMSStatsImpl factoryStats;
155        private final JMSConnectionStatsImpl stats;
156    
157        private final AtomicBoolean started = new AtomicBoolean(false);
158        private final AtomicBoolean closing = new AtomicBoolean(false);
159        private final AtomicBoolean closed = new AtomicBoolean(false);
160        private final AtomicBoolean transportFailed = new AtomicBoolean(false);
161        private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
162        private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
163        private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
164        private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
165        private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
166    
167        // Maps ConsumerIds to ActiveMQConsumer objects
168        private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
169        private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
170        private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
171        private final SessionId connectionSessionId;
172        private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
173        private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
174        private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
175        private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
176    
177        private AdvisoryConsumer advisoryConsumer;
178        private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
179        private BrokerInfo brokerInfo;
180        private IOException firstFailureError;
181        private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
182    
183        // Assume that protocol is the latest. Change to the actual protocol
184        // version when a WireFormatInfo is received.
185        private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
186        private final long timeCreated;
187        private final ConnectionAudit connectionAudit = new ConnectionAudit();
188        private DestinationSource destinationSource;
189        private final Object ensureConnectionInfoSentMutex = new Object();
190        private boolean useDedicatedTaskRunner;
191        protected volatile CountDownLatch transportInterruptionProcessingComplete;
192        private long consumerFailoverRedeliveryWaitPeriod;
193        private final Scheduler scheduler;
194        private boolean messagePrioritySupported=true;
195    
196        /**
197         * Construct an <code>ActiveMQConnection</code>
198         * 
199         * @param transport
200         * @param factoryStats
201         * @throws Exception
202         */
203        protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
204    
205            this.transport = transport;
206            this.clientIdGenerator = clientIdGenerator;
207            this.factoryStats = factoryStats;
208    
209            // Configure a single threaded executor who's core thread can timeout if
210            // idle
211            executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
212                public Thread newThread(Runnable r) {
213                    Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
214                    thread.setDaemon(true);
215                    return thread;
216                }
217            });
218            // asyncConnectionThread.allowCoreThreadTimeOut(true);
219            String uniqueId = CONNECTION_ID_GENERATOR.generateId();
220            this.info = new ConnectionInfo(new ConnectionId(uniqueId));
221            this.info.setManageable(true);
222            this.info.setFaultTolerant(transport.isFaultTolerant());
223            this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
224    
225            this.transport.setTransportListener(this);
226    
227            this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
228            this.factoryStats.addConnection(this);
229            this.timeCreated = System.currentTimeMillis();
230            this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
231            this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
232            this.scheduler.start();
233        }
234    
235        protected void setUserName(String userName) {
236            this.info.setUserName(userName);
237        }
238    
239        protected void setPassword(String password) {
240            this.info.setPassword(password);
241        }
242    
243        /**
244         * A static helper method to create a new connection
245         * 
246         * @return an ActiveMQConnection
247         * @throws JMSException
248         */
249        public static ActiveMQConnection makeConnection() throws JMSException {
250            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
251            return (ActiveMQConnection)factory.createConnection();
252        }
253    
254        /**
255         * A static helper method to create a new connection
256         * 
257         * @param uri
258         * @return and ActiveMQConnection
259         * @throws JMSException
260         */
261        public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
262            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
263            return (ActiveMQConnection)factory.createConnection();
264        }
265    
266        /**
267         * A static helper method to create a new connection
268         * 
269         * @param user
270         * @param password
271         * @param uri
272         * @return an ActiveMQConnection
273         * @throws JMSException
274         */
275        public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
276            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
277            return (ActiveMQConnection)factory.createConnection();
278        }
279    
280        /**
281         * @return a number unique for this connection
282         */
283        public JMSConnectionStatsImpl getConnectionStats() {
284            return stats;
285        }
286    
287        /**
288         * Creates a <CODE>Session</CODE> object.
289         * 
290         * @param transacted indicates whether the session is transacted
291         * @param acknowledgeMode indicates whether the consumer or the client will
292         *                acknowledge any messages it receives; ignored if the
293         *                session is transacted. Legal values are
294         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
295         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
296         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
297         * @return a newly created session
298         * @throws JMSException if the <CODE>Connection</CODE> object fails to
299         *                 create a session due to some internal error or lack of
300         *                 support for the specific transaction and acknowledgement
301         *                 mode.
302         * @see Session#AUTO_ACKNOWLEDGE
303         * @see Session#CLIENT_ACKNOWLEDGE
304         * @see Session#DUPS_OK_ACKNOWLEDGE
305         * @since 1.1
306         */
307        public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
308            checkClosedOrFailed();
309            ensureConnectionInfoSent();
310            if(!transacted) {
311                if (acknowledgeMode==Session.SESSION_TRANSACTED) {
312                    throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
313                } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
314                    throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
315                            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
316                }
317            }
318            return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
319                ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
320        }
321    
322        /**
323         * @return sessionId
324         */
325        protected SessionId getNextSessionId() {
326            return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
327        }
328    
329        /**
330         * Gets the client identifier for this connection.
331         * <P>
332         * This value is specific to the JMS provider. It is either preconfigured by
333         * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
334         * dynamically by the application by calling the <code>setClientID</code>
335         * method.
336         * 
337         * @return the unique client identifier
338         * @throws JMSException if the JMS provider fails to return the client ID
339         *                 for this connection due to some internal error.
340         */
341        public String getClientID() throws JMSException {
342            checkClosedOrFailed();
343            return this.info.getClientId();
344        }
345    
346        /**
347         * Sets the client identifier for this connection.
348         * <P>
349         * The preferred way to assign a JMS client's client identifier is for it to
350         * be configured in a client-specific <CODE>ConnectionFactory</CODE>
351         * object and transparently assigned to the <CODE>Connection</CODE> object
352         * it creates.
353         * <P>
354         * Alternatively, a client can set a connection's client identifier using a
355         * provider-specific value. The facility to set a connection's client
356         * identifier explicitly is not a mechanism for overriding the identifier
357         * that has been administratively configured. It is provided for the case
358         * where no administratively specified identifier exists. If one does exist,
359         * an attempt to change it by setting it must throw an
360         * <CODE>IllegalStateException</CODE>. If a client sets the client
361         * identifier explicitly, it must do so immediately after it creates the
362         * connection and before any other action on the connection is taken. After
363         * this point, setting the client identifier is a programming error that
364         * should throw an <CODE>IllegalStateException</CODE>.
365         * <P>
366         * The purpose of the client identifier is to associate a connection and its
367         * objects with a state maintained on behalf of the client by a provider.
368         * The only such state identified by the JMS API is that required to support
369         * durable subscriptions.
370         * <P>
371         * If another connection with the same <code>clientID</code> is already
372         * running when this method is called, the JMS provider should detect the
373         * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
374         * 
375         * @param newClientID the unique client identifier
376         * @throws JMSException if the JMS provider fails to set the client ID for
377         *                 this connection due to some internal error.
378         * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
379         *                 invalid or duplicate client ID.
380         * @throws javax.jms.IllegalStateException if the JMS client attempts to set
381         *                 a connection's client ID at the wrong time or when it has
382         *                 been administratively configured.
383         */
384        public void setClientID(String newClientID) throws JMSException {
385            checkClosedOrFailed();
386    
387            if (this.clientIDSet) {
388                throw new IllegalStateException("The clientID has already been set");
389            }
390    
391            if (this.isConnectionInfoSentToBroker) {
392                throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
393            }
394    
395            this.info.setClientId(newClientID);
396            this.userSpecifiedClientID = true;
397            ensureConnectionInfoSent();
398        }
399    
400        /**
401         * Sets the default client id that the connection will use if explicitly not
402         * set with the setClientId() call.
403         */
404        public void setDefaultClientID(String clientID) throws JMSException {
405            this.info.setClientId(clientID);
406            this.userSpecifiedClientID = true;
407        }
408    
409        /**
410         * Gets the metadata for this connection.
411         * 
412         * @return the connection metadata
413         * @throws JMSException if the JMS provider fails to get the connection
414         *                 metadata for this connection.
415         * @see javax.jms.ConnectionMetaData
416         */
417        public ConnectionMetaData getMetaData() throws JMSException {
418            checkClosedOrFailed();
419            return ActiveMQConnectionMetaData.INSTANCE;
420        }
421    
422        /**
423         * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
424         * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
425         * associated with it.
426         * 
427         * @return the <CODE>ExceptionListener</CODE> for this connection, or
428         *         null, if no <CODE>ExceptionListener</CODE> is associated with
429         *         this connection.
430         * @throws JMSException if the JMS provider fails to get the
431         *                 <CODE>ExceptionListener</CODE> for this connection.
432         * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
433         */
434        public ExceptionListener getExceptionListener() throws JMSException {
435            checkClosedOrFailed();
436            return this.exceptionListener;
437        }
438    
439        /**
440         * Sets an exception listener for this connection.
441         * <P>
442         * If a JMS provider detects a serious problem with a connection, it informs
443         * the connection's <CODE> ExceptionListener</CODE>, if one has been
444         * registered. It does this by calling the listener's <CODE>onException
445         * </CODE>
446         * method, passing it a <CODE>JMSException</CODE> object describing the
447         * problem.
448         * <P>
449         * An exception listener allows a client to be notified of a problem
450         * asynchronously. Some connections only consume messages, so they would
451         * have no other way to learn their connection has failed.
452         * <P>
453         * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
454         * <P>
455         * A JMS provider should attempt to resolve connection problems itself
456         * before it notifies the client of them.
457         * 
458         * @param listener the exception listener
459         * @throws JMSException if the JMS provider fails to set the exception
460         *                 listener for this connection.
461         */
462        public void setExceptionListener(ExceptionListener listener) throws JMSException {
463            checkClosedOrFailed();
464            this.exceptionListener = listener;
465        }
466    
467        /**
468         * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
469         * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
470         * associated with it.
471         * 
472         * @return the listener or <code>null</code> if no listener is registered with the connection.
473         */
474        public ClientInternalExceptionListener getClientInternalExceptionListener()
475        {
476            return clientInternalExceptionListener;
477        }
478    
479        /**
480         * Sets a client internal exception listener for this connection.
481         * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
482         * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
483         * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
484         * describing the problem.
485         * 
486         * @param listener the exception listener
487         */
488        public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
489        {
490            this.clientInternalExceptionListener = listener;
491        }
492        
493        /**
494         * Starts (or restarts) a connection's delivery of incoming messages. A call
495         * to <CODE>start</CODE> on a connection that has already been started is
496         * ignored.
497         * 
498         * @throws JMSException if the JMS provider fails to start message delivery
499         *                 due to some internal error.
500         * @see javax.jms.Connection#stop()
501         */
502        public void start() throws JMSException {
503            checkClosedOrFailed();
504            ensureConnectionInfoSent();
505            if (started.compareAndSet(false, true)) {
506                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
507                    ActiveMQSession session = i.next();
508                    session.start();
509                }
510            }
511        }
512    
513        /**
514         * Temporarily stops a connection's delivery of incoming messages. Delivery
515         * can be restarted using the connection's <CODE>start</CODE> method. When
516         * the connection is stopped, delivery to all the connection's message
517         * consumers is inhibited: synchronous receives block, and messages are not
518         * delivered to message listeners.
519         * <P>
520         * This call blocks until receives and/or message listeners in progress have
521         * completed.
522         * <P>
523         * Stopping a connection has no effect on its ability to send messages. A
524         * call to <CODE>stop</CODE> on a connection that has already been stopped
525         * is ignored.
526         * <P>
527         * A call to <CODE>stop</CODE> must not return until delivery of messages
528         * has paused. This means that a client can rely on the fact that none of
529         * its message listeners will be called and that all threads of control
530         * waiting for <CODE>receive</CODE> calls to return will not return with a
531         * message until the connection is restarted. The receive timers for a
532         * stopped connection continue to advance, so receives may time out while
533         * the connection is stopped.
534         * <P>
535         * If message listeners are running when <CODE>stop</CODE> is invoked, the
536         * <CODE>stop</CODE> call must wait until all of them have returned before
537         * it may return. While these message listeners are completing, they must
538         * have the full services of the connection available to them.
539         * 
540         * @throws JMSException if the JMS provider fails to stop message delivery
541         *                 due to some internal error.
542         * @see javax.jms.Connection#start()
543         */
544        public void stop() throws JMSException {
545            checkClosedOrFailed();
546            if (started.compareAndSet(true, false)) {
547                synchronized(sessions) {
548                    for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
549                        ActiveMQSession s = i.next();
550                        s.stop();
551                    }
552                }
553            }
554        }
555    
556        /**
557         * Closes the connection.
558         * <P>
559         * Since a provider typically allocates significant resources outside the
560         * JVM on behalf of a connection, clients should close these resources when
561         * they are not needed. Relying on garbage collection to eventually reclaim
562         * these resources may not be timely enough.
563         * <P>
564         * There is no need to close the sessions, producers, and consumers of a
565         * closed connection.
566         * <P>
567         * Closing a connection causes all temporary destinations to be deleted.
568         * <P>
569         * When this method is invoked, it should not return until message
570         * processing has been shut down in an orderly fashion. This means that all
571         * message listeners that may have been running have returned, and that all
572         * pending receives have returned. A close terminates all pending message
573         * receives on the connection's sessions' consumers. The receives may return
574         * with a message or with null, depending on whether there was a message
575         * available at the time of the close. If one or more of the connection's
576         * sessions' message listeners is processing a message at the time when
577         * connection <CODE>close</CODE> is invoked, all the facilities of the
578         * connection and its sessions must remain available to those listeners
579         * until they return control to the JMS provider.
580         * <P>
581         * Closing a connection causes any of its sessions' transactions in progress
582         * to be rolled back. In the case where a session's work is coordinated by
583         * an external transaction manager, a session's <CODE>commit</CODE> and
584         * <CODE> rollback</CODE> methods are not used and the result of a closed
585         * session's work is determined later by the transaction manager. Closing a
586         * connection does NOT force an acknowledgment of client-acknowledged
587         * sessions.
588         * <P>
589         * Invoking the <CODE>acknowledge</CODE> method of a received message from
590         * a closed connection's session must throw an
591         * <CODE>IllegalStateException</CODE>. Closing a closed connection must
592         * NOT throw an exception.
593         * 
594         * @throws JMSException if the JMS provider fails to close the connection
595         *                 due to some internal error. For example, a failure to
596         *                 release resources or to close a socket connection can
597         *                 cause this exception to be thrown.
598         */
599        public void close() throws JMSException {
600            try {
601                // If we were running, lets stop first.
602                if (!closed.get() && !transportFailed.get()) {
603                    stop();
604                }
605    
606                synchronized (this) {
607                    if (!closed.get()) {
608                        closing.set(true);
609    
610                        if (destinationSource != null) {
611                            destinationSource.stop();
612                            destinationSource = null;
613                        }
614                        if (advisoryConsumer != null) {
615                            advisoryConsumer.dispose();
616                            advisoryConsumer = null;
617                        }
618                        if (this.scheduler != null) {
619                            try {
620                                this.scheduler.stop();
621                            } catch (Exception e) {
622                                JMSException ex =  JMSExceptionSupport.create(e);
623                                throw ex;
624                            }
625                        }
626    
627                        long lastDeliveredSequenceId = 0;
628                        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
629                            ActiveMQSession s = i.next();
630                            s.dispose();
631                            lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
632                        }
633                        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
634                            ActiveMQConnectionConsumer c = i.next();
635                            c.dispose();
636                        }
637                        for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
638                            ActiveMQInputStream c = i.next();
639                            c.dispose();
640                        }
641                        for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
642                            ActiveMQOutputStream c = i.next();
643                            c.dispose();
644                        }
645    
646                        // As TemporaryQueue and TemporaryTopic instances are bound
647                        // to a connection we should just delete them after the connection
648                        // is closed to free up memory
649                        for (Iterator<ActiveMQTempDestination> i = this.activeTempDestinations.values().iterator(); i.hasNext();) {
650                            ActiveMQTempDestination c = i.next();
651                            c.delete();
652                        }
653                        
654                        if (isConnectionInfoSentToBroker) {
655                            // If we announced ourselfs to the broker.. Try to let
656                            // the broker
657                            // know that the connection is being shutdown.
658                            RemoveInfo removeCommand = info.createRemoveCommand();
659                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
660                            doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
661                            doAsyncSendPacket(new ShutdownInfo());
662                        }
663    
664                        ServiceSupport.dispose(this.transport);
665    
666                        started.set(false);
667    
668                        // TODO if we move the TaskRunnerFactory to the connection
669                        // factory
670                        // then we may need to call
671                        // factory.onConnectionClose(this);
672                        if (sessionTaskRunner != null) {
673                            sessionTaskRunner.shutdown();
674                        }
675                        closed.set(true);
676                        closing.set(false);
677                    }
678                }
679            } finally {
680                try {
681                    if (executor != null){
682                        executor.shutdown();
683                    }
684                }catch(Throwable e) {
685                    LOG.error("Error shutting down thread pool " + e,e);
686                }
687                factoryStats.removeConnection(this);
688            }
689        }
690    
691        /**
692         * Tells the broker to terminate its VM. This can be used to cleanly
693         * terminate a broker running in a standalone java process. Server must have
694         * property enable.vm.shutdown=true defined to allow this to work.
695         */
696        // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
697        // implemented.
698        /*
699         * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
700         * command = new BrokerAdminCommand();
701         * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
702         * asyncSendPacket(command); }
703         */
704    
705        /**
706         * Create a durable connection consumer for this connection (optional
707         * operation). This is an expert facility not used by regular JMS clients.
708         * 
709         * @param topic topic to access
710         * @param subscriptionName durable subscription name
711         * @param messageSelector only messages with properties matching the message
712         *                selector expression are delivered. A value of null or an
713         *                empty string indicates that there is no message selector
714         *                for the message consumer.
715         * @param sessionPool the server session pool to associate with this durable
716         *                connection consumer
717         * @param maxMessages the maximum number of messages that can be assigned to
718         *                a server session at one time
719         * @return the durable connection consumer
720         * @throws JMSException if the <CODE>Connection</CODE> object fails to
721         *                 create a connection consumer due to some internal error
722         *                 or invalid arguments for <CODE>sessionPool</CODE> and
723         *                 <CODE>messageSelector</CODE>.
724         * @throws javax.jms.InvalidDestinationException if an invalid destination
725         *                 is specified.
726         * @throws javax.jms.InvalidSelectorException if the message selector is
727         *                 invalid.
728         * @see javax.jms.ConnectionConsumer
729         * @since 1.1
730         */
731        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
732            throws JMSException {
733            return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
734        }
735    
736        /**
737         * Create a durable connection consumer for this connection (optional
738         * operation). This is an expert facility not used by regular JMS clients.
739         * 
740         * @param topic topic to access
741         * @param subscriptionName durable subscription name
742         * @param messageSelector only messages with properties matching the message
743         *                selector expression are delivered. A value of null or an
744         *                empty string indicates that there is no message selector
745         *                for the message consumer.
746         * @param sessionPool the server session pool to associate with this durable
747         *                connection consumer
748         * @param maxMessages the maximum number of messages that can be assigned to
749         *                a server session at one time
750         * @param noLocal set true if you want to filter out messages published
751         *                locally
752         * @return the durable connection consumer
753         * @throws JMSException if the <CODE>Connection</CODE> object fails to
754         *                 create a connection consumer due to some internal error
755         *                 or invalid arguments for <CODE>sessionPool</CODE> and
756         *                 <CODE>messageSelector</CODE>.
757         * @throws javax.jms.InvalidDestinationException if an invalid destination
758         *                 is specified.
759         * @throws javax.jms.InvalidSelectorException if the message selector is
760         *                 invalid.
761         * @see javax.jms.ConnectionConsumer
762         * @since 1.1
763         */
764        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
765                                                                  boolean noLocal) throws JMSException {
766            checkClosedOrFailed();
767            ensureConnectionInfoSent();
768            SessionId sessionId = new SessionId(info.getConnectionId(), -1);
769            ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
770            info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
771            info.setSubscriptionName(subscriptionName);
772            info.setSelector(messageSelector);
773            info.setPrefetchSize(maxMessages);
774            info.setDispatchAsync(isDispatchAsync());
775    
776            // Allows the options on the destination to configure the consumerInfo
777            if (info.getDestination().getOptions() != null) {
778                Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
779                IntrospectionSupport.setProperties(this.info, options, "consumer.");
780            }
781    
782            return new ActiveMQConnectionConsumer(this, sessionPool, info);
783        }
784    
785        // Properties
786        // -------------------------------------------------------------------------
787    
788        /**
789         * Returns true if this connection has been started
790         * 
791         * @return true if this Connection is started
792         */
793        public boolean isStarted() {
794            return started.get();
795        }
796    
797        /**
798         * Returns true if the connection is closed
799         */
800        public boolean isClosed() {
801            return closed.get();
802        }
803    
804        /**
805         * Returns true if the connection is in the process of being closed
806         */
807        public boolean isClosing() {
808            return closing.get();
809        }
810    
811        /**
812         * Returns true if the underlying transport has failed
813         */
814        public boolean isTransportFailed() {
815            return transportFailed.get();
816        }
817    
818        /**
819         * @return Returns the prefetchPolicy.
820         */
821        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
822            return prefetchPolicy;
823        }
824    
825        /**
826         * Sets the <a
827         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
828         * policy</a> for consumers created by this connection.
829         */
830        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
831            this.prefetchPolicy = prefetchPolicy;
832        }
833    
834        /**
835         */
836        public Transport getTransportChannel() {
837            return transport;
838        }
839    
840        /**
841         * @return Returns the clientID of the connection, forcing one to be
842         *         generated if one has not yet been configured.
843         */
844        public String getInitializedClientID() throws JMSException {
845            ensureConnectionInfoSent();
846            return info.getClientId();
847        }
848    
849        /**
850         * @return Returns the timeStampsDisableByDefault.
851         */
852        public boolean isDisableTimeStampsByDefault() {
853            return disableTimeStampsByDefault;
854        }
855    
856        /**
857         * Sets whether or not timestamps on messages should be disabled or not. If
858         * you disable them it adds a small performance boost.
859         */
860        public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
861            this.disableTimeStampsByDefault = timeStampsDisableByDefault;
862        }
863    
864        /**
865         * @return Returns the dispatchOptimizedMessage.
866         */
867        public boolean isOptimizedMessageDispatch() {
868            return optimizedMessageDispatch;
869        }
870    
871        /**
872         * If this flag is set then an larger prefetch limit is used - only
873         * applicable for durable topic subscribers.
874         */
875        public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
876            this.optimizedMessageDispatch = dispatchOptimizedMessage;
877        }
878    
879        /**
880         * @return Returns the closeTimeout.
881         */
882        public int getCloseTimeout() {
883            return closeTimeout;
884        }
885    
886        /**
887         * Sets the timeout before a close is considered complete. Normally a
888         * close() on a connection waits for confirmation from the broker; this
889         * allows that operation to timeout to save the client hanging if there is
890         * no broker
891         */
892        public void setCloseTimeout(int closeTimeout) {
893            this.closeTimeout = closeTimeout;
894        }
895    
896        /**
897         * @return ConnectionInfo
898         */
899        public ConnectionInfo getConnectionInfo() {
900            return this.info;
901        }
902    
903        public boolean isUseRetroactiveConsumer() {
904            return useRetroactiveConsumer;
905        }
906    
907        /**
908         * Sets whether or not retroactive consumers are enabled. Retroactive
909         * consumers allow non-durable topic subscribers to receive old messages
910         * that were published before the non-durable subscriber started.
911         */
912        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
913            this.useRetroactiveConsumer = useRetroactiveConsumer;
914        }
915    
916        public boolean isNestedMapAndListEnabled() {
917            return nestedMapAndListEnabled;
918        }
919    
920        /**
921         * Enables/disables whether or not Message properties and MapMessage entries
922         * support <a
923         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
924         * Structures</a> of Map and List objects
925         */
926        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
927            this.nestedMapAndListEnabled = structuredMapsEnabled;
928        }
929    
930        public boolean isExclusiveConsumer() {
931            return exclusiveConsumer;
932        }
933    
934        /**
935         * Enables or disables whether or not queue consumers should be exclusive or
936         * not for example to preserve ordering when not using <a
937         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
938         * 
939         * @param exclusiveConsumer
940         */
941        public void setExclusiveConsumer(boolean exclusiveConsumer) {
942            this.exclusiveConsumer = exclusiveConsumer;
943        }
944    
945        /**
946         * Adds a transport listener so that a client can be notified of events in
947         * the underlying transport
948         */
949        public void addTransportListener(TransportListener transportListener) {
950            transportListeners.add(transportListener);
951        }
952    
953        public void removeTransportListener(TransportListener transportListener) {
954            transportListeners.remove(transportListener);
955        }
956    
957        public boolean isUseDedicatedTaskRunner() {
958            return useDedicatedTaskRunner;
959        }
960        
961        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
962            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
963        }
964    
965        public TaskRunnerFactory getSessionTaskRunner() {
966            synchronized (this) {
967                if (sessionTaskRunner == null) {
968                    sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
969                }
970            }
971            return sessionTaskRunner;
972        }
973    
974        public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
975            this.sessionTaskRunner = sessionTaskRunner;
976        }
977    
978        public MessageTransformer getTransformer() {
979            return transformer;
980        }
981    
982        /**
983         * Sets the transformer used to transform messages before they are sent on
984         * to the JMS bus or when they are received from the bus but before they are
985         * delivered to the JMS client
986         */
987        public void setTransformer(MessageTransformer transformer) {
988            this.transformer = transformer;
989        }
990    
991        /**
992         * @return the statsEnabled
993         */
994        public boolean isStatsEnabled() {
995            return this.stats.isEnabled();
996        }
997    
998        /**
999         * @param statsEnabled the statsEnabled to set
1000         */
1001        public void setStatsEnabled(boolean statsEnabled) {
1002            this.stats.setEnabled(statsEnabled);
1003        }
1004    
1005        /**
1006         * Returns the {@link DestinationSource} object which can be used to listen to destinations
1007         * being created or destroyed or to enquire about the current destinations available on the broker
1008         *
1009         * @return a lazily created destination source
1010         * @throws JMSException
1011         */
1012        public DestinationSource getDestinationSource() throws JMSException {
1013            if (destinationSource == null) {
1014                destinationSource = new DestinationSource(this);
1015                destinationSource.start();
1016            }
1017            return destinationSource;
1018        }
1019    
1020        // Implementation methods
1021        // -------------------------------------------------------------------------
1022    
1023        /**
1024         * Used internally for adding Sessions to the Connection
1025         * 
1026         * @param session
1027         * @throws JMSException
1028         * @throws JMSException
1029         */
1030        protected void addSession(ActiveMQSession session) throws JMSException {
1031            this.sessions.add(session);
1032            if (sessions.size() > 1 || session.isTransacted()) {
1033                optimizedMessageDispatch = false;
1034            }
1035        }
1036    
1037        /**
1038         * Used interanlly for removing Sessions from a Connection
1039         * 
1040         * @param session
1041         */
1042        protected void removeSession(ActiveMQSession session) {
1043            this.sessions.remove(session);
1044            this.removeDispatcher(session);
1045        }
1046    
1047        /**
1048         * Add a ConnectionConsumer
1049         * 
1050         * @param connectionConsumer
1051         * @throws JMSException
1052         */
1053        protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1054            this.connectionConsumers.add(connectionConsumer);
1055        }
1056    
1057        /**
1058         * Remove a ConnectionConsumer
1059         * 
1060         * @param connectionConsumer
1061         */
1062        protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1063            this.connectionConsumers.remove(connectionConsumer);
1064            this.removeDispatcher(connectionConsumer);
1065        }
1066    
1067        /**
1068         * Creates a <CODE>TopicSession</CODE> object.
1069         * 
1070         * @param transacted indicates whether the session is transacted
1071         * @param acknowledgeMode indicates whether the consumer or the client will
1072         *                acknowledge any messages it receives; ignored if the
1073         *                session is transacted. Legal values are
1074         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1075         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1076         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1077         * @return a newly created topic session
1078         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1079         *                 to create a session due to some internal error or lack of
1080         *                 support for the specific transaction and acknowledgement
1081         *                 mode.
1082         * @see Session#AUTO_ACKNOWLEDGE
1083         * @see Session#CLIENT_ACKNOWLEDGE
1084         * @see Session#DUPS_OK_ACKNOWLEDGE
1085         */
1086        public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1087            return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1088        }
1089    
1090        /**
1091         * Creates a connection consumer for this connection (optional operation).
1092         * This is an expert facility not used by regular JMS clients.
1093         * 
1094         * @param topic the topic to access
1095         * @param messageSelector only messages with properties matching the message
1096         *                selector expression are delivered. A value of null or an
1097         *                empty string indicates that there is no message selector
1098         *                for the message consumer.
1099         * @param sessionPool the server session pool to associate with this
1100         *                connection consumer
1101         * @param maxMessages the maximum number of messages that can be assigned to
1102         *                a server session at one time
1103         * @return the connection consumer
1104         * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1105         *                 to create a connection consumer due to some internal
1106         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1107         *                 and <CODE>messageSelector</CODE>.
1108         * @throws javax.jms.InvalidDestinationException if an invalid topic is
1109         *                 specified.
1110         * @throws javax.jms.InvalidSelectorException if the message selector is
1111         *                 invalid.
1112         * @see javax.jms.ConnectionConsumer
1113         */
1114        public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1115            return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1116        }
1117    
1118        /**
1119         * Creates a connection consumer for this connection (optional operation).
1120         * This is an expert facility not used by regular JMS clients.
1121         * 
1122         * @param queue the queue to access
1123         * @param messageSelector only messages with properties matching the message
1124         *                selector expression are delivered. A value of null or an
1125         *                empty string indicates that there is no message selector
1126         *                for the message consumer.
1127         * @param sessionPool the server session pool to associate with this
1128         *                connection consumer
1129         * @param maxMessages the maximum number of messages that can be assigned to
1130         *                a server session at one time
1131         * @return the connection consumer
1132         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1133         *                 to create a connection consumer due to some internal
1134         *                 error or invalid arguments for <CODE>sessionPool</CODE>
1135         *                 and <CODE>messageSelector</CODE>.
1136         * @throws javax.jms.InvalidDestinationException if an invalid queue is
1137         *                 specified.
1138         * @throws javax.jms.InvalidSelectorException if the message selector is
1139         *                 invalid.
1140         * @see javax.jms.ConnectionConsumer
1141         */
1142        public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1143            return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1144        }
1145    
1146        /**
1147         * Creates a connection consumer for this connection (optional operation).
1148         * This is an expert facility not used by regular JMS clients.
1149         * 
1150         * @param destination the destination to access
1151         * @param messageSelector only messages with properties matching the message
1152         *                selector expression are delivered. A value of null or an
1153         *                empty string indicates that there is no message selector
1154         *                for the message consumer.
1155         * @param sessionPool the server session pool to associate with this
1156         *                connection consumer
1157         * @param maxMessages the maximum number of messages that can be assigned to
1158         *                a server session at one time
1159         * @return the connection consumer
1160         * @throws JMSException if the <CODE>Connection</CODE> object fails to
1161         *                 create a connection consumer due to some internal error
1162         *                 or invalid arguments for <CODE>sessionPool</CODE> and
1163         *                 <CODE>messageSelector</CODE>.
1164         * @throws javax.jms.InvalidDestinationException if an invalid destination
1165         *                 is specified.
1166         * @throws javax.jms.InvalidSelectorException if the message selector is
1167         *                 invalid.
1168         * @see javax.jms.ConnectionConsumer
1169         * @since 1.1
1170         */
1171        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1172            return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1173        }
1174    
1175        public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1176            throws JMSException {
1177    
1178            checkClosedOrFailed();
1179            ensureConnectionInfoSent();
1180    
1181            ConsumerId consumerId = createConsumerId();
1182            ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1183            consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1184            consumerInfo.setSelector(messageSelector);
1185            consumerInfo.setPrefetchSize(maxMessages);
1186            consumerInfo.setNoLocal(noLocal);
1187            consumerInfo.setDispatchAsync(isDispatchAsync());
1188    
1189            // Allows the options on the destination to configure the consumerInfo
1190            if (consumerInfo.getDestination().getOptions() != null) {
1191                Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1192                IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1193            }
1194    
1195            return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1196        }
1197    
1198        /**
1199         * @return
1200         */
1201        private ConsumerId createConsumerId() {
1202            return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1203        }
1204    
1205        /**
1206         * @return
1207         */
1208        private ProducerId createProducerId() {
1209            return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
1210        }
1211    
1212        /**
1213         * Creates a <CODE>QueueSession</CODE> object.
1214         * 
1215         * @param transacted indicates whether the session is transacted
1216         * @param acknowledgeMode indicates whether the consumer or the client will
1217         *                acknowledge any messages it receives; ignored if the
1218         *                session is transacted. Legal values are
1219         *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1220         *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1221         *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1222         * @return a newly created queue session
1223         * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1224         *                 to create a session due to some internal error or lack of
1225         *                 support for the specific transaction and acknowledgement
1226         *                 mode.
1227         * @see Session#AUTO_ACKNOWLEDGE
1228         * @see Session#CLIENT_ACKNOWLEDGE
1229         * @see Session#DUPS_OK_ACKNOWLEDGE
1230         */
1231        public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1232            return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1233        }
1234    
1235        /**
1236         * Ensures that the clientID was manually specified and not auto-generated.
1237         * If the clientID was not specified this method will throw an exception.
1238         * This method is used to ensure that the clientID + durableSubscriber name
1239         * are used correctly.
1240         * 
1241         * @throws JMSException
1242         */
1243        public void checkClientIDWasManuallySpecified() throws JMSException {
1244            if (!userSpecifiedClientID) {
1245                throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1246            }
1247        }
1248    
1249        /**
1250         * send a Packet through the Connection - for internal use only
1251         * 
1252         * @param command
1253         * @throws JMSException
1254         */
1255        public void asyncSendPacket(Command command) throws JMSException {
1256            if (isClosed()) {
1257                throw new ConnectionClosedException();
1258            } else {
1259                doAsyncSendPacket(command);
1260            }
1261        }
1262    
1263            private void doAsyncSendPacket(Command command) throws JMSException {
1264                    try {
1265                        this.transport.oneway(command);
1266                    } catch (IOException e) {
1267                        throw JMSExceptionSupport.create(e);
1268                    }
1269            }
1270    
1271        /**
1272         * Send a packet through a Connection - for internal use only
1273         * 
1274         * @param command
1275         * @return
1276         * @throws JMSException
1277         */
1278        public Response syncSendPacket(Command command) throws JMSException {
1279            if (isClosed()) {
1280                throw new ConnectionClosedException();
1281            } else {
1282    
1283                try {
1284                    Response response = (Response)this.transport.request(command);
1285                    if (response.isException()) {
1286                        ExceptionResponse er = (ExceptionResponse)response;
1287                        if (er.getException() instanceof JMSException) {
1288                            throw (JMSException)er.getException();
1289                        } else {
1290                            if (isClosed()||closing.get()) {
1291                                LOG.debug("Received an exception but connection is closing");
1292                            }
1293                            JMSException jmsEx = null;
1294                            try {
1295                                jmsEx = JMSExceptionSupport.create(er.getException());
1296                            }catch(Throwable e) {
1297                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1298                            }
1299                            //dispose of transport for security exceptions
1300                            if (er.getException() instanceof SecurityException){
1301                                Transport t = this.transport;
1302                                if (null != t){
1303                                    ServiceSupport.dispose(t);
1304                                }
1305                            }
1306                            if(jmsEx !=null) {
1307                                throw jmsEx;
1308                            }
1309                        }
1310                    }
1311                    return response;
1312                } catch (IOException e) {
1313                    throw JMSExceptionSupport.create(e);
1314                }
1315            }
1316        }
1317    
1318        /**
1319         * Send a packet through a Connection - for internal use only
1320         * 
1321         * @param command
1322         * @return
1323         * @throws JMSException
1324         */
1325        public Response syncSendPacket(Command command, int timeout) throws JMSException {
1326            if (isClosed() || closing.get()) {
1327                throw new ConnectionClosedException();
1328            } else {
1329                return doSyncSendPacket(command, timeout);
1330            }
1331        }
1332    
1333            private Response doSyncSendPacket(Command command, int timeout)
1334                            throws JMSException {
1335                    try {
1336                        Response response = (Response) (timeout > 0
1337                        ? this.transport.request(command, timeout) 
1338                        : this.transport.request(command));
1339                        if (response != null && response.isException()) {
1340                            ExceptionResponse er = (ExceptionResponse)response;
1341                            if (er.getException() instanceof JMSException) {
1342                                throw (JMSException)er.getException();
1343                            } else {
1344                                throw JMSExceptionSupport.create(er.getException());
1345                            }
1346                        }
1347                        return response;
1348                    } catch (IOException e) {
1349                        throw JMSExceptionSupport.create(e);
1350                    }
1351            }
1352    
1353        /**
1354         * @return statistics for this Connection
1355         */
1356        public StatsImpl getStats() {
1357            return stats;
1358        }
1359    
1360        /**
1361         * simply throws an exception if the Connection is already closed or the
1362         * Transport has failed
1363         * 
1364         * @throws JMSException
1365         */
1366        protected synchronized void checkClosedOrFailed() throws JMSException {
1367            checkClosed();
1368            if (transportFailed.get()) {
1369                throw new ConnectionFailedException(firstFailureError);
1370            }
1371        }
1372    
1373        /**
1374         * simply throws an exception if the Connection is already closed
1375         * 
1376         * @throws JMSException
1377         */
1378        protected synchronized void checkClosed() throws JMSException {
1379            if (closed.get()) {
1380                throw new ConnectionClosedException();
1381            }
1382        }
1383    
1384        /**
1385         * Send the ConnectionInfo to the Broker
1386         * 
1387         * @throws JMSException
1388         */
1389        protected void ensureConnectionInfoSent() throws JMSException {
1390            synchronized(this.ensureConnectionInfoSentMutex) {
1391                // Can we skip sending the ConnectionInfo packet??
1392                if (isConnectionInfoSentToBroker || closed.get()) {
1393                    return;
1394                }
1395                //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1396                if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1397                    info.setClientId(clientIdGenerator.generateId());
1398                }
1399                syncSendPacket(info.copy());
1400        
1401                this.isConnectionInfoSentToBroker = true;
1402                // Add a temp destination advisory consumer so that
1403                // We know what the valid temporary destinations are on the
1404                // broker without having to do an RPC to the broker.
1405        
1406                ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1407                if (watchTopicAdvisories) {
1408                    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1409                }
1410            }
1411        }
1412    
1413        public synchronized boolean isWatchTopicAdvisories() {
1414            return watchTopicAdvisories;
1415        }
1416    
1417        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1418            this.watchTopicAdvisories = watchTopicAdvisories;
1419        }
1420    
1421        /**
1422         * @return Returns the useAsyncSend.
1423         */
1424        public boolean isUseAsyncSend() {
1425            return useAsyncSend;
1426        }
1427    
1428        /**
1429         * Forces the use of <a
1430         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1431         * adds a massive performance boost; but means that the send() method will
1432         * return immediately whether the message has been sent or not which could
1433         * lead to message loss.
1434         */
1435        public void setUseAsyncSend(boolean useAsyncSend) {
1436            this.useAsyncSend = useAsyncSend;
1437        }
1438    
1439        /**
1440         * @return true if always sync send messages
1441         */
1442        public boolean isAlwaysSyncSend() {
1443            return this.alwaysSyncSend;
1444        }
1445    
1446        /**
1447         * Set true if always require messages to be sync sent
1448         * 
1449         * @param alwaysSyncSend
1450         */
1451        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1452            this.alwaysSyncSend = alwaysSyncSend;
1453        }
1454        
1455        /**
1456         * @return the messagePrioritySupported
1457         */
1458        public boolean isMessagePrioritySupported() {
1459            return this.messagePrioritySupported;
1460        }
1461    
1462        /**
1463         * @param messagePrioritySupported the messagePrioritySupported to set
1464         */
1465        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1466            this.messagePrioritySupported = messagePrioritySupported;
1467        }
1468    
1469        /**
1470         * Cleans up this connection so that it's state is as if the connection was
1471         * just created. This allows the Resource Adapter to clean up a connection
1472         * so that it can be reused without having to close and recreate the
1473         * connection.
1474         */
1475        public void cleanup() throws JMSException {
1476    
1477            if (advisoryConsumer != null && !isTransportFailed()) {
1478                advisoryConsumer.dispose();
1479                advisoryConsumer = null;
1480            }
1481    
1482            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1483                ActiveMQSession s = i.next();
1484                s.dispose();
1485            }
1486            for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1487                ActiveMQConnectionConsumer c = i.next();
1488                c.dispose();
1489            }
1490            for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
1491                ActiveMQInputStream c = i.next();
1492                c.dispose();
1493            }
1494            for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
1495                ActiveMQOutputStream c = i.next();
1496                c.dispose();
1497            }
1498    
1499            if (isConnectionInfoSentToBroker) {
1500                if (!transportFailed.get() && !closing.get()) {
1501                    syncSendPacket(info.createRemoveCommand());
1502                }
1503                isConnectionInfoSentToBroker = false;
1504            }
1505            if (userSpecifiedClientID) {
1506                info.setClientId(null);
1507                userSpecifiedClientID = false;
1508            }
1509            clientIDSet = false;
1510    
1511            started.set(false);
1512        }
1513    
1514        public void finalize() throws Throwable{
1515            if (scheduler != null){
1516                scheduler.stop();
1517            }
1518        }
1519    
1520        /**
1521         * Changes the associated username/password that is associated with this
1522         * connection. If the connection has been used, you must called cleanup()
1523         * before calling this method.
1524         * 
1525         * @throws IllegalStateException if the connection is in used.
1526         */
1527        public void changeUserInfo(String userName, String password) throws JMSException {
1528            if (isConnectionInfoSentToBroker) {
1529                throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1530            }
1531            this.info.setUserName(userName);
1532            this.info.setPassword(password);
1533        }
1534    
1535        /**
1536         * @return Returns the resourceManagerId.
1537         * @throws JMSException
1538         */
1539        public String getResourceManagerId() throws JMSException {
1540            waitForBrokerInfo();
1541            if (brokerInfo == null) {
1542                throw new JMSException("Connection failed before Broker info was received.");
1543            }
1544            return brokerInfo.getBrokerId().getValue();
1545        }
1546    
1547        /**
1548         * Returns the broker name if one is available or null if one is not
1549         * available yet.
1550         */
1551        public String getBrokerName() {
1552            try {
1553                brokerInfoReceived.await(5, TimeUnit.SECONDS);
1554                if (brokerInfo == null) {
1555                    return null;
1556                }
1557                return brokerInfo.getBrokerName();
1558            } catch (InterruptedException e) {
1559                Thread.currentThread().interrupt();
1560                return null;
1561            }
1562        }
1563    
1564        /**
1565         * Returns the broker information if it is available or null if it is not
1566         * available yet.
1567         */
1568        public BrokerInfo getBrokerInfo() {
1569            return brokerInfo;
1570        }
1571    
1572        /**
1573         * @return Returns the RedeliveryPolicy.
1574         * @throws JMSException
1575         */
1576        public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1577            return redeliveryPolicy;
1578        }
1579    
1580        /**
1581         * Sets the redelivery policy to be used when messages are rolled back
1582         */
1583        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1584            this.redeliveryPolicy = redeliveryPolicy;
1585        }
1586    
1587        public BlobTransferPolicy getBlobTransferPolicy() {
1588            if (blobTransferPolicy == null) {
1589                blobTransferPolicy = createBlobTransferPolicy();
1590            }
1591            return blobTransferPolicy;
1592        }
1593    
1594        /**
1595         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1596         * OBjects) are transferred from producers to brokers to consumers
1597         */
1598        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1599            this.blobTransferPolicy = blobTransferPolicy;
1600        }
1601    
1602        /**
1603         * @return Returns the alwaysSessionAsync.
1604         */
1605        public boolean isAlwaysSessionAsync() {
1606            return alwaysSessionAsync;
1607        }
1608    
1609        /**
1610         * If this flag is set then a separate thread is not used for dispatching
1611         * messages for each Session in the Connection. However, a separate thread
1612         * is always used if there is more than one session, or the session isn't in
1613         * auto acknowledge or duplicates ok mode
1614         */
1615        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1616            this.alwaysSessionAsync = alwaysSessionAsync;
1617        }
1618    
1619        /**
1620         * @return Returns the optimizeAcknowledge.
1621         */
1622        public boolean isOptimizeAcknowledge() {
1623            return optimizeAcknowledge;
1624        }
1625    
1626        /**
1627         * Enables an optimised acknowledgement mode where messages are acknowledged
1628         * in batches rather than individually
1629         * 
1630         * @param optimizeAcknowledge The optimizeAcknowledge to set.
1631         */
1632        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1633            this.optimizeAcknowledge = optimizeAcknowledge;
1634        }
1635    
1636        public long getWarnAboutUnstartedConnectionTimeout() {
1637            return warnAboutUnstartedConnectionTimeout;
1638        }
1639    
1640        /**
1641         * Enables the timeout from a connection creation to when a warning is
1642         * generated if the connection is not properly started via {@link #start()}
1643         * and a message is received by a consumer. It is a very common gotcha to
1644         * forget to <a
1645         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1646         * the connection</a> so this option makes the default case to create a
1647         * warning if the user forgets. To disable the warning just set the value to <
1648         * 0 (say -1).
1649         */
1650        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1651            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1652        }
1653        
1654        /**
1655         * @return the sendTimeout
1656         */
1657        public int getSendTimeout() {
1658            return sendTimeout;
1659        }
1660    
1661        /**
1662         * @param sendTimeout the sendTimeout to set
1663         */
1664        public void setSendTimeout(int sendTimeout) {
1665            this.sendTimeout = sendTimeout;
1666        }
1667        
1668        /**
1669         * @return the sendAcksAsync
1670         */
1671        public boolean isSendAcksAsync() {
1672            return sendAcksAsync;
1673        }
1674    
1675        /**
1676         * @param sendAcksAsync the sendAcksAsync to set
1677         */
1678        public void setSendAcksAsync(boolean sendAcksAsync) {
1679            this.sendAcksAsync = sendAcksAsync;
1680        }
1681    
1682    
1683        /**
1684         * Returns the time this connection was created
1685         */
1686        public long getTimeCreated() {
1687            return timeCreated;
1688        }
1689    
1690        private void waitForBrokerInfo() throws JMSException {
1691            try {
1692                brokerInfoReceived.await();
1693            } catch (InterruptedException e) {
1694                Thread.currentThread().interrupt();
1695                throw JMSExceptionSupport.create(e);
1696            }
1697        }
1698    
1699        // Package protected so that it can be used in unit tests
1700        public Transport getTransport() {
1701            return transport;
1702        }
1703    
1704        public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1705            producers.put(producerId, producer);
1706        }
1707    
1708        public void removeProducer(ProducerId producerId) {
1709            producers.remove(producerId);
1710        }
1711    
1712        public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1713            dispatchers.put(consumerId, dispatcher);
1714        }
1715    
1716        public void removeDispatcher(ConsumerId consumerId) {
1717            dispatchers.remove(consumerId);
1718        }
1719    
1720        /**
1721         * @param o - the command to consume
1722         */
1723        public void onCommand(final Object o) {
1724            final Command command = (Command)o;
1725            if (!closed.get() && command != null) {
1726                try {
1727                    command.visit(new CommandVisitorAdapter() {
1728                        @Override
1729                        public Response processMessageDispatch(MessageDispatch md) throws Exception {
1730                            waitForTransportInterruptionProcessingToComplete();
1731                            ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1732                            if (dispatcher != null) {
1733                                // Copy in case a embedded broker is dispatching via
1734                                // vm://
1735                                // md.getMessage() == null to signal end of queue
1736                                // browse.
1737                                Message msg = md.getMessage();
1738                                if (msg != null) {
1739                                    msg = msg.copy();
1740                                    msg.setReadOnlyBody(true);
1741                                    msg.setReadOnlyProperties(true);
1742                                    msg.setRedeliveryCounter(md.getRedeliveryCounter());
1743                                    msg.setConnection(ActiveMQConnection.this);
1744                                    md.setMessage(msg);
1745                                }
1746                                dispatcher.dispatch(md);
1747                            }
1748                            return null;
1749                        }
1750    
1751                        @Override
1752                        public Response processProducerAck(ProducerAck pa) throws Exception {
1753                            if (pa != null && pa.getProducerId() != null) {
1754                                ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1755                                if (producer != null) {
1756                                    producer.onProducerAck(pa);
1757                                }
1758                            }
1759                            return null;
1760                        }
1761    
1762                        @Override
1763                        public Response processBrokerInfo(BrokerInfo info) throws Exception {
1764                            brokerInfo = info;
1765                            brokerInfoReceived.countDown();
1766                            optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1767                            getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1768                            return null;
1769                        }
1770    
1771                        @Override
1772                        public Response processConnectionError(final ConnectionError error) throws Exception {
1773                            executor.execute(new Runnable() {
1774                                public void run() {
1775                                    onAsyncException(error.getException());
1776                                }
1777                            });
1778                            return null;
1779                        }
1780    
1781                        @Override
1782                        public Response processControlCommand(ControlCommand command) throws Exception {
1783                            onControlCommand(command);
1784                            return null;
1785                        }
1786    
1787                        @Override
1788                        public Response processConnectionControl(ConnectionControl control) throws Exception {
1789                            onConnectionControl((ConnectionControl)command);
1790                            return null;
1791                        }
1792    
1793                        @Override
1794                        public Response processConsumerControl(ConsumerControl control) throws Exception {
1795                            onConsumerControl((ConsumerControl)command);
1796                            return null;
1797                        }
1798    
1799                        @Override
1800                        public Response processWireFormat(WireFormatInfo info) throws Exception {
1801                            onWireFormatInfo((WireFormatInfo)command);
1802                            return null;
1803                        }
1804                    });
1805                } catch (Exception e) {
1806                    onClientInternalException(e);
1807                }
1808    
1809            }
1810            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1811                TransportListener listener = iter.next();
1812                listener.onCommand(command);
1813            }
1814        }
1815    
1816        protected void onWireFormatInfo(WireFormatInfo info) {
1817            protocolVersion.set(info.getVersion());
1818        }
1819    
1820        /**
1821         * Handles async client internal exceptions.
1822         * A client internal exception is usually one that has been thrown
1823         * by a container runtime component during asynchronous processing of a
1824         * message that does not affect the connection itself.
1825         * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1826         * its <code>onException</code> method, if one has been registered with this connection.
1827         * 
1828         * @param error the exception that the problem
1829         */
1830        public void onClientInternalException(final Throwable error) {
1831            if ( !closed.get() && !closing.get() ) {
1832                if ( this.clientInternalExceptionListener != null ) {
1833                    executor.execute(new Runnable() {
1834                        public void run() {
1835                            ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1836                        }
1837                    });
1838                } else {
1839                    LOG.debug("Async client internal exception occurred with no exception listener registered: " 
1840                            + error, error);
1841                }
1842            }
1843        }
1844        /**
1845         * Used for handling async exceptions
1846         * 
1847         * @param error
1848         */
1849        public void onAsyncException(Throwable error) {
1850            if (!closed.get() && !closing.get()) {
1851                if (this.exceptionListener != null) {
1852    
1853                    if (!(error instanceof JMSException)) {
1854                        error = JMSExceptionSupport.create(error);
1855                    }
1856                    final JMSException e = (JMSException)error;
1857    
1858                    executor.execute(new Runnable() {
1859                        public void run() {
1860                            ActiveMQConnection.this.exceptionListener.onException(e);
1861                        }
1862                    });
1863    
1864                } else {
1865                    LOG.debug("Async exception with no exception listener: " + error, error);
1866                }
1867            }
1868        }
1869    
1870        public void onException(final IOException error) {
1871                    onAsyncException(error);
1872                    if (!closing.get() && !closed.get()) {
1873                            executor.execute(new Runnable() {
1874                                    public void run() {
1875                                            transportFailed(error);
1876                                            ServiceSupport.dispose(ActiveMQConnection.this.transport);
1877                                            brokerInfoReceived.countDown();
1878                                            try {
1879                                                    cleanup();
1880                                            } catch (JMSException e) {
1881                                                    LOG.warn("Exception during connection cleanup, " + e, e);
1882                                            }
1883                                            for (Iterator<TransportListener> iter = transportListeners
1884                                                            .iterator(); iter.hasNext();) {
1885                                                    TransportListener listener = iter.next();
1886                                                    listener.onException(error);
1887                                            }
1888                                    }
1889                            });
1890                    }
1891            }
1892    
1893        public void transportInterupted() {
1894            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
1895            if (LOG.isDebugEnabled()) {
1896                LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
1897            }
1898            signalInterruptionProcessingNeeded();
1899    
1900            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1901                ActiveMQSession s = i.next();
1902                s.clearMessagesInProgress();
1903            }
1904            
1905            for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1906                connectionConsumer.clearMessagesInProgress();    
1907            }
1908            
1909            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1910                TransportListener listener = iter.next();
1911                listener.transportInterupted();
1912            }
1913        }
1914    
1915        public void transportResumed() {
1916            for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1917                TransportListener listener = iter.next();
1918                listener.transportResumed();
1919            }
1920        }
1921    
1922        /**
1923         * Create the DestinationInfo object for the temporary destination.
1924         * 
1925         * @param topic - if its true topic, else queue.
1926         * @return DestinationInfo
1927         * @throws JMSException
1928         */
1929        protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
1930    
1931            // Check if Destination info is of temporary type.
1932            ActiveMQTempDestination dest;
1933            if (topic) {
1934                dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1935            } else {
1936                dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
1937            }
1938    
1939            DestinationInfo info = new DestinationInfo();
1940            info.setConnectionId(this.info.getConnectionId());
1941            info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
1942            info.setDestination(dest);
1943            syncSendPacket(info);
1944    
1945            dest.setConnection(this);
1946            activeTempDestinations.put(dest, dest);
1947            return dest;
1948        }
1949    
1950        /**
1951         * @param destination
1952         * @throws JMSException
1953         */
1954        public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
1955    
1956            checkClosedOrFailed();
1957    
1958            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1959                ActiveMQSession s = i.next();
1960                if (s.isInUse(destination)) {
1961                    throw new JMSException("A consumer is consuming from the temporary destination");
1962                }
1963            }
1964    
1965            activeTempDestinations.remove(destination);
1966    
1967            DestinationInfo destInfo = new DestinationInfo();
1968            destInfo.setConnectionId(this.info.getConnectionId());
1969            destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
1970            destInfo.setDestination(destination);
1971            destInfo.setTimeout(0);
1972            syncSendPacket(destInfo);
1973        }
1974    
1975        public boolean isDeleted(ActiveMQDestination dest) {
1976    
1977            // If we are not watching the advisories.. then
1978            // we will assume that the temp destination does exist.
1979            if (advisoryConsumer == null) {
1980                return false;
1981            }
1982    
1983            return !activeTempDestinations.contains(dest);
1984        }
1985    
1986        public boolean isCopyMessageOnSend() {
1987            return copyMessageOnSend;
1988        }
1989    
1990        public LongSequenceGenerator getLocalTransactionIdGenerator() {
1991            return localTransactionIdGenerator;
1992        }
1993    
1994        public boolean isUseCompression() {
1995            return useCompression;
1996        }
1997    
1998        /**
1999         * Enables the use of compression of the message bodies
2000         */
2001        public void setUseCompression(boolean useCompression) {
2002            this.useCompression = useCompression;
2003        }
2004    
2005        public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2006    
2007            checkClosedOrFailed();
2008            ensureConnectionInfoSent();
2009    
2010            DestinationInfo info = new DestinationInfo();
2011            info.setConnectionId(this.info.getConnectionId());
2012            info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2013            info.setDestination(destination);
2014            info.setTimeout(0);
2015            syncSendPacket(info);
2016    
2017        }
2018    
2019        public boolean isDispatchAsync() {
2020            return dispatchAsync;
2021        }
2022    
2023        /**
2024         * Enables or disables the default setting of whether or not consumers have
2025         * their messages <a
2026         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2027         * synchronously or asynchronously by the broker</a>. For non-durable
2028         * topics for example we typically dispatch synchronously by default to
2029         * minimize context switches which boost performance. However sometimes its
2030         * better to go slower to ensure that a single blocked consumer socket does
2031         * not block delivery to other consumers.
2032         * 
2033         * @param asyncDispatch If true then consumers created on this connection
2034         *                will default to having their messages dispatched
2035         *                asynchronously. The default value is false.
2036         */
2037        public void setDispatchAsync(boolean asyncDispatch) {
2038            this.dispatchAsync = asyncDispatch;
2039        }
2040    
2041        public boolean isObjectMessageSerializationDefered() {
2042            return objectMessageSerializationDefered;
2043        }
2044    
2045        /**
2046         * When an object is set on an ObjectMessage, the JMS spec requires the
2047         * object to be serialized by that set method. Enabling this flag causes the
2048         * object to not get serialized. The object may subsequently get serialized
2049         * if the message needs to be sent over a socket or stored to disk.
2050         */
2051        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2052            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2053        }
2054    
2055        public InputStream createInputStream(Destination dest) throws JMSException {
2056            return createInputStream(dest, null);
2057        }
2058    
2059        public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
2060            return createInputStream(dest, messageSelector, false);
2061        }
2062    
2063        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
2064            return createInputStream(dest, messageSelector, noLocal,  -1);
2065        }
2066    
2067    
2068    
2069        public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2070            return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
2071        }
2072        
2073        public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
2074            return createInputStream(dest, null, false);
2075        }
2076    
2077        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
2078            return createDurableInputStream(dest, name, messageSelector, false);
2079        }
2080    
2081        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
2082            return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
2083        }
2084    
2085        public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
2086            return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
2087        }
2088        
2089        private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
2090            checkClosedOrFailed();
2091            ensureConnectionInfoSent();
2092            return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch(), timeout);
2093        }
2094    
2095        /**
2096         * Creates a persistent output stream; individual messages will be written
2097         * to disk/database by the broker
2098         */
2099        public OutputStream createOutputStream(Destination dest) throws JMSException {
2100            return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2101        }
2102    
2103        /**
2104         * Creates a non persistent output stream; messages will not be written to
2105         * disk
2106         */
2107        public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
2108            return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
2109        }
2110    
2111        /**
2112         * Creates an output stream allowing full control over the delivery mode,
2113         * the priority and time to live of the messages and the properties added to
2114         * messages on the stream.
2115         * 
2116         * @param streamProperties defines a map of key-value pairs where the keys
2117         *                are strings and the values are primitive values (numbers
2118         *                and strings) which are appended to the messages similarly
2119         *                to using the
2120         *                {@link javax.jms.Message#setObjectProperty(String, Object)}
2121         *                method
2122         */
2123        public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
2124            checkClosedOrFailed();
2125            ensureConnectionInfoSent();
2126            return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
2127        }
2128    
2129        /**
2130         * Unsubscribes a durable subscription that has been created by a client.
2131         * <P>
2132         * This method deletes the state being maintained on behalf of the
2133         * subscriber by its provider.
2134         * <P>
2135         * It is erroneous for a client to delete a durable subscription while there
2136         * is an active <CODE>MessageConsumer </CODE> or
2137         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2138         * message is part of a pending transaction or has not been acknowledged in
2139         * the session.
2140         * 
2141         * @param name the name used to identify this subscription
2142         * @throws JMSException if the session fails to unsubscribe to the durable
2143         *                 subscription due to some internal error.
2144         * @throws InvalidDestinationException if an invalid subscription name is
2145         *                 specified.
2146         * @since 1.1
2147         */
2148        public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2149            checkClosedOrFailed();
2150            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2151            rsi.setConnectionId(getConnectionInfo().getConnectionId());
2152            rsi.setSubscriptionName(name);
2153            rsi.setClientId(getConnectionInfo().getClientId());
2154            syncSendPacket(rsi);
2155        }
2156    
2157        /**
2158         * Internal send method optimized: - It does not copy the message - It can
2159         * only handle ActiveMQ messages. - You can specify if the send is async or
2160         * sync - Does not allow you to send /w a transaction.
2161         */
2162        void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2163            checkClosedOrFailed();
2164    
2165            if (destination.isTemporary() && isDeleted(destination)) {
2166                throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2167            }
2168    
2169            msg.setJMSDestination(destination);
2170            msg.setJMSDeliveryMode(deliveryMode);
2171            long expiration = 0L;
2172    
2173            if (!isDisableTimeStampsByDefault()) {
2174                long timeStamp = System.currentTimeMillis();
2175                msg.setJMSTimestamp(timeStamp);
2176                if (timeToLive > 0) {
2177                    expiration = timeToLive + timeStamp;
2178                }
2179            }
2180    
2181            msg.setJMSExpiration(expiration);
2182            msg.setJMSPriority(priority);
2183    
2184            msg.setJMSRedelivered(false);
2185            msg.setMessageId(messageId);
2186    
2187            msg.onSend();
2188    
2189            msg.setProducerId(msg.getMessageId().getProducerId());
2190    
2191            if (LOG.isDebugEnabled()) {
2192                LOG.debug("Sending message: " + msg);
2193            }
2194    
2195            if (async) {
2196                asyncSendPacket(msg);
2197            } else {
2198                syncSendPacket(msg);
2199            }
2200    
2201        }
2202    
2203        public void addOutputStream(ActiveMQOutputStream stream) {
2204            outputStreams.add(stream);
2205        }
2206    
2207        public void removeOutputStream(ActiveMQOutputStream stream) {
2208            outputStreams.remove(stream);
2209        }
2210    
2211        public void addInputStream(ActiveMQInputStream stream) {
2212            inputStreams.add(stream);
2213        }
2214    
2215        public void removeInputStream(ActiveMQInputStream stream) {
2216            inputStreams.remove(stream);
2217        }
2218    
2219        protected void onControlCommand(ControlCommand command) {
2220            String text = command.getCommand();
2221            if (text != null) {
2222                if ("shutdown".equals(text)) {
2223                    LOG.info("JVM told to shutdown");
2224                    System.exit(0);
2225                }
2226                if (false && "close".equals(text)){
2227                    LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2228                    try {
2229                        close();
2230                    } catch (JMSException e) {
2231                    }
2232                }
2233            }
2234        }
2235    
2236        protected void onConnectionControl(ConnectionControl command) {
2237            if (command.isFaultTolerant()) {
2238                this.optimizeAcknowledge = false;
2239                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2240                    ActiveMQSession s = i.next();
2241                    s.setOptimizeAcknowledge(false);
2242                }
2243            }
2244        }
2245    
2246        protected void onConsumerControl(ConsumerControl command) {
2247            if (command.isClose()) {
2248                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2249                    ActiveMQSession s = i.next();
2250                    s.close(command.getConsumerId());
2251                }
2252            } else {
2253                for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2254                    ActiveMQSession s = i.next();
2255                    s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2256                }
2257            }
2258        }
2259    
2260        protected void transportFailed(IOException error) {
2261            transportFailed.set(true);
2262            if (firstFailureError == null) {
2263                firstFailureError = error;
2264            }
2265        }
2266    
2267        /**
2268         * Should a JMS message be copied to a new JMS Message object as part of the
2269         * send() method in JMS. This is enabled by default to be compliant with the
2270         * JMS specification. You can disable it if you do not mutate JMS messages
2271         * after they are sent for a performance boost
2272         */
2273        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2274            this.copyMessageOnSend = copyMessageOnSend;
2275        }
2276    
2277        @Override
2278        public String toString() {
2279            return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2280        }
2281    
2282        protected BlobTransferPolicy createBlobTransferPolicy() {
2283            return new BlobTransferPolicy();
2284        }
2285    
2286        public int getProtocolVersion() {
2287            return protocolVersion.get();
2288        }
2289    
2290        public int getProducerWindowSize() {
2291            return producerWindowSize;
2292        }
2293    
2294        public void setProducerWindowSize(int producerWindowSize) {
2295            this.producerWindowSize = producerWindowSize;
2296        }
2297    
2298        public void setAuditDepth(int auditDepth) {
2299            connectionAudit.setAuditDepth(auditDepth);
2300            }
2301    
2302        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2303            connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2304            }
2305    
2306        protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2307            connectionAudit.removeDispatcher(dispatcher);
2308        }
2309    
2310        protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2311            return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2312        }
2313    
2314        protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2315            connectionAudit.rollbackDuplicate(dispatcher, message);
2316        }
2317    
2318            public IOException getFirstFailureError() {
2319                    return firstFailureError;
2320            }
2321            
2322            protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2323                CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2324                if (cdl != null) {
2325                if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
2326                    LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
2327                    cdl.await(10, TimeUnit.SECONDS);
2328                }
2329                signalInterruptionProcessingComplete();
2330            }
2331        }
2332            
2333            protected void transportInterruptionProcessingComplete() {
2334                CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2335                if (cdl != null) {
2336                    cdl.countDown();
2337                    try {
2338                        signalInterruptionProcessingComplete();
2339                    } catch (InterruptedException ignored) {}
2340                }
2341            }
2342    
2343        private void signalInterruptionProcessingComplete() throws InterruptedException {
2344            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
2345            if (cdl.getCount()==0) {
2346                if (LOG.isDebugEnabled()) {
2347                    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
2348                }
2349                this.transportInterruptionProcessingComplete = null;
2350    
2351                FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2352                if (failoverTransport != null) {
2353                    failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2354                    if (LOG.isDebugEnabled()) {
2355                        LOG.debug("notified failover transport (" + failoverTransport
2356                                + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2357                    }
2358                }
2359    
2360            }
2361        }
2362    
2363        private void signalInterruptionProcessingNeeded() {
2364            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2365            if (failoverTransport != null) {
2366                failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2367                if (LOG.isDebugEnabled()) {
2368                    LOG.debug("notified failover transport (" + failoverTransport
2369                            + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2370                }
2371            }
2372        }
2373    
2374        /*
2375         * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2376         * will wait to receive re dispatched messages.
2377         * default value is 0 so there is no wait by default.
2378         */
2379        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2380            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2381        }
2382        
2383        public long getConsumerFailoverRedeliveryWaitPeriod() {
2384            return consumerFailoverRedeliveryWaitPeriod;
2385        }
2386        
2387        protected Scheduler getScheduler() {
2388            return this.scheduler;
2389        }
2390        
2391        protected ThreadPoolExecutor getExecutor() {
2392            return this.executor;
2393        }
2394    
2395        /**
2396         * @return the checkForDuplicates
2397         */
2398        public boolean isCheckForDuplicates() {
2399            return this.checkForDuplicates;
2400        }
2401    
2402        /**
2403         * @param checkForDuplicates the checkForDuplicates to set
2404         */
2405        public void setCheckForDuplicates(boolean checkForDuplicates) {
2406            this.checkForDuplicates = checkForDuplicates;
2407        }
2408    
2409    }