001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.net.URI; 022 import java.net.URISyntaxException; 023 import java.net.UnknownHostException; 024 import java.util.ArrayList; 025 import java.util.HashMap; 026 import java.util.Iterator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Set; 030 import java.util.concurrent.CopyOnWriteArrayList; 031 import java.util.concurrent.CountDownLatch; 032 import java.util.concurrent.LinkedBlockingQueue; 033 import java.util.concurrent.SynchronousQueue; 034 import java.util.concurrent.ThreadFactory; 035 import java.util.concurrent.ThreadPoolExecutor; 036 import java.util.concurrent.TimeUnit; 037 import java.util.concurrent.atomic.AtomicBoolean; 038 039 import javax.annotation.PostConstruct; 040 import javax.annotation.PreDestroy; 041 import javax.management.MalformedObjectNameException; 042 import javax.management.ObjectName; 043 044 import org.apache.activemq.ActiveMQConnectionMetaData; 045 import org.apache.activemq.ConfigurationException; 046 import org.apache.activemq.Service; 047 import 
org.apache.activemq.advisory.AdvisoryBroker; 048 import org.apache.activemq.broker.cluster.ConnectionSplitBroker; 049 import org.apache.activemq.broker.ft.MasterConnector; 050 import org.apache.activemq.broker.jmx.AnnotatedMBean; 051 import org.apache.activemq.broker.jmx.BrokerView; 052 import org.apache.activemq.broker.jmx.ConnectorView; 053 import org.apache.activemq.broker.jmx.ConnectorViewMBean; 054 import org.apache.activemq.broker.jmx.FTConnectorView; 055 import org.apache.activemq.broker.jmx.JmsConnectorView; 056 import org.apache.activemq.broker.jmx.JobSchedulerView; 057 import org.apache.activemq.broker.jmx.JobSchedulerViewMBean; 058 import org.apache.activemq.broker.jmx.ManagedRegionBroker; 059 import org.apache.activemq.broker.jmx.ManagementContext; 060 import org.apache.activemq.broker.jmx.NetworkConnectorView; 061 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean; 062 import org.apache.activemq.broker.jmx.ProxyConnectorView; 063 import org.apache.activemq.broker.region.CompositeDestinationInterceptor; 064 import org.apache.activemq.broker.region.Destination; 065 import org.apache.activemq.broker.region.DestinationFactory; 066 import org.apache.activemq.broker.region.DestinationFactoryImpl; 067 import org.apache.activemq.broker.region.DestinationInterceptor; 068 import org.apache.activemq.broker.region.RegionBroker; 069 import org.apache.activemq.broker.region.policy.PolicyMap; 070 import org.apache.activemq.broker.region.virtual.MirroredQueue; 071 import org.apache.activemq.broker.region.virtual.VirtualDestination; 072 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; 073 import org.apache.activemq.broker.region.virtual.VirtualTopic; 074 import org.apache.activemq.broker.scheduler.SchedulerBroker; 075 import org.apache.activemq.command.ActiveMQDestination; 076 import org.apache.activemq.command.BrokerId; 077 import org.apache.activemq.network.ConnectionFilter; 078 import 
org.apache.activemq.network.DiscoveryNetworkConnector; 079 import org.apache.activemq.network.NetworkConnector; 080 import org.apache.activemq.network.jms.JmsConnector; 081 import org.apache.activemq.proxy.ProxyConnector; 082 import org.apache.activemq.security.MessageAuthorizationPolicy; 083 import org.apache.activemq.selector.SelectorParser; 084 import org.apache.activemq.store.PersistenceAdapter; 085 import org.apache.activemq.store.PersistenceAdapterFactory; 086 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; 087 import org.apache.activemq.store.kahadb.plist.PListStore; 088 import org.apache.activemq.store.memory.MemoryPersistenceAdapter; 089 import org.apache.activemq.thread.Scheduler; 090 import org.apache.activemq.thread.TaskRunnerFactory; 091 import org.apache.activemq.transport.TransportFactory; 092 import org.apache.activemq.transport.TransportServer; 093 import org.apache.activemq.transport.vm.VMTransportFactory; 094 import org.apache.activemq.usage.SystemUsage; 095 import org.apache.activemq.util.*; 096 import org.slf4j.Logger; 097 import org.slf4j.LoggerFactory; 098 import org.slf4j.MDC; 099 100 /** 101 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a 102 * number of transport connectors, network connectors and a bunch of properties 103 * which can be used to configure the broker as its lazily created. 
104 * 105 * 106 * @org.apache.xbean.XBean 107 */ 108 public class BrokerService implements Service { 109 protected CountDownLatch slaveStartSignal = new CountDownLatch(1); 110 public static final String DEFAULT_PORT = "61616"; 111 public static final String LOCAL_HOST_NAME; 112 public static final String DEFAULT_BROKER_NAME = "localhost"; 113 private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class); 114 private static final long serialVersionUID = 7353129142305630237L; 115 private boolean useJmx = true; 116 private boolean enableStatistics = true; 117 private boolean persistent = true; 118 private boolean populateJMSXUserID; 119 private boolean useAuthenticatedPrincipalForJMSXUserID; 120 121 private boolean useShutdownHook = true; 122 private boolean useLoggingForShutdownErrors; 123 private boolean shutdownOnMasterFailure; 124 private boolean shutdownOnSlaveFailure; 125 private boolean waitForSlave; 126 private long waitForSlaveTimeout = 600000L; 127 private boolean passiveSlave; 128 private String brokerName = DEFAULT_BROKER_NAME; 129 private File dataDirectoryFile; 130 private File tmpDataDirectory; 131 private Broker broker; 132 private BrokerView adminView; 133 private ManagementContext managementContext; 134 private ObjectName brokerObjectName; 135 private TaskRunnerFactory taskRunnerFactory; 136 private TaskRunnerFactory persistenceTaskRunnerFactory; 137 private SystemUsage systemUsage; 138 private SystemUsage producerSystemUsage; 139 private SystemUsage consumerSystemUsaage; 140 private PersistenceAdapter persistenceAdapter; 141 private PersistenceAdapterFactory persistenceFactory; 142 protected DestinationFactory destinationFactory; 143 private MessageAuthorizationPolicy messageAuthorizationPolicy; 144 private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>(); 145 private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>(); 146 private final 
List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>(); 147 private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>(); 148 private final List<Service> services = new ArrayList<Service>(); 149 private MasterConnector masterConnector; 150 private String masterConnectorURI; 151 private transient Thread shutdownHook; 152 private String[] transportConnectorURIs; 153 private String[] networkConnectorURIs; 154 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges 155 // to other jms messaging 156 // systems 157 private boolean deleteAllMessagesOnStartup; 158 private boolean advisorySupport = true; 159 private URI vmConnectorURI; 160 private String defaultSocketURIString; 161 private PolicyMap destinationPolicy; 162 private final AtomicBoolean started = new AtomicBoolean(false); 163 private final AtomicBoolean stopped = new AtomicBoolean(false); 164 private BrokerPlugin[] plugins; 165 private boolean keepDurableSubsActive = true; 166 private boolean useVirtualTopics = true; 167 private boolean useMirroredQueues = false; 168 private boolean useTempMirroredQueues = true; 169 private BrokerId brokerId; 170 private DestinationInterceptor[] destinationInterceptors; 171 private ActiveMQDestination[] destinations; 172 private PListStore tempDataStore; 173 private int persistenceThreadPriority = Thread.MAX_PRIORITY; 174 private boolean useLocalHostBrokerName; 175 private final CountDownLatch stoppedLatch = new CountDownLatch(1); 176 private final CountDownLatch startedLatch = new CountDownLatch(1); 177 private boolean supportFailOver; 178 private Broker regionBroker; 179 private int producerSystemUsagePortion = 60; 180 private int consumerSystemUsagePortion = 40; 181 private boolean splitSystemUsageForProducersConsumers; 182 private boolean monitorConnectionSplits = false; 183 private int taskRunnerPriority = Thread.NORM_PRIORITY; 184 private boolean dedicatedTaskRunner; 185 private boolean 
cacheTempDestinations = false;// useful for failover 186 private int timeBeforePurgeTempDestinations = 5000; 187 private final List<Runnable> shutdownHooks = new ArrayList<Runnable>(); 188 private boolean systemExitOnShutdown; 189 private int systemExitOnShutdownExitCode; 190 private SslContext sslContext; 191 private boolean forceStart = false; 192 private IOExceptionHandler ioExceptionHandler; 193 private boolean schedulerSupport = false; 194 private File schedulerDirectoryFile; 195 private Scheduler scheduler; 196 private ThreadPoolExecutor executor; 197 private boolean slave = true; 198 private int schedulePeriodForDestinationPurge=5000; 199 private BrokerContext brokerContext; 200 private boolean networkConnectorStartAsync = false; 201 202 static { 203 String localHostName = "localhost"; 204 try { 205 localHostName = InetAddressUtil.getLocalHostName(); 206 } catch (UnknownHostException e) { 207 LOG.error("Failed to resolve localhost"); 208 } 209 LOCAL_HOST_NAME = localHostName; 210 } 211 212 @Override 213 public String toString() { 214 return "BrokerService[" + getBrokerName() + "]"; 215 } 216 217 /** 218 * Adds a new transport connector for the given bind address 219 * 220 * @return the newly created and added transport connector 221 * @throws Exception 222 */ 223 public TransportConnector addConnector(String bindAddress) throws Exception { 224 return addConnector(new URI(bindAddress)); 225 } 226 227 /** 228 * Adds a new transport connector for the given bind address 229 * 230 * @return the newly created and added transport connector 231 * @throws Exception 232 */ 233 public TransportConnector addConnector(URI bindAddress) throws Exception { 234 return addConnector(createTransportConnector(bindAddress)); 235 } 236 237 /** 238 * Adds a new transport connector for the given TransportServer transport 239 * 240 * @return the newly created and added transport connector 241 * @throws Exception 242 */ 243 public TransportConnector addConnector(TransportServer 
transport) throws Exception { 244 return addConnector(new TransportConnector(transport)); 245 } 246 247 /** 248 * Adds a new transport connector 249 * 250 * @return the transport connector 251 * @throws Exception 252 */ 253 public TransportConnector addConnector(TransportConnector connector) throws Exception { 254 transportConnectors.add(connector); 255 return connector; 256 } 257 258 /** 259 * Stops and removes a transport connector from the broker. 260 * 261 * @param connector 262 * @return true if the connector has been previously added to the broker 263 * @throws Exception 264 */ 265 public boolean removeConnector(TransportConnector connector) throws Exception { 266 boolean rc = transportConnectors.remove(connector); 267 if (rc) { 268 unregisterConnectorMBean(connector); 269 } 270 return rc; 271 } 272 273 /** 274 * Adds a new network connector using the given discovery address 275 * 276 * @return the newly created and added network connector 277 * @throws Exception 278 */ 279 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception { 280 return addNetworkConnector(new URI(discoveryAddress)); 281 } 282 283 /** 284 * Adds a new proxy connector using the given bind address 285 * 286 * @return the newly created and added network connector 287 * @throws Exception 288 */ 289 public ProxyConnector addProxyConnector(String bindAddress) throws Exception { 290 return addProxyConnector(new URI(bindAddress)); 291 } 292 293 /** 294 * Adds a new network connector using the given discovery address 295 * 296 * @return the newly created and added network connector 297 * @throws Exception 298 */ 299 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception { 300 if (!isAdvisorySupport()) { 301 throw new javax.jms.IllegalStateException( 302 "Networks require advisory messages to function - advisories are currently disabled"); 303 } 304 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress); 305 return 
addNetworkConnector(connector); 306 } 307 308 /** 309 * Adds a new proxy connector using the given bind address 310 * 311 * @return the newly created and added network connector 312 * @throws Exception 313 */ 314 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception { 315 ProxyConnector connector = new ProxyConnector(); 316 connector.setBind(bindAddress); 317 connector.setRemote(new URI("fanout:multicast://default")); 318 return addProxyConnector(connector); 319 } 320 321 /** 322 * Adds a new network connector to connect this broker to a federated 323 * network 324 */ 325 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception { 326 connector.setBrokerService(this); 327 URI uri = getVmConnectorURI(); 328 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 329 map.put("network", "true"); 330 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 331 connector.setLocalUri(uri); 332 // Set a connection filter so that the connector does not establish loop 333 // back connections. 334 connector.setConnectionFilter(new ConnectionFilter() { 335 public boolean connectTo(URI location) { 336 List<TransportConnector> transportConnectors = getTransportConnectors(); 337 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { 338 try { 339 TransportConnector tc = iter.next(); 340 if (location.equals(tc.getConnectUri())) { 341 return false; 342 } 343 } catch (Throwable e) { 344 } 345 } 346 return true; 347 } 348 }); 349 networkConnectors.add(connector); 350 if (isUseJmx()) { 351 registerNetworkConnectorMBean(connector); 352 } 353 return connector; 354 } 355 356 /** 357 * Removes the given network connector without stopping it. 
The caller 358 * should call {@link NetworkConnector#stop()} to close the connector 359 */ 360 public boolean removeNetworkConnector(NetworkConnector connector) { 361 boolean answer = networkConnectors.remove(connector); 362 if (answer) { 363 unregisterNetworkConnectorMBean(connector); 364 } 365 return answer; 366 } 367 368 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception { 369 URI uri = getVmConnectorURI(); 370 connector.setLocalUri(uri); 371 proxyConnectors.add(connector); 372 if (isUseJmx()) { 373 registerProxyConnectorMBean(connector); 374 } 375 return connector; 376 } 377 378 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception { 379 connector.setBrokerService(this); 380 jmsConnectors.add(connector); 381 if (isUseJmx()) { 382 registerJmsConnectorMBean(connector); 383 } 384 return connector; 385 } 386 387 public JmsConnector removeJmsConnector(JmsConnector connector) { 388 if (jmsConnectors.remove(connector)) { 389 return connector; 390 } 391 return null; 392 } 393 394 /** 395 * @return Returns the masterConnectorURI. 396 */ 397 public String getMasterConnectorURI() { 398 return masterConnectorURI; 399 } 400 401 /** 402 * @param masterConnectorURI 403 * The masterConnectorURI to set. 404 */ 405 public void setMasterConnectorURI(String masterConnectorURI) { 406 this.masterConnectorURI = masterConnectorURI; 407 } 408 409 /** 410 * @return true if this Broker is a slave to a Master 411 */ 412 public boolean isSlave() { 413 return (masterConnector != null && masterConnector.isSlave()) || 414 (masterConnector != null && masterConnector.isStoppedBeforeStart()) || 415 (masterConnector == null && slave); 416 } 417 418 public void masterFailed() { 419 if (shutdownOnMasterFailure) { 420 LOG.error("The Master has failed ... 
shutting down"); 421 try { 422 stop(); 423 } catch (Exception e) { 424 LOG.error("Failed to stop for master failure", e); 425 } 426 } else { 427 LOG.warn("Master Failed - starting all connectors"); 428 try { 429 startAllConnectors(); 430 broker.nowMasterBroker(); 431 } catch (Exception e) { 432 LOG.error("Failed to startAllConnectors", e); 433 } 434 } 435 } 436 437 public boolean isStarted() { 438 return started.get(); 439 } 440 441 public void start(boolean force) throws Exception { 442 forceStart = force; 443 stopped.set(false); 444 started.set(false); 445 start(); 446 } 447 448 // Service interface 449 // ------------------------------------------------------------------------- 450 451 protected boolean shouldAutostart() { 452 return true; 453 } 454 455 /** 456 * 457 * @throws Exception 458 * @org. apache.xbean.InitMethod 459 */ 460 @PostConstruct 461 public void autoStart() throws Exception { 462 if(shouldAutostart()) { 463 start(); 464 } 465 } 466 467 public void start() throws Exception { 468 if (stopped.get() || !started.compareAndSet(false, true)) { 469 // lets just ignore redundant start() calls 470 // as its way too easy to not be completely sure if start() has been 471 // called or not with the gazillion of different configuration 472 // mechanisms 473 // throw new IllegalStateException("Allready started."); 474 return; 475 } 476 477 MDC.put("activemq.broker", brokerName); 478 479 try { 480 if (systemExitOnShutdown && useShutdownHook) { 481 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); 482 } 483 processHelperProperties(); 484 if (isUseJmx()) { 485 startManagementContext(); 486 } 487 getPersistenceAdapter().setUsageManager(getProducerSystemUsage()); 488 getPersistenceAdapter().setBrokerName(getBrokerName()); 489 LOG.info("Using Persistence Adapter: " + getPersistenceAdapter()); 490 if (deleteAllMessagesOnStartup) { 491 deleteAllMessages(); 492 } 493 
getPersistenceAdapter().start(); 494 slave = false; 495 startDestinations(); 496 addShutdownHook(); 497 getBroker().start(); 498 if (isUseJmx()) { 499 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) { 500 // try to restart management context 501 // typical for slaves that use the same ports as master 502 managementContext.stop(); 503 startManagementContext(); 504 } 505 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker; 506 managedBroker.setContextBroker(broker); 507 adminView.setBroker(managedBroker); 508 } 509 BrokerRegistry.getInstance().bind(getBrokerName(), this); 510 // see if there is a MasterBroker service and if so, configure 511 // it and start it. 512 for (Service service : services) { 513 if (service instanceof MasterConnector) { 514 configureService(service); 515 service.start(); 516 } 517 } 518 if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) { 519 startAllConnectors(); 520 } 521 if (!stopped.get()) { 522 if (isUseJmx() && masterConnector != null) { 523 registerFTConnectorMBean(masterConnector); 524 } 525 } 526 if (brokerId == null) { 527 brokerId = broker.getBrokerId(); 528 } 529 if (ioExceptionHandler == null) { 530 setIoExceptionHandler(new DefaultIOExceptionHandler()); 531 } 532 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started"); 533 getBroker().brokerServiceStarted(); 534 startedLatch.countDown(); 535 } catch (Exception e) { 536 LOG.error("Failed to start ActiveMQ JMS Message Broker. 
Reason: " + e, e); 537 try { 538 if (!stopped.get()) { 539 stop(); 540 } 541 } catch (Exception ex) { 542 LOG.warn("Failed to stop broker after failure in start ", ex); 543 } 544 throw e; 545 } finally { 546 MDC.remove("activemq.broker"); 547 } 548 } 549 550 /** 551 * 552 * @throws Exception 553 * @org.apache .xbean.DestroyMethod 554 */ 555 @PreDestroy 556 public void stop() throws Exception { 557 if (!started.get()) { 558 return; 559 } 560 561 MDC.put("activemq.broker", brokerName); 562 563 if (systemExitOnShutdown) { 564 new Thread() { 565 @Override 566 public void run() { 567 System.exit(systemExitOnShutdownExitCode); 568 } 569 }.start(); 570 } 571 572 LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down"); 573 removeShutdownHook(); 574 ServiceStopper stopper = new ServiceStopper(); 575 if (services != null) { 576 for (Service service : services) { 577 stopper.stop(service); 578 } 579 } 580 stopAllConnectors(stopper); 581 // remove any VMTransports connected 582 // this has to be done after services are stopped, 583 // to avoid timimg issue with discovery (spinning up a new instance) 584 BrokerRegistry.getInstance().unbind(getBrokerName()); 585 VMTransportFactory.stopped(getBrokerName()); 586 if (broker != null) { 587 stopper.stop(broker); 588 broker = null; 589 } 590 591 if (tempDataStore != null) { 592 tempDataStore.stop(); 593 tempDataStore = null; 594 } 595 stopper.stop(persistenceAdapter); 596 persistenceAdapter = null; 597 slave = true; 598 if (isUseJmx()) { 599 stopper.stop(getManagementContext()); 600 managementContext = null; 601 } 602 // Clear SelectorParser cache to free memory 603 SelectorParser.clearCache(); 604 stopped.set(true); 605 stoppedLatch.countDown(); 606 if (masterConnectorURI == null) { 607 // master start has not finished yet 608 if (slaveStartSignal.getCount() == 1) { 609 started.set(false); 610 slaveStartSignal.countDown(); 611 } 612 } else { 613 for (Service service : services) { 614 if 
(service instanceof MasterConnector) { 615 MasterConnector mConnector = (MasterConnector) service; 616 if (!mConnector.isSlave()) { 617 // means should be slave but not connected to master yet 618 started.set(false); 619 mConnector.stopBeforeConnected(); 620 } 621 } 622 } 623 } 624 if (this.taskRunnerFactory != null) { 625 this.taskRunnerFactory.shutdown(); 626 this.taskRunnerFactory = null; 627 } 628 if (this.scheduler != null) { 629 this.scheduler.stop(); 630 this.scheduler = null; 631 } 632 if (this.executor != null) { 633 this.executor.shutdownNow(); 634 this.executor = null; 635 } 636 637 this.destinationInterceptors = null; 638 this.destinationFactory = null; 639 640 LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped"); 641 synchronized (shutdownHooks) { 642 for (Runnable hook : shutdownHooks) { 643 try { 644 hook.run(); 645 } catch (Throwable e) { 646 stopper.onException(hook, e); 647 } 648 } 649 } 650 651 MDC.remove("activemq.broker"); 652 653 stopper.throwFirstException(); 654 } 655 656 public boolean checkQueueSize(String queueName) { 657 long count = 0; 658 long queueSize = 0; 659 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap(); 660 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) { 661 if (entry.getKey().isQueue()) { 662 if (entry.getValue().getName().matches(queueName)) { 663 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount(); 664 count += queueSize; 665 if (queueSize > 0) { 666 LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:" 667 + queueSize); 668 } 669 } 670 } 671 } 672 return count == 0; 673 } 674 675 /** 676 * This method (both connectorName and queueName are using regex to match) 677 * 1. stop the connector (supposed the user input the connector which the 678 * clients connect to) 2. 
to check whether there is any pending message on 679 * the queues defined by queueName 3. supposedly, after stop the connector, 680 * client should failover to other broker and pending messages should be 681 * forwarded. if no pending messages, the method finally call stop to stop 682 * the broker. 683 * 684 * @param connectorName 685 * @param queueName 686 * @param timeout 687 * @param pollInterval 688 * @throws Exception 689 */ 690 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) 691 throws Exception { 692 if (isUseJmx()) { 693 if (connectorName == null || queueName == null || timeout <= 0) { 694 throw new Exception( 695 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully."); 696 } 697 if (pollInterval <= 0) { 698 pollInterval = 30; 699 } 700 LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:" 701 + timeout + " pollInterval:" + pollInterval); 702 TransportConnector connector; 703 for (int i = 0; i < transportConnectors.size(); i++) { 704 connector = transportConnectors.get(i); 705 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) { 706 connector.stop(); 707 } 708 } 709 long start = System.currentTimeMillis(); 710 while (System.currentTimeMillis() - start < timeout * 1000) { 711 // check quesize until it gets zero 712 if (checkQueueSize(queueName)) { 713 stop(); 714 break; 715 } else { 716 Thread.sleep(pollInterval * 1000); 717 } 718 } 719 if (stopped.get()) { 720 LOG.info("Successfully stop the broker."); 721 } else { 722 LOG.info("There is still pending message on the queue. 
Please check and stop the broker manually."); 723 } 724 } 725 } 726 727 /** 728 * A helper method to block the caller thread until the broker has been 729 * stopped 730 */ 731 public void waitUntilStopped() { 732 while (isStarted() && !stopped.get()) { 733 try { 734 stoppedLatch.await(); 735 } catch (InterruptedException e) { 736 // ignore 737 } 738 } 739 } 740 741 /** 742 * A helper method to block the caller thread until the broker has fully started 743 * @return boolean true if wait succeeded false if broker was not started or was stopped 744 */ 745 public boolean waitUntilStarted() { 746 boolean waitSucceeded = false; 747 while (isStarted() && !stopped.get() && !waitSucceeded) { 748 try { 749 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS); 750 } catch (InterruptedException ignore) { 751 } 752 } 753 return waitSucceeded; 754 } 755 756 // Properties 757 // ------------------------------------------------------------------------- 758 /** 759 * Returns the message broker 760 */ 761 public Broker getBroker() throws Exception { 762 if (broker == null) { 763 LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker (" 764 + getBrokerName() + ") is starting"); 765 LOG.info("For help or more information please see: http://activemq.apache.org/"); 766 broker = createBroker(); 767 } 768 return broker; 769 } 770 771 /** 772 * Returns the administration view of the broker; used to create and destroy 773 * resources such as queues and topics. Note this method returns null if JMX 774 * is disabled. 
775 */ 776 public BrokerView getAdminView() throws Exception { 777 if (adminView == null) { 778 // force lazy creation 779 getBroker(); 780 } 781 return adminView; 782 } 783 784 public void setAdminView(BrokerView adminView) { 785 this.adminView = adminView; 786 } 787 788 public String getBrokerName() { 789 return brokerName; 790 } 791 792 /** 793 * Sets the name of this broker; which must be unique in the network 794 * 795 * @param brokerName 796 */ 797 public void setBrokerName(String brokerName) { 798 if (brokerName == null) { 799 throw new NullPointerException("The broker name cannot be null"); 800 } 801 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_"); 802 if (!str.equals(brokerName)) { 803 LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str); 804 } 805 this.brokerName = str.trim(); 806 } 807 808 public PersistenceAdapterFactory getPersistenceFactory() { 809 return persistenceFactory; 810 } 811 812 public File getDataDirectoryFile() { 813 if (dataDirectoryFile == null) { 814 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory()); 815 } 816 return dataDirectoryFile; 817 } 818 819 public File getBrokerDataDirectory() { 820 String brokerDir = getBrokerName(); 821 return new File(getDataDirectoryFile(), brokerDir); 822 } 823 824 /** 825 * Sets the directory in which the data files will be stored by default for 826 * the JDBC and Journal persistence adaptors. 827 * 828 * @param dataDirectory 829 * the directory to store data files 830 */ 831 public void setDataDirectory(String dataDirectory) { 832 setDataDirectoryFile(new File(dataDirectory)); 833 } 834 835 /** 836 * Sets the directory in which the data files will be stored by default for 837 * the JDBC and Journal persistence adaptors. 
838 * 839 * @param dataDirectoryFile 840 * the directory to store data files 841 */ 842 public void setDataDirectoryFile(File dataDirectoryFile) { 843 this.dataDirectoryFile = dataDirectoryFile; 844 } 845 846 /** 847 * @return the tmpDataDirectory 848 */ 849 public File getTmpDataDirectory() { 850 if (tmpDataDirectory == null) { 851 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage"); 852 } 853 return tmpDataDirectory; 854 } 855 856 /** 857 * @param tmpDataDirectory 858 * the tmpDataDirectory to set 859 */ 860 public void setTmpDataDirectory(File tmpDataDirectory) { 861 this.tmpDataDirectory = tmpDataDirectory; 862 } 863 864 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) { 865 this.persistenceFactory = persistenceFactory; 866 } 867 868 public void setDestinationFactory(DestinationFactory destinationFactory) { 869 this.destinationFactory = destinationFactory; 870 } 871 872 public boolean isPersistent() { 873 return persistent; 874 } 875 876 /** 877 * Sets whether or not persistence is enabled or disabled. 878 */ 879 public void setPersistent(boolean persistent) { 880 this.persistent = persistent; 881 } 882 883 public boolean isPopulateJMSXUserID() { 884 return populateJMSXUserID; 885 } 886 887 /** 888 * Sets whether or not the broker should populate the JMSXUserID header. 
889 */ 890 public void setPopulateJMSXUserID(boolean populateJMSXUserID) { 891 this.populateJMSXUserID = populateJMSXUserID; 892 } 893 894 public SystemUsage getSystemUsage() { 895 try { 896 if (systemUsage == null) { 897 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore()); 898 systemUsage.setExecutor(getExecutor()); 899 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 900 // 64 901 // Meg 902 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10 903 // Gb 904 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 905 // GB 906 addService(this.systemUsage); 907 } 908 return systemUsage; 909 } catch (IOException e) { 910 LOG.error("Cannot create SystemUsage", e); 911 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage()); 912 } 913 } 914 915 public void setSystemUsage(SystemUsage memoryManager) { 916 if (this.systemUsage != null) { 917 removeService(this.systemUsage); 918 } 919 this.systemUsage = memoryManager; 920 if (this.systemUsage.getExecutor()==null) { 921 this.systemUsage.setExecutor(getExecutor()); 922 } 923 addService(this.systemUsage); 924 } 925 926 /** 927 * @return the consumerUsageManager 928 * @throws IOException 929 */ 930 public SystemUsage getConsumerSystemUsage() throws IOException { 931 if (this.consumerSystemUsaage == null) { 932 if (splitSystemUsageForProducersConsumers) { 933 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer"); 934 float portion = consumerSystemUsagePortion / 100f; 935 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion); 936 addService(this.consumerSystemUsaage); 937 } else { 938 consumerSystemUsaage = getSystemUsage(); 939 } 940 } 941 return this.consumerSystemUsaage; 942 } 943 944 /** 945 * @param consumerSystemUsaage 946 * the storeSystemUsage to set 947 */ 948 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) { 949 if (this.consumerSystemUsaage != null) { 
950 removeService(this.consumerSystemUsaage); 951 } 952 this.consumerSystemUsaage = consumerSystemUsaage; 953 addService(this.consumerSystemUsaage); 954 } 955 956 /** 957 * @return the producerUsageManager 958 * @throws IOException 959 */ 960 public SystemUsage getProducerSystemUsage() throws IOException { 961 if (producerSystemUsage == null) { 962 if (splitSystemUsageForProducersConsumers) { 963 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); 964 float portion = producerSystemUsagePortion / 100f; 965 producerSystemUsage.getMemoryUsage().setUsagePortion(portion); 966 addService(producerSystemUsage); 967 } else { 968 producerSystemUsage = getSystemUsage(); 969 } 970 } 971 return producerSystemUsage; 972 } 973 974 /** 975 * @param producerUsageManager 976 * the producerUsageManager to set 977 */ 978 public void setProducerSystemUsage(SystemUsage producerUsageManager) { 979 if (this.producerSystemUsage != null) { 980 removeService(this.producerSystemUsage); 981 } 982 this.producerSystemUsage = producerUsageManager; 983 addService(this.producerSystemUsage); 984 } 985 986 public PersistenceAdapter getPersistenceAdapter() throws IOException { 987 if (persistenceAdapter == null) { 988 persistenceAdapter = createPersistenceAdapter(); 989 configureService(persistenceAdapter); 990 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 991 } 992 return persistenceAdapter; 993 } 994 995 /** 996 * Sets the persistence adaptor implementation to use for this broker 997 * 998 * @throws IOException 999 */ 1000 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { 1001 this.persistenceAdapter = persistenceAdapter; 1002 configureService(this.persistenceAdapter); 1003 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); 1004 } 1005 1006 public TaskRunnerFactory getTaskRunnerFactory() { 1007 if (this.taskRunnerFactory == null) { 1008 this.taskRunnerFactory = new 
TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000, 1009 isDedicatedTaskRunner()); 1010 } 1011 return this.taskRunnerFactory; 1012 } 1013 1014 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 1015 this.taskRunnerFactory = taskRunnerFactory; 1016 } 1017 1018 public TaskRunnerFactory getPersistenceTaskRunnerFactory() { 1019 if (taskRunnerFactory == null) { 1020 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, 1021 true, 1000, isDedicatedTaskRunner()); 1022 } 1023 return persistenceTaskRunnerFactory; 1024 } 1025 1026 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) { 1027 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory; 1028 } 1029 1030 public boolean isUseJmx() { 1031 return useJmx; 1032 } 1033 1034 public boolean isEnableStatistics() { 1035 return enableStatistics; 1036 } 1037 1038 /** 1039 * Sets whether or not the Broker's services enable statistics or not. 1040 */ 1041 public void setEnableStatistics(boolean enableStatistics) { 1042 this.enableStatistics = enableStatistics; 1043 } 1044 1045 /** 1046 * Sets whether or not the Broker's services should be exposed into JMX or 1047 * not. 
*/
public void setUseJmx(boolean useJmx) {
    this.useJmx = useJmx;
}

/**
 * Returns the JMX name of this broker, creating it on first use.
 *
 * @return the broker object name
 * @throws IOException
 */
public ObjectName getBrokerObjectName() throws IOException {
    if (brokerObjectName == null) {
        brokerObjectName = createBrokerObjectName();
    }
    return brokerObjectName;
}

/**
 * Sets the JMX ObjectName for this broker
 */
public void setBrokerObjectName(ObjectName brokerObjectName) {
    this.brokerObjectName = brokerObjectName;
}

/**
 * Returns the management context, creating a default one on first use.
 *
 * @return the management context
 */
public ManagementContext getManagementContext() {
    if (managementContext == null) {
        managementContext = new ManagementContext();
    }
    return managementContext;
}

/**
 * @param managementContext
 *            the management context to use
 */
public void setManagementContext(ManagementContext managementContext) {
    this.managementContext = managementContext;
}

/**
 * Finds a registered network connector by name.
 *
 * @param connectorName
 *            the name to look for
 * @return the first connector whose name equals connectorName, or null
 */
public NetworkConnector getNetworkConnectorByName(String connectorName) {
    for (NetworkConnector connector : networkConnectors) {
        if (connector.getName().equals(connectorName)) {
            return connector;
        }
    }
    return null;
}

/**
 * @return the configured network connector URIs
 */
public String[] getNetworkConnectorURIs() {
    return networkConnectorURIs;
}

/**
 * @param networkConnectorURIs
 *            the network connector URIs to set
 */
public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
    this.networkConnectorURIs = networkConnectorURIs;
}

