001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.File;
020    import java.io.InputStream;
021    import java.io.Serializable;
022    import java.net.URL;
023    import java.util.Collections;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    import javax.jms.BytesMessage;
030    import javax.jms.Destination;
031    import javax.jms.IllegalStateException;
032    import javax.jms.InvalidDestinationException;
033    import javax.jms.InvalidSelectorException;
034    import javax.jms.JMSException;
035    import javax.jms.MapMessage;
036    import javax.jms.Message;
037    import javax.jms.MessageConsumer;
038    import javax.jms.MessageListener;
039    import javax.jms.MessageProducer;
040    import javax.jms.ObjectMessage;
041    import javax.jms.Queue;
042    import javax.jms.QueueBrowser;
043    import javax.jms.QueueReceiver;
044    import javax.jms.QueueSender;
045    import javax.jms.QueueSession;
046    import javax.jms.Session;
047    import javax.jms.StreamMessage;
048    import javax.jms.TemporaryQueue;
049    import javax.jms.TemporaryTopic;
050    import javax.jms.TextMessage;
051    import javax.jms.Topic;
052    import javax.jms.TopicPublisher;
053    import javax.jms.TopicSession;
054    import javax.jms.TopicSubscriber;
055    import javax.jms.TransactionRolledBackException;
056    import org.apache.activemq.blob.BlobDownloader;
057    import org.apache.activemq.blob.BlobTransferPolicy;
058    import org.apache.activemq.blob.BlobUploader;
059    import org.apache.activemq.command.ActiveMQBlobMessage;
060    import org.apache.activemq.command.ActiveMQBytesMessage;
061    import org.apache.activemq.command.ActiveMQDestination;
062    import org.apache.activemq.command.ActiveMQMapMessage;
063    import org.apache.activemq.command.ActiveMQMessage;
064    import org.apache.activemq.command.ActiveMQObjectMessage;
065    import org.apache.activemq.command.ActiveMQQueue;
066    import org.apache.activemq.command.ActiveMQStreamMessage;
067    import org.apache.activemq.command.ActiveMQTempDestination;
068    import org.apache.activemq.command.ActiveMQTempQueue;
069    import org.apache.activemq.command.ActiveMQTempTopic;
070    import org.apache.activemq.command.ActiveMQTextMessage;
071    import org.apache.activemq.command.ActiveMQTopic;
072    import org.apache.activemq.command.Command;
073    import org.apache.activemq.command.ConsumerId;
074    import org.apache.activemq.command.MessageAck;
075    import org.apache.activemq.command.MessageDispatch;
076    import org.apache.activemq.command.MessageId;
077    import org.apache.activemq.command.ProducerId;
078    import org.apache.activemq.command.RemoveInfo;
079    import org.apache.activemq.command.Response;
080    import org.apache.activemq.command.SessionId;
081    import org.apache.activemq.command.SessionInfo;
082    import org.apache.activemq.command.TransactionId;
083    import org.apache.activemq.management.JMSSessionStatsImpl;
084    import org.apache.activemq.management.StatsCapable;
085    import org.apache.activemq.management.StatsImpl;
086    import org.apache.activemq.thread.Scheduler;
087    import org.apache.activemq.transaction.Synchronization;
088    import org.apache.activemq.usage.MemoryUsage;
089    import org.apache.activemq.util.Callback;
090    import org.apache.activemq.util.LongSequenceGenerator;
091    import org.slf4j.Logger;
092    import org.slf4j.LoggerFactory;
093    
094    /**
095     * <P>
096     * A <CODE>Session</CODE> object is a single-threaded context for producing
097     * and consuming messages. Although it may allocate provider resources outside
098     * the Java virtual machine (JVM), it is considered a lightweight JMS object.
099     * <P>
100     * A session serves several purposes:
101     * <UL>
102     * <LI>It is a factory for its message producers and consumers.
103     * <LI>It supplies provider-optimized message factories.
104     * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
105     * <CODE>TemporaryQueues</CODE>.
106     * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
107     * objects for those clients that need to dynamically manipulate
108     * provider-specific destination names.
109     * <LI>It supports a single series of transactions that combine work spanning
110     * its producers and consumers into atomic units.
111     * <LI>It defines a serial order for the messages it consumes and the messages
112     * it produces.
113     * <LI>It retains messages it consumes until they have been acknowledged.
114     * <LI>It serializes execution of message listeners registered with its message
115     * consumers.
116     * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
117     * </UL>
118     * <P>
119     * A session can create and service multiple message producers and consumers.
120     * <P>
121     * One typical use is to have a thread block on a synchronous
122     * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
123     * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
124     * <P>
125     * If a client desires to have one thread produce messages while others consume
126     * them, the client should use a separate session for its producing thread.
127     * <P>
128     * Once a connection has been started, any session with one or more registered
129     * message listeners is dedicated to the thread of control that delivers
130     * messages to it. It is erroneous for client code to use this session or any of
131     * its constituent objects from another thread of control. The only exception to
132     * this rule is the use of the session or connection <CODE>close</CODE>
133     * method.
134     * <P>
135     * It should be easy for most clients to partition their work naturally into
136     * sessions. This model allows clients to start simply and incrementally add
137     * message processing complexity as their need for concurrency grows.
138     * <P>
139     * The <CODE>close</CODE> method is the only session method that can be called
140     * while some other session method is being executed in another thread.
141     * <P>
142     * A session may be specified as transacted. Each transacted session supports a
143     * single series of transactions. Each transaction groups a set of message sends
144     * and a set of message receives into an atomic unit of work. In effect,
145     * transactions organize a session's input message stream and output message
146     * stream into series of atomic units. When a transaction commits, its atomic
147     * unit of input is acknowledged and its associated atomic unit of output is
148     * sent. If a transaction rollback is done, the transaction's sent messages are
149     * destroyed and the session's input is automatically recovered.
150     * <P>
151     * The content of a transaction's input and output units is simply those
152     * messages that have been produced and consumed within the session's current
153     * transaction.
154     * <P>
155     * A transaction is completed using either its session's <CODE>commit</CODE>
156     * method or its session's <CODE>rollback </CODE> method. The completion of a
157     * session's current transaction automatically begins the next. The result is
158     * that a transacted session always has a current transaction within which its
159     * work is done.
160     * <P>
161     * The Java Transaction Service (JTS) or some other transaction monitor may be
162     * used to combine a session's transaction with transactions on other resources
163     * (databases, other JMS sessions, etc.). Since Java distributed transactions
164     * are controlled via the Java Transaction API (JTA), use of the session's
165     * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
166     * prohibited.
167     * <P>
168     * The JMS API does not require support for JTA; however, it does define how a
169     * provider supplies this support.
170     * <P>
171     * Although it is also possible for a JMS client to handle distributed
172     * transactions directly, it is unlikely that many JMS clients will do this.
173     * Support for JTA in the JMS API is targeted at systems vendors who will be
174     * integrating the JMS API into their application server products.
175     * 
176     * 
177     * @see javax.jms.Session
178     * @see javax.jms.QueueSession
179     * @see javax.jms.TopicSession
180     * @see javax.jms.XASession
181     */
182    public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
183            
/**
 * Acknowledgement mode in which only an individual message is acknowledged,
 * using message.acknowledge(), as opposed to CLIENT_ACKNOWLEDGE, which
 * acknowledges all messages consumed by a session at the time
 * acknowledge() is called.
 */
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
/**
 * Same value as {@link #INDIVIDUAL_ACKNOWLEDGE}.
 * NOTE(review): presumably the upper bound of the valid acknowledgement-mode
 * constants - confirm against the code that validates the mode.
 */
public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
192    
193        public static interface DeliveryListener {
194            void beforeDelivery(ActiveMQSession session, Message msg);
195    
196            void afterDelivery(ActiveMQSession session, Message msg);
197        }
198    
/** Class-wide logger. */
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
/** Scheduler taken from the connection at construction; runs the deferred per-consumer clean-up in clearMessagesInProgress(). */
private final Scheduler scheduler;
/** Executor taken from the connection at construction. */
private final ThreadPoolExecutor connectionExecutor;

/** Acknowledgement mode passed to the constructor; returned by getAcknowledgeMode(). */
protected int acknowledgementMode;
/** Connection this session was constructed on; commands are sent through it. */
protected final ActiveMQConnection connection;
/** Session descriptor built from the connection info and the session id; sent to the connection at construction. */
protected final SessionInfo info;
// NOTE(review): presumably these allocate consumer, producer and delivery ids - usage is outside this view.
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
/** Session executor created in the constructor; stopped by dispose(). */
protected final ActiveMQSessionExecutor executor;
// NOTE(review): presumably tracks whether start() has run - usage is outside this view.
protected final AtomicBoolean started = new AtomicBoolean(false);

