001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.StringTokenizer;
025    import java.util.concurrent.CopyOnWriteArrayList;
026    import java.util.regex.Pattern;
027    import javax.management.ObjectName;
028    import org.apache.activemq.broker.jmx.ManagedTransportConnector;
029    import org.apache.activemq.broker.jmx.ManagementContext;
030    import org.apache.activemq.broker.region.ConnectorStatistics;
031    import org.apache.activemq.command.BrokerInfo;
032    import org.apache.activemq.command.ConnectionControl;
033    import org.apache.activemq.security.MessageAuthorizationPolicy;
034    import org.apache.activemq.thread.DefaultThreadPools;
035    import org.apache.activemq.thread.TaskRunnerFactory;
036    import org.apache.activemq.transport.Transport;
037    import org.apache.activemq.transport.TransportAcceptListener;
038    import org.apache.activemq.transport.TransportFactory;
039    import org.apache.activemq.transport.TransportServer;
040    import org.apache.activemq.transport.discovery.DiscoveryAgent;
041    import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
042    import org.apache.activemq.util.MDCHelper;
043    import org.apache.activemq.util.ServiceStopper;
044    import org.apache.activemq.util.ServiceSupport;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * @org.apache.xbean.XBean
050     * 
051     */
052    public class TransportConnector implements Connector, BrokerServiceAware {
053    
054        final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
055    
056        protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
057        protected TransportStatusDetector statusDector;
058        private BrokerService brokerService;
059        private TransportServer server;
060        private URI uri;
061        private BrokerInfo brokerInfo = new BrokerInfo();
062        private TaskRunnerFactory taskRunnerFactory;
063        private MessageAuthorizationPolicy messageAuthorizationPolicy;
064        private DiscoveryAgent discoveryAgent;
065        private final ConnectorStatistics statistics = new ConnectorStatistics();
066        private URI discoveryUri;
067        private URI connectUri;
068        private String name;
069        private boolean disableAsyncDispatch;
070        private boolean enableStatusMonitor = false;
071        private Broker broker;
072        private boolean updateClusterClients = false;
073        private boolean rebalanceClusterClients;
074        private boolean updateClusterClientsOnRemove = false;
075        private String updateClusterFilter;
076    
077        public TransportConnector() {
078        }
079    
080        public TransportConnector(TransportServer server) {
081            this();
082            setServer(server);
083            if (server != null && server.getConnectURI() != null) {
084                URI uri = server.getConnectURI();
085                if (uri != null && uri.getScheme().equals("vm")) {
086                    setEnableStatusMonitor(false);
087                }
088            }
089    
090        }
091    
092        /**
093         * @return Returns the connections.
094         */
095        public CopyOnWriteArrayList<TransportConnection> getConnections() {
096            return connections;
097        }
098    
099        /**
100         * Factory method to create a JMX managed version of this transport
101         * connector
102         */
103        public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName)
104                throws IOException, URISyntaxException {
105            ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
106            rc.setBrokerInfo(getBrokerInfo());
107            rc.setConnectUri(getConnectUri());
108            rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
109            rc.setDiscoveryAgent(getDiscoveryAgent());
110            rc.setDiscoveryUri(getDiscoveryUri());
111            rc.setEnableStatusMonitor(isEnableStatusMonitor());
112            rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
113            rc.setName(getName());
114            rc.setTaskRunnerFactory(getTaskRunnerFactory());
115            rc.setUri(getUri());
116            rc.setBrokerService(brokerService);
117            rc.setUpdateClusterClients(isUpdateClusterClients());
118            rc.setRebalanceClusterClients(isRebalanceClusterClients());
119            rc.setUpdateClusterFilter(getUpdateClusterFilter());
120            rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
121            return rc;
122        }
123    
124        public BrokerInfo getBrokerInfo() {
125            return brokerInfo;
126        }
127    
128        public void setBrokerInfo(BrokerInfo brokerInfo) {
129            this.brokerInfo = brokerInfo;
130        }
131    
132        /**
133         * 
134         * @deprecated use the {@link #setBrokerService(BrokerService)} method
135         *             instead.
136         */
137        @Deprecated
138        public void setBrokerName(String name) {
139            if (this.brokerInfo == null) {
140                this.brokerInfo = new BrokerInfo();
141            }
142            this.brokerInfo.setBrokerName(name);
143        }
144    
145        public TransportServer getServer() throws IOException, URISyntaxException {
146            if (server == null) {
147                setServer(createTransportServer());
148            }
149            return server;
150        }
151    
152        public void setServer(TransportServer server) {
153            this.server = server;
154        }
155    
156        public URI getUri() {
157            if (uri == null) {
158                try {
159                    uri = getConnectUri();
160                } catch (Throwable e) {
161                }
162            }
163            return uri;
164        }
165    
166        /**
167         * Sets the server transport URI to use if there is not a
168         * {@link TransportServer} configured via the
169         * {@link #setServer(TransportServer)} method. This value is used to lazy
170         * create a {@link TransportServer} instance
171         * 
172         * @param uri
173         */
174        public void setUri(URI uri) {
175            this.uri = uri;
176        }
177    
178        public TaskRunnerFactory getTaskRunnerFactory() {
179            return taskRunnerFactory;
180        }
181    
182        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
183            this.taskRunnerFactory = taskRunnerFactory;
184        }
185    
186        /**
187         * @return the statistics for this connector
188         */
189        public ConnectorStatistics getStatistics() {
190            return statistics;
191        }
192    
193        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
194            return messageAuthorizationPolicy;
195        }
196    
197        /**
198         * Sets the policy used to decide if the current connection is authorized to
199         * consume a given message
200         */
201        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
202            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
203        }
204    
205        public void start() throws Exception {
206            broker = brokerService.getBroker();
207            brokerInfo.setBrokerName(broker.getBrokerName());
208            brokerInfo.setBrokerId(broker.getBrokerId());
209            brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
210            brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
211            brokerInfo.setBrokerURL(getServer().getConnectURI().toString());
212            final Map context = MDCHelper.getCopyOfContextMap();
213            getServer().setAcceptListener(new TransportAcceptListener() {
214                public void onAccept(final Transport transport) {
215                    try {
216                        DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
217                            public void run() {
218                                MDCHelper.setContextMap(context);
219                                try {
220                                    Connection connection = createConnection(transport);
221                                    connection.start();
222                                } catch (Exception e) {
223                                    ServiceSupport.dispose(transport);
224                                    onAcceptError(e);
225                                }
226                            }
227                        });
228                    } catch (Exception e) {
229                        String remoteHost = transport.getRemoteAddress();
230                        ServiceSupport.dispose(transport);
231                        onAcceptError(e, remoteHost);
232                    }
233                }
234    
235                public void onAcceptError(Exception error) {
236                    onAcceptError(error, null);
237                }
238    
239                private void onAcceptError(Exception error, String remoteHost) {
240                    LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": "
241                            + error);
242                    LOG.debug("Reason: " + error, error);
243                }
244            });
245            getServer().setBrokerInfo(brokerInfo);
246            getServer().start();
247    
248            DiscoveryAgent da = getDiscoveryAgent();
249            if (da != null) {
250                da.registerService(getPublishableConnectString());
251                da.start();
252            }
253            if (enableStatusMonitor) {
254                this.statusDector = new TransportStatusDetector(this);
255                this.statusDector.start();
256            }
257    
258            LOG.info("Connector " + getName() + " Started");
259        }
260    
261        public String getPublishableConnectString() throws Exception {
262            String publishableConnectString = null;
263            URI theConnectURI = getConnectUri();
264            if (theConnectURI != null) {
265                publishableConnectString = theConnectURI.toString();
266                // strip off server side query parameters which may not be compatible to
267                // clients
268                if (theConnectURI.getRawQuery() != null) {
269                    publishableConnectString = publishableConnectString.substring(0, publishableConnectString
270                            .indexOf(theConnectURI.getRawQuery()) - 1);
271                }
272            }
273            if (LOG.isDebugEnabled()) {
274                LOG.debug("Publishing: " + publishableConnectString + " for broker transport URI: " + theConnectURI);
275            }
276            return publishableConnectString;
277        }
278    
279        public void stop() throws Exception {
280            ServiceStopper ss = new ServiceStopper();
281            if (discoveryAgent != null) {
282                ss.stop(discoveryAgent);
283            }
284            if (server != null) {
285                ss.stop(server);
286                server = null;
287            }
288            if (this.statusDector != null) {
289                this.statusDector.stop();
290            }
291    
292            for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
293                TransportConnection c = iter.next();
294                ss.stop(c);
295            }
296            ss.throwFirstException();
297            LOG.info("Connector " + getName() + " Stopped");
298        }
299    
300        // Implementation methods
301        // -------------------------------------------------------------------------
302        protected Connection createConnection(Transport transport) throws IOException {
303            TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
304                    : taskRunnerFactory);
305            boolean statEnabled = this.getStatistics().isEnabled();
306            answer.getStatistics().setEnabled(statEnabled);
307            answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
308            return answer;
309        }
310    
311        protected TransportServer createTransportServer() throws IOException, URISyntaxException {
312            if (uri == null) {
313                throw new IllegalArgumentException("You must specify either a server or uri property");
314            }
315            if (brokerService == null) {
316                throw new IllegalArgumentException(
317                        "You must specify the brokerService property. Maybe this connector should be added to a broker?");
318            }
319            return TransportFactory.bind(brokerService, uri);
320        }
321    
322        public DiscoveryAgent getDiscoveryAgent() throws IOException {
323            if (discoveryAgent == null) {
324                discoveryAgent = createDiscoveryAgent();
325            }
326            return discoveryAgent;
327        }
328    
329        protected DiscoveryAgent createDiscoveryAgent() throws IOException {
330            if (discoveryUri != null) {
331                return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
332            }
333            return null;
334        }
335    
336        public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
337            this.discoveryAgent = discoveryAgent;
338        }
339    
340        public URI getDiscoveryUri() {
341            return discoveryUri;
342        }
343    
344        public void setDiscoveryUri(URI discoveryUri) {
345            this.discoveryUri = discoveryUri;
346        }
347    
348        public URI getConnectUri() throws IOException, URISyntaxException {
349            if (connectUri == null) {
350                if (server != null) {
351                    connectUri = server.getConnectURI();
352                }
353            }
354            return connectUri;
355        }
356    
357        public void setConnectUri(URI transportUri) {
358            this.connectUri = transportUri;
359        }
360    
361        public void onStarted(TransportConnection connection) {
362            connections.add(connection);
363        }
364    
365        public void onStopped(TransportConnection connection) {
366            connections.remove(connection);
367        }
368    
369        public String getName() {
370            if (name == null) {
371                uri = getUri();
372                if (uri != null) {
373                    name = uri.toString();
374                }
375            }
376            return name;
377        }
378    
379        public void setName(String name) {
380            this.name = name;
381        }
382    
383        @Override
384        public String toString() {
385            String rc = getName();
386            if (rc == null) {
387                rc = super.toString();
388            }
389            return rc;
390        }
391    
392        protected ConnectionControl getConnectionControl() {
393            boolean rebalance = isRebalanceClusterClients();
394            String connectedBrokers = "";
395            String self = "";
396    
397            if (isUpdateClusterClients()) {
398                if (brokerService.getDefaultSocketURIString() != null) {
399                    self += brokerService.getDefaultSocketURIString();
400                    self += ",";
401                }
402                if (rebalance == false) {
403                    connectedBrokers += self;
404                }
405                if (this.broker.getPeerBrokerInfos() != null) {
406                    for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
407                        if (isMatchesClusterFilter(info.getBrokerName())) {
408                            connectedBrokers += info.getBrokerURL();
409                            connectedBrokers += ",";
410                        }
411                    }
412                }
413                if (rebalance) {
414                    connectedBrokers += self;
415                }
416            }
417    
418            ConnectionControl control = new ConnectionControl();
419            control.setConnectedBrokers(connectedBrokers);
420            control.setRebalanceConnection(rebalance);
421            return control;
422    
423        }
424    
425        public void updateClientClusterInfo() {
426            if (isRebalanceClusterClients() || isUpdateClusterClients()) {
427                ConnectionControl control = getConnectionControl();
428                for (Connection c : this.connections) {
429                    c.updateClient(control);
430                }
431            }
432        }
433    
434        private boolean isMatchesClusterFilter(String brokerName) {
435            boolean result = true;
436            String filter = getUpdateClusterFilter();
437            if (filter != null) {
438                filter = filter.trim();
439                if (filter.length() > 0) {
440                    StringTokenizer tokenizer = new StringTokenizer(filter, ",");
441                    while (result && tokenizer.hasMoreTokens()) {
442                        String token = tokenizer.nextToken();
443                        result = isMatchesClusterFilter(brokerName, token);
444                    }
445                }
446            }
447            return result;
448        }
449    
450        private boolean isMatchesClusterFilter(String brokerName, String match) {
451            boolean result = true;
452            if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
453                result = Pattern.matches(match, brokerName);
454            }
455            return result;
456        }
457    
458        public boolean isDisableAsyncDispatch() {
459            return disableAsyncDispatch;
460        }
461    
462        public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
463            this.disableAsyncDispatch = disableAsyncDispatch;
464        }
465    
466        /**
467         * @return the enableStatusMonitor
468         */
469        public boolean isEnableStatusMonitor() {
470            return enableStatusMonitor;
471        }
472    
473        /**
474         * @param enableStatusMonitor
475         *            the enableStatusMonitor to set
476         */
477        public void setEnableStatusMonitor(boolean enableStatusMonitor) {
478            this.enableStatusMonitor = enableStatusMonitor;
479        }
480    
481        /**
482         * This is called by the BrokerService right before it starts the transport.
483         */
484        public void setBrokerService(BrokerService brokerService) {
485            this.brokerService = brokerService;
486        }
487    
488        public Broker getBroker() {
489            return broker;
490        }
491    
492        public BrokerService getBrokerService() {
493            return brokerService;
494        }
495    
496        /**
497         * @return the updateClusterClients
498         */
499        public boolean isUpdateClusterClients() {
500            return this.updateClusterClients;
501        }
502    
503        /**
504         * @param updateClusterClients
505         *            the updateClusterClients to set
506         */
507        public void setUpdateClusterClients(boolean updateClusterClients) {
508            this.updateClusterClients = updateClusterClients;
509        }
510    
511        /**
512         * @return the rebalanceClusterClients
513         */
514        public boolean isRebalanceClusterClients() {
515            return this.rebalanceClusterClients;
516        }
517    
518        /**
519         * @param rebalanceClusterClients
520         *            the rebalanceClusterClients to set
521         */
522        public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
523            this.rebalanceClusterClients = rebalanceClusterClients;
524        }
525     
526        /**
527         * @return the updateClusterClientsOnRemove
528         */
529        public boolean isUpdateClusterClientsOnRemove() {
530            return this.updateClusterClientsOnRemove;
531        }
532    
533        /**
534         * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
535         */
536        public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
537            this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
538        }
539        
540        /**
541         * @return the updateClusterFilter
542         */
543        public String getUpdateClusterFilter() {
544            return this.updateClusterFilter;
545        }
546    
547        /**
548         * @param updateClusterFilter
549         *            the updateClusterFilter to set
550         */
551        public void setUpdateClusterFilter(String updateClusterFilter) {
552            this.updateClusterFilter = updateClusterFilter;
553        }
554    
555    }