001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb; 018 019 import java.io.ByteArrayInputStream; 020 import java.io.ByteArrayOutputStream; 021 import java.io.DataInput; 022 import java.io.DataOutput; 023 import java.io.EOFException; 024 import java.io.File; 025 import java.io.IOException; 026 import java.io.InputStream; 027 import java.io.ObjectInputStream; 028 import java.io.ObjectOutputStream; 029 import java.io.OutputStream; 030 import java.util.*; 031 import java.util.Map.Entry; 032 import java.util.concurrent.atomic.AtomicBoolean; 033 import java.util.concurrent.atomic.AtomicLong; 034 import java.util.concurrent.locks.ReentrantReadWriteLock; 035 036 import org.apache.activemq.ActiveMQMessageAuditNoSync; 037 import org.apache.activemq.broker.BrokerService; 038 import org.apache.activemq.broker.BrokerServiceAware; 039 import org.apache.activemq.command.ConnectionId; 040 import org.apache.activemq.command.LocalTransactionId; 041 import org.apache.activemq.command.MessageId; 042 import org.apache.activemq.command.SubscriptionInfo; 043 import org.apache.activemq.command.TransactionId; 044 import org.apache.activemq.command.XATransactionId; 045 import org.apache.activemq.protobuf.Buffer; 046 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 047 import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 048 import org.apache.activemq.store.kahadb.data.KahaDestination; 049 import org.apache.activemq.store.kahadb.data.KahaEntryType; 050 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 051 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 052 import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 053 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 054 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 055 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 056 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 057 import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 058 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 059 import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 060 import org.apache.activemq.util.Callback; 061 import org.apache.activemq.util.IOHelper; 062 import org.apache.activemq.util.ServiceStopper; 063 import org.apache.activemq.util.ServiceSupport; 064 import org.slf4j.Logger; 065 import org.slf4j.LoggerFactory; 066 import org.apache.kahadb.index.BTreeIndex; 067 import org.apache.kahadb.index.BTreeVisitor; 068 import org.apache.kahadb.journal.DataFile; 069 import org.apache.kahadb.journal.Journal; 070 import org.apache.kahadb.journal.Location; 071 import org.apache.kahadb.page.Page; 072 import org.apache.kahadb.page.PageFile; 073 import org.apache.kahadb.page.Transaction; 074 import org.apache.kahadb.util.ByteSequence; 075 import org.apache.kahadb.util.DataByteArrayInputStream; 076 import org.apache.kahadb.util.DataByteArrayOutputStream; 077 import org.apache.kahadb.util.LockFile; 078 import org.apache.kahadb.util.LongMarshaller; 079 import org.apache.kahadb.util.Marshaller; 080 import org.apache.kahadb.util.Sequence; 081 import org.apache.kahadb.util.SequenceSet; 082 import org.apache.kahadb.util.StringMarshaller; 083 import org.apache.kahadb.util.VariableMarshaller; 084 085 public class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 086 087 protected BrokerService brokerService; 088 089 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 090 public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0")); 091 092 protected static final Buffer UNMATCHED; 093 static { 094 UNMATCHED = new Buffer(new byte[]{}); 095 } 096 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 097 private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 098 099 static final int CLOSED_STATE = 1; 100 static final int OPEN_STATE = 2; 101 static final long NOT_ACKED = -1; 102 static final long UNMATCHED_SEQ = -2; 103 104 static final int VERSION = 3; 105 106 107 protected class Metadata { 108 protected Page<Metadata> page; 109 protected int state; 110 protected BTreeIndex<String, StoredDestination> destinations; 111 protected Location lastUpdate; 112 protected Location firstInProgressTransactionLocation; 113 protected Location producerSequenceIdTrackerLocation = null; 114 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 115 protected int version = VERSION; 116 public void read(DataInput is) throws IOException { 117 state = is.readInt(); 118 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 119 if (is.readBoolean()) { 120 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 121 } else { 122 lastUpdate = null; 123 } 124 if (is.readBoolean()) { 125 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 126 } else { 127 firstInProgressTransactionLocation = null; 128 } 129 try { 130 if (is.readBoolean()) { 131 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 132 } else { 133 producerSequenceIdTrackerLocation = null; 134 } 135 } catch (EOFException expectedOnUpgrade) { 136 } 137 try { 138 version = is.readInt(); 139 }catch (EOFException expectedOnUpgrade) { 140 version=1; 141 } 142 LOG.info("KahaDB is version " + version); 143 } 144 145 public void write(DataOutput os) throws IOException { 146 os.writeInt(state); 147 os.writeLong(destinations.getPageId()); 148 149 if (lastUpdate != null) { 150 os.writeBoolean(true); 151 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 152 } else { 153 os.writeBoolean(false); 154 } 155 156 if (firstInProgressTransactionLocation != null) { 157 os.writeBoolean(true); 158 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 159 } else { 160 os.writeBoolean(false); 161 } 162 163 if (producerSequenceIdTrackerLocation != null) { 164 os.writeBoolean(true); 165 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 166 } else { 167 os.writeBoolean(false); 168 } 169 os.writeInt(VERSION); 170 } 171 } 172 173 class MetadataMarshaller extends VariableMarshaller<Metadata> { 174 public Metadata readPayload(DataInput dataIn) throws IOException { 175 Metadata rc = new Metadata(); 176 rc.read(dataIn); 177 return rc; 178 } 179 180 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 181 object.write(dataOut); 182 } 183 } 184 185 protected PageFile pageFile; 186 protected Journal journal; 187 protected Metadata metadata = new Metadata(); 188 189 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 190 191 protected boolean failIfDatabaseIsLocked; 192 193 protected boolean deleteAllMessages; 194 protected File directory = new File("KahaDB"); 195 protected Thread checkpointThread; 196 protected boolean enableJournalDiskSyncs=true; 197 protected boolean archiveDataLogs; 198 protected File directoryArchive; 199 protected AtomicLong storeSize = new AtomicLong(0); 200 long checkpointInterval = 5*1000; 201 long cleanupInterval = 30*1000; 202 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 203 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 204 boolean enableIndexWriteAsync = false; 205 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 206 207 208 protected AtomicBoolean opened = new AtomicBoolean(); 209 private LockFile lockFile; 210 private boolean ignoreMissingJournalfiles = false; 211 private int indexCacheSize = 10000; 212 private boolean checkForCorruptJournalFiles = false; 213 private boolean checksumJournalFiles = false; 214 private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY; 215 protected boolean forceRecoverIndex = false; 216 private final Object checkpointThreadLock = new Object(); 217 218 public MessageDatabase() { 219 } 220 221 @Override 222 public void doStart() throws Exception { 223 load(); 224 } 225 226 @Override 227 public void doStop(ServiceStopper stopper) throws Exception { 228 unload(); 229 } 230 231 private void loadPageFile() throws IOException { 232 this.indexLock.writeLock().lock(); 233 try { 234 final PageFile pageFile = getPageFile(); 235 pageFile.load(); 236 pageFile.tx().execute(new Transaction.Closure<IOException>() { 237 public void execute(Transaction tx) throws IOException { 238 if (pageFile.getPageCount() == 0) { 239 // First time this is created.. Initialize the metadata 240 Page<Metadata> page = tx.allocate(); 241 assert page.getPageId() == 0; 242 page.set(metadata); 243 metadata.page = page; 244 metadata.state = CLOSED_STATE; 245 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 246 247 tx.store(metadata.page, metadataMarshaller, true); 248 } else { 249 Page<Metadata> page = tx.load(0, metadataMarshaller); 250 metadata = page.get(); 251 metadata.page = page; 252 } 253 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 254 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 255 metadata.destinations.load(tx); 256 } 257 }); 258 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 259 // Perhaps we should just keep an index of file 260 storedDestinations.clear(); 261 pageFile.tx().execute(new Transaction.Closure<IOException>() { 262 public void execute(Transaction tx) throws IOException { 263 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 264 Entry<String, StoredDestination> entry = iterator.next(); 265 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 266 storedDestinations.put(entry.getKey(), sd); 267 } 268 } 269 }); 270 pageFile.flush(); 271 }finally { 272 this.indexLock.writeLock().unlock(); 273 } 274 } 275 276 private void startCheckpoint() { 277 synchronized (checkpointThreadLock) { 278 boolean start = false; 279 if (checkpointThread == null) { 280 start = true; 281 } else if (!checkpointThread.isAlive()) { 282 start = true; 283 LOG.info("KahaDB: Recovering checkpoint thread after death"); 284 } 285 if (start) { 286 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { 287 @Override 288 public void run() { 289 try { 290 long lastCleanup = System.currentTimeMillis(); 291 long lastCheckpoint = System.currentTimeMillis(); 292 // Sleep for a short time so we can periodically check 293 // to see if we need to exit this thread. 294 long sleepTime = Math.min(checkpointInterval, 500); 295 while (opened.get()) { 296 Thread.sleep(sleepTime); 297 long now = System.currentTimeMillis(); 298 if( now - lastCleanup >= cleanupInterval ) { 299 checkpointCleanup(true); 300 lastCleanup = now; 301 lastCheckpoint = now; 302 } else if( now - lastCheckpoint >= checkpointInterval ) { 303 checkpointCleanup(false); 304 lastCheckpoint = now; 305 } 306 } 307 } catch (InterruptedException e) { 308 // Looks like someone really wants us to exit this thread... 309 } catch (IOException ioe) { 310 LOG.error("Checkpoint failed", ioe); 311 brokerService.handleIOException(ioe); 312 } 313 } 314 }; 315 316 checkpointThread.setDaemon(true); 317 checkpointThread.start(); 318 } 319 } 320 } 321 322 public void open() throws IOException { 323 if( opened.compareAndSet(false, true) ) { 324 getJournal().start(); 325 loadPageFile(); 326 startCheckpoint(); 327 recover(); 328 } 329 } 330 331 private void lock() throws IOException { 332 if( lockFile == null ) { 333 File lockFileName = new File(directory, "lock"); 334 lockFile = new LockFile(lockFileName, true); 335 if (failIfDatabaseIsLocked) { 336 lockFile.lock(); 337 } else { 338 while (true) { 339 try { 340 lockFile.lock(); 341 break; 342 } catch (IOException e) { 343 LOG.info("Database "+lockFileName+" is locked... waiting " + (getDatabaseLockedWaitDelay() / 1000) + " seconds for the database to be unlocked. Reason: " + e); 344 try { 345 Thread.sleep(getDatabaseLockedWaitDelay()); 346 } catch (InterruptedException e1) { 347 } 348 } 349 } 350 } 351 } 352 } 353 354 // for testing 355 public LockFile getLockFile() { 356 return lockFile; 357 } 358 359 public void load() throws IOException { 360 361 this.indexLock.writeLock().lock(); 362 try { 363 lock(); 364 if (deleteAllMessages) { 365 getJournal().start(); 366 getJournal().delete(); 367 getJournal().close(); 368 journal = null; 369 getPageFile().delete(); 370 LOG.info("Persistence store purged."); 371 deleteAllMessages = false; 372 } 373 374 open(); 375 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 376 }finally { 377 this.indexLock.writeLock().unlock(); 378 } 379 380 } 381 382 383 public void close() throws IOException, InterruptedException { 384 if( opened.compareAndSet(true, false)) { 385 this.indexLock.writeLock().lock(); 386 try { 387 pageFile.tx().execute(new Transaction.Closure<IOException>() { 388 public void execute(Transaction tx) throws IOException { 389 checkpointUpdate(tx, true); 390 } 391 }); 392 pageFile.unload(); 393 metadata = new Metadata(); 394 }finally { 395 this.indexLock.writeLock().unlock(); 396 } 397 journal.close(); 398 synchronized (checkpointThreadLock) { 399 checkpointThread.join(); 400 } 401 lockFile.unlock(); 402 lockFile=null; 403 } 404 } 405 406 public void unload() throws IOException, InterruptedException { 407 this.indexLock.writeLock().lock(); 408 try { 409 if( pageFile != null && pageFile.isLoaded() ) { 410 metadata.state = CLOSED_STATE; 411 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); 412 413 pageFile.tx().execute(new Transaction.Closure<IOException>() { 414 public void execute(Transaction tx) throws IOException { 415 tx.store(metadata.page, metadataMarshaller, true); 416 } 417 }); 418 } 419 }finally { 420 this.indexLock.writeLock().unlock(); 421 } 422 close(); 423 } 424 425 // public for testing 426 public Location getFirstInProgressTxLocation() { 427 Location l = null; 428 synchronized (inflightTransactions) { 429 if (!inflightTransactions.isEmpty()) { 430 l = inflightTransactions.values().iterator().next().get(0).getLocation(); 431 } 432 if (!preparedTransactions.isEmpty()) { 433 Location t = preparedTransactions.values().iterator().next().get(0).getLocation(); 434 if (l==null || t.compareTo(l) <= 0) { 435 l = t; 436 } 437 } 438 } 439 return l; 440 } 441 442 /** 443 * Move all the messages that were in the journal into long term storage. We 444 * just replay and do a checkpoint. 445 * 446 * @throws IOException 447 * @throws IOException 448 * @throws IllegalStateException 449 */ 450 private void recover() throws IllegalStateException, IOException { 451 this.indexLock.writeLock().lock(); 452 try { 453 454 long start = System.currentTimeMillis(); 455 Location producerAuditPosition = recoverProducerAudit(); 456 Location lastIndoubtPosition = getRecoveryPosition(); 457 458 Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition); 459 460 if (recoveryPosition != null) { 461 int redoCounter = 0; 462 LOG.info("Recovering from the journal ..."); 463 while (recoveryPosition != null) { 464 JournalCommand<?> message = load(recoveryPosition); 465 metadata.lastUpdate = recoveryPosition; 466 process(message, recoveryPosition, lastIndoubtPosition); 467 redoCounter++; 468 recoveryPosition = journal.getNextLocation(recoveryPosition); 469 } 470 long end = System.currentTimeMillis(); 471 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 472 } 473 474 // We may have to undo some index updates. 475 pageFile.tx().execute(new Transaction.Closure<IOException>() { 476 public void execute(Transaction tx) throws IOException { 477 recoverIndex(tx); 478 } 479 }); 480 481 // rollback any recovered inflight local transactions 482 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 483 synchronized (inflightTransactions) { 484 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 485 TransactionId id = it.next(); 486 if (id.isLocalTransaction()) { 487 toRollback.add(id); 488 } 489 } 490 for (TransactionId tx: toRollback) { 491 LOG.debug("rolling back recovered indoubt local transaction " + tx); 492 store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null); 493 } 494 } 495 }finally { 496 this.indexLock.writeLock().unlock(); 497 } 498 } 499 500 private Location minimum(Location producerAuditPosition, 501 Location lastIndoubtPosition) { 502 Location min = null; 503 if (producerAuditPosition != null) { 504 min = producerAuditPosition; 505 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { 506 min = lastIndoubtPosition; 507 } 508 } else { 509 min = lastIndoubtPosition; 510 } 511 return min; 512 } 513 514 private Location recoverProducerAudit() throws IOException { 515 if (metadata.producerSequenceIdTrackerLocation != null) { 516 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 517 try { 518 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 519 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 520 } catch (ClassNotFoundException cfe) { 521 IOException ioe = new IOException("Failed to read producerAudit: " + cfe); 522 ioe.initCause(cfe); 523 throw ioe; 524 } 525 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); 526 } else { 527 // got no audit stored so got to recreate via replay from start of the journal 528 return journal.getNextLocation(null); 529 } 530 } 531 532 protected void recoverIndex(Transaction tx) throws IOException { 533 long start = System.currentTimeMillis(); 534 // It is possible index updates got applied before the journal updates.. 535 // in that case we need to removed references to messages that are not in the journal 536 final Location lastAppendLocation = journal.getLastAppendLocation(); 537 long undoCounter=0; 538 539 // Go through all the destinations to see if they have messages past the lastAppendLocation 540 for (StoredDestination sd : storedDestinations.values()) { 541 542 final ArrayList<Long> matches = new ArrayList<Long>(); 543 // Find all the Locations that are >= than the last Append Location. 544 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 545 @Override 546 protected void matched(Location key, Long value) { 547 matches.add(value); 548 } 549 }); 550 551 552 for (Long sequenceId : matches) { 553 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 554 sd.locationIndex.remove(tx, keys.location); 555 sd.messageIdIndex.remove(tx, keys.messageId); 556 metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId)); 557 undoCounter++; 558 // TODO: do we need to modify the ack positions for the pub sub case? 559 } 560 } 561 562 long end = System.currentTimeMillis(); 563 if( undoCounter > 0 ) { 564 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 565 // should do sync writes to the journal. 566 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 567 } 568 569 undoCounter = 0; 570 start = System.currentTimeMillis(); 571 572 // Lets be extra paranoid here and verify that all the datafiles being referenced 573 // by the indexes still exists. 574 575 final SequenceSet ss = new SequenceSet(); 576 for (StoredDestination sd : storedDestinations.values()) { 577 // Use a visitor to cut down the number of pages that we load 578 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 579 int last=-1; 580 581 public boolean isInterestedInKeysBetween(Location first, Location second) { 582 if( first==null ) { 583 return !ss.contains(0, second.getDataFileId()); 584 } else if( second==null ) { 585 return true; 586 } else { 587 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 588 } 589 } 590 591 public void visit(List<Location> keys, List<Long> values) { 592 for (Location l : keys) { 593 int fileId = l.getDataFileId(); 594 if( last != fileId ) { 595 ss.add(fileId); 596 last = fileId; 597 } 598 } 599 } 600 601 }); 602 } 603 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 604 while( !ss.isEmpty() ) { 605 missingJournalFiles.add( (int)ss.removeFirst() ); 606 } 607 missingJournalFiles.removeAll( journal.getFileMap().keySet() ); 608 609 if( !missingJournalFiles.isEmpty() ) { 610 LOG.info("Some journal files are missing: "+missingJournalFiles); 611 } 612 613 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 614 for (Integer missing : missingJournalFiles) { 615 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0))); 616 } 617 618 if ( checkForCorruptJournalFiles ) { 619 Collection<DataFile> dataFiles = journal.getFileMap().values(); 620 for (DataFile dataFile : dataFiles) { 621 int id = dataFile.getDataFileId(); 622 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0))); 623 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 624 while( seq!=null ) { 625 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1))); 626 seq = seq.getNext(); 627 } 628 } 629 } 630 631 if( !missingPredicates.isEmpty() ) { 632 for (StoredDestination sd : storedDestinations.values()) { 633 634 final ArrayList<Long> matches = new ArrayList<Long>(); 635 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 636 @Override 637 protected void matched(Location key, Long value) { 638 matches.add(value); 639 } 640 }); 641 642 // If somes message references are affected by the missing data files... 643 if( !matches.isEmpty() ) { 644 645 // We either 'gracefully' recover dropping the missing messages or 646 // we error out. 647 if( ignoreMissingJournalfiles ) { 648 // Update the index to remove the references to the missing data 649 for (Long sequenceId : matches) { 650 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 651 sd.locationIndex.remove(tx, keys.location); 652 sd.messageIdIndex.remove(tx, keys.messageId); 653 undoCounter++; 654 // TODO: do we need to modify the ack positions for the pub sub case? 655 } 656 657 } else { 658 throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); 659 } 660 } 661 } 662 } 663 664 end = System.currentTimeMillis(); 665 if( undoCounter > 0 ) { 666 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 667 // should do sync writes to the journal. 668 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 669 } 670 } 671 672 private Location nextRecoveryPosition; 673 private Location lastRecoveryPosition; 674 675 public void incrementalRecover() throws IOException { 676 this.indexLock.writeLock().lock(); 677 try { 678 if( nextRecoveryPosition == null ) { 679 if( lastRecoveryPosition==null ) { 680 nextRecoveryPosition = getRecoveryPosition(); 681 } else { 682 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 683 } 684 } 685 while (nextRecoveryPosition != null) { 686 lastRecoveryPosition = nextRecoveryPosition; 687 metadata.lastUpdate = lastRecoveryPosition; 688 JournalCommand<?> message = load(lastRecoveryPosition); 689 process(message, lastRecoveryPosition); 690 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 691 } 692 }finally { 693 this.indexLock.writeLock().unlock(); 694 } 695 } 696 697 public Location getLastUpdatePosition() throws IOException { 698 return metadata.lastUpdate; 699 } 700 701 private Location getRecoveryPosition() throws IOException { 702 703 if (!this.forceRecoverIndex) { 704 705 // If we need to recover the transactions.. 706 if (metadata.firstInProgressTransactionLocation != null) { 707 return metadata.firstInProgressTransactionLocation; 708 } 709 710 // Perhaps there were no transactions... 711 if( metadata.lastUpdate!=null) { 712 // Start replay at the record after the last one recorded in the index file. 713 return journal.getNextLocation(metadata.lastUpdate); 714 } 715 } 716 // This loads the first position. 717 return journal.getNextLocation(null); 718 } 719 720 protected void checkpointCleanup(final boolean cleanup) throws IOException { 721 long start; 722 this.indexLock.writeLock().lock(); 723 try { 724 start = System.currentTimeMillis(); 725 if( !opened.get() ) { 726 return; 727 } 728 pageFile.tx().execute(new Transaction.Closure<IOException>() { 729 public void execute(Transaction tx) throws IOException { 730 checkpointUpdate(tx, cleanup); 731 } 732 }); 733 }finally { 734 this.indexLock.writeLock().unlock(); 735 } 736 long end = System.currentTimeMillis(); 737 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 738 LOG.info("Slow KahaDB access: cleanup took "+(end-start)); 739 } 740 } 741 742 743 public void checkpoint(Callback closure) throws Exception { 744 this.indexLock.writeLock().lock(); 745 try { 746 pageFile.tx().execute(new Transaction.Closure<IOException>() { 747 public void execute(Transaction tx) throws IOException { 748 checkpointUpdate(tx, false); 749 } 750 }); 751 closure.execute(); 752 }finally { 753 this.indexLock.writeLock().unlock(); 754 } 755 } 756 757 // ///////////////////////////////////////////////////////////////// 758 // Methods call by the broker to update and query the store. 759 // ///////////////////////////////////////////////////////////////// 760 public Location store(JournalCommand<?> data) throws IOException { 761 return store(data, false, null,null); 762 } 763 764 /** 765 * All updated are are funneled through this method. The updates are converted 766 * to a JournalMessage which is logged to the journal and then the data from 767 * the JournalMessage is used to update the index just like it would be done 768 * during a recovery process. 769 */ 770 public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException { 771 if (before != null) { 772 before.run(); 773 } 774 try { 775 int size = data.serializedSizeFramed(); 776 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 777 os.writeByte(data.type().getNumber()); 778 data.writeFramed(os); 779 780 long start = System.currentTimeMillis(); 781 Location location = journal.write(os.toByteSequence(), sync); 782 long start2 = System.currentTimeMillis(); 783 process(data, location); 784 long end = System.currentTimeMillis(); 785 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 786 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 787 } 788 789 this.indexLock.writeLock().lock(); 790 try { 791 metadata.lastUpdate = location; 792 }finally { 793 this.indexLock.writeLock().unlock(); 794 } 795 if (!checkpointThread.isAlive()) { 796 startCheckpoint(); 797 } 798 if (after != null) { 799 after.run(); 800 } 801 return location; 802 } catch (IOException ioe) { 803 LOG.error("KahaDB failed to store to Journal", ioe); 804 brokerService.handleIOException(ioe); 805 throw ioe; 806 } 807 } 808 809 /** 810 * Loads a previously stored JournalMessage 811 * 812 * @param location 813 * @return 814 * @throws IOException 815 */ 816 public JournalCommand<?> load(Location location) throws IOException { 817 ByteSequence data = journal.read(location); 818 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 819 byte readByte = is.readByte(); 820 KahaEntryType type = KahaEntryType.valueOf(readByte); 821 if( type == null ) { 822 throw new IOException("Could not load journal record. Invalid location: "+location); 823 } 824 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 825 message.mergeFramed(is); 826 return message; 827 } 828 829 /** 830 * do minimal recovery till we reach the last inDoubtLocation 831 * @param data 832 * @param location 833 * @param inDoubtlocation 834 * @throws IOException 835 */ 836 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 837 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 838 process(data, location); 839 } else { 840 // just recover producer audit 841 data.visit(new Visitor() { 842 public void visit(KahaAddMessageCommand command) throws IOException { 843 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 844 } 845 }); 846 } 847 } 848 849 // ///////////////////////////////////////////////////////////////// 850 // Journaled record processing methods. Once the record is journaled, 851 // these methods handle applying the index updates. These may be called 852 // from the recovery method too so they need to be idempotent 853 // ///////////////////////////////////////////////////////////////// 854 855 void process(JournalCommand<?> data, final Location location) throws IOException { 856 data.visit(new Visitor() { 857 @Override 858 public void visit(KahaAddMessageCommand command) throws IOException { 859 process(command, location); 860 } 861 862 @Override 863 public void visit(KahaRemoveMessageCommand command) throws IOException { 864 process(command, location); 865 } 866 867 @Override 868 public void visit(KahaPrepareCommand command) throws IOException { 869 process(command, location); 870 } 871 872 @Override 873 public void visit(KahaCommitCommand command) throws IOException { 874 process(command, location); 875 } 876 877 @Override 878 public void visit(KahaRollbackCommand command) throws IOException { 879 process(command, location); 880 } 881 882 @Override 883 public void visit(KahaRemoveDestinationCommand command) throws IOException { 884 process(command, location); 885 } 886 887 @Override 888 public void visit(KahaSubscriptionCommand command) throws IOException { 889 process(command, location); 890 } 891 }); 892 } 893 894 protected void process(final KahaAddMessageCommand command, final Location location) throws IOException { 895 if (command.hasTransactionInfo()) { 896 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location); 897 inflightTx.add(new AddOpperation(command, location)); 898 } else { 899 this.indexLock.writeLock().lock(); 900 try { 901 pageFile.tx().execute(new Transaction.Closure<IOException>() { 902 public void execute(Transaction tx) throws IOException { 903 upadateIndex(tx, command, location); 904 } 905 }); 906 }finally { 907 this.indexLock.writeLock().unlock(); 908 } 909 } 910 } 911 912 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 913 if (command.hasTransactionInfo()) { 914 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location); 915 inflightTx.add(new RemoveOpperation(command, location)); 916 } else { 917 this.indexLock.writeLock().lock(); 918 try { 919 pageFile.tx().execute(new Transaction.Closure<IOException>() { 920 public void execute(Transaction tx) throws IOException { 921 updateIndex(tx, command, location); 922 } 923 }); 924 }finally { 925 this.indexLock.writeLock().unlock(); 926 } 927 } 928 929 } 930 931 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 932 this.indexLock.writeLock().lock(); 933 try { 934 pageFile.tx().execute(new Transaction.Closure<IOException>() { 935 public void execute(Transaction tx) throws IOException { 936 updateIndex(tx, command, location); 937 } 938 }); 939 }finally { 940 this.indexLock.writeLock().unlock(); 941 } 942 } 943 944 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 945 this.indexLock.writeLock().lock(); 946 try { 947 pageFile.tx().execute(new Transaction.Closure<IOException>() { 948 public void execute(Transaction tx) throws IOException { 949 updateIndex(tx, command, location); 950 } 951 }); 952 }finally { 953 this.indexLock.writeLock().unlock(); 954 } 955 } 956 957 protected void process(KahaCommitCommand command, Location location) throws IOException { 958 TransactionId key = key(command.getTransactionInfo()); 959 List<Operation> inflightTx; 960 synchronized (inflightTransactions) { 961 inflightTx = inflightTransactions.remove(key); 962 if (inflightTx == null) { 963 inflightTx = preparedTransactions.remove(key); 964 } 965 } 966 if (inflightTx == null) { 967 return; 968 } 969 970 final List<Operation> messagingTx = inflightTx; 971 this.indexLock.writeLock().lock(); 972 try { 973 pageFile.tx().execute(new Transaction.Closure<IOException>() { 974 public void execute(Transaction tx) throws IOException { 975 for (Operation op : messagingTx) { 976 op.execute(tx); 977 } 978 } 979 }); 980 }finally { 981 this.indexLock.writeLock().unlock(); 982 } 983 } 984 985 protected void process(KahaPrepareCommand command, Location location) { 986 TransactionId key = key(command.getTransactionInfo()); 987 synchronized (inflightTransactions) { 988 List<Operation> tx = inflightTransactions.remove(key); 989 if (tx != null) { 990 preparedTransactions.put(key, tx); 991 } 992 } 993 } 994 995 protected void process(KahaRollbackCommand command, Location location) { 996 TransactionId key = key(command.getTransactionInfo()); 997 synchronized (inflightTransactions) { 998 List<Operation> tx = inflightTransactions.remove(key); 999 if (tx == null) { 1000 preparedTransactions.remove(key); 1001 } 1002 } 1003 } 1004 1005 // ///////////////////////////////////////////////////////////////// 1006 // These methods do the actual index updates. 1007 // ///////////////////////////////////////////////////////////////// 1008 1009 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1010 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1011 1012 void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1013 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1014 1015 // Skip adding the message to the index if this is a topic and there are 1016 // no subscriptions. 1017 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1018 return; 1019 } 1020 1021 // Add the message. 1022 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1023 long id = sd.orderIndex.getNextMessageId(priority); 1024 Long previous = sd.locationIndex.put(tx, location, id); 1025 if (previous == null) { 1026 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1027 if (previous == null) { 1028 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1029 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1030 addAckLocationForNewMessage(tx, sd, id); 1031 } 1032 } else { 1033 // If the message ID as indexed, then the broker asked us to 1034 // store a DUP 1035 // message. Bad BOY! Don't do it, and log a warning. 1036 LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId()); 1037 // TODO: consider just rolling back the tx. 1038 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1039 sd.locationIndex.remove(tx, location); 1040 } 1041 } else { 1042 // restore the previous value.. Looks like this was a redo of a 1043 // previously 1044 // added message. We don't want to assign it a new id as the other 1045 // indexes would 1046 // be wrong.. 1047 // 1048 // TODO: consider just rolling back the tx. 1049 sd.locationIndex.put(tx, location, previous); 1050 } 1051 // record this id in any event, initial send or recovery 1052 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1053 } 1054 1055 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1056 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1057 if (!command.hasSubscriptionKey()) { 1058 1059 // In the queue case we just remove the message from the index.. 1060 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1061 if (sequenceId != null) { 1062 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1063 if (keys != null) { 1064 sd.locationIndex.remove(tx, keys.location); 1065 recordAckMessageReferenceLocation(ackLocation, keys.location); 1066 } 1067 } 1068 } else { 1069 // In the topic case we need remove the message once it's been acked 1070 // by all the subs 1071 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1072 1073 // Make sure it's a valid message id... 1074 if (sequence != null) { 1075 String subscriptionKey = command.getSubscriptionKey(); 1076 if (command.getAck() != UNMATCHED) { 1077 sd.orderIndex.get(tx, sequence); 1078 byte priority = sd.orderIndex.lastGetPriority(); 1079 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1080 } 1081 // The following method handles deleting un-referenced messages. 1082 removeAckLocation(tx, sd, subscriptionKey, sequence); 1083 } 1084 1085 } 1086 } 1087 1088 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 1089 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1090 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1091 if (referenceFileIds == null) { 1092 referenceFileIds = new HashSet<Integer>(); 1093 referenceFileIds.add(messageLocation.getDataFileId()); 1094 ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1095 } else { 1096 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1097 if (!referenceFileIds.contains(id)) { 1098 referenceFileIds.add(id); 1099 } 1100 } 1101 } 1102 1103 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1104 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1105 sd.orderIndex.remove(tx); 1106 1107 sd.locationIndex.clear(tx); 1108 sd.locationIndex.unload(tx); 1109 tx.free(sd.locationIndex.getPageId()); 1110 1111 sd.messageIdIndex.clear(tx); 1112 sd.messageIdIndex.unload(tx); 1113 tx.free(sd.messageIdIndex.getPageId()); 1114 1115 if (sd.subscriptions != null) { 1116 sd.subscriptions.clear(tx); 1117 sd.subscriptions.unload(tx); 1118 tx.free(sd.subscriptions.getPageId()); 1119 1120 sd.subscriptionAcks.clear(tx); 1121 sd.subscriptionAcks.unload(tx); 1122 tx.free(sd.subscriptionAcks.getPageId()); 1123 1124 sd.ackPositions.clear(tx); 1125 sd.ackPositions.unload(tx); 1126 tx.free(sd.ackPositions.getPageId()); 1127 } 1128 1129 String key = key(command.getDestination()); 1130 storedDestinations.remove(key); 1131 metadata.destinations.remove(tx, key); 1132 } 1133 1134 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1135 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1136 final String subscriptionKey = command.getSubscriptionKey(); 1137 1138 // If set then we are creating it.. otherwise we are destroying the sub 1139 if (command.hasSubscriptionInfo()) { 1140 sd.subscriptions.put(tx, subscriptionKey, command); 1141 long ackLocation=NOT_ACKED; 1142 if (!command.getRetroactive()) { 1143 ackLocation = sd.orderIndex.nextMessageId-1; 1144 } else { 1145 addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey); 1146 } 1147 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1148 } else { 1149 // delete the sub... 1150 sd.subscriptions.remove(tx, subscriptionKey); 1151 sd.subscriptionAcks.remove(tx, subscriptionKey); 1152 removeAckLocationsForSub(tx, sd, subscriptionKey); 1153 } 1154 } 1155 1156 /** 1157 * @param tx 1158 * @throws IOException 1159 */ 1160 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1161 LOG.debug("Checkpoint started."); 1162 1163 // reflect last update exclusive of current checkpoint 1164 Location firstTxLocation = metadata.lastUpdate; 1165 1166 metadata.state = OPEN_STATE; 1167 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1168 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); 1169 tx.store(metadata.page, metadataMarshaller, true); 1170 pageFile.flush(); 1171 1172 if( cleanup ) { 1173 1174 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1175 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet); 1176 1177 LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet); 1178 1179 // Don't GC files under replication 1180 if( journalFilesBeingReplicated!=null ) { 1181 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1182 } 1183 1184 // Don't GC files after the first in progress tx 1185 if( metadata.firstInProgressTransactionLocation!=null ) { 1186 if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) { 1187 firstTxLocation = metadata.firstInProgressTransactionLocation; 1188 }; 1189 } 1190 1191 if( firstTxLocation!=null ) { 1192 while( !gcCandidateSet.isEmpty() ) { 1193 Integer last = gcCandidateSet.last(); 1194 if( last >= firstTxLocation.getDataFileId() ) { 1195 gcCandidateSet.remove(last); 1196 } else { 1197 break; 1198 } 1199 } 1200 LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet); 1201 } 1202 1203 // Go through all the destinations to see if any of them can remove GC candidates. 1204 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1205 if( gcCandidateSet.isEmpty() ) { 1206 break; 1207 } 1208 1209 // Use a visitor to cut down the number of pages that we load 1210 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1211 int last=-1; 1212 public boolean isInterestedInKeysBetween(Location first, Location second) { 1213 if( first==null ) { 1214 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1215 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1216 subset.remove(second.getDataFileId()); 1217 } 1218 return !subset.isEmpty(); 1219 } else if( second==null ) { 1220 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1221 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1222 subset.remove(first.getDataFileId()); 1223 } 1224 return !subset.isEmpty(); 1225 } else { 1226 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1227 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1228 subset.remove(first.getDataFileId()); 1229 } 1230 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1231 subset.remove(second.getDataFileId()); 1232 } 1233 return !subset.isEmpty(); 1234 } 1235 } 1236 1237 public void visit(List<Location> keys, List<Long> values) { 1238 for (Location l : keys) { 1239 int fileId = l.getDataFileId(); 1240 if( last != fileId ) { 1241 gcCandidateSet.remove(fileId); 1242 last = fileId; 1243 } 1244 } 1245 } 1246 }); 1247 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1248 } 1249 1250 // check we are not deleting file with ack for in-use journal files 1251 LOG.trace("gc candidates: " + gcCandidateSet); 1252 final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet); 1253 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1254 while (candidates.hasNext()) { 1255 Integer candidate = candidates.next(); 1256 Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate); 1257 if (referencedFileIds != null) { 1258 for (Integer referencedFileId : referencedFileIds) { 1259 if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) { 1260 // active file that is not targeted for deletion is referenced so don't delete 1261 candidates.remove(); 1262 break; 1263 } 1264 } 1265 if (gcCandidateSet.contains(candidate)) { 1266 ackMessageFileMap.remove(candidate); 1267 } else { 1268 LOG.trace("not removing data file: " + candidate 1269 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1270 } 1271 } 1272 } 1273 1274 if( !gcCandidateSet.isEmpty() ) { 1275 LOG.debug("Cleanup removing the data files: "+gcCandidateSet); 1276 journal.removeDataFiles(gcCandidateSet); 1277 } 1278 } 1279 1280 LOG.debug("Checkpoint done."); 1281 } 1282 1283 private Location checkpointProducerAudit() throws IOException { 1284 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 1285 ObjectOutputStream oout = new ObjectOutputStream(baos); 1286 oout.writeObject(metadata.producerSequenceIdTracker); 1287 oout.flush(); 1288 oout.close(); 1289 return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null); 1290 } 1291 1292 public HashSet<Integer> getJournalFilesBeingReplicated() { 1293 return journalFilesBeingReplicated; 1294 } 1295 1296 // ///////////////////////////////////////////////////////////////// 1297 // StoredDestination related implementation methods. 1298 // ///////////////////////////////////////////////////////////////// 1299 1300 1301 private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 1302 1303 class StoredSubscription { 1304 SubscriptionInfo subscriptionInfo; 1305 String lastAckId; 1306 Location lastAckLocation; 1307 Location cursor; 1308 } 1309 1310 static class MessageKeys { 1311 final String messageId; 1312 final Location location; 1313 1314 public MessageKeys(String messageId, Location location) { 1315 this.messageId=messageId; 1316 this.location=location; 1317 } 1318 1319 @Override 1320 public String toString() { 1321 return "["+messageId+","+location+"]"; 1322 } 1323 } 1324 1325 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 1326 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 1327 1328 public MessageKeys readPayload(DataInput dataIn) throws IOException { 1329 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 1330 } 1331 1332 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 1333 dataOut.writeUTF(object.messageId); 1334 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 1335 } 1336 } 1337 1338 class LastAck { 1339 long lastAckedSequence; 1340 byte priority; 1341 1342 public LastAck(LastAck source) { 1343 this.lastAckedSequence = source.lastAckedSequence; 1344 this.priority = source.priority; 1345 } 1346 1347 public LastAck() { 1348 this.priority = MessageOrderIndex.HI; 1349 } 1350 1351 public LastAck(long ackLocation) { 1352 this.lastAckedSequence = ackLocation; 1353 this.priority = MessageOrderIndex.LO; 1354 } 1355 1356 public LastAck(long ackLocation, byte priority) { 1357 this.lastAckedSequence = ackLocation; 1358 this.priority = priority; 1359 } 1360 1361 public String toString() { 1362 return "[" + lastAckedSequence + ":" + priority + "]"; 1363 } 1364 } 1365 1366 protected class LastAckMarshaller implements Marshaller<LastAck> { 1367 1368 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 1369 dataOut.writeLong(object.lastAckedSequence); 1370 dataOut.writeByte(object.priority); 1371 } 1372 1373 public LastAck readPayload(DataInput dataIn) throws IOException { 1374 LastAck lastAcked = new LastAck(); 1375 lastAcked.lastAckedSequence = dataIn.readLong(); 1376 if (metadata.version >= 3) { 1377 lastAcked.priority = dataIn.readByte(); 1378 } 1379 return lastAcked; 1380 } 1381 1382 public int getFixedSize() { 1383 return 9; 1384 } 1385 1386 public LastAck deepCopy(LastAck source) { 1387 return new LastAck(source); 1388 } 1389 1390 public boolean isDeepCopySupported() { 1391 return true; 1392 } 1393 } 1394 1395 class StoredDestination { 1396 1397 MessageOrderIndex orderIndex = new MessageOrderIndex(); 1398 BTreeIndex<Location, Long> locationIndex; 1399 BTreeIndex<String, Long> messageIdIndex; 1400 1401 // These bits are only set for Topics 1402 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 1403 BTreeIndex<String, LastAck> subscriptionAcks; 1404 HashMap<String, MessageOrderCursor> subscriptionCursors; 1405 BTreeIndex<Long, HashSet<String>> ackPositions; 1406 } 1407 1408 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 1409 1410 public StoredDestination readPayload(DataInput dataIn) throws IOException { 1411 final StoredDestination value = new StoredDestination(); 1412 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1413 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 1414 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 1415 1416 if (dataIn.readBoolean()) { 1417 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 1418 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 1419 if (metadata.version >= 3) { 1420 value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 1421 } else { 1422 // upgrade 1423 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1424 public void execute(Transaction tx) throws IOException { 1425 value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate()); 1426 value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 1427 value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 1428 value.ackPositions.load(tx); 1429 } 1430 }); 1431 } 1432 } 1433 if (metadata.version >= 2) { 1434 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1435 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1436 } else { 1437 // upgrade 1438 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1439 public void execute(Transaction tx) throws IOException { 1440 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 1441 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 1442 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 1443 value.orderIndex.lowPriorityIndex.load(tx); 1444 1445 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 1446 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 1447 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 1448 value.orderIndex.highPriorityIndex.load(tx); 1449 } 1450 }); 1451 } 1452 1453 return value; 1454 } 1455 1456 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 1457 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 1458 dataOut.writeLong(value.locationIndex.getPageId()); 1459 dataOut.writeLong(value.messageIdIndex.getPageId()); 1460 if (value.subscriptions != null) { 1461 dataOut.writeBoolean(true); 1462 dataOut.writeLong(value.subscriptions.getPageId()); 1463 dataOut.writeLong(value.subscriptionAcks.getPageId()); 1464 dataOut.writeLong(value.ackPositions.getPageId()); 1465 } else { 1466 dataOut.writeBoolean(false); 1467 } 1468 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 1469 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 1470 } 1471 } 1472 1473 static class LocationMarshaller implements Marshaller<Location> { 1474 final static LocationMarshaller INSTANCE = new LocationMarshaller(); 1475 1476 public Location readPayload(DataInput dataIn) throws IOException { 1477 Location rc = new Location(); 1478 rc.setDataFileId(dataIn.readInt()); 1479 rc.setOffset(dataIn.readInt()); 1480 return rc; 1481 } 1482 1483 public void writePayload(Location object, DataOutput dataOut) throws IOException { 1484 dataOut.writeInt(object.getDataFileId()); 1485 dataOut.writeInt(object.getOffset()); 1486 } 1487 1488 public int getFixedSize() { 1489 return 8; 1490 } 1491 1492 public Location deepCopy(Location source) { 1493 return new Location(source); 1494 } 1495 1496 public boolean isDeepCopySupported() { 1497 return true; 1498 } 1499 } 1500 1501 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 1502 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 1503 1504 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 1505 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 1506 rc.mergeFramed((InputStream)dataIn); 1507 return rc; 1508 } 1509 1510 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 1511 object.writeFramed((OutputStream)dataOut); 1512 } 1513 } 1514 1515 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 1516 String key = key(destination); 1517 StoredDestination rc = storedDestinations.get(key); 1518 if (rc == null) { 1519 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 1520 rc = loadStoredDestination(tx, key, topic); 1521 // Cache it. We may want to remove/unload destinations from the 1522 // cache that are not used for a while 1523 // to reduce memory usage. 1524 storedDestinations.put(key, rc); 1525 } 1526 return rc; 1527 } 1528 1529 1530 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 1531 String key = key(destination); 1532 StoredDestination rc = storedDestinations.get(key); 1533 if (rc == null && metadata.destinations.containsKey(tx, key)) { 1534 rc = getStoredDestination(destination, tx); 1535 } 1536 return rc; 1537 } 1538 1539 /** 1540 * @param tx 1541 * @param key 1542 * @param topic 1543 * @return 1544 * @throws IOException 1545 */ 1546 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 1547 // Try to load the existing indexes.. 1548 StoredDestination rc = metadata.destinations.get(tx, key); 1549 if (rc == null) { 1550 // Brand new destination.. allocate indexes for it. 1551 rc = new StoredDestination(); 1552 rc.orderIndex.allocate(tx); 1553 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 1554 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 1555 1556 if (topic) { 1557 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 1558 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 1559 rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate()); 1560 } 1561 metadata.destinations.put(tx, key, rc); 1562 } 1563 1564 // Configure the marshalers and load. 1565 rc.orderIndex.load(tx); 1566 1567 // Figure out the next key using the last entry in the destination. 1568 rc.orderIndex.configureLast(tx); 1569 1570 rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE); 1571 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 1572 rc.locationIndex.load(tx); 1573 1574 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 1575 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 1576 rc.messageIdIndex.load(tx); 1577 1578 // If it was a topic... 1579 if (topic) { 1580 1581 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 1582 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 1583 rc.subscriptions.load(tx); 1584 1585 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 1586 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 1587 rc.subscriptionAcks.load(tx); 1588 1589 rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 1590 rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 1591 rc.ackPositions.load(tx); 1592 1593 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 1594 1595 if (metadata.version < 3) { 1596 1597 // on upgrade need to fill ackLocation with available messages past last ack 1598 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 1599 Entry<String, LastAck> entry = iterator.next(); 1600 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 1601 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 1602 Long sequence = orderIterator.next().getKey(); 1603 addAckLocation(tx, rc, sequence, entry.getKey()); 1604 } 1605 // modify so it is upgraded 1606 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 1607 } 1608 } 1609 1610 if (rc.orderIndex.nextMessageId == 0) { 1611 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 1612 if (!rc.subscriptionAcks.isEmpty(tx)) { 1613 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 1614 Entry<String, LastAck> entry = iterator.next(); 1615 rc.orderIndex.nextMessageId = 1616 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 1617 } 1618 } 1619 } else { 1620 // update based on ackPositions for unmatched, last entry is always the next 1621 if (!rc.ackPositions.isEmpty(tx)) { 1622 Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx); 1623 rc.orderIndex.nextMessageId = 1624 Math.max(rc.orderIndex.nextMessageId, last.getKey()); 1625 } 1626 } 1627 1628 } 1629 1630 if (metadata.version < 3) { 1631 // store again after upgrade 1632 metadata.destinations.put(tx, key, rc); 1633 } 1634 return rc; 1635 } 1636 1637 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 1638 HashSet<String> hs = sd.ackPositions.get(tx, messageSequence); 1639 if (hs == null) { 1640 hs = new HashSet<String>(); 1641 } 1642 hs.add(subscriptionKey); 1643 // every ack location addition needs to be a btree modification to get it stored 1644 sd.ackPositions.put(tx, messageSequence, hs); 1645 } 1646 1647 // new sub is interested in potentially all existing messages 1648 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 1649 for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) { 1650 Entry<Long, HashSet<String>> entry = iterator.next(); 1651 entry.getValue().add(subscriptionKey); 1652 sd.ackPositions.put(tx, entry.getKey(), entry.getValue()); 1653 } 1654 } 1655 1656 final HashSet nextMessageIdMarker = new HashSet<String>(); 1657 // on a new message add, all existing subs are interested in this message 1658 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 1659 HashSet hs = new HashSet<String>(); 1660 for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) { 1661 Entry<String, LastAck> entry = iterator.next(); 1662 hs.add(entry.getKey()); 1663 } 1664 sd.ackPositions.put(tx, messageSequence, hs); 1665 // add empty next to keep track of nextMessage 1666 sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker); 1667 } 1668 1669 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 1670 if (!sd.ackPositions.isEmpty(tx)) { 1671 Long end = sd.ackPositions.getLast(tx).getKey(); 1672 for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) { 1673 removeAckLocation(tx, sd, subscriptionKey, sequence); 1674 } 1675 } 1676 } 1677 1678 /** 1679 * @param tx 1680 * @param sd 1681 * @param subscriptionKey 1682 * @param sequenceId 1683 * @throws IOException 1684 */ 1685 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException { 1686 // Remove the sub from the previous location set.. 1687 if (sequenceId != null) { 1688 HashSet<String> hs = sd.ackPositions.get(tx, sequenceId); 1689 if (hs != null) { 1690 hs.remove(subscriptionKey); 1691 if (hs.isEmpty()) { 1692 HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue(); 1693 sd.ackPositions.remove(tx, sequenceId); 1694 1695 // Find all the entries that need to get deleted. 1696 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 1697 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 1698 1699 // Do the actual deletes. 1700 for (Entry<Long, MessageKeys> entry : deletes) { 1701 sd.locationIndex.remove(tx, entry.getValue().location); 1702 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 1703 sd.orderIndex.remove(tx, entry.getKey()); 1704 } 1705 } else { 1706 // update 1707 sd.ackPositions.put(tx, sequenceId, hs); 1708 } 1709 } 1710 } 1711 } 1712 1713 private String key(KahaDestination destination) { 1714 return destination.getType().getNumber() + ":" + destination.getName(); 1715 } 1716 1717 // ///////////////////////////////////////////////////////////////// 1718 // Transaction related implementation methods. 1719 // ///////////////////////////////////////////////////////////////// 1720 protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 1721 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 1722 1723 private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) { 1724 TransactionId key = key(info); 1725 List<Operation> tx; 1726 synchronized (inflightTransactions) { 1727 tx = inflightTransactions.get(key); 1728 if (tx == null) { 1729 tx = Collections.synchronizedList(new ArrayList<Operation>()); 1730 inflightTransactions.put(key, tx); 1731 } 1732 } 1733 return tx; 1734 } 1735 1736 private TransactionId key(KahaTransactionInfo transactionInfo) { 1737 if (transactionInfo.hasLocalTransacitonId()) { 1738 KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId(); 1739 LocalTransactionId rc = new LocalTransactionId(); 1740 rc.setConnectionId(new ConnectionId(tx.getConnectionId())); 1741 rc.setValue(tx.getTransacitonId()); 1742 return rc; 1743 } else { 1744 KahaXATransactionId tx = transactionInfo.getXaTransacitonId(); 1745 XATransactionId rc = new XATransactionId(); 1746 rc.setBranchQualifier(tx.getBranchQualifier().toByteArray()); 1747 rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray()); 1748 rc.setFormatId(tx.getFormatId()); 1749 return rc; 1750 } 1751 } 1752 1753 abstract class Operation { 1754 final Location location; 1755 1756 public Operation(Location location) { 1757 this.location = location; 1758 } 1759 1760 public Location getLocation() { 1761 return location; 1762 } 1763 1764 abstract public void execute(Transaction tx) throws IOException; 1765 } 1766 1767 class AddOpperation extends Operation { 1768 final KahaAddMessageCommand command; 1769 1770 public AddOpperation(KahaAddMessageCommand command, Location location) { 1771 super(location); 1772 this.command = command; 1773 } 1774 1775 @Override 1776 public void execute(Transaction tx) throws IOException { 1777 upadateIndex(tx, command, location); 1778 } 1779 1780 public KahaAddMessageCommand getCommand() { 1781 return command; 1782 } 1783 } 1784 1785 class RemoveOpperation extends Operation { 1786 final KahaRemoveMessageCommand command; 1787 1788 public RemoveOpperation(KahaRemoveMessageCommand command, Location location) { 1789 super(location); 1790 this.command = command; 1791 } 1792 1793 @Override 1794 public void execute(Transaction tx) throws IOException { 1795 updateIndex(tx, command, location); 1796 } 1797 1798 public KahaRemoveMessageCommand getCommand() { 1799 return command; 1800 } 1801 } 1802 1803 // ///////////////////////////////////////////////////////////////// 1804 // Initialization related implementation methods. 1805 // ///////////////////////////////////////////////////////////////// 1806 1807 private PageFile createPageFile() { 1808 PageFile index = new PageFile(directory, "db"); 1809 index.setEnableWriteThread(isEnableIndexWriteAsync()); 1810 index.setWriteBatchSize(getIndexWriteBatchSize()); 1811 index.setPageCacheSize(indexCacheSize); 1812 return index; 1813 } 1814 1815 private Journal createJournal() throws IOException { 1816 Journal manager = new Journal(); 1817 manager.setDirectory(directory); 1818 manager.setMaxFileLength(getJournalMaxFileLength()); 1819 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 1820 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 1821 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 1822 manager.setArchiveDataLogs(isArchiveDataLogs()); 1823 manager.setSizeAccumulator(storeSize); 1824 if (getDirectoryArchive() != null) { 1825 IOHelper.mkdirs(getDirectoryArchive()); 1826 manager.setDirectoryArchive(getDirectoryArchive()); 1827 } 1828 return manager; 1829 } 1830 1831 public int getJournalMaxWriteBatchSize() { 1832 return journalMaxWriteBatchSize; 1833 } 1834 1835 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 1836 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 1837 } 1838 1839 public File getDirectory() { 1840 return directory; 1841 } 1842 1843 public void setDirectory(File directory) { 1844 this.directory = directory; 1845 } 1846 1847 public boolean isDeleteAllMessages() { 1848 return deleteAllMessages; 1849 } 1850 1851 public void setDeleteAllMessages(boolean deleteAllMessages) { 1852 this.deleteAllMessages = deleteAllMessages; 1853 } 1854 1855 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 1856 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 1857 } 1858 1859 public int getIndexWriteBatchSize() { 1860 return setIndexWriteBatchSize; 1861 } 1862 1863 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 1864 this.enableIndexWriteAsync = enableIndexWriteAsync; 1865 } 1866 1867 boolean isEnableIndexWriteAsync() { 1868 return enableIndexWriteAsync; 1869 } 1870 1871 public boolean isEnableJournalDiskSyncs() { 1872 return enableJournalDiskSyncs; 1873 } 1874 1875 public void setEnableJournalDiskSyncs(boolean syncWrites) { 1876 this.enableJournalDiskSyncs = syncWrites; 1877 } 1878 1879 public long getCheckpointInterval() { 1880 return checkpointInterval; 1881 } 1882 1883 public void setCheckpointInterval(long checkpointInterval) { 1884 this.checkpointInterval = checkpointInterval; 1885 } 1886 1887 public long getCleanupInterval() { 1888 return cleanupInterval; 1889 } 1890 1891 public void setCleanupInterval(long cleanupInterval) { 1892 this.cleanupInterval = cleanupInterval; 1893 } 1894 1895 public void setJournalMaxFileLength(int journalMaxFileLength) { 1896 this.journalMaxFileLength = journalMaxFileLength; 1897 } 1898 1899 public int getJournalMaxFileLength() { 1900 return journalMaxFileLength; 1901 } 1902 1903 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 1904 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 1905 } 1906 1907 public int getMaxFailoverProducersToTrack() { 1908 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 1909 } 1910 1911 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 1912 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 1913 } 1914 1915 public int getFailoverProducersAuditDepth() { 1916 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 1917 } 1918 1919 public PageFile getPageFile() { 1920 if (pageFile == null) { 1921 pageFile = createPageFile(); 1922 } 1923 return pageFile; 1924 } 1925 1926 public Journal getJournal() throws IOException { 1927 if (journal == null) { 1928 journal = createJournal(); 1929 } 1930 return journal; 1931 } 1932 1933 public boolean isFailIfDatabaseIsLocked() { 1934 return failIfDatabaseIsLocked; 1935 } 1936 1937 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 1938 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 1939 } 1940 1941 public boolean isIgnoreMissingJournalfiles() { 1942 return ignoreMissingJournalfiles; 1943 } 1944 1945 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 1946 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 1947 } 1948 1949 public int getIndexCacheSize() { 1950 return indexCacheSize; 1951 } 1952 1953 public void setIndexCacheSize(int indexCacheSize) { 1954 this.indexCacheSize = indexCacheSize; 1955 } 1956 1957 public boolean isCheckForCorruptJournalFiles() { 1958 return checkForCorruptJournalFiles; 1959 } 1960 1961 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 1962 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 1963 } 1964 1965 public boolean isChecksumJournalFiles() { 1966 return checksumJournalFiles; 1967 } 1968 1969 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 1970 this.checksumJournalFiles = checksumJournalFiles; 1971 } 1972 1973 public void setBrokerService(BrokerService brokerService) { 1974 this.brokerService = brokerService; 1975 } 1976 1977 /** 1978 * @return the archiveDataLogs 1979 */ 1980 public boolean isArchiveDataLogs() { 1981 return this.archiveDataLogs; 1982 } 1983 1984 /** 1985 * @param archiveDataLogs the archiveDataLogs to set 1986 */ 1987 public void setArchiveDataLogs(boolean archiveDataLogs) { 1988 this.archiveDataLogs = archiveDataLogs; 1989 } 1990 1991 /** 1992 * @return the directoryArchive 1993 */ 1994 public File getDirectoryArchive() { 1995 return this.directoryArchive; 1996 } 1997 1998 /** 1999 * @param directoryArchive the directoryArchive to set 2000 */ 2001 public void setDirectoryArchive(File directoryArchive) { 2002 this.directoryArchive = directoryArchive; 2003 } 2004 2005 /** 2006 * @return the databaseLockedWaitDelay 2007 */ 2008 public int getDatabaseLockedWaitDelay() { 2009 return this.databaseLockedWaitDelay; 2010 } 2011 2012 /** 2013 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set 2014 */ 2015 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) { 2016 this.databaseLockedWaitDelay = databaseLockedWaitDelay; 2017 } 2018 2019 // ///////////////////////////////////////////////////////////////// 2020 // Internal conversion methods. 2021 // ///////////////////////////////////////////////////////////////// 2022 2023 KahaTransactionInfo createTransactionInfo(TransactionId txid) { 2024 if (txid == null) { 2025 return null; 2026 } 2027 KahaTransactionInfo rc = new KahaTransactionInfo(); 2028 2029 if (txid.isLocalTransaction()) { 2030 LocalTransactionId t = (LocalTransactionId) txid; 2031 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); 2032 kahaTxId.setConnectionId(t.getConnectionId().getValue()); 2033 kahaTxId.setTransacitonId(t.getValue()); 2034 rc.setLocalTransacitonId(kahaTxId); 2035 } else { 2036 XATransactionId t = (XATransactionId) txid; 2037 KahaXATransactionId kahaTxId = new KahaXATransactionId(); 2038 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); 2039 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); 2040 kahaTxId.setFormatId(t.getFormatId()); 2041 rc.setXaTransacitonId(kahaTxId); 2042 } 2043 return rc; 2044 } 2045 2046 class MessageOrderCursor{ 2047 long defaultCursorPosition; 2048 long lowPriorityCursorPosition; 2049 long highPriorityCursorPosition; 2050 MessageOrderCursor(){ 2051 } 2052 2053 MessageOrderCursor(long position){ 2054 this.defaultCursorPosition=position; 2055 this.lowPriorityCursorPosition=position; 2056 this.highPriorityCursorPosition=position; 2057 } 2058 2059 MessageOrderCursor(MessageOrderCursor other){ 2060 this.defaultCursorPosition=other.defaultCursorPosition; 2061 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 2062 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 2063 } 2064 2065 MessageOrderCursor copy() { 2066 return new MessageOrderCursor(this); 2067 } 2068 2069 void reset() { 2070 this.defaultCursorPosition=0; 2071 this.highPriorityCursorPosition=0; 2072 this.lowPriorityCursorPosition=0; 2073 } 2074 2075 void increment() { 2076 if (defaultCursorPosition!=0) { 2077 defaultCursorPosition++; 2078 } 2079 if (highPriorityCursorPosition!=0) { 2080 highPriorityCursorPosition++; 2081 } 2082 if (lowPriorityCursorPosition!=0) { 2083 lowPriorityCursorPosition++; 2084 } 2085 } 2086 2087 public String toString() { 2088 return "MessageOrderCursor:[def:" + defaultCursorPosition 2089 + ", low:" + lowPriorityCursorPosition 2090 + ", high:" + highPriorityCursorPosition + "]"; 2091 } 2092 2093 public void sync(MessageOrderCursor other) { 2094 this.defaultCursorPosition=other.defaultCursorPosition; 2095 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 2096 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 2097 } 2098 } 2099 2100 class MessageOrderIndex { 2101 static final byte HI = 9; 2102 static final byte LO = 0; 2103 static final byte DEF = 4; 2104 2105 long nextMessageId; 2106 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 2107 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 2108 BTreeIndex<Long, MessageKeys> highPriorityIndex; 2109 MessageOrderCursor cursor = new MessageOrderCursor(); 2110 Long lastDefaultKey; 2111 Long lastHighKey; 2112 Long lastLowKey; 2113 byte lastGetPriority; 2114 2115 MessageKeys remove(Transaction tx, Long key) throws IOException { 2116 MessageKeys result = defaultPriorityIndex.remove(tx, key); 2117 if (result == null && highPriorityIndex!=null) { 2118 result = highPriorityIndex.remove(tx, key); 2119 if (result ==null && lowPriorityIndex!=null) { 2120 result = lowPriorityIndex.remove(tx, key); 2121 } 2122 } 2123 return result; 2124 } 2125 2126 void load(Transaction tx) throws IOException { 2127 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2128 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2129 defaultPriorityIndex.load(tx); 2130 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2131 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2132 lowPriorityIndex.load(tx); 2133 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2134 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2135 highPriorityIndex.load(tx); 2136 } 2137 2138 void allocate(Transaction tx) throws IOException { 2139 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2140 if (metadata.version >= 2) { 2141 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2142 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2143 } 2144 } 2145 2146 void configureLast(Transaction tx) throws IOException { 2147 // Figure out the next key using the last entry in the destination. 2148 if (highPriorityIndex != null) { 2149 Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx); 2150 if (lastEntry != null) { 2151 nextMessageId = lastEntry.getKey() + 1; 2152 } else { 2153 lastEntry = defaultPriorityIndex.getLast(tx); 2154 if (lastEntry != null) { 2155 nextMessageId = lastEntry.getKey() + 1; 2156 } else { 2157 lastEntry = lowPriorityIndex.getLast(tx); 2158 if (lastEntry != null) { 2159 nextMessageId = lastEntry.getKey() + 1; 2160 } 2161 } 2162 } 2163 } else { 2164 Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx); 2165 if (lastEntry != null) { 2166 nextMessageId = lastEntry.getKey() + 1; 2167 } 2168 } 2169 } 2170 2171 2172 void remove(Transaction tx) throws IOException { 2173 defaultPriorityIndex.clear(tx); 2174 defaultPriorityIndex.unload(tx); 2175 tx.free(defaultPriorityIndex.getPageId()); 2176 if (lowPriorityIndex != null) { 2177 lowPriorityIndex.clear(tx); 2178 lowPriorityIndex.unload(tx); 2179 2180 tx.free(lowPriorityIndex.getPageId()); 2181 } 2182 if (highPriorityIndex != null) { 2183 highPriorityIndex.clear(tx); 2184 highPriorityIndex.unload(tx); 2185 tx.free(highPriorityIndex.getPageId()); 2186 } 2187 } 2188 2189 void resetCursorPosition() { 2190 this.cursor.reset(); 2191 lastDefaultKey = null; 2192 lastHighKey = null; 2193 lastLowKey = null; 2194 } 2195 2196 void setBatch(Transaction tx, Long sequence) throws IOException { 2197 if (sequence != null) { 2198 Long nextPosition = new Long(sequence.longValue() + 1); 2199 if (defaultPriorityIndex.containsKey(tx, sequence)) { 2200 lastDefaultKey = sequence; 2201 cursor.defaultCursorPosition = nextPosition.longValue(); 2202 } else if (highPriorityIndex != null) { 2203 if (highPriorityIndex.containsKey(tx, sequence)) { 2204 lastHighKey = sequence; 2205 cursor.highPriorityCursorPosition = nextPosition.longValue(); 2206 } else if (lowPriorityIndex.containsKey(tx, sequence)) { 2207 lastLowKey = sequence; 2208 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 2209 } 2210 } else { 2211 lastDefaultKey = sequence; 2212 cursor.defaultCursorPosition = nextPosition.longValue(); 2213 } 2214 } 2215 } 2216 2217 void setBatch(Transaction tx, LastAck last) throws IOException { 2218 setBatch(tx, last.lastAckedSequence); 2219 if (cursor.defaultCursorPosition == 0 2220 && cursor.highPriorityCursorPosition == 0 2221 && cursor.lowPriorityCursorPosition == 0) { 2222 long next = last.lastAckedSequence + 1; 2223 switch (last.priority) { 2224 case DEF: 2225 cursor.defaultCursorPosition = next; 2226 cursor.highPriorityCursorPosition = next; 2227 break; 2228 case HI: 2229 cursor.highPriorityCursorPosition = next; 2230 break; 2231 case LO: 2232 cursor.lowPriorityCursorPosition = next; 2233 cursor.defaultCursorPosition = next; 2234 cursor.highPriorityCursorPosition = next; 2235 break; 2236 } 2237 } 2238 } 2239 2240 void stoppedIterating() { 2241 if (lastDefaultKey!=null) { 2242 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 2243 } 2244 if (lastHighKey!=null) { 2245 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 2246 } 2247 if (lastLowKey!=null) { 2248 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 2249 } 2250 lastDefaultKey = null; 2251 lastHighKey = null; 2252 lastLowKey = null; 2253 } 2254 2255 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 2256 throws IOException { 2257 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 2258 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 2259 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) { 2260 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 2261 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 2262 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 2263 } 2264 } 2265 2266 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 2267 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 2268 2269 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId); 2270 deletes.add(iterator.next()); 2271 } 2272 2273 long getNextMessageId(int priority) { 2274 return nextMessageId++; 2275 } 2276 2277 MessageKeys get(Transaction tx, Long key) throws IOException { 2278 MessageKeys result = defaultPriorityIndex.get(tx, key); 2279 if (result == null) { 2280 result = highPriorityIndex.get(tx, key); 2281 if (result == null) { 2282 result = lowPriorityIndex.get(tx, key); 2283 lastGetPriority = LO; 2284 } else { 2285 lastGetPriority = HI; 2286 } 2287 } else { 2288 lastGetPriority = DEF; 2289 } 2290 return result; 2291 } 2292 2293 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 2294 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 2295 return defaultPriorityIndex.put(tx, key, value); 2296 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 2297 return highPriorityIndex.put(tx, key, value); 2298 } else { 2299 return lowPriorityIndex.put(tx, key, value); 2300 } 2301 } 2302 2303 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 2304 return new MessageOrderIterator(tx,cursor); 2305 } 2306 2307 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 2308 return new MessageOrderIterator(tx,m); 2309 } 2310 2311 public byte lastGetPriority() { 2312 return lastGetPriority; 2313 } 2314 2315 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 2316 Iterator<Entry<Long, MessageKeys>>currentIterator; 2317 final Iterator<Entry<Long, MessageKeys>>highIterator; 2318 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 2319 final Iterator<Entry<Long, MessageKeys>>lowIterator; 2320 2321 2322 2323 MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException { 2324 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition); 2325 if (highPriorityIndex != null) { 2326 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition); 2327 } else { 2328 this.highIterator = null; 2329 } 2330 if (lowPriorityIndex != null) { 2331 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition); 2332 } else { 2333 this.lowIterator = null; 2334 } 2335 } 2336 2337 public boolean hasNext() { 2338 if (currentIterator == null) { 2339 if (highIterator != null) { 2340 if (highIterator.hasNext()) { 2341 currentIterator = highIterator; 2342 return currentIterator.hasNext(); 2343 } 2344 if (defaultIterator.hasNext()) { 2345 currentIterator = defaultIterator; 2346 return currentIterator.hasNext(); 2347 } 2348 if (lowIterator.hasNext()) { 2349 currentIterator = lowIterator; 2350 return currentIterator.hasNext(); 2351 } 2352 return false; 2353 } else { 2354 currentIterator = defaultIterator; 2355 return currentIterator.hasNext(); 2356 } 2357 } 2358 if (highIterator != null) { 2359 if (currentIterator.hasNext()) { 2360 return true; 2361 } 2362 if (currentIterator == highIterator) { 2363 if (defaultIterator.hasNext()) { 2364 currentIterator = defaultIterator; 2365 return currentIterator.hasNext(); 2366 } 2367 if (lowIterator.hasNext()) { 2368 currentIterator = lowIterator; 2369 return currentIterator.hasNext(); 2370 } 2371 return false; 2372 } 2373 if (currentIterator == defaultIterator) { 2374 if (lowIterator.hasNext()) { 2375 currentIterator = lowIterator; 2376 return currentIterator.hasNext(); 2377 } 2378 return false; 2379 } 2380 } 2381 return currentIterator.hasNext(); 2382 } 2383 2384 public Entry<Long, MessageKeys> next() { 2385 Entry<Long, MessageKeys> result = currentIterator.next(); 2386 if (result != null) { 2387 Long key = result.getKey(); 2388 if (highIterator != null) { 2389 if (currentIterator == defaultIterator) { 2390 lastDefaultKey = key; 2391 } else if (currentIterator == highIterator) { 2392 lastHighKey = key; 2393 } else { 2394 lastLowKey = key; 2395 } 2396 } else { 2397 lastDefaultKey = key; 2398 } 2399 } 2400 return result; 2401 } 2402 2403 public void remove() { 2404 throw new UnsupportedOperationException(); 2405 } 2406 2407 } 2408 } 2409 2410 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 2411 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 2412 2413 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 2414 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2415 ObjectOutputStream oout = new ObjectOutputStream(baos); 2416 oout.writeObject(object); 2417 oout.flush(); 2418 oout.close(); 2419 byte[] data = baos.toByteArray(); 2420 dataOut.writeInt(data.length); 2421 dataOut.write(data); 2422 } 2423 2424 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 2425 int dataLen = dataIn.readInt(); 2426 byte[] data = new byte[dataLen]; 2427 dataIn.readFully(data); 2428 ByteArrayInputStream bais = new ByteArrayInputStream(data); 2429 ObjectInputStream oin = new ObjectInputStream(bais); 2430 try { 2431 return (HashSet<String>) oin.readObject(); 2432 } catch (ClassNotFoundException cfe) { 2433 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 2434 ioe.initCause(cfe); 2435 throw ioe; 2436 } 2437 } 2438 } 2439 }