001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.stomp; 018 019 import java.io.IOException; 020 import java.io.Serializable; 021 import java.io.StringReader; 022 import java.io.StringWriter; 023 import java.util.HashMap; 024 import java.util.Map; 025 026 import javax.jms.JMSException; 027 028 import org.apache.activemq.advisory.AdvisorySupport; 029 import org.apache.activemq.broker.BrokerContext; 030 import org.apache.activemq.broker.BrokerContextAware; 031 import org.apache.activemq.command.ActiveMQMapMessage; 032 import org.apache.activemq.command.ActiveMQMessage; 033 import org.apache.activemq.command.ActiveMQObjectMessage; 034 import org.apache.activemq.command.DataStructure; 035 import org.apache.activemq.util.JettisonMappedXmlDriver; 036 import org.codehaus.jettison.mapped.Configuration; 037 import org.springframework.beans.BeansException; 038 import org.springframework.context.ApplicationContext; 039 040 import com.thoughtworks.xstream.XStream; 041 import com.thoughtworks.xstream.io.HierarchicalStreamReader; 042 import com.thoughtworks.xstream.io.HierarchicalStreamWriter; 043 import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; 044 import com.thoughtworks.xstream.io.xml.XppReader; 045 046 /** 047 * Frame translator implementation that uses XStream to convert messages to and 048 * from XML and JSON 049 * 050 * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a> 051 */ 052 public class JmsFrameTranslator extends LegacyFrameTranslator implements 053 BrokerContextAware { 054 055 XStream xStream = null; 056 BrokerContext brokerContext; 057 058 public ActiveMQMessage convertFrame(ProtocolConverter converter, 059 StompFrame command) throws JMSException, ProtocolException { 060 Map headers = command.getHeaders(); 061 ActiveMQMessage msg; 062 String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION); 063 if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) { 064 msg = super.convertFrame(converter, command); 065 } else { 066 HierarchicalStreamReader in; 067 068 try { 069 String text = new String(command.getContent(), "UTF-8"); 070 switch (Stomp.Transformations.getValue(transformation)) { 071 case JMS_OBJECT_XML: 072 in = new XppReader(new StringReader(text)); 073 msg = createObjectMessage(in); 074 break; 075 case JMS_OBJECT_JSON: 076 in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); 077 msg = createObjectMessage(in); 078 break; 079 case JMS_MAP_XML: 080 in = new XppReader(new StringReader(text)); 081 msg = createMapMessage(in); 082 break; 083 case JMS_MAP_JSON: 084 in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); 085 msg = createMapMessage(in); 086 break; 087 default: 088 throw new Exception("Unkown transformation: " + transformation); 089 } 090 } catch (Throwable e) { 091 command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage()); 092 msg = super.convertFrame(converter, command); 093 } 094 } 095 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); 096 return msg; 097 } 098 099 public StompFrame convertMessage(ProtocolConverter converter, 100 ActiveMQMessage message) throws IOException, JMSException { 101 if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { 102 StompFrame command = new StompFrame(); 103 command.setAction(Stomp.Responses.MESSAGE); 104 Map<String, String> headers = new HashMap<String, String>(25); 105 command.setHeaders(headers); 106 107 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 108 converter, message, command, this); 109 110 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { 111 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString()); 112 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { 113 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString()); 114 } 115 116 ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); 117 command.setContent(marshall(msg.getObject(), 118 headers.get(Stomp.Headers.TRANSFORMATION)) 119 .getBytes("UTF-8")); 120 return command; 121 122 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 123 StompFrame command = new StompFrame(); 124 command.setAction(Stomp.Responses.MESSAGE); 125 Map<String, String> headers = new HashMap<String, String>(25); 126 command.setHeaders(headers); 127 128 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 129 converter, message, command, this); 130 131 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { 132 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString()); 133 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { 134 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString()); 135 } 136 137 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 138 command.setContent(marshall((Serializable)msg.getContentMap(), 139 headers.get(Stomp.Headers.TRANSFORMATION)) 140 .getBytes("UTF-8")); 141 return command; 142 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && 143 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { 144 145 StompFrame command = new StompFrame(); 146 command.setAction(Stomp.Responses.MESSAGE); 147 Map<String, String> headers = new HashMap<String, String>(25); 148 command.setHeaders(headers); 149 150 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 151 converter, message, command, this); 152 153 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) { 154 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString()); 155 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) { 156 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString()); 157 } 158 159 String body = marshallAdvisory(message.getDataStructure(), 160 headers.get(Stomp.Headers.TRANSFORMATION)); 161 command.setContent(body.getBytes("UTF-8")); 162 return command; 163 } else { 164 return super.convertMessage(converter, message); 165 } 166 } 167 168 /** 169 * Marshalls the Object to a string using XML or JSON encoding 170 */ 171 protected String marshall(Serializable object, String transformation) 172 throws JMSException { 173 StringWriter buffer = new StringWriter(); 174 HierarchicalStreamWriter out; 175 if (transformation.toLowerCase().endsWith("json")) { 176 out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer); 177 } else { 178 out = new PrettyPrintWriter(buffer); 179 } 180 getXStream().marshal(object, out); 181 return buffer.toString(); 182 } 183 184 protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException { 185 ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); 186 Object obj = getXStream().unmarshal(in); 187 objMsg.setObject((Serializable) obj); 188 return objMsg; 189 } 190 191 protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { 192 ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); 193 Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in); 194 for (String key : map.keySet()) { 195 mapMsg.setObject(key, map.get(key)); 196 } 197 return mapMsg; 198 } 199 200 protected String marshallAdvisory(final DataStructure ds, String transformation) { 201 202 StringWriter buffer = new StringWriter(); 203 HierarchicalStreamWriter out; 204 if (transformation.toLowerCase().endsWith("json")) { 205 out = new JettisonMappedXmlDriver().createWriter(buffer); 206 } else { 207 out = new PrettyPrintWriter(buffer); 208 } 209 210 XStream xstream = getXStream(); 211 xstream.setMode(XStream.NO_REFERENCES); 212 xstream.aliasPackage("", "org.apache.activemq.command"); 213 xstream.marshal(ds, out); 214 return buffer.toString(); 215 } 216 217 // Properties 218 // ------------------------------------------------------------------------- 219 public XStream getXStream() { 220 if (xStream == null) { 221 xStream = createXStream(); 222 } 223 return xStream; 224 } 225 226 public void setXStream(XStream xStream) { 227 this.xStream = xStream; 228 } 229 230 // Implementation methods 231 // ------------------------------------------------------------------------- 232 protected XStream createXStream() { 233 XStream xstream = null; 234 if (brokerContext != null) { 235 Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class); 236 for (XStream bean : beans.values()) { 237 if (bean != null) { 238 xstream = bean; 239 break; 240 } 241 } 242 } 243 244 if (xstream == null) { 245 xstream = new XStream(); 246 } 247 return xstream; 248 249 } 250 251 public void setBrokerContext(BrokerContext brokerContext) { 252 this.brokerContext = brokerContext; 253 } 254 255 }