/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.advisory;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.*;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This broker filter handles tracking the state of the broker for purposes of
 * publishing advisory messages to advisory consumers.
 * <p>
 * The connections, consumers, producers and destinations seen by this filter
 * are recorded so that they can be replayed to a consumer that later
 * subscribes to the matching advisory topic (see
 * {@link #addConsumer(ConnectionContext, ConsumerInfo)}).
 */
public class AdvisoryBroker extends BrokerFilter {

    private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
    private static final IdGenerator ID_GENERATOR = new IdGenerator();

    /** Sanitised copies of the connection infos seen by {@link #addConnection}. */
    protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
    /** Consumers on non-advisory destinations, keyed by consumer id. */
    protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
    /** Producers on non-advisory destinations, keyed by producer id. */
    protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
    /** Non-advisory destinations for which an "add" advisory has been recorded. */
    protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
    /** Producer id stamped on every advisory message sent by this broker. */
    protected final ProducerId advisoryProducerId = new ProducerId();

    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();

    /**
     * Creates the filter and assigns a freshly generated connection id to the
     * advisory producer id.
     *
     * @param next the next broker in the chain
     */
    public AdvisoryBroker(Broker next) {
        super(next);
        advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
    }

    /**
     * Delegates to the parent, then fires a connection advisory carrying a
     * copy of {@code info} whose user name and password have been blanked,
     * and records that copy for later replay.
     */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        super.addConnection(context, info);

        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
        // do not distribute usernames or passwords in advisory
        ConnectionInfo copy = info.copy();
        copy.setUserName("");
        copy.setPassword("");
        fireAdvisory(context, topic, copy);
        connections.put(copy.getConnectionId(), copy);
    }

    /**
     * Delegates to the parent and then either advises about the new consumer
     * (non-advisory destination) or, when the consumer subscribes to an
     * advisory topic, replays the previously collected state that matches
     * that topic to the new consumer only.
     *
     * @return the subscription created by the parent
     */
    @Override
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        Subscription answer = super.addConsumer(context, info);
        ActiveMQDestination subscribedTo = info.getDestination();

        // Don't advise advisory topics.
        if (!AdvisorySupport.isAdvisoryTopic(subscribedTo)) {
            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(subscribedTo);
            consumers.put(info.getConsumerId(), info);
            fireConsumerAdvisory(context, subscribedTo, topic, info);
            return answer;
        }

        // The new consumer listens on an advisory topic: replay all the
        // previously collected state objects, targeted at this consumer.
        ConsumerId target = info.getConsumerId();

        // Replay the connections.
        if (AdvisorySupport.isConnectionAdvisoryTopic(subscribedTo)) {
            for (ConnectionInfo value : connections.values()) {
                ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
                fireAdvisory(context, topic, value, target);
            }
        }

        // Replay the destinations.
        if (AdvisorySupport.isDestinationAdvisoryTopic(subscribedTo)) {
            for (DestinationInfo value : destinations.values()) {
                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(value.getDestination());
                fireAdvisory(context, topic, value, target);
            }
        }

        // Replay the producers.
        if (AdvisorySupport.isProducerAdvisoryTopic(subscribedTo)) {
            for (ProducerInfo value : producers.values()) {
                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
                fireProducerAdvisory(context, value.getDestination(), topic, value, target);
            }
        }

        // Replay the consumers.
        if (AdvisorySupport.isConsumerAdvisoryTopic(subscribedTo)) {
            for (ConsumerInfo value : consumers.values()) {
                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
                fireConsumerAdvisory(context, value.getDestination(), topic, value, target);
            }
        }
        return answer;
    }

    /**
     * Delegates to the parent, then fires a producer advisory and records the
     * producer, unless the producer has no destination or targets an advisory
     * topic.
     */
    @Override
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        super.addProducer(context, info);

        // Don't advise advisory topics.
        ActiveMQDestination dest = info.getDestination();
        if (dest != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
            fireProducerAdvisory(context, dest, topic, info);
            producers.put(info.getProducerId(), info);
        }
    }

    /**
     * Delegates to the parent, then records the destination and fires a
     * destination advisory the first time a non-advisory destination is seen.
     *
     * @return the destination returned by the parent
     */
    @Override
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
        Destination answer = super.addDestination(context, destination, create);
        if (!AdvisorySupport.isAdvisoryTopic(destination)) {
            DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
            // Only the caller that actually inserts the entry fires the advisory.
            if (destinations.putIfAbsent(destination, info) == null) {
                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
                fireAdvisory(context, topic, info);
            }
        }
        return answer;
    }

    /**
     * Passes {@code info} to the next broker, then records it and fires a
     * destination advisory the first time a non-advisory destination is seen.
     */
    @Override
    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
        ActiveMQDestination destination = info.getDestination();
        next.addDestinationInfo(context, info);

        if (!AdvisorySupport.isAdvisoryTopic(destination)) {
            // Only the caller that actually inserts the entry fires the advisory.
            if (destinations.putIfAbsent(destination, info) == null) {
                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
                fireAdvisory(context, topic, info);
            }
        }
    }

    /**
     * Delegates to the parent, then, if the destination was being tracked,
     * fires a "remove" destination advisory and attempts to remove the
     * destination's consumer and producer advisory topics.
     */
    @Override
    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
        super.removeDestination(context, destination, timeout);
        DestinationInfo info = destinations.remove(destination);
        if (info != null) {
            fireDestinationRemovedAdvisory(context, info, destination);
        }
    }

    /**
     * Delegates to the parent, then, if the destination was being tracked,
     * fires a "remove" destination advisory and attempts to remove the
     * destination's consumer and producer advisory topics.
     */
    @Override
    public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
        super.removeDestinationInfo(context, destInfo);
        DestinationInfo info = destinations.remove(destInfo.getDestination());
        if (info != null) {
            fireDestinationRemovedAdvisory(context, info, destInfo.getDestination());
        }
    }

    /**
     * Marks the tracked {@code info} as a removal, publishes it on the
     * destination advisory topic and then makes a best-effort attempt to
     * remove the consumer and producer advisory topics of the destination.
     */
    private void fireDestinationRemovedAdvisory(ConnectionContext context, DestinationInfo info, ActiveMQDestination destination) throws Exception {
        info.setDestination(destination);
        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
        fireAdvisory(context, topic, info);
        try {
            next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
        } catch (Exception expectedIfDestinationDidNotExistYet) {
            // deliberately ignored: the advisory topic may never have been created
        }
        try {
            next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
        } catch (Exception expectedIfDestinationDidNotExistYet) {
            // deliberately ignored: the advisory topic may never have been created
        }
    }

    /**
     * Delegates to the parent, fires a connection advisory carrying the
     * remove command for {@code info} and stops tracking the connection.
     */
    @Override
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        super.removeConnection(context, info, error);

        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
        fireAdvisory(context, topic, info.createRemoveCommand());
        connections.remove(info.getConnectionId());
    }

    /**
     * Delegates to the parent and, for non-advisory destinations, stops
     * tracking the consumer and fires a consumer advisory carrying its remove
     * command. The advisory is skipped for a temporary destination that is no
     * longer tracked in {@link #destinations}.
     */
    @Override
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        super.removeConsumer(context, info);

        // Don't advise advisory topics.
        ActiveMQDestination dest = info.getDestination();
        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
            consumers.remove(info.getConsumerId());
            if (!dest.isTemporary() || destinations.containsKey(dest)) {
                fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
            }
        }
    }

    /**
     * Delegates to the parent and, for producers bound to a non-advisory
     * destination, stops tracking the producer and fires a producer advisory
     * carrying its remove command. The advisory is skipped for a temporary
     * destination that is no longer tracked in {@link #destinations}.
     */
    @Override
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        super.removeProducer(context, info);

        // Don't advise advisory topics.
        ActiveMQDestination dest = info.getDestination();
        if (dest != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
            ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
            producers.remove(info.getProducerId());
            // Must be containsKey: ConcurrentHashMap.contains(Object) is the legacy
            // alias of containsValue and would compare against DestinationInfo values.
            if (!dest.isTemporary() || destinations.containsKey(dest)) {
                fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
            }
        }
    }

    /**
     * Delegates to the parent and, for non-advisory messages, fires an
     * "expired" advisory carrying a body-less copy of the message and the
     * original message id as a property. Failures are logged, not propagated.
     */
    @Override
    public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
        super.messageExpired(context, messageReference, subscription);
        try {
            if (!messageReference.isAdvisory()) {
                ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
                Message payload = messageReference.getMessage().copy();
                payload.clearBody();
                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
                fireAdvisory(context, topic, payload, null, advisoryMessage);
            }
        } catch (Exception e) {
            handleFireFailure("expired", e);
        }
    }

    /**
     * Delegates to the parent and, for non-advisory messages, fires a
     * "consumed" advisory carrying a body-less copy of the message.
     * Failures are logged, not propagated.
     */
    @Override
    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
        super.messageConsumed(context, messageReference);
        try {
            if (!messageReference.isAdvisory()) {
                ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
                Message payload = messageReference.getMessage().copy();
                payload.clearBody();
                fireAdvisory(context, topic, payload);
            }
        } catch (Exception e) {
            handleFireFailure("consumed", e);
        }
    }

    /**
     * Delegates to the parent and, for non-advisory messages, fires a
     * "delivered" advisory carrying a body-less copy of the message.
     * Failures are logged, not propagated.
     */
    @Override
    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
        super.messageDelivered(context, messageReference);
        try {
            if (!messageReference.isAdvisory()) {
                ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
                Message payload = messageReference.getMessage().copy();
                payload.clearBody();
                fireAdvisory(context, topic, payload);
            }
        } catch (Exception e) {
            handleFireFailure("delivered", e);
        }
    }

    /**
     * Delegates to the parent and, for non-advisory messages, fires a
     * "discarded" advisory carrying a body-less copy of the message, the
     * consumer id and, for topic subscriptions, the discarded count.
     * Failures are logged, not propagated.
     */
    @Override
    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
        super.messageDiscarded(context, sub, messageReference);
        try {
            if (!messageReference.isAdvisory()) {
                ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
                Message payload = messageReference.getMessage().copy();
                payload.clearBody();
                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                if (sub instanceof TopicSubscription) {
                    advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
                }
                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
                fireAdvisory(context, topic, payload, null, advisoryMessage);
            }
        } catch (Exception e) {
            handleFireFailure("discarded", e);
        }
    }

    /**
     * Delegates to the parent and fires a "slow consumer" advisory carrying
     * the subscription's consumer info. Failures are logged, not propagated.
     */
    @Override
    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
        super.slowConsumer(context, destination, subs);
        try {
            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
            advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
            fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
        } catch (Exception e) {
            handleFireFailure("slow consumer", e);
        }
    }

    /**
     * Delegates to the parent and fires a "fast producer" advisory carrying
     * the producer info. Failures are logged, not propagated.
     */
    @Override
    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
        super.fastProducer(context, producerInfo);
        try {
            ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
            advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
            fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
        } catch (Exception e) {
            handleFireFailure("fast producer", e);
        }
    }

    /**
     * Delegates to the parent and, for non-advisory destinations, fires a
     * "full" advisory naming the usage that filled up. The advisory carries
     * no data structure. Failures are logged, not propagated.
     */
    @Override
    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
        super.isFull(context, destination, usage);
        if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
            try {
                ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
                fireAdvisory(context, topic, null, null, advisoryMessage);
            } catch (Exception e) {
                handleFireFailure("is full", e);
            }
        }
    }

    /**
     * Delegates to the parent and fires a "master broker" advisory using a
     * broker-owned connection context. Failures are logged, not propagated.
     */
    @Override
    public void nowMasterBroker() {
        super.nowMasterBroker();
        try {
            ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
            ConnectionContext context = createAdvisoryBrokerContext();
            fireAdvisory(context, topic, null, null, advisoryMessage);
        } catch (Exception e) {
            handleFireFailure("now master broker", e);
        }
    }

    /**
     * Delegates to the parent and, for non-advisory messages, fires a
     * "sent to DLQ" advisory carrying a body-less copy of the message.
     * Failures are logged, not propagated.
     */
    @Override
    public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
                                      Subscription subscription) {
        super.sendToDeadLetterQueue(context, messageReference, subscription);
        try {
            if (!messageReference.isAdvisory()) {
                ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
                Message payload = messageReference.getMessage().copy();
                payload.clearBody();
                fireAdvisory(context, topic, payload);
            }
        } catch (Exception e) {
            handleFireFailure("add to DLQ", e);
        }
    }

    /**
     * Fires a network bridge advisory with {@code started=true} for a
     * non-null {@code brokerInfo}, using a broker-owned connection context.
     * This override does not call the parent implementation. Failures are
     * logged, not propagated.
     */
    @Override
    public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex) {
        try {
            if (brokerInfo != null) {
                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                advisoryMessage.setBooleanProperty("started", true);
                advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);

                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();

                ConnectionContext context = createAdvisoryBrokerContext();
                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
            }
        } catch (Exception e) {
            handleFireFailure("network bridge started", e);
        }
    }

    /**
     * Fires a network bridge advisory with {@code started=false} for a
     * non-null {@code brokerInfo}, using a broker-owned connection context.
     * This override does not call the parent implementation. Failures are
     * logged, not propagated.
     */
    @Override
    public void networkBridgeStopped(BrokerInfo brokerInfo) {
        try {
            if (brokerInfo != null) {
                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                advisoryMessage.setBooleanProperty("started", false);

                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();

                ConnectionContext context = createAdvisoryBrokerContext();
                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
            }
        } catch (Exception e) {
            handleFireFailure("network bridge stopped", e);
        }
    }

    /**
     * Builds a connection context for advisories that are not triggered by a
     * client connection: it carries the broker security context and the
     * broker obtained from the broker service.
     */
    private ConnectionContext createAdvisoryBrokerContext() {
        ConnectionContext context = new ConnectionContext();
        context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
        context.setBroker(getBrokerService().getBroker());
        return context;
    }

    /**
     * Logs a failed advisory at WARN level, plus the full cause at DEBUG.
     *
     * @param message short name of the advisory that could not be fired
     * @param cause the failure
     */
    private void handleFireFailure(String message, Throwable cause) {
        LOG.warn("Failed to fire " + message + " advisory, reason: " + cause);
        if (LOG.isDebugEnabled()) {
            LOG.debug(message + " detail", cause);
        }
    }

    /**
     * Fires an advisory on {@code topic} that is not targeted at a specific
     * consumer.
     */
    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
        fireAdvisory(context, topic, command, null);
    }

    /**
     * Fires an advisory on {@code topic} using a fresh advisory message.
     *
     * @param targetConsumerId consumer the advisory is targeted at, or {@code null}
     */
    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
    }

    /**
     * Fires a consumer advisory that is not targeted at a specific consumer.
     */
    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception {
        fireConsumerAdvisory(context, consumerDestination, topic, command, null);
    }

    /**
     * Fires a consumer advisory whose consumer-count property is the sum of
     * the consumer counts of every destination returned for
     * {@code consumerDestination}.
     *
     * @param targetConsumerId consumer the advisory is targeted at, or {@code null}
     */
    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
        int count = 0;
        Set<Destination> set = getDestinations(consumerDestination);
        if (set != null) {
            for (Destination dest : set) {
                count += dest.getDestinationStatistics().getConsumers().getCount();
            }
        }
        advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);

        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
    }

    /**
     * Fires a producer advisory that is not targeted at a specific consumer.
     */
    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
        fireProducerAdvisory(context, producerDestination, topic, command, null);
    }

    /**
     * Fires a producer advisory whose {@code producerCount} property is the
     * sum of the producer counts of every destination returned for
     * {@code producerDestination} (zero when that destination is {@code null}).
     *
     * @param targetConsumerId consumer the advisory is targeted at, or {@code null}
     */
    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
        int count = 0;
        if (producerDestination != null) {
            Set<Destination> set = getDestinations(producerDestination);
            if (set != null) {
                for (Destination dest : set) {
                    count += dest.getDestinationStatistics().getProducers().getCount();
                }
            }
        }
        advisoryMessage.setIntProperty("producerCount", count);
        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
    }

    /**
     * Populates {@code advisoryMessage} and sends it to {@code topic} through
     * the next broker. Nothing is sent unless the broker service reports that
     * it is started.
     * <p>
     * Producer flow control on {@code context} is switched off for the
     * duration of the send and restored to its previous value afterwards.
     *
     * @param command data structure carried by the advisory, may be {@code null}
     * @param targetConsumerId consumer the advisory is targeted at, or {@code null}
     * @param advisoryMessage message to fill in and send; properties already
     *        set by the caller are kept
     */
    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
        if (getBrokerService().isStarted()) {
            // set properties
            advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
            String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
            advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);

            // the default socket URI wins over the VM connector URI when available
            String url = getBrokerService().getVmConnectorURI().toString();
            if (getBrokerService().getDefaultSocketURIString() != null) {
                url = getBrokerService().getDefaultSocketURIString();
            }
            advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);

            // set the data structure
            advisoryMessage.setDataStructure(command);
            advisoryMessage.setPersistent(false);
            advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
            advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
            advisoryMessage.setTargetConsumerId(targetConsumerId);
            advisoryMessage.setDestination(topic);
            advisoryMessage.setResponseRequired(false);
            advisoryMessage.setProducerId(advisoryProducerId);
            boolean originalFlowControl = context.isProducerFlowControl();
            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setConnectionContext(context);
            producerExchange.setMutable(true);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            try {
                context.setProducerFlowControl(false);
                next.send(producerExchange, advisoryMessage);
            } finally {
                context.setProducerFlowControl(originalFlowControl);
            }
        }
    }

    /**
     * @return the live map of tracked connections (not a copy)
     */
    public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
        return connections;
    }

    /**
     * @return the live map of tracked consumers (not a copy)
     */
    public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
        return consumers;
    }

    /**
     * @return the live map of tracked producers (not a copy)
     */
    public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
        return producers;
    }

    /**
     * @return the live map of tracked destinations (not a copy)
     */
    public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
        return destinations;
    }
}