001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.LinkedList;
021    import java.util.concurrent.atomic.AtomicLong;
022    import javax.jms.JMSException;
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
027    import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
028    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
029    import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
030    import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
031    import org.apache.activemq.command.ConsumerControl;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageDispatch;
036    import org.apache.activemq.command.MessageDispatchNotification;
037    import org.apache.activemq.command.MessagePull;
038    import org.apache.activemq.command.Response;
039    import org.apache.activemq.transaction.Synchronization;
040    import org.apache.activemq.usage.SystemUsage;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    public class TopicSubscription extends AbstractSubscription {
045    
046        private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
047        private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
048        
049        protected PendingMessageCursor matched;
050        protected final SystemUsage usageManager;
051        protected AtomicLong dispatchedCounter = new AtomicLong();
052           
053        boolean singleDestination = true;
054        Destination destination;
055    
056        private int maximumPendingMessages = -1;
057        private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
058        private int discarded;
059        private final Object matchedListMutex = new Object();
060        private final AtomicLong enqueueCounter = new AtomicLong(0);
061        private final AtomicLong dequeueCounter = new AtomicLong(0);
062        private int memoryUsageHighWaterMark = 95;
063        // allow duplicate suppression in a ring network of brokers
064        protected int maxProducersToAudit = 1024;
065        protected int maxAuditDepth = 1000;
066        protected boolean enableAudit = false;
067        protected ActiveMQMessageAudit audit;
068        protected boolean active = false;
069    
070        public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
071            super(broker, context, info);
072            this.usageManager = usageManager;
073            String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
074            if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
075                this.matched = new VMPendingMessageCursor(false);
076            } else {
077                this.matched = new FilePendingMessageCursor(broker,matchedName,false);
078            }
079        }
080    
081        public void init() throws Exception {
082            this.matched.setSystemUsage(usageManager);
083            this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
084            this.matched.start();
085            if (enableAudit) {
086                audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
087            }
088            this.active=true;
089        }
090    
091        public void add(MessageReference node) throws Exception {
092            if (isDuplicate(node)) {
093                return;
094            }
095            enqueueCounter.incrementAndGet();
096            if (!isFull() && matched.isEmpty()  && !isSlave()) {
097                // if maximumPendingMessages is set we will only discard messages which
098                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
099                dispatch(node);
100                setSlowConsumer(false);
101            } else {
102                //we are slow
103                if(!isSlowConsumer()) {
104                    setSlowConsumer(true);
105                    for (Destination dest: destinations) {
106                        dest.slowConsumer(getContext(), this);
107                    }
108                }
109                if (maximumPendingMessages != 0) {
110                    boolean warnedAboutWait = false;
111                    while (active) {
112                        synchronized (matchedListMutex) {
113                            while (matched.isFull()) {
114                                if (getContext().getStopping().get()) {
115                                    LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
116                                            + node.getMessageId());
117                                    enqueueCounter.decrementAndGet();
118                                    return;
119                                }
120                                if (!warnedAboutWait) {
121                                    LOG.info(toString() + ": Pending message cursor [" + matched
122                                            + "] is full, temp usage ("
123                                            + +matched.getSystemUsage().getTempUsage().getPercentUsage()
124                                            + "%) or memory usage ("
125                                            + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
126                                            + "%) limit reached, blocking message add() pending the release of resources.");
127                                    warnedAboutWait = true;
128                                }
129                                matchedListMutex.wait(20);
130                            }
131                            //Temporary storage could be full - so just try to add the message
132                            //see https://issues.apache.org/activemq/browse/AMQ-2475
133                            if (matched.tryAddMessageLast(node, 10)) {
134                                break;
135                            }
136                        }
137                    }
138                    synchronized (matchedListMutex) {
139                        
140                        // NOTE - be careful about the slaveBroker!
141                        if (maximumPendingMessages > 0) {
142                            // calculate the high water mark from which point we
143                            // will eagerly evict expired messages
144                            int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
145                            if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
146                                max = maximumPendingMessages;
147                            }
148                            if (!matched.isEmpty() && matched.size() > max) {
149                                removeExpiredMessages();
150                            }
151                            // lets discard old messages as we are a slow consumer
152                            while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
153                                int pageInSize = matched.size() - maximumPendingMessages;
154                                // only page in a 1000 at a time - else we could
155                                // blow da memory
156                                pageInSize = Math.max(1000, pageInSize);
157                                LinkedList<MessageReference> list = null;
158                                MessageReference[] oldMessages=null;
159                                synchronized(matched){
160                                    list = matched.pageInList(pageInSize);
161                                    oldMessages = messageEvictionStrategy.evictMessages(list);
162                                    for (MessageReference ref : list) {
163                                        ref.decrementReferenceCount();
164                                    }
165                                }
166                                int messagesToEvict = 0;
167                                if (oldMessages != null){
168                                        messagesToEvict = oldMessages.length;
169                                        for (int i = 0; i < messagesToEvict; i++) {
170                                            MessageReference oldMessage = oldMessages[i];
171                                            discard(oldMessage);
172                                        }
173                                }
174                                // lets avoid an infinite loop if we are given a bad
175                                // eviction strategy
176                                // for a bad strategy lets just not evict
177                                if (messagesToEvict == 0) {
178                                    LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
179                                    break;
180                                }
181                            }
182                        }
183                    }
184                    dispatchMatched();
185                }
186            }
187        }
188    
189        private boolean isDuplicate(MessageReference node) {
190            boolean duplicate = false;
191            if (enableAudit && audit != null) {
192                duplicate = audit.isDuplicate(node);
193                if (LOG.isDebugEnabled()) {
194                    if (duplicate) {
195                        LOG.debug("ignoring duplicate add: " + node.getMessageId());
196                    }
197                }
198            }
199            return duplicate;
200        }
201    
202        /**
203         * Discard any expired messages from the matched list. Called from a
204         * synchronized block.
205         * 
206         * @throws IOException
207         */
208        protected void removeExpiredMessages() throws IOException {
209            try {
210                matched.reset();
211                while (matched.hasNext()) {
212                    MessageReference node = matched.next();
213                    node.decrementReferenceCount();
214                    if (broker.isExpired(node)) {
215                        matched.remove();
216                        dispatchedCounter.incrementAndGet();
217                        node.decrementReferenceCount();
218                        node.getRegionDestination().getDestinationStatistics().getExpired().increment();
219                        broker.messageExpired(getContext(), node, this);
220                        break;
221                    }
222                }
223            } finally {
224                matched.release();
225            }
226        }
227    
228        public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
229            synchronized (matchedListMutex) {
230                try {
231                    matched.reset();
232                    while (matched.hasNext()) {
233                        MessageReference node = matched.next();
234                        node.decrementReferenceCount();
235                        if (node.getMessageId().equals(mdn.getMessageId())) {
236                            matched.remove();
237                            dispatchedCounter.incrementAndGet();
238                            node.decrementReferenceCount();
239                            break;
240                        }
241                    }
242                } finally {
243                    matched.release();
244                }
245            }
246        }
247    
248        public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
249            // Handle the standard acknowledgment case.
250            if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
251                if (context.isInTransaction()) {
252                    context.getTransaction().addSynchronization(new Synchronization() {
253    
254                        @Override
255                        public void afterCommit() throws Exception {
256                           synchronized (TopicSubscription.this) {
257                                if (singleDestination && destination != null) {
258                                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
259                                }
260                            }
261                            dequeueCounter.addAndGet(ack.getMessageCount());
262                            dispatchMatched();
263                        }
264                    });
265                } else {
266                    if (singleDestination && destination != null) {
267                        destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
268                        destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
269                    }
270                    dequeueCounter.addAndGet(ack.getMessageCount());
271                }
272                dispatchMatched();
273                return;
274            } else if (ack.isDeliveredAck()) {
275                // Message was delivered but not acknowledged: update pre-fetch
276                // counters.
277                // also. get these for a consumer expired message.
278                if (destination != null && !ack.isInTransaction()) {
279                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
280                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
281                }
282                dequeueCounter.addAndGet(ack.getMessageCount());
283                dispatchMatched();
284                return;
285            } else if (ack.isRedeliveredAck()) {
286                // nothing to do atm
287                return;
288            }
289            throw new JMSException("Invalid acknowledgment: " + ack);
290        }
291    
292        public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
293            // not supported for topics
294            return null;
295        }
296    
297        public int getPendingQueueSize() {
298            return matched();
299        }
300    
301        public int getDispatchedQueueSize() {
302            return (int)(dispatchedCounter.get() - dequeueCounter.get());
303        }
304    
305        public int getMaximumPendingMessages() {
306            return maximumPendingMessages;
307        }
308    
309        public long getDispatchedCounter() {
310            return dispatchedCounter.get();
311        }
312    
313        public long getEnqueueCounter() {
314            return enqueueCounter.get();
315        }
316    
317        public long getDequeueCounter() {
318            return dequeueCounter.get();
319        }
320    
321        /**
322         * @return the number of messages discarded due to being a slow consumer
323         */
324        public int discarded() {
325            synchronized (matchedListMutex) {
326                return discarded;
327            }
328        }
329    
330        /**
331         * @return the number of matched messages (messages targeted for the
332         *         subscription but not yet able to be dispatched due to the
333         *         prefetch buffer being full).
334         */
335        public int matched() {
336            synchronized (matchedListMutex) {
337                return matched.size();
338            }
339        }
340    
341        /**
342         * Sets the maximum number of pending messages that can be matched against
343         * this consumer before old messages are discarded.
344         */
345        public void setMaximumPendingMessages(int maximumPendingMessages) {
346            this.maximumPendingMessages = maximumPendingMessages;
347        }
348    
349        public MessageEvictionStrategy getMessageEvictionStrategy() {
350            return messageEvictionStrategy;
351        }
352    
353        /**
354         * Sets the eviction strategy used to decide which message to evict when the
355         * slow consumer needs to discard messages
356         */
357        public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
358            this.messageEvictionStrategy = messageEvictionStrategy;
359        }
360    
361        public int getMaxProducersToAudit() {
362            return maxProducersToAudit;
363        }
364    
365        public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
366            this.maxProducersToAudit = maxProducersToAudit;
367            if (audit != null) {
368                audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
369            }
370        }
371    
372        public int getMaxAuditDepth() {
373            return maxAuditDepth;
374        }
375        
376        public synchronized void setMaxAuditDepth(int maxAuditDepth) {
377            this.maxAuditDepth = maxAuditDepth;
378            if (audit != null) {
379                audit.setAuditDepth(maxAuditDepth);
380            }
381        }
382        
383        public boolean isEnableAudit() {
384            return enableAudit;
385        }
386    
387        public synchronized void setEnableAudit(boolean enableAudit) {
388            this.enableAudit = enableAudit;
389            if (enableAudit && audit==null) {
390                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
391            }
392        }
393        
394        // Implementation methods
395        // -------------------------------------------------------------------------
396        public boolean isFull() {
397            return getDispatchedQueueSize()  >= info.getPrefetchSize();
398        }
399        
400        public int getInFlightSize() {
401            return getDispatchedQueueSize();
402        }
403        
404        
405        /**
406         * @return true when 60% or more room is left for dispatching messages
407         */
408        public boolean isLowWaterMark() {
409            return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
410        }
411    
412        /**
413         * @return true when 10% or less room is left for dispatching messages
414         */
415        public boolean isHighWaterMark() {
416            return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
417        }
418    
419        /**
420         * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
421         */
422        public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
423            this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
424        }
425    
426        /**
427         * @return the memoryUsageHighWaterMark
428         */
429        public int getMemoryUsageHighWaterMark() {
430            return this.memoryUsageHighWaterMark;
431        }
432    
433        /**
434         * @return the usageManager
435         */
436        public SystemUsage getUsageManager() {
437            return this.usageManager;
438        }
439    
440        /**
441         * @return the matched
442         */
443        public PendingMessageCursor getMatched() {
444            return this.matched;
445        }
446    
447        /**
448         * @param matched the matched to set
449         */
450        public void setMatched(PendingMessageCursor matched) {
451            this.matched = matched;
452        }
453    
454        /**
455         * inform the MessageConsumer on the client to change it's prefetch
456         * 
457         * @param newPrefetch
458         */
459        public void updateConsumerPrefetch(int newPrefetch) {
460            if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
461                ConsumerControl cc = new ConsumerControl();
462                cc.setConsumerId(info.getConsumerId());
463                cc.setPrefetch(newPrefetch);
464                context.getConnection().dispatchAsync(cc);
465            }
466        }
467    
468        private void dispatchMatched() throws IOException {       
469            synchronized (matchedListMutex) {
470                if (!matched.isEmpty() && !isFull()) {
471                    try {
472                        matched.reset();
473                       
474                        while (matched.hasNext() && !isFull()) {
475                            MessageReference message = matched.next();
476                            message.decrementReferenceCount();
477                            matched.remove();
478                            // Message may have been sitting in the matched list a
479                            // while
480                            // waiting for the consumer to ak the message.
481                            if (message.isExpired()) {
482                                discard(message);
483                                continue; // just drop it.
484                            }
485                            dispatch(message);
486                        }
487                    } finally {
488                        matched.release();
489                    }
490                }
491            }
492        }
493    
494        private void dispatch(final MessageReference node) throws IOException {
495            Message message = (Message)node;
496            node.incrementReferenceCount();
497            // Make sure we can dispatch a message.
498            MessageDispatch md = new MessageDispatch();
499            md.setMessage(message);
500            md.setConsumerId(info.getConsumerId());
501            md.setDestination(node.getRegionDestination().getActiveMQDestination());
502            dispatchedCounter.incrementAndGet();
503            // Keep track if this subscription is receiving messages from a single
504            // destination.
505            if (singleDestination) {
506                if (destination == null) {
507                    destination = node.getRegionDestination();
508                } else {
509                    if (destination != node.getRegionDestination()) {
510                        singleDestination = false;
511                    }
512                }
513            }
514            if (info.isDispatchAsync()) {
515                md.setTransmitCallback(new Runnable() {
516    
517                    public void run() {
518                        node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
519                        node.getRegionDestination().getDestinationStatistics().getInflight().increment();
520                        node.decrementReferenceCount();
521                    }
522                });
523                context.getConnection().dispatchAsync(md);
524            } else {
525                context.getConnection().dispatchSync(md);
526                node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
527                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
528                node.decrementReferenceCount();
529            }
530        }
531    
532        private void discard(MessageReference message) {
533            message.decrementReferenceCount();
534            matched.remove(message);
535            discarded++;
536            if(destination != null) {
537                destination.getDestinationStatistics().getDequeues().increment();
538            }
539            if (LOG.isDebugEnabled()) {
540                LOG.debug("Discarding message " + message);
541            }
542            Destination dest = message.getRegionDestination();
543            if (dest != null) {
544                dest.messageDiscarded(getContext(), this, message);
545            }
546            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
547        }
548    
549        @Override
550        public String toString() {
551            return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
552                   + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
553        }
554    
555        public void destroy() {
556            this.active=false;
557            synchronized (matchedListMutex) {
558                try {
559                    matched.destroy();
560                } catch (Exception e) {
561                    LOG.warn("Failed to destroy cursor", e);
562                }
563            }
564            setSlowConsumer(false);
565        }
566    
567        @Override
568        public int getPrefetchSize() {
569            return info.getPrefetchSize();
570        }
571    
572    }