001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.FilterOutputStream;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.OutputStream;
026    import java.util.zip.Deflater;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.BytesMessage;
031    import javax.jms.JMSException;
032    import javax.jms.MessageFormatException;
033    import javax.jms.MessageNotReadableException;
034    
035    import org.apache.activemq.ActiveMQConnection;
036    import org.apache.activemq.util.ByteArrayInputStream;
037    import org.apache.activemq.util.ByteArrayOutputStream;
038    import org.apache.activemq.util.ByteSequence;
039    import org.apache.activemq.util.ByteSequenceData;
040    import org.apache.activemq.util.JMSExceptionSupport;
041    
042    /**
043     * A <CODE>BytesMessage</CODE> object is used to send a message containing a
044     * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
045     * interface and adds a bytes message body. The receiver of the message supplies
046     * the interpretation of the bytes.
047     * <P>
048     * The <CODE>BytesMessage</CODE> methods are based largely on those found in
049     * <CODE>java.io.DataInputStream</CODE> and
050     * <CODE>java.io.DataOutputStream</CODE>.
051     * <P>
052     * This message type is for client encoding of existing message formats. If
053     * possible, one of the other self-defining message types should be used
054     * instead.
055     * <P>
056     * Although the JMS API allows the use of message properties with byte messages,
057     * they are typically not used, since the inclusion of properties may affect the
058     * format.
059     * <P>
060     * The primitive types can be written explicitly using methods for each type.
061     * They may also be written generically as objects. For instance, a call to
062     * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
063     * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
064     * provided, because the explicit form is convenient for static programming, and
065     * the object form is needed when types are not known at compile time.
066     * <P>
067     * When the message is first created, and when <CODE>clearBody</CODE> is
068     * called, the body of the message is in write-only mode. After the first call
069     * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
070     * After a message has been sent, the client that sent it can retain and modify
071     * it without affecting the message that has been sent. The same message object
072     * can be sent multiple times. When a message has been received, the provider
073     * has called <CODE>reset</CODE> so that the message body is in read-only mode
074     * for the client.
075     * <P>
076     * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
077     * message body is cleared and the message is in write-only mode.
078     * <P>
079     * If a client attempts to read a message in write-only mode, a
080     * <CODE>MessageNotReadableException</CODE> is thrown.
081     * <P>
082     * If a client attempts to write a message in read-only mode, a
083     * <CODE>MessageNotWriteableException</CODE> is thrown.
084     *
085     * @openwire:marshaller code=24
086     * @see javax.jms.Session#createBytesMessage()
087     * @see javax.jms.MapMessage
088     * @see javax.jms.Message
089     * @see javax.jms.ObjectMessage
090     * @see javax.jms.StreamMessage
091     * @see javax.jms.TextMessage
092     */
093    public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
094    
    /** Type code returned by <CODE>getDataStructureType()</CODE> for this command. */
    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;

    /**
     * Stream used by the <CODE>write*</CODE> methods; closed and reset to
     * <CODE>null</CODE> once the written body has been stored as content.
     */
    protected transient DataOutputStream dataOut;
    /**
     * Buffer whose byte sequence is installed as the message content when the
     * written body is stored.
     */
    protected transient ByteArrayOutputStream bytesOut;
    /** Stream used by the <CODE>read*</CODE> methods. */
    protected transient DataInputStream dataIn;
    /**
     * Value reported by <CODE>getBodyLength()</CODE>; for compressed messages
     * it is also passed to <CODE>ByteSequenceData.writeIntBig</CODE> when the
     * body is stored.
     */
    protected transient int length;
