001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.transport.stomp; 018 019 import java.io.IOException; 020 import java.util.HashMap; 021 import java.util.Map; 022 023 import javax.jms.Destination; 024 import javax.jms.JMSException; 025 026 import com.thoughtworks.xstream.XStream; 027 import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; 028 import org.apache.activemq.advisory.AdvisorySupport; 029 import org.apache.activemq.command.*; 030 031 /** 032 * Implements ActiveMQ 4.0 translations 033 */ 034 public class LegacyFrameTranslator implements FrameTranslator { 035 036 037 public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { 038 final Map headers = command.getHeaders(); 039 final ActiveMQMessage msg; 040 /* 041 * To reduce the complexity of this method perhaps a Chain of Responsibility 042 * would be a better implementation 043 */ 044 if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) { 045 String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE); 046 if(intendedType.equalsIgnoreCase("text")){ 047 ActiveMQTextMessage text = new ActiveMQTextMessage(); 048 try { 049 text.setText(new String(command.getContent(), "UTF-8")); 050 } catch (Throwable e) { 051 throw new ProtocolException("Text could not bet set: " + e, false, e); 052 } 053 msg = text; 054 } else if(intendedType.equalsIgnoreCase("bytes")) { 055 ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage(); 056 byteMessage.writeBytes(command.getContent()); 057 msg = byteMessage; 058 } else { 059 throw new ProtocolException("Unsupported message type '"+intendedType+"'",false); 060 } 061 }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { 062 headers.remove(Stomp.Headers.CONTENT_LENGTH); 063 ActiveMQBytesMessage bm = new ActiveMQBytesMessage(); 064 bm.writeBytes(command.getContent()); 065 msg = bm; 066 } else { 067 ActiveMQTextMessage text = new ActiveMQTextMessage(); 068 try { 069 text.setText(new String(command.getContent(), "UTF-8")); 070 } catch (Throwable e) { 071 throw new ProtocolException("Text could not bet set: " + e, false, e); 072 } 073 msg = text; 074 } 075 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); 076 return msg; 077 } 078 079 public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { 080 StompFrame command = new StompFrame(); 081 command.setAction(Stomp.Responses.MESSAGE); 082 Map<String, String> headers = new HashMap<String, String>(25); 083 command.setHeaders(headers); 084 085 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); 086 087 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 088 089 ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy(); 090 command.setContent(msg.getText().getBytes("UTF-8")); 091 092 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 093 094 ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy(); 095 msg.setReadOnlyBody(true); 096 byte[] data = new byte[(int)msg.getBodyLength()]; 097 msg.readBytes(data); 098 099 headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length); 100 command.setContent(data); 101 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && 102 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { 103 104 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame( 105 converter, message, command, this); 106 107 String body = marshallAdvisory(message.getDataStructure()); 108 command.setContent(body.getBytes("UTF-8")); 109 } 110 return command; 111 } 112 113 public String convertDestination(ProtocolConverter converter, Destination d) { 114 if (d == null) { 115 return null; 116 } 117 ActiveMQDestination activeMQDestination = (ActiveMQDestination)d; 118 String physicalName = activeMQDestination.getPhysicalName(); 119 120 String rc = converter.getCreatedTempDestinationName(activeMQDestination); 121 if( rc!=null ) { 122 return rc; 123 } 124 125 StringBuffer buffer = new StringBuffer(); 126 if (activeMQDestination.isQueue()) { 127 if (activeMQDestination.isTemporary()) { 128 buffer.append("/remote-temp-queue/"); 129 } else { 130 buffer.append("/queue/"); 131 } 132 } else { 133 if (activeMQDestination.isTemporary()) { 134 buffer.append("/remote-temp-topic/"); 135 } else { 136 buffer.append("/topic/"); 137 } 138 } 139 buffer.append(physicalName); 140 return buffer.toString(); 141 } 142 143 public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException { 144 if (name == null) { 145 return null; 146 } else if (name.startsWith("/queue/")) { 147 String qName = name.substring("/queue/".length(), name.length()); 148 return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE); 149 } else if (name.startsWith("/topic/")) { 150 String tName = name.substring("/topic/".length(), name.length()); 151 return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE); 152 } else if (name.startsWith("/remote-temp-queue/")) { 153 String tName = name.substring("/remote-temp-queue/".length(), name.length()); 154 return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE); 155 } else if (name.startsWith("/remote-temp-topic/")) { 156 String tName = name.substring("/remote-temp-topic/".length(), name.length()); 157 return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE); 158 } else if (name.startsWith("/temp-queue/")) { 159 return converter.createTempQueue(name); 160 } else if (name.startsWith("/temp-topic/")) { 161 return converter.createTempTopic(name); 162 } else { 163 throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " 164 + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); 165 } 166 } 167 168 /** 169 * Return an Advisory message as a JSON formatted string 170 * @param ds 171 * @return 172 */ 173 protected String marshallAdvisory(final DataStructure ds) { 174 XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); 175 xstream.setMode(XStream.NO_REFERENCES); 176 xstream.aliasPackage("", "org.apache.activemq.command"); 177 return xstream.toXML(ds); 178 } 179 }