001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.region.cursors; 018 019 import java.io.IOException; 020 import java.util.LinkedList; 021 import java.util.List; 022 import org.apache.activemq.ActiveMQMessageAudit; 023 import org.apache.activemq.Service; 024 import org.apache.activemq.broker.ConnectionContext; 025 import org.apache.activemq.broker.region.Destination; 026 import org.apache.activemq.broker.region.MessageReference; 027 import org.apache.activemq.command.MessageId; 028 import org.apache.activemq.usage.SystemUsage; 029 030 /** 031 * Interface to pending message (messages awaiting disptach to a consumer) 032 * cursor 033 * 034 * 035 */ 036 public interface PendingMessageCursor extends Service { 037 038 /** 039 * Add a destination 040 * 041 * @param context 042 * @param destination 043 * @throws Exception 044 */ 045 void add(ConnectionContext context, Destination destination) throws Exception; 046 047 /** 048 * remove a destination 049 * 050 * @param context 051 * @param destination 052 * @throws Exception 053 */ 054 List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception; 055 056 /** 057 * @return true if there are no pending messages 058 */ 059 boolean isEmpty(); 060 061 /** 062 * check if a Destination is Empty for this cursor 063 * 064 * @param destination 065 * @return true id the Destination is empty 066 */ 067 boolean isEmpty(Destination destination); 068 069 /** 070 * reset the cursor 071 */ 072 void reset(); 073 074 /** 075 * hint to the cursor to release any locks it might have grabbed after a 076 * reset 077 */ 078 void release(); 079 080 /** 081 * add message to await dispatch 082 * 083 * @param node 084 * @throws IOException 085 * @throws Exception 086 */ 087 void addMessageLast(MessageReference node) throws Exception; 088 /** 089 * add message to await dispatch - if it can 090 * 091 * @param node 092 * @param maxWaitTime 093 * @return true if successful 094 * @throws IOException 095 * @throws Exception 096 */ 097 boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception; 098 099 /** 100 * add message to await dispatch 101 * 102 * @param node 103 * @throws Exception 104 */ 105 void addMessageFirst(MessageReference node) throws Exception; 106 107 /** 108 * Add a message recovered from a retroactive policy 109 * 110 * @param node 111 * @throws Exception 112 */ 113 void addRecoveredMessage(MessageReference node) throws Exception; 114 115 /** 116 * @return true if there pending messages to dispatch 117 */ 118 boolean hasNext(); 119 120 /** 121 * @return the next pending message with its reference count increment 122 */ 123 MessageReference next(); 124 125 /** 126 * remove the message at the cursor position 127 */ 128 void remove(); 129 130 /** 131 * @return the number of pending messages 132 */ 133 int size(); 134 135 /** 136 * clear all pending messages 137 */ 138 void clear(); 139 140 /** 141 * Informs the Broker if the subscription needs to intervention to recover 142 * it's state e.g. DurableTopicSubscriber may do 143 * 144 * @return true if recovery required 145 */ 146 boolean isRecoveryRequired(); 147 148 /** 149 * @return the maximum batch size 150 */ 151 int getMaxBatchSize(); 152 153 /** 154 * Set the max batch size 155 * 156 * @param maxBatchSize 157 */ 158 void setMaxBatchSize(int maxBatchSize); 159 160 /** 161 * Give the cursor a hint that we are about to remove messages from memory 162 * only 163 */ 164 void resetForGC(); 165 166 /** 167 * remove a node 168 * 169 * @param node 170 */ 171 void remove(MessageReference node); 172 173 /** 174 * free up any internal buffers 175 */ 176 void gc(); 177 178 /** 179 * Set the UsageManager 180 * 181 * @param systemUsage 182 * @see org.apache.activemq.usage.SystemUsage 183 */ 184 void setSystemUsage(SystemUsage systemUsage); 185 186 /** 187 * @return the usageManager 188 */ 189 SystemUsage getSystemUsage(); 190 191 /** 192 * @return the memoryUsageHighWaterMark 193 */ 194 int getMemoryUsageHighWaterMark(); 195 196 /** 197 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 198 */ 199 void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark); 200 201 /** 202 * @return true if the cursor is full 203 */ 204 boolean isFull(); 205 206 /** 207 * @return true if the cursor has space to page messages into 208 */ 209 public boolean hasSpace(); 210 211 /** 212 * @return true if the cursor has buffered messages ready to deliver 213 */ 214 boolean hasMessagesBufferedToDeliver(); 215 216 /** 217 * destroy the cursor 218 * 219 * @throws Exception 220 */ 221 void destroy() throws Exception; 222 223 /** 224 * Page in a restricted number of messages and increment the reference count 225 * 226 * @param maxItems 227 * @return a list of paged in messages 228 */ 229 LinkedList<MessageReference> pageInList(int maxItems); 230 231 /** 232 * set the maximum number of producers to track at one time 233 * @param value 234 */ 235 void setMaxProducersToAudit(int value); 236 237 /** 238 * @return the maximum number of producers to audit 239 */ 240 int getMaxProducersToAudit(); 241 242 /** 243 * Set the maximum depth of message ids to track 244 * @param depth 245 */ 246 void setMaxAuditDepth(int depth); 247 248 /** 249 * @return the audit depth 250 */ 251 int getMaxAuditDepth(); 252 253 /** 254 * @return the enableAudit 255 */ 256 public boolean isEnableAudit(); 257 /** 258 * @param enableAudit the enableAudit to set 259 */ 260 public void setEnableAudit(boolean enableAudit); 261 262 /** 263 * @return true if the underlying state of this cursor 264 * disappears when the broker shuts down 265 */ 266 public boolean isTransient(); 267 268 269 /** 270 * set the audit 271 * @param audit 272 */ 273 public void setMessageAudit(ActiveMQMessageAudit audit); 274 275 276 /** 277 * @return the audit - could be null 278 */ 279 public ActiveMQMessageAudit getMessageAudit(); 280 281 /** 282 * use a cache to improve performance 283 * @param useCache 284 */ 285 public void setUseCache(boolean useCache); 286 287 /** 288 * @return true if a cache may be used 289 */ 290 public boolean isUseCache(); 291 292 /** 293 * remove from auditing the message id 294 * @param id 295 */ 296 public void rollback(MessageId id); 297 298 /** 299 * @return true if cache is being used 300 */ 301 public boolean isCacheEnabled(); 302 303 }