001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.thread;
018    
019    import org.apache.activemq.util.MDCHelper;
020    import org.slf4j.MDC;
021    
022    import java.util.Map;
023    import java.util.concurrent.Executor;
024    
025    /**
026     * 
027     */
028    public class DeterministicTaskRunner implements TaskRunner {
029    
030        private final Executor executor;
031        private final Task task;
032        private final Runnable runable;
033        private boolean shutdown;    
034        
035        /**Constructor
036         * @param executor
037         * @param task
038         */
039        public DeterministicTaskRunner(Executor executor, Task task) {
040            this.executor = executor;
041            this.task = task;
042            final Map context = MDCHelper.getCopyOfContextMap();
043            this.runable = new Runnable() {
044                public void run() {
045                    Thread.currentThread();
046                    if (context != null) {
047                        MDCHelper.setContextMap(context);
048                    }
049                    runTask();
050                }
051            };
052        }
053    
054        /**
055         * We Expect MANY wakeup calls on the same TaskRunner - but each
056         * needs to run
057         */
058        public void wakeup() throws InterruptedException {
059            synchronized (runable) {
060    
061                if (shutdown) {
062                    return;
063                }
064                executor.execute(runable);
065    
066            }
067        }
068    
069        /**
070         * shut down the task
071         * 
072         * @throws InterruptedException
073         */
074        public void shutdown(long timeout) throws InterruptedException {
075            synchronized (runable) {
076                shutdown = true;
077            }
078        }
079    
080        public void shutdown() throws InterruptedException {
081            shutdown(0);
082        }
083    
084        final void runTask() {
085    
086            synchronized (runable) {
087                if (shutdown) {
088                    runable.notifyAll();
089                    return;
090                }
091            }
092            task.iterate();
093        }
094    }