001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.Arrays;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import org.apache.activemq.ActiveMQMessageAudit;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.command.ActiveMQTopic;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.MessageAck;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.SubscriptionInfo;
033    import org.apache.activemq.store.MessageRecoveryListener;
034    import org.apache.activemq.store.TopicMessageStore;
035    import org.apache.activemq.util.ByteSequence;
036    import org.apache.activemq.util.IOExceptionSupport;
037    import org.apache.activemq.wireformat.WireFormat;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * 
043     */
044    public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
045    
046        private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
047        private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
048    
049        public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
050            super(persistenceAdapter, adapter, wireFormat, topic, audit);
051        }
052    
053        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
054            if (ack != null && ack.isUnmatchedAck()) {
055                if (LOG.isTraceEnabled()) {
056                    LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
057                }
058                return;
059            }
060            TransactionContext c = persistenceAdapter.getTransactionContext(context);
061            try {
062                long[] res = adapter.getStoreSequenceId(c, destination, messageId);
063                if (this.isPrioritizedMessages()) {
064                    adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
065                } else {
066                    adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
067                }
068                if (LOG.isTraceEnabled()) {
069                    LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
070                }
071            } catch (SQLException e) {
072                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
073                throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
074            } finally {
075                c.close();
076            }
077        }
078    
079        /**
080         * @throws Exception
081         */
082        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
083            TransactionContext c = persistenceAdapter.getTransactionContext();
084            try {
085                adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
086                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
087                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
088                        msg.getMessageId().setBrokerSequenceId(sequenceId);
089                        return listener.recoverMessage(msg);
090                    }
091    
092                    public boolean recoverMessageReference(String reference) throws Exception {
093                        return listener.recoverMessageReference(new MessageId(reference));
094                    }
095    
096                });
097            } catch (SQLException e) {
098                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
099                throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
100            } finally {
101                c.close();
102            }
103        }
104    
105        private class LastRecovered implements Iterable<LastRecoveredEntry> {
106            LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
107            LastRecovered() {
108                for (int i=0; i<perPriority.length; i++) {
109                    perPriority[i] = new LastRecoveredEntry(i);
110                }
111            }
112    
113            public void updateStored(long sequence, int priority) {
114                perPriority[priority].stored = sequence;
115            }
116    
117            public LastRecoveredEntry defaultPriority() {
118                return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
119            }
120    
121            public String toString() {
122                return Arrays.deepToString(perPriority);
123            }
124    
125            public Iterator<LastRecoveredEntry> iterator() {
126                return new PriorityIterator();
127            }
128    
129            class PriorityIterator implements Iterator<LastRecoveredEntry> {
130                int current = 9;
131                public boolean hasNext() {
132                    for (int i=current; i>=0; i--) {
133                        if (perPriority[i].hasMessages()) {
134                            current = i;
135                            return true;
136                        }
137                    }
138                    return false;
139                }
140    
141                public LastRecoveredEntry next() {
142                    return perPriority[current];
143                }
144    
145                public void remove() {
146                    throw new RuntimeException("not implemented");
147                }
148            }
149        }
150    
151        private class LastRecoveredEntry {
152            final int priority;
153            long recovered = 0;
154            long stored = Integer.MAX_VALUE;
155    
156            public LastRecoveredEntry(int priority) {
157                this.priority = priority;
158            }
159    
160            public String toString() {
161                return priority + "-" + stored + ":" + recovered;
162            }
163    
164            public void exhausted() {
165                stored = recovered;
166            }
167    
168            public boolean hasMessages() {
169                return stored > recovered;
170            }
171        }
172    
173        class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
174            final MessageRecoveryListener delegate;
175            final int maxMessages;
176            LastRecoveredEntry lastRecovered;
177            int recoveredCount;
178            int recoveredMarker;
179    
180            public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
181                this.delegate = delegate;
182                this.maxMessages = maxMessages;
183            }
184    
185            public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
186                if (delegate.hasSpace()) {
187                    Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
188                    msg.getMessageId().setBrokerSequenceId(sequenceId);
189                    if (delegate.recoverMessage(msg)) {
190                        lastRecovered.recovered = sequenceId;
191                        recoveredCount++;
192                        return true;
193                    }
194                }
195                return false;
196            }
197    
198            public boolean recoverMessageReference(String reference) throws Exception {
199                return delegate.recoverMessageReference(new MessageId(reference));
200            }
201    
202            public void setLastRecovered(LastRecoveredEntry lastRecovered) {
203                this.lastRecovered = lastRecovered;
204                recoveredMarker = recoveredCount;
205            }
206    
207            public boolean complete() {
208                return  !delegate.hasSpace() || recoveredCount == maxMessages;
209            }
210    
211            public boolean stalled() {
212                return recoveredMarker == recoveredCount;
213            }
214        }
215    
216        public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
217                throws Exception {
218            //Duration duration = new Duration("recoverNextMessages");
219            TransactionContext c = persistenceAdapter.getTransactionContext();
220    
221            String key = getSubscriptionKey(clientId, subscriptionName);
222            if (!subscriberLastRecoveredMap.containsKey(key)) {
223               subscriberLastRecoveredMap.put(key, new LastRecovered());
224            }
225            final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);        
226            LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
227            try {
228                if (LOG.isTraceEnabled()) {
229                    LOG.trace(key + " existing last recovered: " + lastRecovered);
230                }
231                if (isPrioritizedMessages()) {
232                    Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
233                    for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
234                        LastRecoveredEntry entry = it.next();
235                        recoveredAwareListener.setLastRecovered(entry);
236                        //Duration microDuration = new Duration("recoverNextMessages:loop");
237                        adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
238                            entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
239                        //microDuration.end(entry);
240                        if (recoveredAwareListener.stalled()) {
241                            if (recoveredAwareListener.complete()) {
242                                break;
243                            } else {
244                                entry.exhausted();
245                            }
246                        }
247                    }
248                } else {
249                    LastRecoveredEntry last = lastRecovered.defaultPriority();
250                    recoveredAwareListener.setLastRecovered(last);
251                    adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
252                            last.recovered, 0, maxReturned, recoveredAwareListener);
253                }
254                if (LOG.isTraceEnabled()) {
255                    LOG.trace(key + " last recovered: " + lastRecovered);
256                }
257                //duration.end();
258            } catch (SQLException e) {
259                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
260            } finally {
261                c.close();
262            }
263        }
264    
265        public void resetBatching(String clientId, String subscriptionName) {
266            subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
267        }
268    
269        protected void onAdd(long sequenceId, byte priority) {
270            // update last recovered state
271            for (LastRecovered last : subscriberLastRecoveredMap.values()) {
272                last.updateStored(sequenceId, priority);
273            }
274        }
275    
276    
277        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
278            TransactionContext c = persistenceAdapter.getTransactionContext();
279            try {
280                c = persistenceAdapter.getTransactionContext();
281                adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
282            } catch (SQLException e) {
283                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
284                throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
285            } finally {
286                c.close();
287            }
288        }
289    
290        /**
291         * @see org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
292         *      String)
293         */
294        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
295            TransactionContext c = persistenceAdapter.getTransactionContext();
296            try {
297                return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
298            } catch (SQLException e) {
299                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
300                throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
301            } finally {
302                c.close();
303            }
304        }
305    
306        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
307            TransactionContext c = persistenceAdapter.getTransactionContext();
308            try {
309                adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
310            } catch (SQLException e) {
311                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
312                throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
313            } finally {
314                c.close();
315                resetBatching(clientId, subscriptionName);
316            }
317        }
318    
319        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
320            TransactionContext c = persistenceAdapter.getTransactionContext();
321            try {
322                return adapter.doGetAllSubscriptions(c, destination);
323            } catch (SQLException e) {
324                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
325                throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
326            } finally {
327                c.close();
328            }
329        }
330    
331        public int getMessageCount(String clientId, String subscriberName) throws IOException {
332            //Duration duration = new Duration("getMessageCount");
333            int result = 0;
334            TransactionContext c = persistenceAdapter.getTransactionContext();
335            try {
336                result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
337            } catch (SQLException e) {
338                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
339                throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
340            } finally {
341                c.close();
342            }
343            if (LOG.isTraceEnabled()) {
344                LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
345            }
346            //duration.end();
347            return result;
348        }
349    
350        protected String getSubscriptionKey(String clientId, String subscriberName) {
351            String result = clientId + ":";
352            result += subscriberName != null ? subscriberName : "NOT_SET";
353            return result;
354        }
355    
356    }