001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.thread;
018    
019    import java.util.concurrent.Executor;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.SynchronousQueue;
022    import java.util.concurrent.ThreadFactory;
023    import java.util.concurrent.ThreadPoolExecutor;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    /**
028     * Manages the thread pool for long running tasks. Long running tasks are not
029     * always active but when they are active, they may need a few iterations of
030     * processing for them to become idle. The manager ensures that each task is
031     * processes but that no one task overtakes the system. This is kinda like
032     * cooperative multitasking.
033     * 
034     * 
035     */
036    public class TaskRunnerFactory implements Executor {
037    
038        private ExecutorService executor;
039        private int maxIterationsPerRun;
040        private String name;
041        private int priority;
042        private boolean daemon;
043        private AtomicLong id = new AtomicLong(0);
044    
045        public TaskRunnerFactory() {
046            this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
047        }
048        
049        private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
050            this(name,priority,daemon,maxIterationsPerRun,false);
051        }
052    
053    
054        public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
055    
056            this.name = name;
057            this.priority = priority;
058            this.daemon = daemon;
059            this.maxIterationsPerRun = maxIterationsPerRun;
060    
061            // If your OS/JVM combination has a good thread model, you may want to
062            // avoid
063            // using a thread pool to run tasks and use a DedicatedTaskRunner
064            // instead.
065            if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
066                executor = null;
067            } else {
068                executor = createDefaultExecutor();
069            }
070        }
071    
072        public void shutdown() {
073            if (executor != null) {
074                executor.shutdownNow();
075            }
076        }
077    
078        public TaskRunner createTaskRunner(Task task, String name) {
079            if (executor != null) {
080                return new PooledTaskRunner(executor, task, maxIterationsPerRun);
081            } else {
082                return new DedicatedTaskRunner(task, name, priority, daemon);
083            }
084        }
085    
086        public void execute(Runnable runnable) {
087            execute(runnable, "ActiveMQ Task");
088        }
089        
090        public void execute(Runnable runnable, String name) {
091            if (executor != null) {
092                executor.execute(runnable);
093            } else {
094                new Thread(runnable, name + "-" + id.incrementAndGet()).start();
095            }
096        }
097    
098        protected ExecutorService createDefaultExecutor() {
099            ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
100                public Thread newThread(Runnable runnable) {
101                    Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
102                    thread.setDaemon(daemon);
103                    thread.setPriority(priority);
104                    return thread;
105                }
106            });
107            // rc.allowCoreThreadTimeOut(true);
108            return rc;
109        }
110    
111    }