/** Consumers of this session; iterated by clearMessagesInProgress(), deliverAcks() and dispose(). */
protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
/** Producers of this session; iterated by dispose(). */
protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();

/** Checked by close() and dispose() before any shutdown work is attempted. */
protected boolean closed;
/** True while a close-on-completion Synchronization registered by close() is still pending. */
private volatile boolean synchronizationRegistered;
/** Dispatch flag passed to the constructor. */
protected boolean asyncDispatch;
/** Dispatch flag passed to the constructor (the four-argument constructor supplies true). */
protected boolean sessionAsyncDispatch;
/** Snapshot of LOG.isDebugEnabled() taken at construction. */
protected final boolean debug;
// NOTE(review): presumably serializes sends - usage is outside this view.
protected Object sendMutex = new Object();

// NOTE(review): session-level listener; how it is used is outside this view.
private MessageListener messageListener;
/** Statistics object built over the producers and consumers lists. */
private final JMSSessionStatsImpl stats;
/** Transaction context; created in the constructor and replaceable through setTransactionContext(). */
private TransactionContext transactionContext;
// NOTE(review): presumably the DeliveryListener notified around deliveries - usage is outside this view.
private DeliveryListener deliveryListener;
/** Set up at construction through setTransformer(...) with the connection's transformer. */
private MessageTransformer transformer;
/** Set up at construction through setBlobTransferPolicy(...) with the connection's policy. */
private BlobTransferPolicy blobTransferPolicy;
/** Largest last-delivered sequence id collected from the consumers in dispose(); sent with the remove command in doClose(). */
private long lastDeliveredSequenceId;
229    
230        /**
231         * Construct the Session
232         * 
233         * @param connection
234         * @param sessionId
235         * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
236         *                Session.SESSION_TRANSACTED
237         * @param asyncDispatch
238         * @param sessionAsyncDispatch
239         * @throws JMSException on internal error
240         */
241        protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
242            this.debug = LOG.isDebugEnabled();
243            this.connection = connection;
244            this.acknowledgementMode = acknowledgeMode;
245            this.asyncDispatch = asyncDispatch;
246            this.sessionAsyncDispatch = sessionAsyncDispatch;
247            this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
248            setTransactionContext(new TransactionContext(connection));
249            stats = new JMSSessionStatsImpl(producers, consumers);
250            this.connection.asyncSendPacket(info);
251            setTransformer(connection.getTransformer());
252            setBlobTransferPolicy(connection.getBlobTransferPolicy());
253            this.scheduler=connection.getScheduler();
254            this.connectionExecutor=connection.getExecutor();
255            this.executor = new ActiveMQSessionExecutor(this);
256            connection.addSession(this);        
257            if (connection.isStarted()) {
258                start();
259            }
260    
261        }
262    
/**
 * Convenience constructor: delegates to the five-argument constructor with
 * <CODE>sessionAsyncDispatch</CODE> set to <CODE>true</CODE>.
 *
 * @param connection the connection this session is created on
 * @param sessionId supplies the value used to build this session's info
 * @param acknowledgeMode the acknowledgement mode
 * @param asyncDispatch stored as the session's asyncDispatch flag
 * @throws JMSException on internal error
 */