/**
 * Finds a registered transport connector by name.
 *
 * @param connectorName
 *            the name to look for
 * @return the first connector whose name equals connectorName, or null
 */
public TransportConnector getConnectorByName(String connectorName) {
    for (TransportConnector connector : transportConnectors) {
        if (connector.getName().equals(connectorName)) {
            return connector;
        }
    }
    return null;
}

/**
 * Builds a map from lower-cased URI scheme to connect URI for every
 * transport connector. Connectors whose URI cannot be read, or whose URI
 * has no scheme, are left out.
 *
 * @return a new map of scheme to connect URI string
 */
public Map<String, String> getTransportConnectorURIsAsMap() {
    Map<String, String> answer = new HashMap<String, String>();
    for (TransportConnector connector : transportConnectors) {
        try {
            URI uri = connector.getConnectUri();
            String scheme = uri.getScheme();
            if (scheme != null) {
                // Fixed locale: the default locale can mangle the key
                // (e.g. "NIO" under a Turkish locale).
                answer.put(scheme.toLowerCase(java.util.Locale.ENGLISH), uri.toString());
            }
        } catch (Exception e) {
            // Best effort: a connector that cannot report its URI is skipped.
            LOG.debug("Failed to read URI to build transportURIsAsMap", e);
        }
    }
    return answer;
}

