001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.Set;
022    import org.apache.activeio.journal.Journal;
023    import org.apache.activemq.broker.BrokerService;
024    import org.apache.activemq.broker.BrokerServiceAware;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.ActiveMQQueue;
028    import org.apache.activemq.command.ActiveMQTopic;
029    import org.apache.activemq.command.ProducerId;
030    import org.apache.activemq.store.MessageStore;
031    import org.apache.activemq.store.PersistenceAdapter;
032    import org.apache.activemq.store.TopicMessageStore;
033    import org.apache.activemq.store.TransactionStore;
034    import org.apache.activemq.usage.SystemUsage;
035    
036    /**
037     * An implementation of {@link PersistenceAdapter} designed for use with a
038     * {@link Journal} and then check pointing asynchronously on a timeout with some
039     * other long term persistent storage.
040     * 
041     * @org.apache.xbean.XBean element="kahaDB"
042     * 
043     */
044    public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
045        private final KahaDBStore letter = new KahaDBStore();
046    
047        /**
048         * @param context
049         * @throws IOException
050         * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
051         */
052        public void beginTransaction(ConnectionContext context) throws IOException {
053            this.letter.beginTransaction(context);
054        }
055    
056        /**
057         * @param sync
058         * @throws IOException
059         * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
060         */
061        public void checkpoint(boolean sync) throws IOException {
062            this.letter.checkpoint(sync);
063        }
064    
065        /**
066         * @param context
067         * @throws IOException
068         * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
069         */
070        public void commitTransaction(ConnectionContext context) throws IOException {
071            this.letter.commitTransaction(context);
072        }
073    
074        /**
075         * @param destination
076         * @return MessageStore
077         * @throws IOException
078         * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
079         */
080        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
081            return this.letter.createQueueMessageStore(destination);
082        }
083    
084        /**
085         * @param destination
086         * @return TopicMessageStore
087         * @throws IOException
088         * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
089         */
090        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
091            return this.letter.createTopicMessageStore(destination);
092        }
093    
094        /**
095         * @return TrandactionStore
096         * @throws IOException
097         * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
098         */
099        public TransactionStore createTransactionStore() throws IOException {
100            return this.letter.createTransactionStore();
101        }
102    
103        /**
104         * @throws IOException
105         * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
106         */
107        public void deleteAllMessages() throws IOException {
108            this.letter.deleteAllMessages();
109        }
110    
111        /**
112         * @return destinations
113         * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
114         */
115        public Set<ActiveMQDestination> getDestinations() {
116            return this.letter.getDestinations();
117        }
118    
119        /**
120         * @return lastMessageBrokerSequenceId
121         * @throws IOException
122         * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
123         */
124        public long getLastMessageBrokerSequenceId() throws IOException {
125            return this.letter.getLastMessageBrokerSequenceId();
126        }
127    
128        public long getLastProducerSequenceId(ProducerId id) throws IOException {
129            return this.letter.getLastProducerSequenceId(id);
130        }
131    
132        /**
133         * @param destination
134         * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
135         */
136        public void removeQueueMessageStore(ActiveMQQueue destination) {
137            this.letter.removeQueueMessageStore(destination);
138        }
139    
140        /**
141         * @param destination
142         * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
143         */
144        public void removeTopicMessageStore(ActiveMQTopic destination) {
145            this.letter.removeTopicMessageStore(destination);
146        }
147    
148        /**
149         * @param context
150         * @throws IOException
151         * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
152         */
153        public void rollbackTransaction(ConnectionContext context) throws IOException {
154            this.letter.rollbackTransaction(context);
155        }
156    
157        /**
158         * @param brokerName
159         * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
160         */
161        public void setBrokerName(String brokerName) {
162            this.letter.setBrokerName(brokerName);
163        }
164    
165        /**
166         * @param usageManager
167         * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
168         */
169        public void setUsageManager(SystemUsage usageManager) {
170            this.letter.setUsageManager(usageManager);
171        }
172    
173        /**
174         * @return the size of the store
175         * @see org.apache.activemq.store.PersistenceAdapter#size()
176         */
177        public long size() {
178            return this.letter.size();
179        }
180    
181        /**
182         * @throws Exception
183         * @see org.apache.activemq.Service#start()
184         */
185        public void start() throws Exception {
186            this.letter.start();
187        }
188    
189        /**
190         * @throws Exception
191         * @see org.apache.activemq.Service#stop()
192         */
193        public void stop() throws Exception {
194            this.letter.stop();
195        }
196    
197        /**
198         * Get the journalMaxFileLength
199         * 
200         * @return the journalMaxFileLength
201         */
202        public int getJournalMaxFileLength() {
203            return this.letter.getJournalMaxFileLength();
204        }
205    
206        /**
207         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
208         * be used
209         * 
210         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
211         */
212        public void setJournalMaxFileLength(int journalMaxFileLength) {
213            this.letter.setJournalMaxFileLength(journalMaxFileLength);
214        }
215    
216        /**
217         * Set the max number of producers (LRU cache) to track for duplicate sends
218         */
219        public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
220            this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
221        }
222        
223        public int getMaxFailoverProducersToTrack() {
224            return this.letter.getMaxFailoverProducersToTrack();
225        }
226    
227        /**
228         * set the audit window depth for duplicate suppression (should exceed the max transaction
229         * batch)
230         */
231        public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
232            this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
233        }
234        
235        public int getFailoverProducersAuditDepth() {
236            return this.getFailoverProducersAuditDepth();
237        }
238        
239        /**
240         * Get the checkpointInterval
241         * 
242         * @return the checkpointInterval
243         */
244        public long getCheckpointInterval() {
245            return this.letter.getCheckpointInterval();
246        }
247    
248        /**
249         * Set the checkpointInterval
250         * 
251         * @param checkpointInterval
252         *            the checkpointInterval to set
253         */
254        public void setCheckpointInterval(long checkpointInterval) {
255            this.letter.setCheckpointInterval(checkpointInterval);
256        }
257    
258        /**
259         * Get the cleanupInterval
260         * 
261         * @return the cleanupInterval
262         */
263        public long getCleanupInterval() {
264            return this.letter.getCleanupInterval();
265        }
266    
267        /**
268         * Set the cleanupInterval
269         * 
270         * @param cleanupInterval
271         *            the cleanupInterval to set
272         */
273        public void setCleanupInterval(long cleanupInterval) {
274            this.letter.setCleanupInterval(cleanupInterval);
275        }
276    
277        /**
278         * Get the indexWriteBatchSize
279         * 
280         * @return the indexWriteBatchSize
281         */
282        public int getIndexWriteBatchSize() {
283            return this.letter.getIndexWriteBatchSize();
284        }
285    
286        /**
287         * Set the indexWriteBatchSize
288         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
289         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
290         * @param indexWriteBatchSize
291         *            the indexWriteBatchSize to set
292         */
293        public void setIndexWriteBatchSize(int indexWriteBatchSize) {
294            this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
295        }
296    
297        /**
298         * Get the journalMaxWriteBatchSize
299         * 
300         * @return the journalMaxWriteBatchSize
301         */
302        public int getJournalMaxWriteBatchSize() {
303            return this.letter.getJournalMaxWriteBatchSize();
304        }
305    
306        /**
307         * Set the journalMaxWriteBatchSize
308         *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
309         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
310         * @param journalMaxWriteBatchSize
311         *            the journalMaxWriteBatchSize to set
312         */
313        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
314            this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
315        }
316    
317        /**
318         * Get the enableIndexWriteAsync
319         * 
320         * @return the enableIndexWriteAsync
321         */
322        public boolean isEnableIndexWriteAsync() {
323            return this.letter.isEnableIndexWriteAsync();
324        }
325    
326        /**
327         * Set the enableIndexWriteAsync
328         * 
329         * @param enableIndexWriteAsync
330         *            the enableIndexWriteAsync to set
331         */
332        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
333            this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
334        }
335    
336        /**
337         * Get the directory
338         * 
339         * @return the directory
340         */
341        public File getDirectory() {
342            return this.letter.getDirectory();
343        }
344    
345        /**
346         * @param dir
347         * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
348         */
349        public void setDirectory(File dir) {
350            this.letter.setDirectory(dir);
351        }
352    
353        /**
354         * Get the enableJournalDiskSyncs
355         * 
356         * @return the enableJournalDiskSyncs
357         */
358        public boolean isEnableJournalDiskSyncs() {
359            return this.letter.isEnableJournalDiskSyncs();
360        }
361    
362        /**
363         * Set the enableJournalDiskSyncs
364         * 
365         * @param enableJournalDiskSyncs
366         *            the enableJournalDiskSyncs to set
367         */
368        public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
369            this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
370        }
371    
372        /**
373         * Get the indexCacheSize
374         * 
375         * @return the indexCacheSize
376         */
377        public int getIndexCacheSize() {
378            return this.letter.getIndexCacheSize();
379        }
380    
381        /**
382         * Set the indexCacheSize
383         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
384         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
385         * @param indexCacheSize
386         *            the indexCacheSize to set
387         */
388        public void setIndexCacheSize(int indexCacheSize) {
389            this.letter.setIndexCacheSize(indexCacheSize);
390        }
391    
392        /**
393         * Get the ignoreMissingJournalfiles
394         * 
395         * @return the ignoreMissingJournalfiles
396         */
397        public boolean isIgnoreMissingJournalfiles() {
398            return this.letter.isIgnoreMissingJournalfiles();
399        }
400    
401        /**
402         * Set the ignoreMissingJournalfiles
403         * 
404         * @param ignoreMissingJournalfiles
405         *            the ignoreMissingJournalfiles to set
406         */
407        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
408            this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
409        }
410    
411        public boolean isChecksumJournalFiles() {
412            return letter.isChecksumJournalFiles();
413        }
414    
415        public boolean isCheckForCorruptJournalFiles() {
416            return letter.isCheckForCorruptJournalFiles();
417        }
418    
419        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
420            letter.setChecksumJournalFiles(checksumJournalFiles);
421        }
422    
423        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
424            letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
425        }
426    
427        public void setBrokerService(BrokerService brokerService) {
428            letter.setBrokerService(brokerService);
429        }
430    
431        public boolean isArchiveDataLogs() {
432            return letter.isArchiveDataLogs();
433        }
434    
435        public void setArchiveDataLogs(boolean archiveDataLogs) {
436            letter.setArchiveDataLogs(archiveDataLogs);
437        }
438    
439        public File getDirectoryArchive() {
440            return letter.getDirectoryArchive();
441        }
442    
443        public void setDirectoryArchive(File directoryArchive) {
444            letter.setDirectoryArchive(directoryArchive);
445        }
446    
447        public boolean isConcurrentStoreAndDispatchQueues() {
448            return letter.isConcurrentStoreAndDispatchQueues();
449        }
450    
451        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
452            letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
453        }
454    
455        public boolean isConcurrentStoreAndDispatchTopics() {
456            return letter.isConcurrentStoreAndDispatchTopics();
457        }
458    
459        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
460            letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
461        }
462    
463        public int getMaxAsyncJobs() {
464            return letter.getMaxAsyncJobs();
465        }
466        /**
467         * @param maxAsyncJobs
468         *            the maxAsyncJobs to set
469         */
470        public void setMaxAsyncJobs(int maxAsyncJobs) {
471            letter.setMaxAsyncJobs(maxAsyncJobs);
472        }
473        
474        /**
475         * @return the databaseLockedWaitDelay
476         */
477        public int getDatabaseLockedWaitDelay() {
478            return letter.getDatabaseLockedWaitDelay();
479        }
480    
481        /**
482         * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
483         */
484        public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
485           letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
486        }
487    
488        public boolean getForceRecoverIndex() {
489            return letter.getForceRecoverIndex();
490        }
491    
492        public void setForceRecoverIndex(boolean forceRecoverIndex) {
493            letter.setForceRecoverIndex(forceRecoverIndex);
494        }
495    
496        //  for testing
497        public KahaDBStore getStore() {
498            return letter;
499        }
500    
501        @Override
502        public String toString() {
503            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
504            return "KahaDBPersistenceAdapter[" + path + "]";
505        }
506    
507    }