001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.FileLock;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.Map;
028    import java.util.Set;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.CountDownLatch;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicInteger;
033    import java.util.concurrent.atomic.AtomicLong;
034    import org.apache.activeio.journal.Journal;
035    import org.apache.activemq.broker.BrokerService;
036    import org.apache.activemq.broker.BrokerServiceAware;
037    import org.apache.activemq.broker.ConnectionContext;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ActiveMQQueue;
040    import org.apache.activemq.command.ActiveMQTopic;
041    import org.apache.activemq.command.DataStructure;
042    import org.apache.activemq.command.JournalQueueAck;
043    import org.apache.activemq.command.JournalTopicAck;
044    import org.apache.activemq.command.JournalTrace;
045    import org.apache.activemq.command.JournalTransaction;
046    import org.apache.activemq.command.Message;
047    import org.apache.activemq.command.ProducerId;
048    import org.apache.activemq.command.SubscriptionInfo;
049    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
050    import org.apache.activemq.kaha.impl.async.AsyncDataManager;
051    import org.apache.activemq.kaha.impl.async.Location;
052    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
053    import org.apache.activemq.openwire.OpenWireFormat;
054    import org.apache.activemq.store.MessageStore;
055    import org.apache.activemq.store.PersistenceAdapter;
056    import org.apache.activemq.store.ReferenceStore;
057    import org.apache.activemq.store.ReferenceStoreAdapter;
058    import org.apache.activemq.store.TopicMessageStore;
059    import org.apache.activemq.store.TopicReferenceStore;
060    import org.apache.activemq.store.TransactionStore;
061    import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
062    import org.apache.activemq.thread.Scheduler;
063    import org.apache.activemq.thread.Task;
064    import org.apache.activemq.thread.TaskRunner;
065    import org.apache.activemq.thread.TaskRunnerFactory;
066    import org.apache.activemq.usage.SystemUsage;
067    import org.apache.activemq.usage.Usage;
068    import org.apache.activemq.usage.UsageListener;
069    import org.apache.activemq.util.ByteSequence;
070    import org.apache.activemq.util.IOExceptionSupport;
071    import org.apache.activemq.util.IOHelper;
072    import org.apache.activemq.wireformat.WireFormat;
073    import org.slf4j.Logger;
074    import org.slf4j.LoggerFactory;
075    
076    
077    /**
078     * An implementation of {@link PersistenceAdapter} designed for use with a
079     * {@link Journal} and then check pointing asynchronously on a timeout with some
080     * other long term persistent storage.
081     * 
082     * @org.apache.xbean.XBean element="amqPersistenceAdapter"
083     * 
084     */
085    public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
086    
087        private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
088        private Scheduler scheduler;
089        private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
090        private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
091        private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
092        private static final boolean BROKEN_FILE_LOCK;
093        private static final boolean DISABLE_LOCKING;
094        private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
095        private AsyncDataManager asyncDataManager;
096        private ReferenceStoreAdapter referenceStoreAdapter;
097        private TaskRunnerFactory taskRunnerFactory;
098        private WireFormat wireFormat = new OpenWireFormat();
099        private SystemUsage usageManager;
100        private long checkpointInterval = 1000 * 20;
101        private int maxCheckpointMessageAddSize = 1024 * 4;
102        private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
103        private TaskRunner checkpointTask;
104        private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
105        private final AtomicBoolean started = new AtomicBoolean(false);
106        private Runnable periodicCheckpointTask;
107        private Runnable periodicCleanupTask;
108        private boolean deleteAllMessages;
109        private boolean syncOnWrite;
110        private boolean syncOnTransaction=true;
111        private String brokerName = "";
112        private File directory;
113        private File directoryArchive;
114        private BrokerService brokerService;
115        private final AtomicLong storeSize = new AtomicLong();
116        private boolean persistentIndex=true;
117        private boolean useNio = true;
118        private boolean archiveDataLogs=false;
119        private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
120        private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
121        private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
122        private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
123        private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
124        private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
125        private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
126        private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
127        private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
128        private RandomAccessFile lockFile;
129        private FileLock lock;
130        private boolean disableLocking = DISABLE_LOCKING;
131            private boolean failIfJournalIsLocked;
132        private boolean lockLogged;
133        private boolean lockAquired;
134        private boolean recoverReferenceStore=true;
135        private boolean forceRecoverReferenceStore=false;
136        private boolean useDedicatedTaskRunner=false;
137        private int journalThreadPriority = Thread.MAX_PRIORITY;
138    
139        public String getBrokerName() {
140            return this.brokerName;
141        }
142    
143        public void setBrokerName(String brokerName) {
144            this.brokerName = brokerName;
145            if (this.referenceStoreAdapter != null) {
146                this.referenceStoreAdapter.setBrokerName(brokerName);
147            }
148        }
149    
150        public BrokerService getBrokerService() {
151            return brokerService;
152        }
153    
154        public void setBrokerService(BrokerService brokerService) {
155            this.brokerService = brokerService;
156        }
157    
158        public synchronized void start() throws Exception {
159            if (!started.compareAndSet(false, true)) {
160                return;
161            }
162            if (this.directory == null) {
163                if (brokerService != null) {
164                    this.directory = brokerService.getBrokerDataDirectory();
165                   
166                } else {
167                    this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
168                    this.directory = new File(directory, "amqstore");
169                    directory.getAbsolutePath();
170                }
171            }
172            if (this.directoryArchive == null) {
173                this.directoryArchive = new File(this.directory,"archive");
174            }
175            if (this.brokerService != null) {
176                this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
177                this.scheduler = this.brokerService.getScheduler();
178            } else {
179                this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
180                    true, 1000, isUseDedicatedTaskRunner());
181                this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
182            }
183    
184            IOHelper.mkdirs(this.directory);
185            lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
186            lock();
187            LOG.info("AMQStore starting using directory: " + directory); 
188            if (archiveDataLogs) {
189                IOHelper.mkdirs(this.directoryArchive);
190            }
191    
192            if (this.usageManager != null) {
193                this.usageManager.getMemoryUsage().addUsageListener(this);
194            }
195            if (asyncDataManager == null) {
196                asyncDataManager = createAsyncDataManager();
197            }
198            if (referenceStoreAdapter == null) {
199                referenceStoreAdapter = createReferenceStoreAdapter();
200            }
201            referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
202            referenceStoreAdapter.setBrokerName(getBrokerName());
203            referenceStoreAdapter.setUsageManager(usageManager);
204            referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
205            
206            if (failIfJournalIsLocked) {
207                asyncDataManager.lock();
208            } else {
209                while (true) {
210                    try {
211                        asyncDataManager.lock();
212                        break;
213                    } catch (IOException e) {
214                        LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
215                        try {
216                            Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
217                        } catch (InterruptedException e1) {
218                        }
219                    }
220                }
221            }
222            
223            asyncDataManager.start();
224            if (deleteAllMessages) {
225                asyncDataManager.delete();
226                try {
227                    JournalTrace trace = new JournalTrace();
228                    trace.setMessage("DELETED " + new Date());
229                    Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
230                    asyncDataManager.setMark(location, true);
231                    LOG.info("Journal deleted: ");
232                    deleteAllMessages = false;
233                } catch (IOException e) {
234                    throw e;
235                } catch (Throwable e) {
236                    throw IOExceptionSupport.create(e);
237                }
238                referenceStoreAdapter.deleteAllMessages();
239            }
240            referenceStoreAdapter.start();
241            Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
242            LOG.info("Active data files: " + files);
243            checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
244    
245                public boolean iterate() {
246                    doCheckpoint();
247                    return false;
248                }
249            }, "ActiveMQ Journal Checkpoint Worker");
250            createTransactionStore();
251    
252            //
253            // The following was attempting to reduce startup times by avoiding the
254            // log
255            // file scanning that recovery performs. The problem with it is that XA
256            // transactions
257            // only live in transaction log and are not stored in the reference
258            // store, but they still
259            // need to be recovered when the broker starts up.
260    
261            if (isForceRecoverReferenceStore()
262                    || (isRecoverReferenceStore() && !referenceStoreAdapter
263                            .isStoreValid())) {
264                LOG.warn("The ReferenceStore is not valid - recovering ...");
265                recover();
266                LOG.info("Finished recovering the ReferenceStore");
267            } else {
268                Location location = writeTraceMessage("RECOVERED " + new Date(),
269                        true);
270                asyncDataManager.setMark(location, true);
271                // recover transactions
272                getTransactionStore().setPreparedTransactions(
273                        referenceStoreAdapter.retrievePreparedState());
274            }
275    
276            // Do a checkpoint periodically.
277            periodicCheckpointTask = new Runnable() {
278    
279                public void run() {
280                    checkpoint(false);
281                }
282            };
283            scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
284            periodicCleanupTask = new Runnable() {
285    
286                public void run() {
287                    cleanup();
288                }
289            };
290            scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
291            
292            if (lockAquired && lockLogged) {
293                LOG.info("Aquired lock for AMQ Store" + getDirectory());
294                if (brokerService != null) {
295                    brokerService.getBroker().nowMasterBroker();
296                }
297            }
298    
299        }
300    
301        public void stop() throws Exception {
302    
303            if (!started.compareAndSet(true, false)) {
304                return;
305            }
306            unlock();
307            if (lockFile != null) {
308                lockFile.close();
309                lockFile = null;
310            }
311            this.usageManager.getMemoryUsage().removeUsageListener(this);
312            synchronized (this) {
313                scheduler.cancel(periodicCheckpointTask);
314                scheduler.cancel(periodicCleanupTask);
315            }
316            Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
317            while (queueIterator.hasNext()) {
318                AMQMessageStore ms = queueIterator.next();
319                ms.stop();
320            }
321            Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
322            while (topicIterator.hasNext()) {
323                final AMQTopicMessageStore ms = topicIterator.next();
324                ms.stop();
325            }
326            // Take one final checkpoint and stop checkpoint processing.
327            checkpoint(true);
328            synchronized (this) {
329                checkpointTask.shutdown();
330            }
331            referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
332            queues.clear();
333            topics.clear();
334            IOException firstException = null;
335            referenceStoreAdapter.stop();
336            referenceStoreAdapter = null;
337    
338            if (this.brokerService == null) {
339                this.taskRunnerFactory.shutdown();
340                this.scheduler.stop();
341            }
342            try {
343                LOG.debug("Journal close");
344                asyncDataManager.close();
345            } catch (Exception e) {
346                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
347            }
348            if (firstException != null) {
349                throw firstException;
350            }
351        }
352    
353        /**
354         * When we checkpoint we move all the journalled data to long term storage.
355         * 
356         * @param sync
357         */
358        public void checkpoint(boolean sync) {
359            try {
360                if (asyncDataManager == null) {
361                    throw new IllegalStateException("Journal is closed.");
362                }
363                CountDownLatch latch = null;
364                synchronized (this) {
365                    latch = nextCheckpointCountDownLatch;
366                    checkpointTask.wakeup();
367                }
368                if (sync) {
369                    if (LOG.isDebugEnabled()) {
370                        LOG.debug("Waitng for checkpoint to complete.");
371                    }
372                    latch.await();
373                }
374                referenceStoreAdapter.checkpoint(sync);
375            } catch (InterruptedException e) {
376                Thread.currentThread().interrupt();
377                LOG.warn("Request to start checkpoint failed: " + e, e);
378            } catch (IOException e) {
379                LOG.error("checkpoint failed: " + e, e);
380            }
381        }
382    
383        /**
384         * This does the actual checkpoint.
385         * 
386         * @return true if successful
387         */
388        public boolean doCheckpoint() {
389            CountDownLatch latch = null;
390            synchronized (this) {
391                latch = nextCheckpointCountDownLatch;
392                nextCheckpointCountDownLatch = new CountDownLatch(1);
393            }
394            try {
395                if (LOG.isDebugEnabled()) {
396                    LOG.debug("Checkpoint started.");
397                }
398    
399                Location currentMark = asyncDataManager.getMark();
400                Location newMark = currentMark;
401                Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
402                while (queueIterator.hasNext()) {
403                    final AMQMessageStore ms = queueIterator.next();
404                    Location mark = ms.getMark();
405                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
406                        newMark = mark;
407                    }
408                }
409                Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
410                while (topicIterator.hasNext()) {
411                    final AMQTopicMessageStore ms = topicIterator.next();
412                    Location mark = ms.getMark();
413                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
414                        newMark = mark;
415                    }
416                }
417                try {
418                    if (newMark != currentMark) {
419                        if (LOG.isDebugEnabled()) {
420                            LOG.debug("Marking journal at: " + newMark);
421                        }
422                        asyncDataManager.setMark(newMark, false);
423                        writeTraceMessage("CHECKPOINT " + new Date(), true);
424                    }
425                } catch (Exception e) {
426                    LOG.error("Failed to mark the Journal: " + e, e);
427                }
428                if (LOG.isDebugEnabled()) {
429                    LOG.debug("Checkpoint done.");
430                }
431            } finally {
432                latch.countDown();
433            }
434            return true;
435        }
436    
437        /**
438         * Cleans up the data files
439         * @throws IOException
440         */
441        public void cleanup() {
442            try {
443                Set<Integer>inProgress = new HashSet<Integer>();
444                if (LOG.isDebugEnabled()) {
445                    LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
446                }      
447                for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
448                    inProgress.addAll(set.keySet());
449                }
450                Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
451                inProgress.add(lastDataFile);
452                lastDataFile = asyncDataManager.getMark().getDataFileId();
453                inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
454                Location lastActiveTx = transactionStore.checkpoint();
455                if (lastActiveTx != null) {
456                    lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
457                }
458                LOG.debug("lastDataFile: " + lastDataFile);
459                asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
460            } catch (IOException e) {
461                LOG.error("Could not cleanup data files: " + e, e);
462            }
463        }
464    
465        public Set<ActiveMQDestination> getDestinations() {
466            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
467            destinations.addAll(queues.keySet());
468            destinations.addAll(topics.keySet());
469            return destinations;
470        }
471    
472        MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
473            if (destination.isQueue()) {
474                return createQueueMessageStore((ActiveMQQueue)destination);
475            } else {
476                return createTopicMessageStore((ActiveMQTopic)destination);
477            }
478        }
479    
480        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
481            AMQMessageStore store = queues.get(destination);
482            if (store == null) {
483                ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
484                store = new AMQMessageStore(this, checkpointStore, destination);
485                try {
486                    store.start();
487                } catch (Exception e) {
488                    throw IOExceptionSupport.create(e);
489                }
490                queues.put(destination, store);
491            }
492            return store;
493        }
494    
495        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
496            AMQTopicMessageStore store = topics.get(destinationName);
497            if (store == null) {
498                TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
499                store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
500                try {
501                    store.start();
502                } catch (Exception e) {
503                    throw IOExceptionSupport.create(e);
504                }
505                topics.put(destinationName, store);
506            }
507            return store;
508        }
509    
510        /**
511         * Cleanup method to remove any state associated with the given destination
512         *
513         * @param destination
514         */
515        public void removeQueueMessageStore(ActiveMQQueue destination) {
516            AMQMessageStore store= queues.remove(destination);
517            referenceStoreAdapter.removeQueueMessageStore(destination);
518        }
519    
520        /**
521         * Cleanup method to remove any state associated with the given destination
522         *
523         * @param destination
524         */
525        public void removeTopicMessageStore(ActiveMQTopic destination) {
526            topics.remove(destination);
527        }
528    
529        public TransactionStore createTransactionStore() throws IOException {
530            return transactionStore;
531        }
532    
533        public long getLastMessageBrokerSequenceId() throws IOException {
534            return referenceStoreAdapter.getLastMessageBrokerSequenceId();
535        }
536    
537        public void beginTransaction(ConnectionContext context) throws IOException {
538            referenceStoreAdapter.beginTransaction(context);
539        }
540    
541        public void commitTransaction(ConnectionContext context) throws IOException {
542            referenceStoreAdapter.commitTransaction(context);
543        }
544    
545        public void rollbackTransaction(ConnectionContext context) throws IOException {
546            referenceStoreAdapter.rollbackTransaction(context);
547        }
548        
549        public boolean isPersistentIndex() {
550                    return persistentIndex;
551            }
552    
553            public void setPersistentIndex(boolean persistentIndex) {
554                    this.persistentIndex = persistentIndex;
555            }
556    
557        /**
558         * @param location
559         * @return
560         * @throws IOException
561         */
562        public DataStructure readCommand(Location location) throws IOException {
563            try {
564                ByteSequence packet = asyncDataManager.read(location);
565                return (DataStructure)wireFormat.unmarshal(packet);
566            } catch (IOException e) {
567                throw createReadException(location, e);
568            }
569        }
570    
571        /**
572         * Move all the messages that were in the journal into long term storage. We
573         * just replay and do a checkpoint.
574         * 
575         * @throws IOException
576         * @throws IOException
577         * @throws IllegalStateException
578         */
579        private void recover() throws IllegalStateException, IOException {
580            referenceStoreAdapter.clearMessages();
581            Location pos = null;
582            int redoCounter = 0;
583            LOG.info("Journal Recovery Started from: " + asyncDataManager);
584            long start = System.currentTimeMillis();
585            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
586            // While we have records in the journal.
587            while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
588                ByteSequence data = asyncDataManager.read(pos);
589                DataStructure c = (DataStructure)wireFormat.unmarshal(data);
590                if (c instanceof Message) {
591                    Message message = (Message)c;
592                    AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
593                    if (message.isInTransaction()) {
594                        transactionStore.addMessage(store, message, pos);
595                    } else {
596                        if (store.replayAddMessage(context, message, pos)) {
597                            redoCounter++;
598                        }
599                    }
600                } else {
601                    switch (c.getDataStructureType()) {
602                    case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
603                        referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
604                    }
605                        break;
606                    case JournalQueueAck.DATA_STRUCTURE_TYPE: {
607                        JournalQueueAck command = (JournalQueueAck)c;
608                        AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
609                        if (command.getMessageAck().isInTransaction()) {
610                            transactionStore.removeMessage(store, command.getMessageAck(), pos);
611                        } else {
612                            if (store.replayRemoveMessage(context, command.getMessageAck())) {
613                                redoCounter++;
614                            }
615                        }
616                    }
617                        break;
618                    case JournalTopicAck.DATA_STRUCTURE_TYPE: {
619                        JournalTopicAck command = (JournalTopicAck)c;
620                        AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
621                        if (command.getTransactionId() != null) {
622                            transactionStore.acknowledge(store, command, pos);
623                        } else {
624                            if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
625                                redoCounter++;
626                            }
627                        }
628                    }
629                        break;
630                    case JournalTransaction.DATA_STRUCTURE_TYPE: {
631                        JournalTransaction command = (JournalTransaction)c;
632                        try {
633                            // Try to replay the packet.
634                            switch (command.getType()) {
635                            case JournalTransaction.XA_PREPARE:
636                                transactionStore.replayPrepare(command.getTransactionId());
637                                break;
638                            case JournalTransaction.XA_COMMIT:
639                            case JournalTransaction.LOCAL_COMMIT:
640                                AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
641                                if (tx == null) {
642                                    break; // We may be trying to replay a commit
643                                }
644                                // that
645                                // was already committed.
646                                // Replay the committed operations.
647                                tx.getOperations();
648                                for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
649                                    AMQTxOperation op = (AMQTxOperation)iter.next();
650                                    if (op.replay(this, context)) {
651                                        redoCounter++;
652                                    }
653                                }
654                                break;
655                            case JournalTransaction.LOCAL_ROLLBACK:
656                            case JournalTransaction.XA_ROLLBACK:
657                                transactionStore.replayRollback(command.getTransactionId());
658                                break;
659                            default:
660                                throw new IOException("Invalid journal command type: " + command.getType());
661                            }
662                        } catch (IOException e) {
663                            LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
664                        }
665                    }
666                        break;
667                    case JournalTrace.DATA_STRUCTURE_TYPE:
668                        JournalTrace trace = (JournalTrace)c;
669                        LOG.debug("TRACE Entry: " + trace.getMessage());
670                        break;
671                    default:
672                        LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
673                    }
674                }
675            }
676            Location location = writeTraceMessage("RECOVERED " + new Date(), true);
677            asyncDataManager.setMark(location, true);
678            long end = System.currentTimeMillis();
679            LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
680        }
681    
682        private IOException createReadException(Location location, Exception e) {
683            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
684        }
685    
686        protected IOException createWriteException(DataStructure packet, Exception e) {
687            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
688        }
689    
690        protected IOException createWriteException(String command, Exception e) {
691            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
692        }
693    
694        protected IOException createRecoveryFailedException(Exception e) {
695            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
696        }
697    
698        /**
699         * @param command
700         * @param syncHint
701         * @return
702         * @throws IOException
703         */
704        public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
705            return writeCommand(command, syncHint,false);
706        }
707        
708        public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
709            try {
710                    return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
711            } catch (IOException ioe) {
712                    LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
713                    brokerService.handleIOException(ioe);
714                    throw ioe;
715            }
716        }
717    
718        private Location writeTraceMessage(String message, boolean sync) throws IOException {
719            JournalTrace trace = new JournalTrace();
720            trace.setMessage(message);
721            return writeCommand(trace, sync);
722        }
723    
724        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
725            newPercentUsage = (newPercentUsage / 10) * 10;
726            oldPercentUsage = (oldPercentUsage / 10) * 10;
727            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
728                checkpoint(false);
729            }
730        }
731    
732        public AMQTransactionStore getTransactionStore() {
733            return transactionStore;
734        }
735    
736        public synchronized void deleteAllMessages() throws IOException {
737            deleteAllMessages = true;
738        }
739    
740        @Override
741        public String toString() {
742            return "AMQPersistenceAdapter(" + directory + ")";
743        }
744    
745        // /////////////////////////////////////////////////////////////////
746        // Subclass overridables
747        // /////////////////////////////////////////////////////////////////
748        protected AsyncDataManager createAsyncDataManager() {
749            AsyncDataManager manager = new AsyncDataManager(storeSize);
750            manager.setDirectory(new File(directory, "journal"));
751            manager.setDirectoryArchive(getDirectoryArchive());
752            manager.setArchiveDataLogs(isArchiveDataLogs());
753            manager.setMaxFileLength(maxFileLength);
754            manager.setUseNio(useNio);    
755            return manager;
756        }
757    
758        protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
759            KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
760            adaptor.setPersistentIndex(isPersistentIndex());
761            adaptor.setIndexBinSize(getIndexBinSize());
762            adaptor.setIndexKeySize(getIndexKeySize());
763            adaptor.setIndexPageSize(getIndexPageSize());
764            adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
765            adaptor.setIndexLoadFactor(getIndexLoadFactor());
766            return adaptor;
767        }
768    
769        // /////////////////////////////////////////////////////////////////
770        // Property Accessors
771        // /////////////////////////////////////////////////////////////////
772        public AsyncDataManager getAsyncDataManager() {
773            return asyncDataManager;
774        }
775    
776        public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
777            this.asyncDataManager = asyncDataManager;
778        }
779    
780        public ReferenceStoreAdapter getReferenceStoreAdapter() {
781            return referenceStoreAdapter;
782        }
783    
784        public TaskRunnerFactory getTaskRunnerFactory() {
785            return taskRunnerFactory;
786        }
787    
788        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
789            this.taskRunnerFactory = taskRunnerFactory;
790        }
791    
792        /**
793         * @return Returns the wireFormat.
794         */
795        public WireFormat getWireFormat() {
796            return wireFormat;
797        }
798    
799        public void setWireFormat(WireFormat wireFormat) {
800            this.wireFormat = wireFormat;
801        }
802    
803        public SystemUsage getUsageManager() {
804            return usageManager;
805        }
806    
807        public void setUsageManager(SystemUsage usageManager) {
808            this.usageManager = usageManager;
809        }
810    
811        public int getMaxCheckpointMessageAddSize() {
812            return maxCheckpointMessageAddSize;
813        }
814    
815        /** 
816         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
817         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
818         */
819        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
820            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
821        }
822    
823       
824        public synchronized File getDirectory() {
825            return directory;
826        }
827    
828        public synchronized void setDirectory(File directory) {
829            this.directory = directory;
830        }
831    
832        public boolean isSyncOnWrite() {
833            return this.syncOnWrite;
834        }
835    
836        public void setSyncOnWrite(boolean syncOnWrite) {
837            this.syncOnWrite = syncOnWrite;
838        }
839        
840        public boolean isSyncOnTransaction() {
841            return syncOnTransaction;
842        }
843    
844        public void setSyncOnTransaction(boolean syncOnTransaction) {
845            this.syncOnTransaction = syncOnTransaction;
846        }
847    
848        /**
849         * @param referenceStoreAdapter the referenceStoreAdapter to set
850         */
851        public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
852            this.referenceStoreAdapter = referenceStoreAdapter;
853        }
854        
855        public long size(){
856            return storeSize.get();
857        }
858    
859            public boolean isUseNio() {
860                    return useNio;
861            }
862    
863            public void setUseNio(boolean useNio) {
864                    this.useNio = useNio;
865            }
866    
867            public int getMaxFileLength() {
868                    return maxFileLength;
869            }
870    
871             /**
872          * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
873          * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
874          */
875            public void setMaxFileLength(int maxFileLength) {
876                    this.maxFileLength = maxFileLength;
877            }
878            
879            public long getCleanupInterval() {
880            return cleanupInterval;
881        }
882    
883        public void setCleanupInterval(long cleanupInterval) {
884            this.cleanupInterval = cleanupInterval;
885        }
886    
887        public long getCheckpointInterval() {
888            return checkpointInterval;
889        }
890    
891        public void setCheckpointInterval(long checkpointInterval) {
892            this.checkpointInterval = checkpointInterval;
893        }
894        
895        public int getIndexBinSize() {
896            return indexBinSize;
897        }
898    
899        public void setIndexBinSize(int indexBinSize) {
900            this.indexBinSize = indexBinSize;
901        }
902    
903        public int getIndexKeySize() {
904            return indexKeySize;
905        }
906    
907        public void setIndexKeySize(int indexKeySize) {
908            this.indexKeySize = indexKeySize;
909        }
910    
911        public int getIndexPageSize() {
912            return indexPageSize;
913        }
914        
915        public int getIndexMaxBinSize() {
916            return indexMaxBinSize;
917        }
918    
919        public void setIndexMaxBinSize(int maxBinSize) {
920            this.indexMaxBinSize = maxBinSize;
921        }
922    
923        /**
924         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
925         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
926         */
927        public void setIndexPageSize(int indexPageSize) {
928            this.indexPageSize = indexPageSize;
929        }
930        
931        public void setIndexLoadFactor(int factor){
932            this.indexLoadFactor=factor;    
933        }
934        
935        public int getIndexLoadFactor(){
936            return this.indexLoadFactor;
937        }
938        
939        public int getMaxReferenceFileLength() {
940            return maxReferenceFileLength;
941        }
942    
943        /**
944         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
945         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
946         */
947        public void setMaxReferenceFileLength(int maxReferenceFileLength) {
948            this.maxReferenceFileLength = maxReferenceFileLength;
949        }
950        
951        public File getDirectoryArchive() {
952            return directoryArchive;
953        }
954    
955        public void setDirectoryArchive(File directoryArchive) {
956            this.directoryArchive = directoryArchive;
957        }
958    
959        public boolean isArchiveDataLogs() {
960            return archiveDataLogs;
961        }
962    
963        public void setArchiveDataLogs(boolean archiveDataLogs) {
964            this.archiveDataLogs = archiveDataLogs;
965        }  
966        
967        public boolean isDisableLocking() {
968            return disableLocking;
969        }
970    
971        public void setDisableLocking(boolean disableLocking) {
972            this.disableLocking = disableLocking;
973        }
974        
975        /**
976         * @return the recoverReferenceStore
977         */
978        public boolean isRecoverReferenceStore() {
979            return recoverReferenceStore;
980        }
981    
982        /**
983         * @param recoverReferenceStore the recoverReferenceStore to set
984         */
985        public void setRecoverReferenceStore(boolean recoverReferenceStore) {
986            this.recoverReferenceStore = recoverReferenceStore;
987        }
988    
989        /**
990         * @return the forceRecoverReferenceStore
991         */
992        public boolean isForceRecoverReferenceStore() {
993            return forceRecoverReferenceStore;
994        }
995    
996        /**
997         * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
998         */
999        public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
1000            this.forceRecoverReferenceStore = forceRecoverReferenceStore;
1001        }
1002        
1003        public boolean isUseDedicatedTaskRunner() {
1004            return useDedicatedTaskRunner;
1005        }
1006        
1007        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1008            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1009        }
1010        
1011        /**
1012         * @return the journalThreadPriority
1013         */
1014        public int getJournalThreadPriority() {
1015            return this.journalThreadPriority;
1016        }
1017    
1018        /**
1019         * @param journalThreadPriority the journalThreadPriority to set
1020         */
1021        public void setJournalThreadPriority(int journalThreadPriority) {
1022            this.journalThreadPriority = journalThreadPriority;
1023        }
1024    
1025            
1026            protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
1027                Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1028                if (map == null) {
1029                    map = new ConcurrentHashMap<Integer, AtomicInteger>();
1030                    dataFilesInProgress.put(store, map);
1031                }
1032                AtomicInteger count = map.get(dataFileId);
1033                if (count == null) {
1034                    count = new AtomicInteger(0);
1035                    map.put(dataFileId, count);
1036                }
1037                count.incrementAndGet();
1038            }
1039            
1040            protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1041            Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1042            if (map != null) {
1043                AtomicInteger count = map.get(dataFileId);
1044                if (count != null) {
1045                    int newCount = count.decrementAndGet(); 
1046                    if (newCount <=0) {
1047                        map.remove(dataFileId);
1048                    }
1049                }
1050                if (map.isEmpty()) {
1051                    dataFilesInProgress.remove(store);
1052                }
1053            }
1054        }
1055            
1056            
1057            protected void lock() throws Exception {
1058            lockLogged = false;
1059            lockAquired = false;
1060            do {
1061                if (doLock()) {
1062                    lockAquired = true;
1063                } else {
1064                    if (!lockLogged) {
1065                        LOG.warn("Waiting to Lock the Store " + getDirectory());
1066                        lockLogged = true;
1067                    }
1068                    Thread.sleep(1000);
1069                }
1070    
1071            } while (!lockAquired && !disableLocking);
1072        }
1073            
1074            private synchronized void unlock() throws IOException {
1075            if (!disableLocking && (null != lock)) {
1076                //clear property doesn't work on some platforms
1077                System.getProperties().remove(getPropertyKey());
1078                System.clearProperty(getPropertyKey());
1079                assert(System.getProperty(getPropertyKey())==null);
1080                if (lock.isValid()) {
1081                    lock.release();
1082                    lock.channel().close();
1083                    
1084                }
1085                lock = null;
1086            }
1087        }
1088    
1089            
1090            protected boolean doLock() throws IOException {
1091                boolean result = true;
1092                if (!disableLocking && directory != null && lock == null) {
1093                String key = getPropertyKey();
1094                String property = System.getProperty(key);
1095                if (null == property) {
1096                    if (!BROKEN_FILE_LOCK) {
1097                        lock = lockFile.getChannel().tryLock(0, lockFile.getChannel().size(), false);
1098                        if (lock == null) {
1099                            result = false;
1100                        } else {
1101                            System.setProperty(key, new Date().toString());
1102                        }
1103                    }
1104                } else { // already locked
1105                    result = false;
1106                }
1107            }
1108                return result;
1109            }
1110            
1111            private String getPropertyKey() throws IOException {
1112            return getClass().getName() + ".lock." + directory.getCanonicalPath();
1113        }
1114            
1115            static {
1116                BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1117                        + ".FileLockBroken",
1118                        "false"));
1119                DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1120                       + ".DisableLocking",
1121                       "false"));
1122            }
1123    
1124            
1125        public long getLastProducerSequenceId(ProducerId id) {
1126            // reference store send has adequate duplicate suppression
1127            return -1;
1128        }
1129    }