001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.activemq.transport.stomp; 019 020 import java.io.ByteArrayOutputStream; 021 import java.io.DataInputStream; 022 import java.io.IOException; 023 import java.io.InputStream; 024 import java.io.OutputStream; 025 import java.net.Socket; 026 import java.net.UnknownHostException; 027 import java.util.HashMap; 028 029 public class StompConnection { 030 031 public static final long RECEIVE_TIMEOUT = 10000; 032 033 private Socket stompSocket; 034 private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream(); 035 036 public void open(String host, int port) throws IOException, UnknownHostException { 037 open(new Socket(host, port)); 038 } 039 040 public void open(Socket socket) { 041 stompSocket = socket; 042 } 043 044 public void close() throws IOException { 045 if (stompSocket != null) { 046 stompSocket.close(); 047 stompSocket = null; 048 } 049 } 050 051 public void sendFrame(String data) throws Exception { 052 byte[] bytes = data.getBytes("UTF-8"); 053 OutputStream outputStream = stompSocket.getOutputStream(); 054 outputStream.write(bytes); 055 outputStream.write(0); 056 outputStream.flush(); 057 } 058 059 public void sendFrame(String frame, byte[] data) throws Exception { 060 byte[] bytes = frame.getBytes("UTF-8"); 061 OutputStream outputStream = stompSocket.getOutputStream(); 062 outputStream.write(bytes); 063 outputStream.write(data); 064 outputStream.write(0); 065 outputStream.flush(); 066 } 067 068 public StompFrame receive() throws Exception { 069 return receive(RECEIVE_TIMEOUT); 070 } 071 072 public StompFrame receive(long timeOut) throws Exception { 073 stompSocket.setSoTimeout((int)timeOut); 074 InputStream is = stompSocket.getInputStream(); 075 StompWireFormat wf = new StompWireFormat(); 076 DataInputStream dis = new DataInputStream(is); 077 return (StompFrame)wf.unmarshal(dis); 078 } 079 080 public String receiveFrame() throws Exception { 081 return receiveFrame(RECEIVE_TIMEOUT); 082 } 083 084 public String receiveFrame(long timeOut) throws Exception { 085 stompSocket.setSoTimeout((int)timeOut); 086 InputStream is = stompSocket.getInputStream(); 087 int c = 0; 088 for (;;) { 089 c = is.read(); 090 if (c < 0) { 091 throw new IOException("socket closed."); 092 } else if (c == 0) { 093 c = is.read(); 094 if (c == '\n') { 095 // end of frame 096 return stringFromBuffer(inputBuffer); 097 } else { 098 inputBuffer.write(0); 099 inputBuffer.write(c); 100 } 101 } else { 102 inputBuffer.write(c); 103 } 104 } 105 } 106 107 private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception { 108 byte[] ba = inputBuffer.toByteArray(); 109 inputBuffer.reset(); 110 return new String(ba, "UTF-8"); 111 } 112 113 public Socket getStompSocket() { 114 return stompSocket; 115 } 116 117 public void setStompSocket(Socket stompSocket) { 118 this.stompSocket = stompSocket; 119 } 120 121 public void connect(String username, String password) throws Exception { 122 connect(username, password, null); 123 } 124 125 public void connect(String username, String password, String client) throws Exception { 126 HashMap<String, String> headers = new HashMap(); 127 headers.put("login", username); 128 headers.put("passcode", password); 129 if (client != null) { 130 headers.put("client-id", client); 131 } 132 StompFrame frame = new StompFrame("CONNECT", headers); 133 sendFrame(frame.format()); 134 135 StompFrame connect = receive(); 136 if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) { 137 throw new Exception ("Not connected: " + connect.getBody()); 138 } 139 } 140 141 public void disconnect() throws Exception { 142 StompFrame frame = new StompFrame("DISCONNECT"); 143 sendFrame(frame.format()); 144 } 145 146 public void send(String destination, String message) throws Exception { 147 send(destination, message, null, null); 148 } 149 150 public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception { 151 if (headers == null) { 152 headers = new HashMap<String, String>(); 153 } 154 headers.put("destination", destination); 155 if (transaction != null) { 156 headers.put("transaction", transaction); 157 } 158 StompFrame frame = new StompFrame("SEND", headers, message.getBytes()); 159 sendFrame(frame.format()); 160 } 161 162 public void subscribe(String destination) throws Exception { 163 subscribe(destination, null, null); 164 } 165 166 public void subscribe(String destination, String ack) throws Exception { 167 subscribe(destination, ack, new HashMap<String, String>()); 168 } 169 170 public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception { 171 if (headers == null) { 172 headers = new HashMap<String, String>(); 173 } 174 headers.put("destination", destination); 175 if (ack != null) { 176 headers.put("ack", ack); 177 } 178 StompFrame frame = new StompFrame("SUBSCRIBE", headers); 179 sendFrame(frame.format()); 180 } 181 182 public void unsubscribe(String destination) throws Exception { 183 unsubscribe(destination, null); 184 } 185 186 public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception { 187 if (headers == null) { 188 headers = new HashMap<String, String>(); 189 } 190 headers.put("destination", destination); 191 StompFrame frame = new StompFrame("UNSUBSCRIBE", headers); 192 sendFrame(frame.format()); 193 } 194 195 public void begin(String transaction) throws Exception { 196 HashMap<String, String> headers = new HashMap<String, String>(); 197 headers.put("transaction", transaction); 198 StompFrame frame = new StompFrame("BEGIN", headers); 199 sendFrame(frame.format()); 200 } 201 202 public void abort(String transaction) throws Exception { 203 HashMap<String, String> headers = new HashMap<String, String>(); 204 headers.put("transaction", transaction); 205 StompFrame frame = new StompFrame("ABORT", headers); 206 sendFrame(frame.format()); 207 } 208 209 public void commit(String transaction) throws Exception { 210 HashMap<String, String> headers = new HashMap<String, String>(); 211 headers.put("transaction", transaction); 212 StompFrame frame = new StompFrame("COMMIT", headers); 213 sendFrame(frame.format()); 214 } 215 216 public void ack(StompFrame frame) throws Exception { 217 ack(frame.getHeaders().get("message-id"), null); 218 } 219 220 public void ack(StompFrame frame, String transaction) throws Exception { 221 ack(frame.getHeaders().get("message-id"), transaction); 222 } 223 224 public void ack(String messageId) throws Exception { 225 ack(messageId, null); 226 } 227 228 public void ack(String messageId, String transaction) throws Exception { 229 HashMap<String, String> headers = new HashMap<String, String>(); 230 headers.put("message-id", messageId); 231 if (transaction != null) 232 headers.put("transaction", transaction); 233 StompFrame frame = new StompFrame("ACK", headers); 234 sendFrame(frame.format()); 235 } 236 237 protected String appendHeaders(HashMap<String, Object> headers) { 238 StringBuffer result = new StringBuffer(); 239 for (String key : headers.keySet()) { 240 result.append(key + ":" + headers.get(key) + "\n"); 241 } 242 result.append("\n"); 243 return result.toString(); 244 } 245 246 }