protected ActiveMQSession(ActiveMQConnection connection,
                          SessionId sessionId,
                          int acknowledgeMode,
                          boolean asyncDispatch) throws JMSException {
    this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
}
266    
267        /**
268         * Sets the transaction context of the session.
269         * 
270         * @param transactionContext - provides the means to control a JMS
271         *                transaction.
272         */
273        public void setTransactionContext(TransactionContext transactionContext) {
274            this.transactionContext = transactionContext;
275        }
276    
277        /**
278         * Returns the transaction context of the session.
279         * 
280         * @return transactionContext - session's transaction context.
281         */
282        public TransactionContext getTransactionContext() {
283            return transactionContext;
284        }
285    
286        /*
287         * (non-Javadoc)
288         * 
289         * @see org.apache.activemq.management.StatsCapable#getStats()
290         */
291        public StatsImpl getStats() {
292            return stats;
293        }
294    
295        /**
296         * Returns the session's statistics.
297         * 
298         * @return stats - session's statistics.
299         */
300        public JMSSessionStatsImpl getSessionStats() {
301            return stats;
302        }
303    
304        /**
305         * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
306         * object is used to send a message containing a stream of uninterpreted
307         * bytes.
308         * 
309         * @return the an ActiveMQBytesMessage
310         * @throws JMSException if the JMS provider fails to create this message due
311         *                 to some internal error.
312         */
313        public BytesMessage createBytesMessage() throws JMSException {
314            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
315            configureMessage(message);
316            return message;
317        }
318    
319        /**
320         * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
321         * object is used to send a self-defining set of name-value pairs, where
322         * names are <CODE>String</CODE> objects and values are primitive values
323         * in the Java programming language.
324         * 
325         * @return an ActiveMQMapMessage
326         * @throws JMSException if the JMS provider fails to create this message due
327         *                 to some internal error.
328         */
329        public MapMessage createMapMessage() throws JMSException {
330            ActiveMQMapMessage message = new ActiveMQMapMessage();
331            configureMessage(message);
332            return message;
333        }
334    
335        /**
336         * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
337         * interface is the root interface of all JMS messages. A
338         * <CODE>Message</CODE> object holds all the standard message header
339         * information. It can be sent when a message containing only header
340         * information is sufficient.
341         * 
342         * @return an ActiveMQMessage
343         * @throws JMSException if the JMS provider fails to create this message due
344         *                 to some internal error.
345         */
346        public Message createMessage() throws JMSException {
347            ActiveMQMessage message = new ActiveMQMessage();
348            configureMessage(message);
349            return message;
350        }
351    
352        /**
353         * Creates an <CODE>ObjectMessage</CODE> object. An
354         * <CODE>ObjectMessage</CODE> object is used to send a message that
355         * contains a serializable Java object.
356         * 
357         * @return an ActiveMQObjectMessage
358         * @throws JMSException if the JMS provider fails to create this message due
359         *                 to some internal error.
360         */
361        public ObjectMessage createObjectMessage() throws JMSException {
362            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
363            configureMessage(message);
364            return message;
365        }
366    
367        /**
368         * Creates an initialized <CODE>ObjectMessage</CODE> object. An
369         * <CODE>ObjectMessage</CODE> object is used to send a message that
370         * contains a serializable Java object.
371         * 
372         * @param object the object to use to initialize this message
373         * @return an ActiveMQObjectMessage
374         * @throws JMSException if the JMS provider fails to create this message due
375         *                 to some internal error.
376         */
377        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
378            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
379            configureMessage(message);
380            message.setObject(object);
381            return message;
382        }
383    
384        /**
385         * Creates a <CODE>StreamMessage</CODE> object. A
386         * <CODE>StreamMessage</CODE> object is used to send a self-defining
387         * stream of primitive values in the Java programming language.
388         * 
389         * @return an ActiveMQStreamMessage
390         * @throws JMSException if the JMS provider fails to create this message due
391         *                 to some internal error.
392         */
393        public StreamMessage createStreamMessage() throws JMSException {
394            ActiveMQStreamMessage message = new ActiveMQStreamMessage();
395            configureMessage(message);
396            return message;
397        }
398    
399        /**
400         * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
401         * object is used to send a message containing a <CODE>String</CODE>
402         * object.
403         * 
404         * @return an ActiveMQTextMessage
405         * @throws JMSException if the JMS provider fails to create this message due
406         *                 to some internal error.
407         */
408        public TextMessage createTextMessage() throws JMSException {
409            ActiveMQTextMessage message = new ActiveMQTextMessage();
410            configureMessage(message);
411            return message;
412        }
413    
414        /**
415         * Creates an initialized <CODE>TextMessage</CODE> object. A
416         * <CODE>TextMessage</CODE> object is used to send a message containing a
417         * <CODE>String</CODE>.
418         * 
419         * @param text the string used to initialize this message
420         * @return an ActiveMQTextMessage
421         * @throws JMSException if the JMS provider fails to create this message due
422         *                 to some internal error.
423         */
424        public TextMessage createTextMessage(String text) throws JMSException {
425            ActiveMQTextMessage message = new ActiveMQTextMessage();
426            message.setText(text);
427            configureMessage(message);
428            return message;
429        }
430    
431        /**
432         * Creates an initialized <CODE>BlobMessage</CODE> object. A
433         * <CODE>BlobMessage</CODE> object is used to send a message containing a
434         * <CODE>URL</CODE> which points to some network addressible BLOB.
435         * 
436         * @param url the network addressable URL used to pass directly to the
437         *                consumer
438         * @return a BlobMessage
439         * @throws JMSException if the JMS provider fails to create this message due
440         *                 to some internal error.
441         */
442        public BlobMessage createBlobMessage(URL url) throws JMSException {
443            return createBlobMessage(url, false);
444        }
445    
446        /**
447         * Creates an initialized <CODE>BlobMessage</CODE> object. A
448         * <CODE>BlobMessage</CODE> object is used to send a message containing a
449         * <CODE>URL</CODE> which points to some network addressible BLOB.
450         * 
451         * @param url the network addressable URL used to pass directly to the
452         *                consumer
453         * @param deletedByBroker indicates whether or not the resource is deleted
454         *                by the broker when the message is acknowledged
455         * @return a BlobMessage
456         * @throws JMSException if the JMS provider fails to create this message due
457         *                 to some internal error.
458         */
459        public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
460            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
461            configureMessage(message);
462            message.setURL(url);
463            message.setDeletedByBroker(deletedByBroker);
464            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
465            return message;
466        }
467    
468        /**
469         * Creates an initialized <CODE>BlobMessage</CODE> object. A
470         * <CODE>BlobMessage</CODE> object is used to send a message containing
471         * the <CODE>File</CODE> content. Before the message is sent the file
472         * conent will be uploaded to the broker or some other remote repository
473         * depending on the {@link #getBlobTransferPolicy()}.
474         * 
475         * @param file the file to be uploaded to some remote repo (or the broker)
476         *                depending on the strategy
477         * @return a BlobMessage
478         * @throws JMSException if the JMS provider fails to create this message due
479         *                 to some internal error.
480         */
481        public BlobMessage createBlobMessage(File file) throws JMSException {
482            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
483            configureMessage(message);
484            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
485            message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
486            message.setDeletedByBroker(true);
487            message.setName(file.getName());
488            return message;
489        }
490    
491        /**
492         * Creates an initialized <CODE>BlobMessage</CODE> object. A
493         * <CODE>BlobMessage</CODE> object is used to send a message containing
494         * the <CODE>File</CODE> content. Before the message is sent the file
495         * conent will be uploaded to the broker or some other remote repository
496         * depending on the {@link #getBlobTransferPolicy()}.
497         * 
498         * @param in the stream to be uploaded to some remote repo (or the broker)
499         *                depending on the strategy
500         * @return a BlobMessage
501         * @throws JMSException if the JMS provider fails to create this message due
502         *                 to some internal error.
503         */
504        public BlobMessage createBlobMessage(InputStream in) throws JMSException {
505            ActiveMQBlobMessage message = new ActiveMQBlobMessage();
506            configureMessage(message);
507            message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
508            message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
509            message.setDeletedByBroker(true);
510            return message;
511        }
512    
513        /**
514         * Indicates whether the session is in transacted mode.
515         * 
516         * @return true if the session is in transacted mode
517         * @throws JMSException if there is some internal error.
518         */
519        public boolean getTransacted() throws JMSException {
520            checkClosed();
521            return isTransacted();
522        }
523    
524        /**
525         * Returns the acknowledgement mode of the session. The acknowledgement mode
526         * is set at the time that the session is created. If the session is
527         * transacted, the acknowledgement mode is ignored.
528         * 
529         * @return If the session is not transacted, returns the current
530         *         acknowledgement mode for the session. If the session is
531         *         transacted, returns SESSION_TRANSACTED.
532         * @throws JMSException
533         * @see javax.jms.Connection#createSession(boolean,int)
534         * @since 1.1 exception JMSException if there is some internal error.
535         */
536        public int getAcknowledgeMode() throws JMSException {
537            checkClosed();
538            return this.acknowledgementMode;
539        }
540    
541        /**
542         * Commits all messages done in this transaction and releases any locks
543         * currently held.
544         * 
545         * @throws JMSException if the JMS provider fails to commit the transaction
546         *                 due to some internal error.
547         * @throws TransactionRolledBackException if the transaction is rolled back
548         *                 due to some internal error during commit.
549         * @throws javax.jms.IllegalStateException if the method is not called by a
550         *                 transacted session.
551         */
552        public void commit() throws JMSException {
553            checkClosed();
554            if (!getTransacted()) {
555                throw new javax.jms.IllegalStateException("Not a transacted session");
556            }
557            if (LOG.isDebugEnabled()) {
558                LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
559            }
560            transactionContext.commit();
561        }
562    
563        /**
564         * Rolls back any messages done in this transaction and releases any locks
565         * currently held.
566         * 
567         * @throws JMSException if the JMS provider fails to roll back the
568         *                 transaction due to some internal error.
569         * @throws javax.jms.IllegalStateException if the method is not called by a
570         *                 transacted session.
571         */
572        public void rollback() throws JMSException {
573            checkClosed();
574            if (!getTransacted()) {
575                throw new javax.jms.IllegalStateException("Not a transacted session");
576            }
577            if (LOG.isDebugEnabled()) {
578                LOG.debug(getSessionId() + " Transaction Rollback");
579            }
580            transactionContext.rollback();
581        }
582    
583        /**
584         * Closes the session.
585         * <P>
586         * Since a provider may allocate some resources on behalf of a session
587         * outside the JVM, clients should close the resources when they are not
588         * needed. Relying on garbage collection to eventually reclaim these
589         * resources may not be timely enough.
590         * <P>
591         * There is no need to close the producers and consumers of a closed
592         * session.
593         * <P>
594         * This call will block until a <CODE>receive</CODE> call or message
595         * listener in progress has completed. A blocked message consumer
596         * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
597         * is closed.
598         * <P>
599         * Closing a transacted session must roll back the transaction in progress.
600         * <P>
601         * This method is the only <CODE>Session</CODE> method that can be called
602         * concurrently.
603         * <P>
604         * Invoking any other <CODE>Session</CODE> method on a closed session must
605         * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
606         * closed session must <I>not </I> throw an exception.
607         * 
608         * @throws JMSException if the JMS provider fails to close the session due
609         *                 to some internal error.
610         */
611        public void close() throws JMSException {
612            if (!closed) {
613                if (getTransactionContext().isInXATransaction()) {
614                    if (!synchronizationRegistered) {
615                        synchronizationRegistered = true;
616                        getTransactionContext().addSynchronization(new Synchronization() {
617    
618                                            @Override
619                                            public void afterCommit() throws Exception {
620                                                doClose();
621                                                synchronizationRegistered = false;
622                                            }
623    
624                                            @Override
625                                            public void afterRollback() throws Exception {
626                                                doClose();
627                                                synchronizationRegistered = false;
628                                            }
629                                        });
630                    }
631    
632                } else {
633                    doClose();
634                }
635            }
636        }
637    
638        private void doClose() throws JMSException {
639            dispose();
640            RemoveInfo removeCommand = info.createRemoveCommand();
641            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
642            connection.asyncSendPacket(removeCommand);
643        }
644    
645        void clearMessagesInProgress() {
646            executor.clearMessagesInProgress();        
647            // we are called from inside the transport reconnection logic
648            // which involves us clearing all the connections' consumers
649            // dispatch and delivered lists. So rather than trying to 
650            // grab a mutex (which could be already owned by the message 
651            // listener calling the send or an ack) we allow it to complete in 
652            // a separate thread via the scheduler and notify us via 
653            // connection.transportInterruptionProcessingComplete()
654            //
655            for (final ActiveMQMessageConsumer consumer : consumers) {
656                consumer.inProgressClearRequired();
657                scheduler.executeAfterDelay(new Runnable() {
658                    public void run() {
659                        consumer.clearMessagesInProgress();
660                    }}, 0l);
661            }
662        }
663    
664        void deliverAcks() {
665            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
666                ActiveMQMessageConsumer consumer = iter.next();
667                consumer.deliverAcks();
668            }
669        }
670    
671        public synchronized void dispose() throws JMSException {
672            if (!closed) {
673    
674                try {
675                    executor.stop();
676    
677                    for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
678                        ActiveMQMessageConsumer consumer = iter.next();
679                        consumer.setFailureError(connection.getFirstFailureError());
680                        consumer.dispose();
681                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
682                    }
683                    consumers.clear();
684    
685                    for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
686                        ActiveMQMessageProducer producer = iter.next();
687                        producer.dispose();
688                    }
689                    producers.clear();
690    
691                    try {
692                        if (getTransactionContext().isInLocalTransaction()) {
693                            rollback();
694                        }
695                    } catch (JMSException e) {
696                    }
697    
698                } finally {
699                    connection.removeSession(this);
700                    this.transactionContext = null;
701                    closed = true;
702                }
703            }
704        }
705    
706        /**
707         * Checks that the session is not closed then configures the message
708         */
709        protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
710            checkClosed();
711            message.setConnection(connection);
712        }
713    
714        /**
715         * Check if the session is closed. It is used for ensuring that the session
716         * is open before performing various operations.
717         * 
718         * @throws IllegalStateException if the Session is closed
719         */
720        protected void checkClosed() throws IllegalStateException {
721            if (closed) {
722                throw new IllegalStateException("The Session is closed");
723            }
724        }
725    
726        /**
727         * Stops message delivery in this session, and restarts message delivery
728         * with the oldest unacknowledged message.
729         * <P>
730         * All consumers deliver messages in a serial order. Acknowledging a
731         * received message automatically acknowledges all messages that have been
732         * delivered to the client.
733         * <P>
734         * Restarting a session causes it to take the following actions:
735         * <UL>
736         * <LI>Stop message delivery
737         * <LI>Mark all messages that might have been delivered but not
738         * acknowledged as "redelivered"
739         * <LI>Restart the delivery sequence including all unacknowledged messages
740         * that had been previously delivered. Redelivered messages do not have to
741         * be delivered in exactly their original delivery order.
742         * </UL>
743         * 
744         * @throws JMSException if the JMS provider fails to stop and restart
745         *                 message delivery due to some internal error.
746         * @throws IllegalStateException if the method is called by a transacted
747         *                 session.
748         */
749        public void recover() throws JMSException {
750    
751            checkClosed();
752            if (getTransacted()) {
753                throw new IllegalStateException("This session is transacted");
754            }
755    
756            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
757                ActiveMQMessageConsumer c = iter.next();
758                c.rollback();
759            }
760    
761        }
762    
763        /**
764         * Returns the session's distinguished message listener (optional).
765         * 
766         * @return the message listener associated with this session
767         * @throws JMSException if the JMS provider fails to get the message
768         *                 listener due to an internal error.
769         * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
770         * @see javax.jms.ServerSessionPool
771         * @see javax.jms.ServerSession
772         */
773        public MessageListener getMessageListener() throws JMSException {
774            checkClosed();
775            return this.messageListener;
776        }
777    
778        /**
779         * Sets the session's distinguished message listener (optional).
780         * <P>
781         * When the distinguished message listener is set, no other form of message
782         * receipt in the session can be used; however, all forms of sending
783         * messages are still supported.
784         * <P>
785         * This is an expert facility not used by regular JMS clients.
786         * 
787         * @param listener the message listener to associate with this session
788         * @throws JMSException if the JMS provider fails to set the message
789         *                 listener due to an internal error.
790         * @see javax.jms.Session#getMessageListener()
791         * @see javax.jms.ServerSessionPool
792         * @see javax.jms.ServerSession
793         */
794        public void setMessageListener(MessageListener listener) throws JMSException {
795            checkClosed();
796            this.messageListener = listener;
797    
798            if (listener != null) {
799                executor.setDispatchedBySessionPool(true);
800            }
801        }
802    
803        /**
804         * Optional operation, intended to be used only by Application Servers, not
805         * by ordinary JMS clients.
806         * 
807         * @see javax.jms.ServerSession
808         */
    public void run() {
        // Drains the dispatches currently queued on the executor and hands each
        // one to the session's distinguished message listener. The loop ends as
        // soon as dequeueNoWait() returns null.
        MessageDispatch messageDispatch;
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            // Final alias so the anonymous classes below can capture the dispatch.
            final MessageDispatch md = messageDispatch;
            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
            // Expired messages, and messages the connection reports as
            // duplicates, are skipped: neither delivered nor acknowledged here.
            if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) {
                // TODO: Ack it without delivery to client
                continue;
            }

            // In client or individual acknowledge mode the message gets a
            // callback that does nothing. NOTE(review): presumably because the
            // ack is sent by this method itself right after onMessage() - confirm.
            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                message.setAcknowledgeCallback(new Callback() {
                    public void execute() throws Exception {
                    }
                });
            }

            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            md.setDeliverySequenceId(getNextDeliveryId());

            try {
                messageListener.onMessage(message);
            } catch (RuntimeException e) {
                LOG.error("error dispatching message: ", e);
                // A problem while invoking the MessageListener does not
                // in general indicate a problem with the connection to the broker, i.e.
                // it will usually be sufficient to let the afterDelivery() method either
                // commit or roll back in order to deal with the exception.
                // However, we notify any registered client internal exception listener
                // of the problem.
                connection.onClientInternalException(e);
            }

            // The standard ack below is built and sent whether or not the
            // listener threw a RuntimeException above.
            try {
                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
                ack.setFirstMessageId(md.getMessage().getMessageId());
                doStartTransaction();
                ack.setTransactionId(getTransactionContext().getTransactionId());
                // Only when the ack carries a transaction id is a rollback hook
                // registered for this message.
                if (ack.getTransactionId() != null) {
                    getTransactionContext().addSynchronization(new Synchronization() {

                        @Override
                        public void afterRollback() throws Exception {
                            md.getMessage().onMessageRolledBack();
                            // ensure we don't filter this as a duplicate
                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                            // A maximum is configured and the counter has gone
                            // past it: give up on this message.
                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                && redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
                                // We need to NACK the messages so that they get
                                // sent to the
                                // DLQ.
                                // Acknowledge the last message.
                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                asyncSendPacket(ack);
                            } else {

                                // Otherwise send a redelivered ack and schedule
                                // a local re-dispatch of the same message.
                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                asyncSendPacket(ack);

                                // Figure out how long we should wait to resend
                                // this message.
                                // Start from the initial delay and apply the
                                // policy's next-delay step once per redelivery
                                // already counted.
                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                for (int i = 0; i < redeliveryCounter; i++) {
                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                }
                                scheduler.executeAfterDelay(new Runnable() {

                                    public void run() {
                                        // Hand the dispatch back to its consumer
                                        // once the delay has elapsed.
                                        ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                                    }
                                }, redeliveryDelay);
                            }
                        }
                    });
                }
                asyncSendPacket(ack);
            } catch (Throwable e) {
                // Any failure while acknowledging is reported to the connection
                // rather than thrown, so the loop continues with the next dispatch.
                connection.onClientInternalException(e);
            }

            if (deliveryListener != null) {
                deliveryListener.afterDelivery(this, message);
            }
        }
    }