/**
 * @return the configured transport connector URIs
 */
public String[] getTransportConnectorURIs() {
    return transportConnectorURIs;
}

/**
 * @param transportConnectorURIs
 *            the transport connector URIs to set
 */
public void setTransportConnectorURIs(String[] transportConnectorURIs) {
    this.transportConnectorURIs = transportConnectorURIs;
}

/**
 * @return Returns the jmsBridgeConnectors.
 */
public JmsConnector[] getJmsBridgeConnectors() {
    return jmsBridgeConnectors;
}

/**
 * @param jmsConnectors
 *            The jmsBridgeConnectors to set.
 */
public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
    this.jmsBridgeConnectors = jmsConnectors;
}

/**
 * @return a new array holding the services currently registered
 */
public Service[] getServices() {
    return services.toArray(new Service[0]);
}

/**
 * Sets the services associated with this broker such as a
 * {@link MasterConnector}
 */
public void setServices(Service[] services) {
    this.services.clear();
    if (services != null) {
        for (Service service : services) {
            this.services.add(service);
        }
    }
}

/**
 * Adds a new service so that it will be started as part of the broker
 * lifecycle
 */
public void addService(Service service) {
    services.add(service);
}

/**
 * Removes a service from the list of registered services.
 *
 * @param service
 *            the service to remove
 */
