001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.util.Timer;
021    import java.util.concurrent.SynchronousQueue;
022    import java.util.concurrent.ThreadFactory;
023    import java.util.concurrent.ThreadPoolExecutor;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicInteger;
027    
028    import org.apache.activemq.command.KeepAliveInfo;
029    import org.apache.activemq.command.WireFormatInfo;
030    import org.apache.activemq.thread.SchedulerTimerTask;
031    import org.apache.activemq.wireformat.WireFormat;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * Used to make sure that commands are arriving periodically from the peer of
037     * the transport.
038     *
039     * 
040     */
041    public class InactivityMonitor extends TransportFilter {
042    
043        private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
044        private static ThreadPoolExecutor ASYNC_TASKS;
045        private static int CHECKER_COUNTER;
046        private static long DEFAULT_CHECK_TIME_MILLS = 30000;
047        private static Timer  READ_CHECK_TIMER;
048        private static Timer  WRITE_CHECK_TIMER;
049    
050        private WireFormatInfo localWireFormatInfo;
051        private WireFormatInfo remoteWireFormatInfo;
052        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053    
054        private final AtomicBoolean commandSent = new AtomicBoolean(false);
055        private final AtomicBoolean inSend = new AtomicBoolean(false);
056        private final AtomicBoolean failed = new AtomicBoolean(false);
057    
058        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059        private final AtomicBoolean inReceive = new AtomicBoolean(false);
060        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061    
062        private SchedulerTimerTask writeCheckerTask;
063        private SchedulerTimerTask readCheckerTask;
064    
065        private boolean ignoreRemoteWireFormat = false;
066        private boolean ignoreAllWireFormatInfo = false;
067    
068        private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
069        private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
070        private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
071        private boolean useKeepAlive = true;
072        private boolean keepAliveResponseRequired;
073        private WireFormat wireFormat;
074    
075        private final Runnable readChecker = new Runnable() {
076            long lastRunTime;
077            public void run() {
078                long now = System.currentTimeMillis();
079                long elapsed = (now-lastRunTime);
080    
081                if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
082                    LOG.debug(""+elapsed+" ms elapsed since last read check.");
083                }
084    
085                // Perhaps the timer executed a read check late.. and then executes
086                // the next read check on time which causes the time elapsed between
087                // read checks to be small..
088    
089                // If less than 90% of the read check Time elapsed then abort this readcheck.
090                if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
091                    LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
092                    return;
093                }
094    
095                lastRunTime = now;
096                readCheck();
097            }
098        };
099    
100        private boolean allowReadCheck(long elapsed) {
101            return elapsed > (readCheckTime * 9 / 10);
102        }
103    
104        private final Runnable writeChecker = new Runnable() {
105            long lastRunTime;
106            public void run() {
107                long now = System.currentTimeMillis();
108                if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
109                    LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
110    
111                }
112                lastRunTime = now;
113                writeCheck();
114            }
115        };
116    
117        public InactivityMonitor(Transport next, WireFormat wireFormat) {
118            super(next);
119            this.wireFormat = wireFormat;
120            if (this.wireFormat == null) {
121                this.ignoreAllWireFormatInfo = true;
122            }
123        }
124    
125        public void start() throws Exception {
126            next.start();
127            startMonitorThreads();
128        }
129    
130        public void stop() throws Exception {
131            stopMonitorThreads();
132            next.stop();
133        }
134    
135        final void writeCheck() {
136            if (inSend.get()) {
137                if (LOG.isTraceEnabled()) {
138                    LOG.trace("A send is in progress");
139                }
140                return;
141            }
142    
143            if (!commandSent.get() && useKeepAlive) {
144                if (LOG.isTraceEnabled()) {
145                    LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
146                }
147                ASYNC_TASKS.execute(new Runnable() {
148                    public void run() {
149                        if (monitorStarted.get()) {
150                            try {
151    
152                                KeepAliveInfo info = new KeepAliveInfo();
153                                info.setResponseRequired(keepAliveResponseRequired);
154                                oneway(info);
155                            } catch (IOException e) {
156                                onException(e);
157                            }
158                        }
159                    };
160                });
161            } else {
162                if (LOG.isTraceEnabled()) {
163                    LOG.trace(this + " message sent since last write check, resetting flag");
164                }
165            }
166    
167            commandSent.set(false);
168        }
169    
170        final void readCheck() {
171            int currentCounter = next.getReceiveCounter();
172            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
173            if (inReceive.get() || currentCounter!=previousCounter ) {
174                if (LOG.isTraceEnabled()) {
175                    LOG.trace("A receive is in progress");
176                }
177                return;
178            }
179            if (!commandReceived.get()) {
180                if (LOG.isDebugEnabled()) {
181                    LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
182                }
183                ASYNC_TASKS.execute(new Runnable() {
184                    public void run() {
185                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
186                    };
187    
188                });
189            } else {
190                if (LOG.isTraceEnabled()) {
191                    LOG.trace("Message received since last read check, resetting flag: ");
192                }
193            }
194            commandReceived.set(false);
195        }
196    
197        public void onCommand(Object command) {
198            commandReceived.set(true);
199            inReceive.set(true);
200            try {
201                if (command.getClass() == KeepAliveInfo.class) {
202                    KeepAliveInfo info = (KeepAliveInfo) command;
203                    if (info.isResponseRequired()) {
204                        try {
205                            info.setResponseRequired(false);
206                            oneway(info);
207                        } catch (IOException e) {
208                            onException(e);
209                        }
210                    }
211                } else {
212                    if (command.getClass() == WireFormatInfo.class) {
213                        synchronized (this) {
214                            IOException error = null;
215                            remoteWireFormatInfo = (WireFormatInfo) command;
216                            try {
217                                startMonitorThreads();
218                            } catch (IOException e) {
219                                error = e;
220                            }
221                            if (error != null) {
222                                onException(error);
223                            }
224                        }
225                    }
226                    synchronized (readChecker) {
227                        transportListener.onCommand(command);
228                    }
229                }
230            } finally {
231    
232                inReceive.set(false);
233            }
234        }
235    
236        public void oneway(Object o) throws IOException {
237            // Disable inactivity monitoring while processing a command.
238            //synchronize this method - its not synchronized
239            //further down the transport stack and gets called by more
240            //than one thread  by this class
241            synchronized(inSend) {
242                inSend.set(true);
243                try {
244    
245                    if( failed.get() ) {
246                        throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
247                    }
248                    if (o.getClass() == WireFormatInfo.class) {
249                        synchronized (this) {
250                            localWireFormatInfo = (WireFormatInfo)o;
251                            startMonitorThreads();
252                        }
253                    }
254                    next.oneway(o);
255                } finally {
256                    commandSent.set(true);
257                    inSend.set(false);
258                }
259            }
260        }
261    
262        public void onException(IOException error) {
263            if (failed.compareAndSet(false, true)) {
264                stopMonitorThreads();
265                transportListener.onException(error);
266            }
267        }
268    
269        public void setKeepAliveResponseRequired(boolean val) {
270            keepAliveResponseRequired = val;
271        }
272        
273        public void setUseKeepAlive(boolean val) {
274            useKeepAlive = val;
275        }
276    
277        public void setIgnoreRemoteWireFormat(boolean val) {
278            ignoreRemoteWireFormat = val;
279        }
280    
281        public long getReadCheckTime() {
282            return readCheckTime;
283        }
284    
285        public void setReadCheckTime(long readCheckTime) {
286            this.readCheckTime = readCheckTime;
287        }
288    
289        public long getInitialDelayTime() {
290            return initialDelayTime;
291        }
292    
293        public void setInitialDelayTime(long initialDelayTime) {
294            this.initialDelayTime = initialDelayTime;
295        }
296        
297        private synchronized void startMonitorThreads() throws IOException {
298            if (monitorStarted.get()) {
299                return;
300            }
301    
302            if (!configuredOk()) {
303                return;
304            }
305    
306            if (readCheckTime > 0) {
307                monitorStarted.set(true);
308                writeCheckerTask = new SchedulerTimerTask(writeChecker);
309                readCheckerTask = new  SchedulerTimerTask(readChecker);
310                writeCheckTime = readCheckTime>3 ? readCheckTime/3 : readCheckTime;
311                synchronized( InactivityMonitor.class ) {
312                    if( CHECKER_COUNTER == 0 ) {
313                        ASYNC_TASKS = createExecutor();
314                        READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true);
315                        WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true);
316                    }
317                    CHECKER_COUNTER++;
318                    WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime);
319                    READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime);
320                }
321            }
322        }
323    
324        private boolean configuredOk() throws IOException {
325            boolean configured = false;
326            if (ignoreAllWireFormatInfo) {
327                configured = true;
328            } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
329                if (!ignoreRemoteWireFormat) {
330                    if (LOG.isDebugEnabled()) {
331                        LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
332                    }
333                    readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
334                    initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
335                } else {
336                    if (LOG.isDebugEnabled()) {
337                        LOG.debug("Using local: " + localWireFormatInfo);
338                    }
339                    readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
340                    initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
341                }
342                configured = true;
343            }
344            return configured;
345        }
346    
347        /**
348         *
349         */
350        private synchronized void stopMonitorThreads() {
351            if (monitorStarted.compareAndSet(true, false)) {
352                readCheckerTask.cancel();
353                writeCheckerTask.cancel();
354                synchronized( InactivityMonitor.class ) {
355                    WRITE_CHECK_TIMER.purge();
356                    READ_CHECK_TIMER.purge();
357                    CHECKER_COUNTER--;
358                    if(CHECKER_COUNTER==0) {
359                        WRITE_CHECK_TIMER.cancel();
360                        READ_CHECK_TIMER.cancel();
361                        WRITE_CHECK_TIMER = null;
362                        READ_CHECK_TIMER = null;
363                        ASYNC_TASKS.shutdownNow();
364                        ASYNC_TASKS = null;
365                    }
366                }
367            }
368        }
369    
370        private ThreadFactory factory = new ThreadFactory() {
371            public Thread newThread(Runnable runnable) {
372                Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable);
373                thread.setDaemon(true);
374                return thread;
375            }
376        };
377    
378        private ThreadPoolExecutor createExecutor() {
379            return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
380        }
381    }