001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.openwire;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.lang.reflect.Method;
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import org.apache.activemq.command.CommandTypes;
027    import org.apache.activemq.command.DataStructure;
028    import org.apache.activemq.command.WireFormatInfo;
029    import org.apache.activemq.util.ByteSequence;
030    import org.apache.activemq.util.ByteSequenceData;
031    import org.apache.activemq.util.DataByteArrayInputStream;
032    import org.apache.activemq.util.DataByteArrayOutputStream;
033    import org.apache.activemq.wireformat.WireFormat;
034    
035    /**
036     * 
037     * 
038     */
039    public final class OpenWireFormat implements WireFormat {
040    
041        public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
042    
043        static final byte NULL_TYPE = CommandTypes.NULL;
044        private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
045        private static final int MARSHAL_CACHE_FREE_SPACE = 100;
046    
047        private DataStreamMarshaller dataMarshallers[];
048        private int version;
049        private boolean stackTraceEnabled;
050        private boolean tcpNoDelayEnabled;
051        private boolean cacheEnabled;
052        private boolean tightEncodingEnabled;
053        private boolean sizePrefixDisabled;
054    
055        // The following fields are used for value caching
056        private short nextMarshallCacheIndex;
057        private short nextMarshallCacheEvictionIndex;
058        private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
059        private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
060        private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
061        private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
062        private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
063        private WireFormatInfo preferedWireFormatInfo;
064        
065        public OpenWireFormat() {
066            this(DEFAULT_VERSION);
067        }
068    
069        public OpenWireFormat(int i) {
070            setVersion(i);
071        }
072    
073        public int hashCode() {
074            return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
075                   ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
076                   ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
077                   ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
078        }
079    
080        public OpenWireFormat copy() {
081            OpenWireFormat answer = new OpenWireFormat();
082            answer.version = version;
083            answer.stackTraceEnabled = stackTraceEnabled;
084            answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
085            answer.cacheEnabled = cacheEnabled;
086            answer.tightEncodingEnabled = tightEncodingEnabled;
087            answer.sizePrefixDisabled = sizePrefixDisabled;
088            answer.preferedWireFormatInfo = preferedWireFormatInfo;
089            return answer;
090        }
091    
092        public boolean equals(Object object) {
093            if (object == null) {
094                return false;
095            }
096            OpenWireFormat o = (OpenWireFormat)object;
097            return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
098                   && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
099                   && o.sizePrefixDisabled == sizePrefixDisabled;
100        }
101    
102    
103        public String toString() {
104            return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
105                   + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
106            // return "OpenWireFormat{id="+id+",
107            // tightEncodingEnabled="+tightEncodingEnabled+"}";
108        }
109    
110        public int getVersion() {
111            return version;
112        }
113    
114        public synchronized ByteSequence marshal(Object command) throws IOException {
115    
116            if (cacheEnabled) {
117                runMarshallCacheEvictionSweep();
118            }
119    
120    //        MarshallAware ma = null;
121    //        // If not using value caching, then the marshaled form is always the
122    //        // same
123    //        if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
124    //            ma = (MarshallAware)command;
125    //        }
126    
127            ByteSequence sequence = null;
128            // if( ma!=null ) {
129            // sequence = ma.getCachedMarshalledForm(this);
130            // }
131    
132            if (sequence == null) {
133    
134                int size = 1;
135                if (command != null) {
136    
137                    DataStructure c = (DataStructure)command;
138                    byte type = c.getDataStructureType();
139                    DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
140                    if (dsm == null) {
141                        throw new IOException("Unknown data type: " + type);
142                    }
143                    if (tightEncodingEnabled) {
144    
145                        BooleanStream bs = new BooleanStream();
146                        size += dsm.tightMarshal1(this, c, bs);
147                        size += bs.marshalledSize();
148    
149                        bytesOut.restart(size);
150                        if (!sizePrefixDisabled) {
151                            bytesOut.writeInt(size);
152                        }
153                        bytesOut.writeByte(type);
154                        bs.marshal(bytesOut);
155                        dsm.tightMarshal2(this, c, bytesOut, bs);
156                        sequence = bytesOut.toByteSequence();
157    
158                    } else {
159                        bytesOut.restart();
160                        if (!sizePrefixDisabled) {
161                            bytesOut.writeInt(0); // we don't know the final size
162                                                    // yet but write this here for
163                                                    // now.
164                        }
165                        bytesOut.writeByte(type);
166                        dsm.looseMarshal(this, c, bytesOut);
167                        sequence = bytesOut.toByteSequence();
168    
169                        if (!sizePrefixDisabled) {
170                            size = sequence.getLength() - 4;
171                            int pos = sequence.offset;
172                            ByteSequenceData.writeIntBig(sequence, size);
173                            sequence.offset = pos;
174                        }
175                    }
176    
177                } else {
178                    bytesOut.restart(5);
179                    bytesOut.writeInt(size);
180                    bytesOut.writeByte(NULL_TYPE);
181                    sequence = bytesOut.toByteSequence();
182                }
183    
184                // if( ma!=null ) {
185                // ma.setCachedMarshalledForm(this, sequence);
186                // }
187            }
188            return sequence;
189        }
190    
191        public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
192            bytesIn.restart(sequence);
193            // DataInputStream dis = new DataInputStream(new
194            // ByteArrayInputStream(sequence));
195    
196            if (!sizePrefixDisabled) {
197                int size = bytesIn.readInt();
198                if (sequence.getLength() - 4 != size) {
199                    // throw new IOException("Packet size does not match marshaled
200                    // size");
201                }
202            }
203    
204            Object command = doUnmarshal(bytesIn);
205            // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
206            // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
207            // }
208            return command;
209        }
210    
211        public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
212    
213            if (cacheEnabled) {
214                runMarshallCacheEvictionSweep();
215            }
216    
217            int size = 1;
218            if (o != null) {
219    
220                DataStructure c = (DataStructure)o;
221                byte type = c.getDataStructureType();
222                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
223                if (dsm == null) {
224                    throw new IOException("Unknown data type: " + type);
225                }
226                if (tightEncodingEnabled) {
227                    BooleanStream bs = new BooleanStream();
228                    size += dsm.tightMarshal1(this, c, bs);
229                    size += bs.marshalledSize();
230    
231                    if (!sizePrefixDisabled) {
232                        dataOut.writeInt(size);
233                    }
234    
235                    dataOut.writeByte(type);
236                    bs.marshal(dataOut);
237                    dsm.tightMarshal2(this, c, dataOut, bs);
238    
239                } else {
240                    DataOutput looseOut = dataOut;
241    
242                    if (!sizePrefixDisabled) {
243                        bytesOut.restart();
244                        looseOut = bytesOut;
245                    }
246    
247                    looseOut.writeByte(type);
248                    dsm.looseMarshal(this, c, looseOut);
249    
250                    if (!sizePrefixDisabled) {
251                        ByteSequence sequence = bytesOut.toByteSequence();
252                        dataOut.writeInt(sequence.getLength());
253                        dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
254                    }
255    
256                }
257    
258            } else {
259                if (!sizePrefixDisabled) {
260                    dataOut.writeInt(size);
261                }
262                dataOut.writeByte(NULL_TYPE);
263            }
264        }
265    
266        public Object unmarshal(DataInput dis) throws IOException {
267            DataInput dataIn = dis;
268            if (!sizePrefixDisabled) {
269                dis.readInt();
270                // int size = dis.readInt();
271                // byte[] data = new byte[size];
272                // dis.readFully(data);
273                // bytesIn.restart(data);
274                // dataIn = bytesIn;
275            }
276            return doUnmarshal(dataIn);
277        }
278    
279        /**
280         * Used by NIO or AIO transports
281         */
282        public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
283            int size = 1;
284            if (o != null) {
285                DataStructure c = (DataStructure)o;
286                byte type = c.getDataStructureType();
287                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
288                if (dsm == null) {
289                    throw new IOException("Unknown data type: " + type);
290                }
291    
292                size += dsm.tightMarshal1(this, c, bs);
293                size += bs.marshalledSize();
294            }
295            return size;
296        }
297    
298        /**
299         * Used by NIO or AIO transports; note that the size is not written as part
300         * of this method.
301         */
302        public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
303            if (cacheEnabled) {
304                runMarshallCacheEvictionSweep();
305            }
306    
307            if (o != null) {
308                DataStructure c = (DataStructure)o;
309                byte type = c.getDataStructureType();
310                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
311                if (dsm == null) {
312                    throw new IOException("Unknown data type: " + type);
313                }
314                ds.writeByte(type);
315                bs.marshal(ds);
316                dsm.tightMarshal2(this, c, ds, bs);
317            }
318        }
319    
320        /**
321         * Allows you to dynamically switch the version of the openwire protocol
322         * being used.
323         * 
324         * @param version
325         */
326        public void setVersion(int version) {
327            String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
328            Class mfClass;
329            try {
330                mfClass = Class.forName(mfName, false, getClass().getClassLoader());
331            } catch (ClassNotFoundException e) {
332                throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
333                                                                             + ", could not load " + mfName)
334                    .initCause(e);
335            }
336            try {
337                Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
338                dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
339            } catch (Throwable e) {
340                throw (IllegalArgumentException)new IllegalArgumentException(
341                                                                             "Invalid version: "
342                                                                                 + version
343                                                                                 + ", "
344                                                                                 + mfName
345                                                                                 + " does not properly implement the createMarshallerMap method.")
346                    .initCause(e);
347            }
348            this.version = version;
349        }
350    
351        public Object doUnmarshal(DataInput dis) throws IOException {
352            byte dataType = dis.readByte();
353            if (dataType != NULL_TYPE) {
354                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
355                if (dsm == null) {
356                    throw new IOException("Unknown data type: " + dataType);
357                }
358                Object data = dsm.createObject();
359                if (this.tightEncodingEnabled) {
360                    BooleanStream bs = new BooleanStream();
361                    bs.unmarshal(dis);
362                    dsm.tightUnmarshal(this, data, dis, bs);
363                } else {
364                    dsm.looseUnmarshal(this, data, dis);
365                }
366                return data;
367            } else {
368                return null;
369            }
370        }
371    
372        // public void debug(String msg) {
373        // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
374        // System.out.println(t+": "+msg);
375        // }
376        public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
377            bs.writeBoolean(o != null);
378            if (o == null) {
379                return 0;
380            }
381    
382            if (o.isMarshallAware()) {
383                // MarshallAware ma = (MarshallAware)o;
384                ByteSequence sequence = null;
385                // sequence=ma.getCachedMarshalledForm(this);
386                bs.writeBoolean(sequence != null);
387                if (sequence != null) {
388                    return 1 + sequence.getLength();
389                }
390            }
391    
392            byte type = o.getDataStructureType();
393            DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
394            if (dsm == null) {
395                throw new IOException("Unknown data type: " + type);
396            }
397            return 1 + dsm.tightMarshal1(this, o, bs);
398        }
399    
400        public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
401            throws IOException {
402            if (!bs.readBoolean()) {
403                return;
404            }
405    
406            byte type = o.getDataStructureType();
407            ds.writeByte(type);
408    
409            if (o.isMarshallAware() && bs.readBoolean()) {
410    
411                // We should not be doing any caching
412                throw new IOException("Corrupted stream");
413                // MarshallAware ma = (MarshallAware) o;
414                // ByteSequence sequence=ma.getCachedMarshalledForm(this);
415                // ds.write(sequence.getData(), sequence.getOffset(),
416                // sequence.getLength());
417    
418            } else {
419    
420                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
421                if (dsm == null) {
422                    throw new IOException("Unknown data type: " + type);
423                }
424                dsm.tightMarshal2(this, o, ds, bs);
425    
426            }
427        }
428    
429        public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
430            if (bs.readBoolean()) {
431    
432                byte dataType = dis.readByte();
433                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
434                if (dsm == null) {
435                    throw new IOException("Unknown data type: " + dataType);
436                }
437                DataStructure data = dsm.createObject();
438    
439                if (data.isMarshallAware() && bs.readBoolean()) {
440    
441                    dis.readInt();
442                    dis.readByte();
443    
444                    BooleanStream bs2 = new BooleanStream();
445                    bs2.unmarshal(dis);
446                    dsm.tightUnmarshal(this, data, dis, bs2);
447    
448                    // TODO: extract the sequence from the dis and associate it.
449                    // MarshallAware ma = (MarshallAware)data
450                    // ma.setCachedMarshalledForm(this, sequence);
451    
452                } else {
453                    dsm.tightUnmarshal(this, data, dis, bs);
454                }
455    
456                return data;
457            } else {
458                return null;
459            }
460        }
461    
462        public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
463            if (dis.readBoolean()) {
464    
465                byte dataType = dis.readByte();
466                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
467                if (dsm == null) {
468                    throw new IOException("Unknown data type: " + dataType);
469                }
470                DataStructure data = dsm.createObject();
471                dsm.looseUnmarshal(this, data, dis);
472                return data;
473    
474            } else {
475                return null;
476            }
477        }
478    
479        public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
480            dataOut.writeBoolean(o != null);
481            if (o != null) {
482                byte type = o.getDataStructureType();
483                dataOut.writeByte(type);
484                DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
485                if (dsm == null) {
486                    throw new IOException("Unknown data type: " + type);
487                }
488                dsm.looseMarshal(this, o, dataOut);
489            }
490        }
491    
492        public void runMarshallCacheEvictionSweep() {
493            // Do we need to start evicting??
494            while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
495    
496                marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
497                marshallCache[nextMarshallCacheEvictionIndex] = null;
498    
499                nextMarshallCacheEvictionIndex++;
500                if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
501                    nextMarshallCacheEvictionIndex = 0;
502                }
503    
504            }
505        }
506    
507        public Short getMarshallCacheIndex(DataStructure o) {
508            return marshallCacheMap.get(o);
509        }
510    
511        public Short addToMarshallCache(DataStructure o) {
512            short i = nextMarshallCacheIndex++;
513            if (nextMarshallCacheIndex >= marshallCache.length) {
514                nextMarshallCacheIndex = 0;
515            }
516    
517            // We can only cache that item if there is space left.
518            if (marshallCacheMap.size() < marshallCache.length) {
519                marshallCache[i] = o;
520                Short index = new Short(i);
521                marshallCacheMap.put(o, index);
522                return index;
523            } else {
524                // Use -1 to indicate that the value was not cached due to cache
525                // being full.
526                return new Short((short)-1);
527            }
528        }
529    
530        public void setInUnmarshallCache(short index, DataStructure o) {
531    
532            // There was no space left in the cache, so we can't
533            // put this in the cache.
534            if (index == -1) {
535                return;
536            }
537    
538            unmarshallCache[index] = o;
539        }
540    
541        public DataStructure getFromUnmarshallCache(short index) {
542            return unmarshallCache[index];
543        }
544    
545        public void setStackTraceEnabled(boolean b) {
546            stackTraceEnabled = b;
547        }
548    
549        public boolean isStackTraceEnabled() {
550            return stackTraceEnabled;
551        }
552    
553        public boolean isTcpNoDelayEnabled() {
554            return tcpNoDelayEnabled;
555        }
556    
557        public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
558            this.tcpNoDelayEnabled = tcpNoDelayEnabled;
559        }
560    
561        public boolean isCacheEnabled() {
562            return cacheEnabled;
563        }
564    
565        public void setCacheEnabled(boolean cacheEnabled) {
566            this.cacheEnabled = cacheEnabled;
567        }
568    
569        public boolean isTightEncodingEnabled() {
570            return tightEncodingEnabled;
571        }
572    
573        public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
574            this.tightEncodingEnabled = tightEncodingEnabled;
575        }
576    
577        public boolean isSizePrefixDisabled() {
578            return sizePrefixDisabled;
579        }
580    
581        public void setSizePrefixDisabled(boolean prefixPacketSize) {
582            this.sizePrefixDisabled = prefixPacketSize;
583        }
584    
585        public void setPreferedWireFormatInfo(WireFormatInfo info) {
586            this.preferedWireFormatInfo = info;
587        }
588    
589        public WireFormatInfo getPreferedWireFormatInfo() {
590            return preferedWireFormatInfo;
591        }
592    
593        public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
594    
595            if (preferedWireFormatInfo == null) {
596                throw new IllegalStateException("Wireformat cannot not be renegotiated.");
597            }
598    
599            this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
600            info.setVersion(this.getVersion());
601    
602            this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
603            info.setStackTraceEnabled(this.stackTraceEnabled);
604    
605            this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
606            info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
607    
608            this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
609            info.setCacheEnabled(this.cacheEnabled);
610    
611            this.tightEncodingEnabled = info.isTightEncodingEnabled()
612                                        && preferedWireFormatInfo.isTightEncodingEnabled();
613            info.setTightEncodingEnabled(this.tightEncodingEnabled);
614    
615            this.sizePrefixDisabled = info.isSizePrefixDisabled()
616                                      && preferedWireFormatInfo.isSizePrefixDisabled();
617            info.setSizePrefixDisabled(this.sizePrefixDisabled);
618    
619            if (cacheEnabled) {
620    
621                int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
622                info.setCacheSize(size);
623    
624                if (size == 0) {
625                    size = MARSHAL_CACHE_SIZE;
626                }
627    
628                marshallCache = new DataStructure[size];
629                unmarshallCache = new DataStructure[size];
630                nextMarshallCacheIndex = 0;
631                nextMarshallCacheEvictionIndex = 0;
632                marshallCacheMap = new HashMap<DataStructure, Short>();
633            } else {
634                marshallCache = null;
635                unmarshallCache = null;
636                nextMarshallCacheIndex = 0;
637                nextMarshallCacheEvictionIndex = 0;
638                marshallCacheMap = null;
639            }
640    
641        }
642    
643        protected int min(int version1, int version2) {
644            if (version1 < version2 && version1 > 0 || version2 <= 0) {
645                return version1;
646            }
647            return version2;
648        }
649    }