001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.virtual;
018    
019    import org.apache.activemq.broker.*;
020    import org.apache.activemq.broker.region.Destination;
021    import org.apache.activemq.broker.region.DestinationFilter;
022    import org.apache.activemq.broker.region.DestinationInterceptor;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ActiveMQTopic;
025    import org.apache.activemq.command.Message;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * Creates <a href="http://activemq.org/site/mirrored-queues.html">Mirrored
031     * Queue</a> using a prefix and postfix to define the topic name on which to mirror the queue to.
032     *
033     * 
034     * @org.apache.xbean.XBean
035     */
036    public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware {
037        private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueue.class);
038        private String prefix = "VirtualTopic.Mirror.";
039        private String postfix = "";
040        private boolean copyMessage = true;
041        private BrokerService brokerService;
042    
043        public Destination intercept(final Destination destination) {
044            if (destination.getActiveMQDestination().isQueue()) {
045                if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) {
046                    try {
047                        final Destination mirrorDestination = getMirrorDestination(destination);
048                        if (mirrorDestination != null) {
049                            return new DestinationFilter(destination) {
050                                public void send(ProducerBrokerExchange context, Message message) throws Exception {
051                                    message.setDestination(mirrorDestination.getActiveMQDestination());
052                                    mirrorDestination.send(context, message);
053        
054                                    if (isCopyMessage()) {
055                                        message = message.copy();
056                                    }
057                                    message.setDestination(destination.getActiveMQDestination());
058                                    super.send(context, message);
059                                }
060                            };
061                        }
062                    }
063                    catch (Exception e) {
064                        LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e);
065                    }
066                }
067            }
068            return destination;
069        }
070        
071    
072        public void remove(Destination destination) {
073            if (brokerService == null) {
074                throw new IllegalArgumentException("No brokerService injected!");
075            }
076            ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
077            if (topic != null) {
078                try {
079                    brokerService.removeDestination(topic);
080                } catch (Exception e) {
081                    LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e);
082                }
083            }
084            
085        }
086    
087        public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {}
088    
089        // Properties
090        // -------------------------------------------------------------------------
091    
092        public String getPostfix() {
093            return postfix;
094        }
095    
096        /**
097         * Sets any postix used to identify the queue consumers
098         */
099        public void setPostfix(String postfix) {
100            this.postfix = postfix;
101        }
102    
103        public String getPrefix() {
104            return prefix;
105        }
106    
107        /**
108         * Sets the prefix wildcard used to identify the queue consumers for a given
109         * topic
110         */
111        public void setPrefix(String prefix) {
112            this.prefix = prefix;
113        }
114    
115        public boolean isCopyMessage() {
116            return copyMessage;
117        }
118    
119        /**
120         * Sets whether a copy of the message will be sent to each destination.
121         * Defaults to true so that the forward destination is set as the
122         * destination of the message
123         */
124        public void setCopyMessage(boolean copyMessage) {
125            this.copyMessage = copyMessage;
126        }
127    
128        public void setBrokerService(BrokerService brokerService) {
129            this.brokerService = brokerService;
130        }
131    
132        // Implementation methods
133        //-------------------------------------------------------------------------
134        protected Destination getMirrorDestination(Destination destination) throws Exception {
135            if (brokerService == null) {
136                throw new IllegalArgumentException("No brokerService injected!");
137            }
138            ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
139            return brokerService.getDestination(topic);
140        }
141    
142        protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
143            return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
144        }
145    
146    }