001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    import java.net.UnknownHostException;
024    import java.util.ArrayList;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.LinkedBlockingQueue;
033    import java.util.concurrent.SynchronousQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.ThreadPoolExecutor;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicBoolean;
038    
039    import javax.annotation.PostConstruct;
040    import javax.annotation.PreDestroy;
041    import javax.management.MalformedObjectNameException;
042    import javax.management.ObjectName;
043    
044    import org.apache.activemq.ActiveMQConnectionMetaData;
045    import org.apache.activemq.ConfigurationException;
046    import org.apache.activemq.Service;
047    import org.apache.activemq.advisory.AdvisoryBroker;
048    import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
049    import org.apache.activemq.broker.ft.MasterConnector;
050    import org.apache.activemq.broker.jmx.AnnotatedMBean;
051    import org.apache.activemq.broker.jmx.BrokerView;
052    import org.apache.activemq.broker.jmx.ConnectorView;
053    import org.apache.activemq.broker.jmx.ConnectorViewMBean;
054    import org.apache.activemq.broker.jmx.FTConnectorView;
055    import org.apache.activemq.broker.jmx.JmsConnectorView;
056    import org.apache.activemq.broker.jmx.JobSchedulerView;
057    import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
058    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
059    import org.apache.activemq.broker.jmx.ManagementContext;
060    import org.apache.activemq.broker.jmx.NetworkConnectorView;
061    import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
062    import org.apache.activemq.broker.jmx.ProxyConnectorView;
063    import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
064    import org.apache.activemq.broker.region.Destination;
065    import org.apache.activemq.broker.region.DestinationFactory;
066    import org.apache.activemq.broker.region.DestinationFactoryImpl;
067    import org.apache.activemq.broker.region.DestinationInterceptor;
068    import org.apache.activemq.broker.region.RegionBroker;
069    import org.apache.activemq.broker.region.policy.PolicyMap;
070    import org.apache.activemq.broker.region.virtual.MirroredQueue;
071    import org.apache.activemq.broker.region.virtual.VirtualDestination;
072    import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
073    import org.apache.activemq.broker.region.virtual.VirtualTopic;
074    import org.apache.activemq.broker.scheduler.SchedulerBroker;
075    import org.apache.activemq.command.ActiveMQDestination;
076    import org.apache.activemq.command.BrokerId;
077    import org.apache.activemq.network.ConnectionFilter;
078    import org.apache.activemq.network.DiscoveryNetworkConnector;
079    import org.apache.activemq.network.NetworkConnector;
080    import org.apache.activemq.network.jms.JmsConnector;
081    import org.apache.activemq.proxy.ProxyConnector;
082    import org.apache.activemq.security.MessageAuthorizationPolicy;
083    import org.apache.activemq.selector.SelectorParser;
084    import org.apache.activemq.store.PersistenceAdapter;
085    import org.apache.activemq.store.PersistenceAdapterFactory;
086    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
087    import org.apache.activemq.store.kahadb.plist.PListStore;
088    import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
089    import org.apache.activemq.thread.Scheduler;
090    import org.apache.activemq.thread.TaskRunnerFactory;
091    import org.apache.activemq.transport.TransportFactory;
092    import org.apache.activemq.transport.TransportServer;
093    import org.apache.activemq.transport.vm.VMTransportFactory;
094    import org.apache.activemq.usage.SystemUsage;
095    import org.apache.activemq.util.*;
096    import org.slf4j.Logger;
097    import org.slf4j.LoggerFactory;
098    import org.slf4j.MDC;
099    
100    /**
101     * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
102     * number of transport connectors, network connectors and a bunch of properties
103     * which can be used to configure the broker as it is lazily created.
104     * 
105     * 
106     * @org.apache.xbean.XBean
107     */
108    public class BrokerService implements Service {
109        protected CountDownLatch slaveStartSignal = new CountDownLatch(1); // counted down by stop() when no masterConnectorURI is set and the count is still 1
110        public static final String DEFAULT_PORT = "61616";
           // Assigned exactly once by the static initializer below; falls back to "localhost" when the lookup fails.
111        public static final String LOCAL_HOST_NAME;
112        public static final String DEFAULT_BROKER_NAME = "localhost";
113        private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
           // NOTE(review): the visible class header only declares "implements Service" - confirm this id is still needed.
114        private static final long serialVersionUID = 7353129142305630237L;
115        private boolean useJmx = true;
116        private boolean enableStatistics = true;
117        private boolean persistent = true;
118        private boolean populateJMSXUserID;
119        private boolean useAuthenticatedPrincipalForJMSXUserID;
120    
           // start() refuses to run when useShutdownHook and systemExitOnShutdown are both true.
121        private boolean useShutdownHook = true;
122        private boolean useLoggingForShutdownErrors;
           // Read by masterFailed(): true -> stop() the broker, false -> start all connectors instead.
123        private boolean shutdownOnMasterFailure;
124        private boolean shutdownOnSlaveFailure;
125        private boolean waitForSlave;
           // presumably milliseconds (600000 ms = 10 minutes) - TODO confirm against the code that waits for the slave
126        private long waitForSlaveTimeout = 600000L;
127        private boolean passiveSlave;
128        private String brokerName = DEFAULT_BROKER_NAME;
129        private File dataDirectoryFile;
130        private File tmpDataDirectory;
           // Stopped and set to null by stop().
131        private Broker broker;
132        private BrokerView adminView;
133        private ManagementContext managementContext;
134        private ObjectName brokerObjectName;
           // Shut down and set to null by stop().
135        private TaskRunnerFactory taskRunnerFactory;
136        private TaskRunnerFactory persistenceTaskRunnerFactory;
137        private SystemUsage systemUsage;
138        private SystemUsage producerSystemUsage;
           // NOTE(review): the name is misspelled ("Usaage"); left untouched because code outside this section may reference it.
139        private SystemUsage consumerSystemUsaage;
           // Stopped and set to null by stop().
140        private PersistenceAdapter persistenceAdapter;
141        private PersistenceAdapterFactory persistenceFactory;
142        protected DestinationFactory destinationFactory;
143        private MessageAuthorizationPolicy messageAuthorizationPolicy;
           // Connector registries; the add*/remove* methods of this class mutate them.
144        private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
145        private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
146        private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
147        private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
           // Iterated by start() (MasterConnector entries are configured and started) and by stop() (every entry is stopped).
148        private final List<Service> services = new ArrayList<Service>();
149        private MasterConnector masterConnector;
150        private String masterConnectorURI;
151        private transient Thread shutdownHook;
152        private String[] transportConnectorURIs;
153        private String[] networkConnectorURIs;
154        private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
155        // to other jms messaging
156        // systems
157        private boolean deleteAllMessagesOnStartup;
158        private boolean advisorySupport = true;
159        private URI vmConnectorURI;
160        private String defaultSocketURIString;
161        private PolicyMap destinationPolicy;
           // start() flips 'started' with compareAndSet and returns early when 'stopped' is set;
           // stop() sets 'stopped' near the end of the shutdown sequence.
162        private final AtomicBoolean started = new AtomicBoolean(false);
163        private final AtomicBoolean stopped = new AtomicBoolean(false);
164        private BrokerPlugin[] plugins;
165        private boolean keepDurableSubsActive = true;
166        private boolean useVirtualTopics = true;
167        private boolean useMirroredQueues = false;
168        private boolean useTempMirroredQueues = true;
169        private BrokerId brokerId;
170        private DestinationInterceptor[] destinationInterceptors;
171        private ActiveMQDestination[] destinations;
           // Stopped and set to null by stop().
172        private PListStore tempDataStore;
173        private int persistenceThreadPriority = Thread.MAX_PRIORITY;
174        private boolean useLocalHostBrokerName;
           // Released by stop(); awaited by waitUntilStopped().
175        private final CountDownLatch stoppedLatch = new CountDownLatch(1);
           // Released by start() after the broker has been started successfully.
176        private final CountDownLatch startedLatch = new CountDownLatch(1);
177        private boolean supportFailOver;
           // Cast to ManagedRegionBroker by start() when JMX is enabled; queried by checkQueueSize().
178        private Broker regionBroker;
179        private int producerSystemUsagePortion = 60;
180        private int consumerSystemUsagePortion = 40;
181        private boolean splitSystemUsageForProducersConsumers;
182        private boolean monitorConnectionSplits = false;
183        private int taskRunnerPriority = Thread.NORM_PRIORITY;
184        private boolean dedicatedTaskRunner;
185        private boolean cacheTempDestinations = false;// useful for failover
186        private int timeBeforePurgeTempDestinations = 5000;
           // Run in list order at the end of stop(), while synchronized on the list itself.
187        private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
           // When true, stop() spawns a thread that calls System.exit(systemExitOnShutdownExitCode).
188        private boolean systemExitOnShutdown;
189        private int systemExitOnShutdownExitCode;
190        private SslContext sslContext;
           // Set from the argument of start(boolean).
191        private boolean forceStart = false;
           // start() installs a DefaultIOExceptionHandler when this is still null.
192        private IOExceptionHandler ioExceptionHandler;
193        private boolean schedulerSupport = false;
194        private File schedulerDirectoryFile;
           // Stopped and set to null by stop().
195        private Scheduler scheduler;
           // shutdownNow() is called on it by stop(), then it is set to null.
196        private ThreadPoolExecutor executor;
           // Cleared by start() once the persistence adapter has started; set back to true by stop().
197        private boolean slave = true;
198        private int schedulePeriodForDestinationPurge=5000;
199        private BrokerContext brokerContext;
200        private boolean networkConnectorStartAsync = false;
201    
202            static {
203            String localHostName = "localhost";
204            try {
205                localHostName =  InetAddressUtil.getLocalHostName();
206            } catch (UnknownHostException e) {
207                LOG.error("Failed to resolve localhost");
208            }
209            LOCAL_HOST_NAME = localHostName;
210        }
211    
212        @Override
213        public String toString() {
214            return "BrokerService[" + getBrokerName() + "]";
215        }
216    
217        /**
218         * Adds a new transport connector for the given bind address
219         * 
220         * @return the newly created and added transport connector
221         * @throws Exception
222         */
223        public TransportConnector addConnector(String bindAddress) throws Exception {
224            return addConnector(new URI(bindAddress));
225        }
226    
227        /**
228         * Adds a new transport connector for the given bind address
229         * 
230         * @return the newly created and added transport connector
231         * @throws Exception
232         */
233        public TransportConnector addConnector(URI bindAddress) throws Exception {
234            return addConnector(createTransportConnector(bindAddress));
235        }
236    
237        /**
238         * Adds a new transport connector for the given TransportServer transport
239         * 
240         * @return the newly created and added transport connector
241         * @throws Exception
242         */
243        public TransportConnector addConnector(TransportServer transport) throws Exception {
244            return addConnector(new TransportConnector(transport));
245        }
246    
247        /**
248         * Adds a new transport connector
249         * 
250         * @return the transport connector
251         * @throws Exception
252         */
253        public TransportConnector addConnector(TransportConnector connector) throws Exception {
254            transportConnectors.add(connector);
255            return connector;
256        }
257    
258        /**
259         * Stops and removes a transport connector from the broker.
260         * 
261         * @param connector
262         * @return true if the connector has been previously added to the broker
263         * @throws Exception
264         */
265        public boolean removeConnector(TransportConnector connector) throws Exception {
266            boolean rc = transportConnectors.remove(connector);
267            if (rc) {
268                unregisterConnectorMBean(connector);
269            }
270            return rc;
271        }
272    
273        /**
274         * Adds a new network connector using the given discovery address
275         * 
276         * @return the newly created and added network connector
277         * @throws Exception
278         */
279        public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
280            return addNetworkConnector(new URI(discoveryAddress));
281        }
282    
283        /**
284         * Adds a new proxy connector using the given bind address
285         * 
286         * @return the newly created and added network connector
287         * @throws Exception
288         */
289        public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
290            return addProxyConnector(new URI(bindAddress));
291        }
292    
293        /**
294         * Adds a new network connector using the given discovery address
295         * 
296         * @return the newly created and added network connector
297         * @throws Exception
298         */
299        public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
300            if (!isAdvisorySupport()) {
301                throw new javax.jms.IllegalStateException(
302                        "Networks require advisory messages to function - advisories are currently disabled");
303            }
304            NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
305            return addNetworkConnector(connector);
306        }
307    
308        /**
309         * Adds a new proxy connector using the given bind address
310         * 
311         * @return the newly created and added network connector
312         * @throws Exception
313         */
314        public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
315            ProxyConnector connector = new ProxyConnector();
316            connector.setBind(bindAddress);
317            connector.setRemote(new URI("fanout:multicast://default"));
318            return addProxyConnector(connector);
319        }
320    
321        /**
322         * Adds a new network connector to connect this broker to a federated
323         * network
324         */
325        public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
326            connector.setBrokerService(this);
327            URI uri = getVmConnectorURI();
328            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
329            map.put("network", "true");
330            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
331            connector.setLocalUri(uri);
332            // Set a connection filter so that the connector does not establish loop
333            // back connections.
334            connector.setConnectionFilter(new ConnectionFilter() {
335                public boolean connectTo(URI location) {
336                    List<TransportConnector> transportConnectors = getTransportConnectors();
337                    for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
338                        try {
339                            TransportConnector tc = iter.next();
340                            if (location.equals(tc.getConnectUri())) {
341                                return false;
342                            }
343                        } catch (Throwable e) {
344                        }
345                    }
346                    return true;
347                }
348            });
349            networkConnectors.add(connector);
350            if (isUseJmx()) {
351                registerNetworkConnectorMBean(connector);
352            }
353            return connector;
354        }
355    
356        /**
357         * Removes the given network connector without stopping it. The caller
358         * should call {@link NetworkConnector#stop()} to close the connector
359         */
360        public boolean removeNetworkConnector(NetworkConnector connector) {
361            boolean answer = networkConnectors.remove(connector);
362            if (answer) {
363                unregisterNetworkConnectorMBean(connector);
364            }
365            return answer;
366        }
367    
368        public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
369            URI uri = getVmConnectorURI();
370            connector.setLocalUri(uri);
371            proxyConnectors.add(connector);
372            if (isUseJmx()) {
373                registerProxyConnectorMBean(connector);
374            }
375            return connector;
376        }
377    
378        public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
379            connector.setBrokerService(this);
380            jmsConnectors.add(connector);
381            if (isUseJmx()) {
382                registerJmsConnectorMBean(connector);
383            }
384            return connector;
385        }
386    
387        public JmsConnector removeJmsConnector(JmsConnector connector) {
388            if (jmsConnectors.remove(connector)) {
389                return connector;
390            }
391            return null;
392        }
393    
394        /**
395         * @return Returns the masterConnectorURI.
396         */
397        public String getMasterConnectorURI() {
398            return masterConnectorURI;
399        }
400    
401        /**
402         * @param masterConnectorURI
403         *            The masterConnectorURI to set.
404         */
405        public void setMasterConnectorURI(String masterConnectorURI) {
406            this.masterConnectorURI = masterConnectorURI;
407        }
408    
409        /**
410         * @return true if this Broker is a slave to a Master
411         */
412        public boolean isSlave() {
413            return (masterConnector != null && masterConnector.isSlave()) ||
414                (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
415                (masterConnector == null && slave);
416        }
417    
418        public void masterFailed() {
419            if (shutdownOnMasterFailure) {
420                LOG.error("The Master has failed ... shutting down");
421                try {
422                    stop();
423                } catch (Exception e) {
424                    LOG.error("Failed to stop for master failure", e);
425                }
426            } else {
427                LOG.warn("Master Failed - starting all connectors");
428                try {
429                    startAllConnectors();
430                    broker.nowMasterBroker();
431                } catch (Exception e) {
432                    LOG.error("Failed to startAllConnectors", e);
433                }
434            }
435        }
436    
437        public boolean isStarted() {
438            return started.get();
439        }
440    
441        public void start(boolean force) throws Exception {
442            forceStart = force;
443            stopped.set(false);
444            started.set(false);
445            start();
446        }
447    
448        // Service interface
449        // -------------------------------------------------------------------------
450    
451        protected boolean shouldAutostart() {
452            return true;
453        }
454    
455        /**
456         *
457         * @throws Exception
458         * @org. apache.xbean.InitMethod
459         */
460        @PostConstruct
461        public void autoStart() throws Exception {
462            if(shouldAutostart()) {
463                start();
464            }
465        }
466    
467        public void start() throws Exception {
468            if (stopped.get() || !started.compareAndSet(false, true)) {
469                // lets just ignore redundant start() calls
470                // as its way too easy to not be completely sure if start() has been
471                // called or not with the gazillion of different configuration
472                // mechanisms
473                // throw new IllegalStateException("Allready started.");
474                return;
475            }
476    
477            MDC.put("activemq.broker", brokerName);
478    
479            try {
480                    if (systemExitOnShutdown && useShutdownHook) {
481                            throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
482                    }
483                processHelperProperties();
484                if (isUseJmx()) {
485                    startManagementContext();
486                }
487                getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
488                getPersistenceAdapter().setBrokerName(getBrokerName());
489                LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
490                if (deleteAllMessagesOnStartup) {
491                    deleteAllMessages();
492                }
493                getPersistenceAdapter().start();
494                slave = false;
495                startDestinations();
496                addShutdownHook();
497                getBroker().start();
498                if (isUseJmx()) {
499                    if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
500                            // try to restart management context
501                            // typical for slaves that use the same ports as master
502                            managementContext.stop();
503                            startManagementContext();
504                    }
505                    ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
506                    managedBroker.setContextBroker(broker);
507                    adminView.setBroker(managedBroker);
508                }
509                BrokerRegistry.getInstance().bind(getBrokerName(), this);
510                // see if there is a MasterBroker service and if so, configure
511                // it and start it.
512                for (Service service : services) {
513                    if (service instanceof MasterConnector) {
514                        configureService(service);
515                        service.start();
516                    }
517                }
518                if (!isSlave() && (this.masterConnector == null || isShutdownOnMasterFailure() == false)) {
519                    startAllConnectors();
520                }
521                if (!stopped.get()) {
522                    if (isUseJmx() && masterConnector != null) {
523                        registerFTConnectorMBean(masterConnector);
524                    }
525                }
526                if (brokerId == null) {
527                    brokerId = broker.getBrokerId();
528                }
529                if (ioExceptionHandler == null) {
530                    setIoExceptionHandler(new DefaultIOExceptionHandler());
531                }
532                LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
533                getBroker().brokerServiceStarted();
534                startedLatch.countDown();
535            } catch (Exception e) {
536                LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: " + e, e);
537                try {
538                    if (!stopped.get()) {
539                        stop();
540                    }
541                } catch (Exception ex) {
542                    LOG.warn("Failed to stop broker after failure in start ", ex);
543                }
544                throw e;
545            } finally {
546                MDC.remove("activemq.broker");
547            }
548        }
549    
550        /**
551         *
552         * @throws Exception
553         * @org.apache .xbean.DestroyMethod
554         */
555        @PreDestroy
556        public void stop() throws Exception {
557            if (!started.get()) {
558                return;
559            }
560    
561            MDC.put("activemq.broker", brokerName);
562    
563            if (systemExitOnShutdown) {
564                    new Thread() {
565                            @Override
566                    public void run() {
567                                    System.exit(systemExitOnShutdownExitCode);
568                            }
569                    }.start();
570            }
571    
572            LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is shutting down");
573            removeShutdownHook();
574            ServiceStopper stopper = new ServiceStopper();
575            if (services != null) {
576                for (Service service : services) {
577                    stopper.stop(service);
578                }
579            }
580            stopAllConnectors(stopper);
581            // remove any VMTransports connected
582            // this has to be done after services are stopped,
583            // to avoid timimg issue with discovery (spinning up a new instance)
584            BrokerRegistry.getInstance().unbind(getBrokerName());
585            VMTransportFactory.stopped(getBrokerName());
586            if (broker != null) {
587                stopper.stop(broker);
588                broker = null;
589            }
590    
591            if (tempDataStore != null) {
592                tempDataStore.stop();
593                tempDataStore = null;
594            }
595            stopper.stop(persistenceAdapter);
596            persistenceAdapter = null;
597            slave = true;
598            if (isUseJmx()) {
599                stopper.stop(getManagementContext());
600                managementContext = null;
601            }
602            // Clear SelectorParser cache to free memory
603            SelectorParser.clearCache();
604            stopped.set(true);
605            stoppedLatch.countDown();
606            if (masterConnectorURI == null) {
607                // master start has not finished yet
608                if (slaveStartSignal.getCount() == 1) {
609                    started.set(false);
610                    slaveStartSignal.countDown();
611                }
612            } else {
613                for (Service service : services) {
614                    if (service instanceof MasterConnector) {
615                        MasterConnector mConnector = (MasterConnector) service;
616                        if (!mConnector.isSlave()) {
617                            // means should be slave but not connected to master yet
618                            started.set(false);
619                            mConnector.stopBeforeConnected();
620                        }
621                    }
622                }
623            }
624            if (this.taskRunnerFactory != null) {
625                this.taskRunnerFactory.shutdown();
626                this.taskRunnerFactory = null;
627            }
628            if (this.scheduler != null) {
629                this.scheduler.stop();
630                this.scheduler = null;
631            }
632            if (this.executor != null) {
633                this.executor.shutdownNow();
634                this.executor = null;
635            }
636    
637            this.destinationInterceptors = null;
638            this.destinationFactory = null;
639    
640            LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") stopped");
641            synchronized (shutdownHooks) {
642                for (Runnable hook : shutdownHooks) {
643                    try {
644                        hook.run();
645                    } catch (Throwable e) {
646                        stopper.onException(hook, e);
647                    }
648                }
649            }
650    
651            MDC.remove("activemq.broker");
652    
653            stopper.throwFirstException();
654        }
655        
656        public boolean checkQueueSize(String queueName) {
657            long count = 0;
658            long queueSize = 0;
659            Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
660            for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
661                if (entry.getKey().isQueue()) {
662                    if (entry.getValue().getName().matches(queueName)) {
663                        queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
664                        count += queueSize;
665                        if (queueSize > 0) {
666                            LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
667                                    + queueSize);
668                        }
669                    }
670                }
671            }
672            return count == 0;
673        }
674    
675        /**
676         * This method (both connectorName and queueName are using regex to match)
677         * 1. stop the connector (supposed the user input the connector which the
678         * clients connect to) 2. to check whether there is any pending message on
679         * the queues defined by queueName 3. supposedly, after stop the connector,
680         * client should failover to other broker and pending messages should be
681         * forwarded. if no pending messages, the method finally call stop to stop
682         * the broker.
683         * 
684         * @param connectorName
685         * @param queueName
686         * @param timeout
687         * @param pollInterval
688         * @throws Exception
689         */
690        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
691                throws Exception {
692            if (isUseJmx()) {
693                if (connectorName == null || queueName == null || timeout <= 0) {
694                    throw new Exception(
695                            "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
696                }
697                if (pollInterval <= 0) {
698                    pollInterval = 30;
699                }
700                LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
701                        + timeout + " pollInterval:" + pollInterval);
702                TransportConnector connector;
703                for (int i = 0; i < transportConnectors.size(); i++) {
704                    connector = transportConnectors.get(i);
705                    if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
706                        connector.stop();
707                    }
708                }
709                long start = System.currentTimeMillis();
710                while (System.currentTimeMillis() - start < timeout * 1000) {
711                    // check quesize until it gets zero
712                    if (checkQueueSize(queueName)) {
713                        stop();
714                        break;
715                    } else {
716                        Thread.sleep(pollInterval * 1000);
717                    }
718                }
719                if (stopped.get()) {
720                    LOG.info("Successfully stop the broker.");
721                } else {
722                    LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
723                }
724            }
725        }
726    
727        /**
728         * A helper method to block the caller thread until the broker has been
729         * stopped
730         */
731        public void waitUntilStopped() {
732            while (isStarted() && !stopped.get()) {
733                try {
734                    stoppedLatch.await();
735                } catch (InterruptedException e) {
736                    // ignore
737                }
738            }
739        }
740    
741        /**
742         * A helper method to block the caller thread until the broker has fully started
743         * @return boolean true if wait succeeded false if broker was not started or was stopped
744         */
745        public boolean waitUntilStarted() {
746            boolean waitSucceeded = false;
747            while (isStarted() && !stopped.get() && !waitSucceeded) {
748                try {
749                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
750                } catch (InterruptedException ignore) {
751                }
752            }
753            return waitSucceeded;
754        }
755    
756        // Properties
757        // -------------------------------------------------------------------------
758        /**
759         * Returns the message broker
760         */
761        public Broker getBroker() throws Exception {
762            if (broker == null) {
763                LOG.info("ActiveMQ " + ActiveMQConnectionMetaData.PROVIDER_VERSION + " JMS Message Broker ("
764                        + getBrokerName() + ") is starting");
765                LOG.info("For help or more information please see: http://activemq.apache.org/");
766                broker = createBroker();
767            }
768            return broker;
769        }
770    
771        /**
772         * Returns the administration view of the broker; used to create and destroy
773         * resources such as queues and topics. Note this method returns null if JMX
774         * is disabled.
775         */
776        public BrokerView getAdminView() throws Exception {
777            if (adminView == null) {
778                // force lazy creation
779                getBroker();
780            }
781            return adminView;
782        }
783    
    /**
     * Sets the administration view that {@link #getAdminView()} returns.
     * 
     * @param adminView
     *            the administration view to use
     */
    public void setAdminView(BrokerView adminView) {
        this.adminView = adminView;
    }