public void removeService(Service service) {
    services.remove(service);
}

/**
 * @return the current value of the useLoggingForShutdownErrors flag
 */
public boolean isUseLoggingForShutdownErrors() {
    return useLoggingForShutdownErrors;
}

/**
 * Sets whether or not we should use commons-logging when reporting errors
 * when shutting down the broker
 */
public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
    this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
}

/**
 * @return the current value of the useShutdownHook flag
 */
public boolean isUseShutdownHook() {
    return useShutdownHook;
}

/**
 * Sets whether or not we should use a shutdown handler to close down the
 * broker cleanly if the JVM is terminated. It is recommended you leave this
 * enabled.
 */
public void setUseShutdownHook(boolean useShutdownHook) {
    this.useShutdownHook = useShutdownHook;
}

/**
 * @return the current value of the advisorySupport flag
 */
public boolean isAdvisorySupport() {
    return advisorySupport;
}

/**
 * Allows the support of advisory messages to be disabled for performance
 * reasons.
 */
public void setAdvisorySupport(boolean advisorySupport) {
    this.advisorySupport = advisorySupport;
}

/**
 * @return a new list containing the registered transport connectors
 */
public List<TransportConnector> getTransportConnectors() {
    return new ArrayList<TransportConnector>(transportConnectors);
}

/**
 * Sets the transport connectors which this broker will listen on for new
 * clients
 *
 * @org.apache.xbean.Property
 *                            nestedType="org.apache.activemq.broker.TransportConnector"
 */
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
    for (TransportConnector connector : transportConnectors) {
        addConnector(connector);
    }
}

/**
 * @return a new list containing the registered network connectors
 */
public List<NetworkConnector> getNetworkConnectors() {
    return new ArrayList<NetworkConnector>(networkConnectors);
}

