001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.IOException;
020    import java.nio.channels.SocketChannel;
021    import java.util.LinkedList;
022    import java.util.concurrent.Executor;
023    import java.util.concurrent.ExecutorService;
024    import java.util.concurrent.SynchronousQueue;
025    import java.util.concurrent.ThreadFactory;
026    import java.util.concurrent.ThreadPoolExecutor;
027    import java.util.concurrent.TimeUnit;
028    
029    /**
030     * The SelectorManager will manage one Selector and the thread that checks the
031     * selector.
032     * 
033     * We may need to consider running more than one thread to check the selector if
034     * servicing the selector takes too long.
035     * 
036     * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
037     */
038    public final class SelectorManager {
039    
040        public static final SelectorManager SINGLETON = new SelectorManager();
041    
042        private Executor selectorExecutor = createDefaultExecutor();
043        private Executor channelExecutor = selectorExecutor;
044        private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
045        private int maxChannelsPerWorker = 1024;
046        
047        protected ExecutorService createDefaultExecutor() {
048            ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
049                public Thread newThread(Runnable runnable) {
050                    return new Thread(runnable, "ActiveMQ NIO Worker");
051                }
052            });
053            // rc.allowCoreThreadTimeOut(true);
054            return rc;
055        }
056        
057        public static SelectorManager getInstance() {
058            return SINGLETON;
059        }
060    
061        public interface Listener {
062            void onSelect(SelectorSelection selector);
063    
064            void onError(SelectorSelection selection, Throwable error);
065        }
066    
067    
068        public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
069            throws IOException {
070    
071            SelectorSelection selection = null;
072            while( selection == null ) {
073                if (freeWorkers.size() > 0) {
074                    SelectorWorker worker = freeWorkers.getFirst();
075                    if( worker.isReleased() ) {
076                        freeWorkers.remove(worker);
077                    } else {
078                        worker.retain();
079                        selection = new SelectorSelection(worker, socketChannel, listener);
080                    }
081                    
082                } else {
083                    // Worker starts /w retain count of 1
084                    SelectorWorker worker = new SelectorWorker(this);
085                    freeWorkers.addFirst(worker);
086                    selection = new SelectorSelection(worker, socketChannel, listener);
087                }
088            }
089            
090            return selection;
091        }
092    
093        synchronized void onWorkerFullEvent(SelectorWorker worker) {
094            freeWorkers.remove(worker);
095        }
096    
097        public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
098            freeWorkers.remove(worker);
099        }
100    
101        public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
102            freeWorkers.addFirst(worker);
103        }
104    
105        public Executor getChannelExecutor() {
106            return channelExecutor;
107        }
108    
109        public void setChannelExecutor(Executor channelExecutor) {
110            this.channelExecutor = channelExecutor;
111        }
112    
113        public int getMaxChannelsPerWorker() {
114            return maxChannelsPerWorker;
115        }
116    
117        public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
118            this.maxChannelsPerWorker = maxChannelsPerWorker;
119        }
120    
121        public Executor getSelectorExecutor() {
122            return selectorExecutor;
123        }
124    
125        public void setSelectorExecutor(Executor selectorExecutor) {
126            this.selectorExecutor = selectorExecutor;
127        }
128    
129    }