001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.LinkedList;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.ConsumerInfo;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageDispatch;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.TransactionId;
036    
037    /**
038     * Keeps track of the STOMP subscription so that acking is correctly done.
039     *
040     * @author <a href="http://hiramchirino.com">chirino</a>
041     */
042    public class StompSubscription {
043    
044        public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
045        public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
046        public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
047    
048        private final ProtocolConverter protocolConverter;
049        private final String subscriptionId;
050        private final ConsumerInfo consumerInfo;
051    
052        private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
053        private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
054    
055        private String ackMode = AUTO_ACK;
056        private ActiveMQDestination destination;
057        private String transformation;
058    
059    
060        public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
061            this.protocolConverter = stompTransport;
062            this.subscriptionId = subscriptionId;
063            this.consumerInfo = consumerInfo;
064            this.transformation = transformation;
065        }
066    
067        void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
068            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
069            if (ackMode == CLIENT_ACK) {
070                synchronized (this) {
071                    dispatchedMessage.put(message.getMessageId(), md);
072                }
073            } else if (ackMode == INDIVIDUAL_ACK) {
074                synchronized (this) {
075                    dispatchedMessage.put(message.getMessageId(), md);
076                }
077            } else if (ackMode == AUTO_ACK) {
078                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
079                protocolConverter.getStompTransport().sendToActiveMQ(ack);
080            }
081    
082            boolean ignoreTransformation = false;
083    
084            if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
085                    message.setReadOnlyProperties(false);
086                    message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
087            } else {
088                    if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
089                            ignoreTransformation = true;
090                    }
091            }
092    
093            StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
094    
095            command.setAction(Stomp.Responses.MESSAGE);
096            if (subscriptionId != null) {
097                command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
098            }
099    
100            protocolConverter.getStompTransport().sendToStomp(command);
101        }
102    
103        synchronized void onStompAbort(TransactionId transactionId) {
104            unconsumedMessage.clear();
105        }
106    
107        synchronized void onStompCommit(TransactionId transactionId) {
108            for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
109                Map.Entry entry = (Entry)iter.next();
110                MessageId id = (MessageId)entry.getKey();
111                MessageDispatch msg = (MessageDispatch)entry.getValue();
112                if (unconsumedMessage.contains(msg)) {
113                    iter.remove();
114                }
115            }
116            unconsumedMessage.clear();
117        }
118    
119        synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
120    
121            MessageId msgId = new MessageId(messageId);
122    
123            if (!dispatchedMessage.containsKey(msgId)) {
124                return null;
125            }
126    
127            MessageAck ack = new MessageAck();
128            ack.setDestination(consumerInfo.getDestination());
129            ack.setConsumerId(consumerInfo.getConsumerId());
130    
131            if (ackMode == CLIENT_ACK) {
132                    ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
133                int count = 0;
134                for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
135    
136                    Map.Entry entry = (Entry)iter.next();
137                    MessageId id = (MessageId)entry.getKey();
138                    MessageDispatch msg = (MessageDispatch)entry.getValue();
139    
140                    if (ack.getFirstMessageId() == null) {
141                        ack.setFirstMessageId(id);
142                    }
143    
144                    if (transactionId != null) {
145                            if (!unconsumedMessage.contains(msg)) {
146                                    unconsumedMessage.add(msg);
147                            }
148                    } else {
149                            iter.remove();
150                    }
151    
152    
153                    count++;
154    
155                    if (id.equals(msgId)) {
156                        ack.setLastMessageId(id);
157                        break;
158                    }
159    
160                }
161                ack.setMessageCount(count);
162                if (transactionId != null) {
163                    ack.setTransactionId(transactionId);
164                }
165            }
166            else if (ackMode == INDIVIDUAL_ACK) {
167                ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
168                ack.setMessageID(msgId);
169                if (transactionId != null) {
170                    unconsumedMessage.add(dispatchedMessage.get(msgId));
171                    ack.setTransactionId(transactionId);
172                }
173                dispatchedMessage.remove(msgId);
174            }
175            return ack;
176        }
177    
178        public String getAckMode() {
179            return ackMode;
180        }
181    
182        public void setAckMode(String ackMode) {
183            this.ackMode = ackMode;
184        }
185    
186        public String getSubscriptionId() {
187            return subscriptionId;
188        }
189    
190        public void setDestination(ActiveMQDestination destination) {
191            this.destination = destination;
192        }
193    
194        public ActiveMQDestination getDestination() {
195            return destination;
196        }
197    
198        public ConsumerInfo getConsumerInfo() {
199            return consumerInfo;
200        }
201    
202    }