001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.LinkedList;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Properties;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArrayList;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicInteger;
033    import java.util.concurrent.atomic.AtomicReference;
034    import java.util.concurrent.locks.ReentrantReadWriteLock;
035    
036    import javax.management.ObjectName;
037    import javax.transaction.xa.XAResource;
038    
039    import org.apache.activemq.broker.ft.MasterBroker;
040    import org.apache.activemq.broker.region.ConnectionStatistics;
041    import org.apache.activemq.broker.region.RegionBroker;
042    import org.apache.activemq.command.BrokerId;
043    import org.apache.activemq.command.BrokerInfo;
044    import org.apache.activemq.command.Command;
045    import org.apache.activemq.command.CommandTypes;
046    import org.apache.activemq.command.ConnectionControl;
047    import org.apache.activemq.command.ConnectionError;
048    import org.apache.activemq.command.ConnectionId;
049    import org.apache.activemq.command.ConnectionInfo;
050    import org.apache.activemq.command.ConsumerControl;
051    import org.apache.activemq.command.ConsumerId;
052    import org.apache.activemq.command.ConsumerInfo;
053    import org.apache.activemq.command.ControlCommand;
054    import org.apache.activemq.command.DataArrayResponse;
055    import org.apache.activemq.command.DestinationInfo;
056    import org.apache.activemq.command.ExceptionResponse;
057    import org.apache.activemq.command.FlushCommand;
058    import org.apache.activemq.command.IntegerResponse;
059    import org.apache.activemq.command.KeepAliveInfo;
060    import org.apache.activemq.command.Message;
061    import org.apache.activemq.command.MessageAck;
062    import org.apache.activemq.command.MessageDispatch;
063    import org.apache.activemq.command.MessageDispatchNotification;
064    import org.apache.activemq.command.MessagePull;
065    import org.apache.activemq.command.ProducerAck;
066    import org.apache.activemq.command.ProducerId;
067    import org.apache.activemq.command.ProducerInfo;
068    import org.apache.activemq.command.RemoveSubscriptionInfo;
069    import org.apache.activemq.command.Response;
070    import org.apache.activemq.command.SessionId;
071    import org.apache.activemq.command.SessionInfo;
072    import org.apache.activemq.command.ShutdownInfo;
073    import org.apache.activemq.command.TransactionId;
074    import org.apache.activemq.command.TransactionInfo;
075    import org.apache.activemq.command.WireFormatInfo;
076    import org.apache.activemq.network.*;
077    import org.apache.activemq.security.MessageAuthorizationPolicy;
078    import org.apache.activemq.state.CommandVisitor;
079    import org.apache.activemq.state.ConnectionState;
080    import org.apache.activemq.state.ConsumerState;
081    import org.apache.activemq.state.ProducerState;
082    import org.apache.activemq.state.SessionState;
083    import org.apache.activemq.state.TransactionState;
084    import org.apache.activemq.thread.DefaultThreadPools;
085    import org.apache.activemq.thread.Task;
086    import org.apache.activemq.thread.TaskRunner;
087    import org.apache.activemq.thread.TaskRunnerFactory;
088    import org.apache.activemq.transaction.Transaction;
089    import org.apache.activemq.transport.DefaultTransportListener;
090    import org.apache.activemq.transport.ResponseCorrelator;
091    import org.apache.activemq.transport.Transport;
092    import org.apache.activemq.transport.TransportDisposedIOException;
093    import org.apache.activemq.transport.TransportFactory;
094    import org.apache.activemq.util.*;
095    import org.slf4j.Logger;
096    import org.slf4j.LoggerFactory;
097    import org.slf4j.MDC;
098    
099    public class TransportConnection implements Connection, Task, CommandVisitor {
100        private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
101        private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
102        private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
103        // Keeps track of the broker and connector that created this connection.
104        protected final Broker broker;
105        protected final TransportConnector connector;
106        // Keeps track of the state of the connections.
107        // protected final ConcurrentHashMap localConnectionStates=new
108        // ConcurrentHashMap();
109        protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
110        // The broker and wireformat info that was exchanged.
111        protected BrokerInfo brokerInfo;
112        protected final List<Command> dispatchQueue = new LinkedList<Command>();
113        protected TaskRunner taskRunner;
114        protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
115        protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
116        private MasterBroker masterBroker;
117        private final Transport transport;
118        private MessageAuthorizationPolicy messageAuthorizationPolicy;
119        private WireFormatInfo wireFormatInfo;
120        // Used to do async dispatch.. this should perhaps be pushed down into the
121        // transport layer..
122        private boolean inServiceException;
123        private final ConnectionStatistics statistics = new ConnectionStatistics();
124        private boolean manageable;
125        private boolean slow;
126        private boolean markedCandidate;
127        private boolean blockedCandidate;
128        private boolean blocked;
129        private boolean connected;
130        private boolean active;
131        private boolean starting;
132        private boolean pendingStop;
133        private long timeStamp;
134        private final AtomicBoolean stopping = new AtomicBoolean(false);
135        private final CountDownLatch stopped = new CountDownLatch(1);
136        private final AtomicBoolean asyncException = new AtomicBoolean(false);
137        private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
138        private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
139        private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
140        private ConnectionContext context;
141        private boolean networkConnection;
142        private boolean faultTolerantConnection;
143        private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
144        private DemandForwardingBridge duplexBridge;
145        private final TaskRunnerFactory taskRunnerFactory;
146        private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
147        private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
148        private String duplexNetworkConnectorId;
149    
150        /**
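     * @param connector
     *            the transport connector that created this connection
     * @param transport
     *            the transport whose incoming commands this connection services
     * @param broker
     *            the broker that created this connection
     * @param taskRunnerFactory
     *            can be null if you want direct dispatch to the transport;
     *            otherwise commands are sent asynchronously.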
157         */
158        public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
159                TaskRunnerFactory taskRunnerFactory) {
160            this.connector = connector;
161            this.broker = broker;
162            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
163            RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
164            brokerConnectionStates = rb.getConnectionStates();
165            if (connector != null) {
166                this.statistics.setParent(connector.getStatistics());
167            }
168            this.taskRunnerFactory = taskRunnerFactory;
169            this.transport = transport;
170            this.transport.setTransportListener(new DefaultTransportListener() {
171                @Override
172                public void onCommand(Object o) {
173                    serviceLock.readLock().lock();
174                    try {
175                        if (!(o instanceof Command)) {
176                            throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
177                        }
178                        Command command = (Command) o;
179                        Response response = service(command);
180                        if (response != null) {
181                            dispatchSync(response);
182                        }
183                    } finally {
184                        serviceLock.readLock().unlock();
185                    }
186                }
187    
188                @Override
189                public void onException(IOException exception) {
190                    serviceLock.readLock().lock();
191                    try {
192                        serviceTransportException(exception);
193                    } finally {
194                        serviceLock.readLock().unlock();
195                    }
196                }
197            });
198            connected = true;
199        }
200    
201        /**
202         * Returns the number of messages to be dispatched to this connection
203         * 
204         * @return size of dispatch queue
205         */
206        public int getDispatchQueueSize() {
207            synchronized (dispatchQueue) {
208                return dispatchQueue.size();
209            }
210        }
211    
212        public void serviceTransportException(IOException e) {
213            BrokerService bService = connector.getBrokerService();
214            if (bService.isShutdownOnSlaveFailure()) {
215                if (brokerInfo != null) {
216                    if (brokerInfo.isSlaveBroker()) {
217                        LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
218                        try {
219                            doStop();
220                            bService.stop();
221                        } catch (Exception ex) {
222                            LOG.warn("Failed to stop the master", ex);
223                        }
224                    }
225                }
226            }
227            if (!stopping.get()) {
228                transportException.set(e);
229                if (TRANSPORTLOG.isDebugEnabled()) {
230                    TRANSPORTLOG.debug("Transport failed: " + e, e);
231                } else if (TRANSPORTLOG.isInfoEnabled()) {
232                    TRANSPORTLOG.info("Transport failed: " + e);
233                }
234                stopAsync();
235            }
236        }
237    
238        /**
239         * Calls the serviceException method in an async thread. Since handling a
240         * service exception closes a socket, we should not tie up broker threads
241         * since client sockets may hang or cause deadlocks.
242         * 
243         * @param e
244         */
245        public void serviceExceptionAsync(final IOException e) {
246            if (asyncException.compareAndSet(false, true)) {
247                new Thread("Async Exception Handler") {
248                    @Override
249                    public void run() {
250                        serviceException(e);
251                    }
252                }.start();
253            }
254        }
255    
256        /**
     * Closes a client's connection due to a detected error. Errors are ignored
     * if the client is closing or the broker is closing. Otherwise, the
     * connection error is transmitted to the client before stopping its transport.
260         */
261        public void serviceException(Throwable e) {
262            // are we a transport exception such as not being able to dispatch
263            // synchronously to a transport
264            if (e instanceof IOException) {
265                serviceTransportException((IOException) e);
266            } else if (e.getClass() == BrokerStoppedException.class) {
267                // Handle the case where the broker is stopped
268                // But the client is still connected.
269                if (!stopping.get()) {
270                    if (SERVICELOG.isDebugEnabled()) {
271                        SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
272                    }
273                    ConnectionError ce = new ConnectionError();
274                    ce.setException(e);
275                    dispatchSync(ce);
276                    // Wait a little bit to try to get the output buffer to flush
277                    // the exption notification to the client.
278                    try {
279                        Thread.sleep(500);
280                    } catch (InterruptedException ie) {
281                        Thread.currentThread().interrupt();
282                    }
283                    // Worst case is we just kill the connection before the
284                    // notification gets to him.
285                    stopAsync();
286                }
287            } else if (!stopping.get() && !inServiceException) {
288                inServiceException = true;
289                try {
290                    SERVICELOG.warn("Async error occurred: " + e, e);
291                    ConnectionError ce = new ConnectionError();
292                    ce.setException(e);
293                    dispatchAsync(ce);
294                } finally {
295                    inServiceException = false;
296                }
297            }
298        }
299    
300        public Response service(Command command) {
301            MDC.put("activemq.connector", connector.getUri().toString());
302            Response response = null;
303            boolean responseRequired = command.isResponseRequired();
304            int commandId = command.getCommandId();
305            try {
306                response = command.visit(this);
307            } catch (Throwable e) {
308                if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
309                    SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
310                            + " command: " + command + ", exception: " + e, e);
311                }
312                if (responseRequired) {
313                    response = new ExceptionResponse(e);
314                    if(e instanceof java.lang.SecurityException){
315                        //still need to close this down - incase the peer of this transport doesn't play nice
316                        delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage());
317                    }
318                } else {
319                    serviceException(e);
320                }
321            }
322            if (responseRequired) {
323                if (response == null) {
324                    response = new Response();
325                }
326                response.setCorrelationId(commandId);
327            }
328            // The context may have been flagged so that the response is not
329            // sent.
330            if (context != null) {
331                if (context.isDontSendReponse()) {
332                    context.setDontSendReponse(false);
333                    response = null;
334                }
335                context = null;
336            }
337            MDC.remove("activemq.connector");
338            return response;
339        }
340    
341        public Response processKeepAlive(KeepAliveInfo info) throws Exception {
342            return null;
343        }
344    
345        public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
346            broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
347            return null;
348        }
349    
350        public Response processWireFormat(WireFormatInfo info) throws Exception {
351            wireFormatInfo = info;
352            protocolVersion.set(info.getVersion());
353            return null;
354        }
355    
356        public Response processShutdown(ShutdownInfo info) throws Exception {
357            stopAsync();
358            return null;
359        }
360    
361        public Response processFlush(FlushCommand command) throws Exception {
362            return null;
363        }
364    
365        public Response processBeginTransaction(TransactionInfo info) throws Exception {
366            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
367            context = null;
368            if (cs != null) {
369                context = cs.getContext();
370            }
371            if (cs == null) {
372                throw new NullPointerException("Context is null");
373            }
374            // Avoid replaying dup commands
375            if (cs.getTransactionState(info.getTransactionId()) == null) {
376                cs.addTransactionState(info.getTransactionId());
377                broker.beginTransaction(context, info.getTransactionId());
378            }
379            return null;
380        }
381    
382        public Response processEndTransaction(TransactionInfo info) throws Exception {
383            // No need to do anything. This packet is just sent by the client
384            // make sure he is synced with the server as commit command could
385            // come from a different connection.
386            return null;
387        }
388    
389        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
390            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
391            context = null;
392            if (cs != null) {
393                context = cs.getContext();
394            }
395            if (cs == null) {
396                throw new NullPointerException("Context is null");
397            }
398            TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
399            if (transactionState == null) {
400                throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
401                        + info.getTransactionId());
402            }
403            // Avoid dups.
404            if (!transactionState.isPrepared()) {
405                transactionState.setPrepared(true);
406                int result = broker.prepareTransaction(context, info.getTransactionId());
407                transactionState.setPreparedResult(result);
408                if (result == XAResource.XA_RDONLY) {
409                    // we are done, no further rollback or commit from TM
410                    cs.removeTransactionState(info.getTransactionId());
411                }
412                IntegerResponse response = new IntegerResponse(result);
413                return response;
414            } else {
415                IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
416                return response;
417            }
418        }
419    
420        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
421            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
422            context = cs.getContext();
423            cs.removeTransactionState(info.getTransactionId());
424            broker.commitTransaction(context, info.getTransactionId(), true);
425            return null;
426        }
427    
428        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
429            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
430            context = cs.getContext();
431            cs.removeTransactionState(info.getTransactionId());
432            broker.commitTransaction(context, info.getTransactionId(), false);
433            return null;
434        }
435    
436        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
437            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
438            context = cs.getContext();
439            cs.removeTransactionState(info.getTransactionId());
440            broker.rollbackTransaction(context, info.getTransactionId());
441            return null;
442        }
443    
444        public Response processForgetTransaction(TransactionInfo info) throws Exception {
445            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
446            context = cs.getContext();
447            broker.forgetTransaction(context, info.getTransactionId());
448            return null;
449        }
450    
451        public Response processRecoverTransactions(TransactionInfo info) throws Exception {
452            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
453            context = cs.getContext();
454            TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
455            return new DataArrayResponse(preparedTransactions);
456        }
457    
458        public Response processMessage(Message messageSend) throws Exception {
459            ProducerId producerId = messageSend.getProducerId();
460            ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
461            if (producerExchange.canDispatch(messageSend)) {
462                broker.send(producerExchange, messageSend);
463            }
464            return null;
465        }
466    
467        public Response processMessageAck(MessageAck ack) throws Exception {
468            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
469            broker.acknowledge(consumerExchange, ack);
470            return null;
471        }
472    
473        public Response processMessagePull(MessagePull pull) throws Exception {
474            return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
475        }
476    
477        public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
478            broker.processDispatchNotification(notification);
479            return null;
480        }
481    
482        public Response processAddDestination(DestinationInfo info) throws Exception {
483            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
484            broker.addDestinationInfo(cs.getContext(), info);
485            if (info.getDestination().isTemporary()) {
486                cs.addTempDestination(info);
487            }
488            return null;
489        }
490    
491        public Response processRemoveDestination(DestinationInfo info) throws Exception {
492            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
493            broker.removeDestinationInfo(cs.getContext(), info);
494            if (info.getDestination().isTemporary()) {
495                cs.removeTempDestination(info.getDestination());
496            }
497            return null;
498        }
499    
500        public Response processAddProducer(ProducerInfo info) throws Exception {
501            SessionId sessionId = info.getProducerId().getParentId();
502            ConnectionId connectionId = sessionId.getParentId();
503            TransportConnectionState cs = lookupConnectionState(connectionId);
504            SessionState ss = cs.getSessionState(sessionId);
505            if (ss == null) {
506                throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
507                        + sessionId);
508            }
509            // Avoid replaying dup commands
510            if (!ss.getProducerIds().contains(info.getProducerId())) {
511                broker.addProducer(cs.getContext(), info);
512                try {
513                    ss.addProducer(info);
514                } catch (IllegalStateException e) {
515                    broker.removeProducer(cs.getContext(), info);
516                }
517            }
518            return null;
519        }
520    
521        public Response processRemoveProducer(ProducerId id) throws Exception {
522            SessionId sessionId = id.getParentId();
523            ConnectionId connectionId = sessionId.getParentId();
524            TransportConnectionState cs = lookupConnectionState(connectionId);
525            SessionState ss = cs.getSessionState(sessionId);
526            if (ss == null) {
527                throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
528                        + sessionId);
529            }
530            ProducerState ps = ss.removeProducer(id);
531            if (ps == null) {
532                throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
533            }
534            removeProducerBrokerExchange(id);
535            broker.removeProducer(cs.getContext(), ps.getInfo());
536            return null;
537        }
538    
539        public Response processAddConsumer(ConsumerInfo info) throws Exception {
540            SessionId sessionId = info.getConsumerId().getParentId();
541            ConnectionId connectionId = sessionId.getParentId();
542            TransportConnectionState cs = lookupConnectionState(connectionId);
543            SessionState ss = cs.getSessionState(sessionId);
544            if (ss == null) {
545                throw new IllegalStateException(broker.getBrokerName()
546                        + " Cannot add a consumer to a session that had not been registered: " + sessionId);
547            }
548            // Avoid replaying dup commands
549            if (!ss.getConsumerIds().contains(info.getConsumerId())) {
550                broker.addConsumer(cs.getContext(), info);
551                try {
552                    ss.addConsumer(info);
553                } catch (IllegalStateException e) {
554                    broker.removeConsumer(cs.getContext(), info);
555                }
556            }
557            return null;
558        }
559    
560        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
561            SessionId sessionId = id.getParentId();
562            ConnectionId connectionId = sessionId.getParentId();
563            TransportConnectionState cs = lookupConnectionState(connectionId);
564            if (cs == null) {
565                throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
566                        + connectionId);
567            }
568            SessionState ss = cs.getSessionState(sessionId);
569            if (ss == null) {
570                throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
571                        + sessionId);
572            }
573            ConsumerState consumerState = ss.removeConsumer(id);
574            if (consumerState == null) {
575                throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
576            }
577            ConsumerInfo info = consumerState.getInfo();
578            info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
579            broker.removeConsumer(cs.getContext(), consumerState.getInfo());
580            removeConsumerBrokerExchange(id);
581            return null;
582        }
583    
584        public Response processAddSession(SessionInfo info) throws Exception {
585            ConnectionId connectionId = info.getSessionId().getParentId();
586            TransportConnectionState cs = lookupConnectionState(connectionId);
587            // Avoid replaying dup commands
588            if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
589                broker.addSession(cs.getContext(), info);
590                try {
591                    cs.addSession(info);
592                } catch (IllegalStateException e) {
593                    e.printStackTrace();
594                    broker.removeSession(cs.getContext(), info);
595                }
596            }
597            return null;
598        }
599    
600        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
601            ConnectionId connectionId = id.getParentId();
602            TransportConnectionState cs = lookupConnectionState(connectionId);
603            if (cs == null) {
604                throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
605            }
606            SessionState session = cs.getSessionState(id);
607            if (session == null) {
608                throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
609            }
610            // Don't let new consumers or producers get added while we are closing
611            // this down.
612            session.shutdown();
613            // Cascade the connection stop to the consumers and producers.
614            for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
615                ConsumerId consumerId = (ConsumerId) iter.next();
616                try {
617                    processRemoveConsumer(consumerId, lastDeliveredSequenceId);
618                } catch (Throwable e) {
619                    LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
620                }
621            }
622            for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
623                ProducerId producerId = (ProducerId) iter.next();
624                try {
625                    processRemoveProducer(producerId);
626                } catch (Throwable e) {
627                    LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
628                }
629            }
630            cs.removeSession(id);
631            broker.removeSession(cs.getContext(), session.getInfo());
632            return null;
633        }
634    
635        public Response processAddConnection(ConnectionInfo info) throws Exception {
636            // if the broker service has slave attached, wait for the slave to be
637            // attached to allow client connection. slave connection is fine
638            if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
639                    && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
640                ServiceSupport.dispose(transport);
641                return new ExceptionResponse(new Exception("Master's slave not attached yet."));
642            }
643            // Older clients should have been defaulting this field to true.. but
644            // they were not.
645            if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
646                info.setClientMaster(true);
647            }
648            TransportConnectionState state;
649            // Make sure 2 concurrent connections by the same ID only generate 1
650            // TransportConnectionState object.
651            synchronized (brokerConnectionStates) {
652                state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
653                if (state == null) {
654                    state = new TransportConnectionState(info, this);
655                    brokerConnectionStates.put(info.getConnectionId(), state);
656                }
657                state.incrementReference();
658            }
659            // If there are 2 concurrent connections for the same connection id,
660            // then last one in wins, we need to sync here
661            // to figure out the winner.
662            synchronized (state.getConnectionMutex()) {
663                if (state.getConnection() != this) {
664                    LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
665                    state.getConnection().stop();
666                    LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
667                            + state.getConnection().getRemoteAddress());
668                    state.setConnection(this);
669                    state.reset(info);
670                }
671            }
672            registerConnectionState(info.getConnectionId(), state);
673            LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
674            this.faultTolerantConnection=info.isFaultTolerant();
675            // Setup the context.
676            String clientId = info.getClientId();
677            context = new ConnectionContext();
678            context.setBroker(broker);
679            context.setClientId(clientId);
680            context.setClientMaster(info.isClientMaster());
681            context.setConnection(this);
682            context.setConnectionId(info.getConnectionId());
683            context.setConnector(connector);
684            context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
685            context.setNetworkConnection(networkConnection);
686            context.setFaultTolerant(faultTolerantConnection);
687            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
688            context.setUserName(info.getUserName());
689            context.setWireFormatInfo(wireFormatInfo);
690            context.setReconnect(info.isFailoverReconnect());
691            this.manageable = info.isManageable();
692            state.setContext(context);
693            state.setConnection(this);
694           
695            try {
696                broker.addConnection(context, info);
697            } catch (Exception e) {
698                synchronized (brokerConnectionStates) {
699                    brokerConnectionStates.remove(info.getConnectionId());
700                }
701                unregisterConnectionState(info.getConnectionId());
702                LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " +  e.toString());
703                if (LOG.isDebugEnabled()) {
704                    LOG.debug("Exception detail:", e);
705                }
706                throw e;
707            }
708            if (info.isManageable()) {
709                // send ConnectionCommand
710                ConnectionControl command = this.connector.getConnectionControl();
711                command.setFaultTolerant(broker.isFaultTolerantConfiguration());
712                dispatchAsync(command);
713            }
714            return null;
715        }
716    
717        public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
718                throws InterruptedException {
719            LOG.debug("remove connection id: " + id);
720            TransportConnectionState cs = lookupConnectionState(id);
721            if (cs != null) {
722                // Don't allow things to be added to the connection state while we
723                // are
724                // shutting down.
725                cs.shutdown();
726                // Cascade the connection stop to the sessions.
727                for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
728                    SessionId sessionId = (SessionId) iter.next();
729                    try {
730                        processRemoveSession(sessionId, lastDeliveredSequenceId);
731                    } catch (Throwable e) {
732                        SERVICELOG.warn("Failed to remove session " + sessionId, e);
733                    }
734                }
735                // Cascade the connection stop to temp destinations.
736                for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) {
737                    DestinationInfo di = (DestinationInfo) iter.next();
738                    try {
739                        broker.removeDestination(cs.getContext(), di.getDestination(), 0);
740                    } catch (Throwable e) {
741                        SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
742                    }
743                    iter.remove();
744                }
745                try {
746                    broker.removeConnection(cs.getContext(), cs.getInfo(), null);
747                } catch (Throwable e) {
748                    SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
749                    if (LOG.isDebugEnabled()) {
750                        SERVICELOG.debug("Exception detail:", e);
751                    }
752                }
753                TransportConnectionState state = unregisterConnectionState(id);
754                if (state != null) {
755                    synchronized (brokerConnectionStates) {
756                        // If we are the last reference, we should remove the state
757                        // from the broker.
758                        if (state.decrementReference() == 0) {
759                            brokerConnectionStates.remove(id);
760                        }
761                    }
762                }
763            }
764            return null;
765        }
766    
767        public Response processProducerAck(ProducerAck ack) throws Exception {
768            // A broker should not get ProducerAck messages.
769            return null;
770        }
771    
772        public Connector getConnector() {
773            return connector;
774        }
775    
776        public void dispatchSync(Command message) {
777            // getStatistics().getEnqueues().increment();
778            try {
779                processDispatch(message);
780            } catch (IOException e) {
781                serviceExceptionAsync(e);
782            }
783        }
784    
785        public void dispatchAsync(Command message) {
786            if (!stopping.get()) {
787                // getStatistics().getEnqueues().increment();
788                if (taskRunner == null) {
789                    dispatchSync(message);
790                } else {
791                    synchronized (dispatchQueue) {
792                        dispatchQueue.add(message);
793                    }
794                    try {
795                        taskRunner.wakeup();
796                    } catch (InterruptedException e) {
797                        Thread.currentThread().interrupt();
798                    }
799                }
800            } else {
801                if (message.isMessageDispatch()) {
802                    MessageDispatch md = (MessageDispatch) message;
803                    Runnable sub = md.getTransmitCallback();
804                    broker.postProcessDispatch(md);
805                    if (sub != null) {
806                        sub.run();
807                    }
808                }
809            }
810        }
811    
812        protected void processDispatch(Command command) throws IOException {
813            final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
814            try {
815                if (!stopping.get()) {
816                    if (messageDispatch != null) {
817                        broker.preProcessDispatch(messageDispatch);
818                    }
819                    dispatch(command);
820                }
821            } finally {
822                if (messageDispatch != null) {
823                    Runnable sub = messageDispatch.getTransmitCallback();
824                    broker.postProcessDispatch(messageDispatch);
825                    if (sub != null) {
826                        sub.run();
827                    }
828                }
829                // getStatistics().getDequeues().increment();
830            }
831        }
832    
833        public boolean iterate() {
834            try {
835                if (stopping.get()) {
836                    if (dispatchStopped.compareAndSet(false, true)) {
837                        if (transportException.get() == null) {
838                            try {
839                                dispatch(new ShutdownInfo());
840                            } catch (Throwable ignore) {
841                            }
842                        }
843                        dispatchStoppedLatch.countDown();
844                    }
845                    return false;
846                }
847                if (!dispatchStopped.get()) {
848                    Command command = null;
849                    synchronized (dispatchQueue) {
850                        if (dispatchQueue.isEmpty()) {
851                            return false;
852                        }
853                        command = dispatchQueue.remove(0);
854                    }
855                    processDispatch(command);
856                    return true;
857                }
858                return false;
859            } catch (IOException e) {
860                if (dispatchStopped.compareAndSet(false, true)) {
861                    dispatchStoppedLatch.countDown();
862                }
863                serviceExceptionAsync(e);
864                return false;
865            }
866        }
867    
868        /**
869         * Returns the statistics for this connection
870         */
871        public ConnectionStatistics getStatistics() {
872            return statistics;
873        }
874    
875        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
876            return messageAuthorizationPolicy;
877        }
878    
879        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
880            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
881        }
882    
883        public boolean isManageable() {
884            return manageable;
885        }
886    
887        public void start() throws Exception {
888            starting = true;
889            try {
890                synchronized (this) {
891                    if (taskRunnerFactory != null) {
892                        taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
893                                + getRemoteAddress());
894                    } else {
895                        taskRunner = null;
896                    }
897                    transport.start();
898                    active = true;
899                    BrokerInfo info = connector.getBrokerInfo().copy();
900                    if (connector.isUpdateClusterClients()) {
901                        info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
902                    } else {
903                        info.setPeerBrokerInfos(null);
904                    }
905                    dispatchAsync(info);
906                    
907                    connector.onStarted(this);
908                }
909            } catch (Exception e) {
910                // Force clean up on an error starting up.
911                stop();
912                throw e;
913            } finally {
914                // stop() can be called from within the above block,
915                // but we want to be sure start() completes before
916                // stop() runs, so queue the stop until right now:
917                starting = false;
918                if (pendingStop) {
919                    LOG.debug("Calling the delayed stop()");
920                    stop();
921                }
922            }
923        }
924    
925        public void stop() throws Exception {
926            synchronized (this) {
927                pendingStop = true;
928                if (starting) {
929                    LOG.debug("stop() called in the middle of start(). Delaying...");
930                    return;
931                }
932            }
933            stopAsync();
934            while (!stopped.await(5, TimeUnit.SECONDS)) {
935                LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
936            }
937        }
938    
939        public void delayedStop(final int waitTime, final String reason) {
940            if (waitTime > 0) {
941                try {
942                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
943                        public void run() {
944                            try {
945                                Thread.sleep(waitTime);
946                                stopAsync();
947                                LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
948                            } catch (InterruptedException e) {
949                            }
950                        }
951                    }, "delayedStop:" + transport.getRemoteAddress());
952                } catch (Throwable t) {
953                    LOG.warn("cannot create stopAsync :", t);
954                }
955            }
956        }
957    
958        public void stopAsync() {
959            // If we're in the middle of starting
960            // then go no further... for now.
961            if (stopping.compareAndSet(false, true)) {
962                // Let all the connection contexts know we are shutting down
963                // so that in progress operations can notice and unblock.
964                List<TransportConnectionState> connectionStates = listConnectionStates();
965                for (TransportConnectionState cs : connectionStates) {
966                    cs.getContext().getStopping().set(true);
967                }
968                try {
969                    final Map context = MDCHelper.getCopyOfContextMap();
970                    DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){
971                        public void run() {
972                            serviceLock.writeLock().lock();
973                            try {
974                                MDCHelper.setContextMap(context);
975                                doStop();
976                            } catch (Throwable e) {
977                                LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
978                                        + "': ", e);
979                            } finally {
980                                stopped.countDown();
981                                serviceLock.writeLock().unlock();
982                            }
983                        }
984                    }, "StopAsync:" + transport.getRemoteAddress());
985                } catch (Throwable t) {
986                    LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
987                    stopped.countDown();
988                }
989            }
990        }
991    
992        @Override
993        public String toString() {
994            return "Transport Connection to: " + transport.getRemoteAddress();
995        }
996    
997        protected void doStop() throws Exception, InterruptedException {
998            LOG.debug("Stopping connection: " + transport.getRemoteAddress());
999            connector.onStopped(this);
1000            try {
1001                synchronized (this) {
1002                    if (masterBroker != null) {
1003                        masterBroker.stop();
1004                    }
1005                    if (duplexBridge != null) {
1006                        duplexBridge.stop();
1007                    }
1008                }
1009            } catch (Exception ignore) {
1010                LOG.trace("Exception caught stopping", ignore);
1011            }
1012            try {
1013                transport.stop();
1014                LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1015            } catch (Exception e) {
1016                LOG.debug("Could not stop transport: " + e, e);
1017            }
1018            if (taskRunner != null) {
1019                taskRunner.shutdown(1);
1020            }
1021            active = false;
1022            // Run the MessageDispatch callbacks so that message references get
1023            // cleaned up.
1024            synchronized (dispatchQueue) {
1025                for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
1026                    Command command = iter.next();
1027                    if (command.isMessageDispatch()) {
1028                        MessageDispatch md = (MessageDispatch) command;
1029                        Runnable sub = md.getTransmitCallback();
1030                        broker.postProcessDispatch(md);
1031                        if (sub != null) {
1032                            sub.run();
1033                        }
1034                    }
1035                }
1036                dispatchQueue.clear();
1037            }
1038            //
1039            // Remove all logical connection associated with this connection
1040            // from the broker.
1041            if (!broker.isStopped()) {
1042                List<TransportConnectionState> connectionStates = listConnectionStates();
1043                connectionStates = listConnectionStates();
1044                for (TransportConnectionState cs : connectionStates) {
1045                    cs.getContext().getStopping().set(true);
1046                    try {
1047                        LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
1048                        processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1049                    } catch (Throwable ignore) {
1050                        ignore.printStackTrace();
1051                    }
1052                }
1053            }
1054            LOG.debug("Connection Stopped: " + getRemoteAddress());
1055        }
1056    
1057        /**
1058         * @return Returns the blockedCandidate.
1059         */
1060        public boolean isBlockedCandidate() {
1061            return blockedCandidate;
1062        }
1063    
1064        /**
1065         * @param blockedCandidate
1066         *            The blockedCandidate to set.
1067         */
1068        public void setBlockedCandidate(boolean blockedCandidate) {
1069            this.blockedCandidate = blockedCandidate;
1070        }
1071    
1072        /**
1073         * @return Returns the markedCandidate.
1074         */
1075        public boolean isMarkedCandidate() {
1076            return markedCandidate;
1077        }
1078    
1079        /**
1080         * @param markedCandidate
1081         *            The markedCandidate to set.
1082         */
1083        public void setMarkedCandidate(boolean markedCandidate) {
1084            this.markedCandidate = markedCandidate;
1085            if (!markedCandidate) {
1086                timeStamp = 0;
1087                blockedCandidate = false;
1088            }
1089        }
1090    
1091        /**
1092         * @param slow
1093         *            The slow to set.
1094         */
1095        public void setSlow(boolean slow) {
1096            this.slow = slow;
1097        }
1098    
1099        /**
1100         * @return true if the Connection is slow
1101         */
1102        public boolean isSlow() {
1103            return slow;
1104        }
1105    
1106        /**
1107         * @return true if the Connection is potentially blocked
1108         */
1109        public boolean isMarkedBlockedCandidate() {
1110            return markedCandidate;
1111        }
1112    
1113        /**
1114         * Mark the Connection, so we can deem if it's collectable on the next sweep
1115         */
1116        public void doMark() {
1117            if (timeStamp == 0) {
1118                timeStamp = System.currentTimeMillis();
1119            }
1120        }
1121    
1122        /**
1123         * @return if after being marked, the Connection is still writing
1124         */
1125        public boolean isBlocked() {
1126            return blocked;
1127        }
1128    
1129        /**
1130         * @return true if the Connection is connected
1131         */
1132        public boolean isConnected() {
1133            return connected;
1134        }
1135    
1136        /**
1137         * @param blocked
1138         *            The blocked to set.
1139         */
1140        public void setBlocked(boolean blocked) {
1141            this.blocked = blocked;
1142        }
1143    
1144        /**
1145         * @param connected
1146         *            The connected to set.
1147         */
1148        public void setConnected(boolean connected) {
1149            this.connected = connected;
1150        }
1151    
1152        /**
1153         * @return true if the Connection is active
1154         */
1155        public boolean isActive() {
1156            return active;
1157        }
1158    
1159        /**
1160         * @param active
1161         *            The active to set.
1162         */
1163        public void setActive(boolean active) {
1164            this.active = active;
1165        }
1166    
1167        /**
1168         * @return true if the Connection is starting
1169         */
1170        public synchronized boolean isStarting() {
1171            return starting;
1172        }
1173    
1174        public synchronized boolean isNetworkConnection() {
1175            return networkConnection;
1176        }
1177        
1178        public boolean isFaultTolerantConnection() {
1179           return this.faultTolerantConnection;
1180        }
1181    
1182        protected synchronized void setStarting(boolean starting) {
1183            this.starting = starting;
1184        }
1185    
1186        /**
1187         * @return true if the Connection needs to stop
1188         */
1189        public synchronized boolean isPendingStop() {
1190            return pendingStop;
1191        }
1192    
1193        protected synchronized void setPendingStop(boolean pendingStop) {
1194            this.pendingStop = pendingStop;
1195        }
1196    
1197        public Response processBrokerInfo(BrokerInfo info) {
1198            if (info.isSlaveBroker()) {
1199                BrokerService bService = connector.getBrokerService();
1200                // Do we only support passive slaves - or does the slave want to be
1201                // passive ?
1202                boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
1203                if (passive == false) {
1204                    
1205                    // stream messages from this broker (the master) to
1206                    // the slave
1207                    MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
1208                    masterBroker = new MasterBroker(parent, transport);
1209                    masterBroker.startProcessing();
1210                }
1211                LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached");
1212                bService.slaveConnectionEstablished();
1213            } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1214                // so this TransportConnection is the rear end of a network bridge
1215                // We have been requested to create a two way pipe ...
1216                try {
1217                    Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1218                    Map<String, String> props = createMap(properties);
1219                    NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1220                    IntrospectionSupport.setProperties(config, props, "");
1221                    config.setBrokerName(broker.getBrokerName());
1222    
1223                    // check for existing duplex connection hanging about
1224    
1225                    // We first look if existing network connection already exists for the same broker Id and network connector name
1226                    // It's possible in case of brief network fault to have this transport connector side of the connection always active
1227                    // and the duplex network connector side wanting to open a new one
1228                    // In this case, the old connection must be broken
1229                    String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 
1230                    CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1231                    synchronized (connections) {
1232                        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
1233                            TransportConnection c = iter.next();
1234                            if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1235                                LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
1236                                c.stopAsync();
1237                                // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1238                                c.getStopped().await(1, TimeUnit.SECONDS);
1239                            }
1240                        }
1241                        setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1242                    }
1243                    URI uri = broker.getVmConnectorURI();
1244                    HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
1245                    map.put("network", "true");
1246                    map.put("async", "false");
1247                    uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1248                    Transport localTransport = TransportFactory.connect(uri);
1249                    Transport remoteBridgeTransport = new ResponseCorrelator(transport);
1250                    String duplexName = localTransport.toString();
1251                    if (duplexName.contains("#")) {
1252                        duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1253                    }
1254                    MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1255                    listener.setCreatedByDuplex(true);
1256                    duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1257                    duplexBridge.setBrokerService(broker.getBrokerService());
1258                    // now turn duplex off this side
1259                    info.setDuplexConnection(false);
1260                    duplexBridge.setCreatedByDuplex(true);
1261                    duplexBridge.duplexStart(this, brokerInfo, info);
1262                    LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
1263                    return null;
1264                } catch (TransportDisposedIOException e) {
1265                    LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
1266                    return null;
1267                } catch (Exception e) {
1268                    LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e);
1269                    return null;
1270                }
1271            }
1272            // We only expect to get one broker info command per connection
1273            if (this.brokerInfo != null) {
1274                LOG.warn("Unexpected extra broker info command received: " + info);
1275            }
1276            this.brokerInfo = info;
1277            networkConnection = true;
1278            List<TransportConnectionState> connectionStates = listConnectionStates();
1279            for (TransportConnectionState cs : connectionStates) {
1280                cs.getContext().setNetworkConnection(true);
1281            }
1282            return null;
1283        }
1284    
1285        @SuppressWarnings("unchecked")
1286        private HashMap<String, String> createMap(Properties properties) {
1287            return new HashMap(properties);
1288        }
1289    
1290        protected void dispatch(Command command) throws IOException {
1291            try {
1292                setMarkedCandidate(true);
1293                transport.oneway(command);
1294            } finally {
1295                setMarkedCandidate(false);
1296            }
1297        }
1298    
1299        public String getRemoteAddress() {
1300            return transport.getRemoteAddress();
1301        }
1302    
1303        public String getConnectionId() {
1304            List<TransportConnectionState> connectionStates = listConnectionStates();
1305            for (TransportConnectionState cs : connectionStates) {
1306                if (cs.getInfo().getClientId() != null) {
1307                    return cs.getInfo().getClientId();
1308                }
1309                return cs.getInfo().getConnectionId().toString();
1310            }
1311            return null;
1312        }
1313            
1314        public void updateClient(ConnectionControl control) {
1315            if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1316                    && this.wireFormatInfo.getVersion() >= 6) {
1317                dispatchAsync(control);
1318            }
1319        }
1320    
1321        private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1322            ProducerBrokerExchange result = producerExchanges.get(id);
1323            if (result == null) {
1324                synchronized (producerExchanges) {
1325                    result = new ProducerBrokerExchange();
1326                    TransportConnectionState state = lookupConnectionState(id);              
1327                    context = state.getContext();
1328                    if (context.isReconnect()) {
1329                        result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1330                    }
1331                    result.setConnectionContext(context);
1332                    SessionState ss = state.getSessionState(id.getParentId());
1333                    if (ss != null) {
1334                        result.setProducerState(ss.getProducerState(id));
1335                        ProducerState producerState = ss.getProducerState(id);
1336                        if (producerState != null && producerState.getInfo() != null) {
1337                            ProducerInfo info = producerState.getInfo();
1338                            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1339                        }
1340                    }
1341                    producerExchanges.put(id, result);
1342                }
1343            } else {
1344                context = result.getConnectionContext();
1345            }
1346            return result;
1347        }
1348    
1349        private void removeProducerBrokerExchange(ProducerId id) {
1350            synchronized (producerExchanges) {
1351                producerExchanges.remove(id);
1352            }
1353        }
1354    
1355        private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1356            ConsumerBrokerExchange result = consumerExchanges.get(id);
1357            if (result == null) {
1358                synchronized (consumerExchanges) {
1359                    result = new ConsumerBrokerExchange();
1360                    TransportConnectionState state = lookupConnectionState(id);
1361                    context = state.getContext();
1362                    result.setConnectionContext(context);
1363                    SessionState ss = state.getSessionState(id.getParentId());
1364                    if (ss != null) {
1365                        ConsumerState cs = ss.getConsumerState(id);
1366                        if (cs != null) {
1367                            ConsumerInfo info = cs.getInfo();
1368                            if (info != null) {
1369                                if (info.getDestination() != null && info.getDestination().isPattern()) {
1370                                    result.setWildcard(true);
1371                                }
1372                            }
1373                        }
1374                    }
1375                    consumerExchanges.put(id, result);
1376                }
1377            }
1378            return result;
1379        }
1380    
1381        private void removeConsumerBrokerExchange(ConsumerId id) {
1382            synchronized (consumerExchanges) {
1383                consumerExchanges.remove(id);
1384            }
1385        }
1386    
1387        public int getProtocolVersion() {
1388            return protocolVersion.get();
1389        }
1390    
1391        public Response processControlCommand(ControlCommand command) throws Exception {
1392            String control = command.getCommand();
1393            if (control != null && control.equals("shutdown")) {
1394                System.exit(0);
1395            }
1396            return null;
1397        }
1398    
1399        public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1400            return null;
1401        }
1402    
1403        public Response processConnectionControl(ConnectionControl control) throws Exception {
1404            if (control != null) {
1405                faultTolerantConnection = control.isFaultTolerant();
1406            }
1407            return null;
1408        }
1409    
1410        public Response processConnectionError(ConnectionError error) throws Exception {
1411            return null;
1412        }
1413    
1414        public Response processConsumerControl(ConsumerControl control) throws Exception {
1415            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1416            broker.processConsumerControl(consumerExchange, control);
1417            return null;
1418        }
1419    
1420        protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1421                TransportConnectionState state) {
1422            TransportConnectionState cs = null;
1423            if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1424                // swap implementations
1425                TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1426                newRegister.intialize(connectionStateRegister);
1427                connectionStateRegister = newRegister;
1428            }
1429            cs = connectionStateRegister.registerConnectionState(connectionId, state);
1430            return cs;
1431        }
1432    
1433        protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1434            return connectionStateRegister.unregisterConnectionState(connectionId);
1435        }
1436    
1437        protected synchronized List<TransportConnectionState> listConnectionStates() {
1438            return connectionStateRegister.listConnectionStates();
1439        }
1440    
1441        protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1442            return connectionStateRegister.lookupConnectionState(connectionId);
1443        }
1444    
1445        protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1446            return connectionStateRegister.lookupConnectionState(id);
1447        }
1448    
1449        protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1450            return connectionStateRegister.lookupConnectionState(id);
1451        }
1452    
1453        protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1454            return connectionStateRegister.lookupConnectionState(id);
1455        }
1456    
1457        protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1458            return connectionStateRegister.lookupConnectionState(connectionId);
1459        }
1460    
1461        protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1462            this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1463        }
1464    
1465        protected synchronized String getDuplexNetworkConnectorId() {
1466            return this.duplexNetworkConnectorId;
1467        }
1468        
1469        protected CountDownLatch getStopped() {
1470            return stopped;
1471        }
1472    }