001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.util;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    
024    import org.apache.activemq.broker.BrokerService;
025    import org.slf4j.Logger;
026    import org.slf4j.LoggerFactory;
027    
028    /**
029     * @org.apache.xbean.XBean
030     */
031     public class DefaultIOExceptionHandler implements IOExceptionHandler {
032    
033        private static final Logger LOG = LoggerFactory
034                .getLogger(DefaultIOExceptionHandler.class);
035        private BrokerService broker;
036        private boolean ignoreAllErrors = false;
037        private boolean ignoreNoSpaceErrors = true;
038        private boolean ignoreSQLExceptions = true;
039        private boolean stopStartConnectors = false;
040        private String noSpaceMessage = "space";
041        private String sqlExceptionMessage = ""; // match all
042        private long resumeCheckSleepPeriod = 5*1000;
043        private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044    
045        public void handle(IOException exception) {
046            if (ignoreAllErrors) {
047                LOG.info("Ignoring IO exception, " + exception, exception);
048                return;
049            }
050    
051            if (ignoreNoSpaceErrors) {
052                Throwable cause = exception;
053                while (cause != null && cause instanceof IOException) {
054                    if (cause.getMessage().contains(noSpaceMessage)) {
055                        LOG.info("Ignoring no space left exception, " + exception, exception);
056                        return;
057                    }
058                    cause = cause.getCause();
059                }
060            }
061    
062            if (ignoreSQLExceptions) {
063                Throwable cause = exception;
064                while (cause != null) {
065                    if (cause instanceof SQLException && cause.getMessage().contains(sqlExceptionMessage)) {
066                        LOG.info("Ignoring SQLException, " + exception, cause);
067                        return;
068                    }
069                    cause = cause.getCause();
070                }
071            }
072    
073            if (stopStartConnectors) {
074                if (!stopStartInProgress.compareAndSet(false, true)) {
075                    // we are already working on it
076                    return;
077                }
078                LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
079    
080                new Thread("stop transport connectors on IO exception") {
081                    public void run() {
082                        try {
083                            ServiceStopper stopper = new ServiceStopper();
084                            broker.stopAllConnectors(stopper);
085                        } catch (Exception e) {
086                            LOG.warn("Failure occurred while stopping broker connectors", e);
087                        }
088                    }
089                }.start();
090    
091                // resume again
092                new Thread("restart transport connectors post IO exception") {
093                    public void run() {
094                        try {
095                            while (isPersistenceAdapterDown()) {
096                                LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
097                                TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
098                            }
099                            broker.startAllConnectors();
100                        } catch (Exception e) {
101                            LOG.warn("Failure occurred while restarting broker connectors", e);
102                        } finally {
103                            stopStartInProgress.compareAndSet(true, false);
104                        }
105                    }
106    
107                    private boolean isPersistenceAdapterDown() {
108                        boolean checkpointSuccess = false;
109                        try {
110                            broker.getPersistenceAdapter().checkpoint(true);
111                            checkpointSuccess = true;
112                        } catch (Throwable ignored) {}
113                        return !checkpointSuccess;
114                    }
115                }.start();
116    
117                return;
118            }
119    
120            LOG.info("Stopping the broker due to IO exception, " + exception, exception);
121            new Thread("Stopping the broker due to IO exception") {
122                public void run() {
123                    try {
124                        broker.stop();
125                    } catch (Exception e) {
126                        LOG.warn("Failure occurred while stopping broker", e);
127                    }
128                }
129            }.start();
130        }
131    
132        public void setBrokerService(BrokerService broker) {
133            this.broker = broker;
134        }
135    
136        public boolean isIgnoreAllErrors() {
137            return ignoreAllErrors;
138        }
139    
140        public void setIgnoreAllErrors(boolean ignoreAllErrors) {
141            this.ignoreAllErrors = ignoreAllErrors;
142        }
143    
144        public boolean isIgnoreNoSpaceErrors() {
145            return ignoreNoSpaceErrors;
146        }
147    
148        public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
149            this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
150        }
151    
152        public String getNoSpaceMessage() {
153            return noSpaceMessage;
154        }
155    
156        public void setNoSpaceMessage(String noSpaceMessage) {
157            this.noSpaceMessage = noSpaceMessage;
158        }
159    
160        public boolean isIgnoreSQLExceptions() {
161            return ignoreSQLExceptions;
162        }
163    
164        public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
165            this.ignoreSQLExceptions = ignoreSQLExceptions;
166        }
167    
168        public String getSqlExceptionMessage() {
169            return sqlExceptionMessage;
170        }
171    
172        public void setSqlExceptionMessage(String sqlExceptionMessage) {
173            this.sqlExceptionMessage = sqlExceptionMessage;
174        }
175    
176        public boolean isStopStartConnectors() {
177            return stopStartConnectors;
178        }
179    
180        public void setStopStartConnectors(boolean stopStartConnectors) {
181            this.stopStartConnectors = stopStartConnectors;
182        }
183    
184        public long getResumeCheckSleepPeriod() {
185            return resumeCheckSleepPeriod;
186        }
187    
188        public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
189            this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
190        }
191    }