001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.transport.failover; 019 020 import java.io.BufferedReader; 021 import java.io.FileNotFoundException; 022 import java.io.FileReader; 023 import java.io.IOException; 024 import java.io.InputStreamReader; 025 import java.io.InterruptedIOException; 026 import java.net.InetAddress; 027 import java.net.MalformedURLException; 028 import java.net.URI; 029 import java.net.URL; 030 import java.util.ArrayList; 031 import java.util.HashSet; 032 import java.util.Iterator; 033 import java.util.LinkedHashMap; 034 import java.util.List; 035 import java.util.Map; 036 import java.util.Set; 037 import java.util.StringTokenizer; 038 import java.util.concurrent.CopyOnWriteArrayList; 039 import java.util.concurrent.atomic.AtomicReference; 040 import org.apache.activemq.broker.SslContext; 041 import org.apache.activemq.command.Command; 042 import org.apache.activemq.command.ConnectionControl; 043 import org.apache.activemq.command.ConnectionId; 044 import org.apache.activemq.command.RemoveInfo; 045 import org.apache.activemq.command.Response; 046 import org.apache.activemq.state.ConnectionStateTracker; 047 import org.apache.activemq.state.Tracked; 048 import org.apache.activemq.thread.DefaultThreadPools; 049 import org.apache.activemq.thread.Task; 050 import org.apache.activemq.thread.TaskRunner; 051 import org.apache.activemq.transport.CompositeTransport; 052 import org.apache.activemq.transport.DefaultTransportListener; 053 import org.apache.activemq.transport.FutureResponse; 054 import org.apache.activemq.transport.ResponseCallback; 055 import org.apache.activemq.transport.Transport; 056 import org.apache.activemq.transport.TransportFactory; 057 import org.apache.activemq.transport.TransportListener; 058 import org.apache.activemq.util.IOExceptionSupport; 059 import org.apache.activemq.util.ServiceSupport; 060 import org.slf4j.Logger; 061 import org.slf4j.LoggerFactory; 062 063 064 /** 065 * A Transport that is made reliable by being able to fail over to another 066 * transport when a transport failure is detected. 067 * 068 * 069 */ 070 public class FailoverTransport implements CompositeTransport { 071 072 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); 073 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; 074 private TransportListener transportListener; 075 private boolean disposed; 076 private boolean connected; 077 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 078 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>(); 079 080 private final Object reconnectMutex = new Object(); 081 private final Object backupMutex = new Object(); 082 private final Object sleepMutex = new Object(); 083 private final Object listenerMutex = new Object(); 084 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 085 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 086 087 private URI connectedTransportURI; 088 private URI failedConnectTransportURI; 089 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 090 private final TaskRunner reconnectTask; 091 private boolean started; 092 private boolean initialized; 093 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 094 private long maxReconnectDelay = 1000 * 30; 095 private double backOffMultiplier = 2d; 096 private long timeout = -1; 097 private boolean useExponentialBackOff = true; 098 private boolean randomize = true; 099 private int maxReconnectAttempts; 100 private int startupMaxReconnectAttempts; 101 private int connectFailures; 102 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 103 private Exception connectionFailure; 104 private boolean firstConnection = true; 105 // optionally always have a backup created 106 private boolean backup = false; 107 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>(); 108 private int backupPoolSize = 1; 109 private boolean trackMessages = false; 110 private boolean trackTransactionProducers = true; 111 private int maxCacheSize = 128 * 1024; 112 private final TransportListener disposedListener = new DefaultTransportListener() { 113 }; 114 //private boolean connectionInterruptProcessingComplete; 115 116 private final TransportListener myTransportListener = createTransportListener(); 117 private boolean updateURIsSupported=true; 118 private boolean reconnectSupported=true; 119 // remember for reconnect thread 120 private SslContext brokerSslContext; 121 private String updateURIsURL = null; 122 private boolean rebalanceUpdateURIs=true; 123 private boolean doRebalance = false; 124 125 public FailoverTransport() throws InterruptedIOException { 126 brokerSslContext = SslContext.getCurrentSslContext(); 127 stateTracker.setTrackTransactions(true); 128 // Setup a task that is used to reconnect the a connection async. 129 reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { 130 public boolean iterate() { 131 boolean result = false; 132 boolean buildBackup = true; 133 boolean doReconnect = !disposed; 134 synchronized (backupMutex) { 135 if ((connectedTransport.get() == null || doRebalance) && !disposed) { 136 result = doReconnect(); 137 buildBackup = false; 138 } 139 } 140 if (buildBackup) { 141 buildBackups(); 142 } else { 143 // build backups on the next iteration 144 buildBackup = true; 145 try { 146 reconnectTask.wakeup(); 147 } catch (InterruptedException e) { 148 LOG.debug("Reconnect task has been interrupted.", e); 149 } 150 } 151 return result; 152 } 153 154 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 155 } 156 157 TransportListener createTransportListener() { 158 return new TransportListener() { 159 public void onCommand(Object o) { 160 Command command = (Command) o; 161 if (command == null) { 162 return; 163 } 164 if (command.isResponse()) { 165 Object object = null; 166 synchronized (requestMap) { 167 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); 168 } 169 if (object != null && object.getClass() == Tracked.class) { 170 ((Tracked) object).onResponses(command); 171 } 172 } 173 if (!initialized) { 174 initialized = true; 175 } 176 177 if(command.isConnectionControl()) { 178 handleConnectionControl((ConnectionControl) command); 179 } 180 if (transportListener != null) { 181 transportListener.onCommand(command); 182 } 183 } 184 185 public void onException(IOException error) { 186 try { 187 handleTransportFailure(error); 188 } catch (InterruptedException e) { 189 Thread.currentThread().interrupt(); 190 transportListener.onException(new InterruptedIOException()); 191 } 192 } 193 194 public void transportInterupted() { 195 if (transportListener != null) { 196 transportListener.transportInterupted(); 197 } 198 } 199 200 public void transportResumed() { 201 if (transportListener != null) { 202 transportListener.transportResumed(); 203 } 204 } 205 }; 206 } 207 208 public final void disposeTransport(Transport transport) { 209 transport.setTransportListener(disposedListener); 210 ServiceSupport.dispose(transport); 211 } 212 213 public final void handleTransportFailure(IOException e) throws InterruptedException { 214 if (LOG.isTraceEnabled()) { 215 LOG.trace(this + " handleTransportFailure: " + e); 216 } 217 Transport transport = connectedTransport.getAndSet(null); 218 if (transport == null) { 219 // sync with possible in progress reconnect 220 synchronized (reconnectMutex) { 221 transport = connectedTransport.getAndSet(null); 222 } 223 } 224 if (transport != null) { 225 226 disposeTransport(transport); 227 228 boolean reconnectOk = false; 229 synchronized (reconnectMutex) { 230 if (started) { 231 LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI 232 + " , attempting to automatically reconnect due to: " + e); 233 LOG.debug("Transport failed with the following exception:", e); 234 reconnectOk = true; 235 } 236 initialized = false; 237 failedConnectTransportURI = connectedTransportURI; 238 connectedTransportURI = null; 239 connected = false; 240 241 // notify before any reconnect attempt so ack state can be 242 // whacked 243 if (transportListener != null) { 244 transportListener.transportInterupted(); 245 } 246 247 if (reconnectOk) { 248 reconnectTask.wakeup(); 249 } 250 } 251 } 252 } 253 254 public final void handleConnectionControl(ConnectionControl control) { 255 String reconnectStr = control.getReconnectTo(); 256 if (reconnectStr != null) { 257 reconnectStr = reconnectStr.trim(); 258 if (reconnectStr.length() > 0) { 259 try { 260 URI uri = new URI(reconnectStr); 261 if (isReconnectSupported()) { 262 reconnect(uri); 263 LOG.info("Reconnected to: " + uri); 264 } 265 } catch (Exception e) { 266 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); 267 } 268 } 269 } 270 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); 271 } 272 273 private final void processNewTransports(boolean rebalance, String newTransports) { 274 if (newTransports != null) { 275 newTransports = newTransports.trim(); 276 if (newTransports.length() > 0 && isUpdateURIsSupported()) { 277 List<URI> list = new ArrayList<URI>(); 278 StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); 279 while (tokenizer.hasMoreTokens()) { 280 String str = tokenizer.nextToken(); 281 try { 282 URI uri = new URI(str); 283 list.add(uri); 284 } catch (Exception e) { 285 LOG.error("Failed to parse broker address: " + str, e); 286 } 287 } 288 if (list.isEmpty() == false) { 289 try { 290 updateURIs(rebalance, list.toArray(new URI[list.size()])); 291 } catch (IOException e) { 292 LOG.error("Failed to update transport URI's from: " + newTransports, e); 293 } 294 } 295 296 } 297 } 298 } 299 300 public void start() throws Exception { 301 synchronized (reconnectMutex) { 302 LOG.debug("Started."); 303 if (started) { 304 return; 305 } 306 started = true; 307 stateTracker.setMaxCacheSize(getMaxCacheSize()); 308 stateTracker.setTrackMessages(isTrackMessages()); 309 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 310 if (connectedTransport.get() != null) { 311 stateTracker.restore(connectedTransport.get()); 312 } else { 313 reconnect(false); 314 } 315 } 316 } 317 318 public void stop() throws Exception { 319 Transport transportToStop = null; 320 synchronized (reconnectMutex) { 321 LOG.debug("Stopped."); 322 if (!started) { 323 return; 324 } 325 started = false; 326 disposed = true; 327 connected = false; 328 for (BackupTransport t : backups) { 329 t.setDisposed(true); 330 } 331 backups.clear(); 332 333 if (connectedTransport.get() != null) { 334 transportToStop = connectedTransport.getAndSet(null); 335 } 336 reconnectMutex.notifyAll(); 337 } 338 synchronized (sleepMutex) { 339 sleepMutex.notifyAll(); 340 } 341 reconnectTask.shutdown(); 342 if (transportToStop != null) { 343 transportToStop.stop(); 344 } 345 } 346 347 public long getInitialReconnectDelay() { 348 return initialReconnectDelay; 349 } 350 351 public void setInitialReconnectDelay(long initialReconnectDelay) { 352 this.initialReconnectDelay = initialReconnectDelay; 353 } 354 355 public long getMaxReconnectDelay() { 356 return maxReconnectDelay; 357 } 358 359 public void setMaxReconnectDelay(long maxReconnectDelay) { 360 this.maxReconnectDelay = maxReconnectDelay; 361 } 362 363 public long getReconnectDelay() { 364 return reconnectDelay; 365 } 366 367 public void setReconnectDelay(long reconnectDelay) { 368 this.reconnectDelay = reconnectDelay; 369 } 370 371 public double getReconnectDelayExponent() { 372 return backOffMultiplier; 373 } 374 375 public void setReconnectDelayExponent(double reconnectDelayExponent) { 376 this.backOffMultiplier = reconnectDelayExponent; 377 } 378 379 public Transport getConnectedTransport() { 380 return connectedTransport.get(); 381 } 382 383 public URI getConnectedTransportURI() { 384 return connectedTransportURI; 385 } 386 387 public int getMaxReconnectAttempts() { 388 return maxReconnectAttempts; 389 } 390 391 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 392 this.maxReconnectAttempts = maxReconnectAttempts; 393 } 394 395 public int getStartupMaxReconnectAttempts() { 396 return this.startupMaxReconnectAttempts; 397 } 398 399 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 400 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 401 } 402 403 public long getTimeout() { 404 return timeout; 405 } 406 407 public void setTimeout(long timeout) { 408 this.timeout = timeout; 409 } 410 411 /** 412 * @return Returns the randomize. 413 */ 414 public boolean isRandomize() { 415 return randomize; 416 } 417 418 /** 419 * @param randomize 420 * The randomize to set. 421 */ 422 public void setRandomize(boolean randomize) { 423 this.randomize = randomize; 424 } 425 426 public boolean isBackup() { 427 return backup; 428 } 429 430 public void setBackup(boolean backup) { 431 this.backup = backup; 432 } 433 434 public int getBackupPoolSize() { 435 return backupPoolSize; 436 } 437 438 public void setBackupPoolSize(int backupPoolSize) { 439 this.backupPoolSize = backupPoolSize; 440 } 441 442 public boolean isTrackMessages() { 443 return trackMessages; 444 } 445 446 public void setTrackMessages(boolean trackMessages) { 447 this.trackMessages = trackMessages; 448 } 449 450 public boolean isTrackTransactionProducers() { 451 return this.trackTransactionProducers; 452 } 453 454 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 455 this.trackTransactionProducers = trackTransactionProducers; 456 } 457 458 public int getMaxCacheSize() { 459 return maxCacheSize; 460 } 461 462 public void setMaxCacheSize(int maxCacheSize) { 463 this.maxCacheSize = maxCacheSize; 464 } 465 466 /** 467 * @return Returns true if the command is one sent when a connection is 468 * being closed. 469 */ 470 private boolean isShutdownCommand(Command command) { 471 return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo)); 472 } 473 474 public void oneway(Object o) throws IOException { 475 476 Command command = (Command) o; 477 Exception error = null; 478 try { 479 480 synchronized (reconnectMutex) { 481 482 if (isShutdownCommand(command) && connectedTransport.get() == null) { 483 if (command.isShutdownInfo()) { 484 // Skipping send of ShutdownInfo command when not 485 // connected. 486 return; 487 } 488 if (command instanceof RemoveInfo || command.isMessageAck()) { 489 // Simulate response to RemoveInfo command or ack (as it 490 // will be stale) 491 stateTracker.track(command); 492 Response response = new Response(); 493 response.setCorrelationId(command.getCommandId()); 494 myTransportListener.onCommand(response); 495 return; 496 } 497 } 498 // Keep trying until the message is sent. 499 for (int i = 0; !disposed; i++) { 500 try { 501 502 // Wait for transport to be connected. 503 Transport transport = connectedTransport.get(); 504 long start = System.currentTimeMillis(); 505 boolean timedout = false; 506 while (transport == null && !disposed && connectionFailure == null 507 && !Thread.currentThread().isInterrupted()) { 508 LOG.trace("Waiting for transport to reconnect..: " + command); 509 long end = System.currentTimeMillis(); 510 if (timeout > 0 && (end - start > timeout)) { 511 timedout = true; 512 LOG.info("Failover timed out after " + (end - start) + "ms"); 513 break; 514 } 515 try { 516 reconnectMutex.wait(100); 517 } catch (InterruptedException e) { 518 Thread.currentThread().interrupt(); 519 LOG.debug("Interupted: " + e, e); 520 } 521 transport = connectedTransport.get(); 522 } 523 524 if (transport == null) { 525 // Previous loop may have exited due to use being 526 // disposed. 527 if (disposed) { 528 error = new IOException("Transport disposed."); 529 } else if (connectionFailure != null) { 530 error = connectionFailure; 531 } else if (timedout == true) { 532 error = new IOException("Failover timeout of " + timeout + " ms reached."); 533 } else { 534 error = new IOException("Unexpected failure."); 535 } 536 break; 537 } 538 539 // If it was a request and it was not being tracked by 540 // the state tracker, 541 // then hold it in the requestMap so that we can replay 542 // it later. 543 Tracked tracked = stateTracker.track(command); 544 synchronized (requestMap) { 545 if (tracked != null && tracked.isWaitingForResponse()) { 546 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 547 } else if (tracked == null && command.isResponseRequired()) { 548 requestMap.put(Integer.valueOf(command.getCommandId()), command); 549 } 550 } 551 552 // Send the message. 553 try { 554 transport.oneway(command); 555 stateTracker.trackBack(command); 556 } catch (IOException e) { 557 558 // If the command was not tracked.. we will retry in 559 // this method 560 if (tracked == null) { 561 562 // since we will retry in this method.. take it 563 // out of the request 564 // map so that it is not sent 2 times on 565 // recovery 566 if (command.isResponseRequired()) { 567 requestMap.remove(Integer.valueOf(command.getCommandId())); 568 } 569 570 // Rethrow the exception so it will handled by 571 // the outer catch 572 throw e; 573 } 574 575 } 576 577 return; 578 579 } catch (IOException e) { 580 if (LOG.isDebugEnabled()) { 581 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 582 } 583 handleTransportFailure(e); 584 } 585 } 586 } 587 } catch (InterruptedException e) { 588 // Some one may be trying to stop our thread. 589 Thread.currentThread().interrupt(); 590 throw new InterruptedIOException(); 591 } 592 if (!disposed) { 593 if (error != null) { 594 if (error instanceof IOException) { 595 throw (IOException) error; 596 } 597 throw IOExceptionSupport.create(error); 598 } 599 } 600 } 601 602 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 603 throw new AssertionError("Unsupported Method"); 604 } 605 606 public Object request(Object command) throws IOException { 607 throw new AssertionError("Unsupported Method"); 608 } 609 610 public Object request(Object command, int timeout) throws IOException { 611 throw new AssertionError("Unsupported Method"); 612 } 613 614 public void add(boolean rebalance, URI u[]) { 615 boolean newURI = false; 616 for (int i = 0; i < u.length; i++) { 617 if (!contains(u[i])) { 618 uris.add(u[i]); 619 newURI = true; 620 } 621 } 622 if (newURI) { 623 reconnect(rebalance); 624 } 625 } 626 627 public void remove(boolean rebalance, URI u[]) { 628 for (int i = 0; i < u.length; i++) { 629 uris.remove(u[i]); 630 } 631 // rebalance is automatic if any connected to removed/stopped broker 632 } 633 634 public void add(boolean rebalance, String u) { 635 try { 636 URI newURI = new URI(u); 637 if (contains(newURI)==false) { 638 uris.add(newURI); 639 reconnect(rebalance); 640 } 641 642 } catch (Exception e) { 643 LOG.error("Failed to parse URI: " + u); 644 } 645 } 646 647 public void reconnect(boolean rebalance) { 648 synchronized (reconnectMutex) { 649 if (started) { 650 if (rebalance) { 651 doRebalance = true; 652 } 653 LOG.debug("Waking up reconnect task"); 654 try { 655 reconnectTask.wakeup(); 656 } catch (InterruptedException e) { 657 Thread.currentThread().interrupt(); 658 } 659 } else { 660 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 661 } 662 } 663 } 664 665 private List<URI> getConnectList() { 666 ArrayList<URI> l = new ArrayList<URI>(uris); 667 boolean removed = false; 668 if (failedConnectTransportURI != null) { 669 removed = l.remove(failedConnectTransportURI); 670 } 671 if (randomize) { 672 // Randomly, reorder the list by random swapping 673 for (int i = 0; i < l.size(); i++) { 674 int p = (int) (Math.random() * 100 % l.size()); 675 URI t = l.get(p); 676 l.set(p, l.get(i)); 677 l.set(i, t); 678 } 679 } 680 if (removed) { 681 l.add(failedConnectTransportURI); 682 } 683 LOG.debug("urlList connectionList:" + l + ", from: " + uris); 684 return l; 685 } 686 687 public TransportListener getTransportListener() { 688 return transportListener; 689 } 690 691 public void setTransportListener(TransportListener commandListener) { 692 synchronized (listenerMutex) { 693 this.transportListener = commandListener; 694 listenerMutex.notifyAll(); 695 } 696 } 697 698 public <T> T narrow(Class<T> target) { 699 700 if (target.isAssignableFrom(getClass())) { 701 return target.cast(this); 702 } 703 Transport transport = connectedTransport.get(); 704 if (transport != null) { 705 return transport.narrow(target); 706 } 707 return null; 708 709 } 710 711 protected void restoreTransport(Transport t) throws Exception, IOException { 712 t.start(); 713 // send information to the broker - informing it we are an ft client 714 ConnectionControl cc = new ConnectionControl(); 715 cc.setFaultTolerant(true); 716 t.oneway(cc); 717 stateTracker.restore(t); 718 Map tmpMap = null; 719 synchronized (requestMap) { 720 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 721 } 722 for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) { 723 Command command = iter2.next(); 724 if (LOG.isTraceEnabled()) { 725 LOG.trace("restore requestMap, replay: " + command); 726 } 727 t.oneway(command); 728 } 729 } 730 731 public boolean isUseExponentialBackOff() { 732 return useExponentialBackOff; 733 } 734 735 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 736 this.useExponentialBackOff = useExponentialBackOff; 737 } 738 739 @Override 740 public String toString() { 741 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 742 } 743 744 public String getRemoteAddress() { 745 Transport transport = connectedTransport.get(); 746 if (transport != null) { 747 return transport.getRemoteAddress(); 748 } 749 return null; 750 } 751 752 public boolean isFaultTolerant() { 753 return true; 754 } 755 756 final boolean doReconnect() { 757 Exception failure = null; 758 synchronized (reconnectMutex) { 759 760 // If updateURIsURL is specified, read the file and add any new 761 // transport URI's to this FailOverTransport. 762 // Note: Could track file timestamp to avoid unnecessary reading. 763 String fileURL = getUpdateURIsURL(); 764 if (fileURL != null) { 765 BufferedReader in = null; 766 String newUris = null; 767 StringBuffer buffer = new StringBuffer(); 768 769 try { 770 in = new BufferedReader(getURLStream(fileURL)); 771 while (true) { 772 String line = in.readLine(); 773 if (line == null) { 774 break; 775 } 776 buffer.append(line); 777 } 778 newUris = buffer.toString(); 779 } catch (IOException ioe) { 780 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); 781 } finally { 782 if (in != null) { 783 try { 784 in.close(); 785 } catch (IOException ioe) { 786 // ignore 787 } 788 } 789 } 790 791 processNewTransports(isRebalanceUpdateURIs(), newUris); 792 } 793 794 if (disposed || connectionFailure != null) { 795 reconnectMutex.notifyAll(); 796 } 797 798 if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) { 799 return false; 800 } else { 801 List<URI> connectList = getConnectList(); 802 if (connectList.isEmpty()) { 803 failure = new IOException("No uris available to connect to."); 804 } else { 805 if (doRebalance) { 806 if (connectList.get(0).equals(connectedTransportURI)) { 807 // already connected to first in the list, no need to rebalance 808 doRebalance = false; 809 return false; 810 } else { 811 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); 812 try { 813 Transport transport = this.connectedTransport.getAndSet(null); 814 if (transport != null) { 815 disposeTransport(transport); 816 } 817 } catch (Exception e) { 818 LOG.debug("Caught an exception stopping existing transport for rebalance", e); 819 } 820 } 821 doRebalance = false; 822 } 823 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { 824 reconnectDelay = initialReconnectDelay; 825 } 826 synchronized (backupMutex) { 827 if (backup && !backups.isEmpty()) { 828 BackupTransport bt = backups.remove(0); 829 Transport t = bt.getTransport(); 830 URI uri = bt.getUri(); 831 t.setTransportListener(myTransportListener); 832 try { 833 if (started) { 834 restoreTransport(t); 835 } 836 reconnectDelay = initialReconnectDelay; 837 failedConnectTransportURI = null; 838 connectedTransportURI = uri; 839 connectedTransport.set(t); 840 reconnectMutex.notifyAll(); 841 connectFailures = 0; 842 LOG.info("Successfully reconnected to backup " + uri); 843 return false; 844 } catch (Exception e) { 845 LOG.debug("Backup transport failed", e); 846 } 847 } 848 } 849 850 Iterator<URI> iter = connectList.iterator(); 851 while (iter.hasNext() && connectedTransport.get() == null && !disposed) { 852 URI uri = iter.next(); 853 Transport t = null; 854 try { 855 LOG.debug("Attempting connect to: " + uri); 856 SslContext.setCurrentSslContext(brokerSslContext); 857 t = TransportFactory.compositeConnect(uri); 858 t.setTransportListener(myTransportListener); 859 t.start(); 860 861 if (started) { 862 restoreTransport(t); 863 } 864 865 LOG.debug("Connection established"); 866 reconnectDelay = initialReconnectDelay; 867 connectedTransportURI = uri; 868 connectedTransport.set(t); 869 reconnectMutex.notifyAll(); 870 connectFailures = 0; 871 // Make sure on initial startup, that the 872 // transportListener 873 // has been initialized for this instance. 874 synchronized (listenerMutex) { 875 if (transportListener == null) { 876 try { 877 // if it isn't set after 2secs - it 878 // probably never will be 879 listenerMutex.wait(2000); 880 } catch (InterruptedException ex) { 881 } 882 } 883 } 884 if (transportListener != null) { 885 transportListener.transportResumed(); 886 } else { 887 LOG.debug("transport resumed by transport listener not set"); 888 } 889 if (firstConnection) { 890 firstConnection = false; 891 LOG.info("Successfully connected to " + uri); 892 } else { 893 LOG.info("Successfully reconnected to " + uri); 894 } 895 connected = true; 896 return false; 897 } catch (Exception e) { 898 failure = e; 899 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 900 if (t != null) { 901 try { 902 t.stop(); 903 } catch (Exception ee) { 904 LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee); 905 } 906 } 907 } finally { 908 SslContext.setCurrentSslContext(null); 909 } 910 } 911 } 912 } 913 int reconnectAttempts = 0; 914 if (firstConnection) { 915 if (this.startupMaxReconnectAttempts != 0) { 916 reconnectAttempts = this.startupMaxReconnectAttempts; 917 } 918 } 919 if (reconnectAttempts == 0) { 920 reconnectAttempts = this.maxReconnectAttempts; 921 } 922 if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) { 923 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); 924 connectionFailure = failure; 925 926 // Make sure on initial startup, that the transportListener has 927 // been initialized 928 // for this instance. 929 synchronized (listenerMutex) { 930 if (transportListener == null) { 931 try { 932 listenerMutex.wait(2000); 933 } catch (InterruptedException ex) { 934 } 935 } 936 } 937 938 if (transportListener != null) { 939 if (connectionFailure instanceof IOException) { 940 transportListener.onException((IOException) connectionFailure); 941 } else { 942 transportListener.onException(IOExceptionSupport.create(connectionFailure)); 943 } 944 } 945 reconnectMutex.notifyAll(); 946 return false; 947 } 948 } 949 if (!disposed) { 950 951 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 952 synchronized (sleepMutex) { 953 try { 954 sleepMutex.wait(reconnectDelay); 955 } catch (InterruptedException e) { 956 Thread.currentThread().interrupt(); 957 } 958 } 959 960 if (useExponentialBackOff) { 961 // Exponential increment of reconnect delay. 962 reconnectDelay *= backOffMultiplier; 963 if (reconnectDelay > maxReconnectDelay) { 964 reconnectDelay = maxReconnectDelay; 965 } 966 } 967 } 968 return !disposed; 969 } 970 971 final boolean buildBackups() { 972 synchronized (backupMutex) { 973 if (!disposed && backup && backups.size() < backupPoolSize) { 974 List<URI> connectList = getConnectList(); 975 // removed disposed backups 976 List<BackupTransport> disposedList = new ArrayList<BackupTransport>(); 977 for (BackupTransport bt : backups) { 978 if (bt.isDisposed()) { 979 disposedList.add(bt); 980 } 981 } 982 backups.removeAll(disposedList); 983 disposedList.clear(); 984 for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) { 985 URI uri = iter.next(); 986 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 987 try { 988 SslContext.setCurrentSslContext(brokerSslContext); 989 BackupTransport bt = new BackupTransport(this); 990 bt.setUri(uri); 991 if (!backups.contains(bt)) { 992 Transport t = TransportFactory.compositeConnect(uri); 993 t.setTransportListener(bt); 994 t.start(); 995 bt.setTransport(t); 996 backups.add(bt); 997 } 998 } catch (Exception e) { 999 LOG.debug("Failed to build backup ", e); 1000 } finally { 1001 SslContext.setCurrentSslContext(null); 1002 } 1003 } 1004 } 1005 } 1006 } 1007 return false; 1008 } 1009 1010 public boolean isDisposed() { 1011 return disposed; 1012 } 1013 1014 public boolean isConnected() { 1015 return connected; 1016 } 1017 1018 public void reconnect(URI uri) throws IOException { 1019 add(true, new URI[] { uri }); 1020 } 1021 1022 public boolean isReconnectSupported() { 1023 return this.reconnectSupported; 1024 } 1025 1026 public void setReconnectSupported(boolean value) { 1027 this.reconnectSupported=value; 1028 } 1029 1030 public boolean isUpdateURIsSupported() { 1031 return this.updateURIsSupported; 1032 } 1033 1034 public void setUpdateURIsSupported(boolean value) { 1035 this.updateURIsSupported=value; 1036 } 1037 1038 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { 1039 if (isUpdateURIsSupported()) { 1040 List<URI> copy = new ArrayList<URI>(this.updated); 1041 List<URI> add = new ArrayList<URI>(); 1042 if (updatedURIs != null && updatedURIs.length > 0) { 1043 Set<URI> set = new HashSet<URI>(); 1044 for (int i = 0; i < updatedURIs.length; i++) { 1045 URI uri = updatedURIs[i]; 1046 if (uri != null) { 1047 set.add(uri); 1048 } 1049 } 1050 for (URI uri : set) { 1051 if (copy.remove(uri) == false) { 1052 add.add(uri); 1053 } 1054 } 1055 synchronized (reconnectMutex) { 1056 this.updated.clear(); 1057 this.updated.addAll(add); 1058 for (URI uri : copy) { 1059 this.uris.remove(uri); 1060 } 1061 add(rebalance, add.toArray(new URI[add.size()])); 1062 } 1063 } 1064 } 1065 } 1066 1067 /** 1068 * @return the updateURIsURL 1069 */ 1070 public String getUpdateURIsURL() { 1071 return this.updateURIsURL; 1072 } 1073 1074 /** 1075 * @param updateURIsURL the updateURIsURL to set 1076 */ 1077 public void setUpdateURIsURL(String updateURIsURL) { 1078 this.updateURIsURL = updateURIsURL; 1079 } 1080 1081 /** 1082 * @return the rebalanceUpdateURIs 1083 */ 1084 public boolean isRebalanceUpdateURIs() { 1085 return this.rebalanceUpdateURIs; 1086 } 1087 1088 /** 1089 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set 1090 */ 1091 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { 1092 this.rebalanceUpdateURIs = rebalanceUpdateURIs; 1093 } 1094 1095 public int getReceiveCounter() { 1096 Transport transport = connectedTransport.get(); 1097 if (transport == null) { 1098 return 0; 1099 } 1100 return transport.getReceiveCounter(); 1101 } 1102 1103 public void connectionInterruptProcessingComplete(ConnectionId connectionId) { 1104 synchronized (reconnectMutex) { 1105 stateTracker.connectionInterruptProcessingComplete(this, connectionId); 1106 } 1107 } 1108 1109 public ConnectionStateTracker getStateTracker() { 1110 return stateTracker; 1111 } 1112 1113 private boolean contains(URI newURI) { 1114 1115 boolean result = false; 1116 try { 1117 for (URI uri:uris) { 1118 if (newURI.getPort()==uri.getPort()) { 1119 InetAddress newAddr = InetAddress.getByName(newURI.getHost()); 1120 InetAddress addr = InetAddress.getByName(uri.getHost()); 1121 if (addr.equals(newAddr)) { 1122 result = true; 1123 break; 1124 } 1125 } 1126 } 1127 }catch(IOException e) { 1128 result = true; 1129 LOG.error("Failed to verify URI " + newURI + " already known: " + e); 1130 } 1131 return result; 1132 } 1133 1134 private InputStreamReader getURLStream(String path) throws IOException { 1135 InputStreamReader result = null; 1136 URL url = null; 1137 try { 1138 url = new URL(path); 1139 result = new InputStreamReader(url.openStream()); 1140 } catch (MalformedURLException e) { 1141 // ignore - it could be a path to a a local file 1142 } 1143 if (result == null) { 1144 result = new FileReader(path); 1145 } 1146 return result; 1147 } 1148 }