001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.util.ArrayList; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Map.Entry; 027 import java.util.concurrent.ExecutorService; 028 import java.util.concurrent.Executors; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.atomic.AtomicBoolean; 031 import java.util.concurrent.atomic.AtomicReference; 032 import javax.jms.IllegalStateException; 033 import javax.jms.InvalidDestinationException; 034 import javax.jms.JMSException; 035 import javax.jms.Message; 036 import javax.jms.MessageConsumer; 037 import javax.jms.MessageListener; 038 import javax.jms.TransactionRolledBackException; 039 import org.apache.activemq.blob.BlobDownloader; 040 import org.apache.activemq.command.ActiveMQBlobMessage; 041 import org.apache.activemq.command.ActiveMQDestination; 042 import org.apache.activemq.command.ActiveMQMessage; 043 import org.apache.activemq.command.ActiveMQTempDestination; 044 import org.apache.activemq.command.CommandTypes; 045 import org.apache.activemq.command.ConsumerId; 046 import org.apache.activemq.command.ConsumerInfo; 047 import org.apache.activemq.command.MessageAck; 048 import org.apache.activemq.command.MessageDispatch; 049 import org.apache.activemq.command.MessageId; 050 import org.apache.activemq.command.MessagePull; 051 import org.apache.activemq.command.RemoveInfo; 052 import org.apache.activemq.command.TransactionId; 053 import org.apache.activemq.management.JMSConsumerStatsImpl; 054 import org.apache.activemq.management.StatsCapable; 055 import org.apache.activemq.management.StatsImpl; 056 import org.apache.activemq.selector.SelectorParser; 057 import org.apache.activemq.thread.Scheduler; 058 import org.apache.activemq.transaction.Synchronization; 059 import org.apache.activemq.util.Callback; 060 import org.apache.activemq.util.IntrospectionSupport; 061 import org.apache.activemq.util.JMSExceptionSupport; 062 import org.slf4j.Logger; 063 import org.slf4j.LoggerFactory; 064 065 /** 066 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 067 * from a destination. A <CODE> MessageConsumer</CODE> object is created by 068 * passing a <CODE>Destination</CODE> object to a message-consumer creation 069 * method supplied by a session. 070 * <P> 071 * <CODE>MessageConsumer</CODE> is the parent interface for all message 072 * consumers. 073 * <P> 074 * A message consumer can be created with a message selector. A message selector 075 * allows the client to restrict the messages delivered to the message consumer 076 * to those that match the selector. 077 * <P> 078 * A client may either synchronously receive a message consumer's messages or 079 * have the consumer asynchronously deliver them as they arrive. 080 * <P> 081 * For synchronous receipt, a client can request the next message from a message 082 * consumer using one of its <CODE> receive</CODE> methods. There are several 083 * variations of <CODE>receive</CODE> that allow a client to poll or wait for 084 * the next message. 085 * <P> 086 * For asynchronous delivery, a client can register a 087 * <CODE>MessageListener</CODE> object with a message consumer. As messages 088 * arrive at the message consumer, it delivers them by calling the 089 * <CODE>MessageListener</CODE>'s<CODE> 090 * onMessage</CODE> method. 091 * <P> 092 * It is a client programming error for a <CODE>MessageListener</CODE> to 093 * throw an exception. 094 * 095 * 096 * @see javax.jms.MessageConsumer 097 * @see javax.jms.QueueReceiver 098 * @see javax.jms.TopicSubscriber 099 * @see javax.jms.Session 100 */ 101 public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { 102 103 @SuppressWarnings("serial") 104 class PreviouslyDeliveredMap<K, V> extends HashMap<K, V> { 105 final TransactionId transactionId; 106 public PreviouslyDeliveredMap(TransactionId transactionId) { 107 this.transactionId = transactionId; 108 } 109 } 110 111 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class); 112 protected final Scheduler scheduler; 113 protected final ActiveMQSession session; 114 protected final ConsumerInfo info; 115 116 // These are the messages waiting to be delivered to the client 117 protected final MessageDispatchChannel unconsumedMessages; 118 119 // The are the messages that were delivered to the consumer but that have 120 // not been acknowledged. It's kept in reverse order since we 121 // Always walk list in reverse order. 122 private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); 123 // track duplicate deliveries in a transaction such that the tx integrity can be validated 124 private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages; 125 private int deliveredCounter; 126 private int additionalWindowSize; 127 private long redeliveryDelay; 128 private int ackCounter; 129 private int dispatchedCount; 130 private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>(); 131 private final JMSConsumerStatsImpl stats; 132 133 private final String selector; 134 private boolean synchronizationRegistered; 135 private final AtomicBoolean started = new AtomicBoolean(false); 136 137 private MessageAvailableListener availableListener; 138 139 private RedeliveryPolicy redeliveryPolicy; 140 private boolean optimizeAcknowledge; 141 private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); 142 private ExecutorService executorService; 143 private MessageTransformer transformer; 144 private boolean clearDispatchList; 145 boolean inProgressClearRequiredFlag; 146 147 private MessageAck pendingAck; 148 private long lastDeliveredSequenceId; 149 150 private IOException failureError; 151 152 private long optimizeAckTimestamp = System.currentTimeMillis(); 153 private final long optimizeAckTimeout = 300; 154 private long failoverRedeliveryWaitPeriod = 0; 155 156 /** 157 * Create a MessageConsumer 158 * 159 * @param session 160 * @param dest 161 * @param name 162 * @param selector 163 * @param prefetch 164 * @param maximumPendingMessageCount 165 * @param noLocal 166 * @param browser 167 * @param dispatchAsync 168 * @param messageListener 169 * @throws JMSException 170 */ 171 public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, 172 String name, String selector, int prefetch, 173 int maximumPendingMessageCount, boolean noLocal, boolean browser, 174 boolean dispatchAsync, MessageListener messageListener) throws JMSException { 175 if (dest == null) { 176 throw new InvalidDestinationException("Don't understand null destinations"); 177 } else if (dest.getPhysicalName() == null) { 178 throw new InvalidDestinationException("The destination object was not given a physical name."); 179 } else if (dest.isTemporary()) { 180 String physicalName = dest.getPhysicalName(); 181 182 if (physicalName == null) { 183 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); 184 } 185 186 String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); 187 188 if (physicalName.indexOf(connectionID) < 0) { 189 throw new InvalidDestinationException( 190 "Cannot use a Temporary destination from another Connection"); 191 } 192 193 if (session.connection.isDeleted(dest)) { 194 throw new InvalidDestinationException( 195 "Cannot use a Temporary destination that has been deleted"); 196 } 197 if (prefetch < 0) { 198 throw new JMSException("Cannot have a prefetch size less than zero"); 199 } 200 } 201 if (session.connection.isMessagePrioritySupported()) { 202 this.unconsumedMessages = new SimplePriorityMessageDispatchChannel(); 203 }else { 204 this.unconsumedMessages = new FifoMessageDispatchChannel(); 205 } 206 207 this.session = session; 208 this.scheduler = session.getScheduler(); 209 this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); 210 setTransformer(session.getTransformer()); 211 212 this.info = new ConsumerInfo(consumerId); 213 this.info.setExclusive(this.session.connection.isExclusiveConsumer()); 214 this.info.setSubscriptionName(name); 215 this.info.setPrefetchSize(prefetch); 216 this.info.setCurrentPrefetchSize(prefetch); 217 this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); 218 this.info.setNoLocal(noLocal); 219 this.info.setDispatchAsync(dispatchAsync); 220 this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); 221 this.info.setSelector(null); 222 223 // Allows the options on the destination to configure the consumerInfo 224 if (dest.getOptions() != null) { 225 Map<String, String> options = new HashMap<String, String>(dest.getOptions()); 226 IntrospectionSupport.setProperties(this.info, options, "consumer."); 227 } 228 229 this.info.setDestination(dest); 230 this.info.setBrowser(browser); 231 if (selector != null && selector.trim().length() != 0) { 232 // Validate the selector 233 SelectorParser.parse(selector); 234 this.info.setSelector(selector); 235 this.selector = selector; 236 } else if (info.getSelector() != null) { 237 // Validate the selector 238 SelectorParser.parse(this.info.getSelector()); 239 this.selector = this.info.getSelector(); 240 } else { 241 this.selector = null; 242 } 243 244 this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); 245 this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() 246 && !info.isBrowser(); 247 this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); 248 this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); 249 if (messageListener != null) { 250 setMessageListener(messageListener); 251 } 252 try { 253 this.session.addConsumer(this); 254 this.session.syncSendPacket(info); 255 } catch (JMSException e) { 256 this.session.removeConsumer(this); 257 throw e; 258 } 259 260 if (session.connection.isStarted()) { 261 start(); 262 } 263 } 264 265 private boolean isAutoAcknowledgeEach() { 266 return session.isAutoAcknowledge() || ( session.isDupsOkAcknowledge() && getDestination().isQueue() ); 267 } 268 269 private boolean isAutoAcknowledgeBatch() { 270 return session.isDupsOkAcknowledge() && !getDestination().isQueue() ; 271 } 272 273 public StatsImpl getStats() { 274 return stats; 275 } 276 277 public JMSConsumerStatsImpl getConsumerStats() { 278 return stats; 279 } 280 281 public RedeliveryPolicy getRedeliveryPolicy() { 282 return redeliveryPolicy; 283 } 284 285 /** 286 * Sets the redelivery policy used when messages are redelivered 287 */ 288 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 289 this.redeliveryPolicy = redeliveryPolicy; 290 } 291 292 public MessageTransformer getTransformer() { 293 return transformer; 294 } 295 296 /** 297 * Sets the transformer used to transform messages before they are sent on 298 * to the JMS bus 299 */ 300 public void setTransformer(MessageTransformer transformer) { 301 this.transformer = transformer; 302 } 303 304 /** 305 * @return Returns the value. 306 */ 307 public ConsumerId getConsumerId() { 308 return info.getConsumerId(); 309 } 310 311 /** 312 * @return the consumer name - used for durable consumers 313 */ 314 public String getConsumerName() { 315 return this.info.getSubscriptionName(); 316 } 317 318 /** 319 * @return true if this consumer does not accept locally produced messages 320 */ 321 protected boolean isNoLocal() { 322 return info.isNoLocal(); 323 } 324 325 /** 326 * Retrieve is a browser 327 * 328 * @return true if a browser 329 */ 330 protected boolean isBrowser() { 331 return info.isBrowser(); 332 } 333 334 /** 335 * @return ActiveMQDestination 336 */ 337 protected ActiveMQDestination getDestination() { 338 return info.getDestination(); 339 } 340 341 /** 342 * @return Returns the prefetchNumber. 343 */ 344 public int getPrefetchNumber() { 345 return info.getPrefetchSize(); 346 } 347 348 /** 349 * @return true if this is a durable topic subscriber 350 */ 351 public boolean isDurableSubscriber() { 352 return info.getSubscriptionName() != null && info.getDestination().isTopic(); 353 } 354 355 /** 356 * Gets this message consumer's message selector expression. 357 * 358 * @return this message consumer's message selector, or null if no message 359 * selector exists for the message consumer (that is, if the message 360 * selector was not set or was set to null or the empty string) 361 * @throws JMSException if the JMS provider fails to receive the next 362 * message due to some internal error. 363 */ 364 public String getMessageSelector() throws JMSException { 365 checkClosed(); 366 return selector; 367 } 368 369 /** 370 * Gets the message consumer's <CODE>MessageListener</CODE>. 371 * 372 * @return the listener for the message consumer, or null if no listener is 373 * set 374 * @throws JMSException if the JMS provider fails to get the message 375 * listener due to some internal error. 376 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) 377 */ 378 public MessageListener getMessageListener() throws JMSException { 379 checkClosed(); 380 return this.messageListener.get(); 381 } 382 383 /** 384 * Sets the message consumer's <CODE>MessageListener</CODE>. 385 * <P> 386 * Setting the message listener to null is the equivalent of unsetting the 387 * message listener for the message consumer. 388 * <P> 389 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> 390 * while messages are being consumed by an existing listener or the consumer 391 * is being used to consume messages synchronously is undefined. 392 * 393 * @param listener the listener to which the messages are to be delivered 394 * @throws JMSException if the JMS provider fails to receive the next 395 * message due to some internal error. 396 * @see javax.jms.MessageConsumer#getMessageListener 397 */ 398 public void setMessageListener(MessageListener listener) throws JMSException { 399 checkClosed(); 400 if (info.getPrefetchSize() == 0) { 401 throw new JMSException( 402 "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); 403 } 404 if (listener != null) { 405 boolean wasRunning = session.isRunning(); 406 if (wasRunning) { 407 session.stop(); 408 } 409 410 this.messageListener.set(listener); 411 session.redispatch(this, unconsumedMessages); 412 413 if (wasRunning) { 414 session.start(); 415 } 416 } else { 417 this.messageListener.set(null); 418 } 419 } 420 421 public MessageAvailableListener getAvailableListener() { 422 return availableListener; 423 } 424 425 /** 426 * Sets the listener used to notify synchronous consumers that there is a 427 * message available so that the {@link MessageConsumer#receiveNoWait()} can 428 * be called. 429 */ 430 public void setAvailableListener(MessageAvailableListener availableListener) { 431 this.availableListener = availableListener; 432 } 433 434 /** 435 * Used to get an enqueued message from the unconsumedMessages list. The 436 * amount of time this method blocks is based on the timeout value. - if 437 * timeout==-1 then it blocks until a message is received. - if timeout==0 438 * then it it tries to not block at all, it returns a message if it is 439 * available - if timeout>0 then it blocks up to timeout amount of time. 440 * Expired messages will consumed by this method. 441 * 442 * @throws JMSException 443 * @return null if we timeout or if the consumer is closed. 444 */ 445 private MessageDispatch dequeue(long timeout) throws JMSException { 446 try { 447 long deadline = 0; 448 if (timeout > 0) { 449 deadline = System.currentTimeMillis() + timeout; 450 } 451 while (true) { 452 MessageDispatch md = unconsumedMessages.dequeue(timeout); 453 if (md == null) { 454 if (timeout > 0 && !unconsumedMessages.isClosed()) { 455 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 456 } else { 457 if (failureError != null) { 458 throw JMSExceptionSupport.create(failureError); 459 } else { 460 return null; 461 } 462 } 463 } else if (md.getMessage() == null) { 464 return null; 465 } else if (md.getMessage().isExpired()) { 466 if (LOG.isDebugEnabled()) { 467 LOG.debug(getConsumerId() + " received expired message: " + md); 468 } 469 beforeMessageIsConsumed(md); 470 afterMessageIsConsumed(md, true); 471 if (timeout > 0) { 472 timeout = Math.max(deadline - System.currentTimeMillis(), 0); 473 } 474 } else { 475 if (LOG.isTraceEnabled()) { 476 LOG.trace(getConsumerId() + " received message: " + md); 477 } 478 return md; 479 } 480 } 481 } catch (InterruptedException e) { 482 Thread.currentThread().interrupt(); 483 throw JMSExceptionSupport.create(e); 484 } 485 } 486 487 /** 488 * Receives the next message produced for this message consumer. 489 * <P> 490 * This call blocks indefinitely until a message is produced or until this 491 * message consumer is closed. 492 * <P> 493 * If this <CODE>receive</CODE> is done within a transaction, the consumer 494 * retains the message until the transaction commits. 495 * 496 * @return the next message produced for this message consumer, or null if 497 * this message consumer is concurrently closed 498 */ 499 public Message receive() throws JMSException { 500 checkClosed(); 501 checkMessageListener(); 502 503 sendPullCommand(0); 504 MessageDispatch md = dequeue(-1); 505 if (md == null) { 506 return null; 507 } 508 509 beforeMessageIsConsumed(md); 510 afterMessageIsConsumed(md, false); 511 512 return createActiveMQMessage(md); 513 } 514 515 /** 516 * @param md 517 * @return 518 */ 519 private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { 520 ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); 521 if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { 522 ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy())); 523 } 524 if (transformer != null) { 525 Message transformedMessage = transformer.consumerTransform(session, this, m); 526 if (transformedMessage != null) { 527 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); 528 } 529 } 530 if (session.isClientAcknowledge()) { 531 m.setAcknowledgeCallback(new Callback() { 532 public void execute() throws Exception { 533 session.checkClosed(); 534 session.acknowledge(); 535 } 536 }); 537 }else if (session.isIndividualAcknowledge()) { 538 m.setAcknowledgeCallback(new Callback() { 539 public void execute() throws Exception { 540 session.checkClosed(); 541 acknowledge(md); 542 } 543 }); 544 } 545 return m; 546 } 547 548 /** 549 * Receives the next message that arrives within the specified timeout 550 * interval. 551 * <P> 552 * This call blocks until a message arrives, the timeout expires, or this 553 * message consumer is closed. A <CODE>timeout</CODE> of zero never 554 * expires, and the call blocks indefinitely. 555 * 556 * @param timeout the timeout value (in milliseconds), a time out of zero 557 * never expires. 558 * @return the next message produced for this message consumer, or null if 559 * the timeout expires or this message consumer is concurrently 560 * closed 561 */ 562 public Message receive(long timeout) throws JMSException { 563 checkClosed(); 564 checkMessageListener(); 565 if (timeout == 0) { 566 return this.receive(); 567 568 } 569 570 sendPullCommand(timeout); 571 while (timeout > 0) { 572 573 MessageDispatch md; 574 if (info.getPrefetchSize() == 0) { 575 md = dequeue(-1); // We let the broker let us know when we timeout. 576 } else { 577 md = dequeue(timeout); 578 } 579 580 if (md == null) { 581 return null; 582 } 583 584 beforeMessageIsConsumed(md); 585 afterMessageIsConsumed(md, false); 586 return createActiveMQMessage(md); 587 } 588 return null; 589 } 590 591 /** 592 * Receives the next message if one is immediately available. 593 * 594 * @return the next message produced for this message consumer, or null if 595 * one is not available 596 * @throws JMSException if the JMS provider fails to receive the next 597 * message due to some internal error. 598 */ 599 public Message receiveNoWait() throws JMSException { 600 checkClosed(); 601 checkMessageListener(); 602 sendPullCommand(-1); 603 604 MessageDispatch md; 605 if (info.getPrefetchSize() == 0) { 606 md = dequeue(-1); // We let the broker let us know when we 607 // timeout. 608 } else { 609 md = dequeue(0); 610 } 611 612 if (md == null) { 613 return null; 614 } 615 616 beforeMessageIsConsumed(md); 617 afterMessageIsConsumed(md, false); 618 return createActiveMQMessage(md); 619 } 620 621 /** 622 * Closes the message consumer. 623 * <P> 624 * Since a provider may allocate some resources on behalf of a <CODE> 625 * MessageConsumer</CODE> 626 * outside the Java virtual machine, clients should close them when they are 627 * not needed. Relying on garbage collection to eventually reclaim these 628 * resources may not be timely enough. 629 * <P> 630 * This call blocks until a <CODE>receive</CODE> or message listener in 631 * progress has completed. A blocked message consumer <CODE>receive </CODE> 632 * call returns null when this message consumer is closed. 633 * 634 * @throws JMSException if the JMS provider fails to close the consumer due 635 * to some internal error. 636 */ 637 public void close() throws JMSException { 638 if (!unconsumedMessages.isClosed()) { 639 if (session.getTransactionContext().isInTransaction()) { 640 session.getTransactionContext().addSynchronization(new Synchronization() { 641 @Override 642 public void afterCommit() throws Exception { 643 doClose(); 644 } 645 646 @Override 647 public void afterRollback() throws Exception { 648 doClose(); 649 } 650 }); 651 } else { 652 doClose(); 653 } 654 } 655 } 656 657 void doClose() throws JMSException { 658 dispose(); 659 RemoveInfo removeCommand = info.createRemoveCommand(); 660 if (LOG.isDebugEnabled()) { 661 LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId); 662 } 663 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 664 this.session.asyncSendPacket(removeCommand); 665 } 666 667 void inProgressClearRequired() { 668 inProgressClearRequiredFlag = true; 669 // deal with delivered messages async to avoid lock contention with in progress acks 670 clearDispatchList = true; 671 } 672 673 void clearMessagesInProgress() { 674 if (inProgressClearRequiredFlag) { 675 synchronized (unconsumedMessages.getMutex()) { 676 if (inProgressClearRequiredFlag) { 677 if (LOG.isDebugEnabled()) { 678 LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt"); 679 } 680 // ensure unconsumed are rolledback up front as they may get redelivered to another consumer 681 List<MessageDispatch> list = unconsumedMessages.removeAll(); 682 if (!this.info.isBrowser()) { 683 for (MessageDispatch old : list) { 684 session.connection.rollbackDuplicate(this, old.getMessage()); 685 } 686 } 687 // allow dispatch on this connection to resume 688 session.connection.transportInterruptionProcessingComplete(); 689 inProgressClearRequiredFlag = false; 690 } 691 } 692 } 693 } 694 695 void deliverAcks() { 696 MessageAck ack = null; 697 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 698 if (isAutoAcknowledgeEach()) { 699 synchronized(deliveredMessages) { 700 ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 701 if (ack != null) { 702 deliveredMessages.clear(); 703 ackCounter = 0; 704 } else { 705 ack = pendingAck; 706 pendingAck = null; 707 } 708 } 709 } else if (pendingAck != null && pendingAck.isStandardAck()) { 710 ack = pendingAck; 711 pendingAck = null; 712 } 713 if (ack != null) { 714 final MessageAck ackToSend = ack; 715 716 if (executorService == null) { 717 executorService = Executors.newSingleThreadExecutor(); 718 } 719 executorService.submit(new Runnable() { 720 public void run() { 721 try { 722 session.sendAck(ackToSend,true); 723 } catch (JMSException e) { 724 LOG.error(getConsumerId() + " failed to delivered acknowledgements", e); 725 } finally { 726 deliveryingAcknowledgements.set(false); 727 } 728 } 729 }); 730 } else { 731 deliveryingAcknowledgements.set(false); 732 } 733 } 734 } 735 736 public void dispose() throws JMSException { 737 if (!unconsumedMessages.isClosed()) { 738 739 // Do we have any acks we need to send out before closing? 740 // Ack any delivered messages now. 741 if (!session.getTransacted()) { 742 deliverAcks(); 743 if (isAutoAcknowledgeBatch()) { 744 acknowledge(); 745 } 746 } 747 if (executorService != null) { 748 executorService.shutdown(); 749 try { 750 executorService.awaitTermination(60, TimeUnit.SECONDS); 751 } catch (InterruptedException e) { 752 Thread.currentThread().interrupt(); 753 } 754 } 755 756 if (session.isClientAcknowledge()) { 757 if (!this.info.isBrowser()) { 758 // rollback duplicates that aren't acknowledged 759 List<MessageDispatch> tmp = null; 760 synchronized (this.deliveredMessages) { 761 tmp = new ArrayList<MessageDispatch>(this.deliveredMessages); 762 } 763 for (MessageDispatch old : tmp) { 764 this.session.connection.rollbackDuplicate(this, old.getMessage()); 765 } 766 tmp.clear(); 767 } 768 } 769 if (!session.isTransacted()) { 770 synchronized(deliveredMessages) { 771 deliveredMessages.clear(); 772 } 773 } 774 unconsumedMessages.close(); 775 this.session.removeConsumer(this); 776 List<MessageDispatch> list = unconsumedMessages.removeAll(); 777 if (!this.info.isBrowser()) { 778 for (MessageDispatch old : list) { 779 // ensure we don't filter this as a duplicate 780 session.connection.rollbackDuplicate(this, old.getMessage()); 781 } 782 } 783 } 784 } 785 786 /** 787 * @throws IllegalStateException 788 */ 789 protected void checkClosed() throws IllegalStateException { 790 if (unconsumedMessages.isClosed()) { 791 throw new IllegalStateException("The Consumer is closed"); 792 } 793 } 794 795 /** 796 * If we have a zero prefetch specified then send a pull command to the 797 * broker to pull a message we are about to receive 798 */ 799 protected void sendPullCommand(long timeout) throws JMSException { 800 clearDispatchList(); 801 if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { 802 MessagePull messagePull = new MessagePull(); 803 messagePull.configure(info); 804 messagePull.setTimeout(timeout); 805 session.asyncSendPacket(messagePull); 806 } 807 } 808 809 protected void checkMessageListener() throws JMSException { 810 session.checkMessageListener(); 811 } 812 813 protected void setOptimizeAcknowledge(boolean value) { 814 if (optimizeAcknowledge && !value) { 815 deliverAcks(); 816 } 817 optimizeAcknowledge = value; 818 } 819 820 protected void setPrefetchSize(int prefetch) { 821 deliverAcks(); 822 this.info.setCurrentPrefetchSize(prefetch); 823 } 824 825 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { 826 md.setDeliverySequenceId(session.getNextDeliveryId()); 827 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 828 if (!isAutoAcknowledgeBatch()) { 829 synchronized(deliveredMessages) { 830 deliveredMessages.addFirst(md); 831 } 832 if (session.getTransacted()) { 833 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 834 } 835 } 836 } 837 838 private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { 839 if (unconsumedMessages.isClosed()) { 840 return; 841 } 842 if (messageExpired) { 843 synchronized (deliveredMessages) { 844 deliveredMessages.remove(md); 845 } 846 stats.getExpiredMessageCount().increment(); 847 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 848 } else { 849 stats.onMessage(); 850 if (session.getTransacted()) { 851 // Do nothing. 852 } else if (isAutoAcknowledgeEach()) { 853 if (deliveryingAcknowledgements.compareAndSet(false, true)) { 854 synchronized (deliveredMessages) { 855 if (!deliveredMessages.isEmpty()) { 856 if (optimizeAcknowledge) { 857 ackCounter++; 858 if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) { 859 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 860 if (ack != null) { 861 deliveredMessages.clear(); 862 ackCounter = 0; 863 session.sendAck(ack); 864 optimizeAckTimestamp = System.currentTimeMillis(); 865 } 866 } 867 } else { 868 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 869 if (ack!=null) { 870 deliveredMessages.clear(); 871 session.sendAck(ack); 872 } 873 } 874 } 875 } 876 deliveryingAcknowledgements.set(false); 877 } 878 } else if (isAutoAcknowledgeBatch()) { 879 ackLater(md, MessageAck.STANDARD_ACK_TYPE); 880 } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { 881 boolean messageUnackedByConsumer = false; 882 synchronized (deliveredMessages) { 883 messageUnackedByConsumer = deliveredMessages.contains(md); 884 } 885 if (messageUnackedByConsumer) { 886 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 887 } 888 } 889 else { 890 throw new IllegalStateException("Invalid session state."); 891 } 892 } 893 } 894 895 /** 896 * Creates a MessageAck for all messages contained in deliveredMessages. 897 * Caller should hold the lock for deliveredMessages. 898 * 899 * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 900 * @return <code>null</code> if nothing to ack. 901 */ 902 private MessageAck makeAckForAllDeliveredMessages(byte type) { 903 synchronized (deliveredMessages) { 904 if (deliveredMessages.isEmpty()) 905 return null; 906 907 MessageDispatch md = deliveredMessages.getFirst(); 908 MessageAck ack = new MessageAck(md, type, deliveredMessages.size()); 909 ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId()); 910 return ack; 911 } 912 } 913 914 private void ackLater(MessageDispatch md, byte ackType) throws JMSException { 915 916 // Don't acknowledge now, but we may need to let the broker know the 917 // consumer got the message to expand the pre-fetch window 918 if (session.getTransacted()) { 919 session.doStartTransaction(); 920 if (!synchronizationRegistered) { 921 synchronizationRegistered = true; 922 session.getTransactionContext().addSynchronization(new Synchronization() { 923 @Override 924 public void beforeEnd() throws Exception { 925 acknowledge(); 926 synchronizationRegistered = false; 927 } 928 929 @Override 930 public void afterCommit() throws Exception { 931 commit(); 932 synchronizationRegistered = false; 933 } 934 935 @Override 936 public void afterRollback() throws Exception { 937 rollback(); 938 synchronizationRegistered = false; 939 } 940 }); 941 } 942 } 943 944 deliveredCounter++; 945 946 MessageAck oldPendingAck = pendingAck; 947 pendingAck = new MessageAck(md, ackType, deliveredCounter); 948 pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); 949 if( oldPendingAck==null ) { 950 pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); 951 } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { 952 pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); 953 } else { 954 // old pending ack being superseded by ack of another type, if is is not a delivered 955 // ack and hence important, send it now so it is not lost. 956 if ( !oldPendingAck.isDeliveredAck()) { 957 if (LOG.isDebugEnabled()) { 958 LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck); 959 } 960 session.sendAck(oldPendingAck); 961 } else { 962 if (LOG.isDebugEnabled()) { 963 LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck); 964 } 965 } 966 } 967 968 if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) { 969 session.sendAck(pendingAck); 970 pendingAck=null; 971 deliveredCounter = 0; 972 additionalWindowSize = 0; 973 } 974 } 975 976 /** 977 * Acknowledge all the messages that have been delivered to the client up to 978 * this point. 979 * 980 * @throws JMSException 981 */ 982 public void acknowledge() throws JMSException { 983 clearDispatchList(); 984 waitForRedeliveries(); 985 synchronized(deliveredMessages) { 986 // Acknowledge all messages so far. 987 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); 988 if (ack == null) 989 return; // no msgs 990 991 if (session.getTransacted()) { 992 rollbackOnFailedRecoveryRedelivery(); 993 session.doStartTransaction(); 994 ack.setTransactionId(session.getTransactionContext().getTransactionId()); 995 } 996 session.sendAck(ack); 997 pendingAck = null; 998 999 // Adjust the counters 1000 deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size()); 1001 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1002 1003 if (!session.getTransacted()) { 1004 deliveredMessages.clear(); 1005 } 1006 } 1007 } 1008 1009 private void waitForRedeliveries() { 1010 if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) { 1011 long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod; 1012 int numberNotReplayed; 1013 do { 1014 numberNotReplayed = 0; 1015 synchronized(deliveredMessages) { 1016 if (previouslyDeliveredMessages != null) { 1017 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1018 if (!entry.getValue()) { 1019 numberNotReplayed++; 1020 } 1021 } 1022 } 1023 } 1024 if (numberNotReplayed > 0) { 1025 LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: " 1026 + previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId()); 1027 try { 1028 Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4)); 1029 } catch (InterruptedException outOfhere) { 1030 break; 1031 } 1032 } 1033 } while (numberNotReplayed > 0 && expiry < System.currentTimeMillis()); 1034 } 1035 } 1036 1037 /* 1038 * called with deliveredMessages locked 1039 */ 1040 private void rollbackOnFailedRecoveryRedelivery() throws JMSException { 1041 if (previouslyDeliveredMessages != null) { 1042 // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback 1043 // as messages have been dispatched else where. 1044 int numberNotReplayed = 0; 1045 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1046 if (!entry.getValue()) { 1047 numberNotReplayed++; 1048 if (LOG.isDebugEnabled()) { 1049 LOG.debug("previously delivered message has not been replayed in transaction: " 1050 + previouslyDeliveredMessages.transactionId 1051 + " , messageId: " + entry.getKey()); 1052 } 1053 } 1054 } 1055 if (numberNotReplayed > 0) { 1056 String message = "rolling back transaction (" 1057 + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed 1058 + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); 1059 LOG.warn(message); 1060 throw new TransactionRolledBackException(message); 1061 } 1062 } 1063 } 1064 1065 void acknowledge(MessageDispatch md) throws JMSException { 1066 MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); 1067 session.sendAck(ack); 1068 synchronized(deliveredMessages){ 1069 deliveredMessages.remove(md); 1070 } 1071 } 1072 1073 public void commit() throws JMSException { 1074 synchronized (deliveredMessages) { 1075 deliveredMessages.clear(); 1076 clearPreviouslyDelivered(); 1077 } 1078 redeliveryDelay = 0; 1079 } 1080 1081 public void rollback() throws JMSException { 1082 synchronized (unconsumedMessages.getMutex()) { 1083 if (optimizeAcknowledge) { 1084 // remove messages read but not acked at the broker yet through 1085 // optimizeAcknowledge 1086 if (!this.info.isBrowser()) { 1087 synchronized(deliveredMessages) { 1088 for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { 1089 // ensure we don't filter this as a duplicate 1090 MessageDispatch md = deliveredMessages.removeLast(); 1091 session.connection.rollbackDuplicate(this, md.getMessage()); 1092 } 1093 } 1094 } 1095 } 1096 synchronized(deliveredMessages) { 1097 rollbackPreviouslyDeliveredAndNotRedelivered(); 1098 if (deliveredMessages.isEmpty()) { 1099 return; 1100 } 1101 1102 // use initial delay for first redelivery 1103 MessageDispatch lastMd = deliveredMessages.getFirst(); 1104 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); 1105 if (currentRedeliveryCount > 0) { 1106 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 1107 } else { 1108 redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 1109 } 1110 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); 1111 1112 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { 1113 MessageDispatch md = iter.next(); 1114 md.getMessage().onMessageRolledBack(); 1115 // ensure we don't filter this as a duplicate 1116 session.connection.rollbackDuplicate(this, md.getMessage()); 1117 } 1118 1119 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 1120 && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { 1121 // We need to NACK the messages so that they get sent to the 1122 // DLQ. 1123 // Acknowledge the last message. 1124 1125 MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); 1126 ack.setPoisonCause(lastMd.getRollbackCause()); 1127 ack.setFirstMessageId(firstMsgId); 1128 session.sendAck(ack,true); 1129 // Adjust the window size. 1130 additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); 1131 redeliveryDelay = 0; 1132 } else { 1133 1134 // only redelivery_ack after first delivery 1135 if (currentRedeliveryCount > 0) { 1136 MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); 1137 ack.setFirstMessageId(firstMsgId); 1138 session.sendAck(ack,true); 1139 } 1140 1141 // stop the delivery of messages. 1142 unconsumedMessages.stop(); 1143 1144 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) { 1145 MessageDispatch md = iter.next(); 1146 unconsumedMessages.enqueueFirst(md); 1147 } 1148 1149 if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { 1150 // Start up the delivery again a little later. 1151 scheduler.executeAfterDelay(new Runnable() { 1152 public void run() { 1153 try { 1154 if (started.get()) { 1155 start(); 1156 } 1157 } catch (JMSException e) { 1158 session.connection.onAsyncException(e); 1159 } 1160 } 1161 }, redeliveryDelay); 1162 } else { 1163 start(); 1164 } 1165 1166 } 1167 deliveredCounter -= deliveredMessages.size(); 1168 deliveredMessages.clear(); 1169 } 1170 } 1171 if (messageListener.get() != null) { 1172 session.redispatch(this, unconsumedMessages); 1173 } 1174 } 1175 1176 /* 1177 * called with unconsumedMessages && deliveredMessages locked 1178 * remove any message not re-delivered as they can't be replayed to this 1179 * consumer on rollback 1180 */ 1181 private void rollbackPreviouslyDeliveredAndNotRedelivered() { 1182 if (previouslyDeliveredMessages != null) { 1183 for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) { 1184 if (!entry.getValue()) { 1185 removeFromDeliveredMessages(entry.getKey()); 1186 } 1187 } 1188 clearPreviouslyDelivered(); 1189 } 1190 } 1191 1192 /* 1193 * called with deliveredMessages locked 1194 */ 1195 private void removeFromDeliveredMessages(MessageId key) { 1196 Iterator<MessageDispatch> iterator = deliveredMessages.iterator(); 1197 while (iterator.hasNext()) { 1198 MessageDispatch candidate = iterator.next(); 1199 if (key.equals(candidate.getMessage().getMessageId())) { 1200 session.connection.rollbackDuplicate(this, candidate.getMessage()); 1201 iterator.remove(); 1202 break; 1203 } 1204 } 1205 } 1206 /* 1207 * called with deliveredMessages locked 1208 */ 1209 private void clearPreviouslyDelivered() { 1210 if (previouslyDeliveredMessages != null) { 1211 previouslyDeliveredMessages.clear(); 1212 previouslyDeliveredMessages = null; 1213 } 1214 } 1215 1216 public void dispatch(MessageDispatch md) { 1217 MessageListener listener = this.messageListener.get(); 1218 try { 1219 clearMessagesInProgress(); 1220 clearDispatchList(); 1221 synchronized (unconsumedMessages.getMutex()) { 1222 if (!unconsumedMessages.isClosed()) { 1223 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { 1224 if (listener != null && unconsumedMessages.isRunning()) { 1225 ActiveMQMessage message = createActiveMQMessage(md); 1226 beforeMessageIsConsumed(md); 1227 try { 1228 boolean expired = message.isExpired(); 1229 if (!expired) { 1230 listener.onMessage(message); 1231 } 1232 afterMessageIsConsumed(md, expired); 1233 } catch (RuntimeException e) { 1234 LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e); 1235 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { 1236 // schedual redelivery and possible dlq processing 1237 md.setRollbackCause(e); 1238 rollback(); 1239 } else { 1240 // Transacted or Client ack: Deliver the 1241 // next message. 1242 afterMessageIsConsumed(md, false); 1243 } 1244 } 1245 } else { 1246 if (!unconsumedMessages.isRunning()) { 1247 // delayed redelivery, ensure it can be re delivered 1248 session.connection.rollbackDuplicate(this, md.getMessage()); 1249 } 1250 unconsumedMessages.enqueue(md); 1251 if (availableListener != null) { 1252 availableListener.onMessageAvailable(this); 1253 } 1254 } 1255 } else { 1256 if (!session.isTransacted()) { 1257 if (LOG.isDebugEnabled()) { 1258 LOG.debug(getConsumerId() + " ignoring (auto acking) duplicate: " + md.getMessage()); 1259 } 1260 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 1261 session.sendAck(ack); 1262 } else { 1263 if (LOG.isDebugEnabled()) { 1264 LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage()); 1265 } 1266 boolean needsPoisonAck = false; 1267 synchronized (deliveredMessages) { 1268 if (previouslyDeliveredMessages != null) { 1269 previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); 1270 } else { 1271 // delivery while pending redelivery to another consumer on the same connection 1272 // not waiting for redelivery will help here 1273 needsPoisonAck = true; 1274 } 1275 } 1276 if (needsPoisonAck) { 1277 LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" 1278 + " consumer on this connection, failoverRedeliveryWaitPeriod=" 1279 + failoverRedeliveryWaitPeriod + ". Message: " + md); 1280 MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 1281 poisonAck.setFirstMessageId(md.getMessage().getMessageId()); 1282 session.sendAck(poisonAck); 1283 } else { 1284 ackLater(md, MessageAck.DELIVERED_ACK_TYPE); 1285 } 1286 } 1287 } 1288 } 1289 } 1290 if (++dispatchedCount % 1000 == 0) { 1291 dispatchedCount = 0; 1292 Thread.yield(); 1293 } 1294 } catch (Exception e) { 1295 session.connection.onClientInternalException(e); 1296 } 1297 } 1298 1299 // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again 1300 private void clearDispatchList() { 1301 if (clearDispatchList) { 1302 synchronized (deliveredMessages) { 1303 if (clearDispatchList) { 1304 if (!deliveredMessages.isEmpty()) { 1305 if (session.isTransacted()) { 1306 if (LOG.isDebugEnabled()) { 1307 LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt"); 1308 } 1309 if (previouslyDeliveredMessages == null) { 1310 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId()); 1311 } 1312 for (MessageDispatch delivered : deliveredMessages) { 1313 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); 1314 } 1315 } else { 1316 if (LOG.isDebugEnabled()) { 1317 LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt"); 1318 } 1319 deliveredMessages.clear(); 1320 pendingAck = null; 1321 } 1322 } 1323 clearDispatchList = false; 1324 } 1325 } 1326 } 1327 } 1328 1329 public int getMessageSize() { 1330 return unconsumedMessages.size(); 1331 } 1332 1333 public void start() throws JMSException { 1334 if (unconsumedMessages.isClosed()) { 1335 return; 1336 } 1337 started.set(true); 1338 unconsumedMessages.start(); 1339 session.executor.wakeup(); 1340 } 1341 1342 public void stop() { 1343 started.set(false); 1344 unconsumedMessages.stop(); 1345 } 1346 1347 @Override 1348 public String toString() { 1349 return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() 1350 + " }"; 1351 } 1352 1353 /** 1354 * Delivers a message to the message listener. 1355 * 1356 * @return 1357 * @throws JMSException 1358 */ 1359 public boolean iterate() { 1360 MessageListener listener = this.messageListener.get(); 1361 if (listener != null) { 1362 MessageDispatch md = unconsumedMessages.dequeueNoWait(); 1363 if (md != null) { 1364 dispatch(md); 1365 return true; 1366 } 1367 } 1368 return false; 1369 } 1370 1371 public boolean isInUse(ActiveMQTempDestination destination) { 1372 return info.getDestination().equals(destination); 1373 } 1374 1375 public long getLastDeliveredSequenceId() { 1376 return lastDeliveredSequenceId; 1377 } 1378 1379 public IOException getFailureError() { 1380 return failureError; 1381 } 1382 1383 public void setFailureError(IOException failureError) { 1384 this.failureError = failureError; 1385 } 1386 }