001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.net.URI;
020    
021    import org.apache.activemq.transport.Transport;
022    import org.apache.activemq.transport.TransportFactory;
023    import org.apache.activemq.util.ServiceStopper;
024    
025    /**
026     * A network connector which uses some kind of multicast-like transport that
027     * communicates with potentially many remote brokers over a single logical
028     * {@link Transport} instance such as when using multicast.
029     * 
030     * This implementation does not depend on multicast at all; any other group
031     * based transport could be used.
032     * 
033     * @org.apache.xbean.XBean
034     * 
035     * 
036     */
037    public class MulticastNetworkConnector extends NetworkConnector {
038    
039        private Transport localTransport;
040        private Transport remoteTransport;
041        private URI remoteURI;
042        private DemandForwardingBridgeSupport bridge;
043    
044        public MulticastNetworkConnector() {
045        }
046    
047        public MulticastNetworkConnector(URI remoteURI) {
048            this.remoteURI = remoteURI;
049        }
050    
051        // Properties
052        // -------------------------------------------------------------------------
053    
054        public DemandForwardingBridgeSupport getBridge() {
055            return bridge;
056        }
057    
058        public void setBridge(DemandForwardingBridgeSupport bridge) {
059            this.bridge = bridge;
060        }
061    
062        public Transport getLocalTransport() {
063            return localTransport;
064        }
065    
066        public void setLocalTransport(Transport localTransport) {
067            this.localTransport = localTransport;
068        }
069    
070        public Transport getRemoteTransport() {
071            return remoteTransport;
072        }
073    
074        /**
075         * Sets the remote transport implementation
076         */
077        public void setRemoteTransport(Transport remoteTransport) {
078            this.remoteTransport = remoteTransport;
079        }
080    
081        public URI getRemoteURI() {
082            return remoteURI;
083        }
084    
085        /**
086         * Sets the remote transport URI to some group transport like
087         * <code>multicast://address:port</code>
088         */
089        public void setRemoteURI(URI remoteURI) {
090            this.remoteURI = remoteURI;
091        }
092    
093        // Implementation methods
094        // -------------------------------------------------------------------------
095    
096        protected void handleStart() throws Exception {
097            if (remoteTransport == null) {
098                if (remoteURI == null) {
099                    throw new IllegalArgumentException("You must specify the remoteURI property");
100                }
101                remoteTransport = TransportFactory.connect(remoteURI);
102            }
103    
104            if (localTransport == null) {
105                localTransport = createLocalTransport();
106            }
107    
108            bridge = createBridge(localTransport, remoteTransport);
109            configureBridge(bridge);
110            bridge.start();
111    
112            // we need to start the transports after we've created the bridge
113            remoteTransport.start();
114            localTransport.start();
115    
116            super.handleStart();
117        }
118    
119        protected void handleStop(ServiceStopper stopper) throws Exception {
120            super.handleStop(stopper);
121            if (bridge != null) {
122                try {
123                    bridge.stop();
124                } catch (Exception e) {
125                    stopper.onException(this, e);
126                }
127            }
128            if (remoteTransport != null) {
129                try {
130                    remoteTransport.stop();
131                } catch (Exception e) {
132                    stopper.onException(this, e);
133                }
134            }
135            if (localTransport != null) {
136                try {
137                    localTransport.stop();
138                } catch (Exception e) {
139                    stopper.onException(this, e);
140                }
141            }
142        }
143    
144        @Override
145        public String toString() {
146            return getClass().getName() + ":" + getName() + "["  + remoteTransport.toString() + "]";
147        }
148    
149        protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
150            CompositeDemandForwardingBridge bridge = new CompositeDemandForwardingBridge(this, local, remote);
151            bridge.setBrokerService(getBrokerService());
152            return bridge;
153        }
154    
155    }