001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.io.Serializable;
021    import java.io.StringReader;
022    import java.io.StringWriter;
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.advisory.AdvisorySupport;
029    import org.apache.activemq.broker.BrokerContext;
030    import org.apache.activemq.broker.BrokerContextAware;
031    import org.apache.activemq.command.ActiveMQMapMessage;
032    import org.apache.activemq.command.ActiveMQMessage;
033    import org.apache.activemq.command.ActiveMQObjectMessage;
034    import org.apache.activemq.command.DataStructure;
035    import org.apache.activemq.util.JettisonMappedXmlDriver;
036    import org.codehaus.jettison.mapped.Configuration;
037    import org.springframework.beans.BeansException;
038    import org.springframework.context.ApplicationContext;
039    
040    import com.thoughtworks.xstream.XStream;
041    import com.thoughtworks.xstream.io.HierarchicalStreamReader;
042    import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
043    import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
044    import com.thoughtworks.xstream.io.xml.XppReader;
045    
046    /**
047     * Frame translator implementation that uses XStream to convert messages to and
048     * from XML and JSON
049     *
050     * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
051     */
052    public class JmsFrameTranslator extends LegacyFrameTranslator implements
053                    BrokerContextAware {
054    
055            XStream xStream = null;
056            BrokerContext brokerContext;
057    
058            public ActiveMQMessage convertFrame(ProtocolConverter converter,
059                            StompFrame command) throws JMSException, ProtocolException {
060                    Map headers = command.getHeaders();
061                    ActiveMQMessage msg;
062                    String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
063                    if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
064                            msg = super.convertFrame(converter, command);
065                    } else {
066                            HierarchicalStreamReader in;
067    
068                            try {
069                                    String text = new String(command.getContent(), "UTF-8");
070                                    switch (Stomp.Transformations.getValue(transformation)) {
071                                    case JMS_OBJECT_XML:
072                                            in = new XppReader(new StringReader(text));
073                                            msg = createObjectMessage(in);
074                                            break;
075                                    case JMS_OBJECT_JSON:
076                                            in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
077                                            msg = createObjectMessage(in);
078                                            break;
079                                    case JMS_MAP_XML:
080                                            in = new XppReader(new StringReader(text));
081                                            msg = createMapMessage(in);
082                                            break;
083                                    case JMS_MAP_JSON:
084                                            in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
085                                            msg = createMapMessage(in);
086                                            break;
087                                    default:
088                                            throw new Exception("Unkown transformation: " + transformation);
089                                    }
090                            } catch (Throwable e) {
091                                    command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
092                                    msg = super.convertFrame(converter, command);
093                            }
094                    }
095                    FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
096                    return msg;
097            }
098    
099            public StompFrame convertMessage(ProtocolConverter converter,
100                            ActiveMQMessage message) throws IOException, JMSException {
101                    if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
102                            StompFrame command = new StompFrame();
103                            command.setAction(Stomp.Responses.MESSAGE);
104                            Map<String, String> headers = new HashMap<String, String>(25);
105                            command.setHeaders(headers);
106    
107                            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
108                                            converter, message, command, this);
109    
110                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
111                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
112                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
113                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
114                }
115    
116                ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
117                            command.setContent(marshall(msg.getObject(),
118                                            headers.get(Stomp.Headers.TRANSFORMATION))
119                                            .getBytes("UTF-8"));
120                            return command;
121    
122                    } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
123                            StompFrame command = new StompFrame();
124                            command.setAction(Stomp.Responses.MESSAGE);
125                            Map<String, String> headers = new HashMap<String, String>(25);
126                            command.setHeaders(headers);
127    
128                            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
129                                            converter, message, command, this);
130    
131                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
132                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
133                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
134                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
135                }
136    
137                            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
138                            command.setContent(marshall((Serializable)msg.getContentMap(),
139                                            headers.get(Stomp.Headers.TRANSFORMATION))
140                                            .getBytes("UTF-8"));
141                            return command;
142            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
143                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
144    
145                            StompFrame command = new StompFrame();
146                            command.setAction(Stomp.Responses.MESSAGE);
147                            Map<String, String> headers = new HashMap<String, String>(25);
148                            command.setHeaders(headers);
149    
150                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
151                                            converter, message, command, this);
152    
153                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
154                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
155                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
156                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
157                }
158    
159                String body = marshallAdvisory(message.getDataStructure(),
160                            headers.get(Stomp.Headers.TRANSFORMATION));
161                command.setContent(body.getBytes("UTF-8"));
162                return command;
163                    } else {
164                            return super.convertMessage(converter, message);
165                    }
166            }
167    
168            /**
169             * Marshalls the Object to a string using XML or JSON encoding
170             */
171            protected String marshall(Serializable object, String transformation)
172                            throws JMSException {
173                    StringWriter buffer = new StringWriter();
174                    HierarchicalStreamWriter out;
175                    if (transformation.toLowerCase().endsWith("json")) {
176                            out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
177                    } else {
178                            out = new PrettyPrintWriter(buffer);
179                    }
180                    getXStream().marshal(object, out);
181                    return buffer.toString();
182            }
183    
184            protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
185                    ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
186                    Object obj = getXStream().unmarshal(in);
187                    objMsg.setObject((Serializable) obj);
188                    return objMsg;
189            }
190    
191            protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
192                    ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
193                    Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
194                    for (String key : map.keySet()) {
195                            mapMsg.setObject(key, map.get(key));
196                    }
197                    return mapMsg;
198            }
199    
200        protected String marshallAdvisory(final DataStructure ds, String transformation) {
201    
202                    StringWriter buffer = new StringWriter();
203                    HierarchicalStreamWriter out;
204                    if (transformation.toLowerCase().endsWith("json")) {
205                            out = new JettisonMappedXmlDriver().createWriter(buffer);
206                    } else {
207                            out = new PrettyPrintWriter(buffer);
208                    }
209    
210                    XStream xstream = getXStream();
211            xstream.setMode(XStream.NO_REFERENCES);
212            xstream.aliasPackage("", "org.apache.activemq.command");
213                    xstream.marshal(ds, out);
214                    return buffer.toString();
215        }
216    
217            // Properties
218            // -------------------------------------------------------------------------
219            public XStream getXStream() {
220                    if (xStream == null) {
221                            xStream = createXStream();
222                    }
223                    return xStream;
224            }
225    
226            public void setXStream(XStream xStream) {
227                    this.xStream = xStream;
228            }
229    
230            // Implementation methods
231            // -------------------------------------------------------------------------
232            protected XStream createXStream() {
233                    XStream xstream = null;
234                    if (brokerContext != null) {
235                            Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
236                            for (XStream bean : beans.values()) {
237                                if (bean != null) {
238                                    xstream = bean;
239                                    break;
240                                }
241                            }
242                    }
243    
244                    if (xstream == null) {
245                            xstream = new XStream();
246                    }
247                    return xstream;
248    
249            }
250    
251            public void setBrokerContext(BrokerContext brokerContext) {
252                    this.brokerContext = brokerContext;
253            }
254    
255    }