001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.command; 018 019 import org.apache.activemq.state.CommandVisitor; 020 021 /** 022 * @openwire:marshaller code="22" 023 * 024 */ 025 public class MessageAck extends BaseCommand { 026 027 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK; 028 029 /** 030 * Used to let the broker know that the message has been delivered to the 031 * client. Message will still be retained until an standard ack is received. 032 * This is used get the broker to send more messages past prefetch limits 033 * when an standard ack has not been sent. 034 */ 035 public static final byte DELIVERED_ACK_TYPE = 0; 036 037 /** 038 * The standard ack case where a client wants the message to be discarded. 039 */ 040 public static final byte STANDARD_ACK_TYPE = 2; 041 042 /** 043 * In case the client want's to explicitly let the broker know that a 044 * message was not processed and the message was considered a poison 045 * message. 046 */ 047 public static final byte POSION_ACK_TYPE = 1; 048 049 /** 050 * In case the client want's to explicitly let the broker know that a 051 * message was not processed and it was re-delivered to the consumer 052 * but it was not yet considered to be a poison message. The messageCount 053 * field will hold the number of times the message was re-delivered. 054 */ 055 public static final byte REDELIVERED_ACK_TYPE = 3; 056 057 /** 058 * The ack case where a client wants only an individual message to be discarded. 059 */ 060 public static final byte INDIVIDUAL_ACK_TYPE = 4; 061 062 /** 063 * The ack case where a durable topic subscription does not match a selector. 064 */ 065 public static final byte UNMATCHED_ACK_TYPE = 5; 066 067 protected byte ackType; 068 protected ConsumerId consumerId; 069 protected MessageId firstMessageId; 070 protected MessageId lastMessageId; 071 protected ActiveMQDestination destination; 072 protected TransactionId transactionId; 073 protected int messageCount; 074 protected Throwable poisonCause; 075 076 protected transient String consumerKey; 077 078 public MessageAck() { 079 } 080 081 public MessageAck(MessageDispatch md, byte ackType, int messageCount) { 082 this.ackType = ackType; 083 this.consumerId = md.getConsumerId(); 084 this.destination = md.getDestination(); 085 this.lastMessageId = md.getMessage().getMessageId(); 086 this.messageCount = messageCount; 087 } 088 089 public void copy(MessageAck copy) { 090 super.copy(copy); 091 copy.firstMessageId = firstMessageId; 092 copy.lastMessageId = lastMessageId; 093 copy.destination = destination; 094 copy.transactionId = transactionId; 095 copy.ackType = ackType; 096 copy.consumerId = consumerId; 097 } 098 099 public byte getDataStructureType() { 100 return DATA_STRUCTURE_TYPE; 101 } 102 103 public boolean isMessageAck() { 104 return true; 105 } 106 107 public boolean isPoisonAck() { 108 return ackType == POSION_ACK_TYPE; 109 } 110 111 public boolean isStandardAck() { 112 return ackType == STANDARD_ACK_TYPE; 113 } 114 115 public boolean isDeliveredAck() { 116 return ackType == DELIVERED_ACK_TYPE; 117 } 118 119 public boolean isRedeliveredAck() { 120 return ackType == REDELIVERED_ACK_TYPE; 121 } 122 123 public boolean isIndividualAck() { 124 return ackType == INDIVIDUAL_ACK_TYPE; 125 } 126 127 public boolean isUnmatchedAck() { 128 return ackType == UNMATCHED_ACK_TYPE; 129 } 130 131 /** 132 * @openwire:property version=1 cache=true 133 */ 134 public ActiveMQDestination getDestination() { 135 return destination; 136 } 137 138 public void setDestination(ActiveMQDestination destination) { 139 this.destination = destination; 140 } 141 142 /** 143 * @openwire:property version=1 cache=true 144 */ 145 public TransactionId getTransactionId() { 146 return transactionId; 147 } 148 149 public void setTransactionId(TransactionId transactionId) { 150 this.transactionId = transactionId; 151 } 152 153 public boolean isInTransaction() { 154 return transactionId != null; 155 } 156 157 /** 158 * @openwire:property version=1 cache=true 159 */ 160 public ConsumerId getConsumerId() { 161 return consumerId; 162 } 163 164 public void setConsumerId(ConsumerId consumerId) { 165 this.consumerId = consumerId; 166 } 167 168 /** 169 * @openwire:property version=1 170 */ 171 public byte getAckType() { 172 return ackType; 173 } 174 175 public void setAckType(byte ackType) { 176 this.ackType = ackType; 177 } 178 179 /** 180 * @openwire:property version=1 181 */ 182 public MessageId getFirstMessageId() { 183 return firstMessageId; 184 } 185 186 public void setFirstMessageId(MessageId firstMessageId) { 187 this.firstMessageId = firstMessageId; 188 } 189 190 /** 191 * @openwire:property version=1 192 */ 193 public MessageId getLastMessageId() { 194 return lastMessageId; 195 } 196 197 public void setLastMessageId(MessageId lastMessageId) { 198 this.lastMessageId = lastMessageId; 199 } 200 201 /** 202 * The number of messages being acknowledged in the range. 203 * 204 * @openwire:property version=1 205 */ 206 public int getMessageCount() { 207 return messageCount; 208 } 209 210 public void setMessageCount(int messageCount) { 211 this.messageCount = messageCount; 212 } 213 214 /** 215 * The cause of a poison ack, if a message listener 216 * throws an exception it will be recorded here 217 * 218 * @openwire:property version=7 219 */ 220 public Throwable getPoisonCause() { 221 return poisonCause; 222 } 223 224 public void setPoisonCause(Throwable poisonCause) { 225 this.poisonCause = poisonCause; 226 } 227 228 public Response visit(CommandVisitor visitor) throws Exception { 229 return visitor.processMessageAck(this); 230 } 231 232 /** 233 * A helper method to allow a single message ID to be acknowledged 234 */ 235 public void setMessageID(MessageId messageID) { 236 setFirstMessageId(messageID); 237 setLastMessageId(messageID); 238 setMessageCount(1); 239 } 240 241 }