001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.util.AbstractList; 021 import java.util.ArrayList; 022 import java.util.Collection; 023 import java.util.Collections; 024 import java.util.Comparator; 025 import java.util.HashSet; 026 import java.util.Iterator; 027 import java.util.LinkedHashMap; 028 import java.util.LinkedList; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Set; 032 import java.util.concurrent.CancellationException; 033 import java.util.concurrent.CopyOnWriteArraySet; 034 import java.util.concurrent.CountDownLatch; 035 import java.util.concurrent.DelayQueue; 036 import java.util.concurrent.Delayed; 037 import java.util.concurrent.ExecutorService; 038 import java.util.concurrent.Future; 039 import java.util.concurrent.TimeUnit; 040 import java.util.concurrent.atomic.AtomicLong; 041 import java.util.concurrent.locks.Lock; 042 import java.util.concurrent.locks.ReentrantLock; 043 import java.util.concurrent.locks.ReentrantReadWriteLock; 044 import javax.jms.InvalidSelectorException; 045 import javax.jms.JMSException; 046 import javax.jms.ResourceAllocationException; 047 import org.apache.activemq.broker.BrokerService; 048 import org.apache.activemq.broker.ConnectionContext; 049 import org.apache.activemq.broker.ProducerBrokerExchange; 050 import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 051 import org.apache.activemq.broker.region.cursors.StoreQueueCursor; 052 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 053 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; 054 import org.apache.activemq.broker.region.group.MessageGroupMap; 055 import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 056 import org.apache.activemq.broker.region.policy.DispatchPolicy; 057 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 058 import org.apache.activemq.command.*; 059 import org.apache.activemq.filter.BooleanExpression; 060 import org.apache.activemq.filter.MessageEvaluationContext; 061 import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 062 import org.apache.activemq.security.SecurityContext; 063 import org.apache.activemq.selector.SelectorParser; 064 import org.apache.activemq.store.MessageRecoveryListener; 065 import org.apache.activemq.store.MessageStore; 066 import org.apache.activemq.thread.Scheduler; 067 import org.apache.activemq.thread.Task; 068 import org.apache.activemq.thread.TaskRunner; 069 import org.apache.activemq.thread.TaskRunnerFactory; 070 import org.apache.activemq.transaction.Synchronization; 071 import org.apache.activemq.usage.Usage; 072 import org.apache.activemq.usage.UsageListener; 073 import org.apache.activemq.util.BrokerSupport; 074 import org.slf4j.Logger; 075 import org.slf4j.LoggerFactory; 076 import org.slf4j.MDC; 077 078 /** 079 * The Queue is a List of MessageEntry objects that are dispatched to matching 080 * subscriptions. 081 * 082 * 083 */ 084 public class Queue extends BaseDestination implements Task, UsageListener { 085 protected static final Logger LOG = LoggerFactory.getLogger(Queue.class); 086 protected final TaskRunnerFactory taskFactory; 087 protected TaskRunner taskRunner; 088 private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); 089 protected final List<Subscription> consumers = new ArrayList<Subscription>(50); 090 private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock(); 091 protected PendingMessageCursor messages; 092 private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock(); 093 private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>(); 094 // Messages that are paged in but have not yet been targeted at a 095 // subscription 096 private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); 097 private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100); 098 private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>(); 099 private MessageGroupMap messageGroupOwners; 100 private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); 101 private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); 102 final Lock sendLock = new ReentrantLock(); 103 private ExecutorService executor; 104 protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections 105 .synchronizedMap(new LinkedHashMap<MessageId, Runnable>()); 106 private boolean useConsumerPriority = true; 107 private boolean strictOrderDispatch = false; 108 private final QueueDispatchSelector dispatchSelector; 109 private boolean optimizedDispatch = false; 110 private boolean firstConsumer = false; 111 private int timeBeforeDispatchStarts = 0; 112 private int consumersBeforeDispatchStarts = 0; 113 private CountDownLatch consumersBeforeStartsLatch; 114 private final AtomicLong pendingWakeups = new AtomicLong(); 115 private boolean allConsumersExclusiveByDefault = false; 116 117 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 118 public void run() { 119 asyncWakeup(); 120 } 121 }; 122 private final Runnable expireMessagesTask = new Runnable() { 123 public void run() { 124 expireMessages(); 125 } 126 }; 127 128 private final Object iteratingMutex = new Object() { 129 }; 130 private final Scheduler scheduler; 131 132 class TimeoutMessage implements Delayed { 133 134 Message message; 135 ConnectionContext context; 136 long trigger; 137 138 public TimeoutMessage(Message message, ConnectionContext context, long delay) { 139 this.message = message; 140 this.context = context; 141 this.trigger = System.currentTimeMillis() + delay; 142 } 143 144 public long getDelay(TimeUnit unit) { 145 long n = trigger - System.currentTimeMillis(); 146 return unit.convert(n, TimeUnit.MILLISECONDS); 147 } 148 149 public int compareTo(Delayed delayed) { 150 long other = ((TimeoutMessage) delayed).trigger; 151 int returnValue; 152 if (this.trigger < other) { 153 returnValue = -1; 154 } else if (this.trigger > other) { 155 returnValue = 1; 156 } else { 157 returnValue = 0; 158 } 159 return returnValue; 160 } 161 162 } 163 164 DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>(); 165 166 class FlowControlTimeoutTask extends Thread { 167 168 @Override 169 public void run() { 170 TimeoutMessage timeout; 171 try { 172 while (true) { 173 timeout = flowControlTimeoutMessages.take(); 174 if (timeout != null) { 175 synchronized (messagesWaitingForSpace) { 176 if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) { 177 ExceptionResponse response = new ExceptionResponse( 178 new ResourceAllocationException( 179 "Usage Manager Memory Limit reached. Stopping producer (" 180 + timeout.message.getProducerId() 181 + ") to prevent flooding " 182 + getActiveMQDestination().getQualifiedName() 183 + "." 184 + " See http://activemq.apache.org/producer-flow-control.html for more info")); 185 response.setCorrelationId(timeout.message.getCommandId()); 186 timeout.context.getConnection().dispatchAsync(response); 187 } 188 } 189 } 190 } 191 } catch (InterruptedException e) { 192 if (LOG.isDebugEnabled()) { 193 LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping"); 194 } 195 } 196 } 197 }; 198 199 private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask(); 200 201 private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() { 202 203 public int compare(Subscription s1, Subscription s2) { 204 // We want the list sorted in descending order 205 return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority(); 206 } 207 }; 208 209 public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, 210 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 211 super(brokerService, store, destination, parentStats); 212 this.taskFactory = taskFactory; 213 this.dispatchSelector = new QueueDispatchSelector(destination); 214 this.scheduler = brokerService.getBroker().getScheduler(); 215 } 216 217 public List<Subscription> getConsumers() { 218 consumersLock.readLock().lock(); 219 try { 220 return new ArrayList<Subscription>(consumers); 221 }finally { 222 consumersLock.readLock().unlock(); 223 } 224 } 225 226 // make the queue easily visible in the debugger from its task runner 227 // threads 228 final class QueueThread extends Thread { 229 final Queue queue; 230 231 public QueueThread(Runnable runnable, String name, Queue queue) { 232 super(runnable, name); 233 this.queue = queue; 234 } 235 } 236 237 @Override 238 public void initialize() throws Exception { 239 if (this.messages == null) { 240 if (destination.isTemporary() || broker == null || store == null) { 241 this.messages = new VMPendingMessageCursor(isPrioritizedMessages()); 242 } else { 243 this.messages = new StoreQueueCursor(broker, this); 244 } 245 } 246 // If a VMPendingMessageCursor don't use the default Producer System 247 // Usage 248 // since it turns into a shared blocking queue which can lead to a 249 // network deadlock. 250 // If we are cursoring to disk..it's not and issue because it does not 251 // block due 252 // to large disk sizes. 253 if (messages instanceof VMPendingMessageCursor) { 254 this.systemUsage = brokerService.getSystemUsage(); 255 memoryUsage.setParent(systemUsage.getMemoryUsage()); 256 } 257 258 this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); 259 260 super.initialize(); 261 if (store != null) { 262 // Restore the persistent messages. 263 messages.setSystemUsage(systemUsage); 264 messages.setEnableAudit(isEnableAudit()); 265 messages.setMaxAuditDepth(getMaxAuditDepth()); 266 messages.setMaxProducersToAudit(getMaxProducersToAudit()); 267 messages.setUseCache(isUseCache()); 268 messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 269 if (messages.isRecoveryRequired()) { 270 store.recover(new MessageRecoveryListener() { 271 double totalMessageCount = store.getMessageCount(); 272 int recoveredMessageCount = 0; 273 274 public boolean recoverMessage(Message message) { 275 // Message could have expired while it was being 276 // loaded.. 277 if ((++recoveredMessageCount % 50000) == 0) { 278 LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered " 279 + recoveredMessageCount + " messages. " + 280 (int)(recoveredMessageCount*100/totalMessageCount) + "% complete"); 281 } 282 if (message.isExpired()) { 283 if (broker.isExpired(message)) { 284 messageExpired(createConnectionContext(), createMessageReference(message)); 285 // drop message will decrement so counter 286 // balance here 287 destinationStatistics.getMessages().increment(); 288 } 289 return true; 290 } 291 if (hasSpace()) { 292 message.setRegionDestination(Queue.this); 293 messagesLock.writeLock().lock(); 294 try{ 295 try { 296 messages.addMessageLast(message); 297 } catch (Exception e) { 298 LOG.error("Failed to add message to cursor", e); 299 } 300 }finally { 301 messagesLock.writeLock().unlock(); 302 } 303 destinationStatistics.getMessages().increment(); 304 return true; 305 } 306 return false; 307 } 308 309 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 310 throw new RuntimeException("Should not be called."); 311 } 312 313 public boolean hasSpace() { 314 return true; 315 } 316 317 public boolean isDuplicate(MessageId id) { 318 return false; 319 } 320 }); 321 } else { 322 int messageCount = store.getMessageCount(); 323 destinationStatistics.getMessages().setCount(messageCount); 324 } 325 } 326 } 327 328 /* 329 * Holder for subscription that needs attention on next iterate browser 330 * needs access to existing messages in the queue that have already been 331 * dispatched 332 */ 333 class BrowserDispatch { 334 QueueBrowserSubscription browser; 335 336 public BrowserDispatch(QueueBrowserSubscription browserSubscription) { 337 browser = browserSubscription; 338 browser.incrementQueueRef(); 339 } 340 341 void done() { 342 try { 343 browser.decrementQueueRef(); 344 } catch (Exception e) { 345 LOG.warn("decrement ref on browser: " + browser, e); 346 } 347 } 348 349 public QueueBrowserSubscription getBrowser() { 350 return browser; 351 } 352 } 353 354 LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>(); 355 356 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { 357 if (LOG.isDebugEnabled()) { 358 LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: " 359 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: " 360 + getDestinationStatistics().getDispatched().getCount() + ", inflight: " 361 + getDestinationStatistics().getInflight().getCount()); 362 } 363 364 super.addSubscription(context, sub); 365 // synchronize with dispatch method so that no new messages are sent 366 // while setting up a subscription. avoid out of order messages, 367 // duplicates, etc. 368 pagedInPendingDispatchLock.writeLock().lock(); 369 try { 370 371 sub.add(context, this); 372 373 // needs to be synchronized - so no contention with dispatching 374 // consumersLock. 375 consumersLock.writeLock().lock(); 376 try { 377 378 // set a flag if this is a first consumer 379 if (consumers.size() == 0) { 380 firstConsumer = true; 381 if (consumersBeforeDispatchStarts != 0) { 382 consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); 383 } 384 } else { 385 if (consumersBeforeStartsLatch != null) { 386 consumersBeforeStartsLatch.countDown(); 387 } 388 } 389 390 addToConsumerList(sub); 391 if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) { 392 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 393 if (exclusiveConsumer == null) { 394 exclusiveConsumer = sub; 395 } else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE || 396 sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) { 397 exclusiveConsumer = sub; 398 } 399 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 400 } 401 }finally { 402 consumersLock.writeLock().unlock(); 403 } 404 405 if (sub instanceof QueueBrowserSubscription) { 406 // tee up for dispatch in next iterate 407 QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub; 408 pagedInMessagesLock.readLock().lock(); 409 try{ 410 BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription); 411 browserDispatches.addLast(browserDispatch); 412 }finally { 413 pagedInMessagesLock.readLock().unlock(); 414 } 415 } 416 417 if (!(this.optimizedDispatch || isSlave())) { 418 wakeup(); 419 } 420 }finally { 421 pagedInPendingDispatchLock.writeLock().unlock(); 422 } 423 if (this.optimizedDispatch || isSlave()) { 424 // Outside of dispatchLock() to maintain the lock hierarchy of 425 // iteratingMutex -> dispatchLock. - see 426 // https://issues.apache.org/activemq/browse/AMQ-1878 427 wakeup(); 428 } 429 } 430 431 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) 432 throws Exception { 433 super.removeSubscription(context, sub, lastDeiveredSequenceId); 434 // synchronize with dispatch method so that no new messages are sent 435 // while removing up a subscription. 436 pagedInPendingDispatchLock.writeLock().lock(); 437 try { 438 if (LOG.isDebugEnabled()) { 439 LOG.debug(getActiveMQDestination().getQualifiedName() + " remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " 440 + getDestinationStatistics().getDequeues().getCount() + ", dispatched: " 441 + getDestinationStatistics().getDispatched().getCount() + ", inflight: " 442 + getDestinationStatistics().getInflight().getCount()); 443 } 444 consumersLock.writeLock().lock(); 445 try { 446 removeFromConsumerList(sub); 447 if (sub.getConsumerInfo().isExclusive()) { 448 Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer(); 449 if (exclusiveConsumer == sub) { 450 exclusiveConsumer = null; 451 for (Subscription s : consumers) { 452 if (s.getConsumerInfo().isExclusive() 453 && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer 454 .getConsumerInfo().getPriority())) { 455 exclusiveConsumer = s; 456 457 } 458 } 459 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 460 } 461 } else if (isAllConsumersExclusiveByDefault()) { 462 Subscription exclusiveConsumer = null; 463 for (Subscription s : consumers) { 464 if (exclusiveConsumer == null 465 || s.getConsumerInfo().getPriority() > exclusiveConsumer 466 .getConsumerInfo().getPriority()) { 467 exclusiveConsumer = s; 468 } 469 } 470 dispatchSelector.setExclusiveConsumer(exclusiveConsumer); 471 } 472 ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); 473 getMessageGroupOwners().removeConsumer(consumerId); 474 475 // redeliver inflight messages 476 477 boolean markAsRedelivered = false; 478 MessageReference lastDeliveredRef = null; 479 List<MessageReference> unAckedMessages = sub.remove(context, this); 480 481 // locate last redelivered in unconsumed list (list in delivery rather than seq order) 482 if (lastDeiveredSequenceId != 0) { 483 for (MessageReference ref : unAckedMessages) { 484 if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) { 485 lastDeliveredRef = ref; 486 markAsRedelivered = true; 487 LOG.debug("found lastDeliveredSeqID: " + lastDeiveredSequenceId + ", message reference: " + ref.getMessageId()); 488 break; 489 } 490 } 491 } 492 for (MessageReference ref : unAckedMessages) { 493 QueueMessageReference qmr = (QueueMessageReference) ref; 494 if (qmr.getLockOwner() == sub) { 495 qmr.unlock(); 496 497 // have no delivery information 498 if (lastDeiveredSequenceId == 0) { 499 qmr.incrementRedeliveryCounter(); 500 } else { 501 if (markAsRedelivered) { 502 qmr.incrementRedeliveryCounter(); 503 } 504 if (ref == lastDeliveredRef) { 505 // all that follow were not redelivered 506 markAsRedelivered = false; 507 } 508 } 509 } 510 redeliveredWaitingDispatch.add(qmr); 511 } 512 if (!redeliveredWaitingDispatch.isEmpty()) { 513 doDispatch(new ArrayList<QueueMessageReference>()); 514 } 515 }finally { 516 consumersLock.writeLock().unlock(); 517 } 518 if (!(this.optimizedDispatch || isSlave())) { 519 wakeup(); 520 } 521 }finally { 522 pagedInPendingDispatchLock.writeLock().unlock(); 523 } 524 if (this.optimizedDispatch || isSlave()) { 525 // Outside of dispatchLock() to maintain the lock hierarchy of 526 // iteratingMutex -> dispatchLock. - see 527 // https://issues.apache.org/activemq/browse/AMQ-1878 528 wakeup(); 529 } 530 } 531 532 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 533 final ConnectionContext context = producerExchange.getConnectionContext(); 534 // There is delay between the client sending it and it arriving at the 535 // destination.. it may have expired. 536 message.setRegionDestination(this); 537 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 538 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 539 && !context.isInRecoveryMode(); 540 if (message.isExpired()) { 541 // message not stored - or added to stats yet - so chuck here 542 broker.getRoot().messageExpired(context, message, null); 543 if (sendProducerAck) { 544 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 545 context.getConnection().dispatchAsync(ack); 546 } 547 return; 548 } 549 if (memoryUsage.isFull()) { 550 isFull(context, memoryUsage); 551 fastProducer(context, producerInfo); 552 if (isProducerFlowControl() && context.isProducerFlowControl()) { 553 if (warnOnProducerFlowControl) { 554 warnOnProducerFlowControl = false; 555 LOG 556 .info("Usage Manager Memory Limit (" 557 + memoryUsage.getLimit() 558 + ") reached on " 559 + getActiveMQDestination().getQualifiedName() 560 + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." 561 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 562 } 563 564 if (systemUsage.isSendFailIfNoSpace()) { 565 throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" 566 + message.getProducerId() + ") to prevent flooding " 567 + getActiveMQDestination().getQualifiedName() + "." 568 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 569 } 570 571 // We can avoid blocking due to low usage if the producer is 572 // sending 573 // a sync message or if it is using a producer window 574 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 575 // copy the exchange state since the context will be 576 // modified while we are waiting 577 // for space. 578 final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); 579 synchronized (messagesWaitingForSpace) { 580 // Start flow control timeout task 581 // Prevent trying to start it multiple times 582 if (!flowControlTimeoutTask.isAlive()) { 583 flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task"); 584 flowControlTimeoutTask.start(); 585 } 586 messagesWaitingForSpace.put(message.getMessageId(), new Runnable() { 587 public void run() { 588 589 try { 590 // While waiting for space to free up... the 591 // message may have expired. 592 if (message.isExpired()) { 593 LOG.error("expired waiting for space.."); 594 broker.messageExpired(context, message, null); 595 destinationStatistics.getExpired().increment(); 596 } else { 597 doMessageSend(producerExchangeCopy, message); 598 } 599 600 if (sendProducerAck) { 601 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 602 .getSize()); 603 context.getConnection().dispatchAsync(ack); 604 } else { 605 Response response = new Response(); 606 response.setCorrelationId(message.getCommandId()); 607 context.getConnection().dispatchAsync(response); 608 } 609 610 } catch (Exception e) { 611 if (!sendProducerAck && !context.isInRecoveryMode()) { 612 ExceptionResponse response = new ExceptionResponse(e); 613 response.setCorrelationId(message.getCommandId()); 614 context.getConnection().dispatchAsync(response); 615 } else { 616 LOG.debug("unexpected exception on deferred send of :" + message, e); 617 } 618 } 619 } 620 }); 621 622 if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 623 flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage 624 .getSendFailIfNoSpaceAfterTimeout())); 625 } 626 627 registerCallbackForNotFullNotification(); 628 context.setDontSendReponse(true); 629 return; 630 } 631 632 } else { 633 634 if (memoryUsage.isFull()) { 635 waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" 636 + message.getProducerId() + ") stopped to prevent flooding " 637 + getActiveMQDestination().getQualifiedName() + "." 638 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 639 } 640 641 // The usage manager could have delayed us by the time 642 // we unblock the message could have expired.. 643 if (message.isExpired()) { 644 if (LOG.isDebugEnabled()) { 645 LOG.debug("Expired message: " + message); 646 } 647 broker.getRoot().messageExpired(context, message, null); 648 return; 649 } 650 } 651 } 652 } 653 doMessageSend(producerExchange, message); 654 if (sendProducerAck) { 655 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 656 context.getConnection().dispatchAsync(ack); 657 } 658 } 659 660 private void registerCallbackForNotFullNotification() { 661 // If the usage manager is not full, then the task will not 662 // get called.. 663 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 664 // so call it directly here. 665 sendMessagesWaitingForSpaceTask.run(); 666 } 667 } 668 669 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, 670 Exception { 671 final ConnectionContext context = producerExchange.getConnectionContext(); 672 Future<Object> result = null; 673 674 checkUsage(context, message); 675 sendLock.lockInterruptibly(); 676 try { 677 if (store != null && message.isPersistent()) { 678 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 679 if (messages.isCacheEnabled()) { 680 result = store.asyncAddQueueMessage(context, message); 681 } else { 682 store.addMessage(context, message); 683 } 684 if (isReduceMemoryFootprint()) { 685 message.clearMarshalledState(); 686 } 687 } 688 if (context.isInTransaction()) { 689 // If this is a transacted message.. increase the usage now so that 690 // a big TX does not blow up 691 // our memory. This increment is decremented once the tx finishes.. 692 message.incrementReferenceCount(); 693 694 context.getTransaction().addSynchronization(new Synchronization() { 695 @Override 696 public void afterCommit() throws Exception { 697 sendLock.lockInterruptibly(); 698 try { 699 // It could take while before we receive the commit 700 // op, by that time the message could have expired.. 701 if (broker.isExpired(message)) { 702 broker.messageExpired(context, message, null); 703 destinationStatistics.getExpired().increment(); 704 return; 705 } 706 sendMessage(message); 707 } finally { 708 sendLock.unlock(); 709 message.decrementReferenceCount(); 710 } 711 messageSent(context, message); 712 } 713 @Override 714 public void afterRollback() throws Exception { 715 message.decrementReferenceCount(); 716 } 717 }); 718 } else { 719 // Add to the pending list, this takes care of incrementing the 720 // usage manager. 721 sendMessage(message); 722 } 723 } finally { 724 sendLock.unlock(); 725 } 726 if (!context.isInTransaction()) { 727 messageSent(context, message); 728 } 729 if (result != null && !result.isCancelled()) { 730 try { 731 result.get(); 732 } catch (CancellationException e) { 733 // ignore - the task has been cancelled if the message 734 // has already been deleted 735 } 736 } 737 } 738 739 private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException { 740 if (message.isPersistent()) { 741 if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 742 final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " 743 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" 744 + message.getProducerId() + ") to prevent flooding " 745 + getActiveMQDestination().getQualifiedName() + "." 746 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 747 748 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 749 } 750 } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) { 751 final String logMessage = "Usage Manager Temp Store is Full (" 752 + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit() 753 +"). Stopping producer (" + message.getProducerId() 754 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 755 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 756 757 waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage); 758 } 759 } 760 761 private void expireMessages() { 762 if (LOG.isDebugEnabled()) { 763 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages .."); 764 } 765 766 // just track the insertion count 767 List<Message> browsedMessages = new AbstractList<Message>() { 768 int size = 0; 769 770 @Override 771 public void add(int index, Message element) { 772 size++; 773 } 774 775 @Override 776 public int size() { 777 return size; 778 } 779 780 @Override 781 public Message get(int index) { 782 return null; 783 } 784 }; 785 doBrowse(browsedMessages, this.getMaxExpirePageSize()); 786 asyncWakeup(); 787 if (LOG.isDebugEnabled()) { 788 LOG.debug(getActiveMQDestination().getQualifiedName() + " expiring messages done."); 789 } 790 } 791 792 public void gc() { 793 } 794 795 public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) 796 throws IOException { 797 messageConsumed(context, node); 798 if (store != null && node.isPersistent()) { 799 // the original ack may be a ranged ack, but we are trying to delete 800 // a specific 801 // message store here so we need to convert to a non ranged ack. 802 if (ack.getMessageCount() > 0) { 803 // Dup the ack 804 MessageAck a = new MessageAck(); 805 ack.copy(a); 806 ack = a; 807 // Convert to non-ranged. 808 ack.setFirstMessageId(node.getMessageId()); 809 ack.setLastMessageId(node.getMessageId()); 810 ack.setMessageCount(1); 811 } 812 813 store.removeAsyncMessage(context, ack); 814 } 815 } 816 817 Message loadMessage(MessageId messageId) throws IOException { 818 Message msg = null; 819 if (store != null) { // can be null for a temp q 820 msg = store.getMessage(messageId); 821 if (msg != null) { 822 msg.setRegionDestination(this); 823 } 824 } 825 return msg; 826 } 827 828 @Override 829 public String toString() { 830 int size = 0; 831 messagesLock.readLock().lock(); 832 try{ 833 size = messages.size(); 834 }finally { 835 messagesLock.readLock().unlock(); 836 } 837 return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() 838 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups=" 839 + messageGroupOwners; 840 } 841 842 public void start() throws Exception { 843 if (memoryUsage != null) { 844 memoryUsage.start(); 845 } 846 if (systemUsage.getStoreUsage() != null) { 847 systemUsage.getStoreUsage().start(); 848 } 849 systemUsage.getMemoryUsage().addUsageListener(this); 850 messages.start(); 851 if (getExpireMessagesPeriod() > 0) { 852 scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod()); 853 } 854 doPageIn(false); 855 } 856 857 public void stop() throws Exception { 858 if (taskRunner != null) { 859 taskRunner.shutdown(); 860 } 861 if (this.executor != null) { 862 this.executor.shutdownNow(); 863 } 864 865 scheduler.cancel(expireMessagesTask); 866 867 if (flowControlTimeoutTask.isAlive()) { 868 flowControlTimeoutTask.interrupt(); 869 } 870 871 if (messages != null) { 872 messages.stop(); 873 } 874 875 systemUsage.getMemoryUsage().removeUsageListener(this); 876 if (memoryUsage != null) { 877 memoryUsage.stop(); 878 } 879 if (store != null) { 880 store.stop(); 881 } 882 } 883 884 // Properties 885 // ------------------------------------------------------------------------- 886 @Override 887 public ActiveMQDestination getActiveMQDestination() { 888 return destination; 889 } 890 891 public MessageGroupMap getMessageGroupOwners() { 892 if (messageGroupOwners == null) { 893 messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap(); 894 } 895 return messageGroupOwners; 896 } 897 898 public DispatchPolicy getDispatchPolicy() { 899 return dispatchPolicy; 900 } 901 902 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 903 this.dispatchPolicy = dispatchPolicy; 904 } 905 906 public MessageGroupMapFactory getMessageGroupMapFactory() { 907 return messageGroupMapFactory; 908 } 909 910 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 911 this.messageGroupMapFactory = messageGroupMapFactory; 912 } 913 914 public PendingMessageCursor getMessages() { 915 return this.messages; 916 } 917 918 public void setMessages(PendingMessageCursor messages) { 919 this.messages = messages; 920 } 921 922 public boolean isUseConsumerPriority() { 923 return useConsumerPriority; 924 } 925 926 public void setUseConsumerPriority(boolean useConsumerPriority) { 927 this.useConsumerPriority = useConsumerPriority; 928 } 929 930 public boolean isStrictOrderDispatch() { 931 return strictOrderDispatch; 932 } 933 934 public void setStrictOrderDispatch(boolean strictOrderDispatch) { 935 this.strictOrderDispatch = strictOrderDispatch; 936 } 937 938 public boolean isOptimizedDispatch() { 939 return optimizedDispatch; 940 } 941 942 public void setOptimizedDispatch(boolean optimizedDispatch) { 943 this.optimizedDispatch = optimizedDispatch; 944 } 945 946 public int getTimeBeforeDispatchStarts() { 947 return timeBeforeDispatchStarts; 948 } 949 950 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { 951 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; 952 } 953 954 public int getConsumersBeforeDispatchStarts() { 955 return consumersBeforeDispatchStarts; 956 } 957 958 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { 959 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; 960 } 961 962 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) { 963 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault; 964 } 965 966 public boolean isAllConsumersExclusiveByDefault() { 967 return allConsumersExclusiveByDefault; 968 } 969 970 971 // Implementation methods 972 // ------------------------------------------------------------------------- 973 private QueueMessageReference createMessageReference(Message message) { 974 QueueMessageReference result = new IndirectMessageReference(message); 975 return result; 976 } 977 978 public Message[] browse() { 979 List<Message> browseList = new ArrayList<Message>(); 980 doBrowse(browseList, getMaxBrowsePageSize()); 981 return browseList.toArray(new Message[browseList.size()]); 982 } 983 984 public void doBrowse(List<Message> browseList, int max) { 985 final ConnectionContext connectionContext = createConnectionContext(); 986 try { 987 pageInMessages(false); 988 List<MessageReference> toExpire = new ArrayList<MessageReference>(); 989 990 pagedInPendingDispatchLock.writeLock().lock(); 991 try { 992 addAll(pagedInPendingDispatch, browseList, max, toExpire); 993 for (MessageReference ref : toExpire) { 994 pagedInPendingDispatch.remove(ref); 995 if (broker.isExpired(ref)) { 996 LOG.debug("expiring from pagedInPending: " + ref); 997 messageExpired(connectionContext, ref); 998 } 999 } 1000 } finally { 1001 pagedInPendingDispatchLock.writeLock().unlock(); 1002 } 1003 toExpire.clear(); 1004 pagedInMessagesLock.readLock().lock(); 1005 try { 1006 addAll(pagedInMessages.values(), browseList, max, toExpire); 1007 } finally { 1008 pagedInMessagesLock.readLock().unlock(); 1009 } 1010 for (MessageReference ref : toExpire) { 1011 if (broker.isExpired(ref)) { 1012 LOG.debug("expiring from pagedInMessages: " + ref); 1013 messageExpired(connectionContext, ref); 1014 } else { 1015 pagedInMessagesLock.writeLock().lock(); 1016 try { 1017 pagedInMessages.remove(ref.getMessageId()); 1018 } finally { 1019 pagedInMessagesLock.writeLock().unlock(); 1020 } 1021 } 1022 } 1023 1024 if (browseList.size() < getMaxBrowsePageSize()) { 1025 messagesLock.writeLock().lock(); 1026 try { 1027 try { 1028 messages.reset(); 1029 while (messages.hasNext() && browseList.size() < max) { 1030 MessageReference node = messages.next(); 1031 if (node.isExpired()) { 1032 if (broker.isExpired(node)) { 1033 LOG.debug("expiring from messages: " + node); 1034 messageExpired(connectionContext, createMessageReference(node.getMessage())); 1035 } 1036 messages.remove(); 1037 } else { 1038 messages.rollback(node.getMessageId()); 1039 if (browseList.contains(node.getMessage()) == false) { 1040 browseList.add(node.getMessage()); 1041 } 1042 } 1043 node.decrementReferenceCount(); 1044 } 1045 } finally { 1046 messages.release(); 1047 } 1048 } finally { 1049 messagesLock.writeLock().unlock(); 1050 } 1051 } 1052 1053 } catch (Exception e) { 1054 LOG.error("Problem retrieving message for browse", e); 1055 } 1056 } 1057 1058 private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, 1059 List<MessageReference> toExpire) throws Exception { 1060 for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) { 1061 QueueMessageReference ref = i.next(); 1062 if (ref.isExpired()) { 1063 toExpire.add(ref); 1064 } else if (l.contains(ref.getMessage()) == false) { 1065 l.add(ref.getMessage()); 1066 } 1067 } 1068 } 1069 1070 public QueueMessageReference getMessage(String id) { 1071 MessageId msgId = new MessageId(id); 1072 pagedInMessagesLock.readLock().lock(); 1073 try{ 1074 QueueMessageReference ref = this.pagedInMessages.get(msgId); 1075 if (ref != null) { 1076 return ref; 1077 } 1078 }finally { 1079 pagedInMessagesLock.readLock().unlock(); 1080 } 1081 messagesLock.readLock().lock(); 1082 try{ 1083 try { 1084 messages.reset(); 1085 while (messages.hasNext()) { 1086 MessageReference mr = messages.next(); 1087 QueueMessageReference qmr = createMessageReference(mr.getMessage()); 1088 qmr.decrementReferenceCount(); 1089 messages.rollback(qmr.getMessageId()); 1090 if (msgId.equals(qmr.getMessageId())) { 1091 return qmr; 1092 } 1093 } 1094 } finally { 1095 messages.release(); 1096 } 1097 }finally { 1098 messagesLock.readLock().unlock(); 1099 } 1100 return null; 1101 } 1102 1103 public void purge() throws Exception { 1104 ConnectionContext c = createConnectionContext(); 1105 List<MessageReference> list = null; 1106 do { 1107 doPageIn(true); 1108 pagedInMessagesLock.readLock().lock(); 1109 try { 1110 list = new ArrayList<MessageReference>(pagedInMessages.values()); 1111 }finally { 1112 pagedInMessagesLock.readLock().unlock(); 1113 } 1114 1115 for (MessageReference ref : list) { 1116 try { 1117 QueueMessageReference r = (QueueMessageReference) ref; 1118 removeMessage(c, r); 1119 } catch (IOException e) { 1120 } 1121 } 1122 // don't spin/hang if stats are out and there is nothing left in the 1123 // store 1124 } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); 1125 if (this.destinationStatistics.getMessages().getCount() > 0) { 1126 LOG.warn(getActiveMQDestination().getQualifiedName() 1127 + " after purge complete, message count stats report: " 1128 + this.destinationStatistics.getMessages().getCount()); 1129 } 1130 gc(); 1131 this.destinationStatistics.getMessages().setCount(0); 1132 getMessages().clear(); 1133 } 1134 1135 /** 1136 * Removes the message matching the given messageId 1137 */ 1138 public boolean removeMessage(String messageId) throws Exception { 1139 return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0; 1140 } 1141 1142 /** 1143 * Removes the messages matching the given selector 1144 * 1145 * @return the number of messages removed 1146 */ 1147 public int removeMatchingMessages(String selector) throws Exception { 1148 return removeMatchingMessages(selector, -1); 1149 } 1150 1151 /** 1152 * Removes the messages matching the given selector up to the maximum number 1153 * of matched messages 1154 * 1155 * @return the number of messages removed 1156 */ 1157 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception { 1158 return removeMatchingMessages(createSelectorFilter(selector), maximumMessages); 1159 } 1160 1161 /** 1162 * Removes the messages matching the given filter up to the maximum number 1163 * of matched messages 1164 * 1165 * @return the number of messages removed 1166 */ 1167 public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { 1168 int movedCounter = 0; 1169 Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>(); 1170 ConnectionContext context = createConnectionContext(); 1171 do { 1172 doPageIn(true); 1173 pagedInMessagesLock.readLock().lock(); 1174 try{ 1175 set.addAll(pagedInMessages.values()); 1176 }finally { 1177 pagedInMessagesLock.readLock().unlock(); 1178 } 1179 List<MessageReference> list = new ArrayList<MessageReference>(set); 1180 for (MessageReference ref : list) { 1181 IndirectMessageReference r = (IndirectMessageReference) ref; 1182 if (filter.evaluate(context, r)) { 1183 1184 removeMessage(context, r); 1185 set.remove(r); 1186 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1187 return movedCounter; 1188 } 1189 } 1190 } 1191 } while (set.size() < this.destinationStatistics.getMessages().getCount()); 1192 return movedCounter; 1193 } 1194 1195 /** 1196 * Copies the message matching the given messageId 1197 */ 1198 public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) 1199 throws Exception { 1200 return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0; 1201 } 1202 1203 /** 1204 * Copies the messages matching the given selector 1205 * 1206 * @return the number of messages copied 1207 */ 1208 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) 1209 throws Exception { 1210 return copyMatchingMessagesTo(context, selector, dest, -1); 1211 } 1212 1213 /** 1214 * Copies the messages matching the given selector up to the maximum number 1215 * of matched messages 1216 * 1217 * @return the number of messages copied 1218 */ 1219 public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, 1220 int maximumMessages) throws Exception { 1221 return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages); 1222 } 1223 1224 /** 1225 * Copies the messages matching the given filter up to the maximum number of 1226 * matched messages 1227 * 1228 * @return the number of messages copied 1229 */ 1230 public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, 1231 int maximumMessages) throws Exception { 1232 int movedCounter = 0; 1233 int count = 0; 1234 Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>(); 1235 do { 1236 int oldMaxSize = getMaxPageSize(); 1237 setMaxPageSize((int) this.destinationStatistics.getMessages().getCount()); 1238 doPageIn(true); 1239 setMaxPageSize(oldMaxSize); 1240 pagedInMessagesLock.readLock().lock(); 1241 try { 1242 set.addAll(pagedInMessages.values()); 1243 }finally { 1244 pagedInMessagesLock.readLock().unlock(); 1245 } 1246 List<MessageReference> list = new ArrayList<MessageReference>(set); 1247 for (MessageReference ref : list) { 1248 IndirectMessageReference r = (IndirectMessageReference) ref; 1249 if (filter.evaluate(context, r)) { 1250 1251 r.incrementReferenceCount(); 1252 try { 1253 Message m = r.getMessage(); 1254 BrokerSupport.resend(context, m, dest); 1255 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1256 return movedCounter; 1257 } 1258 } finally { 1259 r.decrementReferenceCount(); 1260 } 1261 } 1262 count++; 1263 } 1264 } while (count < this.destinationStatistics.getMessages().getCount()); 1265 return movedCounter; 1266 } 1267 1268 /** 1269 * Move a message 1270 * 1271 * @param context 1272 * connection context 1273 * @param m 1274 * QueueMessageReference 1275 * @param dest 1276 * ActiveMQDestination 1277 * @throws Exception 1278 */ 1279 public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception { 1280 BrokerSupport.resend(context, m.getMessage(), dest); 1281 removeMessage(context, m); 1282 messagesLock.writeLock().lock(); 1283 try{ 1284 messages.rollback(m.getMessageId()); 1285 }finally { 1286 messagesLock.writeLock().unlock(); 1287 } 1288 return true; 1289 } 1290 1291 /** 1292 * Moves the message matching the given messageId 1293 */ 1294 public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) 1295 throws Exception { 1296 return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0; 1297 } 1298 1299 /** 1300 * Moves the messages matching the given selector 1301 * 1302 * @return the number of messages removed 1303 */ 1304 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) 1305 throws Exception { 1306 return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE); 1307 } 1308 1309 /** 1310 * Moves the messages matching the given selector up to the maximum number 1311 * of matched messages 1312 */ 1313 public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, 1314 int maximumMessages) throws Exception { 1315 return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages); 1316 } 1317 1318 /** 1319 * Moves the messages matching the given filter up to the maximum number of 1320 * matched messages 1321 */ 1322 public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, 1323 ActiveMQDestination dest, int maximumMessages) throws Exception { 1324 int movedCounter = 0; 1325 Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>(); 1326 do { 1327 doPageIn(true); 1328 pagedInMessagesLock.readLock().lock(); 1329 try{ 1330 set.addAll(pagedInMessages.values()); 1331 }finally { 1332 pagedInMessagesLock.readLock().unlock(); 1333 } 1334 List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set); 1335 for (QueueMessageReference ref : list) { 1336 if (filter.evaluate(context, ref)) { 1337 // We should only move messages that can be locked. 1338 moveMessageTo(context, ref, dest); 1339 set.remove(ref); 1340 if (++movedCounter >= maximumMessages && maximumMessages > 0) { 1341 return movedCounter; 1342 } 1343 } 1344 } 1345 } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); 1346 return movedCounter; 1347 } 1348 1349 BrowserDispatch getNextBrowserDispatch() { 1350 pagedInMessagesLock.readLock().lock(); 1351 try{ 1352 if (browserDispatches.isEmpty()) { 1353 return null; 1354 } 1355 return browserDispatches.removeFirst(); 1356 }finally { 1357 pagedInMessagesLock.readLock().unlock(); 1358 } 1359 1360 } 1361 1362 /** 1363 * @return true if we would like to iterate again 1364 * @see org.apache.activemq.thread.Task#iterate() 1365 */ 1366 public boolean iterate() { 1367 MDC.put("activemq.destination", getName()); 1368 boolean pageInMoreMessages = false; 1369 synchronized (iteratingMutex) { 1370 1371 // do early to allow dispatch of these waiting messages 1372 synchronized (messagesWaitingForSpace) { 1373 Iterator<Runnable> it = messagesWaitingForSpace.values().iterator(); 1374 while (it.hasNext()) { 1375 if (!memoryUsage.isFull()) { 1376 Runnable op = it.next(); 1377 it.remove(); 1378 op.run(); 1379 } else { 1380 registerCallbackForNotFullNotification(); 1381 break; 1382 } 1383 } 1384 } 1385 1386 if (firstConsumer) { 1387 firstConsumer = false; 1388 try { 1389 if (consumersBeforeDispatchStarts > 0) { 1390 int timeout = 1000; // wait one second by default if 1391 // consumer count isn't reached 1392 if (timeBeforeDispatchStarts > 0) { 1393 timeout = timeBeforeDispatchStarts; 1394 } 1395 if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) { 1396 if (LOG.isDebugEnabled()) { 1397 LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch."); 1398 } 1399 } else { 1400 if (LOG.isDebugEnabled()) { 1401 LOG.debug(timeout + " ms elapsed and " + consumers.size() 1402 + " consumers subscribed. Starting dispatch."); 1403 } 1404 } 1405 } 1406 if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) { 1407 iteratingMutex.wait(timeBeforeDispatchStarts); 1408 if (LOG.isDebugEnabled()) { 1409 LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch."); 1410 } 1411 } 1412 } catch (Exception e) { 1413 LOG.error(e.toString()); 1414 } 1415 } 1416 1417 BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch(); 1418 1419 messagesLock.readLock().lock(); 1420 try{ 1421 pageInMoreMessages |= !messages.isEmpty(); 1422 }finally { 1423 messagesLock.readLock().unlock(); 1424 } 1425 1426 pagedInPendingDispatchLock.readLock().lock(); 1427 try { 1428 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty(); 1429 }finally { 1430 pagedInPendingDispatchLock.readLock().unlock(); 1431 } 1432 1433 // Perhaps we should page always into the pagedInPendingDispatch 1434 // list if 1435 // !messages.isEmpty(), and then if 1436 // !pagedInPendingDispatch.isEmpty() 1437 // then we do a dispatch. 1438 if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) { 1439 try { 1440 pageInMessages(pendingBrowserDispatch != null); 1441 1442 } catch (Throwable e) { 1443 LOG.error("Failed to page in more queue messages ", e); 1444 } 1445 } 1446 1447 if (pendingBrowserDispatch != null) { 1448 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null; 1449 pagedInMessagesLock.readLock().lock(); 1450 try{ 1451 alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values()); 1452 }finally { 1453 pagedInMessagesLock.readLock().unlock(); 1454 } 1455 if (LOG.isDebugEnabled()) { 1456 LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser() 1457 + ", already dispatched/paged count: " + alreadyDispatchedMessages.size()); 1458 } 1459 do { 1460 try { 1461 MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 1462 msgContext.setDestination(destination); 1463 1464 QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser(); 1465 for (QueueMessageReference node : alreadyDispatchedMessages) { 1466 if (!node.isAcked()) { 1467 msgContext.setMessageReference(node); 1468 if (browser.matches(node, msgContext)) { 1469 browser.add(node); 1470 } 1471 } 1472 } 1473 pendingBrowserDispatch.done(); 1474 } catch (Exception e) { 1475 LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e); 1476 } 1477 1478 } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null); 1479 } 1480 1481 if (pendingWakeups.get() > 0) { 1482 pendingWakeups.decrementAndGet(); 1483 } 1484 MDC.remove("activemq.destination"); 1485 return pendingWakeups.get() > 0; 1486 } 1487 } 1488 1489 protected MessageReferenceFilter createMessageIdFilter(final String messageId) { 1490 return new MessageReferenceFilter() { 1491 public boolean evaluate(ConnectionContext context, MessageReference r) { 1492 return messageId.equals(r.getMessageId().toString()); 1493 } 1494 1495 @Override 1496 public String toString() { 1497 return "MessageIdFilter: " + messageId; 1498 } 1499 }; 1500 } 1501 1502 protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException { 1503 final BooleanExpression selectorExpression = SelectorParser.parse(selector); 1504 1505 return new MessageReferenceFilter() { 1506 public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException { 1507 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext(); 1508 1509 messageEvaluationContext.setMessageReference(r); 1510 if (messageEvaluationContext.getDestination() == null) { 1511 messageEvaluationContext.setDestination(getActiveMQDestination()); 1512 } 1513 1514 return selectorExpression.matches(messageEvaluationContext); 1515 } 1516 }; 1517 } 1518 1519 protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException { 1520 removeMessage(c, null, r); 1521 pagedInPendingDispatchLock.writeLock().lock(); 1522 try { 1523 pagedInPendingDispatch.remove(r); 1524 } finally { 1525 pagedInPendingDispatchLock.writeLock().unlock(); 1526 } 1527 1528 } 1529 1530 protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException { 1531 MessageAck ack = new MessageAck(); 1532 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 1533 ack.setDestination(destination); 1534 ack.setMessageID(r.getMessageId()); 1535 removeMessage(c, subs, r, ack); 1536 } 1537 1538 protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, 1539 MessageAck ack) throws IOException { 1540 reference.setAcked(true); 1541 // This sends the ack the the journal.. 1542 if (!ack.isInTransaction()) { 1543 acknowledge(context, sub, ack, reference); 1544 getDestinationStatistics().getDequeues().increment(); 1545 dropMessage(reference); 1546 } else { 1547 try { 1548 acknowledge(context, sub, ack, reference); 1549 } finally { 1550 context.getTransaction().addSynchronization(new Synchronization() { 1551 1552 @Override 1553 public void afterCommit() throws Exception { 1554 getDestinationStatistics().getDequeues().increment(); 1555 dropMessage(reference); 1556 wakeup(); 1557 } 1558 1559 @Override 1560 public void afterRollback() throws Exception { 1561 reference.setAcked(false); 1562 } 1563 }); 1564 } 1565 } 1566 if (ack.isPoisonAck()) { 1567 // message gone to DLQ, is ok to allow redelivery 1568 messagesLock.writeLock().lock(); 1569 try{ 1570 messages.rollback(reference.getMessageId()); 1571 }finally { 1572 messagesLock.writeLock().unlock(); 1573 } 1574 } 1575 1576 } 1577 1578 private void dropMessage(QueueMessageReference reference) { 1579 reference.drop(); 1580 destinationStatistics.getMessages().decrement(); 1581 pagedInMessagesLock.writeLock().lock(); 1582 try{ 1583 pagedInMessages.remove(reference.getMessageId()); 1584 }finally { 1585 pagedInMessagesLock.writeLock().unlock(); 1586 } 1587 } 1588 1589 public void messageExpired(ConnectionContext context, MessageReference reference) { 1590 messageExpired(context, null, reference); 1591 } 1592 1593 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 1594 if (LOG.isDebugEnabled()) { 1595 LOG.debug("message expired: " + reference); 1596 } 1597 broker.messageExpired(context, reference, subs); 1598 destinationStatistics.getExpired().increment(); 1599 try { 1600 removeMessage(context, subs, (QueueMessageReference) reference); 1601 } catch (IOException e) { 1602 LOG.error("Failed to remove expired Message from the store ", e); 1603 } 1604 } 1605 1606 protected ConnectionContext createConnectionContext() { 1607 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); 1608 answer.setBroker(this.broker); 1609 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 1610 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 1611 return answer; 1612 } 1613 1614 final void sendMessage(final Message msg) throws Exception { 1615 messagesLock.writeLock().lock(); 1616 try{ 1617 messages.addMessageLast(msg); 1618 }finally { 1619 messagesLock.writeLock().unlock(); 1620 } 1621 } 1622 1623 final void messageSent(final ConnectionContext context, final Message msg) throws Exception { 1624 destinationStatistics.getEnqueues().increment(); 1625 destinationStatistics.getMessages().increment(); 1626 messageDelivered(context, msg); 1627 consumersLock.readLock().lock(); 1628 try { 1629 if (consumers.isEmpty()) { 1630 onMessageWithNoConsumers(context, msg); 1631 } 1632 }finally { 1633 consumersLock.readLock().unlock(); 1634 } 1635 if (LOG.isTraceEnabled()) { 1636 LOG.trace("Message " + msg.getMessageId() + " sent to " + this.destination); 1637 } 1638 wakeup(); 1639 } 1640 1641 public void wakeup() { 1642 if (optimizedDispatch || isSlave()) { 1643 iterate(); 1644 pendingWakeups.incrementAndGet(); 1645 } else { 1646 asyncWakeup(); 1647 } 1648 } 1649 1650 private void asyncWakeup() { 1651 try { 1652 pendingWakeups.incrementAndGet(); 1653 this.taskRunner.wakeup(); 1654 } catch (InterruptedException e) { 1655 LOG.warn("Async task tunner failed to wakeup ", e); 1656 } 1657 } 1658 1659 private boolean isSlave() { 1660 return broker.getBrokerService().isSlave(); 1661 } 1662 1663 private void doPageIn(boolean force) throws Exception { 1664 List<QueueMessageReference> newlyPaged = doPageInForDispatch(force); 1665 pagedInPendingDispatchLock.writeLock().lock(); 1666 try { 1667 if (pagedInPendingDispatch.isEmpty()) { 1668 pagedInPendingDispatch.addAll(newlyPaged); 1669 } else { 1670 for (QueueMessageReference qmr : newlyPaged) { 1671 if (!pagedInPendingDispatch.contains(qmr)) { 1672 pagedInPendingDispatch.add(qmr); 1673 } 1674 } 1675 } 1676 } finally { 1677 pagedInPendingDispatchLock.writeLock().unlock(); 1678 } 1679 } 1680 1681 private List<QueueMessageReference> doPageInForDispatch(boolean force) throws Exception { 1682 List<QueueMessageReference> result = null; 1683 List<QueueMessageReference> resultList = null; 1684 1685 int toPageIn = Math.min(getMaxPageSize(), messages.size()); 1686 if (LOG.isDebugEnabled()) { 1687 LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " 1688 + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size " 1689 + pagedInMessages.size() + ", enqueueCount: " + destinationStatistics.getEnqueues().getCount() 1690 + ", dequeueCount: " + destinationStatistics.getDequeues().getCount()); 1691 } 1692 1693 if (isLazyDispatch() && !force) { 1694 // Only page in the minimum number of messages which can be 1695 // dispatched immediately. 1696 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); 1697 } 1698 int pagedInPendingSize = 0; 1699 pagedInPendingDispatchLock.readLock().lock(); 1700 try { 1701 pagedInPendingSize = pagedInPendingDispatch.size(); 1702 } finally { 1703 pagedInPendingDispatchLock.readLock().unlock(); 1704 } 1705 if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) { 1706 int count = 0; 1707 result = new ArrayList<QueueMessageReference>(toPageIn); 1708 messagesLock.writeLock().lock(); 1709 try { 1710 try { 1711 messages.setMaxBatchSize(toPageIn); 1712 messages.reset(); 1713 while (messages.hasNext() && count < toPageIn) { 1714 MessageReference node = messages.next(); 1715 messages.remove(); 1716 1717 QueueMessageReference ref = createMessageReference(node.getMessage()); 1718 if (ref.isExpired()) { 1719 if (broker.isExpired(ref)) { 1720 messageExpired(createConnectionContext(), ref); 1721 } else { 1722 ref.decrementReferenceCount(); 1723 } 1724 } else { 1725 result.add(ref); 1726 count++; 1727 } 1728 } 1729 } finally { 1730 messages.release(); 1731 } 1732 } finally { 1733 messagesLock.writeLock().unlock(); 1734 } 1735 // Only add new messages, not already pagedIn to avoid multiple 1736 // dispatch attempts 1737 pagedInMessagesLock.writeLock().lock(); 1738 try { 1739 resultList = new ArrayList<QueueMessageReference>(result.size()); 1740 for (QueueMessageReference ref : result) { 1741 if (!pagedInMessages.containsKey(ref.getMessageId())) { 1742 pagedInMessages.put(ref.getMessageId(), ref); 1743 resultList.add(ref); 1744 } else { 1745 ref.decrementReferenceCount(); 1746 } 1747 } 1748 } finally { 1749 pagedInMessagesLock.writeLock().unlock(); 1750 } 1751 } else { 1752 // Avoid return null list, if condition is not validated 1753 resultList = new ArrayList<QueueMessageReference>(); 1754 } 1755 1756 return resultList; 1757 } 1758 1759 private void doDispatch(List<QueueMessageReference> list) throws Exception { 1760 boolean doWakeUp = false; 1761 1762 pagedInPendingDispatchLock.writeLock().lock(); 1763 try { 1764 if (!redeliveredWaitingDispatch.isEmpty()) { 1765 // Try first to dispatch redelivered messages to keep an 1766 // proper order 1767 redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch); 1768 } 1769 if (!pagedInPendingDispatch.isEmpty()) { 1770 // Next dispatch anything that had not been 1771 // dispatched before. 1772 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); 1773 } 1774 // and now see if we can dispatch the new stuff.. and append to 1775 // the pending 1776 // list anything that does not actually get dispatched. 1777 if (list != null && !list.isEmpty()) { 1778 if (pagedInPendingDispatch.isEmpty()) { 1779 pagedInPendingDispatch.addAll(doActualDispatch(list)); 1780 } else { 1781 for (QueueMessageReference qmr : list) { 1782 if (!pagedInPendingDispatch.contains(qmr)) { 1783 pagedInPendingDispatch.add(qmr); 1784 } 1785 } 1786 doWakeUp = true; 1787 } 1788 } 1789 } finally { 1790 pagedInPendingDispatchLock.writeLock().unlock(); 1791 } 1792 1793 if (doWakeUp) { 1794 // avoid lock order contention 1795 asyncWakeup(); 1796 } 1797 } 1798 1799 /** 1800 * @return list of messages that could get dispatched to consumers if they 1801 * were not full. 1802 */ 1803 private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception { 1804 List<Subscription> consumers; 1805 consumersLock.writeLock().lock(); 1806 try { 1807 if (this.consumers.isEmpty() || isSlave()) { 1808 // slave dispatch happens in processDispatchNotification 1809 return list; 1810 } 1811 consumers = new ArrayList<Subscription>(this.consumers); 1812 }finally { 1813 consumersLock.writeLock().unlock(); 1814 } 1815 1816 List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size()); 1817 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); 1818 1819 for (MessageReference node : list) { 1820 Subscription target = null; 1821 int interestCount = 0; 1822 for (Subscription s : consumers) { 1823 if (s instanceof QueueBrowserSubscription) { 1824 interestCount++; 1825 continue; 1826 } 1827 if (dispatchSelector.canSelect(s, node)) { 1828 if (!fullConsumers.contains(s)) { 1829 if (!s.isFull()) { 1830 if (assignMessageGroup(s, (QueueMessageReference)node)) { 1831 // Dispatch it. 1832 s.add(node); 1833 target = s; 1834 break; 1835 } 1836 } else { 1837 // no further dispatch of list to a full consumer to 1838 // avoid out of order message receipt 1839 fullConsumers.add(s); 1840 } 1841 } 1842 interestCount++; 1843 } else { 1844 // makes sure it gets dispatched again 1845 if (!node.isDropped() && !((QueueMessageReference) node).isAcked() 1846 && (!node.isDropped() || s.getConsumerInfo().isBrowser())) { 1847 interestCount++; 1848 } 1849 } 1850 } 1851 1852 if ((target == null && interestCount > 0) || consumers.size() == 0) { 1853 // This means all subs were full or that there are no 1854 // consumers... 1855 rc.add((QueueMessageReference) node); 1856 } 1857 1858 // If it got dispatched, rotate the consumer list to get round robin 1859 // distribution. 1860 if (target != null && !strictOrderDispatch && consumers.size() > 1 1861 && !dispatchSelector.isExclusiveConsumer(target)) { 1862 consumersLock.writeLock().lock(); 1863 try { 1864 if (removeFromConsumerList(target)) { 1865 addToConsumerList(target); 1866 consumers = new ArrayList<Subscription>(this.consumers); 1867 } 1868 }finally { 1869 consumersLock.writeLock().unlock(); 1870 } 1871 } 1872 } 1873 1874 return rc; 1875 } 1876 1877 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception { 1878 //QueueMessageReference node = (QueueMessageReference) m; 1879 boolean result = true; 1880 // Keep message groups together. 1881 String groupId = node.getGroupID(); 1882 int sequence = node.getGroupSequence(); 1883 if (groupId != null) { 1884 //MessageGroupMap messageGroupOwners = ((Queue) node 1885 // .getRegionDestination()).getMessageGroupOwners(); 1886 1887 MessageGroupMap messageGroupOwners = getMessageGroupOwners(); 1888 // If we can own the first, then no-one else should own the 1889 // rest. 1890 if (sequence == 1) { 1891 assignGroup(subscription, messageGroupOwners, node, groupId); 1892 } else { 1893 1894 // Make sure that the previous owner is still valid, we may 1895 // need to become the new owner. 1896 ConsumerId groupOwner; 1897 1898 groupOwner = messageGroupOwners.get(groupId); 1899 if (groupOwner == null) { 1900 assignGroup(subscription, messageGroupOwners, node, groupId); 1901 } else { 1902 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) { 1903 // A group sequence < 1 is an end of group signal. 1904 if (sequence < 0) { 1905 messageGroupOwners.removeGroup(groupId); 1906 } 1907 } else { 1908 result = false; 1909 } 1910 } 1911 } 1912 } 1913 1914 return result; 1915 1916 } 1917 1918 protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException { 1919 messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); 1920 Message message = n.getMessage(); 1921 if (message instanceof ActiveMQMessage) { 1922 ActiveMQMessage activeMessage = (ActiveMQMessage) message; 1923 try { 1924 activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false); 1925 } catch (JMSException e) { 1926 LOG.warn("Failed to set boolean header: " + e, e); 1927 } 1928 } 1929 } 1930 1931 protected void pageInMessages(boolean force) throws Exception { 1932 doDispatch(doPageInForDispatch(force)); 1933 } 1934 1935 private void addToConsumerList(Subscription sub) { 1936 if (useConsumerPriority) { 1937 consumers.add(sub); 1938 Collections.sort(consumers, orderedCompare); 1939 } else { 1940 consumers.add(sub); 1941 } 1942 } 1943 1944 private boolean removeFromConsumerList(Subscription sub) { 1945 return consumers.remove(sub); 1946 } 1947 1948 private int getConsumerMessageCountBeforeFull() throws Exception { 1949 int total = 0; 1950 boolean zeroPrefetch = false; 1951 consumersLock.readLock().lock(); 1952 try{ 1953 for (Subscription s : consumers) { 1954 zeroPrefetch |= s.getPrefetchSize() == 0; 1955 int countBeforeFull = s.countBeforeFull(); 1956 total += countBeforeFull; 1957 } 1958 }finally { 1959 consumersLock.readLock().unlock(); 1960 } 1961 if (total == 0 && zeroPrefetch) { 1962 total = 1; 1963 } 1964 return total; 1965 } 1966 1967 /* 1968 * In slave mode, dispatch is ignored till we get this notification as the 1969 * dispatch process is non deterministic between master and slave. On a 1970 * notification, the actual dispatch to the subscription (as chosen by the 1971 * master) is completed. (non-Javadoc) 1972 * @see 1973 * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification 1974 * (org.apache.activemq.command.MessageDispatchNotification) 1975 */ 1976 @Override 1977 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 1978 // do dispatch 1979 Subscription sub = getMatchingSubscription(messageDispatchNotification); 1980 if (sub != null) { 1981 MessageReference message = getMatchingMessage(messageDispatchNotification); 1982 sub.add(message); 1983 sub.processMessageDispatchNotification(messageDispatchNotification); 1984 } 1985 } 1986 1987 private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) 1988 throws Exception { 1989 QueueMessageReference message = null; 1990 MessageId messageId = messageDispatchNotification.getMessageId(); 1991 1992 pagedInPendingDispatchLock.writeLock().lock(); 1993 try { 1994 for (QueueMessageReference ref : pagedInPendingDispatch) { 1995 if (messageId.equals(ref.getMessageId())) { 1996 message = ref; 1997 pagedInPendingDispatch.remove(ref); 1998 break; 1999 } 2000 } 2001 } finally { 2002 pagedInPendingDispatchLock.writeLock().unlock(); 2003 } 2004 2005 if (message == null) { 2006 pagedInMessagesLock.readLock().lock(); 2007 try { 2008 message = pagedInMessages.get(messageId); 2009 } finally { 2010 pagedInMessagesLock.readLock().unlock(); 2011 } 2012 } 2013 2014 if (message == null) { 2015 messagesLock.writeLock().lock(); 2016 try { 2017 try { 2018 messages.setMaxBatchSize(getMaxPageSize()); 2019 messages.reset(); 2020 while (messages.hasNext()) { 2021 MessageReference node = messages.next(); 2022 messages.remove(); 2023 if (messageId.equals(node.getMessageId())) { 2024 message = this.createMessageReference(node.getMessage()); 2025 break; 2026 } 2027 } 2028 } finally { 2029 messages.release(); 2030 } 2031 } finally { 2032 messagesLock.writeLock().unlock(); 2033 } 2034 } 2035 2036 if (message == null) { 2037 Message msg = loadMessage(messageId); 2038 if (msg != null) { 2039 message = this.createMessageReference(msg); 2040 } 2041 } 2042 2043 if (message == null) { 2044 throw new JMSException("Slave broker out of sync with master - Message: " 2045 + messageDispatchNotification.getMessageId() + " on " 2046 + messageDispatchNotification.getDestination() + " does not exist among pending(" 2047 + pagedInPendingDispatch.size() + ") for subscription: " 2048 + messageDispatchNotification.getConsumerId()); 2049 } 2050 return message; 2051 } 2052 2053 /** 2054 * Find a consumer that matches the id in the message dispatch notification 2055 * 2056 * @param messageDispatchNotification 2057 * @return sub or null if the subscription has been removed before dispatch 2058 * @throws JMSException 2059 */ 2060 private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) 2061 throws JMSException { 2062 Subscription sub = null; 2063 consumersLock.readLock().lock(); 2064 try { 2065 for (Subscription s : consumers) { 2066 if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) { 2067 sub = s; 2068 break; 2069 } 2070 } 2071 }finally { 2072 consumersLock.readLock().unlock(); 2073 } 2074 return sub; 2075 } 2076 2077 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 2078 if (oldPercentUsage > newPercentUsage) { 2079 asyncWakeup(); 2080 } 2081 } 2082 2083 @Override 2084 protected Logger getLog() { 2085 return LOG; 2086 } 2087 }