001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.simple;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.Map;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import org.apache.activemq.command.DiscoveryEvent;
029    import org.apache.activemq.thread.DefaultThreadPools;
030    import org.apache.activemq.transport.discovery.DiscoveryAgent;
031    import org.apache.activemq.transport.discovery.DiscoveryListener;
032    import org.apache.activemq.util.MDCHelper;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * A simple DiscoveryAgent that allows static configuration of the discovered
038     * services.
039     * 
040     * 
041     */
042    public class SimpleDiscoveryAgent implements DiscoveryAgent {
043    
044        private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
045        private long initialReconnectDelay = 1000;
046        private long maxReconnectDelay = 1000 * 30;
047        private long backOffMultiplier = 2;
048        private boolean useExponentialBackOff=true;
049        private int maxReconnectAttempts;
050        private final Object sleepMutex = new Object();
051        private long minConnectTime = 5000;
052        private DiscoveryListener listener;
053        private String services[] = new String[] {};
054        private final AtomicBoolean running = new AtomicBoolean(false);
055    
056        class SimpleDiscoveryEvent extends DiscoveryEvent {
057    
058            private int connectFailures;
059            private long reconnectDelay = initialReconnectDelay;
060            private long connectTime = System.currentTimeMillis();
061            private AtomicBoolean failed = new AtomicBoolean(false);
062    
063            public SimpleDiscoveryEvent(String service) {
064                super(service);
065            }
066    
067        }
068    
069        public void setDiscoveryListener(DiscoveryListener listener) {
070            this.listener = listener;
071        }
072    
073        public void registerService(String name) throws IOException {
074        }
075    
076        public void start() throws Exception {
077            running.set(true);
078            for (int i = 0; i < services.length; i++) {
079                listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
080            }
081        }
082    
083        public void stop() throws Exception {
084            running.set(false);
085            synchronized (sleepMutex) {
086                sleepMutex.notifyAll();
087            }
088        }
089    
090        public String[] getServices() {
091            return services;
092        }
093    
094        public void setServices(String services) {
095            this.services = services.split(",");
096        }
097    
098        public void setServices(String services[]) {
099            this.services = services;
100        }
101    
102        public void setServices(URI services[]) {
103            this.services = new String[services.length];
104            for (int i = 0; i < services.length; i++) {
105                this.services[i] = services[i].toString();
106            }
107        }
108    
109        public void serviceFailed(DiscoveryEvent devent) throws IOException {
110    
111            final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
112            if (event.failed.compareAndSet(false, true)) {
113    
114                listener.onServiceRemove(event);
115                final Map context = MDCHelper.getCopyOfContextMap();
116                DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
117                    public void run() {
118    
119                        MDCHelper.setContextMap(context);
120    
121                        // We detect a failed connection attempt because the service
122                        // fails right
123                        // away.
124                        if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
125                            LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
126    
127                            event.connectFailures++;
128    
129                            if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
130                                LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
131                                return;
132                            }
133    
134                            synchronized (sleepMutex) {
135                                try {
136                                    if (!running.get()) {
137                                        return;
138                                    }
139    
140                                    LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
141                                    sleepMutex.wait(event.reconnectDelay);
142                                } catch (InterruptedException ie) {
143                                    Thread.currentThread().interrupt();
144                                    return;
145                                }
146                            }
147    
148                            if (!useExponentialBackOff) {
149                                event.reconnectDelay = initialReconnectDelay;
150                            } else {
151                                // Exponential increment of reconnect delay.
152                                event.reconnectDelay *= backOffMultiplier;
153                                if (event.reconnectDelay > maxReconnectDelay) {
154                                    event.reconnectDelay = maxReconnectDelay;
155                                }
156                            }
157    
158                        } else {
159                            event.connectFailures = 0;
160                            event.reconnectDelay = initialReconnectDelay;
161                        }
162    
163                        if (!running.get()) {
164                            return;
165                        }
166    
167                        event.connectTime = System.currentTimeMillis();
168                        event.failed.set(false);
169                        listener.onServiceAdd(event);
170                    }
171                }, "Simple Discovery Agent");
172            }
173        }
174    
175        public long getBackOffMultiplier() {
176            return backOffMultiplier;
177        }
178    
179        public void setBackOffMultiplier(long backOffMultiplier) {
180            this.backOffMultiplier = backOffMultiplier;
181        }
182    
183        public long getInitialReconnectDelay() {
184            return initialReconnectDelay;
185        }
186    
187        public void setInitialReconnectDelay(long initialReconnectDelay) {
188            this.initialReconnectDelay = initialReconnectDelay;
189        }
190    
191        public int getMaxReconnectAttempts() {
192            return maxReconnectAttempts;
193        }
194    
195        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
196            this.maxReconnectAttempts = maxReconnectAttempts;
197        }
198    
199        public long getMaxReconnectDelay() {
200            return maxReconnectDelay;
201        }
202    
203        public void setMaxReconnectDelay(long maxReconnectDelay) {
204            this.maxReconnectDelay = maxReconnectDelay;
205        }
206    
207        public long getMinConnectTime() {
208            return minConnectTime;
209        }
210    
211        public void setMinConnectTime(long minConnectTime) {
212            this.minConnectTime = minConnectTime;
213        }
214    
215        public boolean isUseExponentialBackOff() {
216            return useExponentialBackOff;
217        }
218    
219        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
220            this.useExponentialBackOff = useExponentialBackOff;
221        }
222    }