001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq;
019    
020    import java.util.List;
021    import javax.jms.JMSException;
022    import org.apache.activemq.command.ConsumerId;
023    import org.apache.activemq.command.MessageDispatch;
024    import org.apache.activemq.thread.Task;
025    import org.apache.activemq.thread.TaskRunner;
026    import org.apache.activemq.util.JMSExceptionSupport;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * A utility class used by the Session for dispatching messages asynchronously
032     * to consumers
033     * 
034     * 
035     * @see javax.jms.Session
036     */
037    public class ActiveMQSessionExecutor implements Task {
038        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSessionExecutor.class);
039    
040        private final ActiveMQSession session;
041        private final MessageDispatchChannel messageQueue;
042        private boolean dispatchedBySessionPool;
043        private volatile TaskRunner taskRunner;
044        private boolean startedOrWarnedThatNotStarted;
045    
046        ActiveMQSessionExecutor(ActiveMQSession session) {
047            this.session = session;
048            if (this.session.connection != null && this.session.connection.isMessagePrioritySupported()) {
049               this.messageQueue = new SimplePriorityMessageDispatchChannel();
050            }else {
051                this.messageQueue = new FifoMessageDispatchChannel();
052            }
053        }
054    
055        void setDispatchedBySessionPool(boolean value) {
056            dispatchedBySessionPool = value;
057            wakeup();
058        }
059    
060        void execute(MessageDispatch message) throws InterruptedException {
061    
062            if (!startedOrWarnedThatNotStarted) {
063    
064                ActiveMQConnection connection = session.connection;
065                long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
066                if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
067                    startedOrWarnedThatNotStarted = true;
068                } else {
069                    long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
070    
071                    // lets only warn when a significant amount of time has passed
072                    // just in case its normal operation
073                    if (elapsedTime > aboutUnstartedConnectionTimeout) {
074                        LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
075                                 + " Received: " + message);
076                        startedOrWarnedThatNotStarted = true;
077                    }
078                }
079            }
080    
081            if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
082                dispatch(message);
083            } else {
084                messageQueue.enqueue(message);
085                wakeup();
086            }
087        }
088    
089        public void wakeup() {
090            if (!dispatchedBySessionPool) {
091                if (session.isSessionAsyncDispatch()) {
092                    try {
093                        TaskRunner taskRunner = this.taskRunner;
094                        if (taskRunner == null) {
095                            synchronized (this) {
096                                if (this.taskRunner == null) {
097                                    if (!isRunning()) {
098                                        // stop has been called
099                                        return;
100                                    }
101                                    this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
102                                            "ActiveMQ Session: " + session.getSessionId());
103                                }
104                                taskRunner = this.taskRunner;
105                            }
106                        }
107                        taskRunner.wakeup();
108                    } catch (InterruptedException e) {
109                        Thread.currentThread().interrupt();
110                    }
111                } else {
112                    while (iterate()) {
113                    }
114                }
115            }
116        }
117    
118        void executeFirst(MessageDispatch message) {
119            messageQueue.enqueueFirst(message);
120            wakeup();
121        }
122    
123        public boolean hasUncomsumedMessages() {
124            return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
125        }
126    
127        void dispatch(MessageDispatch message) {
128    
129            // TODO - we should use a Map for this indexed by consumerId
130    
131            for (ActiveMQMessageConsumer consumer : this.session.consumers) {
132                ConsumerId consumerId = message.getConsumerId();
133                if (consumerId.equals(consumer.getConsumerId())) {
134                    consumer.dispatch(message);
135                    break;
136                }
137            }
138        }
139    
140        synchronized void start() {
141            if (!messageQueue.isRunning()) {
142                messageQueue.start();
143                if (hasUncomsumedMessages()) {
144                    wakeup();
145                }
146            }
147        }
148    
149        void stop() throws JMSException {
150            try {
151                if (messageQueue.isRunning()) {
152                    synchronized(this) {
153                        messageQueue.stop();
154                        if (this.taskRunner != null) {
155                            this.taskRunner.shutdown();
156                            this.taskRunner = null;
157                        }
158                    }
159                }
160            } catch (InterruptedException e) {
161                Thread.currentThread().interrupt();
162                throw JMSExceptionSupport.create(e);
163            }
164        }
165    
166        boolean isRunning() {
167            return messageQueue.isRunning();
168        }
169    
170        void close() {
171            messageQueue.close();
172        }
173    
174        void clear() {
175            messageQueue.clear();
176        }
177    
178        MessageDispatch dequeueNoWait() {
179            return messageQueue.dequeueNoWait();
180        }
181    
182        protected void clearMessagesInProgress() {
183            messageQueue.clear();
184        }
185    
186        public boolean isEmpty() {
187            return messageQueue.isEmpty();
188        }
189    
190        public boolean iterate() {
191    
192            // Deliver any messages queued on the consumer to their listeners.
193            for (ActiveMQMessageConsumer consumer : this.session.consumers) {
194                if (consumer.iterate()) {
195                    return true;
196                }
197            }
198    
199            // No messages left queued on the listeners.. so now dispatch messages
200            // queued on the session
201            MessageDispatch message = messageQueue.dequeueNoWait();
202            if (message == null) {
203                return false;
204            } else {
205                dispatch(message);
206                return !messageQueue.isEmpty();
207            }
208        }
209    
210        List getUnconsumedMessages() {
211            return messageQueue.removeAll();
212        }
213    
214    }