101    
102        public Message copy() {
103            ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
104            copy(copy);
105            return copy;
106        }
107    
108        private void copy(ActiveMQBytesMessage copy) {
109            storeContent();
110            super.copy(copy);
111            copy.dataOut = null;
112            copy.bytesOut = null;
113            copy.dataIn = null;
114        }
115    
116        public void onSend() throws JMSException {
117            super.onSend();
118            storeContent();
119        }
120    
121        private void storeContent() {
122            try {
123                if (dataOut != null) {
124                    dataOut.close();
125                    ByteSequence bs = bytesOut.toByteSequence();
126                    if (compressed) {
127                        int pos = bs.offset;
128                        ByteSequenceData.writeIntBig(bs, length);
129                        bs.offset = pos;
130                    }
131                    setContent(bs);
132                    bytesOut = null;
133                    dataOut = null;
134                }
135            } catch (IOException ioe) {
136                throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
137                                                                    // RuntimeException
138            }
139        }
140    
141        public byte getDataStructureType() {
142            return DATA_STRUCTURE_TYPE;
143        }
144    
145        public String getJMSXMimeType() {
146            return "jms/bytes-message";
147        }
148    
149        /**
150         * Clears out the message body. Clearing a message's body does not clear its
151         * header values or property entries.
152         * <P>
153         * If this message body was read-only, calling this method leaves the
154         * message body in the same state as an empty body in a newly created
155         * message.
156         *
157         * @throws JMSException if the JMS provider fails to clear the message body
158         *                 due to some internal error.
159         */
160        public void clearBody() throws JMSException {
161            super.clearBody();
162            this.dataOut = null;
163            this.dataIn = null;
164            this.bytesOut = null;
165        }
166    
167        /**
168         * Gets the number of bytes of the message body when the message is in
169         * read-only mode. The value returned can be used to allocate a byte array.
170         * The value returned is the entire length of the message body, regardless
171         * of where the pointer for reading the message is currently located.
172         *
173         * @return number of bytes in the message
174         * @throws JMSException if the JMS provider fails to read the message due to
175         *                 some internal error.
176         * @throws MessageNotReadableException if the message is in write-only mode.
177         * @since 1.1
178         */
179    
180        public long getBodyLength() throws JMSException {
181            initializeReading();
182            return length;
183        }
184    
185        /**
186         * Reads a <code>boolean</code> from the bytes message stream.
187         *
188         * @return the <code>boolean</code> value read
189         * @throws JMSException if the JMS provider fails to read the message due to
190         *                 some internal error.
191         * @throws MessageEOFException if unexpected end of bytes stream has been
192         *                 reached.
193         * @throws MessageNotReadableException if the message is in write-only mode.
194         */
195        public boolean readBoolean() throws JMSException {
196            initializeReading();
197            try {
198                return this.dataIn.readBoolean();
199            } catch (EOFException e) {
200                throw JMSExceptionSupport.createMessageEOFException(e);
201            } catch (IOException e) {
202                throw JMSExceptionSupport.createMessageFormatException(e);
203            }
204        }
205    
206        /**
207         * Reads a signed 8-bit value from the bytes message stream.
208         *
209         * @return the next byte from the bytes message stream as a signed 8-bit
210         *         <code>byte</code>
211         * @throws JMSException if the JMS provider fails to read the message due to
212         *                 some internal error.
213         * @throws MessageEOFException if unexpected end of bytes stream has been
214         *                 reached.
215         * @throws MessageNotReadableException if the message is in write-only mode.
216         */
217        public byte readByte() throws JMSException {
218            initializeReading();
219            try {
220                return this.dataIn.readByte();
221            } catch (EOFException e) {
222                throw JMSExceptionSupport.createMessageEOFException(e);
223            } catch (IOException e) {
224                throw JMSExceptionSupport.createMessageFormatException(e);
225            }
226        }
227    
228        /**
229         * Reads an unsigned 8-bit number from the bytes message stream.
230         *
231         * @return the next byte from the bytes message stream, interpreted as an
232         *         unsigned 8-bit number
233         * @throws JMSException if the JMS provider fails to read the message due to
234         *                 some internal error.
235         * @throws MessageEOFException if unexpected end of bytes stream has been
236         *                 reached.
237         * @throws MessageNotReadableException if the message is in write-only mode.
238         */
239        public int readUnsignedByte() throws JMSException {
240            initializeReading();
241            try {
242                return this.dataIn.readUnsignedByte();
243            } catch (EOFException e) {
244                throw JMSExceptionSupport.createMessageEOFException(e);
245            } catch (IOException e) {
246                throw JMSExceptionSupport.createMessageFormatException(e);
247            }
248        }
249    
250        /**
251         * Reads a signed 16-bit number from the bytes message stream.
252         *
253         * @return the next two bytes from the bytes message stream, interpreted as
254         *         a signed 16-bit number
255         * @throws JMSException if the JMS provider fails to read the message due to
256         *                 some internal error.
257         * @throws MessageEOFException if unexpected end of bytes stream has been
258         *                 reached.
259         * @throws MessageNotReadableException if the message is in write-only mode.
260         */
261        public short readShort() throws JMSException {
262            initializeReading();
263            try {
264                return this.dataIn.readShort();
265            } catch (EOFException e) {
266                throw JMSExceptionSupport.createMessageEOFException(e);
267            } catch (IOException e) {
268                throw JMSExceptionSupport.createMessageFormatException(e);
269            }
270        }
271    
272        /**
273         * Reads an unsigned 16-bit number from the bytes message stream.
274         *
275         * @return the next two bytes from the bytes message stream, interpreted as
276         *         an unsigned 16-bit integer
277         * @throws JMSException if the JMS provider fails to read the message due to
278         *                 some internal error.
279         * @throws MessageEOFException if unexpected end of bytes stream has been
280         *                 reached.
281         * @throws MessageNotReadableException if the message is in write-only mode.
282         */
283        public int readUnsignedShort() throws JMSException {
284            initializeReading();
285            try {
286                return this.dataIn.readUnsignedShort();
287            } catch (EOFException e) {
288                throw JMSExceptionSupport.createMessageEOFException(e);
289            } catch (IOException e) {
290                throw JMSExceptionSupport.createMessageFormatException(e);
291            }
292        }
293    
294        /**
295         * Reads a Unicode character value from the bytes message stream.
296         *
297         * @return the next two bytes from the bytes message stream as a Unicode
298         *         character
299         * @throws JMSException if the JMS provider fails to read the message due to
300         *                 some internal error.
301         * @throws MessageEOFException if unexpected end of bytes stream has been
302         *                 reached.
303         * @throws MessageNotReadableException if the message is in write-only mode.
304         */
305        public char readChar() throws JMSException {
306            initializeReading();
307            try {
308                return this.dataIn.readChar();
309            } catch (EOFException e) {
310                throw JMSExceptionSupport.createMessageEOFException(e);
311            } catch (IOException e) {
312                throw JMSExceptionSupport.createMessageFormatException(e);
313            }
314        }
315    
316        /**
317         * Reads a signed 32-bit integer from the bytes message stream.
318         *
319         * @return the next four bytes from the bytes message stream, interpreted as
320         *         an <code>int</code>
321         * @throws JMSException if the JMS provider fails to read the message due to
322         *                 some internal error.
323         * @throws MessageEOFException if unexpected end of bytes stream has been
324         *                 reached.
325         * @throws MessageNotReadableException if the message is in write-only mode.
326         */
327        public int readInt() throws JMSException {
328            initializeReading();
329            try {
330                return this.dataIn.readInt();
331            } catch (EOFException e) {
332                throw JMSExceptionSupport.createMessageEOFException(e);
333            } catch (IOException e) {
334                throw JMSExceptionSupport.createMessageFormatException(e);
335            }
336        }
337    
338        /**
339         * Reads a signed 64-bit integer from the bytes message stream.
340         *
341         * @return the next eight bytes from the bytes message stream, interpreted
342         *         as a <code>long</code>
343         * @throws JMSException if the JMS provider fails to read the message due to
344         *                 some internal error.
345         * @throws MessageEOFException if unexpected end of bytes stream has been
346         *                 reached.
347         * @throws MessageNotReadableException if the message is in write-only mode.
348         */
349        public long readLong() throws JMSException {
350            initializeReading();
351            try {
352                return this.dataIn.readLong();
353            } catch (EOFException e) {
354                throw JMSExceptionSupport.createMessageEOFException(e);
355            } catch (IOException e) {
356                throw JMSExceptionSupport.createMessageFormatException(e);
357            }
358        }
359    
360        /**
361         * Reads a <code>float</code> from the bytes message stream.
362         *
363         * @return the next four bytes from the bytes message stream, interpreted as
364         *         a <code>float</code>
365         * @throws JMSException if the JMS provider fails to read the message due to
366         *                 some internal error.
367         * @throws MessageEOFException if unexpected end of bytes stream has been
368         *                 reached.
369         * @throws MessageNotReadableException if the message is in write-only mode.
370         */
371        public float readFloat() throws JMSException {
372            initializeReading();
373            try {
374                return this.dataIn.readFloat();
375            } catch (EOFException e) {
376                throw JMSExceptionSupport.createMessageEOFException(e);
377            } catch (IOException e) {
378                throw JMSExceptionSupport.createMessageFormatException(e);
379            }
380        }
381    
382        /**
383         * Reads a <code>double</code> from the bytes message stream.
384         *
385         * @return the next eight bytes from the bytes message stream, interpreted
386         *         as a <code>double</code>
387         * @throws JMSException if the JMS provider fails to read the message due to
388         *                 some internal error.
389         * @throws MessageEOFException if unexpected end of bytes stream has been
390         *                 reached.
391         * @throws MessageNotReadableException if the message is in write-only mode.
392         */
393        public double readDouble() throws JMSException {
394            initializeReading();
395            try {
396                return this.dataIn.readDouble();
397            } catch (EOFException e) {
398                throw JMSExceptionSupport.createMessageEOFException(e);
399            } catch (IOException e) {
400                throw JMSExceptionSupport.createMessageFormatException(e);
401            }
402        }
403    
404        /**
405         * Reads a string that has been encoded using a modified UTF-8 format from
406         * the bytes message stream.
407         * <P>
408         * For more information on the UTF-8 format, see "File System Safe UCS
409         * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
410         * X/Open Company Ltd., Document Number: P316. This information also appears
411         * in ISO/IEC 10646, Annex P.
412         *
413         * @return a Unicode string from the bytes message stream
414         * @throws JMSException if the JMS provider fails to read the message due to
415         *                 some internal error.
416         * @throws MessageEOFException if unexpected end of bytes stream has been
417         *                 reached.
418         * @throws MessageNotReadableException if the message is in write-only mode.
419         */
420        public String readUTF() throws JMSException {
421            initializeReading();
422            try {
423                return this.dataIn.readUTF();
424            } catch (EOFException e) {
425                throw JMSExceptionSupport.createMessageEOFException(e);
426            } catch (IOException e) {
427                throw JMSExceptionSupport.createMessageFormatException(e);
428            }
429        }
430    
431        /**
432         * Reads a byte array from the bytes message stream.
433         * <P>
434         * If the length of array <code>value</code> is less than the number of
435         * bytes remaining to be read from the stream, the array should be filled. A
436         * subsequent call reads the next increment, and so on.
437         * <P>
438         * If the number of bytes remaining in the stream is less than the length of
439         * array <code>value</code>, the bytes should be read into the array. The
440         * return value of the total number of bytes read will be less than the
441         * length of the array, indicating that there are no more bytes left to be
442         * read from the stream. The next read of the stream returns -1.
443         *
444         * @param value the buffer into which the data is read
445         * @return the total number of bytes read into the buffer, or -1 if there is
446         *         no more data because the end of the stream has been reached
447         * @throws JMSException if the JMS provider fails to read the message due to
448         *                 some internal error.
449         * @throws MessageNotReadableException if the message is in write-only mode.
450         */
451        public int readBytes(byte[] value) throws JMSException {
452            return readBytes(value, value.length);
453        }
454    
455        /**
456         * Reads a portion of the bytes message stream.
457         * <P>
458         * If the length of array <code>value</code> is less than the number of
459         * bytes remaining to be read from the stream, the array should be filled. A
460         * subsequent call reads the next increment, and so on.
461         * <P>
462         * If the number of bytes remaining in the stream is less than the length of
463         * array <code>value</code>, the bytes should be read into the array. The
464         * return value of the total number of bytes read will be less than the
465         * length of the array, indicating that there are no more bytes left to be
     * read from the stream. The next read of the stream returns -1.
     * <P>
     * If <code>length</code> is negative, or <code>length</code> is greater
     * than the length of the array <code>value</code>, then an
     * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
     * from the stream for this exception case.
471         *
472         * @param value the buffer into which the data is read
473         * @param length the number of bytes to read; must be less than or equal to
474         *                <code>value.length</code>
475         * @return the total number of bytes read into the buffer, or -1 if there is
476         *         no more data because the end of the stream has been reached
477         * @throws JMSException if the JMS provider fails to read the message due to
478         *                 some internal error.
479         * @throws MessageNotReadableException if the message is in write-only mode.
480         */
481        public int readBytes(byte[] value, int length) throws JMSException {
482            initializeReading();
483            try {
484                int n = 0;
485                while (n < length) {
486                    int count = this.dataIn.read(value, n, length - n);
487                    if (count < 0) {
488                        break;
489                    }
490                    n += count;
491                }
492                if (n == 0 && length > 0) {
493                    n = -1;
494                }
495                return n;
496            } catch (EOFException e) {
497                throw JMSExceptionSupport.createMessageEOFException(e);
498            } catch (IOException e) {
499                throw JMSExceptionSupport.createMessageFormatException(e);
500            }
501        }
502    
503        /**
504         * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
505         * value. The value <code>true</code> is written as the value
506         * <code>(byte)1</code>; the value <code>false</code> is written as the
507         * value <code>(byte)0</code>.
508         *
509         * @param value the <code>boolean</code> value to be written
510         * @throws JMSException if the JMS provider fails to write the message due
511         *                 to some internal error.
512         * @throws MessageNotWriteableException if the message is in read-only mode.
513         */
514        public void writeBoolean(boolean value) throws JMSException {
515            initializeWriting();
516            try {
517                this.dataOut.writeBoolean(value);
518            } catch (IOException ioe) {
519                throw JMSExceptionSupport.create(ioe);
520            }
521        }
522    
523        /**
524         * Writes a <code>byte</code> to the bytes message stream as a 1-byte
525         * value.
526         *
527         * @param value the <code>byte</code> value to be written
528         * @throws JMSException if the JMS provider fails to write the message due
529         *                 to some internal error.
530         * @throws MessageNotWriteableException if the message is in read-only mode.
531         */
532        public void writeByte(byte value) throws JMSException {
533            initializeWriting();
534            try {
535                this.dataOut.writeByte(value);
536            } catch (IOException ioe) {
537                throw JMSExceptionSupport.create(ioe);
538            }
539        }
540    
541        /**
542         * Writes a <code>short</code> to the bytes message stream as two bytes,
543         * high byte first.
544         *
545         * @param value the <code>short</code> to be written
546         * @throws JMSException if the JMS provider fails to write the message due
547         *                 to some internal error.
548         * @throws MessageNotWriteableException if the message is in read-only mode.
549         */
550        public void writeShort(short value) throws JMSException {
551            initializeWriting();
552            try {
553                this.dataOut.writeShort(value);
554            } catch (IOException ioe) {
555                throw JMSExceptionSupport.create(ioe);
556            }
557        }
558    
559        /**
560         * Writes a <code>char</code> to the bytes message stream as a 2-byte
561         * value, high byte first.
562         *
563         * @param value the <code>char</code> value to be written
564         * @throws JMSException if the JMS provider fails to write the message due
565         *                 to some internal error.
566         * @throws MessageNotWriteableException if the message is in read-only mode.
567         */
568        public void writeChar(char value) throws JMSException {
569            initializeWriting();
570            try {
571                this.dataOut.writeChar(value);
572            } catch (IOException ioe) {
573                throw JMSExceptionSupport.create(ioe);
574            }
575        }
576    
577        /**
578         * Writes an <code>int</code> to the bytes message stream as four bytes,
579         * high byte first.
580         *
581         * @param value the <code>int</code> to be written
582         * @throws JMSException if the JMS provider fails to write the message due
583         *                 to some internal error.
584         * @throws MessageNotWriteableException if the message is in read-only mode.
585         */
586        public void writeInt(int value) throws JMSException {
587            initializeWriting();
588            try {
589                this.dataOut.writeInt(value);
590            } catch (IOException ioe) {
591                throw JMSExceptionSupport.create(ioe);
592            }
593        }
594    
595        /**
596         * Writes a <code>long</code> to the bytes message stream as eight bytes,
597         * high byte first.
598         *
599         * @param value the <code>long</code> to be written
600         * @throws JMSException if the JMS provider fails to write the message due
601         *                 to some internal error.
602         * @throws MessageNotWriteableException if the message is in read-only mode.
603         */
604        public void writeLong(long value) throws JMSException {
605            initializeWriting();
606            try {
607                this.dataOut.writeLong(value);
608            } catch (IOException ioe) {
609                throw JMSExceptionSupport.create(ioe);
610            }
611        }
612    
613        /**
614         * Converts the <code>float</code> argument to an <code>int</code> using
615         * the <code>floatToIntBits</code> method in class <code>Float</code>,
616         * and then writes that <code>int</code> value to the bytes message stream
617         * as a 4-byte quantity, high byte first.
618         *
619         * @param value the <code>float</code> value to be written
620         * @throws JMSException if the JMS provider fails to write the message due
621         *                 to some internal error.
622         * @throws MessageNotWriteableException if the message is in read-only mode.
623         */
624        public void writeFloat(float value) throws JMSException {
625            initializeWriting();
626            try {
627                this.dataOut.writeFloat(value);
628            } catch (IOException ioe) {
629                throw JMSExceptionSupport.create(ioe);
630            }
631        }
632    
633        /**
634         * Converts the <code>double</code> argument to a <code>long</code>
635         * using the <code>doubleToLongBits</code> method in class
636         * <code>Double</code>, and then writes that <code>long</code> value to
637         * the bytes message stream as an 8-byte quantity, high byte first.
638         *
639         * @param value the <code>double</code> value to be written
640         * @throws JMSException if the JMS provider fails to write the message due
641         *                 to some internal error.
642         * @throws MessageNotWriteableException if the message is in read-only mode.
643         */
644        public void writeDouble(double value) throws JMSException {
645            initializeWriting();
646            try {
647                this.dataOut.writeDouble(value);
648            } catch (IOException ioe) {
649                throw JMSExceptionSupport.create(ioe);
650            }
651        }
652    
653        /**
654         * Writes a string to the bytes message stream using UTF-8 encoding in a
655         * machine-independent manner.
656         * <P>
657         * For more information on the UTF-8 format, see "File System Safe UCS
658         * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
659         * X/Open Company Ltd., Document Number: P316. This information also appears
660         * in ISO/IEC 10646, Annex P.
661         *
662         * @param value the <code>String</code> value to be written
663         * @throws JMSException if the JMS provider fails to write the message due
664         *                 to some internal error.
665         * @throws MessageNotWriteableException if the message is in read-only mode.
666         */
667        public void writeUTF(String value) throws JMSException {
668            initializeWriting();
669            try {
670                this.dataOut.writeUTF(value);
671            } catch (IOException ioe) {
672                throw JMSExceptionSupport.create(ioe);
673            }
674        }
675    
676        /**
677         * Writes a byte array to the bytes message stream.
678         *
679         * @param value the byte array to be written
680         * @throws JMSException if the JMS provider fails to write the message due
681         *                 to some internal error.
682         * @throws MessageNotWriteableException if the message is in read-only mode.
683         */
684        public void writeBytes(byte[] value) throws JMSException {
685            initializeWriting();
686            try {
687                this.dataOut.write(value);
688            } catch (IOException ioe) {
689                throw JMSExceptionSupport.create(ioe);
690            }
691        }
692    
693        /**
694         * Writes a portion of a byte array to the bytes message stream.
695         *
696         * @param value the byte array value to be written
697         * @param offset the initial offset within the byte array
698         * @param length the number of bytes to use
699         * @throws JMSException if the JMS provider fails to write the message due
700         *                 to some internal error.
701         * @throws MessageNotWriteableException if the message is in read-only mode.
702         */
703        public void writeBytes(byte[] value, int offset, int length) throws JMSException {
704            initializeWriting();
705            try {
706                this.dataOut.write(value, offset, length);
707            } catch (IOException ioe) {
708                throw JMSExceptionSupport.create(ioe);
709            }
710        }
711    
712        /**
713         * Writes an object to the bytes message stream.
714         * <P>
715         * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
716         * <code>Long</code> &nbsp;...), <code>String</code> objects, and byte
717         * arrays.
718         *
719         * @param value the object in the Java programming language ("Java object")
720         *                to be written; it must not be null
721         * @throws JMSException if the JMS provider fails to write the message due
722         *                 to some internal error.
723         * @throws MessageFormatException if the object is of an invalid type.
724         * @throws MessageNotWriteableException if the message is in read-only mode.
725         * @throws java.lang.NullPointerException if the parameter
726         *                 <code>value</code> is null.
727         */
728        public void writeObject(Object value) throws JMSException {
729            if (value == null) {
730                throw new NullPointerException();
731            }
732            initializeWriting();
733            if (value instanceof Boolean) {
734                writeBoolean(((Boolean)value).booleanValue());
735            } else if (value instanceof Character) {
736                writeChar(((Character)value).charValue());
737            } else if (value instanceof Byte) {
738                writeByte(((Byte)value).byteValue());
739            } else if (value instanceof Short) {
740                writeShort(((Short)value).shortValue());
741            } else if (value instanceof Integer) {
742                writeInt(((Integer)value).intValue());
743            } else if (value instanceof Long) {
744                writeLong(((Long)value).longValue());
745            } else if (value instanceof Float) {
746                writeFloat(((Float)value).floatValue());
747            } else if (value instanceof Double) {
748                writeDouble(((Double)value).doubleValue());
749            } else if (value instanceof String) {
750                writeUTF(value.toString());
751            } else if (value instanceof byte[]) {
752                writeBytes((byte[])value);
753            } else {
754                throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
755            }
756        }
757    
758        /**
759         * Puts the message body in read-only mode and repositions the stream of
760         * bytes to the beginning.
761         *
762         * @throws JMSException if an internal error occurs
763         */
764        public void reset() throws JMSException {
765            storeContent();
766            this.bytesOut = null;
767            this.dataIn = null;
768            this.dataOut = null;
769            setReadOnlyBody(true);
770        }
771    
772        private void initializeWriting() throws JMSException {
773            checkReadOnlyBody();
774            if (this.dataOut == null) {
775                this.bytesOut = new ByteArrayOutputStream();
776                OutputStream os = bytesOut;
777                ActiveMQConnection connection = getConnection();
778                if (connection != null && connection.isUseCompression()) {
779                    // keep track of the real length of the content if
780                    // we are compressed.
781                    try {
782                        os.write(new byte[4]);
783                    } catch (IOException e) {
784                        throw JMSExceptionSupport.create(e);
785                    }
786                    length = 0;
787                    compressed = true;
788                    final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
789                    os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
790                        public void write(byte[] arg0) throws IOException {
791                            length += arg0.length;
792                            out.write(arg0);
793                        }
794    
795                        public void write(byte[] arg0, int arg1, int arg2) throws IOException {
796                            length += arg2;
797                            out.write(arg0, arg1, arg2);
798                        }
799    
800                        public void write(int arg0) throws IOException {
801                            length++;
802                            out.write(arg0);
803                        }
804    
805                        @Override
806                        public void close() throws IOException {
807                            super.close();
808                            deflater.end();
809                        }
810                    };
811                }
812                this.dataOut = new DataOutputStream(os);
813            }
814        }
815    
816        protected void checkWriteOnlyBody() throws MessageNotReadableException {
817            if (!readOnlyBody) {
818                throw new MessageNotReadableException("Message body is write-only");
819            }
820        }
821    
822        private void initializeReading() throws JMSException {
823            checkWriteOnlyBody();
824            if (dataIn == null) {
825                ByteSequence data = getContent();
826                if (data == null) {
827                    data = new ByteSequence(new byte[] {}, 0, 0);
828                }
829                InputStream is = new ByteArrayInputStream(data);
830                if (isCompressed()) {
831                    // keep track of the real length of the content if
832                    // we are compressed.
833                    try {
834                        DataInputStream dis = new DataInputStream(is);
835                        length = dis.readInt();
836                        dis.close();
837                    } catch (IOException e) {
838                        throw JMSExceptionSupport.create(e);
839                    }
840                    is = new InflaterInputStream(is);
841                } else {
842                    length = data.getLength();
843                }
844                dataIn = new DataInputStream(is);
845            }
846        }
847    
    /**
     * Sets a message property after preparing the body for writing.
     * <P>
     * <code>initializeWriting()</code> runs first, so the body read-only check
     * is applied and the body output stream is created if it does not exist
     * yet; the property itself is then stored by the superclass.
     * <P>
     * NOTE(review): this ties property updates to the writability of the
     * body - confirm that this is intended.
     *
     * @param name the name of the property
     * @param value the property value
     * @throws JMSException if the body cannot be prepared for writing or the
     *                 superclass rejects the property.
     */
    public void setObjectProperty(String name, Object value) throws JMSException {
        initializeWriting();
        super.setObjectProperty(name, value);
    }
852    
853        public String toString() {
854            return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
855        }
856    }