001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.Map;
022    import java.util.Map.Entry;
023    import java.util.concurrent.ConcurrentHashMap;
024    import javax.transaction.xa.XAException;
025    import org.apache.activemq.broker.BrokerService;
026    import org.apache.activemq.broker.BrokerServiceAware;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.command.TransactionId;
032    import org.apache.activemq.command.XATransactionId;
033    import org.apache.activemq.kaha.RuntimeStoreException;
034    import org.apache.activemq.store.MessageStore;
035    import org.apache.activemq.store.ProxyMessageStore;
036    import org.apache.activemq.store.ProxyTopicMessageStore;
037    import org.apache.activemq.store.TopicMessageStore;
038    import org.apache.activemq.store.TransactionRecoveryListener;
039    import org.apache.activemq.store.TransactionStore;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    /**
044     * Provides a TransactionStore implementation that can create transaction aware
045     * MessageStore objects from non transaction aware MessageStore objects.
046     * 
047     * 
048     */
049    public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {     
050        private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
051            
052        private final Map transactions = new ConcurrentHashMap();
053        private final Map prepared;
054        private final KahaPersistenceAdapter adaptor;
055        
056        private BrokerService brokerService;
057    
058        KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
059            this.adaptor = adaptor;
060            this.prepared = preparedMap;
061        }
062    
063        public MessageStore proxy(MessageStore messageStore) {
064            return new ProxyMessageStore(messageStore) {
065                @Override
066                public void addMessage(ConnectionContext context, final Message send) throws IOException {
067                    KahaTransactionStore.this.addMessage(getDelegate(), send);
068                }
069    
070                @Override
071                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
072                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
073                }
074            };
075        }
076    
077        public TopicMessageStore proxy(TopicMessageStore messageStore) {
078            return new ProxyTopicMessageStore(messageStore) {
079                @Override
080                public void addMessage(ConnectionContext context, final Message send) throws IOException {
081                    KahaTransactionStore.this.addMessage(getDelegate(), send);
082                }
083    
084                @Override
085                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
086                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
087                }
088    
089                @Override
090                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
091                                MessageId messageId, MessageAck ack) throws IOException {
092                    KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack);
093                }
094            };
095        }
096    
097        /**
098         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
099         */
100        public void prepare(TransactionId txid) {
101            KahaTransaction tx = getTx(txid);
102            if (tx != null) {
103                tx.prepare();
104                prepared.put(txid, tx);
105            }
106        }
107    
108        public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
109            if(before != null) {
110                before.run();
111            }
112            KahaTransaction tx = getTx(txid);
113            if (tx != null) {
114                tx.commit(this);
115                removeTx(txid);
116            }
117            if (after != null) {
118                after.run();
119            }
120        }
121    
122        /**
123         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
124         */
125        public void rollback(TransactionId txid) {
126            KahaTransaction tx = getTx(txid);
127            if (tx != null) {
128                tx.rollback();
129                removeTx(txid);
130            }
131        }
132    
133        public void start() throws Exception {
134        }
135    
136        public void stop() throws Exception {
137        }
138    
139        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
140            for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
141                Map.Entry entry = (Entry)i.next();
142                XATransactionId xid = (XATransactionId)entry.getKey();
143                KahaTransaction kt = (KahaTransaction)entry.getValue();
144                listener.recover(xid, kt.getMessages(), kt.getAcks());
145            }
146        }
147    
148        /**
149         * @param message
150         * @throws IOException
151         */
152        void addMessage(final MessageStore destination, final Message message) throws IOException {
153            try {
154                    if (message.isInTransaction()) {
155                            KahaTransaction tx = getOrCreateTx(message.getTransactionId());
156                            tx.add((KahaMessageStore)destination, message);
157                    } else {
158                            destination.addMessage(null, message);
159                    }
160            } catch (RuntimeStoreException rse) {
161                if (rse.getCause() instanceof IOException) {
162                    brokerService.handleIOException((IOException)rse.getCause());
163                }
164                throw rse;
165            }
166        }
167    
168        /**
169         * @param ack
170         * @throws IOException
171         */
172        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
173            try {
174                    if (ack.isInTransaction()) {
175                            KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
176                            tx.add((KahaMessageStore)destination, ack);
177                    } else {
178                            destination.removeMessage(null, ack);
179                    }
180            } catch (RuntimeStoreException rse) {
181                if (rse.getCause() instanceof IOException) {
182                    brokerService.handleIOException((IOException)rse.getCause());
183                }
184                throw rse;
185            }
186        }
187    
188        final void acknowledge(final TopicMessageStore destination, String clientId,
189                               String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
190            try {
191                if (ack.isInTransaction()) {
192                    KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
193                    tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack);
194                } else {
195                    destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
196                }
197            } catch (RuntimeStoreException rse) {
198                if (rse.getCause() instanceof IOException) {
199                    brokerService.handleIOException((IOException)rse.getCause());
200                }
201                throw rse;
202            }
203        }
204    
205        protected synchronized KahaTransaction getTx(TransactionId key) {
206            KahaTransaction result = (KahaTransaction)transactions.get(key);
207            if (result == null) {
208                result = (KahaTransaction)prepared.get(key);
209            }
210            return result;
211        }
212    
213        protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
214            KahaTransaction result = (KahaTransaction)transactions.get(key);
215            if (result == null) {
216                result = new KahaTransaction();
217                transactions.put(key, result);
218            }
219            return result;
220        }
221    
222        protected synchronized void removeTx(TransactionId key) {
223            transactions.remove(key);
224            prepared.remove(key);
225        }
226    
227        public void delete() {
228            transactions.clear();
229            prepared.clear();
230        }
231    
232        protected MessageStore getStoreById(Object id) {
233            return adaptor.retrieveMessageStore(id);
234        }
235    
236            public void setBrokerService(BrokerService brokerService) {
237                    this.brokerService = brokerService;
238            }
239    }