001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.security.GeneralSecurityException;
021    import java.security.cert.X509Certificate;
022    import java.util.*;
023    import java.util.concurrent.ConcurrentHashMap;
024    import java.util.concurrent.CountDownLatch;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicLong;
028    
029    import org.apache.activemq.Service;
030    import org.apache.activemq.advisory.AdvisorySupport;
031    import org.apache.activemq.broker.BrokerService;
032    import org.apache.activemq.broker.BrokerServiceAware;
033    import org.apache.activemq.broker.TransportConnection;
034    import org.apache.activemq.broker.region.AbstractRegion;
035    import org.apache.activemq.broker.region.RegionBroker;
036    import org.apache.activemq.broker.region.Subscription;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ActiveMQMessage;
039    import org.apache.activemq.command.ActiveMQTempDestination;
040    import org.apache.activemq.command.ActiveMQTopic;
041    import org.apache.activemq.command.BrokerId;
042    import org.apache.activemq.command.BrokerInfo;
043    import org.apache.activemq.command.Command;
044    import org.apache.activemq.command.ConnectionError;
045    import org.apache.activemq.command.ConnectionId;
046    import org.apache.activemq.command.ConnectionInfo;
047    import org.apache.activemq.command.ConsumerId;
048    import org.apache.activemq.command.ConsumerInfo;
049    import org.apache.activemq.command.DataStructure;
050    import org.apache.activemq.command.DestinationInfo;
051    import org.apache.activemq.command.ExceptionResponse;
052    import org.apache.activemq.command.KeepAliveInfo;
053    import org.apache.activemq.command.Message;
054    import org.apache.activemq.command.MessageAck;
055    import org.apache.activemq.command.MessageDispatch;
056    import org.apache.activemq.command.NetworkBridgeFilter;
057    import org.apache.activemq.command.ProducerInfo;
058    import org.apache.activemq.command.RemoveInfo;
059    import org.apache.activemq.command.Response;
060    import org.apache.activemq.command.SessionInfo;
061    import org.apache.activemq.command.ShutdownInfo;
062    import org.apache.activemq.command.WireFormatInfo;
063    import org.apache.activemq.filter.DestinationFilter;
064    import org.apache.activemq.filter.MessageEvaluationContext;
065    import org.apache.activemq.thread.DefaultThreadPools;
066    import org.apache.activemq.thread.TaskRunnerFactory;
067    import org.apache.activemq.transport.DefaultTransportListener;
068    import org.apache.activemq.transport.FutureResponse;
069    import org.apache.activemq.transport.ResponseCallback;
070    import org.apache.activemq.transport.Transport;
071    import org.apache.activemq.transport.TransportDisposedIOException;
072    import org.apache.activemq.transport.TransportFilter;
073    import org.apache.activemq.transport.TransportListener;
074    import org.apache.activemq.transport.tcp.SslTransport;
075    import org.apache.activemq.util.*;
076    import org.slf4j.Logger;
077    import org.slf4j.LoggerFactory;
078    import org.slf4j.MDC;
079    
080    /**
081     * A useful base class for implementing demand forwarding bridges.
082     * 
083     * 
084     */
085    public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
086        private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
087        private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
088        protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
089        protected final Transport localBroker;
090        protected final Transport remoteBroker;
091        protected final IdGenerator idGenerator = new IdGenerator();
092        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
093        protected ConnectionInfo localConnectionInfo;
094        protected ConnectionInfo remoteConnectionInfo;
095        protected SessionInfo localSessionInfo;
096        protected ProducerInfo producerInfo;
097        protected String remoteBrokerName = "Unknown";
098        protected String localClientId;
099        protected ConsumerInfo demandConsumerInfo;
100        protected int demandConsumerDispatched;
101        protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
102        protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
103        protected AtomicBoolean disposed = new AtomicBoolean();
104        protected BrokerId localBrokerId;
105        protected ActiveMQDestination[] excludedDestinations;
106        protected ActiveMQDestination[] dynamicallyIncludedDestinations;
107        protected ActiveMQDestination[] staticallyIncludedDestinations;
108        protected ActiveMQDestination[] durableDestinations;
109        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
110        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
111        protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
112        protected CountDownLatch startedLatch = new CountDownLatch(2);
113        protected CountDownLatch localStartedLatch = new CountDownLatch(1);
114        protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
115        protected CountDownLatch localBrokerIdKnownLatch = new CountDownLatch(1);
116        protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
117        protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
118        protected NetworkBridgeConfiguration configuration;
119    
120        final AtomicLong enqueueCounter = new AtomicLong();
121        final AtomicLong dequeueCounter = new AtomicLong();
122    
123        private NetworkBridgeListener networkBridgeListener;
124        private boolean createdByDuplex;
125        private BrokerInfo localBrokerInfo;
126        private BrokerInfo remoteBrokerInfo;
127    
128        private final AtomicBoolean started = new AtomicBoolean();
129        private TransportConnection duplexInitiatingConnection;
130        private BrokerService brokerService = null;
131    
132        public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
133            this.configuration = configuration;
134            this.localBroker = localBroker;
135            this.remoteBroker = remoteBroker;
136        }
137    
138        public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
139            this.localBrokerInfo = localBrokerInfo;
140            this.remoteBrokerInfo = remoteBrokerInfo;
141            this.duplexInitiatingConnection = connection;
142            start();
143            serviceRemoteCommand(remoteBrokerInfo);
144        }
145    
146        public void start() throws Exception {
147            if (started.compareAndSet(false, true)) {
148                localBroker.setTransportListener(new DefaultTransportListener() {
149    
150                    @Override
151                    public void onCommand(Object o) {
152                        Command command = (Command) o;
153                        serviceLocalCommand(command);
154                    }
155    
156                    @Override
157                    public void onException(IOException error) {
158                        serviceLocalException(error);
159                    }
160                });
161                remoteBroker.setTransportListener(new TransportListener() {
162    
163                    public void onCommand(Object o) {
164                        Command command = (Command) o;
165                        serviceRemoteCommand(command);
166                    }
167    
168                    public void onException(IOException error) {
169                        serviceRemoteException(error);
170                    }
171    
172                    public void transportInterupted() {
173                        // clear any subscriptions - to try and prevent the bridge
174                        // from stalling the broker
175                        if (remoteInterupted.compareAndSet(false, true)) {
176                            LOG.info("Outbound transport to " + remoteBrokerName + " interrupted.");
177                            if (localBridgeStarted.get()) {
178                                clearDownSubscriptions();
179                                synchronized (DemandForwardingBridgeSupport.this) {
180                                    try {
181                                        localBroker.oneway(localConnectionInfo.createRemoveCommand());
182                                    } catch (TransportDisposedIOException td) {
183                                        LOG.debug("local broker is now disposed", td);
184                                    } catch (IOException e) {
185                                        LOG.warn("Caught exception from local start", e);
186                                    }
187                                }
188                            }
189                            localBridgeStarted.set(false);
190                            remoteBridgeStarted.set(false);
191                            startedLatch = new CountDownLatch(2);
192                            localStartedLatch = new CountDownLatch(1);
193                        }
194                    }
195    
196                    public void transportResumed() {
197                        if (remoteInterupted.compareAndSet(true, false)) {
198                            // We want to slow down false connects so that we don't
199                            // get in a busy loop.
200                            // False connects can occurr if you using SSH tunnels.
201                            if (!lastConnectSucceeded.get()) {
202                                try {
203                                    LOG.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
204                                    Thread.sleep(1000);
205                                } catch (InterruptedException e) {
206                                    Thread.currentThread().interrupt();
207                                }
208                            }
209                            lastConnectSucceeded.set(false);
210                            try {
211                                startLocalBridge();
212                                remoteBridgeStarted.set(true);
213                                startedLatch.countDown();
214                                LOG.info("Outbound transport to " + remoteBrokerName + " resumed");
215                            } catch (Throwable e) {
216                                LOG.error("Caught exception  from local start in resume transport", e);
217                                serviceLocalException(e);
218                            }
219                        }
220                    }
221                });
222    
223                localBroker.start();
224                remoteBroker.start();
225                if (!disposed.get()) {
226                    try {
227                        triggerRemoteStartBridge();
228                    } catch (IOException e) {
229                        LOG.warn("Caught exception from remote start", e);
230                    }
231                } else {
232                    LOG.warn ("Bridge was disposed before the start() method was fully executed.");
233                    throw new TransportDisposedIOException();
234                }
235            }
236        }
237    
238        protected void triggerLocalStartBridge() throws IOException {
239            final Map context = MDCHelper.getCopyOfContextMap();
240            asyncTaskRunner.execute(new Runnable() {
241                public void run() {
242                    MDCHelper.setContextMap(context);
243                    final String originalName = Thread.currentThread().getName();
244                    Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
245                    try {
246                        startLocalBridge();
247                    } catch (Throwable e) {
248                        serviceLocalException(e);
249                    } finally {
250                        Thread.currentThread().setName(originalName);
251                    }
252                }
253            });
254        }
255    
256        protected void triggerRemoteStartBridge() throws IOException {
257            final Map context = MDCHelper.getCopyOfContextMap();
258            asyncTaskRunner.execute(new Runnable() {
259                public void run() {
260                    MDCHelper.setContextMap(context);
261                    final String originalName = Thread.currentThread().getName();
262                    Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
263                    try {
264                        startRemoteBridge();
265                    } catch (Exception e) {
266                        serviceRemoteException(e);
267                    } finally {
268                        Thread.currentThread().setName(originalName);
269                    }
270                }
271            });
272        }
273    
274        protected void startLocalBridge() throws Throwable {
275            if (localBridgeStarted.compareAndSet(false, true)) {
276                synchronized (this) {
277                    if (LOG.isTraceEnabled()) {
278                        LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
279                    }
280                    remoteBrokerNameKnownLatch.await();
281    
282                    if (!disposed.get()) {
283                        localConnectionInfo = new ConnectionInfo();
284                        localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
285                        localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
286                        localConnectionInfo.setClientId(localClientId);
287                        localConnectionInfo.setUserName(configuration.getUserName());
288                        localConnectionInfo.setPassword(configuration.getPassword());
289                        Transport originalTransport = remoteBroker;
290                        while (originalTransport instanceof TransportFilter) {
291                            originalTransport = ((TransportFilter) originalTransport).getNext();
292                        }
293                        if (originalTransport instanceof SslTransport) {
294                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
295                            localConnectionInfo.setTransportContext(peerCerts);
296                        }
297                        // sync requests that may fail
298                        Object resp = localBroker.request(localConnectionInfo);
299                        if (resp instanceof ExceptionResponse) {
300                            throw ((ExceptionResponse)resp).getException();
301                        }
302                        localSessionInfo = new SessionInfo(localConnectionInfo, 1);
303                        localBroker.oneway(localSessionInfo);
304    
305                        brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex);
306                        NetworkBridgeListener l = this.networkBridgeListener;
307                        if (l != null) {
308                            l.onStart(this);
309                        }
310                        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
311    
312                    } else {
313                        LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
314                    }
315                    startedLatch.countDown();
316                    localStartedLatch.countDown();
317                    if (!disposed.get()) {
318                        setupStaticDestinations();
319                    } else {
320                        LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
321                    }
322                }
323            }
324        }
325    
326        protected void startRemoteBridge() throws Exception {
327            if (remoteBridgeStarted.compareAndSet(false, true)) {
328                if (LOG.isTraceEnabled()) {
329                    LOG.trace(configuration.getBrokerName() + " starting remote Bridge, localBroker=" + localBroker);
330                }
331                synchronized (this) {
332                    if (!isCreatedByDuplex()) {
333                        BrokerInfo brokerInfo = new BrokerInfo();
334                        brokerInfo.setBrokerName(configuration.getBrokerName());
335                        brokerInfo.setBrokerURL(configuration.getBrokerURL());
336                        brokerInfo.setNetworkConnection(true);
337                        brokerInfo.setDuplexConnection(configuration.isDuplex());
338                        // set our properties
339                        Properties props = new Properties();
340                        IntrospectionSupport.getProperties(configuration, props, null);
341                        String str = MarshallingSupport.propertiesToString(props);
342                        brokerInfo.setNetworkProperties(str);
343                        brokerInfo.setBrokerId(this.localBrokerId);
344                        remoteBroker.oneway(brokerInfo);
345                    }
346                    if (remoteConnectionInfo != null) {
347                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
348                    }
349                    remoteConnectionInfo = new ConnectionInfo();
350                    remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
351                    remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
352                    remoteConnectionInfo.setUserName(configuration.getUserName());
353                    remoteConnectionInfo.setPassword(configuration.getPassword());
354                    remoteBroker.oneway(remoteConnectionInfo);
355    
356                    SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
357                    remoteBroker.oneway(remoteSessionInfo);
358                    producerInfo = new ProducerInfo(remoteSessionInfo, 1);
359                    producerInfo.setResponseRequired(false);
360                    remoteBroker.oneway(producerInfo);
361                    // Listen to consumer advisory messages on the remote broker to
362                    // determine demand.
363                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
364                    demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
365                    String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + configuration.getDestinationFilter();
366                    if (configuration.isBridgeTempDestinations()) {
367                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
368                    }
369                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
370                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
371                    remoteBroker.oneway(demandConsumerInfo);
372                    startedLatch.countDown();
373                    if (!disposed.get()) {
374                        triggerLocalStartBridge();
375                    }
376                }
377            }
378        }
379    
380        public void stop() throws Exception {
381            if (started.compareAndSet(true, false)) {
382                if (disposed.compareAndSet(false, true)) {
383                    LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
384                    NetworkBridgeListener l = this.networkBridgeListener;
385                    if (l != null) {
386                        l.onStop(this);
387                    }
388                    try {
389                        remoteBridgeStarted.set(false);
390                        final CountDownLatch sendShutdown = new CountDownLatch(1);
391                        final Map map = MDCHelper.getCopyOfContextMap();
392                        asyncTaskRunner.execute(new Runnable() {
393                            public void run() {
394                                try {
395                                    MDCHelper.setContextMap(map);
396                                    localBroker.oneway(new ShutdownInfo());
397                                    sendShutdown.countDown();
398                                    remoteBroker.oneway(new ShutdownInfo());
399                                } catch (Throwable e) {
400                                    LOG.debug("Caught exception sending shutdown", e);
401                                } finally {
402                                    sendShutdown.countDown();
403                                }
404    
405                            }
406                        });
407                        if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
408                            LOG.info("Network Could not shutdown in a timely manner");
409                        }
410                    } finally {
411                        ServiceStopper ss = new ServiceStopper();
412                        ss.stop(remoteBroker);
413                        ss.stop(localBroker);
414                        // Release the started Latch since another thread could be
415                        // stuck waiting for it to start up.
416                        startedLatch.countDown();
417                        startedLatch.countDown();
418                        localStartedLatch.countDown();
419                        ss.throwFirstException();
420                    }
421                }
422                brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
423                brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
424                LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
425                remoteBrokerNameKnownLatch.countDown();
426            }
427        }
428    
429        public void serviceRemoteException(Throwable error) {
430            if (!disposed.get()) {
431                if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
432                    LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
433                } else {
434                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
435                }
436                LOG.debug("The remote Exception was: " + error, error);
437                final Map map = MDCHelper.getCopyOfContextMap();
438                asyncTaskRunner.execute(new Runnable() {
439                    public void run() {
440                        MDCHelper.setContextMap(map);
441                        ServiceSupport.dispose(getControllingService());
442                    }
443                });
444                fireBridgeFailed();
445            }
446        }
447    
448        protected void serviceRemoteCommand(Command command) {
449            if (!disposed.get()) {
450                try {
451                    if (command.isMessageDispatch()) {
452                        waitStarted();
453                        MessageDispatch md = (MessageDispatch) command;
454                        serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
455                        demandConsumerDispatched++;
456                        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
457                            remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
458                            demandConsumerDispatched = 0;
459                        }
460                    } else if (command.isBrokerInfo()) {
461                        lastConnectSucceeded.set(true);
462                        remoteBrokerInfo = (BrokerInfo) command;
463                        Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
464                        try {
465                            IntrospectionSupport.getProperties(configuration, props, null);
466                            if (configuration.getExcludedDestinations() != null) {
467                                excludedDestinations = configuration.getExcludedDestinations().toArray(
468                                        new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
469                            }
470                            if (configuration.getStaticallyIncludedDestinations() != null) {
471                                staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
472                                        new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
473                            }
474                            if (configuration.getDynamicallyIncludedDestinations() != null) {
475                                dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
476                                        .toArray(
477                                                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
478                                                        .size()]);
479                            }
480                        } catch (Throwable t) {
481                            LOG.error("Error mapping remote destinations", t);
482                        }
483                        serviceRemoteBrokerInfo(command);
484                        // Let the local broker know the remote broker's ID.
485                        localBroker.oneway(command);
486                        // new peer broker (a consumer can work with remote broker also)
487                        brokerService.getBroker().addBroker(null, remoteBrokerInfo);
488                    } else if (command.getClass() == ConnectionError.class) {
489                        ConnectionError ce = (ConnectionError) command;
490                        serviceRemoteException(ce.getException());
491                    } else {
492                        if (isDuplex()) {
493                            if (command.isMessage()) {
494                                ActiveMQMessage message = (ActiveMQMessage) command;
495                                if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) 
496                                    || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
497                                    serviceRemoteConsumerAdvisory(message.getDataStructure());
498                                } else {
499                                    if (!isPermissableDestination(message.getDestination(), true)) {
500                                        return;
501                                    }
502                                    if (message.isResponseRequired()) {
503                                        Response reply = new Response();
504                                        reply.setCorrelationId(message.getCommandId());
505                                        localBroker.oneway(message);
506                                        remoteBroker.oneway(reply);
507                                    } else {
508                                        localBroker.oneway(message);
509                                    }
510                                }
511                            } else {
512                                switch (command.getDataStructureType()) {
513                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
514                                case SessionInfo.DATA_STRUCTURE_TYPE:
515                                case ProducerInfo.DATA_STRUCTURE_TYPE:
516                                    localBroker.oneway(command);
517                                    break;
518                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
519                                    localStartedLatch.await();
520                                    if (started.get()) {
521                                        if (!addConsumerInfo((ConsumerInfo) command)) {
522                                            if (LOG.isDebugEnabled()) {
523                                                LOG.debug("Ignoring ConsumerInfo: " + command);
524                                            }
525                                        } else {
526                                            if (LOG.isTraceEnabled()) {
527                                                LOG.trace("Adding ConsumerInfo: " + command);
528                                            }
529                                        }
530                                    } else {
531                                        // received a subscription whilst stopping
532                                        LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
533                                    }
534                                    break;
535                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
536                                    // initiator is shutting down, controlled case
537                                    // abortive close dealt with by inactivity monitor
538                                    LOG.info("Stopping network bridge on shutdown of remote broker");
539                                    serviceRemoteException(new IOException(command.toString()));
540                                    break;
541                                default:
542                                    if (LOG.isDebugEnabled()) {
543                                        LOG.debug("Ignoring remote command: " + command);
544                                    }
545                                }
546                            }
547                        } else {
548                            switch (command.getDataStructureType()) {
549                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
550                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
551                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
552                                break;
553                            default:
554                                LOG.warn("Unexpected remote command: " + command);
555                            }
556                        }
557                    }
558                } catch (Throwable e) {
559                    if (LOG.isDebugEnabled()) {
560                        LOG.debug("Exception processing remote command: " + command, e);
561                    }
562                    serviceRemoteException(e);
563                }
564            }
565        }
566    
567        private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
568            final int networkTTL = configuration.getNetworkTTL();
569            if (data.getClass() == ConsumerInfo.class) {
570                // Create a new local subscription
571                ConsumerInfo info = (ConsumerInfo) data;
572                BrokerId[] path = info.getBrokerPath();
573    
574                if (info.isBrowser()) {
575                    if (LOG.isDebugEnabled()) {
576                        LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
577                    }
578                    return;
579                }
580    
581                if (path != null && path.length >= networkTTL) {
582                    if (LOG.isDebugEnabled()) {
583                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
584                    }
585                    return;
586                }
587                if (contains(path, localBrokerPath[0])) {
588                    // Ignore this consumer as it's a consumer we locally sent to the broker.
589                    if (LOG.isDebugEnabled()) {
590                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
591                    }
592                    return;
593                }
594                if (!isPermissableDestination(info.getDestination())) {
595                    // ignore if not in the permitted or in the excluded list
596                    if (LOG.isDebugEnabled()) {
597                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
598                    }
599                    return;
600                }
601    
602                // in a cyclic network there can be multiple bridges per broker that can propagate
603                // a network subscription so there is a need to synchronise on a shared entity
604                synchronized (brokerService.getVmConnectorURI()) {
605                    if (addConsumerInfo(info)) {
606                        if (LOG.isDebugEnabled()) {
607                            LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
608                        }
609                    } else {
610                        if (LOG.isDebugEnabled()) {
611                            LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
612                        }
613                    }
614                }
615            } else if (data.getClass() == DestinationInfo.class) {
616                // It's a destination info - we want to pass up
617                // information about temporary destinations
618                DestinationInfo destInfo = (DestinationInfo) data;
619                BrokerId[] path = destInfo.getBrokerPath();
620                if (path != null && path.length >= networkTTL) {
621                    if (LOG.isDebugEnabled()) {
622                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
623                    }
624                    return;
625                }
626                if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
627                    // Ignore this consumer as it's a consumer we locally sent to
628                    // the broker.
629                    if (LOG.isDebugEnabled()) {
630                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
631                    }
632                    return;
633                }
634                destInfo.setConnectionId(localConnectionInfo.getConnectionId());
635                if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
636                    // re-set connection id so comes from here
637                    ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
638                    tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
639                }
640                destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
641                if (LOG.isTraceEnabled()) {
642                    LOG.trace("bridging destination control command: " + destInfo);
643                }
644                localBroker.oneway(destInfo);
645            } else if (data.getClass() == RemoveInfo.class) {
646                ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
647                removeDemandSubscription(id);
648            }
649        }
650    
651        public void serviceLocalException(Throwable error) {
652            if (!disposed.get()) {
653                LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
654                LOG.debug("The local Exception was:" + error, error);
655                final Map map = MDCHelper.getCopyOfContextMap();
656                asyncTaskRunner.execute(new Runnable() {
657                    public void run() {
658                        MDCHelper.setContextMap(map);
659                        ServiceSupport.dispose(getControllingService());
660                    }
661                });
662                fireBridgeFailed();
663            }
664        }
665    
666        protected Service getControllingService() {
667            return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
668        }
669    
670        protected void addSubscription(DemandSubscription sub) throws IOException {
671            if (sub != null) {
672                localBroker.oneway(sub.getLocalInfo());
673            }
674        }
675    
676        protected void removeSubscription(final DemandSubscription sub) throws IOException {
677            if (sub != null) {
678                if (LOG.isDebugEnabled()) {
679                    LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
680                }
681                subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
682    
683                // continue removal in separate thread to free up this thread for outstanding responses
684                final Map map = MDCHelper.getCopyOfContextMap();
685                asyncTaskRunner.execute(new Runnable() {
686                    public void run() {
687                        MDCHelper.setContextMap(map);
688                        sub.waitForCompletion();
689                        try {
690                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
691                        } catch (IOException e) {
692                            LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
693                        }
694                    }
695                });
696            }
697        }
698    
699        protected Message configureMessage(MessageDispatch md) {
700            Message message = md.getMessage().copy();
701            // Update the packet to show where it came from.
702            message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
703            message.setProducerId(producerInfo.getProducerId());
704            message.setDestination(md.getDestination());
705            if (message.getOriginalTransactionId() == null) {
706                message.setOriginalTransactionId(message.getTransactionId());
707            }
708            message.setTransactionId(null);
709            return message;
710        }
711    
712        protected void serviceLocalCommand(Command command) {
713            if (!disposed.get()) {
714                try {
715                    if (command.isMessageDispatch()) {
716                        enqueueCounter.incrementAndGet();
717                        final MessageDispatch md = (MessageDispatch) command;
718                        final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
719                        if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
720                            
721                            if (suppressMessageDispatch(md, sub)) {
722                                if (LOG.isDebugEnabled()) {
723                                    LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
724                                }
725                                // still ack as it may be durable
726                                try {
727                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
728                                } finally {
729                                    sub.decrementOutstandingResponses();
730                                }
731                                return;
732                            }
733                            
734                            Message message = configureMessage(md);
735                            if (LOG.isDebugEnabled()) {
736                                LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
737                            }
738                            
739                            if (!message.isResponseRequired()) {
740                                
741                                // If the message was originally sent using async
742                                // send, we will preserve that QOS
743                                // by bridging it using an async send (small chance
744                                // of message loss).
745                                try {
746                                    remoteBroker.oneway(message);
747                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
748                                    dequeueCounter.incrementAndGet();
749                                } finally {
750                                    sub.decrementOutstandingResponses();
751                                }
752                                
753                            } else {
754                                
755                                // The message was not sent using async send, so we
756                                // should only ack the local
757                                // broker when we get confirmation that the remote
758                                // broker has received the message.
759                                ResponseCallback callback = new ResponseCallback() {
760                                    public void onCompletion(FutureResponse future) {
761                                        try {
762                                            Response response = future.getResult();
763                                            if (response.isException()) {
764                                                ExceptionResponse er = (ExceptionResponse) response;
765                                                serviceLocalException(er.getException());
766                                            } else {
767                                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
768                                                dequeueCounter.incrementAndGet();
769                                            }   
770                                        } catch (IOException e) {
771                                            serviceLocalException(e);
772                                        } finally {
773                                            sub.decrementOutstandingResponses();
774                                        }
775                                    }
776                                };
777                                
778                                remoteBroker.asyncRequest(message, callback);
779                                
780                            }
781                        } else {
782                            if (LOG.isDebugEnabled()) {
783                                LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
784                            }
785                        }
786                    } else if (command.isBrokerInfo()) {
787                        localBrokerInfo = (BrokerInfo) command;
788                        serviceLocalBrokerInfo(command);
789                    } else if (command.isShutdownInfo()) {
790                        LOG.info(configuration.getBrokerName() + " Shutting down");
791                        // Don't shut down the whole connector if the remote side
792                        // was interrupted.
793                        // the local transport is just shutting down temporarily
794                        // until the remote side
795                        // is restored.
796                        if (!remoteInterupted.get()) {
797                            stop();
798                        }
799                    } else if (command.getClass() == ConnectionError.class) {
800                        ConnectionError ce = (ConnectionError) command;
801                        serviceLocalException(ce.getException());
802                    } else {
803                        switch (command.getDataStructureType()) {
804                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
805                            break;
806                        default:
807                            LOG.warn("Unexpected local command: " + command);
808                        }
809                    }
810                } catch (Throwable e) {
811                    LOG.warn("Caught an exception processing local command", e);
812                    serviceLocalException(e);
813                }
814            }
815        }
816    
817        private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
818            // See if this consumer's brokerPath tells us it came from the broker at the other end
819            // of the bridge. I think we should be making this decision based on the message's
820            // broker bread crumbs and not the consumer's? However, the message's broker bread
821            // crumbs are null, which is another matter.   
822            boolean suppress = false;
823            Object consumerInfo = md.getMessage().getDataStructure();
824            if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
825                suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
826            }
827            
828            // for durable subs, suppression via filter leaves dangling acks so we need to 
829            // check here and allow the ack irrespective
830            if (!suppress && sub.getLocalInfo().isDurable()) {
831                MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
832                messageEvalContext.setMessageReference(md.getMessage());
833                suppress = !createNetworkBridgeFilter(null).matches(messageEvalContext);
834            }  
835            return suppress;
836        }
837    
838        /**
839         * @return Returns the dynamicallyIncludedDestinations.
840         */
841        public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
842            return dynamicallyIncludedDestinations;
843        }
844    
845        /**
846         * @param dynamicallyIncludedDestinations The
847         *            dynamicallyIncludedDestinations to set.
848         */
849        public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
850            this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
851        }
852    
853        /**
854         * @return Returns the excludedDestinations.
855         */
856        public ActiveMQDestination[] getExcludedDestinations() {
857            return excludedDestinations;
858        }
859    
860        /**
861         * @param excludedDestinations The excludedDestinations to set.
862         */
863        public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
864            this.excludedDestinations = excludedDestinations;
865        }
866    
867        /**
868         * @return Returns the staticallyIncludedDestinations.
869         */
870        public ActiveMQDestination[] getStaticallyIncludedDestinations() {
871            return staticallyIncludedDestinations;
872        }
873    
874        /**
875         * @param staticallyIncludedDestinations The staticallyIncludedDestinations
876         *            to set.
877         */
878        public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
879            this.staticallyIncludedDestinations = staticallyIncludedDestinations;
880        }
881    
882        /**
883         * @return Returns the durableDestinations.
884         */
885        public ActiveMQDestination[] getDurableDestinations() {
886            return durableDestinations;
887        }
888    
889        /**
890         * @param durableDestinations The durableDestinations to set.
891         */
892        public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
893            this.durableDestinations = durableDestinations;
894        }
895    
896        /**
897         * @return Returns the localBroker.
898         */
899        public Transport getLocalBroker() {
900            return localBroker;
901        }
902    
903        /**
904         * @return Returns the remoteBroker.
905         */
906        public Transport getRemoteBroker() {
907            return remoteBroker;
908        }
909    
910        /**
911         * @return the createdByDuplex
912         */
913        public boolean isCreatedByDuplex() {
914            return this.createdByDuplex;
915        }
916    
917        /**
918         * @param createdByDuplex the createdByDuplex to set
919         */
920        public void setCreatedByDuplex(boolean createdByDuplex) {
921            this.createdByDuplex = createdByDuplex;
922        }
923    
924        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
925            if (brokerPath != null) {
926                for (int i = 0; i < brokerPath.length; i++) {
927                    if (brokerId.equals(brokerPath[i])) {
928                        return true;
929                    }
930                }
931            }
932            return false;
933        }
934    
935        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
936            if (brokerPath == null || brokerPath.length == 0) {
937                return pathsToAppend;
938            }
939            BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
940            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
941            System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
942            return rc;
943        }
944    
945        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
946            if (brokerPath == null || brokerPath.length == 0) {
947                return new BrokerId[] { idToAppend };
948            }
949            BrokerId rc[] = new BrokerId[brokerPath.length + 1];
950            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
951            rc[brokerPath.length] = idToAppend;
952            return rc;
953        }
954    
955        protected boolean isPermissableDestination(ActiveMQDestination destination) {
956            return isPermissableDestination(destination, false);
957        }
958    
959        protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
960            // Are we not bridging temp destinations?
961            if (destination.isTemporary()) {
962                if (allowTemporary) {
963                    return true;
964                } else {
965                    return configuration.isBridgeTempDestinations();
966                }
967            }
968    
969            ActiveMQDestination[] dests = excludedDestinations;
970            if (dests != null && dests.length > 0) {
971                for (int i = 0; i < dests.length; i++) {
972                    ActiveMQDestination match = dests[i];
973                    DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
974                    if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
975                        return false;
976                    }
977                }
978            }
979    
980            dests = dynamicallyIncludedDestinations;
981            if (dests != null && dests.length > 0) {
982                for (int i = 0; i < dests.length; i++) {
983                    ActiveMQDestination match = dests[i];
984                    DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
985                    if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
986                        return true;
987                    }
988                }
989    
990                return false;
991            }
992            return true;
993        }
994    
995        /**
996         * Subscriptions for these destinations are always created
997         */
998        protected void setupStaticDestinations() {
999            ActiveMQDestination[] dests = staticallyIncludedDestinations;
1000            if (dests != null) {
1001                for (int i = 0; i < dests.length; i++) {
1002                    ActiveMQDestination dest = dests[i];
1003                    DemandSubscription sub = createDemandSubscription(dest);
1004                    try {
1005                        addSubscription(sub);
1006                    } catch (IOException e) {
1007                        LOG.error("Failed to add static destination " + dest, e);
1008                    }
1009                    if (LOG.isTraceEnabled()) {
1010                        LOG.trace("bridging messages for static destination: " + dest);
1011                    }
1012                }
1013            }
1014        }
1015    
1016        protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1017            boolean consumerAdded = false;
1018            ConsumerInfo info = consumerInfo.copy();
1019            addRemoteBrokerToBrokerPath(info);
1020            DemandSubscription sub = createDemandSubscription(info);
1021            if (sub != null) {
1022                if (duplicateSuppressionIsRequired(sub)) {
1023                    undoMapRegistration(sub);
1024                } else {
1025                    addSubscription(sub);
1026                    consumerAdded = true;
1027                }
1028            }
1029            return consumerAdded;
1030        }
1031    
1032        private void undoMapRegistration(DemandSubscription sub) {
1033            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1034            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1035        }
1036    
1037        /*
1038         * check our existing subs networkConsumerIds against the list of network ids in this subscription
1039         * A match means a duplicate which we suppress for topics and maybe for queues
1040         */
1041        private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1042            final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1043            boolean suppress = false;
1044    
1045            if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
1046                    consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1047                return suppress;
1048            }
1049    
1050            List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1051            Collection<Subscription> currentSubs = 
1052                getRegionSubscriptions(consumerInfo.getDestination().isTopic());
1053            for (Subscription sub : currentSubs) {
1054                List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1055                if (!networkConsumers.isEmpty()) {
1056                    if (matchFound(candidateConsumers, networkConsumers)) {
1057                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1058                        break;
1059                    }
1060                }
1061            }
1062            return suppress;
1063        }
1064    
1065        private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1066            boolean suppress = false;
1067    
1068            if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1069                if (LOG.isDebugEnabled()) {
1070                    LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1071                            + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " 
1072                            + existingSub.getConsumerInfo()  + ", networkComsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1073                }
1074                suppress = true;
1075            } else {
1076                // remove the existing lower priority duplicate and allow this candidate
1077                try {
1078                    removeDuplicateSubscription(existingSub);
1079    
1080                    if (LOG.isDebugEnabled()) {
1081                        LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1082                                + " with sub from " + remoteBrokerName
1083                                + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " 
1084                                + candidateInfo.getNetworkConsumerIds());
1085                    }
1086                } catch (IOException e) {
1087                    LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1088                }
1089            }
1090            return suppress;
1091        }
1092    
1093        private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1094            for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1095                if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1096                    break;
1097                }
1098            }
1099        }
1100    
1101        private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1102            boolean found = false;
1103            for (ConsumerId aliasConsumer : networkConsumers) {
1104                if (candidateConsumers.contains(aliasConsumer)) {
1105                    found = true;
1106                    break;
1107                }
1108            }
1109            return found;
1110        }
1111    
1112        private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
1113            RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
1114            AbstractRegion abstractRegion = (AbstractRegion) 
1115                (isTopic ? region.getTopicRegion() : region.getQueueRegion());
1116            return abstractRegion.getSubscriptions().values();
1117        }
1118    
1119        protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1120            //add our original id to ourselves
1121            info.addNetworkConsumerId(info.getConsumerId());
1122            return doCreateDemandSubscription(info);
1123        }
1124    
1125        protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1126            DemandSubscription result = new DemandSubscription(info);
1127            result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1128            if (info.getDestination().isTemporary()) {
1129                // reset the local connection Id
1130    
1131                ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1132                dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1133            }
1134    
1135            if (configuration.isDecreaseNetworkConsumerPriority()) {
1136                byte priority = ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
1137                if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1138                    // The longer the path to the consumer, the less it's consumer priority.
1139                    priority -= info.getBrokerPath().length + 1;
1140                }
1141                result.getLocalInfo().setPriority(priority);
1142                if (LOG.isDebugEnabled()) {
1143                    LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1144                }
1145            }
1146            configureDemandSubscription(info, result);
1147            return result;
1148        }
1149    
1150        final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1151            ConsumerInfo info = new ConsumerInfo();
1152            info.setDestination(destination);
1153            // the remote info held by the DemandSubscription holds the original
1154            // consumerId,
1155            // the local info get's overwritten
1156    
1157            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1158            DemandSubscription result = null;
1159            try {
1160                result = createDemandSubscription(info);
1161            } catch (IOException e) {
1162                LOG.error("Failed to create DemandSubscription ", e);
1163            }
1164            if (result != null) {
1165                result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
1166            }
1167            return result;
1168        }
1169    
1170        protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1171            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1172            sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1173            subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1174            subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1175    
1176            if (!info.isDurable()) {
1177                // This works for now since we use a VM connection to the local broker.
1178                // may need to change if we ever subscribe to a remote broker.
1179                sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
1180            } else  {
1181                // need to ack this message if it is ignored as it is durable so
1182                // we check before we send. see: suppressMessageDispatch()
1183            }
1184        }
1185    
1186        protected void removeDemandSubscription(ConsumerId id) throws IOException {
1187            DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1188            if (LOG.isDebugEnabled()) {
1189                LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1190            }
1191            if (sub != null) {
1192                removeSubscription(sub);
1193                if (LOG.isDebugEnabled()) {
1194                    LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
1195                }
1196            }
1197        }
1198    
1199        protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1200            boolean removeDone = false;
1201            DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1202            if (sub != null) {
1203                try {
1204                    removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1205                    removeDone = true;
1206                } catch (IOException e) {
1207                    LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1208                }
1209            }
1210            return removeDone;
1211        }
1212    
1213        protected void waitStarted() throws InterruptedException {
1214            startedLatch.await();
1215            localBrokerIdKnownLatch.await();
1216        }
1217    
1218        protected void clearDownSubscriptions() {
1219            subscriptionMapByLocalId.clear();
1220            subscriptionMapByRemoteId.clear();
1221        }
1222    
    /**
     * Supplies the filter that {@code configureDemandSubscription} installs as
     * the additional predicate on the local consumer info of a non-durable
     * demand subscription.
     *
     * @param info the consumer description the subscription was created from
     * @return the filter to install
     * @throws IOException if the implementation cannot build the filter
     */
    protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;
