001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.stomp;
019    
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.net.Socket;
026    import java.net.UnknownHostException;
027    import java.util.HashMap;
028    
029    public class StompConnection {
030    
031        public static final long RECEIVE_TIMEOUT = 10000;
032    
033        private Socket stompSocket;
034        private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035    
036        public void open(String host, int port) throws IOException, UnknownHostException {
037            open(new Socket(host, port));
038        }
039    
040        public void open(Socket socket) {
041            stompSocket = socket;
042        }
043    
044        public void close() throws IOException {
045            if (stompSocket != null) {
046                stompSocket.close();
047                stompSocket = null;
048            }
049        }
050    
051        public void sendFrame(String data) throws Exception {
052            byte[] bytes = data.getBytes("UTF-8");
053            OutputStream outputStream = stompSocket.getOutputStream();
054            outputStream.write(bytes);
055            outputStream.write(0);
056            outputStream.flush();
057        }
058    
059        public void sendFrame(String frame, byte[] data) throws Exception {
060            byte[] bytes = frame.getBytes("UTF-8");
061            OutputStream outputStream = stompSocket.getOutputStream();
062            outputStream.write(bytes);
063            outputStream.write(data);
064            outputStream.write(0);
065            outputStream.flush();
066        }
067    
068        public StompFrame receive() throws Exception {
069            return receive(RECEIVE_TIMEOUT);
070        }
071    
072        public StompFrame receive(long timeOut) throws Exception {
073            stompSocket.setSoTimeout((int)timeOut);
074            InputStream is = stompSocket.getInputStream();
075            StompWireFormat wf = new StompWireFormat();
076            DataInputStream dis = new DataInputStream(is);
077            return (StompFrame)wf.unmarshal(dis);
078        }
079    
080        public String receiveFrame() throws Exception {
081            return receiveFrame(RECEIVE_TIMEOUT);
082        }
083    
084        public String receiveFrame(long timeOut) throws Exception {
085            stompSocket.setSoTimeout((int)timeOut);
086            InputStream is = stompSocket.getInputStream();
087            int c = 0;
088            for (;;) {
089                c = is.read();
090                if (c < 0) {
091                    throw new IOException("socket closed.");
092                } else if (c == 0) {
093                    c = is.read();
094                    if (c == '\n') {
095                        // end of frame
096                        return stringFromBuffer(inputBuffer);
097                    } else {
098                        inputBuffer.write(0);
099                        inputBuffer.write(c);
100                    }
101                } else {
102                    inputBuffer.write(c);
103                }
104            }
105        }
106    
107            private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
108                byte[] ba = inputBuffer.toByteArray();
109            inputBuffer.reset();
110            return new String(ba, "UTF-8");
111        }
112    
113        public Socket getStompSocket() {
114                    return stompSocket;
115            }
116    
117            public void setStompSocket(Socket stompSocket) {
118                    this.stompSocket = stompSocket;
119            }
120    
121        public void connect(String username, String password) throws Exception {
122            connect(username, password, null);
123        }
124    
125        public void connect(String username, String password, String client) throws Exception {
126            HashMap<String, String> headers = new HashMap();
127            headers.put("login", username);
128            headers.put("passcode", password);
129            if (client != null) {
130                    headers.put("client-id", client);
131            }
132            StompFrame frame = new StompFrame("CONNECT", headers);
133            sendFrame(frame.format());
134    
135            StompFrame connect = receive();
136            if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
137                    throw new Exception ("Not connected: " + connect.getBody());
138            }
139        }
140    
141        public void disconnect() throws Exception {
142            StompFrame frame = new StompFrame("DISCONNECT");
143            sendFrame(frame.format());
144        }
145    
146        public void send(String destination, String message) throws Exception {
147            send(destination, message, null, null);
148        }
149    
150        public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
151            if (headers == null) {
152                    headers = new HashMap<String, String>();
153            }
154            headers.put("destination", destination);
155            if (transaction != null) {
156                    headers.put("transaction", transaction);
157            }
158            StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
159            sendFrame(frame.format());
160        }
161    
162        public void subscribe(String destination) throws Exception {
163            subscribe(destination, null, null);
164        }
165    
166        public void subscribe(String destination, String ack) throws Exception {
167            subscribe(destination, ack, new HashMap<String, String>());
168        }
169    
170        public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
171                    if (headers == null) {
172                            headers = new HashMap<String, String>();
173                    }
174                    headers.put("destination", destination);
175            if (ack != null) {
176                    headers.put("ack", ack);
177            }
178            StompFrame frame = new StompFrame("SUBSCRIBE", headers);
179            sendFrame(frame.format());
180        }
181    
182        public void unsubscribe(String destination) throws Exception {
183            unsubscribe(destination, null);
184        }
185    
186        public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
187                    if (headers == null) {
188                            headers = new HashMap<String, String>();
189                    }
190                    headers.put("destination", destination);
191            StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
192            sendFrame(frame.format());
193        }    
194        
195        public void begin(String transaction) throws Exception {
196            HashMap<String, String> headers = new HashMap<String, String>();
197            headers.put("transaction", transaction);
198            StompFrame frame = new StompFrame("BEGIN", headers);
199            sendFrame(frame.format());
200        }
201    
202        public void abort(String transaction) throws Exception {
203            HashMap<String, String> headers = new HashMap<String, String>();
204            headers.put("transaction", transaction);
205            StompFrame frame = new StompFrame("ABORT", headers);
206            sendFrame(frame.format());
207        }
208    
209        public void commit(String transaction) throws Exception {
210            HashMap<String, String> headers = new HashMap<String, String>();
211            headers.put("transaction", transaction);
212            StompFrame frame = new StompFrame("COMMIT", headers);
213            sendFrame(frame.format());
214        }
215    
216        public void ack(StompFrame frame) throws Exception {
217            ack(frame.getHeaders().get("message-id"), null);
218        }
219    
220        public void ack(StompFrame frame, String transaction) throws Exception {
221            ack(frame.getHeaders().get("message-id"), transaction);
222        }
223    
224        public void ack(String messageId) throws Exception {
225            ack(messageId, null);
226        }
227    
228        public void ack(String messageId, String transaction) throws Exception {
229            HashMap<String, String> headers = new HashMap<String, String>();
230            headers.put("message-id", messageId);
231            if (transaction != null)
232                    headers.put("transaction", transaction);
233            StompFrame frame = new StompFrame("ACK", headers);
234            sendFrame(frame.format());
235        }
236    
237        protected String appendHeaders(HashMap<String, Object> headers) {
238            StringBuffer result = new StringBuffer();
239            for (String key : headers.keySet()) {
240                    result.append(key + ":" + headers.get(key) + "\n");
241            }
242            result.append("\n");
243            return result.toString();
244        }
245    
246    }