787    
788        public String getBrokerName() {
789            return brokerName;
790        }
791    
792        /**
793         * Sets the name of this broker; which must be unique in the network
794         * 
795         * @param brokerName
796         */
797        public void setBrokerName(String brokerName) {
798            if (brokerName == null) {
799                throw new NullPointerException("The broker name cannot be null");
800            }
801            String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
802            if (!str.equals(brokerName)) {
803                LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
804            }
805            this.brokerName = str.trim();
806        }
807    
808        public PersistenceAdapterFactory getPersistenceFactory() {
809            return persistenceFactory;
810        }
811    
812        public File getDataDirectoryFile() {
813            if (dataDirectoryFile == null) {
814                dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
815            }
816            return dataDirectoryFile;
817        }
818    
819        public File getBrokerDataDirectory() {
820            String brokerDir = getBrokerName();
821            return new File(getDataDirectoryFile(), brokerDir);
822        }
823    
824        /**
825         * Sets the directory in which the data files will be stored by default for
826         * the JDBC and Journal persistence adaptors.
827         * 
828         * @param dataDirectory
829         *            the directory to store data files
830         */
831        public void setDataDirectory(String dataDirectory) {
832            setDataDirectoryFile(new File(dataDirectory));
833        }
834    
835        /**
836         * Sets the directory in which the data files will be stored by default for
837         * the JDBC and Journal persistence adaptors.
838         * 
839         * @param dataDirectoryFile
840         *            the directory to store data files
841         */
842        public void setDataDirectoryFile(File dataDirectoryFile) {
843            this.dataDirectoryFile = dataDirectoryFile;
844        }
845    
846        /**
847         * @return the tmpDataDirectory
848         */
849        public File getTmpDataDirectory() {
850            if (tmpDataDirectory == null) {
851                tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
852            }
853            return tmpDataDirectory;
854        }
855    
856        /**
857         * @param tmpDataDirectory
858         *            the tmpDataDirectory to set
859         */
860        public void setTmpDataDirectory(File tmpDataDirectory) {
861            this.tmpDataDirectory = tmpDataDirectory;
862        }
863    
    /**
     * Sets the persistence adapter factory returned by
     * {@link #getPersistenceFactory()}.
     * 
     * @param persistenceFactory
     *            the factory to use
     */
    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
        this.persistenceFactory = persistenceFactory;
    }
