001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Hashtable;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    import java.util.Map.Entry;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.CopyOnWriteArraySet;
030    import java.util.concurrent.ThreadPoolExecutor;
031    import javax.management.InstanceNotFoundException;
032    import javax.management.MalformedObjectNameException;
033    import javax.management.ObjectName;
034    import javax.management.openmbean.CompositeData;
035    import javax.management.openmbean.CompositeDataSupport;
036    import javax.management.openmbean.CompositeType;
037    import javax.management.openmbean.OpenDataException;
038    import javax.management.openmbean.TabularData;
039    import javax.management.openmbean.TabularDataSupport;
040    import javax.management.openmbean.TabularType;
041    import org.apache.activemq.broker.Broker;
042    import org.apache.activemq.broker.BrokerService;
043    import org.apache.activemq.broker.ConnectionContext;
044    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
045    import org.apache.activemq.broker.region.Destination;
046    import org.apache.activemq.broker.region.DestinationFactory;
047    import org.apache.activemq.broker.region.DestinationFactoryImpl;
048    import org.apache.activemq.broker.region.DestinationInterceptor;
049    import org.apache.activemq.broker.region.Queue;
050    import org.apache.activemq.broker.region.Region;
051    import org.apache.activemq.broker.region.RegionBroker;
052    import org.apache.activemq.broker.region.Subscription;
053    import org.apache.activemq.broker.region.Topic;
054    import org.apache.activemq.broker.region.TopicRegion;
055    import org.apache.activemq.broker.region.TopicSubscription;
056    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
057    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
058    import org.apache.activemq.command.ActiveMQDestination;
059    import org.apache.activemq.command.ActiveMQMessage;
060    import org.apache.activemq.command.ActiveMQTopic;
061    import org.apache.activemq.command.ConsumerInfo;
062    import org.apache.activemq.command.Message;
063    import org.apache.activemq.command.MessageId;
064    import org.apache.activemq.command.SubscriptionInfo;
065    import org.apache.activemq.store.MessageRecoveryListener;
066    import org.apache.activemq.store.PersistenceAdapter;
067    import org.apache.activemq.store.TopicMessageStore;
068    import org.apache.activemq.thread.Scheduler;
069    import org.apache.activemq.thread.TaskRunnerFactory;
070    import org.apache.activemq.usage.SystemUsage;
071    import org.apache.activemq.util.JMXSupport;
072    import org.apache.activemq.util.ServiceStopper;
073    import org.apache.activemq.util.SubscriptionKey;
074    import org.slf4j.Logger;
075    import org.slf4j.LoggerFactory;
076    
077    public class ManagedRegionBroker extends RegionBroker {
078        private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
079        private final ManagementContext managementContext;
080        private final ObjectName brokerObjectName;
081        private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
082        private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
083        private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
084        private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
085        private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
086        private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
087        private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
088        private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
089        private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
090        private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091        private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
092        private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
093        private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
094        /* This is the first broker in the broker interceptor chain. */
095        private Broker contextBroker;
096    
097        public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
098                                   DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
099            super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
100            this.managementContext = context;
101            this.brokerObjectName = brokerObjectName;
102        }
103    
104        @Override
105        public void start() throws Exception {
106            super.start();
107            // build all existing durable subscriptions
108            buildExistingSubscriptions();
109        }
110    
111        @Override
112        protected void doStop(ServiceStopper stopper) {
113            super.doStop(stopper);
114            // lets remove any mbeans not yet removed
115            for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
116                ObjectName name = iter.next();
117                try {
118                    managementContext.unregisterMBean(name);
119                } catch (InstanceNotFoundException e) {
120                    LOG.warn("The MBean: " + name + " is no longer registered with JMX");
121                } catch (Exception e) {
122                    stopper.onException(this, e);
123                }
124            }
125            registeredMBeans.clear();
126        }
127    
128        @Override
129        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
130            return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
131        }
132    
133        @Override
134        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
135            return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
136        }
137    
138        @Override
139        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
140            return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
141        }
142    
143        @Override
144        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
145            return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
146        }
147    
148        public void register(ActiveMQDestination destName, Destination destination) {
149            // TODO refactor to allow views for custom destinations
150            try {
151                ObjectName objectName = createObjectName(destName);
152                DestinationView view;
153                if (destination instanceof Queue) {
154                    view = new QueueView(this, (Queue)destination);
155                } else if (destination instanceof Topic) {
156                    view = new TopicView(this, (Topic)destination);
157                } else {
158                    view = null;
159                    LOG.warn("JMX View is not supported for custom destination: " + destination);
160                }
161                if (view != null) {
162                    registerDestination(objectName, destName, view);
163                }
164            } catch (Exception e) {
165                LOG.error("Failed to register destination " + destName, e);
166            }
167        }
168    
169        public void unregister(ActiveMQDestination destName) {
170            try {
171                ObjectName objectName = createObjectName(destName);
172                unregisterDestination(objectName);
173            } catch (Exception e) {
174                LOG.error("Failed to unregister " + destName, e);
175            }
176        }
177    
178        public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
179            String connectionClientId = context.getClientId();
180            ObjectName brokerJmxObjectName = brokerObjectName;
181            String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
182            SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
183            try {
184                ObjectName objectName = new ObjectName(objectNameStr);
185                SubscriptionView view;
186                if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
187                    // add offline subscribers to inactive list
188                    SubscriptionInfo info = new SubscriptionInfo();
189                    info.setClientId(context.getClientId());
190                    info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
191                    info.setDestination(sub.getConsumerInfo().getDestination());
192                    addInactiveSubscription(key, info);
193                } else {
194                    if (sub.getConsumerInfo().isDurable()) {
195                        view = new DurableSubscriptionView(this, context.getClientId(), sub);
196                    } else {
197                        if (sub instanceof TopicSubscription) {
198                            view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub);
199                        } else {
200                            view = new SubscriptionView(context.getClientId(), sub);
201                        }
202                    }
203                    registerSubscription(objectName, sub.getConsumerInfo(), key, view);
204                }
205                subscriptionMap.put(sub, objectName);
206                return objectName;
207            } catch (Exception e) {
208                LOG.error("Failed to register subscription " + sub, e);
209                return null;
210            }
211        }
212    
213        public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
214            Hashtable map = brokerJmxObjectName.getKeyPropertyList();
215            String brokerDomain = brokerJmxObjectName.getDomain();
216            String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
217            String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
218            String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
219            String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
220            String persistentMode = "persistentMode=";
221            String consumerId = "";
222            if (sub.getConsumerInfo().isDurable()) {
223                persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
224            } else {
225                persistentMode += "Non-Durable";
226                if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
227                    consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
228                }
229            }
230            objectNameStr += persistentMode + ",";
231            objectNameStr += destinationType + ",";
232            objectNameStr += destinationName + ",";
233            objectNameStr += clientId;
234            objectNameStr += consumerId;
235            return objectNameStr;
236        }
237    
238        @Override
239        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
240            Subscription sub = super.addConsumer(context, info);
241            SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
242            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
243            if (inactiveName != null) {
244                // if it was inactive, register it
245                registerSubscription(context, sub);
246            }
247            return sub;
248        }
249    
250        @Override
251        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
252            for (Subscription sub : subscriptionMap.keySet()) {
253                if (sub.getConsumerInfo().equals(info)) {
254                   // unregister all consumer subs
255                   unregisterSubscription(subscriptionMap.get(sub), true);
256                }
257            }
258            super.removeConsumer(context, info);
259        }
260    
261        public void unregisterSubscription(Subscription sub) {
262            ObjectName name = subscriptionMap.remove(sub);
263            if (name != null) {
264                try {
265                    SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
266                    ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
267                    if (inactiveName != null) {
268                        inactiveDurableTopicSubscribers.remove(inactiveName);
269                        managementContext.unregisterMBean(inactiveName);
270                    }
271                } catch (Exception e) {
272                    LOG.error("Failed to unregister subscription " + sub, e);
273                }
274            }
275        }
276    
277        protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
278            if (dest.isQueue()) {
279                if (dest.isTemporary()) {
280                    temporaryQueues.put(key, view);
281                } else {
282                    queues.put(key, view);
283                }
284            } else {
285                if (dest.isTemporary()) {
286                    temporaryTopics.put(key, view);
287                } else {
288                    topics.put(key, view);
289                }
290            }
291            try {
292                AnnotatedMBean.registerMBean(managementContext, view, key);
293                registeredMBeans.add(key);
294            } catch (Throwable e) {
295                LOG.warn("Failed to register MBean: " + key);
296                LOG.debug("Failure reason: " + e, e);
297            }
298        }
299    
300        protected void unregisterDestination(ObjectName key) throws Exception {
301    
302            DestinationView view = null;
303            removeAndRemember(topics, key, view);
304            removeAndRemember(queues, key, view);
305            removeAndRemember(temporaryQueues, key, view);
306            removeAndRemember(temporaryTopics, key, view);
307            if (registeredMBeans.remove(key)) {
308                try {
309                    managementContext.unregisterMBean(key);
310                } catch (Throwable e) {
311                    LOG.warn("Failed to unregister MBean: " + key);
312                    LOG.debug("Failure reason: " + e, e);
313                }
314            }
315            if (view != null) {
316                key = view.getSlowConsumerStrategy();
317                if (key!= null && registeredMBeans.remove(key)) {
318                    try {
319                        managementContext.unregisterMBean(key);
320                    } catch (Throwable e) {
321                        LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
322                        LOG.debug("Failure reason: " + e, e);
323                    }
324                }
325            }
326        }
327    
328        private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
329            DestinationView candidate = map.remove(key);
330            if (candidate != null && view == null) {
331                view = candidate;
332            }
333        }
334    
335        protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
336            ActiveMQDestination dest = info.getDestination();
337            if (dest.isQueue()) {
338                if (dest.isTemporary()) {
339                    temporaryQueueSubscribers.put(key, view);
340                } else {
341                    queueSubscribers.put(key, view);
342                }
343            } else {
344                if (dest.isTemporary()) {
345                    temporaryTopicSubscribers.put(key, view);
346                } else {
347                    if (info.isDurable()) {
348                        durableTopicSubscribers.put(key, view);
349                        // unregister any inactive durable subs
350                        try {
351                            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
352                            if (inactiveName != null) {
353                                inactiveDurableTopicSubscribers.remove(inactiveName);
354                                registeredMBeans.remove(inactiveName);
355                                managementContext.unregisterMBean(inactiveName);
356                            }
357                        } catch (Throwable e) {
358                            LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
359                        }
360                    } else {
361                        topicSubscribers.put(key, view);
362                    }
363                }
364            }
365    
366            try {
367                AnnotatedMBean.registerMBean(managementContext, view, key);
368                registeredMBeans.add(key);
369            } catch (Throwable e) {
370                LOG.warn("Failed to register MBean: " + key);
371                LOG.debug("Failure reason: " + e, e);
372            }
373    
374        }
375    
376        protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
377            queueSubscribers.remove(key);
378            topicSubscribers.remove(key);
379            temporaryQueueSubscribers.remove(key);
380            temporaryTopicSubscribers.remove(key);
381            if (registeredMBeans.remove(key)) {
382                try {
383                    managementContext.unregisterMBean(key);
384                } catch (Throwable e) {
385                    LOG.warn("Failed to unregister MBean: " + key);
386                    LOG.debug("Failure reason: " + e, e);
387                }
388            }
389            DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
390            if (view != null) {
391                // need to put this back in the inactive list
392                SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
393                if (addToInactive) {
394                    SubscriptionInfo info = new SubscriptionInfo();
395                    info.setClientId(subscriptionKey.getClientId());
396                    info.setSubscriptionName(subscriptionKey.getSubscriptionName());
397                    info.setDestination(new ActiveMQTopic(view.getDestinationName()));
398                    addInactiveSubscription(subscriptionKey, info);
399                }
400            }
401        }
402    
403        protected void buildExistingSubscriptions() throws Exception {
404            Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
405            Set destinations = destinationFactory.getDestinations();
406            if (destinations != null) {
407                for (Iterator iter = destinations.iterator(); iter.hasNext();) {
408                    ActiveMQDestination dest = (ActiveMQDestination)iter.next();
409                    if (dest.isTopic()) {                
410                        SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
411                        if (infos != null) {
412                            for (int i = 0; i < infos.length; i++) {
413                                SubscriptionInfo info = infos[i];
414                                SubscriptionKey key = new SubscriptionKey(info);
415                                if (!alreadyKnown(key)) {
416                                    LOG.debug("Restoring durable subscription mbean: " + info);
417                                    subscriptions.put(key, info);
418                                }
419                            }
420                        }
421                    }
422                }
423            }
424            for (Iterator i = subscriptions.entrySet().iterator(); i.hasNext();) {
425                Map.Entry entry = (Entry)i.next();
426                SubscriptionKey key = (SubscriptionKey)entry.getKey();
427                SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
428                addInactiveSubscription(key, info);
429            }
430        }
431    
432        private boolean alreadyKnown(SubscriptionKey key) {
433            boolean known = false;
434            known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
435            if (LOG.isTraceEnabled()) {
436                LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") +  " already registered");
437            }
438            return known;
439        }
440    
441        protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
442            Hashtable map = brokerObjectName.getKeyPropertyList();
443            try {
444                ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
445                                                       + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
446                SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
447    
448                try {
449                    AnnotatedMBean.registerMBean(managementContext, view, objectName);
450                    registeredMBeans.add(objectName);
451                } catch (Throwable e) {
452                    LOG.warn("Failed to register MBean: " + key);
453                    LOG.debug("Failure reason: " + e, e);
454                }
455    
456                inactiveDurableTopicSubscribers.put(objectName, view);
457                subscriptionKeys.put(key, objectName);
458            } catch (Exception e) {
459                LOG.error("Failed to register subscription " + info, e);
460            }
461        }
462    
463        public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
464            List<Message> messages = getSubscriberMessages(view);
465            CompositeData c[] = new CompositeData[messages.size()];
466            for (int i = 0; i < c.length; i++) {
467                try {
468                    c[i] = OpenTypeSupport.convert(messages.get(i));
469                } catch (Throwable e) {
470                    LOG.error("failed to browse : " + view, e);
471                }
472            }
473            return c;
474        }
475    
476        public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
477            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
478            List<Message> messages = getSubscriberMessages(view);
479            CompositeType ct = factory.getCompositeType();
480            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
481            TabularDataSupport rc = new TabularDataSupport(tt);
482            for (int i = 0; i < messages.size(); i++) {
483                rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
484            }
485            return rc;
486        }
487    
488        protected List<Message> getSubscriberMessages(SubscriptionView view) {
489            // TODO It is very dangerous operation for big backlogs
490            if (!(destinationFactory instanceof DestinationFactoryImpl)) {
491                throw new RuntimeException("unsupported by " + destinationFactory);
492            }
493            PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
494            final List<Message> result = new ArrayList<Message>();
495            try {
496                ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
497                TopicMessageStore store = adapter.createTopicMessageStore(topic);
498                store.recover(new MessageRecoveryListener() {
499                    public boolean recoverMessage(Message message) throws Exception {
500                        result.add(message);
501                        return true;
502                    }
503    
504                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
505                        throw new RuntimeException("Should not be called.");
506                    }
507    
508                    public boolean hasSpace() {
509                        return true;
510                    }
511                    
512                    public boolean isDuplicate(MessageId id) {
513                        return false;
514                    }
515                });
516            } catch (Throwable e) {
517                LOG.error("Failed to browse messages for Subscription " + view, e);
518            }
519            return result;
520    
521        }
522    
523        protected ObjectName[] getTopics() {
524            Set<ObjectName> set = topics.keySet();
525            return set.toArray(new ObjectName[set.size()]);
526        }
527    
528        protected ObjectName[] getQueues() {
529            Set<ObjectName> set = queues.keySet();
530            return set.toArray(new ObjectName[set.size()]);
531        }
532    
533        protected ObjectName[] getTemporaryTopics() {
534            Set<ObjectName> set = temporaryTopics.keySet();
535            return set.toArray(new ObjectName[set.size()]);
536        }
537    
538        protected ObjectName[] getTemporaryQueues() {
539            Set<ObjectName> set = temporaryQueues.keySet();
540            return set.toArray(new ObjectName[set.size()]);
541        }
542    
543        protected ObjectName[] getTopicSubscribers() {
544            Set<ObjectName> set = topicSubscribers.keySet();
545            return set.toArray(new ObjectName[set.size()]);
546        }
547    
548        protected ObjectName[] getDurableTopicSubscribers() {
549            Set<ObjectName> set = durableTopicSubscribers.keySet();
550            return set.toArray(new ObjectName[set.size()]);
551        }
552    
553        protected ObjectName[] getQueueSubscribers() {
554            Set<ObjectName> set = queueSubscribers.keySet();
555            return set.toArray(new ObjectName[set.size()]);
556        }
557    
558        protected ObjectName[] getTemporaryTopicSubscribers() {
559            Set<ObjectName> set = temporaryTopicSubscribers.keySet();
560            return set.toArray(new ObjectName[set.size()]);
561        }
562    
563        protected ObjectName[] getTemporaryQueueSubscribers() {
564            Set<ObjectName> set = temporaryQueueSubscribers.keySet();
565            return set.toArray(new ObjectName[set.size()]);
566        }
567    
568        protected ObjectName[] getInactiveDurableTopicSubscribers() {
569            Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
570            return set.toArray(new ObjectName[set.size()]);
571        }
572    
573        public Broker getContextBroker() {
574            return contextBroker;
575        }
576    
577        public void setContextBroker(Broker contextBroker) {
578            this.contextBroker = contextBroker;
579        }
580    
581        protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
582            // Build the object name for the destination
583            Hashtable map = brokerObjectName.getKeyPropertyList();
584            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
585                                                   + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
586                                                   + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
587            return objectName;
588        }
589    
590        public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
591            ObjectName objectName = null;
592            try {
593                objectName = createObjectName(strategy);
594                if (!registeredMBeans.contains(objectName))  {
595                    AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
596                    AnnotatedMBean.registerMBean(managementContext, view, objectName);
597                    registeredMBeans.add(objectName);
598                }
599            } catch (Exception e) {
600                LOG.warn("Failed to register MBean: " + strategy);
601                LOG.debug("Failure reason: " + e, e);
602            }
603            return objectName;
604        }
605    
606        private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
607            Hashtable map = brokerObjectName.getKeyPropertyList();
608            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
609                                + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
610            return objectName;            
611        }
612    
613        public ObjectName getSubscriberObjectName(Subscription key) {
614            return subscriptionMap.get(key);
615        }
616    
617        public Subscription getSubscriber(ObjectName key) {
618            Subscription sub = null;
619            for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
620                if (entry.getValue().equals(key)) {
621                    sub = entry.getKey();
622                    break;
623                }
624            }
625            return sub;
626        }
627    }