001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.HashMap; 023 import java.util.Iterator; 024 import java.util.Map; 025 026 import org.apache.activemq.broker.BrokerService; 027 import org.apache.activemq.broker.SslContext; 028 import org.apache.activemq.command.DiscoveryEvent; 029 import org.apache.activemq.transport.Transport; 030 import org.apache.activemq.transport.TransportDisposedIOException; 031 import org.apache.activemq.transport.TransportFactory; 032 import org.apache.activemq.transport.discovery.DiscoveryAgent; 033 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 034 import org.apache.activemq.transport.discovery.DiscoveryListener; 035 import org.apache.activemq.util.IntrospectionSupport; 036 import org.apache.activemq.util.ServiceStopper; 037 import org.apache.activemq.util.ServiceSupport; 038 import org.apache.activemq.util.URISupport; 039 import org.apache.activemq.util.URISupport.CompositeData; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 043 import javax.management.ObjectName; 044 045 /** 046 * A network connector which uses a discovery agent to detect the remote brokers 047 * available and setup a connection to each available remote broker 048 * 049 * @org.apache.xbean.XBean element="networkConnector" 050 * 051 */ 052 public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener { 053 private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkConnector.class); 054 055 private DiscoveryAgent discoveryAgent; 056 057 private Map<String, String> parameters; 058 059 public DiscoveryNetworkConnector() { 060 } 061 062 public DiscoveryNetworkConnector(URI discoveryURI) throws IOException { 063 setUri(discoveryURI); 064 } 065 066 public void setUri(URI discoveryURI) throws IOException { 067 setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); 068 try { 069 parameters = URISupport.parseParameters(discoveryURI); 070 // allow discovery agent to grab it's parameters 071 IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); 072 } catch (URISyntaxException e) { 073 LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e); 074 } 075 076 } 077 078 public void onServiceAdd(DiscoveryEvent event) { 079 // Ignore events once we start stopping. 080 if (serviceSupport.isStopped() || serviceSupport.isStopping()) { 081 return; 082 } 083 String url = event.getServiceName(); 084 if (url != null) { 085 URI uri; 086 try { 087 uri = new URI(url); 088 } catch (URISyntaxException e) { 089 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 090 return; 091 } 092 // Should we try to connect to that URI? 093 if( bridges.containsKey(uri) ) { 094 LOG.debug("Discovery agent generated a duplicate onServiceAdd event for: "+uri ); 095 return; 096 } 097 if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) { 098 LOG.debug("not connecting loopback: " + uri); 099 return; 100 } 101 URI connectUri = uri; 102 try { 103 connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX); 104 } catch (URISyntaxException e) { 105 LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); 106 } 107 LOG.info("Establishing network connection from " + localURI + " to " + connectUri); 108 109 Transport remoteTransport; 110 Transport localTransport; 111 try { 112 // Allows the transport to access the broker's ssl configuration. 113 SslContext.setCurrentSslContext(getBrokerService().getSslContext()); 114 try { 115 remoteTransport = TransportFactory.connect(connectUri); 116 } catch (Exception e) { 117 LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage()); 118 LOG.debug("Connection failure exception: " + e, e); 119 return; 120 } 121 try { 122 localTransport = createLocalTransport(); 123 } catch (Exception e) { 124 ServiceSupport.dispose(remoteTransport); 125 LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage()); 126 LOG.debug("Connection failure exception: " + e, e); 127 return; 128 } 129 } finally { 130 SslContext.setCurrentSslContext(null); 131 } 132 NetworkBridge bridge = createBridge(localTransport, remoteTransport, event); 133 try { 134 bridge.start(); 135 bridges.put(uri, bridge); 136 } catch (TransportDisposedIOException e) { 137 LOG.warn("Network bridge between: " + localURI + " and: " + uri + " was correctly stopped before it was correctly started."); 138 } catch (Exception e) { 139 ServiceSupport.dispose(localTransport); 140 ServiceSupport.dispose(remoteTransport); 141 LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e); 142 LOG.debug("Start failure exception: " + e, e); 143 try { 144 discoveryAgent.serviceFailed(event); 145 } catch (IOException e1) { 146 LOG.debug("Discovery agent failure while handling failure event: " + e1.getMessage(), e1); 147 } 148 return; 149 } 150 } 151 } 152 153 public void onServiceRemove(DiscoveryEvent event) { 154 String url = event.getServiceName(); 155 if (url != null) { 156 URI uri; 157 try { 158 uri = new URI(url); 159 } catch (URISyntaxException e) { 160 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); 161 return; 162 } 163 164 NetworkBridge bridge = bridges.remove(uri); 165 if (bridge == null) { 166 return; 167 } 168 169 ServiceSupport.dispose(bridge); 170 } 171 } 172 173 public DiscoveryAgent getDiscoveryAgent() { 174 return discoveryAgent; 175 } 176 177 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 178 this.discoveryAgent = discoveryAgent; 179 if (discoveryAgent != null) { 180 this.discoveryAgent.setDiscoveryListener(this); 181 } 182 } 183 184 protected void handleStart() throws Exception { 185 if (discoveryAgent == null) { 186 throw new IllegalStateException("You must configure the 'discoveryAgent' property"); 187 } 188 this.discoveryAgent.start(); 189 super.handleStart(); 190 } 191 192 protected void handleStop(ServiceStopper stopper) throws Exception { 193 for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) { 194 NetworkBridge bridge = i.next(); 195 try { 196 bridge.stop(); 197 } catch (Exception e) { 198 stopper.onException(this, e); 199 } 200 } 201 try { 202 this.discoveryAgent.stop(); 203 } catch (Exception e) { 204 stopper.onException(this, e); 205 } 206 207 super.handleStop(stopper); 208 } 209 210 protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { 211 class DiscoverNetworkBridgeListener extends MBeanNetworkListener { 212 213 public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) { 214 super(brokerService, connectorName); 215 } 216 217 public void bridgeFailed() { 218 if (!serviceSupport.isStopped()) { 219 try { 220 discoveryAgent.serviceFailed(event); 221 } catch (IOException e) { 222 } 223 } 224 225 } 226 } 227 NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName()); 228 229 DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener); 230 result.setBrokerService(getBrokerService()); 231 return configureBridge(result); 232 } 233 234 @Override 235 public String toString() { 236 return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); 237 } 238 }