867    
    /**
     * Sets the destination factory held by this broker service.
     * 
     * @param destinationFactory
     *            the factory to use
     */
    public void setDestinationFactory(DestinationFactory destinationFactory) {
        this.destinationFactory = destinationFactory;
    }
871    
872        public boolean isPersistent() {
873            return persistent;
874        }
875    
876        /**
877         * Sets whether or not persistence is enabled or disabled.
878         */
879        public void setPersistent(boolean persistent) {
880            this.persistent = persistent;
881        }
882    
883        public boolean isPopulateJMSXUserID() {
884            return populateJMSXUserID;
885        }
886    
887        /**
888         * Sets whether or not the broker should populate the JMSXUserID header.
889         */
890        public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
891            this.populateJMSXUserID = populateJMSXUserID;
892        }
893    
894        public SystemUsage getSystemUsage() {
895            try {
896                if (systemUsage == null) {
897                    systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
898                    systemUsage.setExecutor(getExecutor());
899                    systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default
900                                                                             // 64
901                                                                             // Meg
902                    systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 100); // 10
903                                                                                    // Gb
904                    systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100
905                                                                                     // GB
906                    addService(this.systemUsage);
907                }
908                return systemUsage;
909            } catch (IOException e) {
910                LOG.error("Cannot create SystemUsage", e);
911                throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
912            }
913        }
914    
915        public void setSystemUsage(SystemUsage memoryManager) {
916            if (this.systemUsage != null) {
917                removeService(this.systemUsage);
918            }
919            this.systemUsage = memoryManager;
920            if (this.systemUsage.getExecutor()==null) {
921                this.systemUsage.setExecutor(getExecutor());
922            }
923            addService(this.systemUsage);
924        }
925    
926        /**
927         * @return the consumerUsageManager
928         * @throws IOException
929         */
930        public SystemUsage getConsumerSystemUsage() throws IOException {
931            if (this.consumerSystemUsaage == null) {
932                if (splitSystemUsageForProducersConsumers) {
933                    this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
934                    float portion = consumerSystemUsagePortion / 100f;
935                    this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
936                    addService(this.consumerSystemUsaage);
937                } else {
938                    consumerSystemUsaage = getSystemUsage();
939                }
940            }
941            return this.consumerSystemUsaage;
942        }
943    
944        /**
945         * @param consumerSystemUsaage
946         *            the storeSystemUsage to set
947         */
948        public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
949            if (this.consumerSystemUsaage != null) {
950                removeService(this.consumerSystemUsaage);
951            }
952            this.consumerSystemUsaage = consumerSystemUsaage;
953            addService(this.consumerSystemUsaage);
954        }
955    
956        /**
957         * @return the producerUsageManager
958         * @throws IOException
959         */
960        public SystemUsage getProducerSystemUsage() throws IOException {
961            if (producerSystemUsage == null) {
962                if (splitSystemUsageForProducersConsumers) {
963                    producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
964                    float portion = producerSystemUsagePortion / 100f;
965                    producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
966                    addService(producerSystemUsage);
967                } else {
968                    producerSystemUsage = getSystemUsage();
969                }
970            }
971            return producerSystemUsage;
972        }
973    
974        /**
975         * @param producerUsageManager
976         *            the producerUsageManager to set
977         */
978        public void setProducerSystemUsage(SystemUsage producerUsageManager) {
979            if (this.producerSystemUsage != null) {
980                removeService(this.producerSystemUsage);
981            }
982            this.producerSystemUsage = producerUsageManager;
983            addService(this.producerSystemUsage);
984        }
985    
986        public PersistenceAdapter getPersistenceAdapter() throws IOException {
987            if (persistenceAdapter == null) {
988                persistenceAdapter = createPersistenceAdapter();
989                configureService(persistenceAdapter);
990                this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
991            }
992            return persistenceAdapter;
993        }
994    
995        /**
996         * Sets the persistence adaptor implementation to use for this broker
997         * 
998         * @throws IOException
999         */
1000        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1001            this.persistenceAdapter = persistenceAdapter;
1002            configureService(this.persistenceAdapter);
1003            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1004        }
1005    
1006        public TaskRunnerFactory getTaskRunnerFactory() {
1007            if (this.taskRunnerFactory == null) {
1008                this.taskRunnerFactory = new TaskRunnerFactory("BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1009                        isDedicatedTaskRunner());
1010            }
1011            return this.taskRunnerFactory;
1012        }
1013    
    /**
     * Sets the task runner factory returned by
     * {@link #getTaskRunnerFactory()}.
     * 
     * @param taskRunnerFactory
     *            the factory to use
     */
    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
        this.taskRunnerFactory = taskRunnerFactory;
    }
