001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.util.Collections;
023    import java.util.HashMap;
024    import java.util.Map;
025    import javax.jms.JMSException;
026    import org.apache.activemq.ActiveMQConnection;
027    import org.apache.activemq.advisory.AdvisorySupport;
028    import org.apache.activemq.broker.region.Destination;
029    import org.apache.activemq.broker.region.MessageReference;
030    import org.apache.activemq.broker.region.RegionBroker;
031    import org.apache.activemq.usage.MemoryUsage;
032    import org.apache.activemq.util.ByteArrayInputStream;
033    import org.apache.activemq.util.ByteArrayOutputStream;
034    import org.apache.activemq.util.ByteSequence;
035    import org.apache.activemq.util.MarshallingSupport;
036    import org.apache.activemq.wireformat.WireFormat;
037    
038    /**
039     * Represents an ActiveMQ message
040     *
041     * @openwire:marshaller
042     *
043     */
044    public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
045    
046        /**
047         * The default minimum amount of memory a message is assumed to use
048         */
049        public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
050    
051        protected MessageId messageId;
052        protected ActiveMQDestination originalDestination;
053        protected TransactionId originalTransactionId;
054    
055        protected ProducerId producerId;
056        protected ActiveMQDestination destination;
057        protected TransactionId transactionId;
058    
059        protected long expiration;
060        protected long timestamp;
061        protected long arrival;
062        protected long brokerInTime;
063        protected long brokerOutTime;
064        protected String correlationId;
065        protected ActiveMQDestination replyTo;
066        protected boolean persistent;
067        protected String type;
068        protected byte priority;
069        protected String groupID;
070        protected int groupSequence;
071        protected ConsumerId targetConsumerId;
072        protected boolean compressed;
073        protected String userID;
074    
075        protected ByteSequence content;
076        protected ByteSequence marshalledProperties;
077        protected DataStructure dataStructure;
078        protected int redeliveryCounter;
079    
080        protected int size;
081        protected Map<String, Object> properties;
082        protected boolean readOnlyProperties;
083        protected boolean readOnlyBody;
084        protected transient boolean recievedByDFBridge;
085        protected boolean droppable;
086    
087        private transient short referenceCount;
088        private transient ActiveMQConnection connection;
089        private transient org.apache.activemq.broker.region.Destination regionDestination;
090        private transient MemoryUsage memoryUsage;
091    
092        private BrokerId[] brokerPath;
093        private BrokerId[] cluster;
094    
095        public abstract Message copy();
096        public abstract void clearBody() throws JMSException;
097    
098        // useful to reduce the memory footprint of a persisted message
099        public void clearMarshalledState() throws JMSException {
100            properties = null;
101        }
102    
103        protected void copy(Message copy) {
104            super.copy(copy);
105            copy.producerId = producerId;
106            copy.transactionId = transactionId;
107            copy.destination = destination;
108            copy.messageId = messageId != null ? messageId.copy() : null;
109            copy.originalDestination = originalDestination;
110            copy.originalTransactionId = originalTransactionId;
111            copy.expiration = expiration;
112            copy.timestamp = timestamp;
113            copy.correlationId = correlationId;
114            copy.replyTo = replyTo;
115            copy.persistent = persistent;
116            copy.redeliveryCounter = redeliveryCounter;
117            copy.type = type;
118            copy.priority = priority;
119            copy.size = size;
120            copy.groupID = groupID;
121            copy.userID = userID;
122            copy.groupSequence = groupSequence;
123    
124            if (properties != null) {
125                copy.properties = new HashMap<String, Object>(properties);
126    
127                // The new message hasn't expired, so remove this feild.
128                copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION);
129            } else {
130                copy.properties = properties;
131            }
132    
133            copy.content = content;
134            copy.marshalledProperties = marshalledProperties;
135            copy.dataStructure = dataStructure;
136            copy.readOnlyProperties = readOnlyProperties;
137            copy.readOnlyBody = readOnlyBody;
138            copy.compressed = compressed;
139            copy.recievedByDFBridge = recievedByDFBridge;
140    
141            copy.arrival = arrival;
142            copy.connection = connection;
143            copy.regionDestination = regionDestination;
144            copy.brokerInTime = brokerInTime;
145            copy.brokerOutTime = brokerOutTime;
146            copy.memoryUsage=this.memoryUsage;
147            copy.brokerPath = brokerPath;
148    
149            // lets not copy the following fields
150            // copy.targetConsumerId = targetConsumerId;
151            // copy.referenceCount = referenceCount;
152        }
153    
154        public Object getProperty(String name) throws IOException {
155            if (properties == null) {
156                if (marshalledProperties == null) {
157                    return null;
158                }
159                properties = unmarsallProperties(marshalledProperties);
160            }
161            return properties.get(name);
162        }
163    
164        @SuppressWarnings("unchecked")
165        public Map<String, Object> getProperties() throws IOException {
166            if (properties == null) {
167                if (marshalledProperties == null) {
168                    return Collections.EMPTY_MAP;
169                }
170                properties = unmarsallProperties(marshalledProperties);
171            }
172            return Collections.unmodifiableMap(properties);
173        }
174    
175        public void clearProperties() {
176            marshalledProperties = null;
177            properties = null;
178        }
179    
180        public void setProperty(String name, Object value) throws IOException {
181            lazyCreateProperties();
182            properties.put(name, value);
183        }
184    
185        public void removeProperty(String name) throws IOException {
186            lazyCreateProperties();
187            properties.remove(name);
188        }
189    
190        protected void lazyCreateProperties() throws IOException {
191            if (properties == null) {
192                if (marshalledProperties == null) {
193                    properties = new HashMap<String, Object>();
194                } else {
195                    properties = unmarsallProperties(marshalledProperties);
196                    marshalledProperties = null;
197                }
198            }
199        }
200    
201        private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
202            return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
203        }
204    
205        public void beforeMarshall(WireFormat wireFormat) throws IOException {
206            // Need to marshal the properties.
207            if (marshalledProperties == null && properties != null) {
208                ByteArrayOutputStream baos = new ByteArrayOutputStream();
209                DataOutputStream os = new DataOutputStream(baos);
210                MarshallingSupport.marshalPrimitiveMap(properties, os);
211                os.close();
212                marshalledProperties = baos.toByteSequence();
213            }
214        }
215    
216        public void afterMarshall(WireFormat wireFormat) throws IOException {
217        }
218    
219        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
220        }
221    
222        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
223        }
224    
225        // /////////////////////////////////////////////////////////////////
226        //
227        // Simple Field accessors
228        //
229        // /////////////////////////////////////////////////////////////////
230    
231        /**
232         * @openwire:property version=1 cache=true
233         */
234        public ProducerId getProducerId() {
235            return producerId;
236        }
237    
238        public void setProducerId(ProducerId producerId) {
239            this.producerId = producerId;
240        }
241    
242        /**
243         * @openwire:property version=1 cache=true
244         */
245        public ActiveMQDestination getDestination() {
246            return destination;
247        }
248    
249        public void setDestination(ActiveMQDestination destination) {
250            this.destination = destination;
251        }
252    
253        /**
254         * @openwire:property version=1 cache=true
255         */
256        public TransactionId getTransactionId() {
257            return transactionId;
258        }
259    
260        public void setTransactionId(TransactionId transactionId) {
261            this.transactionId = transactionId;
262        }
263    
264        public boolean isInTransaction() {
265            return transactionId != null;
266        }
267    
268        /**
269         * @openwire:property version=1 cache=true
270         */
271        public ActiveMQDestination getOriginalDestination() {
272            return originalDestination;
273        }
274    
275        public void setOriginalDestination(ActiveMQDestination destination) {
276            this.originalDestination = destination;
277        }
278    
279        /**
280         * @openwire:property version=1
281         */
282        public MessageId getMessageId() {
283            return messageId;
284        }
285    
286        public void setMessageId(MessageId messageId) {
287            this.messageId = messageId;
288        }
289    
290        /**
291         * @openwire:property version=1 cache=true
292         */
293        public TransactionId getOriginalTransactionId() {
294            return originalTransactionId;
295        }
296    
297        public void setOriginalTransactionId(TransactionId transactionId) {
298            this.originalTransactionId = transactionId;
299        }
300    
301        /**
302         * @openwire:property version=1
303         */
304        public String getGroupID() {
305            return groupID;
306        }
307    
308        public void setGroupID(String groupID) {
309            this.groupID = groupID;
310        }
311    
312        /**
313         * @openwire:property version=1
314         */
315        public int getGroupSequence() {
316            return groupSequence;
317        }
318    
319        public void setGroupSequence(int groupSequence) {
320            this.groupSequence = groupSequence;
321        }
322    
323        /**
324         * @openwire:property version=1
325         */
326        public String getCorrelationId() {
327            return correlationId;
328        }
329    
330        public void setCorrelationId(String correlationId) {
331            this.correlationId = correlationId;
332        }
333    
334        /**
335         * @openwire:property version=1
336         */
337        public boolean isPersistent() {
338            return persistent;
339        }
340    
341        public void setPersistent(boolean deliveryMode) {
342            this.persistent = deliveryMode;
343        }
344    
345        /**
346         * @openwire:property version=1
347         */
348        public long getExpiration() {
349            return expiration;
350        }
351    
352        public void setExpiration(long expiration) {
353            this.expiration = expiration;
354        }
355    
356        /**
357         * @openwire:property version=1
358         */
359        public byte getPriority() {
360            return priority;
361        }
362    
363        public void setPriority(byte priority) {
364            if (priority < 0) {
365                this.priority = 0;
366            } else if (priority > 9) {
367                this.priority = 9;
368            } else {
369                this.priority = priority;
370            }
371        }
372    
373        /**
374         * @openwire:property version=1
375         */
376        public ActiveMQDestination getReplyTo() {
377            return replyTo;
378        }
379    
380        public void setReplyTo(ActiveMQDestination replyTo) {
381            this.replyTo = replyTo;
382        }
383    
384        /**
385         * @openwire:property version=1
386         */
387        public long getTimestamp() {
388            return timestamp;
389        }
390    
391        public void setTimestamp(long timestamp) {
392            this.timestamp = timestamp;
393        }
394    
395        /**
396         * @openwire:property version=1
397         */
398        public String getType() {
399            return type;
400        }
401    
402        public void setType(String type) {
403            this.type = type;
404        }
405    
406        /**
407         * @openwire:property version=1
408         */
409        public ByteSequence getContent() {
410            return content;
411        }
412    
413        public void setContent(ByteSequence content) {
414            this.content = content;
415        }
416    
417        /**
418         * @openwire:property version=1
419         */
420        public ByteSequence getMarshalledProperties() {
421            return marshalledProperties;
422        }
423    
424        public void setMarshalledProperties(ByteSequence marshalledProperties) {
425            this.marshalledProperties = marshalledProperties;
426        }
427    
428        /**
429         * @openwire:property version=1
430         */
431        public DataStructure getDataStructure() {
432            return dataStructure;
433        }
434    
435        public void setDataStructure(DataStructure data) {
436            this.dataStructure = data;
437        }
438    
439        /**
440         * Can be used to route the message to a specific consumer. Should be null
441         * to allow the broker use normal JMS routing semantics. If the target
442         * consumer id is an active consumer on the broker, the message is dropped.
443         * Used by the AdvisoryBroker to replay advisory messages to a specific
444         * consumer.
445         *
446         * @openwire:property version=1 cache=true
447         */
448        public ConsumerId getTargetConsumerId() {
449            return targetConsumerId;
450        }
451    
452        public void setTargetConsumerId(ConsumerId targetConsumerId) {
453            this.targetConsumerId = targetConsumerId;
454        }
455    
456        public boolean isExpired() {
457            long expireTime = getExpiration();
458            return expireTime > 0 && System.currentTimeMillis() > expireTime;
459        }
460    
461        public boolean isAdvisory() {
462            return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
463        }
464    
465        /**
466         * @openwire:property version=1
467         */
468        public boolean isCompressed() {
469            return compressed;
470        }
471    
472        public void setCompressed(boolean compressed) {
473            this.compressed = compressed;
474        }
475    
476        public boolean isRedelivered() {
477            return redeliveryCounter > 0;
478        }
479    
480        public void setRedelivered(boolean redelivered) {
481            if (redelivered) {
482                if (!isRedelivered()) {
483                    setRedeliveryCounter(1);
484                }
485            } else {
486                if (isRedelivered()) {
487                    setRedeliveryCounter(0);
488                }
489            }
490        }
491    
492        public void incrementRedeliveryCounter() {
493            redeliveryCounter++;
494        }
495    
496        /**
497         * @openwire:property version=1
498         */
499        public int getRedeliveryCounter() {
500            return redeliveryCounter;
501        }
502    
503        public void setRedeliveryCounter(int deliveryCounter) {
504            this.redeliveryCounter = deliveryCounter;
505        }
506    
507        /**
508         * The route of brokers the command has moved through.
509         *
510         * @openwire:property version=1 cache=true
511         */
512        public BrokerId[] getBrokerPath() {
513            return brokerPath;
514        }
515    
516        public void setBrokerPath(BrokerId[] brokerPath) {
517            this.brokerPath = brokerPath;
518        }
519    
520        public boolean isReadOnlyProperties() {
521            return readOnlyProperties;
522        }
523    
524        public void setReadOnlyProperties(boolean readOnlyProperties) {
525            this.readOnlyProperties = readOnlyProperties;
526        }
527    
528        public boolean isReadOnlyBody() {
529            return readOnlyBody;
530        }
531    
532        public void setReadOnlyBody(boolean readOnlyBody) {
533            this.readOnlyBody = readOnlyBody;
534        }
535    
536        public ActiveMQConnection getConnection() {
537            return this.connection;
538        }
539    
540        public void setConnection(ActiveMQConnection connection) {
541            this.connection = connection;
542        }
543    
544        /**
545         * Used to schedule the arrival time of a message to a broker. The broker
546         * will not dispatch a message to a consumer until it's arrival time has
547         * elapsed.
548         *
549         * @openwire:property version=1
550         */
551        public long getArrival() {
552            return arrival;
553        }
554    
555        public void setArrival(long arrival) {
556            this.arrival = arrival;
557        }
558    
559        /**
560         * Only set by the broker and defines the userID of the producer connection
561         * who sent this message. This is an optional field, it needs to be enabled
562         * on the broker to have this field populated.
563         *
564         * @openwire:property version=1
565         */
566        public String getUserID() {
567            return userID;
568        }
569    
570        public void setUserID(String jmsxUserID) {
571            this.userID = jmsxUserID;
572        }
573    
574        public int getReferenceCount() {
575            return referenceCount;
576        }
577    
578        public Message getMessageHardRef() {
579            return this;
580        }
581    
582        public Message getMessage() {
583            return this;
584        }
585    
586        public org.apache.activemq.broker.region.Destination getRegionDestination() {
587            return regionDestination;
588        }
589    
590        public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
591            this.regionDestination = destination;
592            if(this.memoryUsage==null) {
593                this.memoryUsage=regionDestination.getMemoryUsage();
594            }
595        }
596    
597        public MemoryUsage getMemoryUsage() {
598            return this.memoryUsage;
599        }
600    
601        public void setMemoryUsage(MemoryUsage usage) {
602            this.memoryUsage=usage;
603        }
604    
605        @Override
606        public boolean isMarshallAware() {
607            return true;
608        }
609    
610        public int incrementReferenceCount() {
611            int rc;
612            int size;
613            synchronized (this) {
614                rc = ++referenceCount;
615                size = getSize();
616            }
617    
618            if (rc == 1 && getMemoryUsage() != null) {
619                getMemoryUsage().increaseUsage(size);
620                //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
621    
622            }
623    
624            //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
625            return rc;
626        }
627    
628        public int decrementReferenceCount() {
629            int rc;
630            int size;
631            synchronized (this) {
632                rc = --referenceCount;
633                size = getSize();
634            }
635    
636            if (rc == 0 && getMemoryUsage() != null) {
637                getMemoryUsage().decreaseUsage(size);
638                //Thread.dumpStack();
639                //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
640            }
641    
642            //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
643    
644            return rc;
645        }
646    
647        public int getSize() {
648            int minimumMessageSize = getMinimumMessageSize();
649            if (size < minimumMessageSize || size == 0) {
650                size = minimumMessageSize;
651                if (marshalledProperties != null) {
652                    size += marshalledProperties.getLength();
653                }
654                if (content != null) {
655                    size += content.getLength();
656                }
657            }
658            return size;
659        }
660    
661        protected int getMinimumMessageSize() {
662            int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
663            //let destination override
664            Destination dest = regionDestination;
665            if (dest != null) {
666                result=dest.getMinimumMessageSize();
667            }
668            return result;
669        }
670    
671        /**
672         * @openwire:property version=1
673         * @return Returns the recievedByDFBridge.
674         */
675        public boolean isRecievedByDFBridge() {
676            return recievedByDFBridge;
677        }
678    
679        /**
680         * @param recievedByDFBridge The recievedByDFBridge to set.
681         */
682        public void setRecievedByDFBridge(boolean recievedByDFBridge) {
683            this.recievedByDFBridge = recievedByDFBridge;
684        }
685    
686        public void onMessageRolledBack() {
687            incrementRedeliveryCounter();
688        }
689    
690        /**
691         * @openwire:property version=2 cache=true
692         */
693        public boolean isDroppable() {
694            return droppable;
695        }
696    
697        public void setDroppable(boolean droppable) {
698            this.droppable = droppable;
699        }
700    
701        /**
702         * If a message is stored in multiple nodes on a cluster, all the cluster
703         * members will be listed here. Otherwise, it will be null.
704         *
705         * @openwire:property version=3 cache=true
706         */
707        public BrokerId[] getCluster() {
708            return cluster;
709        }
710    
711        public void setCluster(BrokerId[] cluster) {
712            this.cluster = cluster;
713        }
714    
715        @Override
716        public boolean isMessage() {
717            return true;
718        }
719    
720        /**
721         * @openwire:property version=3
722         */
723        public long getBrokerInTime() {
724            return this.brokerInTime;
725        }
726    
727        public void setBrokerInTime(long brokerInTime) {
728            this.brokerInTime = brokerInTime;
729        }
730    
731        /**
732         * @openwire:property version=3
733         */
734        public long getBrokerOutTime() {
735            return this.brokerOutTime;
736        }
737    
738        public void setBrokerOutTime(long brokerOutTime) {
739            this.brokerOutTime = brokerOutTime;
740        }
741    
742        public boolean isDropped() {
743            return false;
744        }
745    
746        @Override
747        public String toString() {
748            return toString(null);
749        }
750    
751        @Override
752        public String toString(Map<String, Object>overrideFields) {
753            try {
754                getProperties();
755            } catch (IOException e) {
756            }
757            return super.toString(overrideFields);
758        }
759    }