/**
 * @return a new list containing the registered proxy connectors
 */
public List<ProxyConnector> getProxyConnectors() {
    return new ArrayList<ProxyConnector>(proxyConnectors);
}

/**
 * Sets the network connectors which this broker will use to connect to
 * other brokers in a federated network
 *
 * @org.apache.xbean.Property
 *                            nestedType="org.apache.activemq.network.NetworkConnector"
 */
public void setNetworkConnectors(List networkConnectors) throws Exception {
    // The parameter stays a raw List for compatibility; each element is cast.
    for (Object element : networkConnectors) {
        addNetworkConnector((NetworkConnector) element);
    }
}
1249 /** 1250 * Sets the network connectors which this broker will use to connect to 1251 * other brokers in a federated network 1252 */ 1253 public void setProxyConnectors(List proxyConnectors) throws Exception { 1254 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) { 1255 ProxyConnector connector = (ProxyConnector) iter.next(); 1256 addProxyConnector(connector); 1257 } 1258 } 1259 1260 public PolicyMap getDestinationPolicy() { 1261 return destinationPolicy; 1262 } 1263 1264 /** 1265 * Sets the destination specific policies available either for exact 1266 * destinations or for wildcard areas of destinations. 1267 */ 1268 public void setDestinationPolicy(PolicyMap policyMap) { 1269 this.destinationPolicy = policyMap; 1270 } 1271 1272 public BrokerPlugin[] getPlugins() { 1273 return plugins; 1274 } 1275 1276 /** 1277 * Sets a number of broker plugins to install such as for security 1278 * authentication or authorization 1279 */ 1280 public void setPlugins(BrokerPlugin[] plugins) { 1281 this.plugins = plugins; 1282 } 1283 1284 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1285 return messageAuthorizationPolicy; 1286 } 1287 1288 /** 1289 * Sets the policy used to decide if the current connection is authorized to 1290 * consume a given message 1291 */ 1292 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1293 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1294 } 1295 1296 /** 1297 * Delete all messages from the persistent store 1298 * 1299 * @throws IOException 1300 */ 1301 public void deleteAllMessages() throws IOException { 1302 getPersistenceAdapter().deleteAllMessages(); 1303 } 1304 1305 public boolean isDeleteAllMessagesOnStartup() { 1306 return deleteAllMessagesOnStartup; 1307 } 1308 1309 /** 1310 * Sets whether or not all messages are deleted on startup - mostly only 1311 * useful for testing. 
1312 */ 1313 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) { 1314 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; 1315 } 1316 1317 public URI getVmConnectorURI() { 1318 if (vmConnectorURI == null) { 1319 try { 1320 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_")); 1321 } catch (URISyntaxException e) { 1322 LOG.error("Badly formed URI from " + getBrokerName(), e); 1323 } 1324 } 1325 return vmConnectorURI; 1326 } 1327 1328 public void setVmConnectorURI(URI vmConnectorURI) { 1329 this.vmConnectorURI = vmConnectorURI; 1330 } 1331 1332 public String getDefaultSocketURIString() { 1333 1334 if (started.get()) { 1335 if (this.defaultSocketURIString ==null) { 1336 for (TransportConnector tc:this.transportConnectors) { 1337 String result = null; 1338 try { 1339 result = tc.getPublishableConnectString(); 1340 } catch (Exception e) { 1341 LOG.warn("Failed to get the ConnectURI for "+tc,e); 1342 } 1343 if (result != null) { 1344 this.defaultSocketURIString =result; 1345 break; 1346 } 1347 } 1348 } 1349 return this.defaultSocketURIString; 1350 } 1351 return null; 1352 } 1353 1354 /** 1355 * @return Returns the shutdownOnMasterFailure. 1356 */ 1357 public boolean isShutdownOnMasterFailure() { 1358 return shutdownOnMasterFailure; 1359 } 1360 1361 /** 1362 * @param shutdownOnMasterFailure 1363 * The shutdownOnMasterFailure to set. 
1364 */ 1365 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) { 1366 this.shutdownOnMasterFailure = shutdownOnMasterFailure; 1367 } 1368 1369 public boolean isKeepDurableSubsActive() { 1370 return keepDurableSubsActive; 1371 } 1372 1373 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) { 1374 this.keepDurableSubsActive = keepDurableSubsActive; 1375 } 1376 1377 public boolean isUseVirtualTopics() { 1378 return useVirtualTopics; 1379 } 1380 1381 /** 1382 * Sets whether or not <a 1383 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 1384 * Topics</a> should be supported by default if they have not been 1385 * explicitly configured. 1386 */ 1387 public void setUseVirtualTopics(boolean useVirtualTopics) { 1388 this.useVirtualTopics = useVirtualTopics; 1389 } 1390 1391 public DestinationInterceptor[] getDestinationInterceptors() { 1392 return destinationInterceptors; 1393 } 1394 1395 public boolean isUseMirroredQueues() { 1396 return useMirroredQueues; 1397 } 1398 1399 /** 1400 * Sets whether or not <a 1401 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored 1402 * Queues</a> should be supported by default if they have not been 1403 * explicitly configured. 
*/
public void setUseMirroredQueues(boolean useMirroredQueues) {
    this.useMirroredQueues = useMirroredQueues;
}

/**
 * Sets the destination interceptors to use
 */
public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
    this.destinationInterceptors = destinationInterceptors;
}

/**
 * @return the destinations configured to be created on startup
 */
public ActiveMQDestination[] getDestinations() {
    return destinations;
}

/**
 * Sets the destinations which should be loaded/created on startup
 */
public void setDestinations(ActiveMQDestination[] destinations) {
    this.destinations = destinations;
}

/**
 * Returns the temporary data store, creating and starting it on first use.
 * Before the store is created, plain files left in the temporary data
 * directory are deleted (sub-directories are not touched). A non-persistent
 * broker has no temporary store.
 *
 * @return the tempDataStore, or null when the broker is not persistent
 * @throws RuntimeException
 *             wrapping any failure while cleaning up or starting the store
 */