1017    
1018        public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1019            if (taskRunnerFactory == null) {
1020                persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1021                        true, 1000, isDedicatedTaskRunner());
1022            }
1023            return persistenceTaskRunnerFactory;
1024        }
1025    
    /**
     * Sets the task runner factory returned by
     * {@link #getPersistenceTaskRunnerFactory()}.
     * 
     * @param persistenceTaskRunnerFactory
     *            the factory to use
     */
    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
    }
1029    
1030        public boolean isUseJmx() {
1031            return useJmx;
1032        }
1033    
1034        public boolean isEnableStatistics() {
1035            return enableStatistics;
1036        }
1037    
1038        /**
1039         * Sets whether or not the Broker's services enable statistics or not.
1040         */
1041        public void setEnableStatistics(boolean enableStatistics) {
1042            this.enableStatistics = enableStatistics;
1043        }
1044    
1045        /**
1046         * Sets whether or not the Broker's services should be exposed into JMX or
1047         * not.
1048         */
1049        public void setUseJmx(boolean useJmx) {
1050            this.useJmx = useJmx;
1051        }
1052    
1053        public ObjectName getBrokerObjectName() throws IOException {
1054            if (brokerObjectName == null) {
1055                brokerObjectName = createBrokerObjectName();
1056            }
1057            return brokerObjectName;
1058        }
1059    
1060        /**
1061         * Sets the JMX ObjectName for this broker
1062         */
1063        public void setBrokerObjectName(ObjectName brokerObjectName) {
1064            this.brokerObjectName = brokerObjectName;
1065        }
1066    
1067        public ManagementContext getManagementContext() {
1068            if (managementContext == null) {
1069                managementContext = new ManagementContext();
1070            }
1071            return managementContext;
1072        }
1073    
    /**
     * Sets the management context returned by
     * {@link #getManagementContext()}.
     * 
     * @param managementContext
     *            the management context to use
     */
    public void setManagementContext(ManagementContext managementContext) {
        this.managementContext = managementContext;
    }
1077    
1078        public NetworkConnector getNetworkConnectorByName(String connectorName) {
1079            for (NetworkConnector connector : networkConnectors) {
1080                if (connector.getName().equals(connectorName)) {
1081                    return connector;
1082                }
1083            }
1084            return null;
1085        }
1086    
1087        public String[] getNetworkConnectorURIs() {
1088            return networkConnectorURIs;
1089        }
1090    
    /**
     * Sets the network connector URIs returned by
     * {@link #getNetworkConnectorURIs()}.
     * 
     * @param networkConnectorURIs
     *            the URIs to store
     */
    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
        this.networkConnectorURIs = networkConnectorURIs;
    }
1094    
1095        public TransportConnector getConnectorByName(String connectorName) {
1096            for (TransportConnector connector : transportConnectors) {
1097                if (connector.getName().equals(connectorName)) {
1098                    return connector;
1099                }
1100            }
1101            return null;
1102        }
1103        
1104        public Map<String, String> getTransportConnectorURIsAsMap() {
1105            Map<String, String> answer = new HashMap<String, String>();
1106            for (TransportConnector connector : transportConnectors) {
1107                try {
1108                    URI uri = connector.getConnectUri();
1109                    String scheme = uri.getScheme();
1110                    if (scheme != null) {
1111                        answer.put(scheme.toLowerCase(), uri.toString());
1112                    }
1113                } catch (Exception e) {
1114                    LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1115                }
1116            }
1117            return answer;
1118        }
1119    
1120        public String[] getTransportConnectorURIs() {
1121            return transportConnectorURIs;
1122        }
1123    
    /**
     * Sets the transport connector URIs returned by
     * {@link #getTransportConnectorURIs()}.
     * 
     * @param transportConnectorURIs
     *            the URIs to store
     */
    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
        this.transportConnectorURIs = transportConnectorURIs;
    }