901    
902        /**
903         * Creates a <CODE>MessageProducer</CODE> to send messages to the
904         * specified destination.
905         * <P>
906         * A client uses a <CODE>MessageProducer</CODE> object to send messages to
907         * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
908         * inherit from <CODE>Destination</CODE>, they can be used in the
909         * destination parameter to create a <CODE>MessageProducer</CODE> object.
910         * 
911         * @param destination the <CODE>Destination</CODE> to send to, or null if
912         *                this is a producer which does not have a specified
913         *                destination.
914         * @return the MessageProducer
915         * @throws JMSException if the session fails to create a MessageProducer due
916         *                 to some internal error.
917         * @throws InvalidDestinationException if an invalid destination is
918         *                 specified.
919         * @since 1.1
920         */
921        public MessageProducer createProducer(Destination destination) throws JMSException {
922            checkClosed();
923            if (destination instanceof CustomDestination) {
924                CustomDestination customDestination = (CustomDestination)destination;
925                return customDestination.createProducer(this);
926            }
927            int timeSendOut = connection.getSendTimeout();
928            return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
929        }
930    
931        /**
932         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
933         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
934         * <CODE>Destination</CODE>, they can be used in the destination
935         * parameter to create a <CODE>MessageConsumer</CODE>.
936         * 
937         * @param destination the <CODE>Destination</CODE> to access.
938         * @return the MessageConsumer
939         * @throws JMSException if the session fails to create a consumer due to
940         *                 some internal error.
941         * @throws InvalidDestinationException if an invalid destination is
942         *                 specified.
943         * @since 1.1
944         */
945        public MessageConsumer createConsumer(Destination destination) throws JMSException {
946            return createConsumer(destination, (String) null);
947        }
948    
949        /**
950         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
951         * using a message selector. Since <CODE> Queue</CODE> and
952         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
953         * can be used in the destination parameter to create a
954         * <CODE>MessageConsumer</CODE>.
955         * <P>
956         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
957         * that have been sent to a destination.
958         * 
959         * @param destination the <CODE>Destination</CODE> to access
960         * @param messageSelector only messages with properties matching the message
961         *                selector expression are delivered. A value of null or an
962         *                empty string indicates that there is no message selector
963         *                for the message consumer.
964         * @return the MessageConsumer
965         * @throws JMSException if the session fails to create a MessageConsumer due
966         *                 to some internal error.
967         * @throws InvalidDestinationException if an invalid destination is
968         *                 specified.
969         * @throws InvalidSelectorException if the message selector is invalid.
970         * @since 1.1
971         */
972        public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
973            return createConsumer(destination, messageSelector, false);
974        }
975    
976        /**
977         * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
978         * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
979         * <CODE>Destination</CODE>, they can be used in the destination
980         * parameter to create a <CODE>MessageConsumer</CODE>.
981         *
982         * @param destination the <CODE>Destination</CODE> to access.
983         * @param messageListener the listener to use for async consumption of messages
984         * @return the MessageConsumer
985         * @throws JMSException if the session fails to create a consumer due to
986         *                 some internal error.
987         * @throws InvalidDestinationException if an invalid destination is
988         *                 specified.
989         * @since 1.1
990         */
991        public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
992            return createConsumer(destination, null, messageListener);
993        }
994    
995        /**
996         * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
997         * using a message selector. Since <CODE> Queue</CODE> and
998         * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
999         * can be used in the destination parameter to create a
1000         * <CODE>MessageConsumer</CODE>.
1001         * <P>
1002         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1003         * that have been sent to a destination.
1004         *
1005         * @param destination the <CODE>Destination</CODE> to access
1006         * @param messageSelector only messages with properties matching the message
1007         *                selector expression are delivered. A value of null or an
1008         *                empty string indicates that there is no message selector
1009         *                for the message consumer.
1010         * @param messageListener the listener to use for async consumption of messages
1011         * @return the MessageConsumer
1012         * @throws JMSException if the session fails to create a MessageConsumer due
1013         *                 to some internal error.
1014         * @throws InvalidDestinationException if an invalid destination is
1015         *                 specified.
1016         * @throws InvalidSelectorException if the message selector is invalid.
1017         * @since 1.1
1018         */
1019        public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1020            return createConsumer(destination, messageSelector, false, messageListener);
1021        }
1022    
1023        /**
1024         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1025         * using a message selector. This method can specify whether messages
1026         * published by its own connection should be delivered to it, if the
1027         * destination is a topic.
1028         * <P>
1029         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1030         * <CODE>Destination</CODE>, they can be used in the destination
1031         * parameter to create a <CODE>MessageConsumer</CODE>.
1032         * <P>
1033         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1034         * that have been published to a destination.
1035         * <P>
1036         * In some cases, a connection may both publish and subscribe to a topic.
1037         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1038         * inhibit the delivery of messages published by its own connection. The
1039         * default value for this attribute is False. The <CODE>noLocal</CODE>
1040         * value must be supported by destinations that are topics.
1041         * 
1042         * @param destination the <CODE>Destination</CODE> to access
1043         * @param messageSelector only messages with properties matching the message
1044         *                selector expression are delivered. A value of null or an
1045         *                empty string indicates that there is no message selector
1046         *                for the message consumer.
1047         * @param noLocal - if true, and the destination is a topic, inhibits the
1048         *                delivery of messages published by its own connection. The
1049         *                behavior for <CODE>NoLocal</CODE> is not specified if
1050         *                the destination is a queue.
1051         * @return the MessageConsumer
1052         * @throws JMSException if the session fails to create a MessageConsumer due
1053         *                 to some internal error.
1054         * @throws InvalidDestinationException if an invalid destination is
1055         *                 specified.
1056         * @throws InvalidSelectorException if the message selector is invalid.
1057         * @since 1.1
1058         */
1059        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1060            return createConsumer(destination, messageSelector, noLocal, null);
1061        }
1062    
1063        /**
1064         * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1065         * using a message selector. This method can specify whether messages
1066         * published by its own connection should be delivered to it, if the
1067         * destination is a topic.
1068         * <P>
1069         * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1070         * <CODE>Destination</CODE>, they can be used in the destination
1071         * parameter to create a <CODE>MessageConsumer</CODE>.
1072         * <P>
1073         * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1074         * that have been published to a destination.
1075         * <P>
1076         * In some cases, a connection may both publish and subscribe to a topic.
1077         * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1078         * inhibit the delivery of messages published by its own connection. The
1079         * default value for this attribute is False. The <CODE>noLocal</CODE>
1080         * value must be supported by destinations that are topics.
1081         *
1082         * @param destination the <CODE>Destination</CODE> to access
1083         * @param messageSelector only messages with properties matching the message
1084         *                selector expression are delivered. A value of null or an
1085         *                empty string indicates that there is no message selector
1086         *                for the message consumer.
1087         * @param noLocal - if true, and the destination is a topic, inhibits the
1088         *                delivery of messages published by its own connection. The
1089         *                behavior for <CODE>NoLocal</CODE> is not specified if
1090         *                the destination is a queue.
1091         * @param messageListener the listener to use for async consumption of messages
1092         * @return the MessageConsumer
1093         * @throws JMSException if the session fails to create a MessageConsumer due
1094         *                 to some internal error.
1095         * @throws InvalidDestinationException if an invalid destination is
1096         *                 specified.
1097         * @throws InvalidSelectorException if the message selector is invalid.
1098         * @since 1.1
1099         */
1100        public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1101            checkClosed();
1102    
1103            if (destination instanceof CustomDestination) {
1104                CustomDestination customDestination = (CustomDestination)destination;
1105                return customDestination.createConsumer(this, messageSelector, noLocal);
1106            }
1107    
1108            ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1109            int prefetch = 0;
1110            if (destination instanceof Topic) {
1111                prefetch = prefetchPolicy.getTopicPrefetch();
1112            } else {
1113                prefetch = prefetchPolicy.getQueuePrefetch();
1114            }
1115            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1116            return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1117                    prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1118        }
1119    
1120        /**
1121         * Creates a queue identity given a <CODE>Queue</CODE> name.
1122         * <P>
1123         * This facility is provided for the rare cases where clients need to
1124         * dynamically manipulate queue identity. It allows the creation of a queue
1125         * identity with a provider-specific name. Clients that depend on this
1126         * ability are not portable.
1127         * <P>
1128         * Note that this method is not for creating the physical queue. The
1129         * physical creation of queues is an administrative task and is not to be
1130         * initiated by the JMS API. The one exception is the creation of temporary
1131         * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1132         * method.
1133         * 
1134         * @param queueName the name of this <CODE>Queue</CODE>
1135         * @return a <CODE>Queue</CODE> with the given name
1136         * @throws JMSException if the session fails to create a queue due to some
1137         *                 internal error.
1138         * @since 1.1
1139         */
1140        public Queue createQueue(String queueName) throws JMSException {
1141            checkClosed();
1142            if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1143                return new ActiveMQTempQueue(queueName);
1144            }
1145            return new ActiveMQQueue(queueName);
1146        }
1147    
1148        /**
1149         * Creates a topic identity given a <CODE>Topic</CODE> name.
1150         * <P>
1151         * This facility is provided for the rare cases where clients need to
1152         * dynamically manipulate topic identity. This allows the creation of a
1153         * topic identity with a provider-specific name. Clients that depend on this
1154         * ability are not portable.
1155         * <P>
1156         * Note that this method is not for creating the physical topic. The
1157         * physical creation of topics is an administrative task and is not to be
1158         * initiated by the JMS API. The one exception is the creation of temporary
1159         * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1160         * method.
1161         * 
1162         * @param topicName the name of this <CODE>Topic</CODE>
1163         * @return a <CODE>Topic</CODE> with the given name
1164         * @throws JMSException if the session fails to create a topic due to some
1165         *                 internal error.
1166         * @since 1.1
1167         */
1168        public Topic createTopic(String topicName) throws JMSException {
1169            checkClosed();
1170            if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1171                return new ActiveMQTempTopic(topicName);
1172            }
1173            return new ActiveMQTopic(topicName);
1174        }
1175    
1176        /**
1177         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1178         * the specified queue.
1179         * 
1180         * @param queue the <CODE>queue</CODE> to access
1181         * @exception InvalidDestinationException if an invalid destination is
1182         *                    specified
1183         * @since 1.1
1184         */
1185        /**
1186         * Creates a durable subscriber to the specified topic.
1187         * <P>
1188         * If a client needs to receive all the messages published on a topic,
1189         * including the ones published while the subscriber is inactive, it uses a
1190         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
     * record of this durable subscription and ensures that all messages from
1192         * the topic's publishers are retained until they are acknowledged by this
1193         * durable subscriber or they have expired.
1194         * <P>
1195         * Sessions with durable subscribers must always provide the same client
1196         * identifier. In addition, each client must specify a name that uniquely
1197         * identifies (within client identifier) each durable subscription it
1198         * creates. Only one session at a time can have a
1199         * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1200         * <P>
1201         * A client can change an existing durable subscription by creating a
1202         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1203         * and/or message selector. Changing a durable subscriber is equivalent to
1204         * unsubscribing (deleting) the old one and creating a new one.
1205         * <P>
1206         * In some cases, a connection may both publish and subscribe to a topic.
1207         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1208         * inhibit the delivery of messages published by its own connection. The
1209         * default value for this attribute is false.
1210         * 
1211         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1212         * @param name the name used to identify this subscription
1213         * @return the TopicSubscriber
1214         * @throws JMSException if the session fails to create a subscriber due to
1215         *                 some internal error.
1216         * @throws InvalidDestinationException if an invalid topic is specified.
1217         * @since 1.1
1218         */
1219        public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1220            checkClosed();
1221            return createDurableSubscriber(topic, name, null, false);
1222        }
1223    
1224        /**
1225         * Creates a durable subscriber to the specified topic, using a message
1226         * selector and specifying whether messages published by its own connection
1227         * should be delivered to it.
1228         * <P>
1229         * If a client needs to receive all the messages published on a topic,
1230         * including the ones published while the subscriber is inactive, it uses a
1231         * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
     * record of this durable subscription and ensures that all messages from
1233         * the topic's publishers are retained until they are acknowledged by this
1234         * durable subscriber or they have expired.
1235         * <P>
1236         * Sessions with durable subscribers must always provide the same client
1237         * identifier. In addition, each client must specify a name which uniquely
1238         * identifies (within client identifier) each durable subscription it
1239         * creates. Only one session at a time can have a
1240         * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1241         * inactive durable subscriber is one that exists but does not currently
1242         * have a message consumer associated with it.
1243         * <P>
1244         * A client can change an existing durable subscription by creating a
1245         * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1246         * and/or message selector. Changing a durable subscriber is equivalent to
1247         * unsubscribing (deleting) the old one and creating a new one.
1248         * 
1249         * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1250         * @param name the name used to identify this subscription
1251         * @param messageSelector only messages with properties matching the message
1252         *                selector expression are delivered. A value of null or an
1253         *                empty string indicates that there is no message selector
1254         *                for the message consumer.
1255         * @param noLocal if set, inhibits the delivery of messages published by its
1256         *                own connection
1257         * @return the Queue Browser
1258         * @throws JMSException if the session fails to create a subscriber due to
1259         *                 some internal error.
1260         * @throws InvalidDestinationException if an invalid topic is specified.
1261         * @throws InvalidSelectorException if the message selector is invalid.
1262         * @since 1.1
1263         */
1264        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1265            checkClosed();
1266    
1267            if (topic instanceof CustomDestination) {
1268                CustomDestination customDestination = (CustomDestination)topic;
1269                return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1270            }
1271    
1272            connection.checkClientIDWasManuallySpecified();
1273            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1274            int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1275            int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1276            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1277                                               noLocal, false, asyncDispatch);
1278        }
1279    
1280        /**
1281         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1282         * the specified queue.
1283         * 
1284         * @param queue the <CODE>queue</CODE> to access
1285         * @return the Queue Browser
1286         * @throws JMSException if the session fails to create a browser due to some
1287         *                 internal error.
1288         * @throws InvalidDestinationException if an invalid destination is
1289         *                 specified
1290         * @since 1.1
1291         */
1292        public QueueBrowser createBrowser(Queue queue) throws JMSException {
1293            checkClosed();
1294            return createBrowser(queue, null);
1295        }
1296    
1297        /**
1298         * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1299         * the specified queue using a message selector.
1300         * 
1301         * @param queue the <CODE>queue</CODE> to access
1302         * @param messageSelector only messages with properties matching the message
1303         *                selector expression are delivered. A value of null or an
1304         *                empty string indicates that there is no message selector
1305         *                for the message consumer.
1306         * @return the Queue Browser
1307         * @throws JMSException if the session fails to create a browser due to some
1308         *                 internal error.
1309         * @throws InvalidDestinationException if an invalid destination is
1310         *                 specified
1311         * @throws InvalidSelectorException if the message selector is invalid.
1312         * @since 1.1
1313         */
1314        public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1315            checkClosed();
1316            return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1317        }
1318    
1319        /**
1320         * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1321         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1322         * 
1323         * @return a temporary queue identity
1324         * @throws JMSException if the session fails to create a temporary queue due
1325         *                 to some internal error.
1326         * @since 1.1
1327         */
1328        public TemporaryQueue createTemporaryQueue() throws JMSException {
1329            checkClosed();
1330            return (TemporaryQueue)connection.createTempDestination(false);
1331        }
1332    
1333        /**
1334         * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1335         * of the <CODE>Connection</CODE> unless it is deleted earlier.
1336         * 
1337         * @return a temporary topic identity
1338         * @throws JMSException if the session fails to create a temporary topic due
1339         *                 to some internal error.
1340         * @since 1.1
1341         */
1342        public TemporaryTopic createTemporaryTopic() throws JMSException {
1343            checkClosed();
1344            return (TemporaryTopic)connection.createTempDestination(true);
1345        }
1346    
1347        /**
1348         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1349         * the specified queue.
1350         * 
1351         * @param queue the <CODE>Queue</CODE> to access
1352         * @return
1353         * @throws JMSException if the session fails to create a receiver due to
1354         *                 some internal error.
1355         * @throws JMSException
1356         * @throws InvalidDestinationException if an invalid queue is specified.
1357         */
1358        public QueueReceiver createReceiver(Queue queue) throws JMSException {
1359            checkClosed();
1360            return createReceiver(queue, null);
1361        }
1362    
1363        /**
1364         * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1365         * the specified queue using a message selector.
1366         * 
1367         * @param queue the <CODE>Queue</CODE> to access
1368         * @param messageSelector only messages with properties matching the message
1369         *                selector expression are delivered. A value of null or an
1370         *                empty string indicates that there is no message selector
1371         *                for the message consumer.
1372         * @return QueueReceiver
1373         * @throws JMSException if the session fails to create a receiver due to
1374         *                 some internal error.
1375         * @throws InvalidDestinationException if an invalid queue is specified.
1376         * @throws InvalidSelectorException if the message selector is invalid.
1377         */
1378        public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1379            checkClosed();
1380    
1381            if (queue instanceof CustomDestination) {
1382                CustomDestination customDestination = (CustomDestination)queue;
1383                return customDestination.createReceiver(this, messageSelector);
1384            }
1385    
1386            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1387            return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1388                                             prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1389        }
1390    
1391        /**
1392         * Creates a <CODE>QueueSender</CODE> object to send messages to the
1393         * specified queue.
1394         * 
1395         * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1396         *                unidentified producer
1397         * @return QueueSender
1398         * @throws JMSException if the session fails to create a sender due to some
1399         *                 internal error.
1400         * @throws InvalidDestinationException if an invalid queue is specified.
1401         */
1402        public QueueSender createSender(Queue queue) throws JMSException {
1403            checkClosed();
1404            if (queue instanceof CustomDestination) {
1405                CustomDestination customDestination = (CustomDestination)queue;
1406                return customDestination.createSender(this);
1407            }
1408            int timeSendOut = connection.getSendTimeout();
1409            return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1410        }
1411    
1412        /**
1413         * Creates a nondurable subscriber to the specified topic. <p/>
1414         * <P>
1415         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1416         * that have been published to a topic. <p/>
1417         * <P>
1418         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1419         * receive only messages that are published while they are active. <p/>
1420         * <P>
1421         * In some cases, a connection may both publish and subscribe to a topic.
1422         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1423         * inhibit the delivery of messages published by its own connection. The
1424         * default value for this attribute is false.
1425         * 
1426         * @param topic the <CODE>Topic</CODE> to subscribe to
1427         * @return TopicSubscriber
1428         * @throws JMSException if the session fails to create a subscriber due to
1429         *                 some internal error.
1430         * @throws InvalidDestinationException if an invalid topic is specified.
1431         */
1432        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1433            checkClosed();
1434            return createSubscriber(topic, null, false);
1435        }
1436    
1437        /**
1438         * Creates a nondurable subscriber to the specified topic, using a message
1439         * selector or specifying whether messages published by its own connection
1440         * should be delivered to it. <p/>
1441         * <P>
1442         * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1443         * that have been published to a topic. <p/>
1444         * <P>
1445         * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1446         * receive only messages that are published while they are active. <p/>
1447         * <P>
1448         * Messages filtered out by a subscriber's message selector will never be
1449         * delivered to the subscriber. From the subscriber's perspective, they do
1450         * not exist. <p/>
1451         * <P>
1452         * In some cases, a connection may both publish and subscribe to a topic.
1453         * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1454         * inhibit the delivery of messages published by its own connection. The
1455         * default value for this attribute is false.
1456         * 
1457         * @param topic the <CODE>Topic</CODE> to subscribe to
1458         * @param messageSelector only messages with properties matching the message
1459         *                selector expression are delivered. A value of null or an
1460         *                empty string indicates that there is no message selector
1461         *                for the message consumer.
1462         * @param noLocal if set, inhibits the delivery of messages published by its
1463         *                own connection
1464         * @return TopicSubscriber
1465         * @throws JMSException if the session fails to create a subscriber due to
1466         *                 some internal error.
1467         * @throws InvalidDestinationException if an invalid topic is specified.
1468         * @throws InvalidSelectorException if the message selector is invalid.
1469         */
1470        public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1471            checkClosed();
1472    
1473            if (topic instanceof CustomDestination) {
1474                CustomDestination customDestination = (CustomDestination)topic;
1475                return customDestination.createSubscriber(this, messageSelector, noLocal);
1476            }
1477    
1478            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1479            return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1480                .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1481        }
1482    
1483        /**
1484         * Creates a publisher for the specified topic. <p/>
1485         * <P>
1486         * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1487         * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1488         * a topic, it defines a new sequence of messages that have no ordering
1489         * relationship with the messages it has previously sent.
1490         * 
1491         * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1492         *                an unidentified producer
1493         * @return TopicPublisher
1494         * @throws JMSException if the session fails to create a publisher due to
1495         *                 some internal error.
1496         * @throws InvalidDestinationException if an invalid topic is specified.
1497         */
1498        public TopicPublisher createPublisher(Topic topic) throws JMSException {
1499            checkClosed();
1500    
1501            if (topic instanceof CustomDestination) {
1502                CustomDestination customDestination = (CustomDestination)topic;
1503                return customDestination.createPublisher(this);
1504            }
1505            int timeSendOut = connection.getSendTimeout();
1506            return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1507        }
1508    
1509        /**
1510         * Unsubscribes a durable subscription that has been created by a client.
1511         * <P>
1512         * This method deletes the state being maintained on behalf of the
1513         * subscriber by its provider.
1514         * <P>
1515         * It is erroneous for a client to delete a durable subscription while there
1516         * is an active <CODE>MessageConsumer </CODE> or
1517         * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1518         * message is part of a pending transaction or has not been acknowledged in
1519         * the session.
1520         * 
1521         * @param name the name used to identify this subscription
1522         * @throws JMSException if the session fails to unsubscribe to the durable
1523         *                 subscription due to some internal error.
1524         * @throws InvalidDestinationException if an invalid subscription name is
1525         *                 specified.
1526         * @since 1.1
1527         */
1528        public void unsubscribe(String name) throws JMSException {
1529            checkClosed();
1530            connection.unsubscribe(name);
1531        }
1532    
1533        public void dispatch(MessageDispatch messageDispatch) {
1534            try {
1535                executor.execute(messageDispatch);
1536            } catch (InterruptedException e) {
1537                Thread.currentThread().interrupt();
1538                connection.onClientInternalException(e);
1539            }
1540        }
1541    
1542        /**
1543         * Acknowledges all consumed messages of the session of this consumed
1544         * message.
1545         * <P>
1546         * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1547         * for use when a client has specified that its JMS session's consumed
1548         * messages are to be explicitly acknowledged. By invoking
1549         * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1550         * all messages consumed by the session that the message was delivered to.
1551         * <P>
1552         * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1553         * sessions and sessions specified to use implicit acknowledgement modes.
1554         * <P>
1555         * A client may individually acknowledge each message as it is consumed, or
1556         * it may choose to acknowledge messages as an application-defined group
1557         * (which is done by calling acknowledge on the last received message of the
1558         * group, thereby acknowledging all messages consumed by the session.)
1559         * <P>
1560         * Messages that have been received but not acknowledged may be redelivered.
1561         * 
1562         * @throws JMSException if the JMS provider fails to acknowledge the
1563         *                 messages due to some internal error.
1564         * @throws javax.jms.IllegalStateException if this method is called on a
1565         *                 closed session.
1566         * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1567         */
1568        public void acknowledge() throws JMSException {
1569            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1570                ActiveMQMessageConsumer c = iter.next();
1571                c.acknowledge();
1572            }
1573        }
1574    
1575        /**
1576         * Add a message consumer.
1577         * 
1578         * @param consumer - message consumer.
1579         * @throws JMSException
1580         */
1581        protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1582            this.consumers.add(consumer);
1583            if (consumer.isDurableSubscriber()) {
1584                stats.onCreateDurableSubscriber();
1585            }
1586            this.connection.addDispatcher(consumer.getConsumerId(), this);
1587        }
1588    
1589        /**
1590         * Remove the message consumer.
1591         * 
1592         * @param consumer - consumer to be removed.
1593         * @throws JMSException
1594         */
1595        protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1596            this.connection.removeDispatcher(consumer.getConsumerId());
1597            if (consumer.isDurableSubscriber()) {
1598                stats.onRemoveDurableSubscriber();
1599            }
1600            this.consumers.remove(consumer);
1601            this.connection.removeDispatcher(consumer);
1602        }
1603    
1604        /**
1605         * Adds a message producer.
1606         * 
1607         * @param producer - message producer to be added.
1608         * @throws JMSException
1609         */
1610        protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1611            this.producers.add(producer);
1612            this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1613        }
1614    
1615        /**
1616         * Removes a message producer.
1617         * 
1618         * @param producer - message producer to be removed.
1619         * @throws JMSException
1620         */
1621        protected void removeProducer(ActiveMQMessageProducer producer) {
1622            this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1623            this.producers.remove(producer);
1624        }
1625    
1626        /**
1627         * Start this Session.
1628         * 
1629         * @throws JMSException
1630         */
1631        protected void start() throws JMSException {
1632            started.set(true);
1633            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1634                ActiveMQMessageConsumer c = iter.next();
1635                c.start();
1636            }
1637            executor.start();
1638        }
1639    
1640        /**
1641         * Stops this session.
1642         * 
1643         * @throws JMSException
1644         */
1645        protected void stop() throws JMSException {
1646    
1647            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1648                ActiveMQMessageConsumer c = iter.next();
1649                c.stop();
1650            }
1651    
1652            started.set(false);
1653            executor.stop();
1654        }
1655    
1656        /**
1657         * Returns the session id.
1658         * 
1659         * @return value - session id.
1660         */
1661        protected SessionId getSessionId() {
1662            return info.getSessionId();
1663        }
1664    
1665        /**
1666         * @return
1667         */
1668        protected ConsumerId getNextConsumerId() {
1669            return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1670        }
1671    
1672        /**
1673         * @return
1674         */
1675        protected ProducerId getNextProducerId() {
1676            return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1677        }
1678    
    /**
     * Sends the message for dispatch by the broker.
     * <P>
     * After checking that the session is open and that a temporary
     * destination has not been deleted, the remaining work runs while holding
     * <CODE>sendMutex</CODE>: a transaction is started if required, the JMS
     * header fields are set on the original message, the message is
     * transformed into an <CODE>ActiveMQMessage</CODE>, given a message id
     * built from the producer id and the producer's message sequence, and
     * finally handed to the connection.
     * <P>
     * The message is sent asynchronously only when <CODE>sendTimeout</CODE>
     * is not positive, no response is required, the connection is not set to
     * always send synchronously, and the message is non-persistent, the
     * connection uses async send, or a transaction id is present. In every
     * other case it is sent synchronously, with the timeout when
     * <CODE>sendTimeout</CODE> is positive.
     * <P>
     * Note that the expiration is only computed when the producer has not
     * disabled message timestamps; otherwise it stays 0 even for a positive
     * <CODE>timeToLive</CODE>.
     *
     * @param producer - message producer.
     * @param destination - message destination.
     * @param message - message to be sent.
     * @param deliveryMode - JMS message delivery mode.
     * @param priority - message priority.
     * @param timeToLive - message time to live; when positive and timestamps
     *                are enabled it is added to the current time to form the
     *                expiration.
     * @param producerWindow - if not null, its usage is increased by the
     *                message size after an asynchronous send.
     * @param sendTimeout - when greater than zero the message is sent
     *                synchronously and this value is passed to the connection
     *                as the timeout (unit not visible here).
     * @throws JMSException
     * @throws InvalidDestinationException if the destination is temporary and
     *                 the connection reports it as deleted.
     */
    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            doStartTransaction();
            TransactionId txid = transactionContext.getTransactionId();
            long sequenceNumber = producer.getMessageSequence();

            // Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
            message.setJMSDeliveryMode(deliveryMode);
            long expiration = 0L;
            if (!producer.getDisableMessageTimestamp()) {
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message.setJMSExpiration(expiration);
            message.setJMSPriority(priority);
            message.setJMSRedelivered(false);

            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);

            // Set the message id. When the transformation produced a different
            // object, the id is also written back to the caller's message.
            if (msg == message) {
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
            } else {
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
                message.setJMSMessageID(msg.getMessageId().toString());
            }
            // clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);
            // destination format is provider specific so only set on transformed message
            msg.setJMSDestination(destination);

            msg.setTransactionId(txid);
            // From here on "msg" may refer to a copy of the transformed message.
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage)msg.copy();
            }
            msg.setConnection(connection);
            msg.onSend();
            msg.setProducerId(msg.getMessageId().getProducerId());
            if (LOG.isTraceEnabled()) {
                LOG.trace(getSessionId() + " sending message: " + msg);
            }
            if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not provide an accurate size. We may
                    // change over to doing more aggressive marshaling, to get
                    // more accurate sizes. This is more important once users
                    // start using producer window flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }
            } else {
                // Synchronous send; the timeout variant is used only for a positive timeout.
                if (sendTimeout > 0) {
                    this.connection.syncSendPacket(msg,sendTimeout);
                }else {
                    this.connection.syncSendPacket(msg);
                }
            }

        }
    }
