001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import javax.jms.ResourceAllocationException; 021 import org.apache.activemq.advisory.AdvisorySupport; 022 import org.apache.activemq.broker.Broker; 023 import org.apache.activemq.broker.BrokerService; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.broker.ProducerBrokerExchange; 026 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 027 import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.ActiveMQTopic; 030 import org.apache.activemq.command.Message; 031 import org.apache.activemq.command.MessageDispatchNotification; 032 import org.apache.activemq.command.ProducerInfo; 033 import org.apache.activemq.state.ProducerState; 034 import org.apache.activemq.store.MessageStore; 035 import org.apache.activemq.usage.MemoryUsage; 036 import org.apache.activemq.usage.SystemUsage; 037 import org.apache.activemq.usage.Usage; 038 import org.slf4j.Logger; 039 040 /** 041 * 042 */ 043 public abstract class BaseDestination implements Destination { 044 /** 045 * The maximum number of messages to page in to the destination from 046 * persistent storage 047 */ 048 public static final int MAX_PAGE_SIZE = 200; 049 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 050 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 051 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; 052 public static final int MAX_PRODUCERS_TO_AUDIT = 64; 053 public static final int MAX_AUDIT_DEPTH = 2048; 054 055 protected final ActiveMQDestination destination; 056 protected final Broker broker; 057 protected final MessageStore store; 058 protected SystemUsage systemUsage; 059 protected MemoryUsage memoryUsage; 060 private boolean producerFlowControl = true; 061 protected boolean warnOnProducerFlowControl = true; 062 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 063 064 private int maxProducersToAudit = 1024; 065 private int maxAuditDepth = 2048; 066 private boolean enableAudit = true; 067 private int maxPageSize = MAX_PAGE_SIZE; 068 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 069 private boolean useCache = true; 070 private int minimumMessageSize = 1024; 071 private boolean lazyDispatch = false; 072 private boolean advisoryForSlowConsumers; 073 private boolean advisdoryForFastProducers; 074 private boolean advisoryForDiscardingMessages; 075 private boolean advisoryWhenFull; 076 private boolean advisoryForDelivery; 077 private boolean advisoryForConsumed; 078 private boolean sendAdvisoryIfNoConsumers; 079 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 080 protected final BrokerService brokerService; 081 protected final Broker regionBroker; 082 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 083 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 084 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 085 protected int cursorMemoryHighWaterMark = 70; 086 protected int storeUsageHighWaterMark = 100; 087 private SlowConsumerStrategy slowConsumerStrategy; 088 private boolean prioritizedMessages; 089 private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 090 private boolean gcIfInactive; 091 private long lastActiveTime=0l; 092 private boolean reduceMemoryFootprint = false; 093 094 /** 095 * @param broker 096 * @param store 097 * @param destination 098 * @param parentStats 099 * @throws Exception 100 */ 101 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 102 this.brokerService = brokerService; 103 this.broker = brokerService.getBroker(); 104 this.store = store; 105 this.destination = destination; 106 // let's copy the enabled property from the parent DestinationStatistics 107 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 108 this.destinationStatistics.setParent(parentStats); 109 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 110 this.memoryUsage = this.systemUsage.getMemoryUsage(); 111 this.memoryUsage.setUsagePortion(1.0f); 112 this.regionBroker = brokerService.getRegionBroker(); 113 } 114 115 /** 116 * initialize the destination 117 * 118 * @throws Exception 119 */ 120 public void initialize() throws Exception { 121 // Let the store know what usage manager we are using so that he can 122 // flush messages to disk when usage gets high. 123 if (store != null) { 124 store.setMemoryUsage(this.memoryUsage); 125 } 126 } 127 128 /** 129 * @return the producerFlowControl 130 */ 131 public boolean isProducerFlowControl() { 132 return producerFlowControl; 133 } 134 135 /** 136 * @param producerFlowControl the producerFlowControl to set 137 */ 138 public void setProducerFlowControl(boolean producerFlowControl) { 139 this.producerFlowControl = producerFlowControl; 140 } 141 142 /** 143 * Set's the interval at which warnings about producers being blocked by 144 * resource usage will be triggered. Values of 0 or less will disable 145 * warnings 146 * 147 * @param blockedProducerWarningInterval the interval at which warning about 148 * blocked producers will be triggered. 149 */ 150 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 151 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 152 } 153 154 /** 155 * 156 * @return the interval at which warning about blocked producers will be 157 * triggered. 158 */ 159 public long getBlockedProducerWarningInterval() { 160 return blockedProducerWarningInterval; 161 } 162 163 /** 164 * @return the maxProducersToAudit 165 */ 166 public int getMaxProducersToAudit() { 167 return maxProducersToAudit; 168 } 169 170 /** 171 * @param maxProducersToAudit the maxProducersToAudit to set 172 */ 173 public void setMaxProducersToAudit(int maxProducersToAudit) { 174 this.maxProducersToAudit = maxProducersToAudit; 175 } 176 177 /** 178 * @return the maxAuditDepth 179 */ 180 public int getMaxAuditDepth() { 181 return maxAuditDepth; 182 } 183 184 /** 185 * @param maxAuditDepth the maxAuditDepth to set 186 */ 187 public void setMaxAuditDepth(int maxAuditDepth) { 188 this.maxAuditDepth = maxAuditDepth; 189 } 190 191 /** 192 * @return the enableAudit 193 */ 194 public boolean isEnableAudit() { 195 return enableAudit; 196 } 197 198 /** 199 * @param enableAudit the enableAudit to set 200 */ 201 public void setEnableAudit(boolean enableAudit) { 202 this.enableAudit = enableAudit; 203 } 204 205 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 206 destinationStatistics.getProducers().increment(); 207 this.lastActiveTime=0l; 208 } 209 210 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 211 destinationStatistics.getProducers().decrement(); 212 } 213 214 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ 215 destinationStatistics.getConsumers().increment(); 216 this.lastActiveTime=0l; 217 } 218 219 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ 220 destinationStatistics.getConsumers().decrement(); 221 } 222 223 224 public final MemoryUsage getMemoryUsage() { 225 return memoryUsage; 226 } 227 228 public DestinationStatistics getDestinationStatistics() { 229 return destinationStatistics; 230 } 231 232 public ActiveMQDestination getActiveMQDestination() { 233 return destination; 234 } 235 236 public final String getName() { 237 return getActiveMQDestination().getPhysicalName(); 238 } 239 240 public final MessageStore getMessageStore() { 241 return store; 242 } 243 244 public final boolean isActive() { 245 return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0; 246 } 247 248 public int getMaxPageSize() { 249 return maxPageSize; 250 } 251 252 public void setMaxPageSize(int maxPageSize) { 253 this.maxPageSize = maxPageSize; 254 } 255 256 public int getMaxBrowsePageSize() { 257 return this.maxBrowsePageSize; 258 } 259 260 public void setMaxBrowsePageSize(int maxPageSize) { 261 this.maxBrowsePageSize = maxPageSize; 262 } 263 264 public int getMaxExpirePageSize() { 265 return this.maxExpirePageSize; 266 } 267 268 public void setMaxExpirePageSize(int maxPageSize) { 269 this.maxExpirePageSize = maxPageSize; 270 } 271 272 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 273 this.expireMessagesPeriod = expireMessagesPeriod; 274 } 275 276 public long getExpireMessagesPeriod() { 277 return expireMessagesPeriod; 278 } 279 280 public boolean isUseCache() { 281 return useCache; 282 } 283 284 public void setUseCache(boolean useCache) { 285 this.useCache = useCache; 286 } 287 288 public int getMinimumMessageSize() { 289 return minimumMessageSize; 290 } 291 292 public void setMinimumMessageSize(int minimumMessageSize) { 293 this.minimumMessageSize = minimumMessageSize; 294 } 295 296 public boolean isLazyDispatch() { 297 return lazyDispatch; 298 } 299 300 public void setLazyDispatch(boolean lazyDispatch) { 301 this.lazyDispatch = lazyDispatch; 302 } 303 304 protected long getDestinationSequenceId() { 305 return regionBroker.getBrokerSequenceId(); 306 } 307 308 /** 309 * @return the advisoryForSlowConsumers 310 */ 311 public boolean isAdvisoryForSlowConsumers() { 312 return advisoryForSlowConsumers; 313 } 314 315 /** 316 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 317 */ 318 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 319 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 320 } 321 322 /** 323 * @return the advisoryForDiscardingMessages 324 */ 325 public boolean isAdvisoryForDiscardingMessages() { 326 return advisoryForDiscardingMessages; 327 } 328 329 /** 330 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 331 * set 332 */ 333 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 334 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 335 } 336 337 /** 338 * @return the advisoryWhenFull 339 */ 340 public boolean isAdvisoryWhenFull() { 341 return advisoryWhenFull; 342 } 343 344 /** 345 * @param advisoryWhenFull the advisoryWhenFull to set 346 */ 347 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 348 this.advisoryWhenFull = advisoryWhenFull; 349 } 350 351 /** 352 * @return the advisoryForDelivery 353 */ 354 public boolean isAdvisoryForDelivery() { 355 return advisoryForDelivery; 356 } 357 358 /** 359 * @param advisoryForDelivery the advisoryForDelivery to set 360 */ 361 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 362 this.advisoryForDelivery = advisoryForDelivery; 363 } 364 365 /** 366 * @return the advisoryForConsumed 367 */ 368 public boolean isAdvisoryForConsumed() { 369 return advisoryForConsumed; 370 } 371 372 /** 373 * @param advisoryForConsumed the advisoryForConsumed to set 374 */ 375 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 376 this.advisoryForConsumed = advisoryForConsumed; 377 } 378 379 /** 380 * @return the advisdoryForFastProducers 381 */ 382 public boolean isAdvisdoryForFastProducers() { 383 return advisdoryForFastProducers; 384 } 385 386 /** 387 * @param advisdoryForFastProducers the advisdoryForFastProducers to set 388 */ 389 public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) { 390 this.advisdoryForFastProducers = advisdoryForFastProducers; 391 } 392 393 public boolean isSendAdvisoryIfNoConsumers() { 394 return sendAdvisoryIfNoConsumers; 395 } 396 397 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 398 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 399 } 400 401 /** 402 * @return the dead letter strategy 403 */ 404 public DeadLetterStrategy getDeadLetterStrategy() { 405 return deadLetterStrategy; 406 } 407 408 /** 409 * set the dead letter strategy 410 * 411 * @param deadLetterStrategy 412 */ 413 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 414 this.deadLetterStrategy = deadLetterStrategy; 415 } 416 417 public int getCursorMemoryHighWaterMark() { 418 return this.cursorMemoryHighWaterMark; 419 } 420 421 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 422 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 423 } 424 425 /** 426 * called when message is consumed 427 * 428 * @param context 429 * @param messageReference 430 */ 431 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 432 if (advisoryForConsumed) { 433 broker.messageConsumed(context, messageReference); 434 } 435 } 436 437 /** 438 * Called when message is delivered to the broker 439 * 440 * @param context 441 * @param messageReference 442 */ 443 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 444 if (advisoryForDelivery) { 445 broker.messageDelivered(context, messageReference); 446 } 447 } 448 449 /** 450 * Called when a message is discarded - e.g. running low on memory This will 451 * happen only if the policy is enabled - e.g. non durable topics 452 * 453 * @param context 454 * @param messageReference 455 */ 456 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 457 if (advisoryForDiscardingMessages) { 458 broker.messageDiscarded(context, sub, messageReference); 459 } 460 } 461 462 /** 463 * Called when there is a slow consumer 464 * 465 * @param context 466 * @param subs 467 */ 468 public void slowConsumer(ConnectionContext context, Subscription subs) { 469 if (advisoryForSlowConsumers) { 470 broker.slowConsumer(context, this, subs); 471 } 472 if (slowConsumerStrategy != null) { 473 slowConsumerStrategy.slowConsumer(context, subs); 474 } 475 } 476 477 /** 478 * Called to notify a producer is too fast 479 * 480 * @param context 481 * @param producerInfo 482 */ 483 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 484 if (advisdoryForFastProducers) { 485 broker.fastProducer(context, producerInfo); 486 } 487 } 488 489 /** 490 * Called when a Usage reaches a limit 491 * 492 * @param context 493 * @param usage 494 */ 495 public void isFull(ConnectionContext context, Usage usage) { 496 if (advisoryWhenFull) { 497 broker.isFull(context, this, usage); 498 } 499 } 500 501 public void dispose(ConnectionContext context) throws IOException { 502 if (this.store != null) { 503 this.store.removeAllMessages(context); 504 this.store.dispose(context); 505 } 506 this.destinationStatistics.setParent(null); 507 this.memoryUsage.stop(); 508 } 509 510 /** 511 * Provides a hook to allow messages with no consumer to be processed in 512 * some way - such as to send to a dead letter queue or something.. 513 */ 514 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 515 if (!msg.isPersistent()) { 516 if (isSendAdvisoryIfNoConsumers()) { 517 // allow messages with no consumers to be dispatched to a dead 518 // letter queue 519 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 520 521 Message message = msg.copy(); 522 // The original destination and transaction id do not get 523 // filled when the message is first sent, 524 // it is only populated if the message is routed to another 525 // destination like the DLQ 526 if (message.getOriginalDestination() != null) { 527 message.setOriginalDestination(message.getDestination()); 528 } 529 if (message.getOriginalTransactionId() != null) { 530 message.setOriginalTransactionId(message.getTransactionId()); 531 } 532 533 ActiveMQTopic advisoryTopic; 534 if (destination.isQueue()) { 535 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 536 } else { 537 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 538 } 539 message.setDestination(advisoryTopic); 540 message.setTransactionId(null); 541 542 // Disable flow control for this since since we don't want 543 // to block. 544 boolean originalFlowControl = context.isProducerFlowControl(); 545 try { 546 context.setProducerFlowControl(false); 547 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 548 producerExchange.setMutable(false); 549 producerExchange.setConnectionContext(context); 550 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 551 context.getBroker().send(producerExchange, message); 552 } finally { 553 context.setProducerFlowControl(originalFlowControl); 554 } 555 556 } 557 } 558 } 559 } 560 561 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 562 } 563 564 public final int getStoreUsageHighWaterMark() { 565 return this.storeUsageHighWaterMark; 566 } 567 568 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 569 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 570 } 571 572 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { 573 waitForSpace(context, usage, 100, warning); 574 } 575 576 protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { 577 if (systemUsage.isSendFailIfNoSpace()) { 578 getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning); 579 throw new ResourceAllocationException(warning); 580 } 581 if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 582 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { 583 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning); 584 throw new ResourceAllocationException(warning); 585 } 586 } else { 587 long start = System.currentTimeMillis(); 588 long nextWarn = start; 589 while (!usage.waitForSpace(1000, highWaterMark)) { 590 if (context.getStopping().get()) { 591 throw new IOException("Connection closed, send aborted."); 592 } 593 594 long now = System.currentTimeMillis(); 595 if (now >= nextWarn) { 596 getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); 597 nextWarn = now + blockedProducerWarningInterval; 598 } 599 } 600 } 601 } 602 603 protected abstract Logger getLog(); 604 605 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 606 this.slowConsumerStrategy = slowConsumerStrategy; 607 } 608 609 public SlowConsumerStrategy getSlowConsumerStrategy() { 610 return this.slowConsumerStrategy; 611 } 612 613 614 public boolean isPrioritizedMessages() { 615 return this.prioritizedMessages; 616 } 617 618 public void setPrioritizedMessages(boolean prioritizedMessages) { 619 this.prioritizedMessages = prioritizedMessages; 620 if (store != null) { 621 store.setPrioritizedMessages(prioritizedMessages); 622 } 623 } 624 625 /** 626 * @return the inactiveTimoutBeforeGC 627 */ 628 public long getInactiveTimoutBeforeGC() { 629 return this.inactiveTimoutBeforeGC; 630 } 631 632 /** 633 * @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set 634 */ 635 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { 636 this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC; 637 } 638 639 /** 640 * @return the gcIfInactive 641 */ 642 public boolean isGcIfInactive() { 643 return this.gcIfInactive; 644 } 645 646 /** 647 * @param gcIfInactive the gcIfInactive to set 648 */ 649 public void setGcIfInactive(boolean gcIfInactive) { 650 this.gcIfInactive = gcIfInactive; 651 } 652 653 public void markForGC(long timeStamp) { 654 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false 655 && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) { 656 this.lastActiveTime = timeStamp; 657 } 658 } 659 660 public boolean canGC() { 661 boolean result = false; 662 if (isGcIfInactive()&& this.lastActiveTime != 0l) { 663 if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) { 664 result = true; 665 } 666 } 667 return result; 668 } 669 670 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 671 this.reduceMemoryFootprint = reduceMemoryFootprint; 672 } 673 674 protected boolean isReduceMemoryFootprint() { 675 return this.reduceMemoryFootprint; 676 } 677 }