1127    
1128        /**
1129         * @return Returns the jmsBridgeConnectors.
1130         */
1131        public JmsConnector[] getJmsBridgeConnectors() {
1132            return jmsBridgeConnectors;
1133        }
1134    
1135        /**
1136         * @param jmsConnectors
1137         *            The jmsBridgeConnectors to set.
1138         */
1139        public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1140            this.jmsBridgeConnectors = jmsConnectors;
1141        }
1142    
1143        public Service[] getServices() {
1144            return services.toArray(new Service[0]);
1145        }
1146    
1147        /**
1148         * Sets the services associated with this broker such as a
1149         * {@link MasterConnector}
1150         */
1151        public void setServices(Service[] services) {
1152            this.services.clear();
1153            if (services != null) {
1154                for (int i = 0; i < services.length; i++) {
1155                    this.services.add(services[i]);
1156                }
1157            }
1158        }
1159    
1160        /**
1161         * Adds a new service so that it will be started as part of the broker
1162         * lifecycle
1163         */
1164        public void addService(Service service) {
1165            services.add(service);
1166        }
1167    
1168        public void removeService(Service service) {
1169            services.remove(service);
1170        }
1171    
1172        public boolean isUseLoggingForShutdownErrors() {
1173            return useLoggingForShutdownErrors;
1174        }
1175    
1176        /**
1177         * Sets whether or not we should use commons-logging when reporting errors
1178         * when shutting down the broker
1179         */
1180        public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1181            this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1182        }
1183    
1184        public boolean isUseShutdownHook() {
1185            return useShutdownHook;
1186        }
1187    
1188        /**
1189         * Sets whether or not we should use a shutdown handler to close down the
1190         * broker cleanly if the JVM is terminated. It is recommended you leave this
1191         * enabled.
1192         */
1193        public void setUseShutdownHook(boolean useShutdownHook) {
1194            this.useShutdownHook = useShutdownHook;
1195        }
1196    
1197        public boolean isAdvisorySupport() {
1198            return advisorySupport;
1199        }
1200    
1201        /**
1202         * Allows the support of advisory messages to be disabled for performance
1203         * reasons.
1204         */
1205        public void setAdvisorySupport(boolean advisorySupport) {
1206            this.advisorySupport = advisorySupport;
1207        }
1208    
1209        public List<TransportConnector> getTransportConnectors() {
1210            return new ArrayList<TransportConnector>(transportConnectors);
1211        }
1212    
1213        /**
1214         * Sets the transport connectors which this broker will listen on for new
1215         * clients
1216         * 
1217         * @org.apache.xbean.Property 
1218         *                            nestedType="org.apache.activemq.broker.TransportConnector"
1219         */
1220        public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1221            for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1222                TransportConnector connector = iter.next();
1223                addConnector(connector);
1224            }
1225        }
1226    
1227        public List<NetworkConnector> getNetworkConnectors() {
1228            return new ArrayList<NetworkConnector>(networkConnectors);
1229        }
1230    
1231        public List<ProxyConnector> getProxyConnectors() {
1232            return new ArrayList<ProxyConnector>(proxyConnectors);
1233        }
1234    
1235        /**
1236         * Sets the network connectors which this broker will use to connect to
1237         * other brokers in a federated network
1238         * 
1239         * @org.apache.xbean.Property 
1240         *                            nestedType="org.apache.activemq.network.NetworkConnector"
1241         */
1242        public void setNetworkConnectors(List networkConnectors) throws Exception {
1243            for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1244                NetworkConnector connector = (NetworkConnector) iter.next();
1245                addNetworkConnector(connector);
1246            }
1247        }
1248    
1249        /**
1250         * Sets the network connectors which this broker will use to connect to
1251         * other brokers in a federated network
1252         */
1253        public void setProxyConnectors(List proxyConnectors) throws Exception {
1254            for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1255                ProxyConnector connector = (ProxyConnector) iter.next();
1256                addProxyConnector(connector);
1257            }
1258        }
1259    
1260        public PolicyMap getDestinationPolicy() {
1261            return destinationPolicy;
1262        }
1263    
1264        /**
1265         * Sets the destination specific policies available either for exact
1266         * destinations or for wildcard areas of destinations.
1267         */
1268        public void setDestinationPolicy(PolicyMap policyMap) {
1269            this.destinationPolicy = policyMap;
1270        }
1271    
1272        public BrokerPlugin[] getPlugins() {
1273            return plugins;
1274        }
1275    
1276        /**
1277         * Sets a number of broker plugins to install such as for security
1278         * authentication or authorization
1279         */
1280        public void setPlugins(BrokerPlugin[] plugins) {
1281            this.plugins = plugins;
1282        }
1283    
1284        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1285            return messageAuthorizationPolicy;
1286        }
1287    
1288        /**
1289         * Sets the policy used to decide if the current connection is authorized to
1290         * consume a given message
1291         */
1292        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1293            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1294        }
1295    
1296        /**
1297         * Delete all messages from the persistent store
1298         * 
1299         * @throws IOException
1300         */
1301        public void deleteAllMessages() throws IOException {
1302            getPersistenceAdapter().deleteAllMessages();
1303        }
1304    
1305        public boolean isDeleteAllMessagesOnStartup() {
1306            return deleteAllMessagesOnStartup;
1307        }
1308    
1309        /**
1310         * Sets whether or not all messages are deleted on startup - mostly only
1311         * useful for testing.
1312         */
1313        public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1314            this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1315        }
1316    
1317        public URI getVmConnectorURI() {
1318            if (vmConnectorURI == null) {
1319                try {
1320                    vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1321                } catch (URISyntaxException e) {
1322                    LOG.error("Badly formed URI from " + getBrokerName(), e);
1323                }
1324            }
1325            return vmConnectorURI;
1326        }
1327    
    /**
     * Sets the VM connector URI returned by {@link #getVmConnectorURI()}.
     * 
     * @param vmConnectorURI
     *            the URI to use
     */
    public void setVmConnectorURI(URI vmConnectorURI) {
        this.vmConnectorURI = vmConnectorURI;
    }
1331        
1332        public String getDefaultSocketURIString() {
1333           
1334                if (started.get()) {
1335                    if (this.defaultSocketURIString ==null) {
1336                        for (TransportConnector tc:this.transportConnectors) {
1337                            String result = null;
1338                            try {
1339                                result = tc.getPublishableConnectString();
1340                            } catch (Exception e) {
1341                              LOG.warn("Failed to get the ConnectURI for "+tc,e);
1342                            }
1343                            if (result != null) {
1344                                this.defaultSocketURIString =result;
1345                                break;
1346                            }
1347                        }
1348                    }
1349                    return this.defaultSocketURIString;
1350                }
1351           return null;
1352        }
1353    
1354        /**
1355         * @return Returns the shutdownOnMasterFailure.
1356         */
1357        public boolean isShutdownOnMasterFailure() {
1358            return shutdownOnMasterFailure;
1359        }
1360    
1361        /**
1362         * @param shutdownOnMasterFailure
1363         *            The shutdownOnMasterFailure to set.
1364         */
1365        public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1366            this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1367        }
1368    
1369        public boolean isKeepDurableSubsActive() {
1370            return keepDurableSubsActive;
1371        }
1372    
    /**
     * Sets the flag returned by {@link #isKeepDurableSubsActive()}.
     * 
     * @param keepDurableSubsActive
     *            the new value of the flag
     */
    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
        this.keepDurableSubsActive = keepDurableSubsActive;
    }
1376    
1377        public boolean isUseVirtualTopics() {
1378            return useVirtualTopics;
1379        }
1380    
1381        /**
1382         * Sets whether or not <a
1383         * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1384         * Topics</a> should be supported by default if they have not been
1385         * explicitly configured.
1386         */
1387        public void setUseVirtualTopics(boolean useVirtualTopics) {
1388            this.useVirtualTopics = useVirtualTopics;
1389        }
1390    
1391        public DestinationInterceptor[] getDestinationInterceptors() {
1392            return destinationInterceptors;
1393        }
1394    
1395        public boolean isUseMirroredQueues() {
1396            return useMirroredQueues;
1397        }
1398    
1399        /**
1400         * Sets whether or not <a
1401         * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1402         * Queues</a> should be supported by default if they have not been
1403         * explicitly configured.
1404         */
1405        public void setUseMirroredQueues(boolean useMirroredQueues) {
1406            this.useMirroredQueues = useMirroredQueues;
1407        }
1408    
1409        /**
1410         * Sets the destination interceptors to use
1411         */
1412        public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1413            this.destinationInterceptors = destinationInterceptors;
1414        }
1415    
1416        public ActiveMQDestination[] getDestinations() {
1417            return destinations;
1418        }
1419    
1420        /**
1421         * Sets the destinations which should be loaded/created on startup
1422         */
1423        public void setDestinations(ActiveMQDestination[] destinations) {
1424            this.destinations = destinations;
1425        }
1426    
1427        /**
1428         * @return the tempDataStore
1429         */
1430        public synchronized PListStore getTempDataStore() {
1431            if (tempDataStore == null) {
1432                if (!isPersistent()) {
1433                    return null;
1434                }
1435                boolean result = true;
1436                boolean empty = true;
1437                try {
1438                    File directory = getTmpDataDirectory();
1439                    if (directory.exists() && directory.isDirectory()) {
1440                        File[] files = directory.listFiles();
1441                        if (files != null && files.length > 0) {
1442                            empty = false;
1443                            for (int i = 0; i < files.length; i++) {
1444                                File file = files[i];
1445                                if (!file.isDirectory()) {
1446                                    result &= file.delete();
1447                                }
1448                            }
1449                        }
1450                    }
1451                    if (!empty) {
1452                        String str = result ? "Successfully deleted" : "Failed to delete";
1453                        LOG.info(str + " temporary storage");
1454                    }
1455                    this.tempDataStore = new PListStore();
1456                    this.tempDataStore.setDirectory(getTmpDataDirectory());
1457                    this.tempDataStore.start();
1458                } catch (Exception e) {
1459                    throw new RuntimeException(e);
1460                }
1461            }
1462            return tempDataStore;
1463        }
1464    
1465        /**
1466         * @param tempDataStore
1467         *            the tempDataStore to set
1468         */
1469        public void setTempDataStore(PListStore tempDataStore) {
1470            this.tempDataStore = tempDataStore;
1471        }
1472    
1473        public int getPersistenceThreadPriority() {
1474            return persistenceThreadPriority;
1475        }
1476    
    /**
     * Sets the priority returned by {@link #getPersistenceThreadPriority()}.
     * 
     * @param persistenceThreadPriority
     *            the priority to use
     */
    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
        this.persistenceThreadPriority = persistenceThreadPriority;
    }
1480    
    /**
     * @return the current value of the useLocalHostBrokerName flag
     */
    public boolean isUseLocalHostBrokerName() {
        return this.useLocalHostBrokerName;
    }
1487    
1488        /**
1489         * @param useLocalHostBrokerName
1490         *            the useLocalHostBrokerName to set
1491         */
1492        public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1493            this.useLocalHostBrokerName = useLocalHostBrokerName;
1494            if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1495                brokerName = LOCAL_HOST_NAME;
1496            }
1497        }
1498    
    /**
     * @return the current value of the supportFailOver flag
     */
    public boolean isSupportFailOver() {
        return this.supportFailOver;
    }
