001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker; 018 019 import java.io.IOException; 020 import java.net.URI; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.LinkedList; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Properties; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.CopyOnWriteArrayList; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.atomic.AtomicBoolean; 032 import java.util.concurrent.atomic.AtomicInteger; 033 import java.util.concurrent.atomic.AtomicReference; 034 import java.util.concurrent.locks.ReentrantReadWriteLock; 035 036 import javax.management.ObjectName; 037 import javax.transaction.xa.XAResource; 038 039 import org.apache.activemq.broker.ft.MasterBroker; 040 import org.apache.activemq.broker.region.ConnectionStatistics; 041 import org.apache.activemq.broker.region.RegionBroker; 042 import org.apache.activemq.command.BrokerId; 043 import org.apache.activemq.command.BrokerInfo; 044 import org.apache.activemq.command.Command; 045 import org.apache.activemq.command.CommandTypes; 046 import org.apache.activemq.command.ConnectionControl; 047 import org.apache.activemq.command.ConnectionError; 048 import org.apache.activemq.command.ConnectionId; 049 import org.apache.activemq.command.ConnectionInfo; 050 import org.apache.activemq.command.ConsumerControl; 051 import org.apache.activemq.command.ConsumerId; 052 import org.apache.activemq.command.ConsumerInfo; 053 import org.apache.activemq.command.ControlCommand; 054 import org.apache.activemq.command.DataArrayResponse; 055 import org.apache.activemq.command.DestinationInfo; 056 import org.apache.activemq.command.ExceptionResponse; 057 import org.apache.activemq.command.FlushCommand; 058 import org.apache.activemq.command.IntegerResponse; 059 import org.apache.activemq.command.KeepAliveInfo; 060 import org.apache.activemq.command.Message; 061 import org.apache.activemq.command.MessageAck; 062 import org.apache.activemq.command.MessageDispatch; 063 import org.apache.activemq.command.MessageDispatchNotification; 064 import org.apache.activemq.command.MessagePull; 065 import org.apache.activemq.command.ProducerAck; 066 import org.apache.activemq.command.ProducerId; 067 import org.apache.activemq.command.ProducerInfo; 068 import org.apache.activemq.command.RemoveSubscriptionInfo; 069 import org.apache.activemq.command.Response; 070 import org.apache.activemq.command.SessionId; 071 import org.apache.activemq.command.SessionInfo; 072 import org.apache.activemq.command.ShutdownInfo; 073 import org.apache.activemq.command.TransactionId; 074 import org.apache.activemq.command.TransactionInfo; 075 import org.apache.activemq.command.WireFormatInfo; 076 import org.apache.activemq.network.*; 077 import org.apache.activemq.security.MessageAuthorizationPolicy; 078 import org.apache.activemq.state.CommandVisitor; 079 import org.apache.activemq.state.ConnectionState; 080 import org.apache.activemq.state.ConsumerState; 081 import org.apache.activemq.state.ProducerState; 082 import org.apache.activemq.state.SessionState; 083 import org.apache.activemq.state.TransactionState; 084 import org.apache.activemq.thread.DefaultThreadPools; 085 import org.apache.activemq.thread.Task; 086 import org.apache.activemq.thread.TaskRunner; 087 import org.apache.activemq.thread.TaskRunnerFactory; 088 import org.apache.activemq.transaction.Transaction; 089 import org.apache.activemq.transport.DefaultTransportListener; 090 import org.apache.activemq.transport.ResponseCorrelator; 091 import org.apache.activemq.transport.Transport; 092 import org.apache.activemq.transport.TransportDisposedIOException; 093 import org.apache.activemq.transport.TransportFactory; 094 import org.apache.activemq.util.*; 095 import org.slf4j.Logger; 096 import org.slf4j.LoggerFactory; 097 import org.slf4j.MDC; 098 099 public class TransportConnection implements Connection, Task, CommandVisitor { 100 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 101 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 102 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 103 // Keeps track of the broker and connector that created this connection. 104 protected final Broker broker; 105 protected final TransportConnector connector; 106 // Keeps track of the state of the connections. 107 // protected final ConcurrentHashMap localConnectionStates=new 108 // ConcurrentHashMap(); 109 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 110 // The broker and wireformat info that was exchanged. 111 protected BrokerInfo brokerInfo; 112 protected final List<Command> dispatchQueue = new LinkedList<Command>(); 113 protected TaskRunner taskRunner; 114 protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>(); 115 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 116 private MasterBroker masterBroker; 117 private final Transport transport; 118 private MessageAuthorizationPolicy messageAuthorizationPolicy; 119 private WireFormatInfo wireFormatInfo; 120 // Used to do async dispatch.. this should perhaps be pushed down into the 121 // transport layer.. 122 private boolean inServiceException; 123 private final ConnectionStatistics statistics = new ConnectionStatistics(); 124 private boolean manageable; 125 private boolean slow; 126 private boolean markedCandidate; 127 private boolean blockedCandidate; 128 private boolean blocked; 129 private boolean connected; 130 private boolean active; 131 private boolean starting; 132 private boolean pendingStop; 133 private long timeStamp; 134 private final AtomicBoolean stopping = new AtomicBoolean(false); 135 private final CountDownLatch stopped = new CountDownLatch(1); 136 private final AtomicBoolean asyncException = new AtomicBoolean(false); 137 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); 138 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); 139 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 140 private ConnectionContext context; 141 private boolean networkConnection; 142 private boolean faultTolerantConnection; 143 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 144 private DemandForwardingBridge duplexBridge; 145 private final TaskRunnerFactory taskRunnerFactory; 146 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 147 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 148 private String duplexNetworkConnectorId; 149 150 /** 151 * @param connector 152 * @param transport 153 * @param broker 154 * @param taskRunnerFactory 155 * - can be null if you want direct dispatch to the transport 156 * else commands are sent async. 157 */ 158 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 159 TaskRunnerFactory taskRunnerFactory) { 160 this.connector = connector; 161 this.broker = broker; 162 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 163 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 164 brokerConnectionStates = rb.getConnectionStates(); 165 if (connector != null) { 166 this.statistics.setParent(connector.getStatistics()); 167 } 168 this.taskRunnerFactory = taskRunnerFactory; 169 this.transport = transport; 170 this.transport.setTransportListener(new DefaultTransportListener() { 171 @Override 172 public void onCommand(Object o) { 173 serviceLock.readLock().lock(); 174 try { 175 if (!(o instanceof Command)) { 176 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 177 } 178 Command command = (Command) o; 179 Response response = service(command); 180 if (response != null) { 181 dispatchSync(response); 182 } 183 } finally { 184 serviceLock.readLock().unlock(); 185 } 186 } 187 188 @Override 189 public void onException(IOException exception) { 190 serviceLock.readLock().lock(); 191 try { 192 serviceTransportException(exception); 193 } finally { 194 serviceLock.readLock().unlock(); 195 } 196 } 197 }); 198 connected = true; 199 } 200 201 /** 202 * Returns the number of messages to be dispatched to this connection 203 * 204 * @return size of dispatch queue 205 */ 206 public int getDispatchQueueSize() { 207 synchronized (dispatchQueue) { 208 return dispatchQueue.size(); 209 } 210 } 211 212 public void serviceTransportException(IOException e) { 213 BrokerService bService = connector.getBrokerService(); 214 if (bService.isShutdownOnSlaveFailure()) { 215 if (brokerInfo != null) { 216 if (brokerInfo.isSlaveBroker()) { 217 LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e); 218 try { 219 doStop(); 220 bService.stop(); 221 } catch (Exception ex) { 222 LOG.warn("Failed to stop the master", ex); 223 } 224 } 225 } 226 } 227 if (!stopping.get()) { 228 transportException.set(e); 229 if (TRANSPORTLOG.isDebugEnabled()) { 230 TRANSPORTLOG.debug("Transport failed: " + e, e); 231 } else if (TRANSPORTLOG.isInfoEnabled()) { 232 TRANSPORTLOG.info("Transport failed: " + e); 233 } 234 stopAsync(); 235 } 236 } 237 238 /** 239 * Calls the serviceException method in an async thread. Since handling a 240 * service exception closes a socket, we should not tie up broker threads 241 * since client sockets may hang or cause deadlocks. 242 * 243 * @param e 244 */ 245 public void serviceExceptionAsync(final IOException e) { 246 if (asyncException.compareAndSet(false, true)) { 247 new Thread("Async Exception Handler") { 248 @Override 249 public void run() { 250 serviceException(e); 251 } 252 }.start(); 253 } 254 } 255 256 /** 257 * Closes a clients connection due to a detected error. Errors are ignored 258 * if: the client is closing or broker is closing. Otherwise, the connection 259 * error transmitted to the client before stopping it's transport. 260 */ 261 public void serviceException(Throwable e) { 262 // are we a transport exception such as not being able to dispatch 263 // synchronously to a transport 264 if (e instanceof IOException) { 265 serviceTransportException((IOException) e); 266 } else if (e.getClass() == BrokerStoppedException.class) { 267 // Handle the case where the broker is stopped 268 // But the client is still connected. 269 if (!stopping.get()) { 270 if (SERVICELOG.isDebugEnabled()) { 271 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 272 } 273 ConnectionError ce = new ConnectionError(); 274 ce.setException(e); 275 dispatchSync(ce); 276 // Wait a little bit to try to get the output buffer to flush 277 // the exption notification to the client. 278 try { 279 Thread.sleep(500); 280 } catch (InterruptedException ie) { 281 Thread.currentThread().interrupt(); 282 } 283 // Worst case is we just kill the connection before the 284 // notification gets to him. 285 stopAsync(); 286 } 287 } else if (!stopping.get() && !inServiceException) { 288 inServiceException = true; 289 try { 290 SERVICELOG.warn("Async error occurred: " + e, e); 291 ConnectionError ce = new ConnectionError(); 292 ce.setException(e); 293 dispatchAsync(ce); 294 } finally { 295 inServiceException = false; 296 } 297 } 298 } 299 300 public Response service(Command command) { 301 MDC.put("activemq.connector", connector.getUri().toString()); 302 Response response = null; 303 boolean responseRequired = command.isResponseRequired(); 304 int commandId = command.getCommandId(); 305 try { 306 response = command.visit(this); 307 } catch (Throwable e) { 308 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 309 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 310 + " command: " + command + ", exception: " + e, e); 311 } 312 if (responseRequired) { 313 response = new ExceptionResponse(e); 314 if(e instanceof java.lang.SecurityException){ 315 //still need to close this down - incase the peer of this transport doesn't play nice 316 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage()); 317 } 318 } else { 319 serviceException(e); 320 } 321 } 322 if (responseRequired) { 323 if (response == null) { 324 response = new Response(); 325 } 326 response.setCorrelationId(commandId); 327 } 328 // The context may have been flagged so that the response is not 329 // sent. 330 if (context != null) { 331 if (context.isDontSendReponse()) { 332 context.setDontSendReponse(false); 333 response = null; 334 } 335 context = null; 336 } 337 MDC.remove("activemq.connector"); 338 return response; 339 } 340 341 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 342 return null; 343 } 344 345 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 346 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 347 return null; 348 } 349 350 public Response processWireFormat(WireFormatInfo info) throws Exception { 351 wireFormatInfo = info; 352 protocolVersion.set(info.getVersion()); 353 return null; 354 } 355 356 public Response processShutdown(ShutdownInfo info) throws Exception { 357 stopAsync(); 358 return null; 359 } 360 361 public Response processFlush(FlushCommand command) throws Exception { 362 return null; 363 } 364 365 public Response processBeginTransaction(TransactionInfo info) throws Exception { 366 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 367 context = null; 368 if (cs != null) { 369 context = cs.getContext(); 370 } 371 if (cs == null) { 372 throw new NullPointerException("Context is null"); 373 } 374 // Avoid replaying dup commands 375 if (cs.getTransactionState(info.getTransactionId()) == null) { 376 cs.addTransactionState(info.getTransactionId()); 377 broker.beginTransaction(context, info.getTransactionId()); 378 } 379 return null; 380 } 381 382 public Response processEndTransaction(TransactionInfo info) throws Exception { 383 // No need to do anything. This packet is just sent by the client 384 // make sure he is synced with the server as commit command could 385 // come from a different connection. 386 return null; 387 } 388 389 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 390 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 391 context = null; 392 if (cs != null) { 393 context = cs.getContext(); 394 } 395 if (cs == null) { 396 throw new NullPointerException("Context is null"); 397 } 398 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 399 if (transactionState == null) { 400 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 401 + info.getTransactionId()); 402 } 403 // Avoid dups. 404 if (!transactionState.isPrepared()) { 405 transactionState.setPrepared(true); 406 int result = broker.prepareTransaction(context, info.getTransactionId()); 407 transactionState.setPreparedResult(result); 408 if (result == XAResource.XA_RDONLY) { 409 // we are done, no further rollback or commit from TM 410 cs.removeTransactionState(info.getTransactionId()); 411 } 412 IntegerResponse response = new IntegerResponse(result); 413 return response; 414 } else { 415 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 416 return response; 417 } 418 } 419 420 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 421 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 422 context = cs.getContext(); 423 cs.removeTransactionState(info.getTransactionId()); 424 broker.commitTransaction(context, info.getTransactionId(), true); 425 return null; 426 } 427 428 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 429 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 430 context = cs.getContext(); 431 cs.removeTransactionState(info.getTransactionId()); 432 broker.commitTransaction(context, info.getTransactionId(), false); 433 return null; 434 } 435 436 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 437 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 438 context = cs.getContext(); 439 cs.removeTransactionState(info.getTransactionId()); 440 broker.rollbackTransaction(context, info.getTransactionId()); 441 return null; 442 } 443 444 public Response processForgetTransaction(TransactionInfo info) throws Exception { 445 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 446 context = cs.getContext(); 447 broker.forgetTransaction(context, info.getTransactionId()); 448 return null; 449 } 450 451 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 452 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 453 context = cs.getContext(); 454 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 455 return new DataArrayResponse(preparedTransactions); 456 } 457 458 public Response processMessage(Message messageSend) throws Exception { 459 ProducerId producerId = messageSend.getProducerId(); 460 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 461 if (producerExchange.canDispatch(messageSend)) { 462 broker.send(producerExchange, messageSend); 463 } 464 return null; 465 } 466 467 public Response processMessageAck(MessageAck ack) throws Exception { 468 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 469 broker.acknowledge(consumerExchange, ack); 470 return null; 471 } 472 473 public Response processMessagePull(MessagePull pull) throws Exception { 474 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 475 } 476 477 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 478 broker.processDispatchNotification(notification); 479 return null; 480 } 481 482 public Response processAddDestination(DestinationInfo info) throws Exception { 483 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 484 broker.addDestinationInfo(cs.getContext(), info); 485 if (info.getDestination().isTemporary()) { 486 cs.addTempDestination(info); 487 } 488 return null; 489 } 490 491 public Response processRemoveDestination(DestinationInfo info) throws Exception { 492 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 493 broker.removeDestinationInfo(cs.getContext(), info); 494 if (info.getDestination().isTemporary()) { 495 cs.removeTempDestination(info.getDestination()); 496 } 497 return null; 498 } 499 500 public Response processAddProducer(ProducerInfo info) throws Exception { 501 SessionId sessionId = info.getProducerId().getParentId(); 502 ConnectionId connectionId = sessionId.getParentId(); 503 TransportConnectionState cs = lookupConnectionState(connectionId); 504 SessionState ss = cs.getSessionState(sessionId); 505 if (ss == null) { 506 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 507 + sessionId); 508 } 509 // Avoid replaying dup commands 510 if (!ss.getProducerIds().contains(info.getProducerId())) { 511 broker.addProducer(cs.getContext(), info); 512 try { 513 ss.addProducer(info); 514 } catch (IllegalStateException e) { 515 broker.removeProducer(cs.getContext(), info); 516 } 517 } 518 return null; 519 } 520 521 public Response processRemoveProducer(ProducerId id) throws Exception { 522 SessionId sessionId = id.getParentId(); 523 ConnectionId connectionId = sessionId.getParentId(); 524 TransportConnectionState cs = lookupConnectionState(connectionId); 525 SessionState ss = cs.getSessionState(sessionId); 526 if (ss == null) { 527 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 528 + sessionId); 529 } 530 ProducerState ps = ss.removeProducer(id); 531 if (ps == null) { 532 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 533 } 534 removeProducerBrokerExchange(id); 535 broker.removeProducer(cs.getContext(), ps.getInfo()); 536 return null; 537 } 538 539 public Response processAddConsumer(ConsumerInfo info) throws Exception { 540 SessionId sessionId = info.getConsumerId().getParentId(); 541 ConnectionId connectionId = sessionId.getParentId(); 542 TransportConnectionState cs = lookupConnectionState(connectionId); 543 SessionState ss = cs.getSessionState(sessionId); 544 if (ss == null) { 545 throw new IllegalStateException(broker.getBrokerName() 546 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 547 } 548 // Avoid replaying dup commands 549 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 550 broker.addConsumer(cs.getContext(), info); 551 try { 552 ss.addConsumer(info); 553 } catch (IllegalStateException e) { 554 broker.removeConsumer(cs.getContext(), info); 555 } 556 } 557 return null; 558 } 559 560 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 561 SessionId sessionId = id.getParentId(); 562 ConnectionId connectionId = sessionId.getParentId(); 563 TransportConnectionState cs = lookupConnectionState(connectionId); 564 if (cs == null) { 565 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 566 + connectionId); 567 } 568 SessionState ss = cs.getSessionState(sessionId); 569 if (ss == null) { 570 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 571 + sessionId); 572 } 573 ConsumerState consumerState = ss.removeConsumer(id); 574 if (consumerState == null) { 575 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 576 } 577 ConsumerInfo info = consumerState.getInfo(); 578 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 579 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 580 removeConsumerBrokerExchange(id); 581 return null; 582 } 583 584 public Response processAddSession(SessionInfo info) throws Exception { 585 ConnectionId connectionId = info.getSessionId().getParentId(); 586 TransportConnectionState cs = lookupConnectionState(connectionId); 587 // Avoid replaying dup commands 588 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 589 broker.addSession(cs.getContext(), info); 590 try { 591 cs.addSession(info); 592 } catch (IllegalStateException e) { 593 e.printStackTrace(); 594 broker.removeSession(cs.getContext(), info); 595 } 596 } 597 return null; 598 } 599 600 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 601 ConnectionId connectionId = id.getParentId(); 602 TransportConnectionState cs = lookupConnectionState(connectionId); 603 if (cs == null) { 604 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 605 } 606 SessionState session = cs.getSessionState(id); 607 if (session == null) { 608 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 609 } 610 // Don't let new consumers or producers get added while we are closing 611 // this down. 612 session.shutdown(); 613 // Cascade the connection stop to the consumers and producers. 614 for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) { 615 ConsumerId consumerId = (ConsumerId) iter.next(); 616 try { 617 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 618 } catch (Throwable e) { 619 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e); 620 } 621 } 622 for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) { 623 ProducerId producerId = (ProducerId) iter.next(); 624 try { 625 processRemoveProducer(producerId); 626 } catch (Throwable e) { 627 LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e); 628 } 629 } 630 cs.removeSession(id); 631 broker.removeSession(cs.getContext(), session.getInfo()); 632 return null; 633 } 634 635 public Response processAddConnection(ConnectionInfo info) throws Exception { 636 // if the broker service has slave attached, wait for the slave to be 637 // attached to allow client connection. slave connection is fine 638 if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave() 639 && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) { 640 ServiceSupport.dispose(transport); 641 return new ExceptionResponse(new Exception("Master's slave not attached yet.")); 642 } 643 // Older clients should have been defaulting this field to true.. but 644 // they were not. 645 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 646 info.setClientMaster(true); 647 } 648 TransportConnectionState state; 649 // Make sure 2 concurrent connections by the same ID only generate 1 650 // TransportConnectionState object. 651 synchronized (brokerConnectionStates) { 652 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 653 if (state == null) { 654 state = new TransportConnectionState(info, this); 655 brokerConnectionStates.put(info.getConnectionId(), state); 656 } 657 state.incrementReference(); 658 } 659 // If there are 2 concurrent connections for the same connection id, 660 // then last one in wins, we need to sync here 661 // to figure out the winner. 662 synchronized (state.getConnectionMutex()) { 663 if (state.getConnection() != this) { 664 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress()); 665 state.getConnection().stop(); 666 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " 667 + state.getConnection().getRemoteAddress()); 668 state.setConnection(this); 669 state.reset(info); 670 } 671 } 672 registerConnectionState(info.getConnectionId(), state); 673 LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress()); 674 this.faultTolerantConnection=info.isFaultTolerant(); 675 // Setup the context. 676 String clientId = info.getClientId(); 677 context = new ConnectionContext(); 678 context.setBroker(broker); 679 context.setClientId(clientId); 680 context.setClientMaster(info.isClientMaster()); 681 context.setConnection(this); 682 context.setConnectionId(info.getConnectionId()); 683 context.setConnector(connector); 684 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 685 context.setNetworkConnection(networkConnection); 686 context.setFaultTolerant(faultTolerantConnection); 687 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 688 context.setUserName(info.getUserName()); 689 context.setWireFormatInfo(wireFormatInfo); 690 context.setReconnect(info.isFailoverReconnect()); 691 this.manageable = info.isManageable(); 692 state.setContext(context); 693 state.setConnection(this); 694 695 try { 696 broker.addConnection(context, info); 697 } catch (Exception e) { 698 synchronized (brokerConnectionStates) { 699 brokerConnectionStates.remove(info.getConnectionId()); 700 } 701 unregisterConnectionState(info.getConnectionId()); 702 LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString()); 703 if (LOG.isDebugEnabled()) { 704 LOG.debug("Exception detail:", e); 705 } 706 throw e; 707 } 708 if (info.isManageable()) { 709 // send ConnectionCommand 710 ConnectionControl command = this.connector.getConnectionControl(); 711 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 712 dispatchAsync(command); 713 } 714 return null; 715 } 716 717 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 718 throws InterruptedException { 719 LOG.debug("remove connection id: " + id); 720 TransportConnectionState cs = lookupConnectionState(id); 721 if (cs != null) { 722 // Don't allow things to be added to the connection state while we 723 // are 724 // shutting down. 725 cs.shutdown(); 726 // Cascade the connection stop to the sessions. 727 for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) { 728 SessionId sessionId = (SessionId) iter.next(); 729 try { 730 processRemoveSession(sessionId, lastDeliveredSequenceId); 731 } catch (Throwable e) { 732 SERVICELOG.warn("Failed to remove session " + sessionId, e); 733 } 734 } 735 // Cascade the connection stop to temp destinations. 736 for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext();) { 737 DestinationInfo di = (DestinationInfo) iter.next(); 738 try { 739 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 740 } catch (Throwable e) { 741 SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e); 742 } 743 iter.remove(); 744 } 745 try { 746 broker.removeConnection(cs.getContext(), cs.getInfo(), null); 747 } catch (Throwable e) { 748 SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString()); 749 if (LOG.isDebugEnabled()) { 750 SERVICELOG.debug("Exception detail:", e); 751 } 752 } 753 TransportConnectionState state = unregisterConnectionState(id); 754 if (state != null) { 755 synchronized (brokerConnectionStates) { 756 // If we are the last reference, we should remove the state 757 // from the broker. 758 if (state.decrementReference() == 0) { 759 brokerConnectionStates.remove(id); 760 } 761 } 762 } 763 } 764 return null; 765 } 766 767 public Response processProducerAck(ProducerAck ack) throws Exception { 768 // A broker should not get ProducerAck messages. 769 return null; 770 } 771 772 public Connector getConnector() { 773 return connector; 774 } 775 776 public void dispatchSync(Command message) { 777 // getStatistics().getEnqueues().increment(); 778 try { 779 processDispatch(message); 780 } catch (IOException e) { 781 serviceExceptionAsync(e); 782 } 783 } 784 785 public void dispatchAsync(Command message) { 786 if (!stopping.get()) { 787 // getStatistics().getEnqueues().increment(); 788 if (taskRunner == null) { 789 dispatchSync(message); 790 } else { 791 synchronized (dispatchQueue) { 792 dispatchQueue.add(message); 793 } 794 try { 795 taskRunner.wakeup(); 796 } catch (InterruptedException e) { 797 Thread.currentThread().interrupt(); 798 } 799 } 800 } else { 801 if (message.isMessageDispatch()) { 802 MessageDispatch md = (MessageDispatch) message; 803 Runnable sub = md.getTransmitCallback(); 804 broker.postProcessDispatch(md); 805 if (sub != null) { 806 sub.run(); 807 } 808 } 809 } 810 } 811 812 protected void processDispatch(Command command) throws IOException { 813 final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 814 try { 815 if (!stopping.get()) { 816 if (messageDispatch != null) { 817 broker.preProcessDispatch(messageDispatch); 818 } 819 dispatch(command); 820 } 821 } finally { 822 if (messageDispatch != null) { 823 Runnable sub = messageDispatch.getTransmitCallback(); 824 broker.postProcessDispatch(messageDispatch); 825 if (sub != null) { 826 sub.run(); 827 } 828 } 829 // getStatistics().getDequeues().increment(); 830 } 831 } 832 833 public boolean iterate() { 834 try { 835 if (stopping.get()) { 836 if (dispatchStopped.compareAndSet(false, true)) { 837 if (transportException.get() == null) { 838 try { 839 dispatch(new ShutdownInfo()); 840 } catch (Throwable ignore) { 841 } 842 } 843 dispatchStoppedLatch.countDown(); 844 } 845 return false; 846 } 847 if (!dispatchStopped.get()) { 848 Command command = null; 849 synchronized (dispatchQueue) { 850 if (dispatchQueue.isEmpty()) { 851 return false; 852 } 853 command = dispatchQueue.remove(0); 854 } 855 processDispatch(command); 856 return true; 857 } 858 return false; 859 } catch (IOException e) { 860 if (dispatchStopped.compareAndSet(false, true)) { 861 dispatchStoppedLatch.countDown(); 862 } 863 serviceExceptionAsync(e); 864 return false; 865 } 866 } 867 868 /** 869 * Returns the statistics for this connection 870 */ 871 public ConnectionStatistics getStatistics() { 872 return statistics; 873 } 874 875 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 876 return messageAuthorizationPolicy; 877 } 878 879 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 880 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 881 } 882 883 public boolean isManageable() { 884 return manageable; 885 } 886 887 public void start() throws Exception { 888 starting = true; 889 try { 890 synchronized (this) { 891 if (taskRunnerFactory != null) { 892 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 893 + getRemoteAddress()); 894 } else { 895 taskRunner = null; 896 } 897 transport.start(); 898 active = true; 899 BrokerInfo info = connector.getBrokerInfo().copy(); 900 if (connector.isUpdateClusterClients()) { 901 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 902 } else { 903 info.setPeerBrokerInfos(null); 904 } 905 dispatchAsync(info); 906 907 connector.onStarted(this); 908 } 909 } catch (Exception e) { 910 // Force clean up on an error starting up. 911 stop(); 912 throw e; 913 } finally { 914 // stop() can be called from within the above block, 915 // but we want to be sure start() completes before 916 // stop() runs, so queue the stop until right now: 917 starting = false; 918 if (pendingStop) { 919 LOG.debug("Calling the delayed stop()"); 920 stop(); 921 } 922 } 923 } 924 925 public void stop() throws Exception { 926 synchronized (this) { 927 pendingStop = true; 928 if (starting) { 929 LOG.debug("stop() called in the middle of start(). Delaying..."); 930 return; 931 } 932 } 933 stopAsync(); 934 while (!stopped.await(5, TimeUnit.SECONDS)) { 935 LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown."); 936 } 937 } 938 939 public void delayedStop(final int waitTime, final String reason) { 940 if (waitTime > 0) { 941 try { 942 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { 943 public void run() { 944 try { 945 Thread.sleep(waitTime); 946 stopAsync(); 947 LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason); 948 } catch (InterruptedException e) { 949 } 950 } 951 }, "delayedStop:" + transport.getRemoteAddress()); 952 } catch (Throwable t) { 953 LOG.warn("cannot create stopAsync :", t); 954 } 955 } 956 } 957 958 public void stopAsync() { 959 // If we're in the middle of starting 960 // then go no further... for now. 961 if (stopping.compareAndSet(false, true)) { 962 // Let all the connection contexts know we are shutting down 963 // so that in progress operations can notice and unblock. 964 List<TransportConnectionState> connectionStates = listConnectionStates(); 965 for (TransportConnectionState cs : connectionStates) { 966 cs.getContext().getStopping().set(true); 967 } 968 try { 969 final Map context = MDCHelper.getCopyOfContextMap(); 970 DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable(){ 971 public void run() { 972 serviceLock.writeLock().lock(); 973 try { 974 MDCHelper.setContextMap(context); 975 doStop(); 976 } catch (Throwable e) { 977 LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress() 978 + "': ", e); 979 } finally { 980 stopped.countDown(); 981 serviceLock.writeLock().unlock(); 982 } 983 } 984 }, "StopAsync:" + transport.getRemoteAddress()); 985 } catch (Throwable t) { 986 LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); 987 stopped.countDown(); 988 } 989 } 990 } 991 992 @Override 993 public String toString() { 994 return "Transport Connection to: " + transport.getRemoteAddress(); 995 } 996 997 protected void doStop() throws Exception, InterruptedException { 998 LOG.debug("Stopping connection: " + transport.getRemoteAddress()); 999 connector.onStopped(this); 1000 try { 1001 synchronized (this) { 1002 if (masterBroker != null) { 1003 masterBroker.stop(); 1004 } 1005 if (duplexBridge != null) { 1006 duplexBridge.stop(); 1007 } 1008 } 1009 } catch (Exception ignore) { 1010 LOG.trace("Exception caught stopping", ignore); 1011 } 1012 try { 1013 transport.stop(); 1014 LOG.debug("Stopped transport: " + transport.getRemoteAddress()); 1015 } catch (Exception e) { 1016 LOG.debug("Could not stop transport: " + e, e); 1017 } 1018 if (taskRunner != null) { 1019 taskRunner.shutdown(1); 1020 } 1021 active = false; 1022 // Run the MessageDispatch callbacks so that message references get 1023 // cleaned up. 1024 synchronized (dispatchQueue) { 1025 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) { 1026 Command command = iter.next(); 1027 if (command.isMessageDispatch()) { 1028 MessageDispatch md = (MessageDispatch) command; 1029 Runnable sub = md.getTransmitCallback(); 1030 broker.postProcessDispatch(md); 1031 if (sub != null) { 1032 sub.run(); 1033 } 1034 } 1035 } 1036 dispatchQueue.clear(); 1037 } 1038 // 1039 // Remove all logical connection associated with this connection 1040 // from the broker. 1041 if (!broker.isStopped()) { 1042 List<TransportConnectionState> connectionStates = listConnectionStates(); 1043 connectionStates = listConnectionStates(); 1044 for (TransportConnectionState cs : connectionStates) { 1045 cs.getContext().getStopping().set(true); 1046 try { 1047 LOG.debug("Cleaning up connection resources: " + getRemoteAddress()); 1048 processRemoveConnection(cs.getInfo().getConnectionId(), 0l); 1049 } catch (Throwable ignore) { 1050 ignore.printStackTrace(); 1051 } 1052 } 1053 } 1054 LOG.debug("Connection Stopped: " + getRemoteAddress()); 1055 } 1056 1057 /** 1058 * @return Returns the blockedCandidate. 1059 */ 1060 public boolean isBlockedCandidate() { 1061 return blockedCandidate; 1062 } 1063 1064 /** 1065 * @param blockedCandidate 1066 * The blockedCandidate to set. 1067 */ 1068 public void setBlockedCandidate(boolean blockedCandidate) { 1069 this.blockedCandidate = blockedCandidate; 1070 } 1071 1072 /** 1073 * @return Returns the markedCandidate. 1074 */ 1075 public boolean isMarkedCandidate() { 1076 return markedCandidate; 1077 } 1078 1079 /** 1080 * @param markedCandidate 1081 * The markedCandidate to set. 1082 */ 1083 public void setMarkedCandidate(boolean markedCandidate) { 1084 this.markedCandidate = markedCandidate; 1085 if (!markedCandidate) { 1086 timeStamp = 0; 1087 blockedCandidate = false; 1088 } 1089 } 1090 1091 /** 1092 * @param slow 1093 * The slow to set. 1094 */ 1095 public void setSlow(boolean slow) { 1096 this.slow = slow; 1097 } 1098 1099 /** 1100 * @return true if the Connection is slow 1101 */ 1102 public boolean isSlow() { 1103 return slow; 1104 } 1105 1106 /** 1107 * @return true if the Connection is potentially blocked 1108 */ 1109 public boolean isMarkedBlockedCandidate() { 1110 return markedCandidate; 1111 } 1112 1113 /** 1114 * Mark the Connection, so we can deem if it's collectable on the next sweep 1115 */ 1116 public void doMark() { 1117 if (timeStamp == 0) { 1118 timeStamp = System.currentTimeMillis(); 1119 } 1120 } 1121 1122 /** 1123 * @return if after being marked, the Connection is still writing 1124 */ 1125 public boolean isBlocked() { 1126 return blocked; 1127 } 1128 1129 /** 1130 * @return true if the Connection is connected 1131 */ 1132 public boolean isConnected() { 1133 return connected; 1134 } 1135 1136 /** 1137 * @param blocked 1138 * The blocked to set. 1139 */ 1140 public void setBlocked(boolean blocked) { 1141 this.blocked = blocked; 1142 } 1143 1144 /** 1145 * @param connected 1146 * The connected to set. 1147 */ 1148 public void setConnected(boolean connected) { 1149 this.connected = connected; 1150 } 1151 1152 /** 1153 * @return true if the Connection is active 1154 */ 1155 public boolean isActive() { 1156 return active; 1157 } 1158 1159 /** 1160 * @param active 1161 * The active to set. 1162 */ 1163 public void setActive(boolean active) { 1164 this.active = active; 1165 } 1166 1167 /** 1168 * @return true if the Connection is starting 1169 */ 1170 public synchronized boolean isStarting() { 1171 return starting; 1172 } 1173 1174 public synchronized boolean isNetworkConnection() { 1175 return networkConnection; 1176 } 1177 1178 public boolean isFaultTolerantConnection() { 1179 return this.faultTolerantConnection; 1180 } 1181 1182 protected synchronized void setStarting(boolean starting) { 1183 this.starting = starting; 1184 } 1185 1186 /** 1187 * @return true if the Connection needs to stop 1188 */ 1189 public synchronized boolean isPendingStop() { 1190 return pendingStop; 1191 } 1192 1193 protected synchronized void setPendingStop(boolean pendingStop) { 1194 this.pendingStop = pendingStop; 1195 } 1196 1197 public Response processBrokerInfo(BrokerInfo info) { 1198 if (info.isSlaveBroker()) { 1199 BrokerService bService = connector.getBrokerService(); 1200 // Do we only support passive slaves - or does the slave want to be 1201 // passive ? 1202 boolean passive = bService.isPassiveSlave() || info.isPassiveSlave(); 1203 if (passive == false) { 1204 1205 // stream messages from this broker (the master) to 1206 // the slave 1207 MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class); 1208 masterBroker = new MasterBroker(parent, transport); 1209 masterBroker.startProcessing(); 1210 } 1211 LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName() + " is attached"); 1212 bService.slaveConnectionEstablished(); 1213 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1214 // so this TransportConnection is the rear end of a network bridge 1215 // We have been requested to create a two way pipe ... 1216 try { 1217 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1218 Map<String, String> props = createMap(properties); 1219 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1220 IntrospectionSupport.setProperties(config, props, ""); 1221 config.setBrokerName(broker.getBrokerName()); 1222 1223 // check for existing duplex connection hanging about 1224 1225 // We first look if existing network connection already exists for the same broker Id and network connector name 1226 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1227 // and the duplex network connector side wanting to open a new one 1228 // In this case, the old connection must be broken 1229 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1230 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1231 synchronized (connections) { 1232 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) { 1233 TransportConnection c = iter.next(); 1234 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1235 LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ")."); 1236 c.stopAsync(); 1237 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1238 c.getStopped().await(1, TimeUnit.SECONDS); 1239 } 1240 } 1241 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1242 } 1243 URI uri = broker.getVmConnectorURI(); 1244 HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri)); 1245 map.put("network", "true"); 1246 map.put("async", "false"); 1247 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map)); 1248 Transport localTransport = TransportFactory.connect(uri); 1249 Transport remoteBridgeTransport = new ResponseCorrelator(transport); 1250 String duplexName = localTransport.toString(); 1251 if (duplexName.contains("#")) { 1252 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1253 } 1254 MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName)); 1255 listener.setCreatedByDuplex(true); 1256 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1257 duplexBridge.setBrokerService(broker.getBrokerService()); 1258 // now turn duplex off this side 1259 info.setDuplexConnection(false); 1260 duplexBridge.setCreatedByDuplex(true); 1261 duplexBridge.duplexStart(this, brokerInfo, info); 1262 LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId); 1263 return null; 1264 } catch (TransportDisposedIOException e) { 1265 LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started."); 1266 return null; 1267 } catch (Exception e) { 1268 LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId , e); 1269 return null; 1270 } 1271 } 1272 // We only expect to get one broker info command per connection 1273 if (this.brokerInfo != null) { 1274 LOG.warn("Unexpected extra broker info command received: " + info); 1275 } 1276 this.brokerInfo = info; 1277 networkConnection = true; 1278 List<TransportConnectionState> connectionStates = listConnectionStates(); 1279 for (TransportConnectionState cs : connectionStates) { 1280 cs.getContext().setNetworkConnection(true); 1281 } 1282 return null; 1283 } 1284 1285 @SuppressWarnings("unchecked") 1286 private HashMap<String, String> createMap(Properties properties) { 1287 return new HashMap(properties); 1288 } 1289 1290 protected void dispatch(Command command) throws IOException { 1291 try { 1292 setMarkedCandidate(true); 1293 transport.oneway(command); 1294 } finally { 1295 setMarkedCandidate(false); 1296 } 1297 } 1298 1299 public String getRemoteAddress() { 1300 return transport.getRemoteAddress(); 1301 } 1302 1303 public String getConnectionId() { 1304 List<TransportConnectionState> connectionStates = listConnectionStates(); 1305 for (TransportConnectionState cs : connectionStates) { 1306 if (cs.getInfo().getClientId() != null) { 1307 return cs.getInfo().getClientId(); 1308 } 1309 return cs.getInfo().getConnectionId().toString(); 1310 } 1311 return null; 1312 } 1313 1314 public void updateClient(ConnectionControl control) { 1315 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1316 && this.wireFormatInfo.getVersion() >= 6) { 1317 dispatchAsync(control); 1318 } 1319 } 1320 1321 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1322 ProducerBrokerExchange result = producerExchanges.get(id); 1323 if (result == null) { 1324 synchronized (producerExchanges) { 1325 result = new ProducerBrokerExchange(); 1326 TransportConnectionState state = lookupConnectionState(id); 1327 context = state.getContext(); 1328 if (context.isReconnect()) { 1329 result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); 1330 } 1331 result.setConnectionContext(context); 1332 SessionState ss = state.getSessionState(id.getParentId()); 1333 if (ss != null) { 1334 result.setProducerState(ss.getProducerState(id)); 1335 ProducerState producerState = ss.getProducerState(id); 1336 if (producerState != null && producerState.getInfo() != null) { 1337 ProducerInfo info = producerState.getInfo(); 1338 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1339 } 1340 } 1341 producerExchanges.put(id, result); 1342 } 1343 } else { 1344 context = result.getConnectionContext(); 1345 } 1346 return result; 1347 } 1348 1349 private void removeProducerBrokerExchange(ProducerId id) { 1350 synchronized (producerExchanges) { 1351 producerExchanges.remove(id); 1352 } 1353 } 1354 1355 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1356 ConsumerBrokerExchange result = consumerExchanges.get(id); 1357 if (result == null) { 1358 synchronized (consumerExchanges) { 1359 result = new ConsumerBrokerExchange(); 1360 TransportConnectionState state = lookupConnectionState(id); 1361 context = state.getContext(); 1362 result.setConnectionContext(context); 1363 SessionState ss = state.getSessionState(id.getParentId()); 1364 if (ss != null) { 1365 ConsumerState cs = ss.getConsumerState(id); 1366 if (cs != null) { 1367 ConsumerInfo info = cs.getInfo(); 1368 if (info != null) { 1369 if (info.getDestination() != null && info.getDestination().isPattern()) { 1370 result.setWildcard(true); 1371 } 1372 } 1373 } 1374 } 1375 consumerExchanges.put(id, result); 1376 } 1377 } 1378 return result; 1379 } 1380 1381 private void removeConsumerBrokerExchange(ConsumerId id) { 1382 synchronized (consumerExchanges) { 1383 consumerExchanges.remove(id); 1384 } 1385 } 1386 1387 public int getProtocolVersion() { 1388 return protocolVersion.get(); 1389 } 1390 1391 public Response processControlCommand(ControlCommand command) throws Exception { 1392 String control = command.getCommand(); 1393 if (control != null && control.equals("shutdown")) { 1394 System.exit(0); 1395 } 1396 return null; 1397 } 1398 1399 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1400 return null; 1401 } 1402 1403 public Response processConnectionControl(ConnectionControl control) throws Exception { 1404 if (control != null) { 1405 faultTolerantConnection = control.isFaultTolerant(); 1406 } 1407 return null; 1408 } 1409 1410 public Response processConnectionError(ConnectionError error) throws Exception { 1411 return null; 1412 } 1413 1414 public Response processConsumerControl(ConsumerControl control) throws Exception { 1415 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1416 broker.processConsumerControl(consumerExchange, control); 1417 return null; 1418 } 1419 1420 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1421 TransportConnectionState state) { 1422 TransportConnectionState cs = null; 1423 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1424 // swap implementations 1425 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1426 newRegister.intialize(connectionStateRegister); 1427 connectionStateRegister = newRegister; 1428 } 1429 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1430 return cs; 1431 } 1432 1433 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1434 return connectionStateRegister.unregisterConnectionState(connectionId); 1435 } 1436 1437 protected synchronized List<TransportConnectionState> listConnectionStates() { 1438 return connectionStateRegister.listConnectionStates(); 1439 } 1440 1441 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1442 return connectionStateRegister.lookupConnectionState(connectionId); 1443 } 1444 1445 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1446 return connectionStateRegister.lookupConnectionState(id); 1447 } 1448 1449 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1450 return connectionStateRegister.lookupConnectionState(id); 1451 } 1452 1453 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1454 return connectionStateRegister.lookupConnectionState(id); 1455 } 1456 1457 protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1458 return connectionStateRegister.lookupConnectionState(connectionId); 1459 } 1460 1461 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1462 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1463 } 1464 1465 protected synchronized String getDuplexNetworkConnectorId() { 1466 return this.duplexNetworkConnectorId; 1467 } 1468 1469 protected CountDownLatch getStopped() { 1470 return stopped; 1471 } 1472 }