001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Executors;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    import java.util.concurrent.atomic.AtomicReference;
032    import javax.jms.IllegalStateException;
033    import javax.jms.InvalidDestinationException;
034    import javax.jms.JMSException;
035    import javax.jms.Message;
036    import javax.jms.MessageConsumer;
037    import javax.jms.MessageListener;
038    import javax.jms.TransactionRolledBackException;
039    import org.apache.activemq.blob.BlobDownloader;
040    import org.apache.activemq.command.ActiveMQBlobMessage;
041    import org.apache.activemq.command.ActiveMQDestination;
042    import org.apache.activemq.command.ActiveMQMessage;
043    import org.apache.activemq.command.ActiveMQTempDestination;
044    import org.apache.activemq.command.CommandTypes;
045    import org.apache.activemq.command.ConsumerId;
046    import org.apache.activemq.command.ConsumerInfo;
047    import org.apache.activemq.command.MessageAck;
048    import org.apache.activemq.command.MessageDispatch;
049    import org.apache.activemq.command.MessageId;
050    import org.apache.activemq.command.MessagePull;
051    import org.apache.activemq.command.RemoveInfo;
052    import org.apache.activemq.command.TransactionId;
053    import org.apache.activemq.management.JMSConsumerStatsImpl;
054    import org.apache.activemq.management.StatsCapable;
055    import org.apache.activemq.management.StatsImpl;
056    import org.apache.activemq.selector.SelectorParser;
057    import org.apache.activemq.thread.Scheduler;
058    import org.apache.activemq.transaction.Synchronization;
059    import org.apache.activemq.util.Callback;
060    import org.apache.activemq.util.IntrospectionSupport;
061    import org.apache.activemq.util.JMSExceptionSupport;
062    import org.slf4j.Logger;
063    import org.slf4j.LoggerFactory;
064    
065    /**
066     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
067     * from a destination. A <CODE> MessageConsumer</CODE> object is created by
068     * passing a <CODE>Destination</CODE> object to a message-consumer creation
069     * method supplied by a session.
070     * <P>
071     * <CODE>MessageConsumer</CODE> is the parent interface for all message
072     * consumers.
073     * <P>
074     * A message consumer can be created with a message selector. A message selector
075     * allows the client to restrict the messages delivered to the message consumer
076     * to those that match the selector.
077     * <P>
078     * A client may either synchronously receive a message consumer's messages or
079     * have the consumer asynchronously deliver them as they arrive.
080     * <P>
081     * For synchronous receipt, a client can request the next message from a message
082     * consumer using one of its <CODE> receive</CODE> methods. There are several
083     * variations of <CODE>receive</CODE> that allow a client to poll or wait for
084     * the next message.
085     * <P>
086     * For asynchronous delivery, a client can register a
087     * <CODE>MessageListener</CODE> object with a message consumer. As messages
088     * arrive at the message consumer, it delivers them by calling the
089     * <CODE>MessageListener</CODE>'s<CODE>
090     * onMessage</CODE> method.
091     * <P>
092     * It is a client programming error for a <CODE>MessageListener</CODE> to
093     * throw an exception.
094     * 
095     * 
096     * @see javax.jms.MessageConsumer
097     * @see javax.jms.QueueReceiver
098     * @see javax.jms.TopicSubscriber
099     * @see javax.jms.Session
100     */
101    public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
102    
103        @SuppressWarnings("serial")
104        class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> {
105            final TransactionId transactionId;
106            public PreviouslyDeliveredMap(TransactionId transactionId) {
107                this.transactionId = transactionId;
108            }
109        }
110    
111        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
112        protected final Scheduler scheduler;
113        protected final ActiveMQSession session;
114        protected final ConsumerInfo info;
115    
116        // These are the messages waiting to be delivered to the client
117        protected final MessageDispatchChannel unconsumedMessages;
118    
119        // The are the messages that were delivered to the consumer but that have
120        // not been acknowledged. It's kept in reverse order since we
121        // Always walk list in reverse order.
122        private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
123        // track duplicate deliveries in a transaction such that the tx integrity can be validated
124        private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
125        private int deliveredCounter;
126        private int additionalWindowSize;
127        private long redeliveryDelay;
128        private int ackCounter;
129        private int dispatchedCount;
130        private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
131        private final JMSConsumerStatsImpl stats;
132    
133        private final String selector;
134        private boolean synchronizationRegistered;
135        private final AtomicBoolean started = new AtomicBoolean(false);
136    
137        private MessageAvailableListener availableListener;
138    
139        private RedeliveryPolicy redeliveryPolicy;
140        private boolean optimizeAcknowledge;
141        private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
142        private ExecutorService executorService;
143        private MessageTransformer transformer;
144        private boolean clearDispatchList;
145        boolean inProgressClearRequiredFlag;
146    
147        private MessageAck pendingAck;
148        private long lastDeliveredSequenceId;
149    
150        private IOException failureError;
151        
152        private long optimizeAckTimestamp = System.currentTimeMillis();
153        private final long optimizeAckTimeout = 300;
154        private long failoverRedeliveryWaitPeriod = 0;
155    
156        /**
157         * Create a MessageConsumer
158         * 
159         * @param session
160         * @param dest
161         * @param name
162         * @param selector
163         * @param prefetch
164         * @param maximumPendingMessageCount
165         * @param noLocal
166         * @param browser
167         * @param dispatchAsync
168         * @param messageListener
169         * @throws JMSException
170         */
171        public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
172                String name, String selector, int prefetch,
173                int maximumPendingMessageCount, boolean noLocal, boolean browser,
174                boolean dispatchAsync, MessageListener messageListener) throws JMSException {
175            if (dest == null) {
176                throw new InvalidDestinationException("Don't understand null destinations");
177            } else if (dest.getPhysicalName() == null) {
178                throw new InvalidDestinationException("The destination object was not given a physical name.");
179            } else if (dest.isTemporary()) {
180                String physicalName = dest.getPhysicalName();
181    
182                if (physicalName == null) {
183                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
184                }
185    
186                String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue();
187    
188                if (physicalName.indexOf(connectionID) < 0) {
189                    throw new InvalidDestinationException(
190                                                          "Cannot use a Temporary destination from another Connection");
191                }
192    
193                if (session.connection.isDeleted(dest)) {
194                    throw new InvalidDestinationException(
195                                                          "Cannot use a Temporary destination that has been deleted");
196                }
197                if (prefetch < 0) {
198                    throw new JMSException("Cannot have a prefetch size less than zero");
199                }
200            }
201            if (session.connection.isMessagePrioritySupported()) {
202                this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
203            }else {
204                this.unconsumedMessages = new FifoMessageDispatchChannel();
205            }
206    
207            this.session = session;
208            this.scheduler = session.getScheduler();
209            this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
210            setTransformer(session.getTransformer());
211    
212            this.info = new ConsumerInfo(consumerId);
213            this.info.setExclusive(this.session.connection.isExclusiveConsumer());
214            this.info.setSubscriptionName(name);
215            this.info.setPrefetchSize(prefetch);
216            this.info.setCurrentPrefetchSize(prefetch);
217            this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount);
218            this.info.setNoLocal(noLocal);
219            this.info.setDispatchAsync(dispatchAsync);
220            this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer());
221            this.info.setSelector(null);
222    
223            // Allows the options on the destination to configure the consumerInfo
224            if (dest.getOptions() != null) {
225                Map<String, String> options = new HashMap<String, String>(dest.getOptions());
226                IntrospectionSupport.setProperties(this.info, options, "consumer.");
227            }
228    
229            this.info.setDestination(dest);
230            this.info.setBrowser(browser);
231            if (selector != null && selector.trim().length() != 0) {
232                // Validate the selector
233                SelectorParser.parse(selector);
234                this.info.setSelector(selector);
235                this.selector = selector;
236            } else if (info.getSelector() != null) {
237                // Validate the selector
238                SelectorParser.parse(this.info.getSelector());
239                this.selector = this.info.getSelector();
240            } else {
241                this.selector = null;
242            }
243    
244            this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest);
245            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
246                                       && !info.isBrowser();
247            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
248            this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
249            if (messageListener != null) {
250                setMessageListener(messageListener);
251            }
252            try {
253                this.session.addConsumer(this);
254                this.session.syncSendPacket(info);
255            } catch (JMSException e) {
256                this.session.removeConsumer(this);
257                throw e;
258            }
259    
260            if (session.connection.isStarted()) {
261                start();
262            }
263        }
264    
265        private boolean isAutoAcknowledgeEach() {
266            return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() );
267        }
268    
269        private boolean isAutoAcknowledgeBatch() {
270            return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
271        }
272    
273        public StatsImpl getStats() {
274            return stats;
275        }
276    
277        public JMSConsumerStatsImpl getConsumerStats() {
278            return stats;
279        }
280    
281        public RedeliveryPolicy getRedeliveryPolicy() {
282            return redeliveryPolicy;
283        }
284    
285        /**
286         * Sets the redelivery policy used when messages are redelivered
287         */
288        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
289            this.redeliveryPolicy = redeliveryPolicy;
290        }
291    
292        public MessageTransformer getTransformer() {
293            return transformer;
294        }
295    
296        /**
297         * Sets the transformer used to transform messages before they are sent on
298         * to the JMS bus
299         */
300        public void setTransformer(MessageTransformer transformer) {
301            this.transformer = transformer;
302        }
303    
304        /**
305         * @return Returns the value.
306         */
307        public ConsumerId getConsumerId() {
308            return info.getConsumerId();
309        }
310    
311        /**
312         * @return the consumer name - used for durable consumers
313         */
314        public String getConsumerName() {
315            return this.info.getSubscriptionName();
316        }
317    
318        /**
319         * @return true if this consumer does not accept locally produced messages
320         */
321        protected boolean isNoLocal() {
322            return info.isNoLocal();
323        }
324    
325        /**
326         * Retrieve is a browser
327         * 
328         * @return true if a browser
329         */
330        protected boolean isBrowser() {
331            return info.isBrowser();
332        }
333    
334        /**
335         * @return ActiveMQDestination
336         */
337        protected ActiveMQDestination getDestination() {
338            return info.getDestination();
339        }
340    
341        /**
342         * @return Returns the prefetchNumber.
343         */
344        public int getPrefetchNumber() {
345            return info.getPrefetchSize();
346        }
347    
348        /**
349         * @return true if this is a durable topic subscriber
350         */
351        public boolean isDurableSubscriber() {
352            return info.getSubscriptionName() != null && info.getDestination().isTopic();
353        }
354    
355        /**
356         * Gets this message consumer's message selector expression.
357         * 
358         * @return this message consumer's message selector, or null if no message
359         *         selector exists for the message consumer (that is, if the message
360         *         selector was not set or was set to null or the empty string)
361         * @throws JMSException if the JMS provider fails to receive the next
362         *                 message due to some internal error.
363         */
364        public String getMessageSelector() throws JMSException {
365            checkClosed();
366            return selector;
367        }
368    
369        /**
370         * Gets the message consumer's <CODE>MessageListener</CODE>.
371         * 
372         * @return the listener for the message consumer, or null if no listener is
373         *         set
374         * @throws JMSException if the JMS provider fails to get the message
375         *                 listener due to some internal error.
376         * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
377         */
378        public MessageListener getMessageListener() throws JMSException {
379            checkClosed();
380            return this.messageListener.get();
381        }
382    
383        /**
384         * Sets the message consumer's <CODE>MessageListener</CODE>.
385         * <P>
386         * Setting the message listener to null is the equivalent of unsetting the
387         * message listener for the message consumer.
388         * <P>
389         * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
390         * while messages are being consumed by an existing listener or the consumer
391         * is being used to consume messages synchronously is undefined.
392         * 
393         * @param listener the listener to which the messages are to be delivered
394         * @throws JMSException if the JMS provider fails to receive the next
395         *                 message due to some internal error.
396         * @see javax.jms.MessageConsumer#getMessageListener
397         */
398        public void setMessageListener(MessageListener listener) throws JMSException {
399            checkClosed();
400            if (info.getPrefetchSize() == 0) {
401                throw new JMSException(
402                                       "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
403            }
404            if (listener != null) {
405                boolean wasRunning = session.isRunning();
406                if (wasRunning) {
407                    session.stop();
408                }
409    
410                this.messageListener.set(listener);
411                session.redispatch(this, unconsumedMessages);
412    
413                if (wasRunning) {
414                    session.start();
415                }
416            } else {
417                this.messageListener.set(null);
418            }
419        }
420    
421        public MessageAvailableListener getAvailableListener() {
422            return availableListener;
423        }
424    
425        /**
426         * Sets the listener used to notify synchronous consumers that there is a
427         * message available so that the {@link MessageConsumer#receiveNoWait()} can
428         * be called.
429         */
430        public void setAvailableListener(MessageAvailableListener availableListener) {
431            this.availableListener = availableListener;
432        }
433    
434        /**
435         * Used to get an enqueued message from the unconsumedMessages list. The
436         * amount of time this method blocks is based on the timeout value. - if
437         * timeout==-1 then it blocks until a message is received. - if timeout==0
438         * then it it tries to not block at all, it returns a message if it is
439         * available - if timeout>0 then it blocks up to timeout amount of time.
440         * Expired messages will consumed by this method.
441         * 
442         * @throws JMSException
443         * @return null if we timeout or if the consumer is closed.
444         */
445        private MessageDispatch dequeue(long timeout) throws JMSException {
446            try {
447                long deadline = 0;
448                if (timeout > 0) {
449                    deadline = System.currentTimeMillis() + timeout;
450                }
451                while (true) {
452                    MessageDispatch md = unconsumedMessages.dequeue(timeout);
453                    if (md == null) {
454                        if (timeout > 0 && !unconsumedMessages.isClosed()) {
455                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
456                        } else {
457                            if (failureError != null) {
458                                    throw JMSExceptionSupport.create(failureError);
459                            } else {
460                                    return null;
461                            }
462                        }
463                    } else if (md.getMessage() == null) {
464                        return null;
465                    } else if (md.getMessage().isExpired()) {
466                        if (LOG.isDebugEnabled()) {
467                            LOG.debug(getConsumerId() + " received expired message: " + md);
468                        }
469                        beforeMessageIsConsumed(md);
470                        afterMessageIsConsumed(md, true);
471                        if (timeout > 0) {
472                            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
473                        }
474                    } else {
475                        if (LOG.isTraceEnabled()) {
476                            LOG.trace(getConsumerId() + " received message: " + md);
477                        }
478                        return md;
479                    }
480                }
481            } catch (InterruptedException e) {
482                Thread.currentThread().interrupt();
483                throw JMSExceptionSupport.create(e);
484            }
485        }
486    
487        /**
488         * Receives the next message produced for this message consumer.
489         * <P>
490         * This call blocks indefinitely until a message is produced or until this
491         * message consumer is closed.
492         * <P>
493         * If this <CODE>receive</CODE> is done within a transaction, the consumer
494         * retains the message until the transaction commits.
495         * 
496         * @return the next message produced for this message consumer, or null if
497         *         this message consumer is concurrently closed
498         */
499        public Message receive() throws JMSException {
500            checkClosed();
501            checkMessageListener();
502    
503            sendPullCommand(0);
504            MessageDispatch md = dequeue(-1);
505            if (md == null) {
506                return null;
507            }
508    
509            beforeMessageIsConsumed(md);
510            afterMessageIsConsumed(md, false);
511    
512            return createActiveMQMessage(md);
513        }
514    
515        /**
516         * @param md
517         * @return
518         */
519        private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
520            ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
521            if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
522                    ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
523            }
524            if (transformer != null) {
525                Message transformedMessage = transformer.consumerTransform(session, this, m);
526                if (transformedMessage != null) {
527                    m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
528                }
529            }
530            if (session.isClientAcknowledge()) {
531                m.setAcknowledgeCallback(new Callback() {
532                    public void execute() throws Exception {
533                        session.checkClosed();
534                        session.acknowledge();
535                    }
536                });
537            }else if (session.isIndividualAcknowledge()) {
538                m.setAcknowledgeCallback(new Callback() {
539                    public void execute() throws Exception {
540                        session.checkClosed();
541                        acknowledge(md);
542                    }
543                });
544            }
545            return m;
546        }
547    
548        /**
549         * Receives the next message that arrives within the specified timeout
550         * interval.
551         * <P>
552         * This call blocks until a message arrives, the timeout expires, or this
553         * message consumer is closed. A <CODE>timeout</CODE> of zero never
554         * expires, and the call blocks indefinitely.
555         * 
556         * @param timeout the timeout value (in milliseconds), a time out of zero
557         *                never expires.
558         * @return the next message produced for this message consumer, or null if
559         *         the timeout expires or this message consumer is concurrently
560         *         closed
561         */
562        public Message receive(long timeout) throws JMSException {
563            checkClosed();
564            checkMessageListener();
565            if (timeout == 0) {
566                return this.receive();
567    
568            }
569    
570            sendPullCommand(timeout);
571            while (timeout > 0) {
572    
573                MessageDispatch md;
574                if (info.getPrefetchSize() == 0) {
575                    md = dequeue(-1); // We let the broker let us know when we timeout.
576                } else {
577                    md = dequeue(timeout);
578                }
579    
580                if (md == null) {
581                    return null;
582                }
583    
584                beforeMessageIsConsumed(md);
585                afterMessageIsConsumed(md, false);
586                return createActiveMQMessage(md);
587            }
588            return null;
589        }
590    
591        /**
592         * Receives the next message if one is immediately available.
593         * 
594         * @return the next message produced for this message consumer, or null if
595         *         one is not available
596         * @throws JMSException if the JMS provider fails to receive the next
597         *                 message due to some internal error.
598         */
599        public Message receiveNoWait() throws JMSException {
600            checkClosed();
601            checkMessageListener();
602            sendPullCommand(-1);
603    
604            MessageDispatch md;
605            if (info.getPrefetchSize() == 0) {
606                md = dequeue(-1); // We let the broker let us know when we
607                // timeout.
608            } else {
609                md = dequeue(0);
610            }
611    
612            if (md == null) {
613                return null;
614            }
615    
616            beforeMessageIsConsumed(md);
617            afterMessageIsConsumed(md, false);
618            return createActiveMQMessage(md);
619        }
620    
621        /**
622         * Closes the message consumer.
623         * <P>
624         * Since a provider may allocate some resources on behalf of a <CODE>
625         * MessageConsumer</CODE>
626         * outside the Java virtual machine, clients should close them when they are
627         * not needed. Relying on garbage collection to eventually reclaim these
628         * resources may not be timely enough.
629         * <P>
630         * This call blocks until a <CODE>receive</CODE> or message listener in
631         * progress has completed. A blocked message consumer <CODE>receive </CODE>
632         * call returns null when this message consumer is closed.
633         * 
634         * @throws JMSException if the JMS provider fails to close the consumer due
635         *                 to some internal error.
636         */
637        public void close() throws JMSException {
638            if (!unconsumedMessages.isClosed()) {
639                if (session.getTransactionContext().isInTransaction()) {
640                    session.getTransactionContext().addSynchronization(new Synchronization() {
641                        @Override
642                        public void afterCommit() throws Exception {
643                            doClose();
644                        }
645    
646                        @Override
647                        public void afterRollback() throws Exception {
648                            doClose();
649                        }
650                    });
651                } else {
652                    doClose();
653                } 
654            }
655        }
656    
657        void doClose() throws JMSException {
658            dispose();
659            RemoveInfo removeCommand = info.createRemoveCommand();
660            if (LOG.isDebugEnabled()) {
661                LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
662            }
663            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
664            this.session.asyncSendPacket(removeCommand);
665        }
666        
667        void inProgressClearRequired() {
668            inProgressClearRequiredFlag = true;
669            // deal with delivered messages async to avoid lock contention with in progress acks
670            clearDispatchList = true;
671        }
672        
673        void clearMessagesInProgress() {
674            if (inProgressClearRequiredFlag) {
675                synchronized (unconsumedMessages.getMutex()) {
676                    if (inProgressClearRequiredFlag) {
677                        if (LOG.isDebugEnabled()) {
678                            LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
679                        }
680                        // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
681                        List<MessageDispatch> list = unconsumedMessages.removeAll();
682                        if (!this.info.isBrowser()) {
683                            for (MessageDispatch old : list) {
684                                session.connection.rollbackDuplicate(this, old.getMessage());
685                            }
686                        }
687                        // allow dispatch on this connection to resume
688                        session.connection.transportInterruptionProcessingComplete();
689                        inProgressClearRequiredFlag = false;
690                    }
691                }
692            }
693        }
694    
695        void deliverAcks() {
696            MessageAck ack = null;
697            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
698                if (isAutoAcknowledgeEach()) {
699                    synchronized(deliveredMessages) {
700                        ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
701                        if (ack != null) {
702                            deliveredMessages.clear();
703                            ackCounter = 0;
704                            } else {
705                                ack = pendingAck;
706                                pendingAck = null;
707                            }
708                    }
709                } else if (pendingAck != null && pendingAck.isStandardAck()) {
710                    ack = pendingAck;
711                    pendingAck = null;
712                }
713                if (ack != null) {
714                    final MessageAck ackToSend = ack;
715                    
716                    if (executorService == null) {
717                        executorService = Executors.newSingleThreadExecutor();
718                    }
719                    executorService.submit(new Runnable() {
720                        public void run() {
721                            try {
722                                session.sendAck(ackToSend,true);
723                            } catch (JMSException e) {
724                                LOG.error(getConsumerId() + " failed to delivered acknowledgements", e);
725                            } finally {
726                                deliveryingAcknowledgements.set(false);
727                            }
728                        }
729                    });
730                } else {
731                    deliveryingAcknowledgements.set(false);
732                }
733            }
734        }
735    
736        public void dispose() throws JMSException {
737            if (!unconsumedMessages.isClosed()) {
738                
739                // Do we have any acks we need to send out before closing?
740                // Ack any delivered messages now.
741                if (!session.getTransacted()) { 
742                    deliverAcks();
743                    if (isAutoAcknowledgeBatch()) {
744                        acknowledge();
745                    }
746                }
747                if (executorService != null) {
748                    executorService.shutdown();
749                    try {
750                        executorService.awaitTermination(60, TimeUnit.SECONDS);
751                    } catch (InterruptedException e) {
752                        Thread.currentThread().interrupt();
753                    }
754                }
755                
756                if (session.isClientAcknowledge()) {
757                    if (!this.info.isBrowser()) {
758                        // rollback duplicates that aren't acknowledged
759                        List<MessageDispatch> tmp = null;
760                        synchronized (this.deliveredMessages) {
761                            tmp = new ArrayList<MessageDispatch>(this.deliveredMessages);
762                        }
763                        for (MessageDispatch old : tmp) {
764                            this.session.connection.rollbackDuplicate(this, old.getMessage());
765                        }
766                        tmp.clear();
767                    }
768                }
769                if (!session.isTransacted()) {
770                    synchronized(deliveredMessages) {
771                        deliveredMessages.clear();
772                    }
773                }
774                unconsumedMessages.close();
775                this.session.removeConsumer(this);
776                List<MessageDispatch> list = unconsumedMessages.removeAll();
777                if (!this.info.isBrowser()) {
778                    for (MessageDispatch old : list) {
779                        // ensure we don't filter this as a duplicate
780                        session.connection.rollbackDuplicate(this, old.getMessage());
781                    }
782                }
783            }
784        }
785    
786        /**
787         * @throws IllegalStateException
788         */
789        protected void checkClosed() throws IllegalStateException {
790            if (unconsumedMessages.isClosed()) {
791                throw new IllegalStateException("The Consumer is closed");
792            }
793        }
794    
795        /**
796         * If we have a zero prefetch specified then send a pull command to the
797         * broker to pull a message we are about to receive
798         */
799        protected void sendPullCommand(long timeout) throws JMSException {
800            clearDispatchList();
801            if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
802                MessagePull messagePull = new MessagePull();
803                messagePull.configure(info);
804                messagePull.setTimeout(timeout);
805                session.asyncSendPacket(messagePull);
806            }
807        }
808    
809        protected void checkMessageListener() throws JMSException {
810            session.checkMessageListener();
811        }
812    
813        protected void setOptimizeAcknowledge(boolean value) {
814            if (optimizeAcknowledge && !value) {
815                deliverAcks();
816            }
817            optimizeAcknowledge = value;
818        }
819    
820        protected void setPrefetchSize(int prefetch) {
821            deliverAcks();
822            this.info.setCurrentPrefetchSize(prefetch);
823        }
824    
825        private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
826            md.setDeliverySequenceId(session.getNextDeliveryId());
827            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
828            if (!isAutoAcknowledgeBatch()) {
829                synchronized(deliveredMessages) {
830                    deliveredMessages.addFirst(md);
831                }
832                if (session.getTransacted()) {
833                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
834                }
835            }
836        }
837        
838        private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
839            if (unconsumedMessages.isClosed()) {
840                return;
841            }
842            if (messageExpired) {
843                synchronized (deliveredMessages) {
844                    deliveredMessages.remove(md);
845                }
846                stats.getExpiredMessageCount().increment();
847                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
848            } else {
849                stats.onMessage();
850                if (session.getTransacted()) {
851                    // Do nothing.
852                } else if (isAutoAcknowledgeEach()) {
853                    if (deliveryingAcknowledgements.compareAndSet(false, true)) {
854                        synchronized (deliveredMessages) {
855                            if (!deliveredMessages.isEmpty()) {
856                                if (optimizeAcknowledge) {
857                                    ackCounter++;
858                                    if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
859                                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
860                                            if (ack != null) {
861                                                deliveredMessages.clear();
862                                                ackCounter = 0;
863                                                session.sendAck(ack);
864                                                optimizeAckTimestamp = System.currentTimeMillis();
865                                            }
866                                    }
867                                } else {
868                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
869                                    if (ack!=null) {
870                                        deliveredMessages.clear();
871                                        session.sendAck(ack);
872                                    }
873                                }
874                            }
875                        }
876                        deliveryingAcknowledgements.set(false);
877                    }
878                } else if (isAutoAcknowledgeBatch()) {
879                    ackLater(md, MessageAck.STANDARD_ACK_TYPE);
880                } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
881                    boolean messageUnackedByConsumer = false;
882                    synchronized (deliveredMessages) {
883                        messageUnackedByConsumer = deliveredMessages.contains(md);
884                    }
885                    if (messageUnackedByConsumer) {
886                        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
887                    }
888                } 
889                else {
890                    throw new IllegalStateException("Invalid session state.");
891                }
892            }
893        }
894    
895        /**
896         * Creates a MessageAck for all messages contained in deliveredMessages.
897         * Caller should hold the lock for deliveredMessages.
898         * 
899         * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 
900         * @return <code>null</code> if nothing to ack.
901         */
902            private MessageAck makeAckForAllDeliveredMessages(byte type) {
903                    synchronized (deliveredMessages) {
904                            if (deliveredMessages.isEmpty())
905                                    return null;
906                                
907                            MessageDispatch md = deliveredMessages.getFirst();
908                        MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
909                        ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
910                        return ack;
911                    }
912            }
913    
914        private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
915    
916            // Don't acknowledge now, but we may need to let the broker know the
917            // consumer got the message to expand the pre-fetch window
918            if (session.getTransacted()) {
919                session.doStartTransaction();
920                if (!synchronizationRegistered) {
921                    synchronizationRegistered = true;
922                    session.getTransactionContext().addSynchronization(new Synchronization() {
923                        @Override
924                        public void beforeEnd() throws Exception {
925                            acknowledge();
926                            synchronizationRegistered = false;
927                        }
928    
929                        @Override
930                        public void afterCommit() throws Exception {
931                            commit();
932                            synchronizationRegistered = false;
933                        }
934    
935                        @Override
936                        public void afterRollback() throws Exception {
937                            rollback();
938                            synchronizationRegistered = false;
939                        }
940                    });
941                }
942            }
943    
944            deliveredCounter++;
945            
946            MessageAck oldPendingAck = pendingAck;
947            pendingAck = new MessageAck(md, ackType, deliveredCounter);
948            pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
949            if( oldPendingAck==null ) {
950                pendingAck.setFirstMessageId(pendingAck.getLastMessageId());
951            } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) {
952                pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
953            } else {
954                // old pending ack being superseded by ack of another type, if is is not a delivered
955                // ack and hence important, send it now so it is not lost.
956                if ( !oldPendingAck.isDeliveredAck()) {
957                    if (LOG.isDebugEnabled()) {
958                        LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
959                    }
960                    session.sendAck(oldPendingAck);
961                } else {
962                    if (LOG.isDebugEnabled()) {
963                        LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
964                    }
965                }
966            }
967            
968            if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
969                session.sendAck(pendingAck);
970                pendingAck=null;
971                deliveredCounter = 0;
972                additionalWindowSize = 0;
973            }
974        }
975    
976        /**
977         * Acknowledge all the messages that have been delivered to the client up to
978         * this point.
979         * 
980         * @throws JMSException
981         */
982        public void acknowledge() throws JMSException {
983            clearDispatchList();
984            waitForRedeliveries();
985            synchronized(deliveredMessages) {
986                // Acknowledge all messages so far.
987                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
988                if (ack == null)
989                    return; // no msgs
990                
991                if (session.getTransacted()) {
992                    rollbackOnFailedRecoveryRedelivery();
993                    session.doStartTransaction();
994                    ack.setTransactionId(session.getTransactionContext().getTransactionId());
995                }
996                session.sendAck(ack);
997                pendingAck = null;
998                
999                // Adjust the counters
1000                deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
1001                additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1002                
1003                if (!session.getTransacted()) {  
1004                    deliveredMessages.clear();
1005                } 
1006            }
1007        }
1008        
1009        private void waitForRedeliveries() {
1010            if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
1011                long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
1012                int numberNotReplayed;
1013                do {
1014                    numberNotReplayed = 0;
1015                    synchronized(deliveredMessages) {
1016                        if (previouslyDeliveredMessages != null) { 
1017                            for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1018                                if (!entry.getValue()) {
1019                                    numberNotReplayed++;
1020                                }
1021                            }
1022                        }
1023                    }
1024                    if (numberNotReplayed > 0) {
1025                        LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
1026                                + previouslyDeliveredMessages.transactionId +  ", to consumer :" + this.getConsumerId());
1027                        try {
1028                            Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
1029                        } catch (InterruptedException outOfhere) {
1030                            break;
1031                        }
1032                    }
1033                } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis());
1034            }
1035        }
1036    
1037        /*
1038         * called with deliveredMessages locked
1039         */
1040        private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
1041            if (previouslyDeliveredMessages != null) {
1042                // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback
1043                // as messages have been dispatched else where.
1044                int numberNotReplayed = 0;
1045                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1046                    if (!entry.getValue()) {
1047                        numberNotReplayed++;
1048                        if (LOG.isDebugEnabled()) {
1049                            LOG.debug("previously delivered message has not been replayed in transaction: "
1050                                    + previouslyDeliveredMessages.transactionId 
1051                                    + " , messageId: " + entry.getKey());
1052                        }
1053                    }
1054                }
1055                if (numberNotReplayed > 0) {
1056                    String message = "rolling back transaction (" 
1057                        + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
1058                        + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
1059                    LOG.warn(message);
1060                    throw new TransactionRolledBackException(message);   
1061                }
1062            }
1063        }
1064    
1065        void acknowledge(MessageDispatch md) throws JMSException {
1066            MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
1067            session.sendAck(ack);
1068            synchronized(deliveredMessages){
1069                deliveredMessages.remove(md);
1070            }
1071        }
1072    
1073        public void commit() throws JMSException {
1074            synchronized (deliveredMessages) {
1075                deliveredMessages.clear();
1076                clearPreviouslyDelivered();
1077            }
1078            redeliveryDelay = 0;
1079        }
1080    
1081        public void rollback() throws JMSException {
1082            synchronized (unconsumedMessages.getMutex()) {
1083                if (optimizeAcknowledge) {
1084                    // remove messages read but not acked at the broker yet through
1085                    // optimizeAcknowledge
1086                    if (!this.info.isBrowser()) {
1087                        synchronized(deliveredMessages) {
1088                            for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) {
1089                                // ensure we don't filter this as a duplicate
1090                                MessageDispatch md = deliveredMessages.removeLast();
1091                                session.connection.rollbackDuplicate(this, md.getMessage());
1092                            }
1093                        }
1094                    }
1095                }
1096                synchronized(deliveredMessages) {
1097                    rollbackPreviouslyDeliveredAndNotRedelivered();
1098                    if (deliveredMessages.isEmpty()) {
1099                        return;
1100                    }
1101        
1102                    // use initial delay for first redelivery
1103                    MessageDispatch lastMd = deliveredMessages.getFirst();
1104                    final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
1105                    if (currentRedeliveryCount > 0) {
1106                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1107                    } else {
1108                        redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1109                    }
1110                    MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
1111        
1112                    for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1113                        MessageDispatch md = iter.next();
1114                        md.getMessage().onMessageRolledBack();
1115                        // ensure we don't filter this as a duplicate
1116                        session.connection.rollbackDuplicate(this, md.getMessage());
1117                    }
1118        
1119                    if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
1120                        && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
1121                        // We need to NACK the messages so that they get sent to the
1122                        // DLQ.
1123                        // Acknowledge the last message.
1124                        
1125                        MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
1126                        ack.setPoisonCause(lastMd.getRollbackCause());
1127                                            ack.setFirstMessageId(firstMsgId);
1128                        session.sendAck(ack,true);
1129                        // Adjust the window size.
1130                        additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
1131                        redeliveryDelay = 0;
1132                    } else {
1133                        
1134                        // only redelivery_ack after first delivery
1135                        if (currentRedeliveryCount > 0) {
1136                            MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
1137                            ack.setFirstMessageId(firstMsgId);
1138                            session.sendAck(ack,true);
1139                        }
1140        
1141                        // stop the delivery of messages.
1142                        unconsumedMessages.stop();
1143        
1144                        for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
1145                            MessageDispatch md = iter.next();
1146                            unconsumedMessages.enqueueFirst(md);
1147                        }
1148        
1149                        if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
1150                            // Start up the delivery again a little later.
1151                            scheduler.executeAfterDelay(new Runnable() {
1152                                public void run() {
1153                                    try {
1154                                        if (started.get()) {
1155                                            start();
1156                                        }
1157                                    } catch (JMSException e) {
1158                                        session.connection.onAsyncException(e);
1159                                    }
1160                                }
1161                            }, redeliveryDelay);
1162                        } else {
1163                            start();
1164                        }
1165        
1166                    }
1167                    deliveredCounter -= deliveredMessages.size();
1168                    deliveredMessages.clear();
1169                }
1170            }
1171            if (messageListener.get() != null) {
1172                session.redispatch(this, unconsumedMessages);
1173            }
1174        }
1175    
1176        /*
1177         * called with unconsumedMessages && deliveredMessages locked
1178         * remove any message not re-delivered as they can't be replayed to this 
1179         * consumer on rollback
1180         */
1181        private void rollbackPreviouslyDeliveredAndNotRedelivered() {
1182            if (previouslyDeliveredMessages != null) {
1183                for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
1184                    if (!entry.getValue()) {              
1185                        removeFromDeliveredMessages(entry.getKey());
1186                    }
1187                }
1188                clearPreviouslyDelivered();
1189            }
1190        }
1191    
1192        /*
1193         * called with deliveredMessages locked
1194         */
1195        private void removeFromDeliveredMessages(MessageId key) {
1196            Iterator<MessageDispatch> iterator = deliveredMessages.iterator();
1197            while (iterator.hasNext()) {
1198                MessageDispatch candidate = iterator.next();
1199                if (key.equals(candidate.getMessage().getMessageId())) {
1200                    session.connection.rollbackDuplicate(this, candidate.getMessage());
1201                    iterator.remove();
1202                    break;
1203                }
1204            }
1205        }
1206        /*
1207         * called with deliveredMessages locked
1208         */
1209        private void clearPreviouslyDelivered() {
1210            if (previouslyDeliveredMessages != null) {
1211                previouslyDeliveredMessages.clear();
1212                previouslyDeliveredMessages = null;
1213            }
1214        }
1215    
1216        public void dispatch(MessageDispatch md) {
1217            MessageListener listener = this.messageListener.get();
1218            try {
1219                clearMessagesInProgress();
1220                clearDispatchList();
1221                synchronized (unconsumedMessages.getMutex()) {
1222                    if (!unconsumedMessages.isClosed()) {
1223                        if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
1224                            if (listener != null && unconsumedMessages.isRunning()) {
1225                                ActiveMQMessage message = createActiveMQMessage(md);
1226                                beforeMessageIsConsumed(md);
1227                                try {
1228                                    boolean expired = message.isExpired();
1229                                    if (!expired) {
1230                                        listener.onMessage(message);
1231                                    }
1232                                    afterMessageIsConsumed(md, expired);
1233                                } catch (RuntimeException e) {
1234                                    LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
1235                                    if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
1236                                        // schedual redelivery and possible dlq processing
1237                                        md.setRollbackCause(e);
1238                                        rollback();
1239                                    } else {
1240                                        // Transacted or Client ack: Deliver the
1241                                        // next message.
1242                                        afterMessageIsConsumed(md, false);
1243                                    }
1244                                }
1245                            } else {
1246                                if (!unconsumedMessages.isRunning()) {
1247                                    // delayed redelivery, ensure it can be re delivered
1248                                    session.connection.rollbackDuplicate(this, md.getMessage());
1249                                }
1250                                unconsumedMessages.enqueue(md);
1251                                if (availableListener != null) {
1252                                    availableListener.onMessageAvailable(this);
1253                                }
1254                            }
1255                        } else {
1256                            if (!session.isTransacted()) {
1257                                if (LOG.isDebugEnabled()) {
1258                                    LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage());
1259                                }
1260                                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
1261                                session.sendAck(ack);
1262                            } else {
1263                                if (LOG.isDebugEnabled()) {
1264                                    LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
1265                                }
1266                                boolean needsPoisonAck = false;
1267                                synchronized (deliveredMessages) {
1268                                    if (previouslyDeliveredMessages != null) {
1269                                        previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
1270                                    } else {
1271                                        // delivery while pending redelivery to another consumer on the same connection
1272                                        // not waiting for redelivery will help here
1273                                        needsPoisonAck = true;
1274                                    }
1275                                }
1276                                if (needsPoisonAck) {
1277                                    LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
1278                                            + " consumer on this connection, failoverRedeliveryWaitPeriod=" 
1279                                            + failoverRedeliveryWaitPeriod + ". Message: " + md);
1280                                    MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
1281                                    poisonAck.setFirstMessageId(md.getMessage().getMessageId());
1282                                    session.sendAck(poisonAck);
1283                                } else {
1284                                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
1285                                }
1286                            }
1287                        }
1288                    }
1289                }
1290                if (++dispatchedCount % 1000 == 0) {
1291                    dispatchedCount = 0;
1292                    Thread.yield();
1293                }
1294            } catch (Exception e) {
1295                session.connection.onClientInternalException(e);
1296            }
1297        }
1298    
1299        // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
1300        private void clearDispatchList() {
1301            if (clearDispatchList) {
1302                synchronized (deliveredMessages) {  
1303                    if (clearDispatchList) {
1304                        if (!deliveredMessages.isEmpty()) {
1305                            if (session.isTransacted()) {    
1306                                if (LOG.isDebugEnabled()) {
1307                                    LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1308                                }
1309                                if (previouslyDeliveredMessages == null) {
1310                                    previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
1311                                }
1312                                for (MessageDispatch delivered : deliveredMessages) {
1313                                    previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
1314                                }
1315                            } else {
1316                                if (LOG.isDebugEnabled()) {
1317                                    LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
1318                                }
1319                                deliveredMessages.clear();
1320                                pendingAck = null;
1321                            }
1322                        }
1323                        clearDispatchList = false;
1324                    }
1325                }
1326            }
1327        }
1328    
1329        public int getMessageSize() {
1330            return unconsumedMessages.size();
1331        }
1332    
1333        public void start() throws JMSException {
1334            if (unconsumedMessages.isClosed()) {
1335                return;
1336            }
1337            started.set(true);
1338            unconsumedMessages.start();
1339            session.executor.wakeup();
1340        }
1341    
1342        public void stop() {
1343            started.set(false);
1344            unconsumedMessages.stop();
1345        }
1346    
1347        @Override
1348        public String toString() {
1349            return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get()
1350                   + " }";
1351        }
1352    
1353        /**
1354         * Delivers a message to the message listener.
1355         * 
1356         * @return
1357         * @throws JMSException
1358         */
1359        public boolean iterate() {
1360            MessageListener listener = this.messageListener.get();
1361            if (listener != null) {
1362                MessageDispatch md = unconsumedMessages.dequeueNoWait();
1363                if (md != null) {
1364                    dispatch(md);
1365                    return true;
1366                }
1367            }
1368            return false;
1369        }
1370    
1371        public boolean isInUse(ActiveMQTempDestination destination) {
1372            return info.getDestination().equals(destination);
1373        }
1374    
1375        public long getLastDeliveredSequenceId() {
1376            return lastDeliveredSequenceId;
1377        }
1378    
1379            public IOException getFailureError() {
1380                    return failureError;
1381            }
1382    
1383            public void setFailureError(IOException failureError) {
1384                    this.failureError = failureError;
1385            }
1386    }