001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.HashSet;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.Set;
023    import java.util.concurrent.ConcurrentHashMap;
024    import javax.jms.InvalidDestinationException;
025    import javax.jms.JMSException;
026    import org.apache.activemq.advisory.AdvisorySupport;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.region.policy.PolicyEntry;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ConnectionId;
031    import org.apache.activemq.command.ConsumerId;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.RemoveSubscriptionInfo;
034    import org.apache.activemq.command.SessionId;
035    import org.apache.activemq.command.SubscriptionInfo;
036    import org.apache.activemq.store.TopicMessageStore;
037    import org.apache.activemq.thread.TaskRunnerFactory;
038    import org.apache.activemq.usage.SystemUsage;
039    import org.apache.activemq.util.LongSequenceGenerator;
040    import org.apache.activemq.util.SubscriptionKey;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    /**
045     * 
046     */
047    public class TopicRegion extends AbstractRegion {
048        private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class);
049        protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
050        private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
051        private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
052        private boolean keepDurableSubsActive;
053    
054        public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
055                           DestinationFactory destinationFactory) {
056            super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
057    
058        }
059    
060        @Override
061        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
062            if (info.isDurable()) {
063                ActiveMQDestination destination = info.getDestination();
064                if (!destination.isPattern()) {
065                    // Make sure the destination is created.
066                    lookup(context, destination,true);
067                }
068                String clientId = context.getClientId();
069                String subscriptionName = info.getSubscriptionName();
070                SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
071                DurableTopicSubscription sub = durableSubscriptions.get(key);
072                if (sub != null) {
073                    if (sub.isActive()) {
074                        throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
075                    }
076                    // Has the selector changed??
077                    if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
078                        // Remove the consumer first then add it.
079                        durableSubscriptions.remove(key);
080                        synchronized (destinationsMutex) {
081                            for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
082                                Destination dest = iter.next();
083                                //Account for virtual destinations
084                                if (dest instanceof Topic){
085                                    Topic topic = (Topic)dest;
086                                    topic.deleteSubscription(context, key);
087                                }
088                            }
089                        }
090                        super.removeConsumer(context, sub.getConsumerInfo());
091                        super.addConsumer(context, info);
092                        sub = durableSubscriptions.get(key);
093                    } else {
094                        // Change the consumer id key of the durable sub.
095                        if (sub.getConsumerInfo().getConsumerId() != null) {
096                            subscriptions.remove(sub.getConsumerInfo().getConsumerId());
097                        }
098                        subscriptions.put(info.getConsumerId(), sub);
099                    }
100                } else {
101                    super.addConsumer(context, info);
102                    sub = durableSubscriptions.get(key);
103                    if (sub == null) {
104                        throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
105                                               + " subscriberName: " + key.getSubscriptionName());
106                    }
107                }
108                sub.activate(usageManager, context, info);
109                return sub;
110            } else {
111                return super.addConsumer(context, info);
112            }
113        }
114    
115        @Override
116        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
117            if (info.isDurable()) {
118    
119                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
120                DurableTopicSubscription sub = durableSubscriptions.get(key);
121                if (sub != null) {
122                    sub.deactivate(keepDurableSubsActive);
123                }
124    
125            } else {
126                super.removeConsumer(context, info);
127            }
128        }
129    
130        @Override
131        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
132            SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
133            DurableTopicSubscription sub = durableSubscriptions.remove(key);
134            if (sub == null) {
135                throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
136            }
137            if (sub.isActive()) {
138                throw new JMSException("Durable consumer is in use");
139            }
140    
141            synchronized (destinationsMutex) {
142                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
143                    Destination dest = iter.next();
144                    //Account for virtual destinations
145                    if (dest instanceof Topic){
146                        Topic topic = (Topic)dest;
147                        topic.deleteSubscription(context, key);
148                    }
149                }
150            }
151            if (subscriptions.get(sub.getConsumerInfo()) != null) {
152                super.removeConsumer(context, sub.getConsumerInfo());
153            } else {
154                // try destroying inactive subscriptions
155                destroySubscription(sub);
156            }
157        }
158    
159        @Override
160        public String toString() {
161            return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
162        }
163    
164        @Override
165        protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
166            List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
167            Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
168    
169            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
170            // Eagerly recover the durable subscriptions
171            if (store != null) {
172                SubscriptionInfo[] infos = store.getAllSubscriptions();
173                for (int i = 0; i < infos.length; i++) {
174    
175                    SubscriptionInfo info = infos[i];
176                    LOG.debug("Restoring durable subscription: " + info);
177                    SubscriptionKey key = new SubscriptionKey(info);
178    
179                    // A single durable sub may be subscribing to multiple topics.
180                    // so it might exist already.
181                    DurableTopicSubscription sub = durableSubscriptions.get(key);
182                    ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
183                    if (sub == null) {
184                        ConnectionContext c = new ConnectionContext();
185                        c.setBroker(context.getBroker());
186                        c.setClientId(key.getClientId());
187                        c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
188                        sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
189                    }
190    
191                    if (dupChecker.contains(sub)) {
192                        continue;
193                    }
194    
195                    dupChecker.add(sub);
196                    rc.add(sub);
197                    dest.addSubscription(context, sub);
198                }
199    
200                // Now perhaps there other durable subscriptions (via wild card)
201                // that would match this destination..
202                durableSubscriptions.values();
203                for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
204                    DurableTopicSubscription sub = iterator.next();
205                    // Skip over subscriptions that we allready added..
206                    if (dupChecker.contains(sub)) {
207                        continue;
208                    }
209    
210                    if (sub.matches(dest.getActiveMQDestination())) {
211                        rc.add(sub);
212                        dest.addSubscription(context, sub);
213                    }
214                }
215            }
216            return rc;
217        }
218    
219        private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
220            ConsumerInfo rc = new ConsumerInfo();
221            rc.setSelector(info.getSelector());
222            rc.setSubscriptionName(info.getSubscriptionName());
223            rc.setDestination(info.getSubscribedDestination());
224            rc.setConsumerId(createConsumerId());
225            return rc;
226        }
227    
228        private ConsumerId createConsumerId() {
229            return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
230        }
231    
232        protected void configureTopic(Topic topic, ActiveMQDestination destination) {
233            if (broker.getDestinationPolicy() != null) {
234                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
235                if (entry != null) {
236                    entry.configure(broker,topic);
237                }
238            }
239        }
240    
241        @Override
242        protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
243            ActiveMQDestination destination = info.getDestination();
244            
245            if (info.isDurable()) {
246                if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
247                    throw new JMSException("Cannot create a durable subscription for an advisory Topic");
248                }
249                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
250                DurableTopicSubscription sub = durableSubscriptions.get(key);
251                
252                if (sub == null) {
253                    
254                    sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
255    
256                    if (destination != null && broker.getDestinationPolicy() != null) {
257                        PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
258                        if (entry != null) {
259                            entry.configure(broker, usageManager, sub);
260                        }
261                    }
262                    durableSubscriptions.put(key, sub);
263                } else {
264                    throw new JMSException("That durable subscription is already active.");
265                }
266                return sub;
267            }
268            try {
269                TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
270                // lets configure the subscription depending on the destination
271                if (destination != null && broker.getDestinationPolicy() != null) {
272                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
273                    if (entry != null) {
274                        entry.configure(broker, usageManager, answer);
275                    }
276                }
277                answer.init();
278                return answer;
279            } catch (Exception e) {
280                LOG.error("Failed to create TopicSubscription ", e);
281                JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
282                jmsEx.setLinkedException(e);
283                throw jmsEx;
284            }
285        }
286    
287        /**
288         */
289        private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
290            if (info1.getSelector() != null ^ info2.getSelector() != null) {
291                return true;
292            }
293            if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
294                return true;
295            }
296            return !info1.getDestination().equals(info2.getDestination());
297        }
298    
299        @Override
300        protected Set<ActiveMQDestination> getInactiveDestinations() {
301            Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
302            for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
303                ActiveMQDestination dest = iter.next();
304                if (!dest.isTopic()) {
305                    iter.remove();
306                }
307            }
308            return inactiveDestinations;
309        }
310    
311        public boolean isKeepDurableSubsActive() {
312            return keepDurableSubsActive;
313        }
314    
315        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
316            this.keepDurableSubsActive = keepDurableSubsActive;
317        }
318    
319        public boolean durableSubscriptionExists(SubscriptionKey key) {
320            return this.durableSubscriptions.containsKey(key);
321        }
322    
323    }