001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    
020    import java.util.*;
021    import java.util.concurrent.ConcurrentHashMap;
022    
023    import javax.jms.JMSException;
024    import javax.transaction.xa.XAException;
025    
026    import org.apache.activemq.ActiveMQMessageAudit;
027    import org.apache.activemq.command.ConnectionInfo;
028    import org.apache.activemq.command.LocalTransactionId;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.MessageAck;
031    import org.apache.activemq.command.ProducerInfo;
032    import org.apache.activemq.command.TransactionId;
033    import org.apache.activemq.command.XATransactionId;
034    import org.apache.activemq.state.ProducerState;
035    import org.apache.activemq.store.TransactionRecoveryListener;
036    import org.apache.activemq.store.TransactionStore;
037    import org.apache.activemq.transaction.LocalTransaction;
038    import org.apache.activemq.transaction.Synchronization;
039    import org.apache.activemq.transaction.Transaction;
040    import org.apache.activemq.transaction.XATransaction;
041    import org.apache.activemq.util.IOExceptionSupport;
042    import org.apache.activemq.util.WrappedException;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * This broker filter handles the transaction related operations in the Broker
048     * interface.
049     * 
050     * 
051     */
052    public class TransactionBroker extends BrokerFilter {
053    
054        private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
055    
056        // The prepared XA transactions.
057        private TransactionStore transactionStore;
058        private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
059        private ActiveMQMessageAudit audit;
060    
061        public TransactionBroker(Broker next, TransactionStore transactionStore) {
062            super(next);
063            this.transactionStore = transactionStore;
064        }
065    
066        // ////////////////////////////////////////////////////////////////////////////
067        //
068        // Life cycle Methods
069        //
070        // ////////////////////////////////////////////////////////////////////////////
071    
072        /**
073         * Recovers any prepared transactions.
074         */
075        public void start() throws Exception {
076            transactionStore.start();
077            try {
078                final ConnectionContext context = new ConnectionContext();
079                context.setBroker(this);
080                context.setInRecoveryMode(true);
081                context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
082                context.setProducerFlowControl(false);
083                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
084                producerExchange.setMutable(true);
085                producerExchange.setConnectionContext(context);
086                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
087                final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
088                consumerExchange.setConnectionContext(context);
089                transactionStore.recover(new TransactionRecoveryListener() {
090                    public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
091                        try {
092                            beginTransaction(context, xid);
093                            for (int i = 0; i < addedMessages.length; i++) {
094                                send(producerExchange, addedMessages[i]);
095                            }
096                            for (int i = 0; i < aks.length; i++) {
097                                acknowledge(consumerExchange, aks[i]);
098                            }
099                            prepareTransaction(context, xid);
100                        } catch (Throwable e) {
101                            throw new WrappedException(e);
102                        }
103                    }
104                });
105            } catch (WrappedException e) {
106                Throwable cause = e.getCause();
107                throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
108            }
109            next.start();
110        }
111    
112        public void stop() throws Exception {
113            transactionStore.stop();
114            next.stop();
115        }
116    
117        // ////////////////////////////////////////////////////////////////////////////
118        //
119        // BrokerFilter overrides
120        //
121        // ////////////////////////////////////////////////////////////////////////////
122        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
123            List<TransactionId> txs = new ArrayList<TransactionId>();
124            synchronized (xaTransactions) {
125                for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
126                    Transaction tx = iter.next();
127                    if (tx.isPrepared()) {
128                        if (LOG.isDebugEnabled()) {
129                            LOG.debug("prepared transaction: " + tx.getTransactionId());
130                        }
131                        txs.add(tx.getTransactionId());
132                    }
133                }
134            }
135            XATransactionId rc[] = new XATransactionId[txs.size()];
136            txs.toArray(rc);
137            if (LOG.isDebugEnabled()) {
138                LOG.debug("prepared transacton list size: " + rc.length);
139            }
140            return rc;
141        }
142    
143        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
144            // the transaction may have already been started.
145            if (xid.isXATransaction()) {
146                XATransaction transaction = null;
147                synchronized (xaTransactions) {
148                    transaction = xaTransactions.get(xid);
149                    if (transaction != null) {
150                        return;
151                    }
152                    transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
153                    xaTransactions.put(xid, transaction);
154                }
155            } else {
156                Map<TransactionId, Transaction> transactionMap = context.getTransactions();
157                Transaction transaction = transactionMap.get(xid);
158                if (transaction != null) {
159                    throw new JMSException("Transaction '" + xid + "' has already been started.");
160                }
161                transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
162                transactionMap.put(xid, transaction);
163            }
164        }
165    
166        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
167            Transaction transaction = getTransaction(context, xid, false);
168            return transaction.prepare();
169        }
170    
171        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
172            Transaction transaction = getTransaction(context, xid, true);
173            transaction.commit(onePhase);
174        }
175    
176        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
177            Transaction transaction = getTransaction(context, xid, true);
178            transaction.rollback();
179        }
180    
181        public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
182            Transaction transaction = getTransaction(context, xid, true);
183            transaction.rollback();
184        }
185    
186        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
187            // This method may be invoked recursively.
188            // Track original tx so that it can be restored.
189            final ConnectionContext context = consumerExchange.getConnectionContext();
190            Transaction originalTx = context.getTransaction();
191            Transaction transaction = null;
192            if (ack.isInTransaction()) {
193                transaction = getTransaction(context, ack.getTransactionId(), false);
194            }
195            context.setTransaction(transaction);
196            try {
197                next.acknowledge(consumerExchange, ack);
198            } finally {
199                context.setTransaction(originalTx);
200            }
201        }
202    
203        public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
204            // This method may be invoked recursively.
205            // Track original tx so that it can be restored.
206            final ConnectionContext context = producerExchange.getConnectionContext();
207            Transaction originalTx = context.getTransaction();
208            Transaction transaction = null;
209            Synchronization sync = null;
210            if (message.getTransactionId() != null) {
211                transaction = getTransaction(context, message.getTransactionId(), false);
212                if (transaction != null) {
213                    sync = new Synchronization() {
214    
215                        public void afterRollback() {
216                            if (audit != null) {
217                                audit.rollback(message);
218                            }
219                        }
220                    };
221                    transaction.addSynchronization(sync);
222                }
223            }
224            if (audit == null || !audit.isDuplicate(message)) {
225                context.setTransaction(transaction);
226                try {
227                    next.send(producerExchange, message);
228                } finally {
229                    context.setTransaction(originalTx);
230                }
231            } else {
232                if (sync != null && transaction != null) {
233                    transaction.removeSynchronization(sync);
234                }
235                if (LOG.isDebugEnabled()) {
236                    LOG.debug("IGNORING duplicate message " + message);
237                }
238            }
239        }
240    
241        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
242            for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
243                try {
244                    Transaction transaction = iter.next();
245                    transaction.rollback();
246                } catch (Exception e) {
247                    LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
248                }
249                iter.remove();
250            }
251    
252            synchronized (xaTransactions) {
253                // first find all txs that belongs to the connection
254                ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
255                for (XATransaction tx : xaTransactions.values()) {
256                    if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
257                        txs.add(tx);
258                    }
259                }
260    
261                // then remove them
262                // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
263                for (XATransaction tx : txs) {
264                    try {
265                        tx.rollback();
266                    } catch (Exception e) {
267                        LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
268                    }
269                }
270    
271            }
272            next.removeConnection(context, info, error);
273        }
274    
275        // ////////////////////////////////////////////////////////////////////////////
276        //
277        // Implementation help methods.
278        //
279        // ////////////////////////////////////////////////////////////////////////////
280        public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
281            Map transactionMap = null;
282            synchronized (xaTransactions) {
283                transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
284            }
285            Transaction transaction = (Transaction)transactionMap.get(xid);
286            if (transaction != null) {
287                return transaction;
288            }
289            if (xid.isXATransaction()) {
290                XAException e = new XAException("Transaction '" + xid + "' has not been started.");
291                e.errorCode = XAException.XAER_NOTA;
292                throw e;
293            } else {
294                throw new JMSException("Transaction '" + xid + "' has not been started.");
295            }
296        }
297    
298        public void removeTransaction(XATransactionId xid) {
299            synchronized (xaTransactions) {
300                xaTransactions.remove(xid);
301            }
302        }
303    
304        public synchronized void brokerServiceStarted() {
305            super.brokerServiceStarted();
306            if (getBrokerService().isSupportFailOver() && audit == null) {
307                audit = new ActiveMQMessageAudit();
308            }
309        }
310    
311    }