001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.Serializable;
020    
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    
024    import org.apache.activemq.broker.region.MessageReference;
025    import org.apache.activemq.command.MessageId;
026    import org.apache.activemq.command.ProducerId;
027    import org.apache.activemq.util.BitArrayBin;
028    import org.apache.activemq.util.IdGenerator;
029    import org.apache.activemq.util.LRUCache;
030    
031    /**
032     * Provides basic audit functions for Messages without sync
033     * 
034     * 
035     */
036    public class ActiveMQMessageAuditNoSync implements Serializable {
037    
038        private static final long serialVersionUID = 1L;
039    
040        public static final int DEFAULT_WINDOW_SIZE = 2048;
041        public static final int MAXIMUM_PRODUCER_COUNT = 64;
042        private int auditDepth;
043        private int maximumNumberOfProducersToTrack;
044        private LRUCache<Object, BitArrayBin> map;
045    
046        /**
047         * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
048         * 64
049         */
050        public ActiveMQMessageAuditNoSync() {
051            this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052        }
053    
054        /**
055         * Construct a MessageAudit
056         * 
057         * @param auditDepth range of ids to track
058         * @param maximumNumberOfProducersToTrack number of producers expected in
059         *                the system
060         */
061        public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
062            this.auditDepth = auditDepth;
063            this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
064            this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
065        }
066        
067        /**
068         * @return the auditDepth
069         */
070        public int getAuditDepth() {
071            return auditDepth;
072        }
073    
074        /**
075         * @param auditDepth the auditDepth to set
076         */
077        public void setAuditDepth(int auditDepth) {
078            this.auditDepth = auditDepth;
079        }
080    
081        /**
082         * @return the maximumNumberOfProducersToTrack
083         */
084        public int getMaximumNumberOfProducersToTrack() {
085            return maximumNumberOfProducersToTrack;
086        }
087    
088        /**
089         * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090         */
091        public void setMaximumNumberOfProducersToTrack(
092                int maximumNumberOfProducersToTrack) {
093            this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
094            this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
095        }
096    
097        /**
098         * Checks if this message has been seen before
099         * 
100         * @param message
101         * @return true if the message is a duplicate
102         * @throws JMSException
103         */
104        public boolean isDuplicate(Message message) throws JMSException {
105            return isDuplicate(message.getJMSMessageID());
106        }
107    
108        /**
109         * checks whether this messageId has been seen before and adds this
110         * messageId to the list
111         * 
112         * @param id
113         * @return true if the message is a duplicate
114         */
115        public boolean isDuplicate(String id) {
116            boolean answer = false;
117            String seed = IdGenerator.getSeedFromId(id);
118            if (seed != null) {
119                BitArrayBin bab = map.get(seed);
120                if (bab == null) {
121                    bab = new BitArrayBin(auditDepth);
122                    map.put(seed, bab);
123                }
124                long index = IdGenerator.getSequenceFromId(id);
125                if (index >= 0) {
126                    answer = bab.setBit(index, true);
127                }
128            }
129            return answer;
130        }
131    
132        /**
133         * Checks if this message has been seen before
134         * 
135         * @param message
136         * @return true if the message is a duplicate
137         */
138        public boolean isDuplicate(final MessageReference message) {
139            MessageId id = message.getMessageId();
140            return isDuplicate(id);
141        }
142        
143        /**
144         * Checks if this messageId has been seen before
145         * 
146         * @param id
147         * @return true if the message is a duplicate
148         */
149        public boolean isDuplicate(final MessageId id) {
150            boolean answer = false;
151            
152            if (id != null) {
153                ProducerId pid = id.getProducerId();
154                if (pid != null) {
155                    BitArrayBin bab = map.get(pid);
156                    if (bab == null) {
157                        bab = new BitArrayBin(auditDepth);
158                        map.put(pid, bab);
159                    }
160                    answer = bab.setBit(id.getProducerSequenceId(), true);
161                }
162            }
163            return answer;
164        }
165    
166        /**
167         * mark this message as being received
168         * 
169         * @param message
170         */
171        public void rollback(final MessageReference message) {
172            MessageId id = message.getMessageId();
173            rollback(id);
174        }
175        
176        /**
177         * mark this message as being received
178         * 
179         * @param id
180         */
181        public void rollback(final  MessageId id) {
182            if (id != null) {
183                ProducerId pid = id.getProducerId();
184                if (pid != null) {
185                    BitArrayBin bab = map.get(pid);
186                    if (bab != null) {
187                        bab.setBit(id.getProducerSequenceId(), false);
188                    }
189                }
190            }
191        }
192        
193        /**
194         * Check the message is in order
195         * @param msg
196         * @return
197         * @throws JMSException
198         */
199        public boolean isInOrder(Message msg) throws JMSException {
200            return isInOrder(msg.getJMSMessageID());
201        }
202        
203        /**
204         * Check the message id is in order
205         * @param id
206         * @return
207         */
208        public boolean isInOrder(final String id) {
209            boolean answer = true;
210            
211            if (id != null) {
212                String seed = IdGenerator.getSeedFromId(id);
213                if (seed != null) {
214                    BitArrayBin bab = map.get(seed);
215                    if (bab != null) {
216                        long index = IdGenerator.getSequenceFromId(id);
217                        answer = bab.isInOrder(index);
218                    }
219                   
220                }
221            }
222            return answer;
223        }
224        
225        /**
226         * Check the MessageId is in order
227         * @param message 
228         * @return
229         */
230        public boolean isInOrder(final MessageReference message) {
231            return isInOrder(message.getMessageId());
232        }
233        
234        /**
235         * Check the MessageId is in order
236         * @param id
237         * @return
238         */
239        public boolean isInOrder(final MessageId id) {
240            boolean answer = false;
241    
242            if (id != null) {
243                ProducerId pid = id.getProducerId();
244                if (pid != null) {
245                    BitArrayBin bab = map.get(pid);
246                    if (bab == null) {
247                        bab = new BitArrayBin(auditDepth);
248                        map.put(pid, bab);
249                    }
250                    answer = bab.isInOrder(id.getProducerSequenceId());
251    
252                }
253            }
254            return answer;
255        }
256    
257        public long getLastSeqId(ProducerId id) {
258            long result = -1;
259            BitArrayBin bab = map.get(id.toString() + ":");
260            if (bab != null) {
261                result = bab.getLastSetIndex();
262            }
263            return result;
264        }
265    
266        public void clear() {
267            map.clear();
268        }
269    }