1766    
1767        /**
1768         * Send TransactionInfo to indicate transaction has started
1769         * 
1770         * @throws JMSException if some internal error occurs
1771         */
1772        protected void doStartTransaction() throws JMSException {
1773            if (getTransacted() && !transactionContext.isInXATransaction()) {
1774                transactionContext.begin();
1775            }
1776        }
1777    
1778        /**
1779         * Checks whether the session has unconsumed messages.
1780         * 
1781         * @return true - if there are unconsumed messages.
1782         */
1783        public boolean hasUncomsumedMessages() {
1784            return executor.hasUncomsumedMessages();
1785        }
1786    
1787        /**
1788         * Checks whether the session uses transactions.
1789         * 
1790         * @return true - if the session uses transactions.
1791         */
1792        public boolean isTransacted() {
1793            return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
1794        }
1795    
1796        /**
1797         * Checks whether the session used client acknowledgment.
1798         * 
1799         * @return true - if the session uses client acknowledgment.
1800         */
1801        protected boolean isClientAcknowledge() {
1802            return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
1803        }
1804    
1805        /**
1806         * Checks whether the session used auto acknowledgment.
1807         * 
1808         * @return true - if the session uses client acknowledgment.
1809         */
1810        public boolean isAutoAcknowledge() {
1811            return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
1812        }
1813    
1814        /**
1815         * Checks whether the session used dup ok acknowledgment.
1816         * 
1817         * @return true - if the session uses client acknowledgment.
1818         */
1819        public boolean isDupsOkAcknowledge() {
1820            return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
1821        }
1822        
1823        public boolean isIndividualAcknowledge(){
1824            return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
1825        }
1826    
1827        /**
1828         * Returns the message delivery listener.
1829         * 
1830         * @return deliveryListener - message delivery listener.
1831         */
1832        public DeliveryListener getDeliveryListener() {
1833            return deliveryListener;
1834        }
1835    
    /**
     * Sets the message delivery listener of this session, replacing any
     * previously stored one.
     *
     * @param deliveryListener - message delivery listener.
     */
    public void setDeliveryListener(DeliveryListener deliveryListener) {
        this.deliveryListener = deliveryListener;
    }
