001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.plugin;
018    
019    import org.apache.activemq.advisory.AdvisorySupport;
020    import org.apache.activemq.broker.Broker;
021    import org.apache.activemq.broker.BrokerFilter;
022    import org.apache.activemq.broker.BrokerService;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.ProducerBrokerExchange;
025    import org.apache.activemq.broker.region.Destination;
026    import org.apache.activemq.broker.region.DestinationStatistics;
027    import org.apache.activemq.broker.region.RegionBroker;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQMapMessage;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.ProducerId;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.state.ProducerState;
035    import org.apache.activemq.usage.SystemUsage;
036    import org.apache.activemq.util.IdGenerator;
037    import org.apache.activemq.util.LongSequenceGenerator;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    import java.io.File;
041    import java.net.URI;
042    import java.util.Set;
043    /**
044     * A StatisticsBroker You can retrieve a Map Message for a Destination - or
045     * Broker containing statistics as key-value pairs The message must contain a
046     * replyTo Destination - else its ignored
047     * 
048     */
049    public class StatisticsBroker extends BrokerFilter {
050        private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
051        static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
052        static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
053        private static final IdGenerator ID_GENERATOR = new IdGenerator();
054        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
055        protected final ProducerId advisoryProducerId = new ProducerId();
056    
057        /**
058         * 
059         * Constructor
060         * 
061         * @param next
062         */
063        public StatisticsBroker(Broker next) {
064            super(next);
065            this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
066        }
067    
068        /**
069         * Sets the persistence mode
070         * 
071         * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
072         *      org.apache.activemq.command.Message)
073         */
074        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
075            ActiveMQDestination msgDest = messageSend.getDestination();
076            ActiveMQDestination replyTo = messageSend.getReplyTo();
077            if (replyTo != null) {
078                String physicalName = msgDest.getPhysicalName();
079                boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
080                        STATS_DESTINATION_PREFIX.length());
081                boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
082                        .length());
083                if (destStats) {
084                    String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
085                    ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
086                    Set<Destination> set = getDestinations(queryDest);
087                    for (Destination dest : set) {
088                        DestinationStatistics stats = dest.getDestinationStatistics();
089                        if (stats != null) {
090                            ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
091                            statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
092                            statsMessage.setLong("size", stats.getMessages().getCount());
093                            statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
094                            statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
095                            statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
096                            statsMessage.setLong("expiredCount", stats.getExpired().getCount());
097                            statsMessage.setLong("inflightCount", stats.getInflight().getCount());
098                            statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
099                            statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
100                            statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
101                            statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
102                            statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
103                            statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
104                            statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
105                            statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
106                            statsMessage.setLong("producerCount", stats.getProducers().getCount());
107                            statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
108                            sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
109                        }
110                    }
111                } else if (brokerStats) {
112                    ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
113                    BrokerService brokerService = getBrokerService();
114                    RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
115                    SystemUsage systemUsage = brokerService.getSystemUsage();
116                    DestinationStatistics stats = regionBroker.getDestinationStatistics();
117                    statsMessage.setString("brokerName", regionBroker.getBrokerName());
118                    statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
119                    statsMessage.setLong("size", stats.getMessages().getCount());
120                    statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
121                    statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
122                    statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
123                    statsMessage.setLong("expiredCount", stats.getExpired().getCount());
124                    statsMessage.setLong("inflightCount", stats.getInflight().getCount());
125                    statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
126                    statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
127                    statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
128                    statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
129                    statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
130                    statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
131                    statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
132                    statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
133                    statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
134                    statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
135                    statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
136                    statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
137                    statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
138                    statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
139                    statsMessage.setLong("producerCount", stats.getProducers().getCount());
140                    String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
141                    answer = answer != null ? answer : "";
142                    statsMessage.setString("openwire", answer);
143                    answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
144                    answer = answer != null ? answer : "";
145                    statsMessage.setString("stomp", answer);
146                    answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
147                    answer = answer != null ? answer : "";
148                    statsMessage.setString("ssl", answer);
149                    answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
150                    answer = answer != null ? answer : "";
151                    statsMessage.setString("stomp+ssl", answer);
152                    URI uri = brokerService.getVmConnectorURI();
153                    answer = uri != null ? uri.toString() : "";
154                    statsMessage.setString("vm", answer);
155                    File file = brokerService.getDataDirectoryFile();
156                    answer = file != null ? file.getCanonicalPath() : "";
157                    statsMessage.setString("dataDirectory", answer);
158                    statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
159                    sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
160                } else {
161                    super.send(producerExchange, messageSend);
162                }
163            } else {
164                super.send(producerExchange, messageSend);
165            }
166        }
167    
168        public void start() throws Exception {
169            super.start();
170            LOG.info("Starting StatisticsBroker");
171        }
172    
173        public void stop() throws Exception {
174            super.stop();
175        }
176    
177        protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
178                throws Exception {
179            msg.setPersistent(false);
180            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
181            msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
182            msg.setDestination(replyTo);
183            msg.setResponseRequired(false);
184            msg.setProducerId(this.advisoryProducerId);
185            boolean originalFlowControl = context.isProducerFlowControl();
186            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
187            producerExchange.setConnectionContext(context);
188            producerExchange.setMutable(true);
189            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
190            try {
191                context.setProducerFlowControl(false);
192                this.next.send(producerExchange, msg);
193            } finally {
194                context.setProducerFlowControl(originalFlowControl);
195            }
196        }
197    }