001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.CancellationException;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.ExecutionException;
028    import java.util.concurrent.Future;
029    import javax.transaction.xa.XAException;
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageId;
034    import org.apache.activemq.command.TransactionId;
035    import org.apache.activemq.command.XATransactionId;
036    import org.apache.activemq.openwire.OpenWireFormat;
037    import org.apache.activemq.protobuf.Buffer;
038    import org.apache.activemq.store.AbstractMessageStore;
039    import org.apache.activemq.store.MessageStore;
040    import org.apache.activemq.store.ProxyMessageStore;
041    import org.apache.activemq.store.ProxyTopicMessageStore;
042    import org.apache.activemq.store.TopicMessageStore;
043    import org.apache.activemq.store.TransactionRecoveryListener;
044    import org.apache.activemq.store.TransactionStore;
045    import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
046    import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
047    import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
048    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
049    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
051    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
052    import org.apache.activemq.wireformat.WireFormat;
053    import org.slf4j.Logger;
054    import org.slf4j.LoggerFactory;
055    
/**
 * Provides a TransactionStore implementation that can create transaction-aware
 * MessageStore objects from non-transaction-aware MessageStore objects.
 */
062    public class KahaDBTransactionStore implements TransactionStore {
063        static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
064        ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
065        private final WireFormat wireFormat = new OpenWireFormat();
066        private final KahaDBStore theStore;
067    
068        public KahaDBTransactionStore(KahaDBStore theStore) {
069            this.theStore = theStore;
070        }
071    
072        public class Tx {
073            private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
074    
075            private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
076    
077            public void add(AddMessageCommand msg) {
078                messages.add(msg);
079            }
080    
081            public void add(RemoveMessageCommand ack) {
082                acks.add(ack);
083            }
084    
085            public Message[] getMessages() {
086                Message rc[] = new Message[messages.size()];
087                int count = 0;
088                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
089                    AddMessageCommand cmd = iter.next();
090                    rc[count++] = cmd.getMessage();
091                }
092                return rc;
093            }
094    
095            public MessageAck[] getAcks() {
096                MessageAck rc[] = new MessageAck[acks.size()];
097                int count = 0;
098                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
099                    RemoveMessageCommand cmd = iter.next();
100                    rc[count++] = cmd.getMessageAck();
101                }
102                return rc;
103            }
104    
105            /**
106             * @return true if something to commit
107             * @throws IOException
108             */
109            public List<Future<Object>> commit() throws IOException {
110                List<Future<Object>> results = new ArrayList<Future<Object>>();
111                // Do all the message adds.
112                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
113                    AddMessageCommand cmd = iter.next();
114                    results.add(cmd.run());
115    
116                }
117                // And removes..
118                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
119                    RemoveMessageCommand cmd = iter.next();
120                    cmd.run();
121                    results.add(cmd.run());
122                }
123                
124                return results;
125            }
126        }
127    
128        public abstract class AddMessageCommand {
129            private final ConnectionContext ctx;
130            AddMessageCommand(ConnectionContext ctx) {
131                this.ctx = ctx;
132            }
133            abstract Message getMessage();
134            Future<Object> run() throws IOException {
135                return run(this.ctx);
136            }
137            abstract Future<Object> run(ConnectionContext ctx) throws IOException;
138        }
139    
140        public abstract class RemoveMessageCommand {
141    
142            private final ConnectionContext ctx;
143            RemoveMessageCommand(ConnectionContext ctx) {
144                this.ctx = ctx;
145            }
146            abstract MessageAck getMessageAck();
147            Future<Object> run() throws IOException {
148                return run(this.ctx);
149            }
150            abstract Future<Object> run(ConnectionContext context) throws IOException;
151        }
152    
153        public MessageStore proxy(MessageStore messageStore) {
154            return new ProxyMessageStore(messageStore) {
155                @Override
156                public void addMessage(ConnectionContext context, final Message send) throws IOException {
157                    KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
158                }
159    
160                @Override
161                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
162                    return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
163                }
164    
165                @Override
166                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
167                    KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
168                }
169    
170                @Override
171                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
172                    KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
173                }
174            };
175        }
176    
177        public TopicMessageStore proxy(TopicMessageStore messageStore) {
178            return new ProxyTopicMessageStore(messageStore) {
179                @Override
180                public void addMessage(ConnectionContext context, final Message send) throws IOException {
181                    KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
182                }
183    
184                @Override
185                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
186                    return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
187                }
188    
189                @Override
190                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
191                    KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
192                }
193    
194                @Override
195                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
196                    KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
197                }
198    
199                @Override
200                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
201                                MessageId messageId, MessageAck ack) throws IOException {
202                    KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
203                            subscriptionName, messageId, ack);
204                }
205    
206            };
207        }
208    
209        /**
210         * @throws IOException
211         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
212         */
213        public void prepare(TransactionId txid) throws IOException {
214            inflightTransactions.remove(txid);
215            KahaTransactionInfo info = getTransactionInfo(txid);
216            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
217        }
218    
219        public Tx getTx(Object txid) {
220            Tx tx = inflightTransactions.get(txid);
221            if (tx == null) {
222                tx = new Tx();
223                inflightTransactions.put(txid, tx);
224            }
225            return tx;
226        }
227    
228        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
229                throws IOException {
230            if (txid != null) {
231                if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
232                    if (preCommit != null) {
233                        preCommit.run();
234                    }
235                    Tx tx = inflightTransactions.remove(txid);
236                    if (tx != null) {
237                        List<Future<Object>> results = tx.commit();
238                        boolean doneSomething = false;
239                        for (Future<Object> result : results) {
240                            try {
241                                result.get();
242                            } catch (InterruptedException e) {
243                                theStore.brokerService.handleIOException(new IOException(e.getMessage()));
244                            } catch (ExecutionException e) {
245                                theStore.brokerService.handleIOException(new IOException(e.getMessage()));
246                            }catch(CancellationException e) {
247                            }
248                            if (!result.isCancelled()) {
249                                doneSomething = true;
250                            }
251                        }
252                        if (postCommit != null) {
253                            postCommit.run();
254                        }
255                        if (doneSomething) {
256                            KahaTransactionInfo info = getTransactionInfo(txid);
257                            theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
258                        }
259                    }else {
260                        //The Tx will be null for failed over clients - lets run their post commits
261                        if (postCommit != null) {
262                            postCommit.run();
263                        }
264                    }
265    
266                } else {
267                    KahaTransactionInfo info = getTransactionInfo(txid);
268                    // ensure message order w.r.t to cursor and store for setBatch()
269                    synchronized (this) {
270                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
271                    }
272                }
273            }else {
274               LOG.error("Null transaction passed on commit");
275            }
276        }
277    
278        /**
279         * @throws IOException
280         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
281         */
282        public void rollback(TransactionId txid) throws IOException {
283            if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
284                KahaTransactionInfo info = getTransactionInfo(txid);
285                theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
286            } else {
287                inflightTransactions.remove(txid);
288            }
289        }
290    
291        public void start() throws Exception {
292        }
293    
294        public void stop() throws Exception {
295        }
296    
297        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
298            // All the inflight transactions get rolled back..
299            // inflightTransactions.clear();
300            for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
301                XATransactionId xid = (XATransactionId) entry.getKey();
302                ArrayList<Message> messageList = new ArrayList<Message>();
303                ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
304    
305                for (Operation op : entry.getValue()) {
306                    if (op.getClass() == AddOpperation.class) {
307                        AddOpperation addOp = (AddOpperation) op;
308                        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
309                                .newInput()));
310                        messageList.add(msg);
311                    } else {
312                        RemoveOpperation rmOp = (RemoveOpperation) op;
313                        Buffer ackb = rmOp.getCommand().getAck();
314                        MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
315                        ackList.add(ack);
316                    }
317                }
318    
319                Message[] addedMessages = new Message[messageList.size()];
320                MessageAck[] acks = new MessageAck[ackList.size()];
321                messageList.toArray(addedMessages);
322                ackList.toArray(acks);
323                listener.recover(xid, addedMessages, acks);
324            }
325        }
326    
327        /**
328         * @param message
329         * @throws IOException
330         */
331        void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
332                throws IOException {
333    
334            if (message.getTransactionId() != null) {
335                if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
336                    destination.addMessage(context, message);
337                } else {
338                    Tx tx = getTx(message.getTransactionId());
339                    tx.add(new AddMessageCommand(context) {
340                        @Override
341                        public Message getMessage() {
342                            return message;
343                        }
344                        @Override
345                        public Future<Object> run(ConnectionContext ctx) throws IOException {
346                            destination.addMessage(ctx, message);
347                            return AbstractMessageStore.FUTURE;
348                        }
349    
350                    });
351                }
352            } else {
353                destination.addMessage(context, message);
354            }
355        }
356    
357        Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
358                throws IOException {
359    
360            if (message.getTransactionId() != null) {
361                if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
362                    destination.addMessage(context, message);
363                    return AbstractMessageStore.FUTURE;
364                } else {
365                    Tx tx = getTx(message.getTransactionId());
366                    tx.add(new AddMessageCommand(context) {
367                        @Override
368                        public Message getMessage() {
369                            return message;
370                        }
371                        @Override
372                        public Future<Object> run(ConnectionContext ctx) throws IOException {
373                            return destination.asyncAddQueueMessage(ctx, message);
374                        }
375    
376                    });
377                    return AbstractMessageStore.FUTURE;
378                }
379            } else {
380                return destination.asyncAddQueueMessage(context, message);
381            }
382        }
383    
384        Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
385                throws IOException {
386    
387            if (message.getTransactionId() != null) {
388                if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
389                    destination.addMessage(context, message);
390                    return AbstractMessageStore.FUTURE;
391                } else {
392                    Tx tx = getTx(message.getTransactionId());
393                    tx.add(new AddMessageCommand(context) {
394                        @Override
395                        public Message getMessage() {
396                            return message;
397                        }
398                        @Override
399                        public Future run(ConnectionContext ctx) throws IOException {
400                            return destination.asyncAddTopicMessage(ctx, message);
401                        }
402    
403                    });
404                    return AbstractMessageStore.FUTURE;
405                }
406            } else {
407                return destination.asyncAddTopicMessage(context, message);
408            }
409        }
410    
411        /**
412         * @param ack
413         * @throws IOException
414         */
415        final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
416                throws IOException {
417    
418            if (ack.isInTransaction()) {
419                if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
420                    destination.removeMessage(context, ack);
421                } else {
422                    Tx tx = getTx(ack.getTransactionId());
423                    tx.add(new RemoveMessageCommand(context) {
424                        @Override
425                        public MessageAck getMessageAck() {
426                            return ack;
427                        }
428    
429                        @Override
430                        public Future<Object> run(ConnectionContext ctx) throws IOException {
431                            destination.removeMessage(ctx, ack);
432                            return AbstractMessageStore.FUTURE;
433                        }
434                    });
435                }
436            } else {
437                destination.removeMessage(context, ack);
438            }
439        }
440    
441        final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
442                throws IOException {
443    
444            if (ack.isInTransaction()) {
445                if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
446                    destination.removeAsyncMessage(context, ack);
447                } else {
448                    Tx tx = getTx(ack.getTransactionId());
449                    tx.add(new RemoveMessageCommand(context) {
450                        @Override
451                        public MessageAck getMessageAck() {
452                            return ack;
453                        }
454    
455                        @Override
456                        public Future<Object> run(ConnectionContext ctx) throws IOException {
457                            destination.removeMessage(ctx, ack);
458                            return AbstractMessageStore.FUTURE;
459                        }
460                    });
461                }
462            } else {
463                destination.removeAsyncMessage(context, ack);
464            }
465        }
466    
467        final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
468                               final MessageId messageId, final MessageAck ack) throws IOException {
469    
470            if (ack.isInTransaction()) {
471                if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
472                    destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
473                } else {
474                    Tx tx = getTx(ack.getTransactionId());
475                    tx.add(new RemoveMessageCommand(context) {
476                        public MessageAck getMessageAck() {
477                            return ack;
478                        }
479    
480                        public Future<Object> run(ConnectionContext ctx) throws IOException {
481                            destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
482                            return AbstractMessageStore.FUTURE;
483                        }
484                    });
485                }
486            } else {
487                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
488            }
489        }
490    
491    
492        private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
493            return theStore.createTransactionInfo(txid);
494        }
495    
496    }