1224    
    /**
     * Subclass hook that handles a command concerning the local broker.
     * NOTE(review): presumably invoked with the local broker's BrokerInfo
     * command; the call site is outside this part of the file, so confirm there.
     *
     * @param command the command to handle
     * @throws InterruptedException if the implementation is interrupted while handling it
     */
    protected abstract void serviceLocalBrokerInfo(Command command) throws InterruptedException;
1226    
    /**
     * Subclass hook that updates the given consumer description.
     * NOTE(review): by its name this appends the remote broker to the consumer's
     * broker path; no implementation is visible in this part of the file, so confirm.
     *
     * @param info the consumer description to update
     * @throws IOException if the implementation fails
     */
    protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException;
1228    
    /**
     * Subclass hook that handles a command concerning the remote broker.
     * NOTE(review): presumably invoked with the remote broker's BrokerInfo
     * command; the call site is outside this part of the file, so confirm there.
     *
     * @param command the command to handle
     * @throws IOException if the implementation fails while handling it
     */
    protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
1230    
    /**
     * Subclass hook that returns broker ids associated with the remote side of
     * the bridge.
     * NOTE(review): exact contents, ordering and nullability are defined by the
     * implementations, which are not visible in this part of the file.
     *
     * @return an array of broker ids
     */
    protected abstract BrokerId[] getRemoteBrokerPath();
1232    
1233        public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1234            this.networkBridgeListener = listener;
1235        }
1236    
1237        private void fireBridgeFailed() {
1238            NetworkBridgeListener l = this.networkBridgeListener;
1239            if (l != null) {
1240                l.bridgeFailed();
1241            }
1242        }
1243    
1244        public String getRemoteAddress() {
1245            return remoteBroker.getRemoteAddress();
1246        }
1247    
1248        public String getLocalAddress() {
1249            return localBroker.getRemoteAddress();
1250        }
1251    
1252        public String getRemoteBrokerName() {
1253            return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1254        }
1255    
1256        public String getLocalBrokerName() {
1257            return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1258        }
1259    
1260        public long getDequeueCounter() {
1261            return dequeueCounter.get();
1262        }
1263    
1264        public long getEnqueueCounter() {
1265            return enqueueCounter.get();
1266        }
1267    
1268        protected boolean isDuplex() {
1269            return configuration.isDuplex() || createdByDuplex;
1270        }
1271    
1272        public void setBrokerService(BrokerService brokerService) {
1273            this.brokerService = brokerService;
1274        }
1275    }