001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.io.InterruptedIOException;
022    import java.util.ArrayList;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.LinkedList;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.Map.Entry;
031    import java.util.concurrent.ExecutorService;
032    import java.util.concurrent.Future;
033    import java.util.concurrent.FutureTask;
034    import java.util.concurrent.LinkedBlockingQueue;
035    import java.util.concurrent.Semaphore;
036    import java.util.concurrent.ThreadFactory;
037    import java.util.concurrent.ThreadPoolExecutor;
038    import java.util.concurrent.TimeUnit;
039    import java.util.concurrent.atomic.AtomicBoolean;
040    import java.util.concurrent.atomic.AtomicInteger;
041    import org.apache.activemq.broker.ConnectionContext;
042    import org.apache.activemq.command.ActiveMQDestination;
043    import org.apache.activemq.command.ActiveMQQueue;
044    import org.apache.activemq.command.ActiveMQTempQueue;
045    import org.apache.activemq.command.ActiveMQTempTopic;
046    import org.apache.activemq.command.ActiveMQTopic;
047    import org.apache.activemq.command.LocalTransactionId;
048    import org.apache.activemq.command.Message;
049    import org.apache.activemq.command.MessageAck;
050    import org.apache.activemq.command.MessageId;
051    import org.apache.activemq.command.ProducerId;
052    import org.apache.activemq.command.SubscriptionInfo;
053    import org.apache.activemq.command.TransactionId;
054    import org.apache.activemq.command.XATransactionId;
055    import org.apache.activemq.filter.BooleanExpression;
056    import org.apache.activemq.filter.MessageEvaluationContext;
057    import org.apache.activemq.openwire.OpenWireFormat;
058    import org.apache.activemq.protobuf.Buffer;
059    import org.apache.activemq.selector.SelectorParser;
060    import org.apache.activemq.store.AbstractMessageStore;
061    import org.apache.activemq.store.MessageRecoveryListener;
062    import org.apache.activemq.store.MessageStore;
063    import org.apache.activemq.store.PersistenceAdapter;
064    import org.apache.activemq.store.TopicMessageStore;
065    import org.apache.activemq.store.TransactionStore;
066    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
067    import org.apache.activemq.store.kahadb.data.KahaDestination;
068    import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
069    import org.apache.activemq.store.kahadb.data.KahaLocation;
070    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
071    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
072    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
073    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
074    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
075    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
076    import org.apache.activemq.usage.MemoryUsage;
077    import org.apache.activemq.usage.SystemUsage;
078    import org.apache.activemq.util.IOExceptionSupport;
079    import org.apache.activemq.util.ServiceStopper;
080    import org.apache.activemq.wireformat.WireFormat;
081    import org.slf4j.Logger;
082    import org.slf4j.LoggerFactory;
083    import org.apache.kahadb.journal.Location;
084    import org.apache.kahadb.page.Transaction;
085    
086    public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
087        static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
088        private static final int MAX_ASYNC_JOBS = 10000;
089    
090        public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
091        public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
092                PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
093        public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
094        private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
095                PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
096    
097        protected ExecutorService queueExecutor;
098        protected ExecutorService topicExecutor;
099        protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
100        protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
101        final WireFormat wireFormat = new OpenWireFormat();
102        private SystemUsage usageManager;
103        private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
104        private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
105        Semaphore globalQueueSemaphore;
106        Semaphore globalTopicSemaphore;
107        private boolean concurrentStoreAndDispatchQueues = true;
108        // when true, message order may be compromised when cache is exhausted if store is out
109        // or order w.r.t cache
110        private boolean concurrentStoreAndDispatchTopics = false;
111        private boolean concurrentStoreAndDispatchTransactions = false;
112        private int maxAsyncJobs = MAX_ASYNC_JOBS;
113        private final KahaDBTransactionStore transactionStore;
114    
115        public KahaDBStore() {
116            this.transactionStore = new KahaDBTransactionStore(this);
117        }
118    
119        public void setBrokerName(String brokerName) {
120        }
121    
122        public void setUsageManager(SystemUsage usageManager) {
123            this.usageManager = usageManager;
124        }
125    
126        public SystemUsage getUsageManager() {
127            return this.usageManager;
128        }
129    
130        /**
131         * @return the concurrentStoreAndDispatch
132         */
133        public boolean isConcurrentStoreAndDispatchQueues() {
134            return this.concurrentStoreAndDispatchQueues;
135        }
136    
137        /**
138         * @param concurrentStoreAndDispatch
139         *            the concurrentStoreAndDispatch to set
140         */
141        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
142            this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch;
143        }
144    
145        /**
146         * @return the concurrentStoreAndDispatch
147         */
148        public boolean isConcurrentStoreAndDispatchTopics() {
149            return this.concurrentStoreAndDispatchTopics;
150        }
151    
152        /**
153         * @param concurrentStoreAndDispatch
154         *            the concurrentStoreAndDispatch to set
155         */
156        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
157            this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
158        }
159    
160        public boolean isConcurrentStoreAndDispatchTransactions() {
161            return this.concurrentStoreAndDispatchTransactions;
162        }
163        
164        /**
165         * @return the maxAsyncJobs
166         */
167        public int getMaxAsyncJobs() {
168            return this.maxAsyncJobs;
169        }
170        /**
171         * @param maxAsyncJobs
172         *            the maxAsyncJobs to set
173         */
174        public void setMaxAsyncJobs(int maxAsyncJobs) {
175            this.maxAsyncJobs = maxAsyncJobs;
176        }
177    
178        @Override
179        public void doStart() throws Exception {
180            super.doStart();
181            this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
182            this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
183            this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
184            this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
185            this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
186                    asyncQueueJobQueue, new ThreadFactory() {
187                        public Thread newThread(Runnable runnable) {
188                            Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
189                            thread.setDaemon(true);
190                            return thread;
191                        }
192                    });
193            this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
194                    asyncTopicJobQueue, new ThreadFactory() {
195                        public Thread newThread(Runnable runnable) {
196                            Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
197                            thread.setDaemon(true);
198                            return thread;
199                        }
200                    });
201        }
202    
203        @Override
204        public void doStop(ServiceStopper stopper) throws Exception {
205            // drain down async jobs
206            LOG.info("Stopping async queue tasks");
207            if (this.globalQueueSemaphore != null) {
208                this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
209            }
210            synchronized (this.asyncQueueMaps) {
211                for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
212                    synchronized (m) {
213                        for (StoreTask task : m.values()) {
214                            task.cancel();
215                        }
216                    }
217                }
218                this.asyncQueueMaps.clear();
219            }
220            LOG.info("Stopping async topic tasks");
221            if (this.globalTopicSemaphore != null) {
222                this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
223            }
224            synchronized (this.asyncTopicMaps) {
225                for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
226                    synchronized (m) {
227                        for (StoreTask task : m.values()) {
228                            task.cancel();
229                        }
230                    }
231                }
232                this.asyncTopicMaps.clear();
233            }
234            if (this.globalQueueSemaphore != null) {
235                this.globalQueueSemaphore.drainPermits();
236            }
237            if (this.globalTopicSemaphore != null) {
238                this.globalTopicSemaphore.drainPermits();
239            }
240            if (this.queueExecutor != null) {
241                this.queueExecutor.shutdownNow();
242            }
243            if (this.topicExecutor != null) {
244                this.topicExecutor.shutdownNow();
245            }
246            LOG.info("Stopped KahaDB");
247            super.doStop(stopper);
248        }
249    
250        protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
251            StoreQueueTask task = null;
252            synchronized (store.asyncTaskMap) {
253                task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
254            }
255            return task;
256        }
257    
258        protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
259            synchronized (store.asyncTaskMap) {
260                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
261            }
262            this.queueExecutor.execute(task);
263        }
264    
265        protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
266            StoreTopicTask task = null;
267            synchronized (store.asyncTaskMap) {
268                task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
269            }
270            return task;
271        }
272    
273        protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
274            synchronized (store.asyncTaskMap) {
275                store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
276            }
277            this.topicExecutor.execute(task);
278        }
279    
280        public TransactionStore createTransactionStore() throws IOException {
281            return this.transactionStore;
282        }
283    
284        public boolean getForceRecoverIndex() {
285            return this.forceRecoverIndex;
286        }
287    
288        public void setForceRecoverIndex(boolean forceRecoverIndex) {
289            this.forceRecoverIndex = forceRecoverIndex;
290        }
291    
292        public class KahaDBMessageStore extends AbstractMessageStore {
293            protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
294            protected KahaDestination dest;
295            private final int maxAsyncJobs;
296            private final Semaphore localDestinationSemaphore;
297    
298            double doneTasks, canceledTasks = 0;
299    
300            public KahaDBMessageStore(ActiveMQDestination destination) {
301                super(destination);
302                this.dest = convert(destination);
303                this.maxAsyncJobs = getMaxAsyncJobs();
304                this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
305            }
306    
307            @Override
308            public ActiveMQDestination getDestination() {
309                return destination;
310            }
311    
312            @Override
313            public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
314                    throws IOException {
315                if (isConcurrentStoreAndDispatchQueues()) {
316                    StoreQueueTask result = new StoreQueueTask(this, context, message);
317                    result.aquireLocks();
318                    addQueueTask(this, result);
319                    return result.getFuture();
320                } else {
321                    return super.asyncAddQueueMessage(context, message);
322                }
323            }
324    
325            @Override
326            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
327                if (isConcurrentStoreAndDispatchQueues()) {
328                    AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
329                    StoreQueueTask task = null;
330                    synchronized (asyncTaskMap) {
331                        task = (StoreQueueTask) asyncTaskMap.get(key);
332                    }
333                    if (task != null) {
334                        if (!task.cancel()) {
335                            try {
336    
337                                task.future.get();
338                            } catch (InterruptedException e) {
339                                throw new InterruptedIOException(e.toString());
340                            } catch (Exception ignored) {
341                                LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
342                            }
343                            removeMessage(context, ack);
344                        } else {
345                            synchronized (asyncTaskMap) {
346                                asyncTaskMap.remove(key);
347                            }
348                        }
349                    } else {
350                        removeMessage(context, ack);
351                    }
352                } else {
353                    removeMessage(context, ack);
354                }
355            }
356    
357            public void addMessage(ConnectionContext context, Message message) throws IOException {
358                KahaAddMessageCommand command = new KahaAddMessageCommand();
359                command.setDestination(dest);
360                command.setMessageId(message.getMessageId().toString());
361                command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
362                command.setPriority(message.getPriority());
363                command.setPrioritySupported(isPrioritizedMessages());
364                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
365                command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
366                store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
367                
368            }
369    
370            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
371                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
372                command.setDestination(dest);
373                command.setMessageId(ack.getLastMessageId().toString());
374                command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
375    
376                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
377                command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
378                store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
379            }
380    
381            public void removeAllMessages(ConnectionContext context) throws IOException {
382                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
383                command.setDestination(dest);
384                store(command, true, null, null);
385            }
386    
387            public Message getMessage(MessageId identity) throws IOException {
388                final String key = identity.toString();
389    
390                // Hopefully one day the page file supports concurrent read
391                // operations... but for now we must
392                // externally synchronize...
393                Location location;
394                indexLock.readLock().lock();
395                try {
396                    location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
397                        public Location execute(Transaction tx) throws IOException {
398                            StoredDestination sd = getStoredDestination(dest, tx);
399                            Long sequence = sd.messageIdIndex.get(tx, key);
400                            if (sequence == null) {
401                                return null;
402                            }
403                            return sd.orderIndex.get(tx, sequence).location;
404                        }
405                    });
406                }finally {
407                    indexLock.readLock().unlock();
408                }
409                if (location == null) {
410                    return null;
411                }
412    
413                return loadMessage(location);
414            }
415    
416            public int getMessageCount() throws IOException {
417                try {
418                    lockAsyncJobQueue();
419                    indexLock.readLock().lock();
420                    try {
421                        return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
422                            public Integer execute(Transaction tx) throws IOException {
423                                // Iterate through all index entries to get a count
424                                // of
425                                // messages in the destination.
426                                StoredDestination sd = getStoredDestination(dest, tx);
427                                int rc = 0;
428                                for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
429                                        .hasNext();) {
430                                    iterator.next();
431                                    rc++;
432                                }
433                                return rc;
434                            }
435                        });
436                    }finally {
437                        indexLock.readLock().unlock();
438                    }
439                } finally {
440                    unlockAsyncJobQueue();
441                }
442            }
443    
444            @Override
445            public boolean isEmpty() throws IOException {
446                indexLock.readLock().lock();
447                try {
448                    return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
449                        public Boolean execute(Transaction tx) throws IOException {
450                            // Iterate through all index entries to get a count of
451                            // messages in the destination.
452                            StoredDestination sd = getStoredDestination(dest, tx);
453                            return sd.locationIndex.isEmpty(tx);
454                        }
455                    });
456                }finally {
457                    indexLock.readLock().unlock();
458                }
459            }
460    
461            public void recover(final MessageRecoveryListener listener) throws Exception {
462                indexLock.readLock().lock();
463                try {
464                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
465                        public void execute(Transaction tx) throws Exception {
466                            StoredDestination sd = getStoredDestination(dest, tx);
467                            sd.orderIndex.resetCursorPosition();
468                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
469                                    .hasNext();) {
470                                Entry<Long, MessageKeys> entry = iterator.next();
471                                Message msg = loadMessage(entry.getValue().location);
472                                listener.recoverMessage(msg);
473                            }
474                        }
475                    });
476                }finally {
477                    indexLock.readLock().unlock();
478                }
479            }
480    
481            
482            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
483                indexLock.readLock().lock();
484                try {
485                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
486                        public void execute(Transaction tx) throws Exception {
487                            StoredDestination sd = getStoredDestination(dest, tx);
488                            Entry<Long, MessageKeys> entry = null;
489                            int counter = 0;
490                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
491                                 listener.hasSpace() && iterator.hasNext(); ) {
492                                entry = iterator.next();
493                                Message msg = loadMessage(entry.getValue().location);
494                                listener.recoverMessage(msg);
495                                counter++;
496                                if (counter >= maxReturned) {
497                                    break;
498                                }
499                            }
500                            sd.orderIndex.stoppedIterating();
501                        }
502                    });
503                }finally {
504                    indexLock.readLock().unlock();
505                }
506            }
507    
508            public void resetBatching() {
509                try {
510                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
511                        public void execute(Transaction tx) throws Exception {
512                            StoredDestination sd = getExistingStoredDestination(dest, tx);
513                            if (sd != null) {
514                                sd.orderIndex.resetCursorPosition();}
515                            }
516                        });
517                } catch (Exception e) {
518                    LOG.error("Failed to reset batching",e);
519                }
520            }
521    
522            @Override
523            public void setBatch(MessageId identity) throws IOException {
524                try {
525                    final String key = identity.toString();
526                    lockAsyncJobQueue();
527    
528                    // Hopefully one day the page file supports concurrent read
529                    // operations... but for now we must
530                    // externally synchronize...
531                   
532                    indexLock.writeLock().lock();
533                    try {
534                            pageFile.tx().execute(new Transaction.Closure<IOException>() {
535                            public void execute(Transaction tx) throws IOException {
536                                StoredDestination sd = getStoredDestination(dest, tx);
537                                Long location = sd.messageIdIndex.get(tx, key);
538                                if (location != null) {
539                                    sd.orderIndex.setBatch(tx, location);
540                                }
541                            }
542                        });
543                    }finally {
544                        indexLock.writeLock().unlock();
545                    }
546                    
547                } finally {
548                    unlockAsyncJobQueue();
549                }
550    
551            }
552    
553            @Override
554            public void setMemoryUsage(MemoryUsage memoeyUSage) {
555            }
556            @Override
557            public void start() throws Exception {
558                super.start();
559            }
560            @Override
561            public void stop() throws Exception {
562                super.stop();
563            }
564    
565            protected void lockAsyncJobQueue() {
566                try {
567                    this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
568                } catch (Exception e) {
569                    LOG.error("Failed to lock async jobs for " + this.destination, e);
570                }
571            }
572    
573            protected void unlockAsyncJobQueue() {
574                this.localDestinationSemaphore.release(this.maxAsyncJobs);
575            }
576    
577            protected void acquireLocalAsyncLock() {
578                try {
579                    this.localDestinationSemaphore.acquire();
580                } catch (InterruptedException e) {
581                    LOG.error("Failed to aquire async lock for " + this.destination, e);
582                }
583            }
584    
585            protected void releaseLocalAsyncLock() {
586                this.localDestinationSemaphore.release();
587            }
588    
589        }
590    
591        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
592            private final AtomicInteger subscriptionCount = new AtomicInteger();
593            public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
594                super(destination);
595                this.subscriptionCount.set(getAllSubscriptions().length);
596                asyncTopicMaps.add(asyncTaskMap);
597            }
598    
599            @Override
600            public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
601                    throws IOException {
602                if (isConcurrentStoreAndDispatchTopics()) {
603                    StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
604                    result.aquireLocks();
605                    addTopicTask(this, result);
606                    return result.getFuture();
607                } else {
608                    return super.asyncAddTopicMessage(context, message);
609                }
610            }
611    
612            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
613                                    MessageId messageId, MessageAck ack)
614                    throws IOException {
615                String subscriptionKey = subscriptionKey(clientId, subscriptionName);
616                if (isConcurrentStoreAndDispatchTopics()) {
617                    AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
618                    StoreTopicTask task = null;
619                    synchronized (asyncTaskMap) {
620                        task = (StoreTopicTask) asyncTaskMap.get(key);
621                    }
622                    if (task != null) {
623                        if (task.addSubscriptionKey(subscriptionKey)) {
624                            removeTopicTask(this, messageId);
625                            if (task.cancel()) {
626                                synchronized (asyncTaskMap) {
627                                    asyncTaskMap.remove(key);
628                                }
629                            }
630                        }
631                    } else {
632                        doAcknowledge(context, subscriptionKey, messageId, ack);
633                    }
634                } else {
635                    doAcknowledge(context, subscriptionKey, messageId, ack);
636                }
637            }
638    
639            protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack)
640                    throws IOException {
641                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
642                command.setDestination(dest);
643                command.setSubscriptionKey(subscriptionKey);
644                command.setMessageId(messageId.toString());
645                command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()));
646                if (ack != null && ack.isUnmatchedAck()) {
647                    command.setAck(UNMATCHED);
648                }
649                store(command, false, null, null);
650            }
651    
652            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
653                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo
654                        .getSubscriptionName());
655                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
656                command.setDestination(dest);
657                command.setSubscriptionKey(subscriptionKey);
658                command.setRetroactive(retroactive);
659                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
660                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
661                store(command, isEnableJournalDiskSyncs() && true, null, null);
662                this.subscriptionCount.incrementAndGet();
663            }
664    
665            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
666                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
667                command.setDestination(dest);
668                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
669                store(command, isEnableJournalDiskSyncs() && true, null, null);
670                this.subscriptionCount.decrementAndGet();
671            }
672    
673            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
674    
675                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
676                indexLock.readLock().lock();
677                try {
678                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
679                        public void execute(Transaction tx) throws IOException {
680                            StoredDestination sd = getStoredDestination(dest, tx);
681                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator
682                                    .hasNext();) {
683                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
684                                SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry
685                                        .getValue().getSubscriptionInfo().newInput()));
686                                subscriptions.add(info);
687    
688                            }
689                        }
690                    });
691                }finally {
692                    indexLock.readLock().unlock();
693                }
694    
695                SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
696                subscriptions.toArray(rc);
697                return rc;
698            }
699    
700            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
701                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
702                indexLock.readLock().lock();
703                try {
704                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
705                        public SubscriptionInfo execute(Transaction tx) throws IOException {
706                            StoredDestination sd = getStoredDestination(dest, tx);
707                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
708                            if (command == null) {
709                                return null;
710                            }
711                            return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command
712                                    .getSubscriptionInfo().newInput()));
713                        }
714                    });
715                }finally {
716                    indexLock.readLock().unlock();
717                }
718            }
719    
720            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
721                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
722                indexLock.writeLock().lock();
723                try {
724                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
725                        public Integer execute(Transaction tx) throws IOException {
726                            StoredDestination sd = getStoredDestination(dest, tx);
727                            LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
728                            if (cursorPos == null) {
729                                // The subscription might not exist.
730                                return 0;
731                            }
732    
733                            int counter = 0;
734                            for (Iterator<Entry<Long, HashSet<String>>> iterator =
735                                    sd.ackPositions.iterator(tx, cursorPos.lastAckedSequence); iterator.hasNext();) {
736                                Entry<Long, HashSet<String>> entry = iterator.next();
737                                if (entry.getValue().contains(subscriptionKey)) {
738                                    counter++;
739                                }
740                            }
741                            return counter;
742                        }
743                    });
744                }finally {
745                    indexLock.writeLock().unlock();
746                }
747            }
748    
749            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
750                    throws Exception {
751                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
752                final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
753                indexLock.writeLock().lock();
754                try {
755                    pageFile.tx().execute(new Transaction.Closure<Exception>() {
756                        public void execute(Transaction tx) throws Exception {
757                            StoredDestination sd = getStoredDestination(dest, tx);
758                            LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
759                            sd.orderIndex.setBatch(tx, cursorPos);
760                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
761                                    .hasNext();) {
762                                Entry<Long, MessageKeys> entry = iterator.next();
763                                listener.recoverMessage(loadMessage(entry.getValue().location));
764                            }
765                            sd.orderIndex.resetCursorPosition();
766                        }
767                    });
768                }finally {
769                    indexLock.writeLock().unlock();
770                }
771            }
772    
        /**
         * Recovers the next batch of messages for a subscription and remembers
         * where the batch stopped.
         * <p>
         * Runs under the index write lock. The starting point is the cursor
         * cached in {@code sd.subscriptionCursors} for the subscription key; when
         * none is cached, the order index is positioned from the subscription's
         * last-ack record instead. After iterating, a copy of the order index
         * cursor is cached again if at least one entry was visited.
         *
         * @param clientId         client id of the durable subscriber
         * @param subscriptionName name of the durable subscription
         * @param maxReturned      iteration stops once this many messages have
         *                         been accepted by the listener
         * @param listener         receives each loaded message; iteration also
         *                         stops when {@code listener.hasSpace()} is false
         * @throws Exception if the index or journal cannot be read, or the
         *                   listener fails
         */
        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
                final MessageRecoveryListener listener) throws Exception {
            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
            // NOTE(review): 'info' is never read in this method; only the lookup call itself has an effect.
            final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
            indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<Exception>() {
                    public void execute(Transaction tx) throws Exception {
                        StoredDestination sd = getStoredDestination(dest, tx);
                        sd.orderIndex.resetCursorPosition();
                        // Prefer the cursor cached by a previous batch for this subscription.
                        MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                        if (moc == null) {
                            // No cached cursor: start from the subscription's last-ack record.
                            LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey);
                            if (pos == null) {
                                // sub deleted
                                return;
                            }
                            sd.orderIndex.setBatch(tx, pos);
                            moc = sd.orderIndex.cursor;
                        } else {
                            // Bring the shared order index cursor in line with the cached one.
                            sd.orderIndex.cursor.sync(moc);
                        }

                        Entry<Long, MessageKeys> entry = null;
                        // Counts only the messages for which the listener returned true.
                        int counter = 0;
                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
                                .hasNext();) {
                            entry = iterator.next();
                            if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                counter++;
                            }
                            if (counter >= maxReturned || listener.hasSpace() == false) {
                                break;
                            }
                        }
                        sd.orderIndex.stoppedIterating();
                        if (entry != null) {
                            // At least one entry was visited: cache a copy of the cursor for the next batch.
                            MessageOrderCursor copy = sd.orderIndex.cursor.copy();
                            sd.subscriptionCursors.put(subscriptionKey, copy);
                        }
                    }
                });
            }finally {
                indexLock.writeLock().unlock();
            }
        }