1505    
    /**
     * Stores the supportFailOver flag on this service.
     *
     * @param supportFailOver
     *            the supportFailOver to set
     */
    public void setSupportFailOver(boolean supportFailOver) {
        this.supportFailOver = supportFailOver;
    }
1513    
1514        /**
1515         * Looks up and lazily creates if necessary the destination for the given
1516         * JMS name
1517         */
1518        public Destination getDestination(ActiveMQDestination destination) throws Exception {
1519            return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1520        }
1521    
1522        public void removeDestination(ActiveMQDestination destination) throws Exception {
1523            getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1524        }
1525    
    /**
     * @return the current value of the producerSystemUsagePortion property
     */
    public int getProducerSystemUsagePortion() {
        return producerSystemUsagePortion;
    }
1529    
    /**
     * @param producerSystemUsagePortion
     *            the producerSystemUsagePortion to set
     */
    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
        this.producerSystemUsagePortion = producerSystemUsagePortion;
    }
1533    
    /**
     * @return the current value of the consumerSystemUsagePortion property
     */
    public int getConsumerSystemUsagePortion() {
        return consumerSystemUsagePortion;
    }
1537    
    /**
     * @param consumerSystemUsagePortion
     *            the consumerSystemUsagePortion to set
     */
    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
    }
1541    
    /**
     * @return the current value of the splitSystemUsageForProducersConsumers
     *         flag
     */
    public boolean isSplitSystemUsageForProducersConsumers() {
        return splitSystemUsageForProducersConsumers;
    }
1545    
    /**
     * @param splitSystemUsageForProducersConsumers
     *            the splitSystemUsageForProducersConsumers flag to set
     */
    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
    }
1549    
    /**
     * @return the current value of the monitorConnectionSplits flag
     */
    public boolean isMonitorConnectionSplits() {
        return monitorConnectionSplits;
    }
1553    
    /**
     * @param monitorConnectionSplits
     *            the monitorConnectionSplits flag to set
     */
    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
        this.monitorConnectionSplits = monitorConnectionSplits;
    }
1557    
    /**
     * @return the current value of the taskRunnerPriority property
     */
    public int getTaskRunnerPriority() {
        return taskRunnerPriority;
    }
1561    
    /**
     * @param taskRunnerPriority
     *            the taskRunnerPriority to set
     */
    public void setTaskRunnerPriority(int taskRunnerPriority) {
        this.taskRunnerPriority = taskRunnerPriority;
    }
1565    
    /**
     * @return the current value of the dedicatedTaskRunner flag
     */
    public boolean isDedicatedTaskRunner() {
        return dedicatedTaskRunner;
    }
1569    
    /**
     * @param dedicatedTaskRunner
     *            the dedicatedTaskRunner flag to set
     */
    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
        this.dedicatedTaskRunner = dedicatedTaskRunner;
    }
1573    
    /**
     * @return the current value of the cacheTempDestinations flag
     */
    public boolean isCacheTempDestinations() {
        return cacheTempDestinations;
    }
1577    
    /**
     * @param cacheTempDestinations
     *            the cacheTempDestinations flag to set
     */
    public void setCacheTempDestinations(boolean cacheTempDestinations) {
        this.cacheTempDestinations = cacheTempDestinations;
    }
1581    
    /**
     * @return the current value of the timeBeforePurgeTempDestinations
     *         property
     */
    public int getTimeBeforePurgeTempDestinations() {
        return timeBeforePurgeTempDestinations;
    }
1585    
    /**
     * @param timeBeforePurgeTempDestinations
     *            the timeBeforePurgeTempDestinations to set
     */
    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
    }
1589    
    /**
     * @return the current value of the useTempMirroredQueues flag
     */
    public boolean isUseTempMirroredQueues() {
        return useTempMirroredQueues;
    }
1593    
    /**
     * @param useTempMirroredQueues
     *            the useTempMirroredQueues flag to set
     */
    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
        this.useTempMirroredQueues = useTempMirroredQueues;
    }
1597    
1598        //
1599        // Implementation methods
1600        // -------------------------------------------------------------------------
1601        /**
1602         * Handles any lazy-creation helper properties which are added to make
1603         * things easier to configure inside environments such as Spring
1604         * 
1605         * @throws Exception
1606         */
1607        protected void processHelperProperties() throws Exception {
1608            boolean masterServiceExists = false;
1609            if (transportConnectorURIs != null) {
1610                for (int i = 0; i < transportConnectorURIs.length; i++) {
1611                    String uri = transportConnectorURIs[i];
1612                    addConnector(uri);
1613                }
1614            }
1615            if (networkConnectorURIs != null) {
1616                for (int i = 0; i < networkConnectorURIs.length; i++) {
1617                    String uri = networkConnectorURIs[i];
1618                    addNetworkConnector(uri);
1619                }
1620            }
1621            if (jmsBridgeConnectors != null) {
1622                for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1623                    addJmsConnector(jmsBridgeConnectors[i]);
1624                }
1625            }
1626            for (Service service : services) {
1627                if (service instanceof MasterConnector) {
1628                    masterServiceExists = true;
1629                    break;
1630                }
1631            }
1632            if (masterConnectorURI != null) {
1633                if (masterServiceExists) {
1634                    throw new IllegalStateException(
1635                            "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1636                } else {
1637                    addService(new MasterConnector(masterConnectorURI));
1638                }
1639            }
1640        }
1641    
1642        public void stopAllConnectors(ServiceStopper stopper) {
1643            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1644                NetworkConnector connector = iter.next();
1645                unregisterNetworkConnectorMBean(connector);
1646                stopper.stop(connector);
1647            }
1648            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1649                ProxyConnector connector = iter.next();
1650                stopper.stop(connector);
1651            }
1652            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1653                JmsConnector connector = iter.next();
1654                stopper.stop(connector);
1655            }
1656            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1657                TransportConnector connector = iter.next();
1658                stopper.stop(connector);
1659            }
1660        }
1661    
1662        protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1663            try {
1664                ObjectName objectName = createConnectorObjectName(connector);
1665                connector = connector.asManagedConnector(getManagementContext(), objectName);
1666                ConnectorViewMBean view = new ConnectorView(connector);
1667                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1668                return connector;
1669            } catch (Throwable e) {
1670                throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1671            }
1672        }
1673    
1674        protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1675            if (isUseJmx()) {
1676                try {
1677                    ObjectName objectName = createConnectorObjectName(connector);
1678                    getManagementContext().unregisterMBean(objectName);
1679                } catch (Throwable e) {
1680                    throw IOExceptionSupport.create(
1681                            "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1682                }
1683            }
1684        }
1685    
    /**
     * Hook for registering a persistence adapter in JMX. This implementation
     * performs no registration and returns the adapter it was given.
     *
     * @param adaptor
     *            the persistence adapter
     * @return the same adapter instance
     * @throws IOException
     *             declared by the signature; never thrown by this
     *             implementation
     */
    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        return adaptor;
    }
1689    
    /**
     * Hook for removing a persistence adapter from JMX. This implementation
     * only queries {@code isUseJmx()} and takes no further action.
     *
     * @param adaptor
     *            the persistence adapter
     * @throws IOException
     *             declared by the signature; never thrown by this
     *             implementation
     */
    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
        if (isUseJmx()) {
            // no persistence adapter MBean is unregistered here at present
        }
    }
1694    
1695        private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1696            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1697                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
1698                    + JMXSupport.encodeObjectNamePart(connector.getName()));
1699        }
1700    
1701        protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1702            NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
1703            try {
1704                ObjectName objectName = createNetworkConnectorObjectName(connector);
1705                connector.setObjectName(objectName);
1706                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1707            } catch (Throwable e) {
1708                throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
1709            }
1710        }
1711    
1712        protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
1713                throws MalformedObjectNameException {
1714            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1715                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1716                    + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1717        }
1718    
1719    
1720        public ObjectName createDuplexNetworkConnectorObjectName(String transport)
1721                throws MalformedObjectNameException {
1722            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1723                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
1724                    + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
1725        }
1726    
1727        protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
1728            if (isUseJmx()) {
1729                try {
1730                    ObjectName objectName = createNetworkConnectorObjectName(connector);
1731                    getManagementContext().unregisterMBean(objectName);
1732                } catch (Exception e) {
1733                    LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
1734                }
1735            }
1736        }
1737    
1738        protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
1739            ProxyConnectorView view = new ProxyConnectorView(connector);
1740            try {
1741                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1742                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
1743                        + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1744                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1745            } catch (Throwable e) {
1746                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1747            }
1748        }
1749    
1750        protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
1751            FTConnectorView view = new FTConnectorView(connector);
1752            try {
1753                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1754                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
1755                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1756            } catch (Throwable e) {
1757                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1758            }
1759        }
1760    
1761        protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
1762            JmsConnectorView view = new JmsConnectorView(connector);
1763            try {
1764                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1765                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
1766                        + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
1767                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1768            } catch (Throwable e) {
1769                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
1770            }
1771        }
1772    
    /**
     * Factory method to create a new broker: builds the region broker, wraps
     * it with the interceptor chain from {@code addInterceptors} and puts a
     * mutable filter on top whose delegate is swapped for an
     * {@link ErrorBroker} when the broker is stopped.
     *
     * @return the outermost broker of the chain
     * @throws Exception
     *             if the region broker or any interceptor cannot be created
     */
    protected Broker createBroker() throws Exception {
        regionBroker = createRegionBroker();
        Broker broker = addInterceptors(regionBroker);
        // Add a filter that will stop access to the broker once stopped
        broker = new MutableBrokerFilter(broker) {
            // Delegate that was active before stop() replaced it.
            Broker old;

            @Override
            public void stop() throws Exception {
                // Swap in the error broker first, then stop the previous
                // delegate that getAndSet handed back.
                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
                    // Just ignore additional stop actions.
                    @Override
                    public void stop() throws Exception {
                    }
                });
                old.stop();
            }

            @Override
            public void start() throws Exception {
                // On a forced start after a stop, reinstate the remembered
                // delegate before starting whatever delegate is current.
                if (forceStart && old != null) {
                    this.next.set(old);
                }
                getNext().start();
            }
        };
        return broker;
    }
