001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport;
019    
020    import java.io.IOException;
021    import java.net.Socket;
022    import java.util.Iterator;
023    import java.util.concurrent.ConcurrentLinkedQueue;
024    import java.util.concurrent.atomic.AtomicInteger;
025    import java.util.concurrent.locks.Condition;
026    import java.util.concurrent.locks.ReentrantLock;
027    
028    import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
029    import org.apache.activemq.transport.tcp.TimeStampStream;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * This filter implements write timeouts for socket write operations.
035     * When using blocking IO, the Java implementation doesn't have an explicit flag
036     * to set a timeout, and can cause operations to block forever (or until the TCP stack implementation times out the retransmissions,
037     * which is usually around 13-30 minutes).<br/>
038     * To enable this transport, in the transport URI, simpley add<br/>
039     * <code>transport.soWriteTimeout=<value in millis></code>.<br/>
040     * For example (15 second timeout on write operations to the socket):</br>
041     * <pre><code>
042     * &lt;transportConnector 
043     *     name=&quot;tcp1&quot; 
044     *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
045     * /&gt;
046     * </code></pre><br/>
047     * For example (enable default timeout on the socket):</br>
048     * <pre><code>
049     * &lt;transportConnector 
050     *     name=&quot;tcp1&quot; 
051     *     uri=&quot;tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
052     * /&gt;
053     * </code></pre>
054     * @author Filip Hanik
055     *
056     */
057    public class WriteTimeoutFilter extends TransportFilter {
058    
059        private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);
060        protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
061        protected static AtomicInteger messageCounter = new AtomicInteger(0);
062        protected static TimeoutThread timeoutThread = new TimeoutThread(); 
063        
064        protected static long sleep = 5000l;
065    
066        protected long writeTimeout = -1;
067        
068        public WriteTimeoutFilter(Transport next) {
069            super(next);
070        }
071    
072        @Override
073        public void oneway(Object command) throws IOException {
074            try {
075                registerWrite(this);
076                super.oneway(command);
077            } catch (IOException x) {
078                throw x;
079            } finally {
080                deRegisterWrite(this,false,null);
081            }
082        }
083        
084        public long getWriteTimeout() {
085            return writeTimeout;
086        }
087    
088        public void setWriteTimeout(long writeTimeout) {
089            this.writeTimeout = writeTimeout;
090        }
091        
092        public static long getSleep() {
093            return sleep;
094        }
095    
096        public static void setSleep(long sleep) {
097            WriteTimeoutFilter.sleep = sleep;
098        }
099    
100        
101        protected TimeStampStream getWriter() {
102            return next.narrow(TimeStampStream.class);
103        }
104        
105        protected Socket getSocket() {
106            return next.narrow(Socket.class);
107        }
108        
109        protected static void registerWrite(WriteTimeoutFilter filter) {
110            writers.add(filter);
111        }
112        
113        protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
114            boolean result = writers.remove(filter); 
115            if (result) {
116                if (fail) {
117                    String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
118                    LOG.warn(message);
119                    Socket sock = filter.getSocket();
120                    if (sock==null) {
121                        LOG.error("Destination socket is null, unable to close socket.("+message+")");
122                    } else {
123                        try {
124                            sock.close();
125                        }catch (IOException ignore) {
126                        }
127                    }
128                }
129            }
130            return result;
131        }
132        
133        @Override
134        public void start() throws Exception {
135            super.start();
136        }
137        
138        @Override
139        public void stop() throws Exception {
140            super.stop();
141        }
142        
143        protected static class TimeoutThread extends Thread {
144            static AtomicInteger instance = new AtomicInteger(0);
145            boolean run = true;
146            public TimeoutThread() {
147                setName("WriteTimeoutFilter-Timeout-"+instance.incrementAndGet());
148                setDaemon(true);
149                setPriority(Thread.MIN_PRIORITY);
150                start();
151            }
152    
153            
154            public void run() {
155                while (run) {
156                    boolean error = false;
157                    try {
158                            if (!interrupted()) {
159                                    Iterator<WriteTimeoutFilter> filters = writers.iterator();
160                                while (run && filters.hasNext()) { 
161                                WriteTimeoutFilter filter = filters.next();
162                                if (filter.getWriteTimeout()<=0) continue; //no timeout set
163                                long writeStart = filter.getWriter().getWriteTimestamp();
164                                long delta = (filter.getWriter().isWriting() && writeStart>0)?System.currentTimeMillis() - writeStart:-1;
165                                if (delta>filter.getWriteTimeout()) {
166                                    WriteTimeoutFilter.deRegisterWrite(filter, true,null);
167                                }//if timeout
168                            }//while
169                        }//if interrupted
170                        try {
171                            Thread.sleep(getSleep());
172                            error = false;
173                        } catch (InterruptedException x) {
174                            //do nothing
175                        }
176                    }catch (Throwable t) { //make sure this thread never dies
177                        if (!error) { //use error flag to avoid filling up the logs
178                            LOG.error("WriteTimeout thread unable validate existing sockets.",t);
179                            error = true;
180                        }
181                    }
182                }
183            }
184        }
185    
186    }