/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.io.IOException;
import java.util.Set;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;

/**
 * An implementation of {@link PersistenceAdapter} designed for use with a
 * {@link Journal} and then check pointing asynchronously on a timeout with some
 * other long term persistent storage.
 * <p>
 * This class holds no state of its own: every method forwards to a single
 * internal {@link KahaDBStore} instance (the "letter").
 *
 * @org.apache.xbean.XBean element="kahaDB"
 *
 */
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {

    /** The store that actually implements every operation exposed by this adapter. */
    private final KahaDBStore letter = new KahaDBStore();

    /**
     * Forwards to the underlying store.
     *
     * @param context the connection context passed through to the store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    public void beginTransaction(ConnectionContext context) throws IOException {
        this.letter.beginTransaction(context);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param sync passed through unchanged to the store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
     */
    public void checkpoint(boolean sync) throws IOException {
        this.letter.checkpoint(sync);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param context the connection context passed through to the store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    public void commitTransaction(ConnectionContext context) throws IOException {
        this.letter.commitTransaction(context);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param destination the queue to obtain a message store for
     * @return MessageStore as returned by the underlying store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        return this.letter.createQueueMessageStore(destination);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param destination the topic to obtain a message store for
     * @return TopicMessageStore as returned by the underlying store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        return this.letter.createTopicMessageStore(destination);
    }

    /**
     * Forwards to the underlying store.
     *
     * @return TransactionStore as returned by the underlying store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
     */
    public TransactionStore createTransactionStore() throws IOException {
        return this.letter.createTransactionStore();
    }

    /**
     * Forwards to the underlying store.
     *
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
     */
    public void deleteAllMessages() throws IOException {
        this.letter.deleteAllMessages();
    }

    /**
     * Forwards to the underlying store.
     *
     * @return destinations as reported by the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
     */
    public Set<ActiveMQDestination> getDestinations() {
        return this.letter.getDestinations();
    }

    /**
     * Forwards to the underlying store.
     *
     * @return lastMessageBrokerSequenceId as reported by the underlying store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return this.letter.getLastMessageBrokerSequenceId();
    }

    /**
     * Forwards to the underlying store.
     *
     * @param id the producer to look up
     * @return the last producer sequence id as reported by the underlying store
     * @throws IOException propagated from the underlying store
     */
    public long getLastProducerSequenceId(ProducerId id) throws IOException {
        return this.letter.getLastProducerSequenceId(id);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param destination the queue whose message store is to be removed
     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        this.letter.removeQueueMessageStore(destination);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param destination the topic whose message store is to be removed
     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        this.letter.removeTopicMessageStore(destination);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param context the connection context passed through to the store
     * @throws IOException propagated from the underlying store
     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        this.letter.rollbackTransaction(context);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param brokerName the broker name to hand to the store
     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
     */
    public void setBrokerName(String brokerName) {
        this.letter.setBrokerName(brokerName);
    }

    /**
     * Forwards to the underlying store.
     *
     * @param usageManager the usage manager to hand to the store
     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
     */
    public void setUsageManager(SystemUsage usageManager) {
        this.letter.setUsageManager(usageManager);
    }

    /**
     * Forwards to the underlying store.
     *
     * @return the size of the store
     * @see org.apache.activemq.store.PersistenceAdapter#size()
     */
    public long size() {
        return this.letter.size();
    }

    /**
     * Starts the underlying store.
     *
     * @throws Exception propagated from the underlying store
     * @see org.apache.activemq.Service#start()
     */
    public void start() throws Exception {
        this.letter.start();
    }

    /**
     * Stops the underlying store.
     *
     * @throws Exception propagated from the underlying store
     * @see org.apache.activemq.Service#stop()
     */
    public void stop() throws Exception {
        this.letter.stop();
    }

    /**
     * Get the journalMaxFileLength
     *
     * @return the journalMaxFileLength
     */
    public int getJournalMaxFileLength() {
        return this.letter.getJournalMaxFileLength();
    }

    /**
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
     * be used
     *
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
     * @param journalMaxFileLength
     *            the journalMaxFileLength to set
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.letter.setJournalMaxFileLength(journalMaxFileLength);
    }

    /**
     * Set the max number of producers (LRU cache) to track for duplicate sends
     *
     * @param maxFailoverProducersToTrack
     *            the maxFailoverProducersToTrack to set
     */
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
    }

    /**
     * Get the maxFailoverProducersToTrack
     *
     * @return the maxFailoverProducersToTrack
     */
    public int getMaxFailoverProducersToTrack() {
        return this.letter.getMaxFailoverProducersToTrack();
    }

    /**
     * set the audit window depth for duplicate suppression (should exceed the max transaction
     * batch)
     *
     * @param failoverProducersAuditDepth
     *            the failoverProducersAuditDepth to set
     */
    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
    }

    /**
     * Get the failoverProducersAuditDepth
     *
     * @return the failoverProducersAuditDepth
     */
    public int getFailoverProducersAuditDepth() {
        // Must read from the store: calling this adapter's own getter here
        // would recurse without bound and end in a StackOverflowError.
        return this.letter.getFailoverProducersAuditDepth();
    }

    /**
     * Get the checkpointInterval
     *
     * @return the checkpointInterval
     */
    public long getCheckpointInterval() {
        return this.letter.getCheckpointInterval();
    }

    /**
     * Set the checkpointInterval
     *
     * @param checkpointInterval
     *            the checkpointInterval to set
     */
    public void setCheckpointInterval(long checkpointInterval) {
        this.letter.setCheckpointInterval(checkpointInterval);
    }

    /**
     * Get the cleanupInterval
     *
     * @return the cleanupInterval
     */
    public long getCleanupInterval() {
        return this.letter.getCleanupInterval();
    }

    /**
     * Set the cleanupInterval
     *
     * @param cleanupInterval
     *            the cleanupInterval to set
     */
    public void setCleanupInterval(long cleanupInterval) {
        this.letter.setCleanupInterval(cleanupInterval);
    }

    /**
     * Get the indexWriteBatchSize
     *
     * @return the indexWriteBatchSize
     */
    public int getIndexWriteBatchSize() {
        return this.letter.getIndexWriteBatchSize();
    }

    /**
     * Set the indexWriteBatchSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param indexWriteBatchSize
     *            the indexWriteBatchSize to set
     */
    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
    }

    /**
     * Get the journalMaxWriteBatchSize
     *
     * @return the journalMaxWriteBatchSize
     */
    public int getJournalMaxWriteBatchSize() {
        return this.letter.getJournalMaxWriteBatchSize();
    }

    /**
     * Set the journalMaxWriteBatchSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param journalMaxWriteBatchSize
     *            the journalMaxWriteBatchSize to set
     */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
    }

    /**
     * Get the enableIndexWriteAsync
     *
     * @return the enableIndexWriteAsync
     */
    public boolean isEnableIndexWriteAsync() {
        return this.letter.isEnableIndexWriteAsync();
    }

    /**
     * Set the enableIndexWriteAsync
     *
     * @param enableIndexWriteAsync
     *            the enableIndexWriteAsync to set
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
    }

    /**
     * Get the directory
     *
     * @return the directory
     */
    public File getDirectory() {
        return this.letter.getDirectory();
    }

    /**
     * Forwards to the underlying store.
     *
     * @param dir the directory to hand to the store
     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
     */
    public void setDirectory(File dir) {
        this.letter.setDirectory(dir);
    }

    /**
     * Get the enableJournalDiskSyncs
     *
     * @return the enableJournalDiskSyncs
     */
    public boolean isEnableJournalDiskSyncs() {
        return this.letter.isEnableJournalDiskSyncs();
    }

    /**
     * Set the enableJournalDiskSyncs
     *
     * @param enableJournalDiskSyncs
     *            the enableJournalDiskSyncs to set
     */
    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
    }

    /**
     * Get the indexCacheSize
     *
     * @return the indexCacheSize
     */
    public int getIndexCacheSize() {
        return this.letter.getIndexCacheSize();
    }

    /**
     * Set the indexCacheSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param indexCacheSize
     *            the indexCacheSize to set
     */
    public void setIndexCacheSize(int indexCacheSize) {
        this.letter.setIndexCacheSize(indexCacheSize);
    }

    /**
     * Get the ignoreMissingJournalfiles
     *
     * @return the ignoreMissingJournalfiles
     */
    public boolean isIgnoreMissingJournalfiles() {
        return this.letter.isIgnoreMissingJournalfiles();
    }

    /**
     * Set the ignoreMissingJournalfiles
     *
     * @param ignoreMissingJournalfiles
     *            the ignoreMissingJournalfiles to set
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
    }

    /**
     * Get the checksumJournalFiles
     *
     * @return the checksumJournalFiles
     */
    public boolean isChecksumJournalFiles() {
        return this.letter.isChecksumJournalFiles();
    }

    /**
     * Get the checkForCorruptJournalFiles
     *
     * @return the checkForCorruptJournalFiles
     */
    public boolean isCheckForCorruptJournalFiles() {
        return this.letter.isCheckForCorruptJournalFiles();
    }

    /**
     * Set the checksumJournalFiles
     *
     * @param checksumJournalFiles
     *            the checksumJournalFiles to set
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.letter.setChecksumJournalFiles(checksumJournalFiles);
    }

    /**
     * Set the checkForCorruptJournalFiles
     *
     * @param checkForCorruptJournalFiles
     *            the checkForCorruptJournalFiles to set
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
    }

    /**
     * Hands the owning broker service to the underlying store.
     *
     * @param brokerService
     *            the brokerService to set
     */
    public void setBrokerService(BrokerService brokerService) {
        this.letter.setBrokerService(brokerService);
    }

    /**
     * Get the archiveDataLogs
     *
     * @return the archiveDataLogs
     */
    public boolean isArchiveDataLogs() {
        return this.letter.isArchiveDataLogs();
    }

    /**
     * Set the archiveDataLogs
     *
     * @param archiveDataLogs
     *            the archiveDataLogs to set
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.letter.setArchiveDataLogs(archiveDataLogs);
    }

    /**
     * Get the directoryArchive
     *
     * @return the directoryArchive
     */
    public File getDirectoryArchive() {
        return this.letter.getDirectoryArchive();
    }

    /**
     * Set the directoryArchive
     *
     * @param directoryArchive
     *            the directoryArchive to set
     */
    public void setDirectoryArchive(File directoryArchive) {
        this.letter.setDirectoryArchive(directoryArchive);
    }

    /**
     * Get the concurrentStoreAndDispatchQueues
     *
     * @return the concurrentStoreAndDispatchQueues
     */
    public boolean isConcurrentStoreAndDispatchQueues() {
        return this.letter.isConcurrentStoreAndDispatchQueues();
    }

    /**
     * Set the concurrentStoreAndDispatchQueues
     *
     * @param concurrentStoreAndDispatch
     *            the value to set for queues
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        this.letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
    }

    /**
     * Get the concurrentStoreAndDispatchTopics
     *
     * @return the concurrentStoreAndDispatchTopics
     */
    public boolean isConcurrentStoreAndDispatchTopics() {
        return this.letter.isConcurrentStoreAndDispatchTopics();
    }

    /**
     * Set the concurrentStoreAndDispatchTopics
     *
     * @param concurrentStoreAndDispatch
     *            the value to set for topics
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        this.letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
    }

    /**
     * @return the maxAsyncJobs
     */
    public int getMaxAsyncJobs() {
        return this.letter.getMaxAsyncJobs();
    }

    /**
     * @param maxAsyncJobs
     *            the maxAsyncJobs to set
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        this.letter.setMaxAsyncJobs(maxAsyncJobs);
    }

    /**
     * @return the databaseLockedWaitDelay
     */
    public int getDatabaseLockedWaitDelay() {
        return this.letter.getDatabaseLockedWaitDelay();
    }

    /**
     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
     */
    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
        this.letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
    }

    /**
     * @return the forceRecoverIndex
     */
    public boolean getForceRecoverIndex() {
        return this.letter.getForceRecoverIndex();
    }

    /**
     * @param forceRecoverIndex the forceRecoverIndex to set
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.letter.setForceRecoverIndex(forceRecoverIndex);
    }

    /**
     * Exposes the underlying store; for testing.
     *
     * @return the {@link KahaDBStore} this adapter delegates to
     */
    public KahaDBStore getStore() {
        return this.letter;
    }

    /**
     * Describes this adapter by its store directory.
     *
     * @return {@code KahaDBPersistenceAdapter[<absolute path>]}, or
     *         {@code KahaDBPersistenceAdapter[DIRECTORY_NOT_SET]} when
     *         {@link #getDirectory()} returns {@code null}
     */
    @Override
    public String toString() {
        File dir = getDirectory();
        String path = dir != null ? dir.getAbsolutePath() : "DIRECTORY_NOT_SET";
        return "KahaDBPersistenceAdapter[" + path + "]";
    }

}