1808    
1809        /**
1810         * Factory method to create the core region broker onto which interceptors
1811         * are added
1812         * 
1813         * @throws Exception
1814         */
1815        protected Broker createRegionBroker() throws Exception {
1816            if (destinationInterceptors == null) {
1817                destinationInterceptors = createDefaultDestinationInterceptor();
1818            }
1819            configureServices(destinationInterceptors);
1820            DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
1821            if (destinationFactory == null) {
1822                destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
1823            }
1824            return createRegionBroker(destinationInterceptor);
1825        }
1826    
1827        protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
1828            RegionBroker regionBroker;
1829            if (isUseJmx()) {
1830                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
1831                        getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
1832            } else {
1833                regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
1834                        destinationInterceptor,getScheduler(),getExecutor());
1835            }
1836            destinationFactory.setRegionBroker(regionBroker);
1837            regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
1838            regionBroker.setBrokerName(getBrokerName());
1839            regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
1840            if (brokerId != null) {
1841                regionBroker.setBrokerId(brokerId);
1842            }
1843            return regionBroker;
1844        }
1845    
1846        /**
1847         * Create the default destination interceptor
1848         */
1849        protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
1850            List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
1851            if (isUseVirtualTopics()) {
1852                VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
1853                VirtualTopic virtualTopic = new VirtualTopic();
1854                virtualTopic.setName("VirtualTopic.>");
1855                VirtualDestination[] virtualDestinations = { virtualTopic };
1856                interceptor.setVirtualDestinations(virtualDestinations);
1857                answer.add(interceptor);
1858            }
1859            if (isUseMirroredQueues()) {
1860                MirroredQueue interceptor = new MirroredQueue();
1861                answer.add(interceptor);
1862            }
1863            DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
1864            answer.toArray(array);
1865            return array;
1866        }
1867    
1868        /**
1869         * Strategy method to add interceptors to the broker
1870         * 
1871         * @throws IOException
1872         */
1873        protected Broker addInterceptors(Broker broker) throws Exception {
1874            if (isSchedulerSupport()) {
1875                SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
1876                if (isUseJmx()) {
1877                    JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
1878                    try {
1879                        ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
1880                                + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
1881                                + "Type=jobScheduler," + "jobSchedulerName=JMS");
1882    
1883                        AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1884                        this.adminView.setJMSJobScheduler(objectName);
1885                    } catch (Throwable e) {
1886                        throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
1887                                + e.getMessage(), e);
1888                    }
1889    
1890                }
1891                broker = sb;
1892            }
1893            if (isAdvisorySupport()) {
1894                broker = new AdvisoryBroker(broker);
1895            }
1896            broker = new CompositeDestinationBroker(broker);
1897            broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
1898            if (isPopulateJMSXUserID()) {
1899                UserIDBroker userIDBroker = new UserIDBroker(broker);
1900                userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
1901                broker = userIDBroker;
1902            }
1903            if (isMonitorConnectionSplits()) {
1904                broker = new ConnectionSplitBroker(broker);
1905            }
1906            if (plugins != null) {
1907                for (int i = 0; i < plugins.length; i++) {
1908                    BrokerPlugin plugin = plugins[i];
1909                    broker = plugin.installPlugin(broker);
1910                }
1911            }
1912            return broker;
1913        }
1914    
1915        protected PersistenceAdapter createPersistenceAdapter() throws IOException {
1916            if (isPersistent()) {
1917                PersistenceAdapterFactory fac = getPersistenceFactory();
1918                if (fac != null) {
1919                    return fac.createPersistenceAdapter();
1920                }else {
1921                    KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
1922                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
1923                    adaptor.setDirectory(dir);
1924                    return adaptor;
1925                }
1926            } else {
1927                return new MemoryPersistenceAdapter();
1928            }
1929        }
1930    
1931        protected ObjectName createBrokerObjectName() throws IOException {
1932            try {
1933                return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
1934                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
1935            } catch (Throwable e) {
1936                throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
1937            }
1938        }
1939    
1940        protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
1941            TransportServer transport = TransportFactory.bind(this, brokerURI);
1942            return new TransportConnector(transport);
1943        }
1944    
1945        /**
1946         * Extracts the port from the options
1947         */
1948        protected Object getPort(Map options) {
1949            Object port = options.get("port");
1950            if (port == null) {
1951                port = DEFAULT_PORT;
1952                LOG.warn("No port specified so defaulting to: " + port);
1953            }
1954            return port;
1955        }
1956    
1957        protected void addShutdownHook() {
1958            if (useShutdownHook) {
1959                shutdownHook = new Thread("ActiveMQ ShutdownHook") {
1960                    @Override
1961                    public void run() {
1962                        containerShutdown();
1963                    }
1964                };
1965                Runtime.getRuntime().addShutdownHook(shutdownHook);
1966            }
1967        }
1968    
1969        protected void removeShutdownHook() {
1970            if (shutdownHook != null) {
1971                try {
1972                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
1973                } catch (Exception e) {
1974                    LOG.debug("Caught exception, must be shutting down: " + e);
1975                }
1976            }
1977        }
1978    
1979        /**
1980         * Sets hooks to be executed when broker shut down
1981         * 
1982         * @org.apache.xbean.Property
1983         */
1984        public void setShutdownHooks(List<Runnable> hooks) throws Exception {
1985            for (Runnable hook : hooks) {
1986                addShutdownHook(hook);
1987            }
1988        }
1989        
1990        /**
1991         * Causes a clean shutdown of the container when the VM is being shut down
1992         */
1993        protected void containerShutdown() {
1994            try {
1995                stop();
1996            } catch (IOException e) {
1997                Throwable linkedException = e.getCause();
1998                if (linkedException != null) {
1999                    logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2000                } else {
2001                    logError("Failed to shut down: " + e, e);
2002                }
2003                if (!useLoggingForShutdownErrors) {
2004                    e.printStackTrace(System.err);
2005                }
2006            } catch (Exception e) {
2007                logError("Failed to shut down: " + e, e);
2008            }
2009        }
2010    
2011        protected void logError(String message, Throwable e) {
2012            if (useLoggingForShutdownErrors) {
2013                LOG.error("Failed to shut down: " + e);
2014            } else {
2015                System.err.println("Failed to shut down: " + e);
2016            }
2017        }
2018    
2019        /**
2020         * Starts any configured destinations on startup
2021         */
2022        protected void startDestinations() throws Exception {
2023            if (destinations != null) {
2024                ConnectionContext adminConnectionContext = getAdminConnectionContext();
2025                for (int i = 0; i < destinations.length; i++) {
2026                    ActiveMQDestination destination = destinations[i];
2027                    getBroker().addDestination(adminConnectionContext, destination,true);
2028                }
2029            }
2030        }
2031    
2032        /**
2033         * Returns the broker's administration connection context used for
2034         * configuring the broker at startup
2035         */
2036        public ConnectionContext getAdminConnectionContext() throws Exception {
2037            return BrokerSupport.getConnectionContext(getBroker());
2038        }
2039    
2040        protected void waitForSlave() {
2041            try {
2042                if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2043                    throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds."); 
2044                }
2045            } catch (InterruptedException e) {
2046                LOG.error("Exception waiting for slave:" + e);
2047            }
2048        }
2049    
    /**
     * Counts down the slave start signal that {@code waitForSlave()} awaits.
     */
    protected void slaveConnectionEstablished() {
        slaveStartSignal.countDown();
    }