public synchronized PListStore getTempDataStore() {
    if (tempDataStore == null) {
        if (!isPersistent()) {
            return null;
        }
        boolean result = true;
        boolean empty = true;
        try {
            File directory = getTmpDataDirectory();
            if (directory.exists() && directory.isDirectory()) {
                File[] files = directory.listFiles();
                if (files != null && files.length > 0) {
                    empty = false;
                    for (File file : files) {
                        if (!file.isDirectory()) {
                            result &= file.delete();
                        }
                    }
                }
            }
            // Only report on the cleanup when there was something to clean.
            if (!empty) {
                String str = result ? "Successfully deleted" : "Failed to delete";
                LOG.info(str + " temporary storage");
            }
            this.tempDataStore = new PListStore();
            this.tempDataStore.setDirectory(getTmpDataDirectory());
            this.tempDataStore.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    return tempDataStore;
}

/**
 * @param tempDataStore
 *            the tempDataStore to set
 */
public void setTempDataStore(PListStore tempDataStore) {
    this.tempDataStore = tempDataStore;
}

/**
 * @return the thread priority used for the persistence task runner
 */
public int getPersistenceThreadPriority() {
    return persistenceThreadPriority;
}

/**
 * @param persistenceThreadPriority
 *            the thread priority to use for the persistence task runner
 */
public void setPersistenceThreadPriority(int persistenceThreadPriority) {
    this.persistenceThreadPriority = persistenceThreadPriority;
}

/**
 * @return the useLocalHostBrokerName
 */
public boolean isUseLocalHostBrokerName() {
    return this.useLocalHostBrokerName;
}

/**
 * Records the flag and, when it is enabled on a broker that has not been
 * started and still has no name or the default name, switches the broker
 * name to the local host name.
 *
 * @param useLocalHostBrokerName
 *            the useLocalHostBrokerName to set
 */
public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
    this.useLocalHostBrokerName = useLocalHostBrokerName;
    // Grouped explicitly: without parentheses "a && b && c || d" renamed a
    // default-named broker even when the flag was false or the broker had
    // already started. Names are compared by value rather than identity.
    boolean nameNotCustomised = brokerName == null || DEFAULT_BROKER_NAME.equals(brokerName);
    if (useLocalHostBrokerName && !started.get() && nameNotCustomised) {
        brokerName = LOCAL_HOST_NAME;
    }
}

/**
 * @return the supportFailOver
 */
public boolean isSupportFailOver() {
    return this.supportFailOver;
}

/**
 * @param supportFailOver
 *            the supportFailOver to set
 */
public void setSupportFailOver(boolean supportFailOver) {
    this.supportFailOver = supportFailOver;
}

/**
 * Looks up and lazily creates if necessary the destination for the given
 * JMS name
 */
public Destination getDestination(ActiveMQDestination destination) throws Exception {
    return getBroker().addDestination(getAdminConnectionContext(), destination,false);
}
1522 public void removeDestination(ActiveMQDestination destination) throws Exception { 1523 getBroker().removeDestination(getAdminConnectionContext(), destination, 0); 1524 } 1525 1526 public int getProducerSystemUsagePortion() { 1527 return producerSystemUsagePortion; 1528 } 1529 1530 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) { 1531 this.producerSystemUsagePortion = producerSystemUsagePortion; 1532 } 1533 1534 public int getConsumerSystemUsagePortion() { 1535 return consumerSystemUsagePortion; 1536 } 1537 1538 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) { 1539 this.consumerSystemUsagePortion = consumerSystemUsagePortion; 1540 } 1541 1542 public boolean isSplitSystemUsageForProducersConsumers() { 1543 return splitSystemUsageForProducersConsumers; 1544 } 1545 1546 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) { 1547 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers; 1548 } 1549 1550 public boolean isMonitorConnectionSplits() { 1551 return monitorConnectionSplits; 1552 } 1553 1554 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) { 1555 this.monitorConnectionSplits = monitorConnectionSplits; 1556 } 1557 1558 public int getTaskRunnerPriority() { 1559 return taskRunnerPriority; 1560 } 1561 1562 public void setTaskRunnerPriority(int taskRunnerPriority) { 1563 this.taskRunnerPriority = taskRunnerPriority; 1564 } 1565 1566 public boolean isDedicatedTaskRunner() { 1567 return dedicatedTaskRunner; 1568 } 1569 1570 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 1571 this.dedicatedTaskRunner = dedicatedTaskRunner; 1572 } 1573 1574 public boolean isCacheTempDestinations() { 1575 return cacheTempDestinations; 1576 } 1577 1578 public void setCacheTempDestinations(boolean cacheTempDestinations) { 1579 this.cacheTempDestinations = cacheTempDestinations; 1580 } 1581 1582 public int 
getTimeBeforePurgeTempDestinations() { 1583 return timeBeforePurgeTempDestinations; 1584 } 1585 1586 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) { 1587 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations; 1588 } 1589 1590 public boolean isUseTempMirroredQueues() { 1591 return useTempMirroredQueues; 1592 } 1593 1594 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) { 1595 this.useTempMirroredQueues = useTempMirroredQueues; 1596 } 1597 1598 // 1599 // Implementation methods 1600 // ------------------------------------------------------------------------- 1601 /** 1602 * Handles any lazy-creation helper properties which are added to make 1603 * things easier to configure inside environments such as Spring 1604 * 1605 * @throws Exception 1606 */ 1607 protected void processHelperProperties() throws Exception { 1608 boolean masterServiceExists = false; 1609 if (transportConnectorURIs != null) { 1610 for (int i = 0; i < transportConnectorURIs.length; i++) { 1611 String uri = transportConnectorURIs[i]; 1612 addConnector(uri); 1613 } 1614 } 1615 if (networkConnectorURIs != null) { 1616 for (int i = 0; i < networkConnectorURIs.length; i++) { 1617 String uri = networkConnectorURIs[i]; 1618 addNetworkConnector(uri); 1619 } 1620 } 1621 if (jmsBridgeConnectors != null) { 1622 for (int i = 0; i < jmsBridgeConnectors.length; i++) { 1623 addJmsConnector(jmsBridgeConnectors[i]); 1624 } 1625 } 1626 for (Service service : services) { 1627 if (service instanceof MasterConnector) { 1628 masterServiceExists = true; 1629 break; 1630 } 1631 } 1632 if (masterConnectorURI != null) { 1633 if (masterServiceExists) { 1634 throw new IllegalStateException( 1635 "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property"); 1636 } else { 1637 addService(new MasterConnector(masterConnectorURI)); 1638 } 1639 } 1640 } 1641 1642 public void stopAllConnectors(ServiceStopper 
stopper) { 1643 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { 1644 NetworkConnector connector = iter.next(); 1645 unregisterNetworkConnectorMBean(connector); 1646 stopper.stop(connector); 1647 } 1648 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) { 1649 ProxyConnector connector = iter.next(); 1650 stopper.stop(connector); 1651 } 1652 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) { 1653 JmsConnector connector = iter.next(); 1654 stopper.stop(connector); 1655 } 1656 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) { 1657 TransportConnector connector = iter.next(); 1658 stopper.stop(connector); 1659 } 1660 } 1661 1662 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException { 1663 try { 1664 ObjectName objectName = createConnectorObjectName(connector); 1665 connector = connector.asManagedConnector(getManagementContext(), objectName); 1666 ConnectorViewMBean view = new ConnectorView(connector); 1667 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1668 return connector; 1669 } catch (Throwable e) { 1670 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e); 1671 } 1672 } 1673 1674 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException { 1675 if (isUseJmx()) { 1676 try { 1677 ObjectName objectName = createConnectorObjectName(connector); 1678 getManagementContext().unregisterMBean(objectName); 1679 } catch (Throwable e) { 1680 throw IOExceptionSupport.create( 1681 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e); 1682 } 1683 } 1684 } 1685 1686 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 1687 return adaptor; 1688 } 1689 1690 protected void 
unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { 1691 if (isUseJmx()) { 1692 } 1693 } 1694 1695 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { 1696 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1697 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName=" 1698 + JMXSupport.encodeObjectNamePart(connector.getName())); 1699 } 1700 1701 protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException { 1702 NetworkConnectorViewMBean view = new NetworkConnectorView(connector); 1703 try { 1704 ObjectName objectName = createNetworkConnectorObjectName(connector); 1705 connector.setObjectName(objectName); 1706 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1707 } catch (Throwable e) { 1708 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e); 1709 } 1710 } 1711 1712 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) 1713 throws MalformedObjectNameException { 1714 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1715 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," 1716 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1717 } 1718 1719 1720 public ObjectName createDuplexNetworkConnectorObjectName(String transport) 1721 throws MalformedObjectNameException { 1722 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1723 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector," 1724 + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport)); 1725 } 1726 1727 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) { 1728 if (isUseJmx()) { 1729 try { 1730 ObjectName 
objectName = createNetworkConnectorObjectName(connector); 1731 getManagementContext().unregisterMBean(objectName); 1732 } catch (Exception e) { 1733 LOG.error("Network Connector could not be unregistered from JMX: " + e, e); 1734 } 1735 } 1736 } 1737 1738 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException { 1739 ProxyConnectorView view = new ProxyConnectorView(connector); 1740 try { 1741 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1742 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector," 1743 + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1744 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1745 } catch (Throwable e) { 1746 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1747 } 1748 } 1749 1750 protected void registerFTConnectorMBean(MasterConnector connector) throws IOException { 1751 FTConnectorView view = new FTConnectorView(connector); 1752 try { 1753 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1754 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector"); 1755 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1756 } catch (Throwable e) { 1757 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1758 } 1759 } 1760 1761 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { 1762 JmsConnectorView view = new JmsConnectorView(connector); 1763 try { 1764 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName=" 1765 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector," 1766 + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName())); 1767 
AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); 1768 } catch (Throwable e) { 1769 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e); 1770 } 1771 } 1772 1773 /** 1774 * Factory method to create a new broker 1775 * 1776 * @throws Exception 1777 * @throws 1778 * @throws 1779 */ 1780 protected Broker createBroker() throws Exception { 1781 regionBroker = createRegionBroker(); 1782 Broker broker = addInterceptors(regionBroker); 1783 // Add a filter that will stop access to the broker once stopped 1784 broker = new MutableBrokerFilter(broker) { 1785 Broker old; 1786 1787 @Override 1788 public void stop() throws Exception { 1789 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { 1790 // Just ignore additional stop actions. 1791 @Override 1792 public void stop() throws Exception { 1793 } 1794 }); 1795 old.stop(); 1796 } 1797 1798 @Override 1799 public void start() throws Exception { 1800 if (forceStart && old != null) { 1801 this.next.set(old); 1802 } 1803 getNext().start(); 1804 } 1805 }; 1806 return broker; 1807 } 1808 1809 /** 1810 * Factory method to create the core region broker onto which interceptors 1811 * are added 1812 * 1813 * @throws Exception 1814 */ 1815 protected Broker createRegionBroker() throws Exception { 1816 if (destinationInterceptors == null) { 1817 destinationInterceptors = createDefaultDestinationInterceptor(); 1818 } 1819 configureServices(destinationInterceptors); 1820 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); 1821 if (destinationFactory == null) { 1822 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); 1823 } 1824 return createRegionBroker(destinationInterceptor); 1825 } 1826 1827 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { 1828 RegionBroker regionBroker; 1829 if 
(isUseJmx()) { 1830 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), 1831 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); 1832 } else { 1833 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, 1834 destinationInterceptor,getScheduler(),getExecutor()); 1835 } 1836 destinationFactory.setRegionBroker(regionBroker); 1837 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); 1838 regionBroker.setBrokerName(getBrokerName()); 1839 regionBroker.getDestinationStatistics().setEnabled(enableStatistics); 1840 if (brokerId != null) { 1841 regionBroker.setBrokerId(brokerId); 1842 } 1843 return regionBroker; 1844 } 1845 1846 /** 1847 * Create the default destination interceptor 1848 */ 1849 protected DestinationInterceptor[] createDefaultDestinationInterceptor() { 1850 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>(); 1851 if (isUseVirtualTopics()) { 1852 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); 1853 VirtualTopic virtualTopic = new VirtualTopic(); 1854 virtualTopic.setName("VirtualTopic.>"); 1855 VirtualDestination[] virtualDestinations = { virtualTopic }; 1856 interceptor.setVirtualDestinations(virtualDestinations); 1857 answer.add(interceptor); 1858 } 1859 if (isUseMirroredQueues()) { 1860 MirroredQueue interceptor = new MirroredQueue(); 1861 answer.add(interceptor); 1862 } 1863 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()]; 1864 answer.toArray(array); 1865 return array; 1866 } 1867 1868 /** 1869 * Strategy method to add interceptors to the broker 1870 * 1871 * @throws IOException 1872 */ 1873 protected Broker addInterceptors(Broker broker) throws Exception { 1874 if (isSchedulerSupport()) { 1875 SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile()); 1876 if (isUseJmx()) { 1877 
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
                try {
                    ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
                            + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
                            + "Type=jobScheduler," + "jobSchedulerName=JMS");

                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
                    this.adminView.setJMSJobScheduler(objectName);
                } catch (Throwable e) {
                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
                            + e.getMessage(), e);
                }

            }
            broker = sb;
        }
        if (isAdvisorySupport()) {
            broker = new AdvisoryBroker(broker);
        }
        broker = new CompositeDestinationBroker(broker);
        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
        if (isPopulateJMSXUserID()) {
            UserIDBroker userIDBroker = new UserIDBroker(broker);
            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
            broker = userIDBroker;
        }
        if (isMonitorConnectionSplits()) {
            broker = new ConnectionSplitBroker(broker);
        }
        if (plugins != null) {
            for (int i = 0; i < plugins.length; i++) {
                BrokerPlugin plugin = plugins[i];
                broker = plugin.installPlugin(broker);
            }
        }
        return broker;
    }

    /**
     * Creates the persistence adapter for this broker.
     * <p>
     * When the broker is persistent, the configured persistence factory is
     * used if there is one; otherwise a KahaDB adapter is created whose
     * directory is the "KahaDB" sub-directory of the broker data directory.
     * A non-persistent broker gets a memory adapter.
     *
     * @return a newly created persistence adapter
     * @throws IOException declared for the factory / data directory lookups
     */
    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
        if (isPersistent()) {
            PersistenceAdapterFactory fac = getPersistenceFactory();
            if (fac != null) {
                return fac.createPersistenceAdapter();
            } else {
                KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
                File dir = new File(getBrokerDataDirectory(), "KahaDB");
                adaptor.setDirectory(dir);
                return adaptor;
            }
        } else {
            return new MemoryPersistenceAdapter();
        }
    }

    /**
     * Builds the JMX object name under which the broker itself is registered:
     * <code>&lt;jmxDomain&gt;:BrokerName=&lt;encoded broker name&gt;,Type=Broker</code>.
     *
     * @return the broker's JMX object name
     * @throws IOException if the name cannot be built; any failure is wrapped
     *         through IOExceptionSupport
     */
    protected ObjectName createBrokerObjectName() throws IOException {
        try {
            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
        } catch (Throwable e) {
            throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
        }
    }

    /**
     * Binds a transport server to the given URI and wraps it in a connector.
     *
     * @param brokerURI the URI to bind to
     * @return a connector around the bound transport server
     * @throws Exception if the transport cannot be bound
     */
    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
        TransportServer transport = TransportFactory.bind(this, brokerURI);
        return new TransportConnector(transport);
    }

    /**
     * Extracts the port from the options
     *
     * @param options the options map, looked up under the key "port"
     * @return the configured port, or DEFAULT_PORT (with a warning logged)
     *         when the map holds no value for "port"
     */
    protected Object getPort(Map options) {
        Object port = options.get("port");
        if (port == null) {
            port = DEFAULT_PORT;
            LOG.warn("No port specified so defaulting to: " + port);
        }
        return port;
    }

    /**
     * Registers a JVM shutdown hook that calls {@link #containerShutdown()},
     * but only when useShutdownHook is enabled.
     */
    protected void addShutdownHook() {
        if (useShutdownHook) {
            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
                @Override
                public void run() {
                    containerShutdown();
                }
            };
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }
    }

    /**
     * Unregisters the JVM shutdown hook installed by {@link #addShutdownHook()},
     * if there is one. Failures are only logged at debug level.
     */
    protected void removeShutdownHook() {
        if (shutdownHook != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (Exception e) {
                // Runtime.removeShutdownHook rejects the call once the VM is
                // already shutting down; nothing useful can be done then.
                LOG.debug("Caught exception, must be shutting down: " + e);
            }
        }
    }

    /**
     * Sets hooks to be executed when broker shut down
     *
     * @param hooks the hooks to add; each one is passed to
     *        {@link #addShutdownHook(Runnable)}
     * @org.apache.xbean.Property
     */
    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
        for (Runnable hook : hooks) {
            addShutdownHook(hook);
        }
    }

    /**
     * Causes a clean shutdown of the container when the VM is being shut down
     * <p>
     * Failures of stop() are reported through
     * {@link #logError(String, Throwable)} and never propagate.
     */
    protected void containerShutdown() {
        try {
            stop();
        } catch (IOException e) {
            Throwable linkedException = e.getCause();
            if (linkedException != null) {
                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
            } else {
                logError("Failed to shut down: " + e, e);
            }
            if (!useLoggingForShutdownErrors) {
                e.printStackTrace(System.err);
            }
        } catch (Exception e) {
            logError("Failed to shut down: " + e, e);
        }
    }

    /**
     * Reports a shutdown error, either through the logger or on System.err
     * depending on useLoggingForShutdownErrors.
     *
     * @param message the text to report
     * @param e the failure being reported; handed to the logger so that its
     *        stack trace is recorded when logging is used
     */
    protected void logError(String message, Throwable e) {
        if (useLoggingForShutdownErrors) {
            // Report the caller's message (previously ignored) together with the throwable.
            LOG.error(message, e);
        } else {
            System.err.println(message);
        }
    }

    /**
     * Starts any configured destinations on startup
     *
     * @throws Exception if a destination cannot be added to the broker
     */
    protected void startDestinations() throws Exception {
        if (destinations != null) {
            ConnectionContext adminConnectionContext = getAdminConnectionContext();
            for (ActiveMQDestination destination : destinations) {
                getBroker().addDestination(adminConnectionContext, destination, true);
            }
        }
    }

    /**
     * Returns the broker's administration connection context used for
     * configuring the broker at startup
     */
    public ConnectionContext getAdminConnectionContext() throws Exception {
        return BrokerSupport.getConnectionContext(getBroker());
    }

    /**
     * Blocks until the slave start signal has been counted down or
     * waitForSlaveTimeout milliseconds have elapsed.
     *
     * @throws IllegalStateException if the timeout elapses first
     */
    protected void waitForSlave() {
        try {
            if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.error("Exception waiting for slave:" + e);
        }
    }

    /**
     * Counts down the slave start signal, releasing a thread blocked in
     * {@link #waitForSlave()}.
     */
    protected void slaveConnectionEstablished() {
        slaveStartSignal.countDown();
    }

    /**
     * Starts the management context, creates the broker admin view and
     * registers it in JMX under the broker object name.
     *
     * @throws Exception if the management context cannot be started or the
     *         view cannot be registered
     */
    protected void startManagementContext() throws Exception {
        getManagementContext().start();
        adminView = new BrokerView(this, null);
        ObjectName objectName = getBrokerObjectName();
        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
    }

    /**
     * Start all transport and network connections, proxies and bridges
     * <p>
     * Does nothing when this broker is a slave. Network connectors are
     * started on a temporary thread pool when networkConnectorStartAsync is
     * set, otherwise inline on the calling thread.
     *
     * @throws Exception
     */
    public void startAllConnectors() throws Exception {
        if (!isSlave()) {
            Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
            List<TransportConnector> al = new ArrayList<TransportConnector>();
            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
                TransportConnector connector = iter.next();
                connector.setBrokerService(this);
                al.add(startTransportConnector(connector));
            }
            if (!al.isEmpty()) {
                // let's clear the transportConnectors list and replace it with
                // the started transportConnector instances
                this.transportConnectors.clear();
                setTransportConnectors(al);
            }
            // Local URI handed to the network connectors: the VM connector URI
            // with network=true and async=false forced into its query.
            URI uri = getVmConnectorURI();
            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");
            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
            if (isWaitForSlave()) {
                waitForSlave();
            }
            if (!stopped.get()) {
                ThreadPoolExecutor networkConnectorStartExecutor = null;
                if (isNetworkConnectorStartAsync()) {
                    // spin up as many threads as needed
                    networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                            10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                            new ThreadFactory() {
                                int count = 0;

                                public Thread newThread(Runnable runnable) {
                                    Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" + (count++));
                                    thread.setDaemon(true);
                                    return thread;
                                }
                            });
                }

                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
                    final NetworkConnector connector = iter.next();
                    connector.setLocalUri(uri);
                    connector.setBrokerName(getBrokerName());
                    connector.setDurableDestinations(durableDestinations);
                    if (getDefaultSocketURIString() != null) {
                        connector.setBrokerURL(getDefaultSocketURIString());
                    }
                    if (networkConnectorStartExecutor != null) {
                        // carry the caller's logging context over to the worker thread
                        final Map context = MDCHelper.getCopyOfContextMap();
                        networkConnectorStartExecutor.execute(new Runnable() {
                            public void run() {
                                try {
                                    MDCHelper.setContextMap(context);
                                    LOG.info("Async start of " + connector);
                                    connector.start();
                                } catch (Exception e) {
                                    LOG.error("Async start of network connector: " + connector + " failed", e);
                                }
                            }
                        });
                    } else {
                        connector.start();
                    }
                }
                if (networkConnectorStartExecutor != null) {
                    // executor done when enqueued tasks are complete
                    networkConnectorStartExecutor.shutdown();
                    networkConnectorStartExecutor = null;
                }

                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
                    ProxyConnector connector = iter.next();
                    connector.start();
                }
                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
                    JmsConnector connector = iter.next();
                    connector.start();
                }
                for (Service service : services) {
                    configureService(service);
                    service.start();
                }
            }
        }
    }

    /**
     * Configures and starts a single transport connector.
     * <p>
     * The connector receives the task runner factory and, when one is
     * configured, the message authorization policy. With JMX enabled the
     * connector is replaced by the instance returned from
     * registerConnectorMBean before it is started.
     *
     * @param connector the connector to start
     * @return the started connector, which may differ from the argument when
     *         JMX is in use
     * @throws Exception if the connector cannot be started
     */
    protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
        connector.setTaskRunnerFactory(getTaskRunnerFactory());
        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
        if (policy != null) {
            connector.setMessageAuthorizationPolicy(policy);
        }
        if (isUseJmx()) {
            connector = registerConnectorMBean(connector);
        }
        connector.getStatistics().setEnabled(enableStatistics);
        connector.start();
        return connector;
    }

    /**
     * Perform any custom dependency injection
     *
     * @param services the objects to pass, one by one, to
     *        {@link #configureService(Object)}
     */
    protected void configureServices(Object[] services) {
        for (Object service : services) {
            configureService(service);
        }
    }

    /**
     * Perform any custom dependency injection
     * <p>
     * A BrokerServiceAware service is given a reference to this broker. The
     * first MasterConnector seen is remembered as the master connector and
     * switches on fail-over support.
     *
     * @param service the service to configure; a null value is ignored
     *        because neither instanceof check matches it
     */
    protected void configureService(Object service) {
        if (service instanceof BrokerServiceAware) {
            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
            serviceAware.setBrokerService(this);
        }
        if (masterConnector == null && service instanceof MasterConnector) {
            masterConnector = (MasterConnector) service;
            supportFailOver = true;
        }
    }

    /**
     * Passes the exception to the configured IOExceptionHandler, or logs it
     * at info level when no handler is set.
     *
     * @param exception the I/O failure to handle
     */
    public void handleIOException(IOException exception) {
        if (ioExceptionHandler != null) {
            ioExceptionHandler.handle(exception);
        } else {
            LOG.info("Ignoring IO exception, " + exception, exception);
        }
    }

    /**
     * Starts all destinations in persistence store. This includes all inactive
     * destinations
     *
     * @param broker the broker to add the destinations to; when it has no
     *        admin connection context yet, one is created and set on it
     * @throws Exception if a destination cannot be added
     */
    protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
        Set destinations = destinationFactory.getDestinations();
        if (destinations != null) {
            Iterator iter = destinations.iterator();
            ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
            if (adminConnectionContext == null) {
                ConnectionContext context = new ConnectionContext();
                context.setBroker(broker);
                adminConnectionContext = context;
                broker.setAdminConnectionContext(adminConnectionContext);
            }
            while (iter.hasNext()) {
                ActiveMQDestination destination = (ActiveMQDestination) iter.next();
                broker.addDestination(adminConnectionContext, destination, false);
            }
        }
    }

    /**
     * Lazily creates the executor whose daemon threads are named
     * "Usage Async Task".
     *
     * @return the shared executor, created on first use
     */
    protected synchronized ThreadPoolExecutor getExecutor() {
        if (this.executor == null) {
            // NOTE(review): per the ThreadPoolExecutor documentation an unbounded
            // work queue keeps the pool at its core size, so the maximum of 10
            // is presumably never reached - confirm whether that is intended.
            this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "Usage Async Task");
                    thread.setDaemon(true);
                    return thread;
                }
            });
        }
        return this.executor;
    }

    /**
     * Lazily creates and starts the broker scheduler. A failure to start is
     * logged and the scheduler instance is returned regardless.
     *
     * @return the shared scheduler, created on first use
     */
    public synchronized Scheduler getScheduler() {
        if (this.scheduler == null) {
            this.scheduler = new Scheduler("ActiveMQ Broker[" + getBrokerName() + "] Scheduler");
            try {
                this.scheduler.start();
            } catch (Exception e) {
                LOG.error("Failed to start Scheduler ", e);
            }
        }
        return this.scheduler;
    }

    /**
     * @return the regionBroker
     */
    public Broker getRegionBroker() {
        return regionBroker;
    }

    /**
     * @param regionBroker the regionBroker to set
     */
    public void setRegionBroker(Broker regionBroker) {
        this.regionBroker = regionBroker;
    }

    /**
     * Adds a hook to the shutdownHooks collection, synchronizing on that
     * collection.
     *
     * @param hook the hook to add
     */
    public void addShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.add(hook);
        }
    }

    /**
     * Removes a hook from the shutdownHooks collection, synchronizing on that
     * collection.
     *
     * @param hook the hook to remove
     */
    public void removeShutdownHook(Runnable hook) {
        synchronized (shutdownHooks) {
            shutdownHooks.remove(hook);
        }
    }

    /**
     * @return the systemExitOnShutdown flag
     */
    public boolean isSystemExitOnShutdown() {
        return systemExitOnShutdown;
    }

    /**
     * @param systemExitOnShutdown the systemExitOnShutdown flag to set
     */
    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
        this.systemExitOnShutdown = systemExitOnShutdown;
    }

    /**
     * @return the systemExitOnShutdownExitCode
     */
    public int getSystemExitOnShutdownExitCode() {
        return systemExitOnShutdownExitCode;
    }

    /**
     * @param systemExitOnShutdownExitCode the systemExitOnShutdownExitCode to set
     */
    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
    }

    /**
     * @return the sslContext
     */
    public SslContext getSslContext() {
        return sslContext;
    }

    /**
     * @param sslContext the sslContext to set
     */
    public void setSslContext(SslContext sslContext) {
        this.sslContext = sslContext;
    }

    /**
     * @return the shutdownOnSlaveFailure flag
     */
    public boolean isShutdownOnSlaveFailure() {
        return shutdownOnSlaveFailure;
    }

    /**
     * @param shutdownOnSlaveFailure the shutdownOnSlaveFailure flag to set
     */
    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
    }

    /**
     * @return true if startAllConnectors() should block in waitForSlave()
     *         before starting the network connectors
     */
    public boolean isWaitForSlave() {
        return waitForSlave;
    }

    /**
     * @param waitForSlave the waitForSlave flag to set
     */
    public void setWaitForSlave(boolean waitForSlave) {
        this.waitForSlave = waitForSlave;
    }

    /**
     * @return the time, in milliseconds, that waitForSlave() waits for the
     *         slave start signal
     */
    public long getWaitForSlaveTimeout() {
        return this.waitForSlaveTimeout;
    }

    /**
     * @param waitForSlaveTimeout the timeout to set, in milliseconds
     */
    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
        this.waitForSlaveTimeout = waitForSlaveTimeout;
    }

    /**
     * @return the latch counted down by slaveConnectionEstablished()
     */
    public CountDownLatch getSlaveStartSignal() {
        return slaveStartSignal;
    }

    /**
     * Get the passiveSlave
     * @return the passiveSlave
     */
    public boolean isPassiveSlave() {
        return this.passiveSlave;
    }

    /**
     * Set the passiveSlave
     * @param passiveSlave the passiveSlave to set
     */
    public void setPassiveSlave(boolean passiveSlave) {
        this.passiveSlave = passiveSlave;
    }

    /**
     * Overrides the default IOException handler, called when the persistence
     * adapter experiences File or JDBC I/O Exceptions. The handler is passed
     * through {@link #configureService(Object)} before it is stored.
     *
     * @param ioExceptionHandler the handler to use
     */
    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
        configureService(ioExceptionHandler);
        this.ioExceptionHandler = ioExceptionHandler;
    }

    /**
     * @return the ioExceptionHandler
     */
    public IOExceptionHandler getIoExceptionHandler() {
        return ioExceptionHandler;
    }

    /**
     * @return the schedulerSupport
     */
    public boolean isSchedulerSupport() {
        return this.schedulerSupport;
    }

    /**
     * @param schedulerSupport the schedulerSupport to set
     */
    public void setSchedulerSupport(boolean schedulerSupport) {
        this.schedulerSupport = schedulerSupport;
    }

    /**
     * @return the schedulerDirectory; when none has been set it defaults to
     *         the "scheduler" sub-directory of the broker data directory
     */
    public File getSchedulerDirectoryFile() {
        if (this.schedulerDirectoryFile == null) {
            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
        }
        return schedulerDirectoryFile;
    }

    /**
     * @param schedulerDirectory the schedulerDirectory to set
     */
    public void setSchedulerDirectoryFile(File schedulerDirectory) {
        this.schedulerDirectoryFile = schedulerDirectory;
    }

    /**
     * @param schedulerDirectory path of the schedulerDirectory to set
     */
    public void setSchedulerDirectory(String schedulerDirectory) {
        setSchedulerDirectoryFile(new File(schedulerDirectory));
    }

    /**
     * @return the schedulePeriodForDestinationPurge
     */
    public int getSchedulePeriodForDestinationPurge() {
        return this.schedulePeriodForDestinationPurge;
    }

    /**
     * @param schedulePeriodForDestinationPurge the schedulePeriodForDestinationPurge to set
     */
    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
    }

    /**
     * @return the brokerContext
     */
    public BrokerContext getBrokerContext() {
        return brokerContext;
    }

    /**
     * @param brokerContext the brokerContext to set
     */
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }

    /**
     * @param brokerId the value wrapped into a new BrokerId for this broker
     */
    public void setBrokerId(String brokerId) {
        this.brokerId = new BrokerId(brokerId);
    }

    /**
     * @return the useAuthenticatedPrincipalForJMSXUserID flag
     */
    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
        return useAuthenticatedPrincipalForJMSXUserID;
    }

    /**
     * @param useAuthenticatedPrincipalForJMSXUserID the flag to set
     */
    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
    }

    /**
     * @return true if startAllConnectors() starts network connectors on a
     *         separate thread pool
     */
    public boolean isNetworkConnectorStartAsync() {
        return networkConnectorStartAsync;
    }

    /**
     * @param networkConnectorStartAsync the networkConnectorStartAsync flag to set
     */
    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
        this.networkConnectorStartAsync = networkConnectorStartAsync;
    }
}