819    
820            public void resetBatching(String clientId, String subscriptionName) {
821                try {
822                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
823                    indexLock.writeLock().lock();
824                    try {
825                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
826                            public void execute(Transaction tx) throws IOException {
827                                StoredDestination sd = getStoredDestination(dest, tx);
828                                sd.subscriptionCursors.remove(subscriptionKey);
829                            }
830                        });
831                    }finally {
832                        indexLock.writeLock().unlock();
833                    }
834                } catch (IOException e) {
835                    throw new RuntimeException(e);
836                }
837            }
838        }
839    
840        String subscriptionKey(String clientId, String subscriptionName) {
841            return clientId + ":" + subscriptionName;
842        }
843    
844        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
845            return this.transactionStore.proxy(new KahaDBMessageStore(destination));
846        }
847    
848        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
849            return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
850        }
851    
    /**
     * Cleanup hook for removing any state associated with the given queue.
     * This method does not stop the message store (it might not be cached).
     * <p>
     * The body is empty in this implementation, so calling it has no effect.
     *
     * @param destination
     *            Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
    }
861    
    /**
     * Cleanup hook for removing any state associated with the given topic.
     * This method does not stop the message store (it might not be cached).
     * <p>
     * The body is empty in this implementation, so calling it has no effect.
     *
     * @param destination
     *            Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
    }
871    
872        public void deleteAllMessages() throws IOException {
873            deleteAllMessages = true;
874        }
875    
876        public Set<ActiveMQDestination> getDestinations() {
877            try {
878                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
879                indexLock.readLock().lock();
880                try {
881                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
882                        public void execute(Transaction tx) throws IOException {
883                            for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
884                                    .hasNext();) {
885                                Entry<String, StoredDestination> entry = iterator.next();
886                                if (!isEmptyTopic(entry, tx)) {
887                                    rc.add(convert(entry.getKey()));
888                                }
889                            }
890                        }
891    
892                        private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx)
893                                throws IOException {
894                            boolean isEmptyTopic = false;
895                            ActiveMQDestination dest = convert(entry.getKey());
896                            if (dest.isTopic()) {
897                                StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
898                                if (loadedStore.subscriptionAcks.isEmpty(tx)) {
899                                    isEmptyTopic = true;
900                                }
901                            }
902                            return isEmptyTopic;
903                        }
904                    });
905                }finally {
906                    indexLock.readLock().unlock();
907                }
908                return rc;
909            } catch (IOException e) {
910                throw new RuntimeException(e);
911            }
912        }
913    
914        public long getLastMessageBrokerSequenceId() throws IOException {
915            return 0;
916        }
917        
918        public long getLastProducerSequenceId(ProducerId id) {
919            indexLock.readLock().lock();
920            try {
921                return metadata.producerSequenceIdTracker.getLastSeqId(id);
922            } finally {
923                indexLock.readLock().unlock();
924            }
925        }
926    
927        public long size() {
928            return storeSize.get();
929        }
930    
    /**
     * Not supported by this store.
     *
     * @param context connection context (ignored)
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void beginTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store.
     *
     * @param context connection context (ignored)
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void commitTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
    /**
     * Not supported by this store.
     *
     * @param context connection context (ignored)
     * @throws IOException always, with the message "Not yet implemented."
     */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        throw new IOException("Not yet implemented.");
    }