2053        
2054        protected void startManagementContext() throws Exception {
2055            getManagementContext().start();
2056            adminView = new BrokerView(this, null);
2057            ObjectName objectName = getBrokerObjectName();
2058            AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2059        }
2060    
    /**
     * Start all transport and network connections, proxies and bridges, and
     * finally the additional registered services. Nothing is started when this
     * broker is a slave.
     * 
     * @throws Exception
     *             if a connector or service started on the calling thread
     *             fails to start
     */
    public void startAllConnectors() throws Exception {
        if (!isSlave()) {
            Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
            // Start every transport connector and collect the instances that
            // startTransportConnector hands back.
            List<TransportConnector> al = new ArrayList<TransportConnector>();
            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
                TransportConnector connector = iter.next();
                connector.setBrokerService(this);
                al.add(startTransportConnector(connector));
            }
            if (al.size() > 0) {
                // let's clear the transportConnectors list and replace it with
                // the started transportConnector instances
                this.transportConnectors.clear();
                setTransportConnectors(al);
            }
            // Derive the local URI for network connectors from the VM
            // connector URI, adding network=true and async=false to its query.
            URI uri = getVmConnectorURI();
            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");
            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
            if (isWaitForSlave()) {
                waitForSlave();
            }
            // The remaining connectors and services are only started while the
            // stopped flag is not set.
            if (!stopped.get()) {
                ThreadPoolExecutor networkConnectorStartExecutor = null;
                if (isNetworkConnectorStartAsync()) {
                    // spin up as many threads as needed
                    networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                            10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                            new ThreadFactory() {
                                int count=0;
                                public Thread newThread(Runnable runnable) {
                                    // Daemon threads, named with a running counter.
                                    Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
                                    thread.setDaemon(true);
                                    return thread;
                                }
                            });
                }

                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
                    final NetworkConnector connector = iter.next();
                    connector.setLocalUri(uri);
                    connector.setBrokerName(getBrokerName());
                    connector.setDurableDestinations(durableDestinations);
                    if (getDefaultSocketURIString() != null) {
                        connector.setBrokerURL(getDefaultSocketURIString());
                    }
                    if (networkConnectorStartExecutor != null) {
                        // Async start: hand the caller's MDC context to the
                        // task; a start failure is logged, not propagated.
                        final Map context = MDCHelper.getCopyOfContextMap();
                        networkConnectorStartExecutor.execute(new Runnable() {
                            public void run() {
                                try {
                                    MDCHelper.setContextMap(context);
                                    LOG.info("Async start of " + connector);
                                    connector.start();
                                } catch(Exception e) {
                                    LOG.error("Async start of network connector: " + connector + " failed", e);
                                }
                            }
                        });
                    } else {
                        // Synchronous start: a failure propagates to the caller.
                        connector.start();
                    }
                }
                if (networkConnectorStartExecutor != null) {
                    // executor done when enqueued tasks are complete
                    networkConnectorStartExecutor.shutdown();
                    networkConnectorStartExecutor = null;
                }

                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
                    ProxyConnector connector = iter.next();
                    connector.start();
                }
                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
                    JmsConnector connector = iter.next();
                    connector.start();
                }
                // Each additional service is configured via configureService
                // before it is started.
                for (Service service : services) {
                    configureService(service);
                    service.start();
                }
            }
        }
    }
2151    
2152        protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2153            connector.setTaskRunnerFactory(getTaskRunnerFactory());
2154            MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2155            if (policy != null) {
2156                connector.setMessageAuthorizationPolicy(policy);
2157            }
2158            if (isUseJmx()) {
2159                connector = registerConnectorMBean(connector);
2160            }
2161            connector.getStatistics().setEnabled(enableStatistics);
2162            connector.start();
2163            return connector;
2164        }
2165    
2166        /**
2167         * Perform any custom dependency injection
2168         */
2169        protected void configureServices(Object[] services) {
2170            for (Object service : services) {
2171                configureService(service);
2172            }
2173        }
2174    
2175        /**
2176         * Perform any custom dependency injection
2177         */
2178        protected void configureService(Object service) {
2179            if (service instanceof BrokerServiceAware) {
2180                BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2181                serviceAware.setBrokerService(this);
2182            }
2183            if (masterConnector == null) {
2184                if (service instanceof MasterConnector) {
2185                    masterConnector = (MasterConnector) service;
2186                    supportFailOver = true;
2187                }
2188            }
2189        }
2190        
2191        public void handleIOException(IOException exception) {
2192            if (ioExceptionHandler != null) {
2193                ioExceptionHandler.handle(exception);
2194             } else {
2195                LOG.info("Ignoring IO exception, " + exception, exception);
2196             }
2197        }
2198    
2199        /**
2200         * Starts all destiantions in persistence store. This includes all inactive
2201         * destinations
2202         */
2203        protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
2204            Set destinations = destinationFactory.getDestinations();
2205            if (destinations != null) {
2206                Iterator iter = destinations.iterator();
2207                ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
2208                if (adminConnectionContext == null) {
2209                    ConnectionContext context = new ConnectionContext();
2210                    context.setBroker(broker);
2211                    adminConnectionContext = context;
2212                    broker.setAdminConnectionContext(adminConnectionContext);
2213                }
2214                while (iter.hasNext()) {
2215                    ActiveMQDestination destination = (ActiveMQDestination) iter.next();
2216                    broker.addDestination(adminConnectionContext, destination,false);
2217                }
2218            }
2219        }
2220        
2221        protected synchronized ThreadPoolExecutor getExecutor() {
2222            if (this.executor == null) {
2223            this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2224                public Thread newThread(Runnable runnable) {
2225                    Thread thread = new Thread(runnable, "Usage Async Task");
2226                    thread.setDaemon(true);
2227                    return thread;
2228                }
2229            });
2230            }
2231            return this.executor;
2232        }
2233        
2234        public synchronized Scheduler getScheduler() {
2235            if (this.scheduler==null) {
2236                this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2237                try {
2238                    this.scheduler.start();
2239                } catch (Exception e) {
2240                   LOG.error("Failed to start Scheduler ",e);
2241                }
2242            }
2243            return this.scheduler;
2244        }
2245    
2246        public Broker getRegionBroker() {
2247            return regionBroker;
2248        }
2249    
2250        public void setRegionBroker(Broker regionBroker) {
2251            this.regionBroker = regionBroker;
2252        }
2253    
2254        public void addShutdownHook(Runnable hook) {
2255            synchronized (shutdownHooks) {
2256                shutdownHooks.add(hook);
2257            }
2258        }
2259    
2260        public void removeShutdownHook(Runnable hook) {
2261            synchronized (shutdownHooks) {
2262                shutdownHooks.remove(hook);
2263            }
2264        }
2265    
2266        public boolean isSystemExitOnShutdown() {
2267            return systemExitOnShutdown;
2268        }
2269    
2270        public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2271            this.systemExitOnShutdown = systemExitOnShutdown;
2272        }
2273    
2274        public int getSystemExitOnShutdownExitCode() {
2275            return systemExitOnShutdownExitCode;
2276        }
2277    
2278        public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2279            this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2280        }
2281    
2282        public SslContext getSslContext() {
2283            return sslContext;
2284        }
2285    
2286        public void setSslContext(SslContext sslContext) {
2287            this.sslContext = sslContext;
2288        }
2289    
2290        public boolean isShutdownOnSlaveFailure() {
2291            return shutdownOnSlaveFailure;
2292        }
2293    
2294        public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2295            this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2296        }
2297    
2298        public boolean isWaitForSlave() {
2299            return waitForSlave;
2300        }
2301    
2302        public void setWaitForSlave(boolean waitForSlave) {
2303            this.waitForSlave = waitForSlave;
2304        }
2305      
2306        public long getWaitForSlaveTimeout() {
2307            return this.waitForSlaveTimeout;
2308        }
2309        
2310        public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2311            this.waitForSlaveTimeout = waitForSlaveTimeout;
2312        }
2313        
2314        public CountDownLatch getSlaveStartSignal() {
2315            return slaveStartSignal;
2316        }
2317    
2318        /**
2319         * Get the passiveSlave
2320         * @return the passiveSlave
2321         */
2322        public boolean isPassiveSlave() {
2323            return this.passiveSlave;
2324        }
2325    
2326        /**
2327         * Set the passiveSlave
2328         * @param passiveSlave the passiveSlave to set
2329         */
2330        public void setPassiveSlave(boolean passiveSlave) {
2331            this.passiveSlave = passiveSlave;
2332        }
2333    
2334        /**
2335         * override the Default IOException handler, called when persistence adapter
2336         * has experiences File or JDBC I/O Exceptions
2337         *
2338         * @param ioExceptionHandler
2339         */
2340        public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2341            configureService(ioExceptionHandler);
2342            this.ioExceptionHandler = ioExceptionHandler;
2343        }
2344    
2345        public IOExceptionHandler getIoExceptionHandler() {
2346            return ioExceptionHandler;
2347        }
2348    
2349        /**
2350         * @return the schedulerSupport
2351         */
2352        public boolean isSchedulerSupport() {
2353            return this.schedulerSupport;
2354        }
2355    
2356        /**
2357         * @param schedulerSupport the schedulerSupport to set
2358         */
2359        public void setSchedulerSupport(boolean schedulerSupport) {
2360            this.schedulerSupport = schedulerSupport;
2361        }
2362    
2363        /**
2364         * @return the schedulerDirectory
2365         */
2366        public File getSchedulerDirectoryFile() {
2367            if (this.schedulerDirectoryFile == null) {
2368                this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2369            }
2370            return schedulerDirectoryFile;
2371        }
2372    
2373        /**
2374         * @param schedulerDirectory the schedulerDirectory to set
2375         */
2376        public void setSchedulerDirectoryFile(File schedulerDirectory) {
2377            this.schedulerDirectoryFile = schedulerDirectory;
2378        }
2379        
2380        public void setSchedulerDirectory(String schedulerDirectory) {
2381            setSchedulerDirectoryFile(new File(schedulerDirectory));
2382        }
2383    
2384        public int getSchedulePeriodForDestinationPurge() {
2385            return this.schedulePeriodForDestinationPurge;
2386        }
2387    
2388        public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2389            this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2390        }
2391    
2392        public BrokerContext getBrokerContext() {
2393            return brokerContext;
2394        }
2395    
2396        public void setBrokerContext(BrokerContext brokerContext) {
2397            this.brokerContext = brokerContext;
2398        }
2399    
2400        public void setBrokerId(String brokerId) {
2401            this.brokerId = new BrokerId(brokerId);
2402        }
2403    
2404        public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2405            return useAuthenticatedPrincipalForJMSXUserID;
2406        }
2407    
2408        public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
2409            this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
2410        }
2411    
2412        public boolean isNetworkConnectorStartAsync() {
2413            return networkConnectorStartAsync;
2414        }
2415    
2416        public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
2417            this.networkConnectorStartAsync = networkConnectorStartAsync;
2418        }
2419    }