001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.concurrent.ConcurrentHashMap;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    
024    import javax.jms.InvalidSelectorException;
025    import javax.jms.JMSException;
026    
027    import org.apache.activemq.broker.Broker;
028    import org.apache.activemq.broker.ConnectionContext;
029    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
030    import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageDispatch;
036    import org.apache.activemq.command.MessageId;
037    import org.apache.activemq.filter.MessageEvaluationContext;
038    import org.apache.activemq.store.TopicMessageStore;
039    import org.apache.activemq.usage.SystemUsage;
040    import org.apache.activemq.usage.Usage;
041    import org.apache.activemq.usage.UsageListener;
042    import org.apache.activemq.util.SubscriptionKey;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
047    
048        private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
049        private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
050        private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
051        private final SubscriptionKey subscriptionKey;
052        private final boolean keepDurableSubsActive;
053        private AtomicBoolean active = new AtomicBoolean();
054    
055        public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
056            throws JMSException {
057            super(broker,usageManager, context, info);
058            this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
059            this.pending.setSystemUsage(usageManager);
060            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
061            this.keepDurableSubsActive = keepDurableSubsActive;
062            subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
063            
064        }
065    
066        public final boolean isActive() {
067            return active.get();
068        }
069    
070        public boolean isFull() {
071            return !active.get() || super.isFull();
072        }
073    
074        public void gc() {
075        }
076    
077        /**
078         * store will have a pending ack for all durables, irrespective of the selector
079         * so we need to ack if node is un-matched
080         */
081        public void unmatched(MessageReference node) throws IOException {
082            MessageAck ack = new MessageAck();
083            ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
084            ack.setMessageID(node.getMessageId());
085            node.getRegionDestination().acknowledge(this.getContext(), this, ack, node);
086        }
087    
088        @Override
089        protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) {
090            // statically configured via maxPageSize
091        }
092    
093        public void add(ConnectionContext context, Destination destination) throws Exception {
094            super.add(context, destination);
095            // do it just once per destination
096            if (destinations.containsKey(destination.getActiveMQDestination())) {
097                return;
098            }
099            destinations.put(destination.getActiveMQDestination(), destination);
100    
101            if (active.get() || keepDurableSubsActive) {
102                Topic topic = (Topic)destination;
103                topic.activate(context, this);
104                if (pending.isEmpty(topic)) {
105                    topic.recoverRetroactiveMessages(context, this);
106                }
107                this.enqueueCounter+=pending.size();
108            } else if (destination.getMessageStore() != null) {
109                TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
110                try {
111                    this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
112                } catch (IOException e) {
113                    JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
114                    jmsEx.setLinkedException(e);
115                    throw jmsEx;
116                }
117            }
118            dispatchPending();
119        }
120    
121        public void activate(SystemUsage memoryManager, ConnectionContext context,
122                ConsumerInfo info) throws Exception {
123            if (!active.get()) {
124                this.context = context;
125                this.info = info;
126                LOG.debug("Activating " + this);
127                if (!keepDurableSubsActive) {
128                    for (Iterator<Destination> iter = destinations.values()
129                            .iterator(); iter.hasNext();) {
130                        Topic topic = (Topic) iter.next();
131                        add(context, topic);
132                        topic.activate(context, this);
133                    }
134                }
135                synchronized (pendingLock) {
136                    pending.setSystemUsage(memoryManager);
137                    pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
138                    pending.setMaxAuditDepth(getMaxAuditDepth());
139                    pending.setMaxProducersToAudit(getMaxProducersToAudit());
140                    pending.start();
141                    // If nothing was in the persistent store, then try to use the
142                    // recovery policy.
143                    if (pending.isEmpty()) {
144                        for (Iterator<Destination> iter = destinations.values()
145                                .iterator(); iter.hasNext();) {
146                            Topic topic = (Topic) iter.next();
147                            topic.recoverRetroactiveMessages(context, this);
148                        }
149                    }
150                }
151                this.active.set(true);
152                dispatchPending();
153                this.usageManager.getMemoryUsage().addUsageListener(this);
154            }
155        }
156    
157        public void deactivate(boolean keepDurableSubsActive) throws Exception {
158            LOG.debug("Deactivating " + this);
159            active.set(false);
160            this.usageManager.getMemoryUsage().removeUsageListener(this);
161            synchronized (pending) {
162                pending.stop();
163            }
164            if (!keepDurableSubsActive) {
165                for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
166                    Topic topic = (Topic)iter.next();
167                    topic.deactivate(context, this);
168                }
169            }
170            
171            for (final MessageReference node : dispatched) {
172                // Mark the dispatched messages as redelivered for next time.
173                Integer count = redeliveredMessages.get(node.getMessageId());
174                if (count != null) {
175                    redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
176                } else {
177                    redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
178                }
179                if (keepDurableSubsActive&& pending.isTransient()) {
180                    synchronized (pending) {
181                        pending.addMessageFirst(node);
182                    }
183                } else {
184                    node.decrementReferenceCount();
185                }
186            }
187            synchronized(dispatched) {
188                dispatched.clear();
189            }
190            if (!keepDurableSubsActive && pending.isTransient()) {
191                synchronized (pending) {
192                    try {
193                        pending.reset();
194                        while (pending.hasNext()) {
195                            MessageReference node = pending.next();
196                            node.decrementReferenceCount();
197                            pending.remove();
198                        }
199                    } finally {
200                        pending.release();
201                    }
202                }
203            }
204            prefetchExtension = 0;
205        }
206    
207        
208        protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
209            MessageDispatch md = super.createMessageDispatch(node, message);
210            Integer count = redeliveredMessages.get(node.getMessageId());
211            if (count != null) {
212                md.setRedeliveryCounter(count.intValue());
213            }
214            return md;
215        }
216    
217        public void add(MessageReference node) throws Exception {
218            if (!active.get() && !keepDurableSubsActive) {
219                return;
220            }
221            super.add(node);
222        }
223    
224        protected void dispatchPending() throws IOException {
225            if (isActive()) {
226                super.dispatchPending();
227            }
228        }
229        
230        protected void doAddRecoveredMessage(MessageReference message) throws Exception {
231            synchronized(pending) {
232                pending.addRecoveredMessage(message);
233            }
234        }
235    
236        public int getPendingQueueSize() {
237            if (active.get() || keepDurableSubsActive) {
238                return super.getPendingQueueSize();
239            }
240            // TODO: need to get from store
241            return 0;
242        }
243    
244        public void setSelector(String selector) throws InvalidSelectorException {
245            throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
246        }
247    
248        protected boolean canDispatch(MessageReference node) {
249            return isActive();
250        }
251    
252        protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
253            node.getRegionDestination().acknowledge(context, this, ack, node);
254            redeliveredMessages.remove(node.getMessageId());
255            node.decrementReferenceCount();
256        }
257    
258        
259        public synchronized String toString() {
260            return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
261                   + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
262        }
263    
264        public SubscriptionKey getSubscriptionKey() {
265            return subscriptionKey;
266        }
267    
268        /**
269         * Release any references that we are holding.
270         */
271        public void destroy() {
272            synchronized (pending) {
273                try {
274    
275                    pending.reset();
276                    while (pending.hasNext()) {
277                        MessageReference node = pending.next();
278                        node.decrementReferenceCount();
279                    }
280    
281                } finally {
282                    pending.release();
283                    pending.clear();
284                }
285            }
286            synchronized(dispatched) {
287                for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
288                    MessageReference node = (MessageReference) iter.next();
289                    node.decrementReferenceCount();
290                }
291                dispatched.clear();
292            }
293            setSlowConsumer(false);
294        }
295    
296        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
297            if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
298                try {
299                    dispatchPending();
300                } catch (IOException e) {
301                    LOG.warn("problem calling dispatchMatched", e);
302                }
303            }
304        }
305        
306        protected boolean isDropped(MessageReference node) {
307           return false;
308        }
309    
310        public boolean isKeepDurableSubsActive() {
311            return keepDurableSubsActive;
312        }
313    }