001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.plugin; 018 019 import org.apache.activemq.advisory.AdvisorySupport; 020 import org.apache.activemq.broker.Broker; 021 import org.apache.activemq.broker.BrokerFilter; 022 import org.apache.activemq.broker.BrokerService; 023 import org.apache.activemq.broker.ConnectionContext; 024 import org.apache.activemq.broker.ProducerBrokerExchange; 025 import org.apache.activemq.broker.region.Destination; 026 import org.apache.activemq.broker.region.DestinationStatistics; 027 import org.apache.activemq.broker.region.RegionBroker; 028 import org.apache.activemq.command.ActiveMQDestination; 029 import org.apache.activemq.command.ActiveMQMapMessage; 030 import org.apache.activemq.command.Message; 031 import org.apache.activemq.command.MessageId; 032 import org.apache.activemq.command.ProducerId; 033 import org.apache.activemq.command.ProducerInfo; 034 import org.apache.activemq.state.ProducerState; 035 import org.apache.activemq.usage.SystemUsage; 036 import org.apache.activemq.util.IdGenerator; 037 import org.apache.activemq.util.LongSequenceGenerator; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 import java.io.File; 041 import java.net.URI; 042 import java.util.Set; 043 /** 044 * A StatisticsBroker You can retrieve a Map Message for a Destination - or 045 * Broker containing statistics as key-value pairs The message must contain a 046 * replyTo Destination - else its ignored 047 * 048 */ 049 public class StatisticsBroker extends BrokerFilter { 050 private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class); 051 static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; 052 static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; 053 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 054 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 055 protected final ProducerId advisoryProducerId = new ProducerId(); 056 057 /** 058 * 059 * Constructor 060 * 061 * @param next 062 */ 063 public StatisticsBroker(Broker next) { 064 super(next); 065 this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); 066 } 067 068 /** 069 * Sets the persistence mode 070 * 071 * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, 072 * org.apache.activemq.command.Message) 073 */ 074 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 075 ActiveMQDestination msgDest = messageSend.getDestination(); 076 ActiveMQDestination replyTo = messageSend.getReplyTo(); 077 if (replyTo != null) { 078 String physicalName = msgDest.getPhysicalName(); 079 boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0, 080 STATS_DESTINATION_PREFIX.length()); 081 boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX 082 .length()); 083 if (destStats) { 084 String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); 085 ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType()); 086 Set<Destination> set = getDestinations(queryDest); 087 for (Destination dest : set) { 088 DestinationStatistics stats = dest.getDestinationStatistics(); 089 if (stats != null) { 090 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); 091 statsMessage.setString("destinationName", dest.getActiveMQDestination().toString()); 092 statsMessage.setLong("size", stats.getMessages().getCount()); 093 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount()); 094 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount()); 095 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); 096 statsMessage.setLong("expiredCount", stats.getExpired().getCount()); 097 statsMessage.setLong("inflightCount", stats.getInflight().getCount()); 098 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); 099 statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage()); 100 statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage()); 101 statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit()); 102 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime()); 103 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime()); 104 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); 105 statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); 106 statsMessage.setLong("producerCount", stats.getProducers().getCount()); 107 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); 108 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); 109 } 110 } 111 } else if (brokerStats) { 112 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); 113 BrokerService brokerService = getBrokerService(); 114 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 115 SystemUsage systemUsage = brokerService.getSystemUsage(); 116 DestinationStatistics stats = regionBroker.getDestinationStatistics(); 117 statsMessage.setString("brokerName", regionBroker.getBrokerName()); 118 statsMessage.setString("brokerId", regionBroker.getBrokerId().toString()); 119 statsMessage.setLong("size", stats.getMessages().getCount()); 120 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount()); 121 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount()); 122 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); 123 statsMessage.setLong("expiredCount", stats.getExpired().getCount()); 124 statsMessage.setLong("inflightCount", stats.getInflight().getCount()); 125 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); 126 statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage()); 127 statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage()); 128 statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit()); 129 statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage()); 130 statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage()); 131 statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit()); 132 statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage()); 133 statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage()); 134 statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit()); 135 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime()); 136 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime()); 137 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); 138 statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); 139 statsMessage.setLong("producerCount", stats.getProducers().getCount()); 140 String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp"); 141 answer = answer != null ? answer : ""; 142 statsMessage.setString("openwire", answer); 143 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp"); 144 answer = answer != null ? answer : ""; 145 statsMessage.setString("stomp", answer); 146 answer = brokerService.getTransportConnectorURIsAsMap().get("ssl"); 147 answer = answer != null ? answer : ""; 148 statsMessage.setString("ssl", answer); 149 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl"); 150 answer = answer != null ? answer : ""; 151 statsMessage.setString("stomp+ssl", answer); 152 URI uri = brokerService.getVmConnectorURI(); 153 answer = uri != null ? uri.toString() : ""; 154 statsMessage.setString("vm", answer); 155 File file = brokerService.getDataDirectoryFile(); 156 answer = file != null ? file.getCanonicalPath() : ""; 157 statsMessage.setString("dataDirectory", answer); 158 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); 159 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); 160 } else { 161 super.send(producerExchange, messageSend); 162 } 163 } else { 164 super.send(producerExchange, messageSend); 165 } 166 } 167 168 public void start() throws Exception { 169 super.start(); 170 LOG.info("Starting StatisticsBroker"); 171 } 172 173 public void stop() throws Exception { 174 super.stop(); 175 } 176 177 protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo) 178 throws Exception { 179 msg.setPersistent(false); 180 msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 181 msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId())); 182 msg.setDestination(replyTo); 183 msg.setResponseRequired(false); 184 msg.setProducerId(this.advisoryProducerId); 185 boolean originalFlowControl = context.isProducerFlowControl(); 186 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 187 producerExchange.setConnectionContext(context); 188 producerExchange.setMutable(true); 189 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 190 try { 191 context.setProducerFlowControl(false); 192 this.next.send(producerExchange, msg); 193 } finally { 194 context.setProducerFlowControl(originalFlowControl); 195 } 196 } 197 }