001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.util.ArrayList; 022 import java.util.Collections; 023 import java.util.HashMap; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Set; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.CopyOnWriteArrayList; 029 import java.util.concurrent.ThreadPoolExecutor; 030 import javax.jms.InvalidClientIDException; 031 import javax.jms.JMSException; 032 import org.apache.activemq.advisory.AdvisorySupport; 033 import org.apache.activemq.broker.Broker; 034 import org.apache.activemq.broker.BrokerService; 035 import org.apache.activemq.broker.Connection; 036 import org.apache.activemq.broker.ConnectionContext; 037 import org.apache.activemq.broker.ConsumerBrokerExchange; 038 import org.apache.activemq.broker.EmptyBroker; 039 import org.apache.activemq.broker.ProducerBrokerExchange; 040 import org.apache.activemq.broker.TransportConnector; 041 import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 042 import org.apache.activemq.broker.region.policy.PolicyMap; 043 import org.apache.activemq.command.ActiveMQDestination; 044 import org.apache.activemq.command.BrokerId; 045 import org.apache.activemq.command.BrokerInfo; 046 import org.apache.activemq.command.ConnectionId; 047 import org.apache.activemq.command.ConnectionInfo; 048 import org.apache.activemq.command.ConsumerControl; 049 import org.apache.activemq.command.ConsumerInfo; 050 import org.apache.activemq.command.DestinationInfo; 051 import org.apache.activemq.command.Message; 052 import org.apache.activemq.command.MessageAck; 053 import org.apache.activemq.command.MessageDispatch; 054 import org.apache.activemq.command.MessageDispatchNotification; 055 import org.apache.activemq.command.MessagePull; 056 import org.apache.activemq.command.ProducerInfo; 057 import org.apache.activemq.command.RemoveSubscriptionInfo; 058 import org.apache.activemq.command.Response; 059 import org.apache.activemq.command.TransactionId; 060 import org.apache.activemq.state.ConnectionState; 061 import org.apache.activemq.store.kahadb.plist.PListStore; 062 import org.apache.activemq.thread.Scheduler; 063 import org.apache.activemq.thread.TaskRunnerFactory; 064 import org.apache.activemq.usage.SystemUsage; 065 import org.apache.activemq.util.BrokerSupport; 066 import org.apache.activemq.util.IdGenerator; 067 import org.apache.activemq.util.InetAddressUtil; 068 import org.apache.activemq.util.LongSequenceGenerator; 069 import org.apache.activemq.util.ServiceStopper; 070 import org.slf4j.Logger; 071 import org.slf4j.LoggerFactory; 072 073 /** 074 * Routes Broker operations to the correct messaging regions for processing. 075 * 076 * 077 */ 078 public class RegionBroker extends EmptyBroker { 079 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 080 private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class); 081 private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); 082 083 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 084 protected DestinationFactory destinationFactory; 085 protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>()); 086 087 private final Region queueRegion; 088 private final Region topicRegion; 089 private final Region tempQueueRegion; 090 private final Region tempTopicRegion; 091 protected final BrokerService brokerService; 092 private boolean started; 093 private boolean keepDurableSubsActive; 094 095 private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>(); 096 private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>(); 097 private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>(); 098 099 private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 100 private BrokerId brokerId; 101 private String brokerName; 102 private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); 103 private final DestinationInterceptor destinationInterceptor; 104 private ConnectionContext adminConnectionContext; 105 private final Scheduler scheduler; 106 private final ThreadPoolExecutor executor; 107 108 private final Runnable purgeInactiveDestinationsTask = new Runnable() { 109 public void run() { 110 purgeInactiveDestinations(); 111 } 112 }; 113 114 public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory, 115 DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { 116 this.brokerService = brokerService; 117 this.executor=executor; 118 this.scheduler = scheduler; 119 if (destinationFactory == null) { 120 throw new IllegalArgumentException("null destinationFactory"); 121 } 122 this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId()); 123 this.destinationFactory = destinationFactory; 124 queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 125 topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 126 this.destinationInterceptor = destinationInterceptor; 127 tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory); 128 tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); 129 } 130 131 @Override 132 public Map<ActiveMQDestination, Destination> getDestinationMap() { 133 Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap(); 134 answer.putAll(getTopicRegion().getDestinationMap()); 135 return answer; 136 } 137 138 @Override 139 public Set <Destination> getDestinations(ActiveMQDestination destination) { 140 switch (destination.getDestinationType()) { 141 case ActiveMQDestination.QUEUE_TYPE: 142 return queueRegion.getDestinations(destination); 143 case ActiveMQDestination.TOPIC_TYPE: 144 return topicRegion.getDestinations(destination); 145 case ActiveMQDestination.TEMP_QUEUE_TYPE: 146 return tempQueueRegion.getDestinations(destination); 147 case ActiveMQDestination.TEMP_TOPIC_TYPE: 148 return tempTopicRegion.getDestinations(destination); 149 default: 150 return Collections.emptySet(); 151 } 152 } 153 154 @Override 155 public Broker getAdaptor(Class type) { 156 if (type.isInstance(this)) { 157 return this; 158 } 159 return null; 160 } 161 162 public Region getQueueRegion() { 163 return queueRegion; 164 } 165 166 public Region getTempQueueRegion() { 167 return tempQueueRegion; 168 } 169 170 public Region getTempTopicRegion() { 171 return tempTopicRegion; 172 } 173 174 public Region getTopicRegion() { 175 return topicRegion; 176 } 177 178 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 179 return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 180 } 181 182 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 183 return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 184 } 185 186 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 187 return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 188 } 189 190 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 191 return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 192 } 193 194 @Override 195 public void start() throws Exception { 196 ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); 197 started = true; 198 queueRegion.start(); 199 topicRegion.start(); 200 tempQueueRegion.start(); 201 tempTopicRegion.start(); 202 int period = this.brokerService.getSchedulePeriodForDestinationPurge(); 203 if (period > 0) { 204 this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period); 205 } 206 } 207 208 @Override 209 public void stop() throws Exception { 210 started = false; 211 this.scheduler.cancel(purgeInactiveDestinationsTask); 212 ServiceStopper ss = new ServiceStopper(); 213 doStop(ss); 214 ss.throwFirstException(); 215 // clear the state 216 clientIdSet.clear(); 217 connections.clear(); 218 destinations.clear(); 219 brokerInfos.clear(); 220 } 221 222 public PolicyMap getDestinationPolicy() { 223 return brokerService != null ? brokerService.getDestinationPolicy() : null; 224 } 225 226 @Override 227 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 228 String clientId = info.getClientId(); 229 if (clientId == null) { 230 throw new InvalidClientIDException("No clientID specified for connection request"); 231 } 232 synchronized (clientIdSet) { 233 ConnectionContext oldContext = clientIdSet.get(clientId); 234 if (oldContext != null) { 235 if (context.isFaultTolerant() || context.isNetworkConnection()){ 236 //remove the old connection 237 try{ 238 removeConnection(oldContext, info, new Exception("remove stale client")); 239 }catch(Exception e){ 240 LOG.warn("Failed to remove stale connection ",e); 241 } 242 }else{ 243 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " 244 + oldContext.getConnection().getRemoteAddress()); 245 } 246 } else { 247 clientIdSet.put(clientId, context); 248 } 249 } 250 251 connections.add(context.getConnection()); 252 } 253 254 @Override 255 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 256 String clientId = info.getClientId(); 257 if (clientId == null) { 258 throw new InvalidClientIDException("No clientID specified for connection disconnect request"); 259 } 260 synchronized (clientIdSet) { 261 ConnectionContext oldValue = clientIdSet.get(clientId); 262 // we may be removing the duplicate connection, not the first 263 // connection to be created 264 // so lets check that their connection IDs are the same 265 if (oldValue == context) { 266 if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) { 267 clientIdSet.remove(clientId); 268 } 269 } 270 } 271 connections.remove(context.getConnection()); 272 } 273 274 protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) { 275 return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); 276 } 277 278 @Override 279 public Connection[] getClients() throws Exception { 280 ArrayList<Connection> l = new ArrayList<Connection>(connections); 281 Connection rc[] = new Connection[l.size()]; 282 l.toArray(rc); 283 return rc; 284 } 285 286 @Override 287 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { 288 289 Destination answer; 290 291 answer = destinations.get(destination); 292 if (answer != null) { 293 return answer; 294 } 295 296 switch (destination.getDestinationType()) { 297 case ActiveMQDestination.QUEUE_TYPE: 298 answer = queueRegion.addDestination(context, destination,true); 299 break; 300 case ActiveMQDestination.TOPIC_TYPE: 301 answer = topicRegion.addDestination(context, destination,true); 302 break; 303 case ActiveMQDestination.TEMP_QUEUE_TYPE: 304 answer = tempQueueRegion.addDestination(context, destination,create); 305 break; 306 case ActiveMQDestination.TEMP_TOPIC_TYPE: 307 answer = tempTopicRegion.addDestination(context, destination,create); 308 break; 309 default: 310 throw createUnknownDestinationTypeException(destination); 311 } 312 313 destinations.put(destination, answer); 314 return answer; 315 316 } 317 318 @Override 319 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 320 321 if (destinations.containsKey(destination)) { 322 switch (destination.getDestinationType()) { 323 case ActiveMQDestination.QUEUE_TYPE: 324 queueRegion.removeDestination(context, destination, timeout); 325 removeAdvisoryTopics("Queue.", context, destination, timeout); 326 break; 327 case ActiveMQDestination.TOPIC_TYPE: 328 topicRegion.removeDestination(context, destination, timeout); 329 removeAdvisoryTopics("Topic.", context, destination, timeout); 330 break; 331 case ActiveMQDestination.TEMP_QUEUE_TYPE: 332 tempQueueRegion.removeDestination(context, destination, timeout); 333 break; 334 case ActiveMQDestination.TEMP_TOPIC_TYPE: 335 tempTopicRegion.removeDestination(context, destination, timeout); 336 break; 337 default: 338 throw createUnknownDestinationTypeException(destination); 339 } 340 destinations.remove(destination); 341 342 } 343 344 } 345 346 public void removeAdvisoryTopics(String destinationType, ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 347 if (this.brokerService.isAdvisorySupport()) { 348 String producerAdvisoryTopic = AdvisorySupport.PRODUCER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName(); 349 String consumerAdvisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + destinationType + destination.getPhysicalName(); 350 351 ActiveMQDestination dests[] = getDestinations(); 352 for (ActiveMQDestination dest: dests) { 353 String name = dest.getPhysicalName(); 354 if ( name.equals(producerAdvisoryTopic) || name.equals(consumerAdvisoryTopic) ) { 355 try { 356 removeDestination(context, dest, timeout); 357 } catch (JMSException ignore) { 358 // at least ignore the Unknown Destination Type JMSException 359 } 360 } 361 } 362 } 363 } 364 365 @Override 366 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 367 addDestination(context, info.getDestination(),true); 368 369 } 370 371 @Override 372 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 373 removeDestination(context, info.getDestination(), info.getTimeout()); 374 375 } 376 377 @Override 378 public ActiveMQDestination[] getDestinations() throws Exception { 379 ArrayList<ActiveMQDestination> l; 380 381 l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet()); 382 383 ActiveMQDestination rc[] = new ActiveMQDestination[l.size()]; 384 l.toArray(rc); 385 return rc; 386 } 387 388 @Override 389 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 390 ActiveMQDestination destination = info.getDestination(); 391 synchronized (purgeInactiveDestinationsTask) { 392 if (destination != null) { 393 394 // This seems to cause the destination to be added but without 395 // advisories firing... 396 context.getBroker().addDestination(context, destination, false); 397 switch (destination.getDestinationType()) { 398 case ActiveMQDestination.QUEUE_TYPE: 399 queueRegion.addProducer(context, info); 400 break; 401 case ActiveMQDestination.TOPIC_TYPE: 402 topicRegion.addProducer(context, info); 403 break; 404 case ActiveMQDestination.TEMP_QUEUE_TYPE: 405 tempQueueRegion.addProducer(context, info); 406 break; 407 case ActiveMQDestination.TEMP_TOPIC_TYPE: 408 tempTopicRegion.addProducer(context, info); 409 break; 410 } 411 } 412 } 413 } 414 415 @Override 416 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 417 ActiveMQDestination destination = info.getDestination(); 418 synchronized (purgeInactiveDestinationsTask) { 419 if (destination != null) { 420 switch (destination.getDestinationType()) { 421 case ActiveMQDestination.QUEUE_TYPE: 422 queueRegion.removeProducer(context, info); 423 break; 424 case ActiveMQDestination.TOPIC_TYPE: 425 topicRegion.removeProducer(context, info); 426 break; 427 case ActiveMQDestination.TEMP_QUEUE_TYPE: 428 tempQueueRegion.removeProducer(context, info); 429 break; 430 case ActiveMQDestination.TEMP_TOPIC_TYPE: 431 tempTopicRegion.removeProducer(context, info); 432 break; 433 } 434 } 435 } 436 } 437 438 @Override 439 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 440 ActiveMQDestination destination = info.getDestination(); 441 if (destinationInterceptor != null) { 442 destinationInterceptor.create(this, context, destination); 443 } 444 synchronized (purgeInactiveDestinationsTask) { 445 switch (destination.getDestinationType()) { 446 case ActiveMQDestination.QUEUE_TYPE: 447 return queueRegion.addConsumer(context, info); 448 449 case ActiveMQDestination.TOPIC_TYPE: 450 return topicRegion.addConsumer(context, info); 451 452 case ActiveMQDestination.TEMP_QUEUE_TYPE: 453 return tempQueueRegion.addConsumer(context, info); 454 455 case ActiveMQDestination.TEMP_TOPIC_TYPE: 456 return tempTopicRegion.addConsumer(context, info); 457 458 default: 459 throw createUnknownDestinationTypeException(destination); 460 } 461 } 462 } 463 464 @Override 465 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 466 ActiveMQDestination destination = info.getDestination(); 467 synchronized (purgeInactiveDestinationsTask) { 468 switch (destination.getDestinationType()) { 469 470 case ActiveMQDestination.QUEUE_TYPE: 471 queueRegion.removeConsumer(context, info); 472 break; 473 case ActiveMQDestination.TOPIC_TYPE: 474 topicRegion.removeConsumer(context, info); 475 break; 476 case ActiveMQDestination.TEMP_QUEUE_TYPE: 477 tempQueueRegion.removeConsumer(context, info); 478 break; 479 case ActiveMQDestination.TEMP_TOPIC_TYPE: 480 tempTopicRegion.removeConsumer(context, info); 481 break; 482 default: 483 throw createUnknownDestinationTypeException(destination); 484 } 485 } 486 } 487 488 @Override 489 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 490 synchronized (purgeInactiveDestinationsTask) { 491 topicRegion.removeSubscription(context, info); 492 } 493 } 494 495 @Override 496 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 497 message.setBrokerInTime(System.currentTimeMillis()); 498 if (producerExchange.isMutable() || producerExchange.getRegion() == null 499 || (producerExchange.getRegion() != null && producerExchange.getRegion().getDestinationMap().get(message.getDestination()) == null)) { 500 ActiveMQDestination destination = message.getDestination(); 501 // ensure the destination is registered with the RegionBroker 502 producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination,false); 503 Region region; 504 switch (destination.getDestinationType()) { 505 case ActiveMQDestination.QUEUE_TYPE: 506 region = queueRegion; 507 break; 508 case ActiveMQDestination.TOPIC_TYPE: 509 region = topicRegion; 510 break; 511 case ActiveMQDestination.TEMP_QUEUE_TYPE: 512 region = tempQueueRegion; 513 break; 514 case ActiveMQDestination.TEMP_TOPIC_TYPE: 515 region = tempTopicRegion; 516 break; 517 default: 518 throw createUnknownDestinationTypeException(destination); 519 } 520 producerExchange.setRegion(region); 521 producerExchange.setRegionDestination(null); 522 } 523 producerExchange.getRegion().send(producerExchange, message); 524 } 525 526 @Override 527 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 528 if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) { 529 ActiveMQDestination destination = ack.getDestination(); 530 Region region; 531 switch (destination.getDestinationType()) { 532 case ActiveMQDestination.QUEUE_TYPE: 533 region = queueRegion; 534 break; 535 case ActiveMQDestination.TOPIC_TYPE: 536 region = topicRegion; 537 break; 538 case ActiveMQDestination.TEMP_QUEUE_TYPE: 539 region = tempQueueRegion; 540 break; 541 case ActiveMQDestination.TEMP_TOPIC_TYPE: 542 region = tempTopicRegion; 543 break; 544 default: 545 throw createUnknownDestinationTypeException(destination); 546 } 547 consumerExchange.setRegion(region); 548 } 549 consumerExchange.getRegion().acknowledge(consumerExchange, ack); 550 } 551 552 @Override 553 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 554 ActiveMQDestination destination = pull.getDestination(); 555 switch (destination.getDestinationType()) { 556 case ActiveMQDestination.QUEUE_TYPE: 557 return queueRegion.messagePull(context, pull); 558 559 case ActiveMQDestination.TOPIC_TYPE: 560 return topicRegion.messagePull(context, pull); 561 562 case ActiveMQDestination.TEMP_QUEUE_TYPE: 563 return tempQueueRegion.messagePull(context, pull); 564 565 case ActiveMQDestination.TEMP_TOPIC_TYPE: 566 return tempTopicRegion.messagePull(context, pull); 567 default: 568 throw createUnknownDestinationTypeException(destination); 569 } 570 } 571 572 @Override 573 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 574 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 575 } 576 577 @Override 578 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 579 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 580 } 581 582 @Override 583 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 584 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 585 } 586 587 @Override 588 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 589 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 590 } 591 592 @Override 593 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 594 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 595 } 596 597 @Override 598 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 599 throw new IllegalAccessException("Transaction operation not implemented by this broker."); 600 } 601 602 @Override 603 public void gc() { 604 queueRegion.gc(); 605 topicRegion.gc(); 606 } 607 608 @Override 609 public BrokerId getBrokerId() { 610 if (brokerId == null) { 611 brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); 612 } 613 return brokerId; 614 } 615 616 public void setBrokerId(BrokerId brokerId) { 617 this.brokerId = brokerId; 618 } 619 620 @Override 621 public String getBrokerName() { 622 if (brokerName == null) { 623 try { 624 brokerName = InetAddressUtil.getLocalHostName().toLowerCase(); 625 } catch (Exception e) { 626 brokerName = "localhost"; 627 } 628 } 629 return brokerName; 630 } 631 632 public void setBrokerName(String brokerName) { 633 this.brokerName = brokerName; 634 } 635 636 public DestinationStatistics getDestinationStatistics() { 637 return destinationStatistics; 638 } 639 640 protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) { 641 return new JMSException("Unknown destination type: " + destination.getDestinationType()); 642 } 643 644 @Override 645 public synchronized void addBroker(Connection connection, BrokerInfo info) { 646 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 647 if (existing == null) { 648 existing = info.copy(); 649 existing.setPeerBrokerInfos(null); 650 brokerInfos.put(info.getBrokerId(), existing); 651 } 652 existing.incrementRefCount(); 653 LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); 654 addBrokerInClusterUpdate(); 655 } 656 657 @Override 658 public synchronized void removeBroker(Connection connection, BrokerInfo info) { 659 if (info != null) { 660 BrokerInfo existing = brokerInfos.get(info.getBrokerId()); 661 if (existing != null && existing.decrementRefCount() == 0) { 662 brokerInfos.remove(info.getBrokerId()); 663 } 664 LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size()); 665 removeBrokerInClusterUpdate(); 666 } 667 } 668 669 @Override 670 public synchronized BrokerInfo[] getPeerBrokerInfos() { 671 BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; 672 result = brokerInfos.values().toArray(result); 673 return result; 674 } 675 676 @Override 677 public void preProcessDispatch(MessageDispatch messageDispatch) { 678 Message message = messageDispatch.getMessage(); 679 if (message != null) { 680 long endTime = System.currentTimeMillis(); 681 message.setBrokerOutTime(endTime); 682 if (getBrokerService().isEnableStatistics()) { 683 long totalTime = endTime - message.getBrokerInTime(); 684 message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime); 685 } 686 } 687 } 688 689 @Override 690 public void postProcessDispatch(MessageDispatch messageDispatch) { 691 } 692 693 @Override 694 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 695 ActiveMQDestination destination = messageDispatchNotification.getDestination(); 696 switch (destination.getDestinationType()) { 697 case ActiveMQDestination.QUEUE_TYPE: 698 queueRegion.processDispatchNotification(messageDispatchNotification); 699 break; 700 case ActiveMQDestination.TOPIC_TYPE: 701 topicRegion.processDispatchNotification(messageDispatchNotification); 702 break; 703 case ActiveMQDestination.TEMP_QUEUE_TYPE: 704 tempQueueRegion.processDispatchNotification(messageDispatchNotification); 705 break; 706 case ActiveMQDestination.TEMP_TOPIC_TYPE: 707 tempTopicRegion.processDispatchNotification(messageDispatchNotification); 708 break; 709 default: 710 throw createUnknownDestinationTypeException(destination); 711 } 712 } 713 714 public boolean isSlaveBroker() { 715 return brokerService.isSlave(); 716 } 717 718 @Override 719 public boolean isStopped() { 720 return !started; 721 } 722 723 @Override 724 public Set<ActiveMQDestination> getDurableDestinations() { 725 return destinationFactory.getDestinations(); 726 } 727 728 protected void doStop(ServiceStopper ss) { 729 ss.stop(queueRegion); 730 ss.stop(topicRegion); 731 ss.stop(tempQueueRegion); 732 ss.stop(tempTopicRegion); 733 } 734 735 public boolean isKeepDurableSubsActive() { 736 return keepDurableSubsActive; 737 } 738 739 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 740 this.keepDurableSubsActive = keepDurableSubsActive; 741 } 742 743 public DestinationInterceptor getDestinationInterceptor() { 744 return destinationInterceptor; 745 } 746 747 @Override 748 public ConnectionContext getAdminConnectionContext() { 749 return adminConnectionContext; 750 } 751 752 @Override 753 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 754 this.adminConnectionContext = adminConnectionContext; 755 } 756 757 public Map<ConnectionId, ConnectionState> getConnectionStates() { 758 return connectionStates; 759 } 760 761 @Override 762 public PListStore getTempDataStore() { 763 return brokerService.getTempDataStore(); 764 } 765 766 @Override 767 public URI getVmConnectorURI() { 768 return brokerService.getVmConnectorURI(); 769 } 770 771 @Override 772 public void brokerServiceStarted() { 773 } 774 775 @Override 776 public BrokerService getBrokerService() { 777 return brokerService; 778 } 779 780 @Override 781 public boolean isExpired(MessageReference messageReference) { 782 boolean expired = false; 783 if (messageReference.isExpired()) { 784 try { 785 // prevent duplicate expiry processing 786 Message message = messageReference.getMessage(); 787 synchronized (message) { 788 expired = stampAsExpired(message); 789 } 790 } catch (IOException e) { 791 LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e); 792 } 793 } 794 return expired; 795 } 796 797 private boolean stampAsExpired(Message message) throws IOException { 798 boolean stamped=false; 799 if (message.getProperty(ORIGINAL_EXPIRATION) == null) { 800 long expiration=message.getExpiration(); 801 message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration)); 802 stamped = true; 803 } 804 return stamped; 805 } 806 807 808 @Override 809 public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) { 810 if (LOG.isDebugEnabled()) { 811 LOG.debug("Message expired " + node); 812 } 813 getRoot().sendToDeadLetterQueue(context, node, subscription); 814 } 815 816 @Override 817 public void sendToDeadLetterQueue(ConnectionContext context, 818 MessageReference node, Subscription subscription){ 819 try{ 820 if(node!=null){ 821 Message message=node.getMessage(); 822 if(message!=null && node.getRegionDestination()!=null){ 823 DeadLetterStrategy deadLetterStrategy=node 824 .getRegionDestination().getDeadLetterStrategy(); 825 if(deadLetterStrategy!=null){ 826 if(deadLetterStrategy.isSendToDeadLetterQueue(message)){ 827 // message may be inflight to other subscriptions so do not modify 828 message = message.copy(); 829 stampAsExpired(message); 830 message.setExpiration(0); 831 if(!message.isPersistent()){ 832 message.setPersistent(true); 833 message.setProperty("originalDeliveryMode", 834 "NON_PERSISTENT"); 835 } 836 // The original destination and transaction id do 837 // not get filled when the message is first sent, 838 // it is only populated if the message is routed to 839 // another destination like the DLQ 840 ActiveMQDestination deadLetterDestination=deadLetterStrategy 841 .getDeadLetterQueueFor(message, subscription); 842 if (context.getBroker()==null) { 843 context.setBroker(getRoot()); 844 } 845 BrokerSupport.resendNoCopy(context,message, 846 deadLetterDestination); 847 } 848 } else { 849 if (LOG.isDebugEnabled()) { 850 LOG.debug("Dead Letter message with no DLQ strategy in place, message id: " 851 + message.getMessageId() + ", destination: " + message.getDestination()); 852 } 853 } 854 } 855 } 856 }catch(Exception e){ 857 LOG.warn("Caught an exception sending to DLQ: "+node,e); 858 } 859 } 860 861 @Override 862 public Broker getRoot() { 863 try { 864 return getBrokerService().getBroker(); 865 } catch (Exception e) { 866 LOG.error("Trying to get Root Broker " + e); 867 throw new RuntimeException("The broker from the BrokerService should not throw an exception"); 868 } 869 } 870 871 /** 872 * @return the broker sequence id 873 */ 874 @Override 875 public long getBrokerSequenceId() { 876 synchronized(sequenceGenerator) { 877 return sequenceGenerator.getNextSequenceId(); 878 } 879 } 880 881 882 @Override 883 public Scheduler getScheduler() { 884 return this.scheduler; 885 } 886 887 public ThreadPoolExecutor getExecutor() { 888 return this.executor; 889 } 890 891 @Override 892 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { 893 ActiveMQDestination destination = control.getDestination(); 894 switch (destination.getDestinationType()) { 895 case ActiveMQDestination.QUEUE_TYPE: 896 queueRegion.processConsumerControl(consumerExchange, control); 897 break; 898 899 case ActiveMQDestination.TOPIC_TYPE: 900 topicRegion.processConsumerControl(consumerExchange, control); 901 break; 902 903 case ActiveMQDestination.TEMP_QUEUE_TYPE: 904 tempQueueRegion.processConsumerControl(consumerExchange, control); 905 break; 906 907 case ActiveMQDestination.TEMP_TOPIC_TYPE: 908 tempTopicRegion.processConsumerControl(consumerExchange, control); 909 break; 910 911 default: 912 LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control); 913 } 914 } 915 916 protected void addBrokerInClusterUpdate() { 917 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 918 for (TransportConnector connector : connectors) { 919 if (connector.isUpdateClusterClients()) { 920 connector.updateClientClusterInfo(); 921 } 922 } 923 } 924 925 protected void removeBrokerInClusterUpdate() { 926 List<TransportConnector> connectors = this.brokerService.getTransportConnectors(); 927 for (TransportConnector connector : connectors) { 928 if (connector.isUpdateClusterClients() && connector.isUpdateClusterClientsOnRemove()) { 929 connector.updateClientClusterInfo(); 930 } 931 } 932 } 933 934 protected void purgeInactiveDestinations() { 935 synchronized (purgeInactiveDestinationsTask) { 936 List<BaseDestination> list = new ArrayList<BaseDestination>(); 937 Map<ActiveMQDestination, Destination> map = getDestinationMap(); 938 long timeStamp = System.currentTimeMillis(); 939 for (Destination d : map.values()) { 940 if (d instanceof BaseDestination) { 941 BaseDestination bd = (BaseDestination) d; 942 bd.markForGC(timeStamp); 943 if (bd.canGC()) { 944 list.add(bd); 945 } 946 } 947 } 948 949 if (list.isEmpty() == false) { 950 951 ConnectionContext context = BrokerSupport.getConnectionContext(this); 952 context.setBroker(this); 953 954 for (BaseDestination dest : list) { 955 dest.getLog().info( 956 dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC() 957 + " ms - removing ..."); 958 try { 959 getRoot().removeDestination(context, dest.getActiveMQDestination(), 0); 960 } catch (Exception e) { 961 LOG.error("Failed to remove inactive destination " + dest, e); 962 } 963 } 964 } 965 } 966 } 967 }