940    
    /**
     * Triggers a checkpoint by delegating to the superclass
     * {@code checkpointCleanup} with a fixed {@code false} argument.
     *
     * @param sync not consulted by this method
     *             (NOTE(review): confirm whether ignoring it is intentional)
     * @throws IOException if the superclass checkpoint fails
     */
    public void checkpoint(boolean sync) throws IOException {
        super.checkpointCleanup(false);
    }
944    
945        // /////////////////////////////////////////////////////////////////
946        // Internal helper methods.
947        // /////////////////////////////////////////////////////////////////
948    
949        /**
950         * @param location
951         * @return
952         * @throws IOException
953         */
954        Message loadMessage(Location location) throws IOException {
955            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
956            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
957            return msg;
958        }
959    
960        // /////////////////////////////////////////////////////////////////
961        // Internal conversion methods.
962        // /////////////////////////////////////////////////////////////////
963    
964        KahaLocation convert(Location location) {
965            KahaLocation rc = new KahaLocation();
966            rc.setLogId(location.getDataFileId());
967            rc.setOffset(location.getOffset());
968            return rc;
969        }
970    
971        KahaDestination convert(ActiveMQDestination dest) {
972            KahaDestination rc = new KahaDestination();
973            rc.setName(dest.getPhysicalName());
974            switch (dest.getDestinationType()) {
975            case ActiveMQDestination.QUEUE_TYPE:
976                rc.setType(DestinationType.QUEUE);
977                return rc;
978            case ActiveMQDestination.TOPIC_TYPE:
979                rc.setType(DestinationType.TOPIC);
980                return rc;
981            case ActiveMQDestination.TEMP_QUEUE_TYPE:
982                rc.setType(DestinationType.TEMP_QUEUE);
983                return rc;
984            case ActiveMQDestination.TEMP_TOPIC_TYPE:
985                rc.setType(DestinationType.TEMP_TOPIC);
986                return rc;
987            default:
988                return null;
989            }
990        }
991    
992        ActiveMQDestination convert(String dest) {
993            int p = dest.indexOf(":");
994            if (p < 0) {
995                throw new IllegalArgumentException("Not in the valid destination format");
996            }
997            int type = Integer.parseInt(dest.substring(0, p));
998            String name = dest.substring(p + 1);
999    
1000            switch (KahaDestination.DestinationType.valueOf(type)) {
1001            case QUEUE:
1002                return new ActiveMQQueue(name);
1003            case TOPIC:
1004                return new ActiveMQTopic(name);
1005            case TEMP_QUEUE:
1006                return new ActiveMQTempQueue(name);
1007            case TEMP_TOPIC:
1008                return new ActiveMQTempTopic(name);
1009            default:
1010                throw new IllegalArgumentException("Not in the valid destination format");
1011            }
1012        }
1013    
1014        static class AsyncJobKey {
1015            MessageId id;
1016            ActiveMQDestination destination;
1017    
1018            AsyncJobKey(MessageId id, ActiveMQDestination destination) {
1019                this.id = id;
1020                this.destination = destination;
1021            }
1022    
1023            @Override
1024            public boolean equals(Object obj) {
1025                if (obj == this) {
1026                    return true;
1027                }
1028                return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
1029                        && destination.equals(((AsyncJobKey) obj).destination);
1030            }
1031    
1032            @Override
1033            public int hashCode() {
1034                return id.hashCode() + destination.hashCode();
1035            }
1036    
1037            @Override
1038            public String toString() {
1039                return destination.getPhysicalName() + "-" + id;
1040            }
1041        }
1042    
1043        interface StoreTask {
1044            public boolean cancel();
1045        }
1046    
1047        class StoreQueueTask implements Runnable, StoreTask {
1048            protected final Message message;
1049            protected final ConnectionContext context;
1050            protected final KahaDBMessageStore store;
1051            protected final InnerFutureTask future;
1052            protected final AtomicBoolean done = new AtomicBoolean();
1053            protected final AtomicBoolean locked = new AtomicBoolean();
1054    
1055            public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
1056                this.store = store;
1057                this.context = context;
1058                this.message = message;
1059                this.future = new InnerFutureTask(this);
1060            }
1061    
1062            public Future<Object> getFuture() {
1063                return this.future;
1064            }
1065    
1066            public boolean cancel() {
1067                releaseLocks();
1068                if (this.done.compareAndSet(false, true)) {
1069                    return this.future.cancel(false);
1070                }
1071                return false;
1072            }
1073    
1074            void aquireLocks() {
1075                if (this.locked.compareAndSet(false, true)) {
1076                    try {
1077                        globalQueueSemaphore.acquire();
1078                        store.acquireLocalAsyncLock();
1079                        message.incrementReferenceCount();
1080                    } catch (InterruptedException e) {
1081                        LOG.warn("Failed to aquire lock", e);
1082                    }
1083                }
1084    
1085            }
1086    
1087            void releaseLocks() {
1088                if (this.locked.compareAndSet(true, false)) {
1089                    store.releaseLocalAsyncLock();
1090                    globalQueueSemaphore.release();
1091                    message.decrementReferenceCount();
1092                }
1093            }
1094    
1095            public void run() {
1096                this.store.doneTasks++;
1097                try {
1098                    if (this.done.compareAndSet(false, true)) {
1099                        this.store.addMessage(context, message);
1100                        removeQueueTask(this.store, this.message.getMessageId());
1101                        this.future.complete();
1102                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1103                        System.err.println(this.store.dest.getName() + " cancelled: "
1104                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1105                        this.store.canceledTasks = this.store.doneTasks = 0;
1106                    }
1107                } catch (Exception e) {
1108                    this.future.setException(e);
1109                } finally {
1110                    releaseLocks();
1111                }
1112            }
1113    
1114            protected Message getMessage() {
1115                return this.message;
1116            }
1117    
1118            private class InnerFutureTask extends FutureTask<Object> {
1119    
1120                public InnerFutureTask(Runnable runnable) {
1121                    super(runnable, null);
1122    
1123                }
1124    
1125                public void setException(final Exception e) {
1126                    super.setException(e);
1127                }
1128    
1129                public void complete() {
1130                    super.set(null);
1131                }
1132            }
1133        }
1134    
1135        class StoreTopicTask extends StoreQueueTask {
1136            private final int subscriptionCount;
1137            private final List<String> subscriptionKeys = new ArrayList<String>(1);
1138            private final KahaDBTopicMessageStore topicStore;
1139            public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message,
1140                    int subscriptionCount) {
1141                super(store, context, message);
1142                this.topicStore = store;
1143                this.subscriptionCount = subscriptionCount;
1144    
1145            }
1146    
1147            @Override
1148            void aquireLocks() {
1149                if (this.locked.compareAndSet(false, true)) {
1150                    try {
1151                        globalTopicSemaphore.acquire();
1152                        store.acquireLocalAsyncLock();
1153                        message.incrementReferenceCount();
1154                    } catch (InterruptedException e) {
1155                        LOG.warn("Failed to aquire lock", e);
1156                    }
1157                }
1158    
1159            }
1160    
1161            @Override
1162            void releaseLocks() {
1163                if (this.locked.compareAndSet(true, false)) {
1164                    message.decrementReferenceCount();
1165                    store.releaseLocalAsyncLock();
1166                    globalTopicSemaphore.release();
1167                }
1168            }
1169    
1170            /**
1171             * add a key
1172             * 
1173             * @param key
1174             * @return true if all acknowledgements received
1175             */
1176            public boolean addSubscriptionKey(String key) {
1177                synchronized (this.subscriptionKeys) {
1178                    this.subscriptionKeys.add(key);
1179                }
1180                return this.subscriptionKeys.size() >= this.subscriptionCount;
1181            }
1182    
1183            @Override
1184            public void run() {
1185                this.store.doneTasks++;
1186                try {
1187                    if (this.done.compareAndSet(false, true)) {
1188                        this.topicStore.addMessage(context, message);
1189                        // apply any acks we have
1190                        synchronized (this.subscriptionKeys) {
1191                            for (String key : this.subscriptionKeys) {
1192                                this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null);
1193    
1194                            }
1195                        }
1196                        removeTopicTask(this.topicStore, this.message.getMessageId());
1197                        this.future.complete();
1198                    } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
1199                        System.err.println(this.store.dest.getName() + " cancelled: "
1200                                + (this.store.canceledTasks / this.store.doneTasks) * 100);
1201                        this.store.canceledTasks = this.store.doneTasks = 0;
1202                    }
1203                } catch (Exception e) {
1204                    this.future.setException(e);
1205                } finally {
1206                    releaseLocks();
1207                }
1208            }
1209        }
1210    }