001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.net.MalformedURLException; 020 import java.util.Enumeration; 021 022 import javax.jms.BytesMessage; 023 import javax.jms.Destination; 024 import javax.jms.JMSException; 025 import javax.jms.MapMessage; 026 import javax.jms.Message; 027 import javax.jms.MessageEOFException; 028 import javax.jms.ObjectMessage; 029 import javax.jms.Queue; 030 import javax.jms.StreamMessage; 031 import javax.jms.TemporaryQueue; 032 import javax.jms.TemporaryTopic; 033 import javax.jms.TextMessage; 034 import javax.jms.Topic; 035 036 import org.apache.activemq.blob.BlobDownloader; 037 import org.apache.activemq.blob.BlobUploader; 038 import org.apache.activemq.command.ActiveMQBlobMessage; 039 import org.apache.activemq.command.ActiveMQBytesMessage; 040 import org.apache.activemq.command.ActiveMQDestination; 041 import org.apache.activemq.command.ActiveMQMapMessage; 042 import org.apache.activemq.command.ActiveMQMessage; 043 import org.apache.activemq.command.ActiveMQObjectMessage; 044 import org.apache.activemq.command.ActiveMQQueue; 045 import org.apache.activemq.command.ActiveMQStreamMessage; 046 import org.apache.activemq.command.ActiveMQTempQueue; 047 import org.apache.activemq.command.ActiveMQTempTopic; 048 import org.apache.activemq.command.ActiveMQTextMessage; 049 import org.apache.activemq.command.ActiveMQTopic; 050 051 /** 052 * A helper class for converting normal JMS interfaces into ActiveMQ specific 053 * ones. 054 * 055 * 056 */ 057 public final class ActiveMQMessageTransformation { 058 059 private ActiveMQMessageTransformation() { 060 } 061 062 /** 063 * Creates a an available JMS message from another provider. 064 * 065 * @param destination - Destination to be converted into ActiveMQ's 066 * implementation. 067 * @return ActiveMQDestination - ActiveMQ's implementation of the 068 * destination. 069 * @throws JMSException if an error occurs 070 */ 071 public static ActiveMQDestination transformDestination(Destination destination) throws JMSException { 072 ActiveMQDestination activeMQDestination = null; 073 074 if (destination != null) { 075 if (destination instanceof ActiveMQDestination) { 076 return (ActiveMQDestination)destination; 077 078 } else { 079 if (destination instanceof TemporaryQueue) { 080 activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName()); 081 } else if (destination instanceof TemporaryTopic) { 082 activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName()); 083 } else if (destination instanceof Queue) { 084 activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName()); 085 } else if (destination instanceof Topic) { 086 activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName()); 087 } 088 } 089 } 090 091 return activeMQDestination; 092 } 093 094 /** 095 * Creates a fast shallow copy of the current ActiveMQMessage or creates a 096 * whole new message instance from an available JMS message from another 097 * provider. 098 * 099 * @param message - Message to be converted into ActiveMQ's implementation. 100 * @param connection 101 * @return ActiveMQMessage - ActiveMQ's implementation object of the 102 * message. 103 * @throws JMSException if an error occurs 104 */ 105 public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection) 106 throws JMSException { 107 if (message instanceof ActiveMQMessage) { 108 return (ActiveMQMessage)message; 109 110 } else { 111 ActiveMQMessage activeMessage = null; 112 113 if (message instanceof BytesMessage) { 114 BytesMessage bytesMsg = (BytesMessage)message; 115 bytesMsg.reset(); 116 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 117 msg.setConnection(connection); 118 try { 119 for (;;) { 120 // Reads a byte from the message stream until the stream 121 // is empty 122 msg.writeByte(bytesMsg.readByte()); 123 } 124 } catch (MessageEOFException e) { 125 // if an end of message stream as expected 126 } catch (JMSException e) { 127 } 128 129 activeMessage = msg; 130 } else if (message instanceof MapMessage) { 131 MapMessage mapMsg = (MapMessage)message; 132 ActiveMQMapMessage msg = new ActiveMQMapMessage(); 133 msg.setConnection(connection); 134 Enumeration iter = mapMsg.getMapNames(); 135 136 while (iter.hasMoreElements()) { 137 String name = iter.nextElement().toString(); 138 msg.setObject(name, mapMsg.getObject(name)); 139 } 140 141 activeMessage = msg; 142 } else if (message instanceof ObjectMessage) { 143 ObjectMessage objMsg = (ObjectMessage)message; 144 ActiveMQObjectMessage msg = new ActiveMQObjectMessage(); 145 msg.setConnection(connection); 146 msg.setObject(objMsg.getObject()); 147 msg.storeContent(); 148 activeMessage = msg; 149 } else if (message instanceof StreamMessage) { 150 StreamMessage streamMessage = (StreamMessage)message; 151 streamMessage.reset(); 152 ActiveMQStreamMessage msg = new ActiveMQStreamMessage(); 153 msg.setConnection(connection); 154 Object obj = null; 155 156 try { 157 while ((obj = streamMessage.readObject()) != null) { 158 msg.writeObject(obj); 159 } 160 } catch (MessageEOFException e) { 161 // if an end of message stream as expected 162 } catch (JMSException e) { 163 } 164 165 activeMessage = msg; 166 } else if (message instanceof TextMessage) { 167 TextMessage textMsg = (TextMessage)message; 168 ActiveMQTextMessage msg = new ActiveMQTextMessage(); 169 msg.setConnection(connection); 170 msg.setText(textMsg.getText()); 171 activeMessage = msg; 172 } else if (message instanceof BlobMessage) { 173 BlobMessage blobMessage = (BlobMessage)message; 174 ActiveMQBlobMessage msg = new ActiveMQBlobMessage(); 175 msg.setConnection(connection); 176 msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy())); 177 try { 178 msg.setURL(blobMessage.getURL()); 179 } catch (MalformedURLException e) { 180 181 } 182 activeMessage = msg; 183 } else { 184 activeMessage = new ActiveMQMessage(); 185 activeMessage.setConnection(connection); 186 } 187 188 copyProperties(message, activeMessage); 189 190 return activeMessage; 191 } 192 } 193 194 /** 195 * Copies the standard JMS and user defined properties from the givem 196 * message to the specified message 197 * 198 * @param fromMessage the message to take the properties from 199 * @param toMessage the message to add the properties to 200 * @throws JMSException 201 */ 202 public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException { 203 toMessage.setJMSMessageID(fromMessage.getJMSMessageID()); 204 toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID()); 205 toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo())); 206 toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination())); 207 toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode()); 208 toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered()); 209 toMessage.setJMSType(fromMessage.getJMSType()); 210 toMessage.setJMSExpiration(fromMessage.getJMSExpiration()); 211 toMessage.setJMSPriority(fromMessage.getJMSPriority()); 212 toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp()); 213 214 Enumeration propertyNames = fromMessage.getPropertyNames(); 215 216 while (propertyNames.hasMoreElements()) { 217 String name = propertyNames.nextElement().toString(); 218 Object obj = fromMessage.getObjectProperty(name); 219 toMessage.setObjectProperty(name, obj); 220 } 221 } 222 }