1844    
1845        /**
1846         * Returns the SessionInfo bean.
1847         * 
1848         * @return info - SessionInfo bean.
1849         * @throws JMSException
1850         */
1851        protected SessionInfo getSessionInfo() throws JMSException {
1852            SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
1853            return info;
1854        }
1855    
1856        /**
1857         * Send the asynchronus command.
1858         * 
1859         * @param command - command to be executed.
1860         * @throws JMSException
1861         */
1862        public void asyncSendPacket(Command command) throws JMSException {
1863            connection.asyncSendPacket(command);
1864        }
1865    
1866        /**
1867         * Send the synchronus command.
1868         * 
1869         * @param command - command to be executed.
1870         * @return Response
1871         * @throws JMSException
1872         */
1873        public Response syncSendPacket(Command command) throws JMSException {
1874            return connection.syncSendPacket(command);
1875        }
1876    
1877        public long getNextDeliveryId() {
1878            return deliveryIdGenerator.getNextSequenceId();
1879        }
1880    
1881        public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
1882    
1883            List<MessageDispatch> c = unconsumedMessages.removeAll();
1884            for (MessageDispatch md : c) {
1885                this.connection.rollbackDuplicate(dispatcher, md.getMessage());
1886            }
1887            Collections.reverse(c);
1888    
1889            for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
1890                MessageDispatch md = iter.next();
1891                executor.executeFirst(md);
1892            }
1893    
1894        }
1895    
1896        public boolean isRunning() {
1897            return started.get();
1898        }
1899    
1900        public boolean isAsyncDispatch() {
1901            return asyncDispatch;
1902        }
1903    
1904        public void setAsyncDispatch(boolean asyncDispatch) {
1905            this.asyncDispatch = asyncDispatch;
1906        }
1907    
1908        /**
1909         * @return Returns the sessionAsyncDispatch.
1910         */
1911        public boolean isSessionAsyncDispatch() {
1912            return sessionAsyncDispatch;
1913        }
1914    
1915        /**
1916         * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
1917         */
1918        public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
1919            this.sessionAsyncDispatch = sessionAsyncDispatch;
1920        }
1921    
1922        public MessageTransformer getTransformer() {
1923            return transformer;
1924        }
1925    
1926        public ActiveMQConnection getConnection() {
1927            return connection;
1928        }
1929    
1930        /**
1931         * Sets the transformer used to transform messages before they are sent on
1932         * to the JMS bus or when they are received from the bus but before they are
1933         * delivered to the JMS client
1934         */
1935        public void setTransformer(MessageTransformer transformer) {
1936            this.transformer = transformer;
1937        }
1938    
1939        public BlobTransferPolicy getBlobTransferPolicy() {
1940            return blobTransferPolicy;
1941        }
1942    
1943        /**
1944         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1945         * OBjects) are transferred from producers to brokers to consumers
1946         */
1947        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1948            this.blobTransferPolicy = blobTransferPolicy;
1949        }
1950    
1951        public List getUnconsumedMessages() {
1952            return executor.getUnconsumedMessages();
1953        }
1954    
1955        @Override
1956        public String toString() {
1957            return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "}";
1958        }
1959    
1960        public void checkMessageListener() throws JMSException {
1961            if (messageListener != null) {
1962                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1963            }
1964            for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
1965                ActiveMQMessageConsumer consumer = i.next();
1966                if (consumer.getMessageListener() != null) {
1967                    throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
1968                }
1969            }
1970        }
1971    
1972        protected void setOptimizeAcknowledge(boolean value) {
1973            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1974                ActiveMQMessageConsumer c = iter.next();
1975                c.setOptimizeAcknowledge(value);
1976            }
1977        }
1978    
1979        protected void setPrefetchSize(ConsumerId id, int prefetch) {
1980            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1981                ActiveMQMessageConsumer c = iter.next();
1982                if (c.getConsumerId().equals(id)) {
1983                    c.setPrefetchSize(prefetch);
1984                    break;
1985                }
1986            }
1987        }
1988    
1989        protected void close(ConsumerId id) {
1990            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1991                ActiveMQMessageConsumer c = iter.next();
1992                if (c.getConsumerId().equals(id)) {
1993                    try {
1994                        c.close();
1995                    } catch (JMSException e) {
1996                        LOG.warn("Exception closing consumer", e);
1997                    }
1998                    LOG.warn("Closed consumer on Command");
1999                    break;
2000                }
2001            }
2002        }
2003    
2004        public boolean isInUse(ActiveMQTempDestination destination) {
2005            for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2006                ActiveMQMessageConsumer c = iter.next();
2007                if (c.isInUse(destination)) {
2008                    return true;
2009                }
2010            }
2011            return false;
2012        }
2013        
2014        /**
2015         * highest sequence id of the last message delivered by this session.
2016         * Passed to the broker in the close command, maintained by dispose()
2017         * @return lastDeliveredSequenceId
2018         */
2019        public long getLastDeliveredSequenceId() {
2020            return lastDeliveredSequenceId;
2021        }
2022        
2023        protected void sendAck(MessageAck ack) throws JMSException {
2024            sendAck(ack,false);
2025        }
2026        
2027        protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2028            if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2029                asyncSendPacket(ack);
2030            } else {
2031                syncSendPacket(ack);
2032            }
2033        }
2034        
2035        protected Scheduler getScheduler() {
2036            return this.scheduler;
2037        }
2038        
2039        protected ThreadPoolExecutor getConnectionExecutor() {
2040            return this.connectionExecutor;
2041        }
2042    }