/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.Iterator;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Store based cursor.
 * <p>
 * Keeps an in-memory batch of message references ({@code batchList}) that is
 * either refilled from the store through {@link #doFillBatch()} or, while the
 * cache is enabled, populated directly from {@link #addMessageLast}. All
 * access to the batch and its iterator is guarded by this object's monitor.
 */
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
    /** Destination this cursor was created for; may be {@code null} (the constructor tolerates it). */
    protected final Destination regionDestination;
    /** Messages currently held in memory; prioritized or insertion-ordered depending on the destination. */
    private final PendingList batchList;
    /** Lazily created iterator over {@link #batchList}; {@code null} until first needed or after being cleared. */
    private Iterator<MessageReference> iterator = null;
    /** When {@code true} the next {@link #fillBatch()} calls {@link #resetBatch()} before reading. */
    protected boolean batchResetNeeded = true;
    /** Hint that a call to {@link #doFillBatch()} may still produce messages. */
    private boolean storeHasMessages = false;
    /** Tracked number of pending messages; refreshed from the store by {@link #size()} when negative. */
    protected int size;
    /** Id of the last message that went through the cache path of {@link #addMessageLast}. */
    private MessageId lastCachedId;
    /** Result of the most recent {@link #hasSpace()} call. */
    private boolean hadSpace = false;

    /**
     * Creates the cursor and selects the batch implementation.
     *
     * @param destination owning destination; when {@code null} the cursor is
     *                    created as non-prioritized
     */
    protected AbstractStoreCursor(Destination destination) {
        super((destination != null ? destination.isPrioritizedMessages() : false));
        this.regionDestination = destination;
        if (this.prioritizedMessages) {
            this.batchList = new PrioritizedPendingList();
        } else {
            this.batchList = new OrderedPendingList();
        }
    }

    /**
     * Starts the cursor if it is not already started: clears the batch,
     * resets the store position, reads the store size and enables the cache
     * only when the store is empty and caching is allowed.
     *
     * @throws Exception propagated from {@code super.start()}
     */
    public final synchronized void start() throws Exception {
        if (!isStarted()) {
            clear();
            super.start();
            resetBatch();
            this.size = getStoreSize();
            this.storeHasMessages = this.size > 0;
            setCacheEnabled(!this.storeHasMessages && useCache);
        }
    }

    /**
     * Stops the cursor, resetting the store position and releasing the batch.
     *
     * @throws Exception propagated from {@code super.stop()}
     */
    public final synchronized void stop() throws Exception {
        resetBatch();
        super.stop();
        gc();
    }

    /**
     * Recovery callback for a message read from the store (non-cached path).
     *
     * @param message the recovered message
     * @return {@code true} if the message was added to the batch
     * @throws Exception propagated from {@link #recoverMessage(Message, boolean)}
     */
    public final boolean recoverMessage(Message message) throws Exception {
        return recoverMessage(message, false);
    }

    /**
     * Adds a message to the end of the batch unless its id was already
     * recorded by {@code recordUniqueId} (inherited; presumably the duplicate
     * audit - confirm against the superclass).
     *
     * @param message the message to add
     * @param cached  {@code true} when the message comes from the cache path;
     *                in that case its region destination and memory usage are
     *                left untouched
     * @return {@code true} if the message was added, {@code false} if it was
     *         treated as a duplicate
     * @throws Exception declared for overriding subclasses
     */
    public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
        boolean recovered = false;
        if (recordUniqueId(message.getMessageId())) {
            if (!cached) {
                message.setRegionDestination(regionDestination);
                if (message.getMemoryUsage() == null) {
                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
                }
            }
            message.incrementReferenceCount();
            batchList.addMessageLast(message);
            // The list changed, so any live iterator must be rebuilt.
            clearIterator(true);
            recovered = true;
            storeHasMessages = true;
        } else {
            /*
             * We should expect to get these - the message id is recorded before the
             * message goes into the cache. If we subsequently pull that message out of
             * the store (before it is deleted) it will be a duplicate and is ignored.
             */
            if (LOG.isTraceEnabled()) {
                LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
            }
        }
        return recovered;
    }

    /**
     * Prepares the cursor for a new iteration pass: fills the batch if it is
     * empty, rebuilds an existing iterator and refreshes the size if needed.
     * <p>
     * Synchronized like every other accessor of the batch so that the
     * emptiness check and the fill happen under the same monitor.
     *
     * @throws RuntimeException if filling the batch fails
     */
    public final synchronized void reset() {
        fillBatchIfEmpty();
        clearIterator(true);
        size();
    }

    /**
     * Drops the current iterator without creating a new one.
     */
    public synchronized void release() {
        clearIterator(false);
    }

    /**
     * Discards the current iterator.
     *
     * @param ensureIterator when {@code true} and an iterator existed, a fresh
     *                       one is created immediately
     */
    private synchronized void clearIterator(boolean ensureIterator) {
        boolean haveIterator = this.iterator != null;
        this.iterator = null;
        if (haveIterator && ensureIterator) {
            ensureIterator();
        }
    }

    /**
     * Creates the iterator over the batch if there is none.
     */
    private synchronized void ensureIterator() {
        if (this.iterator == null) {
            this.iterator = this.batchList.iterator();
        }
    }

    /**
     * Fills the batch when it is empty, logging and wrapping any failure.
     * Shared by {@link #reset()} and {@link #hasNext()}.
     *
     * @throws RuntimeException wrapping whatever {@link #fillBatch()} threw
     */
    private synchronized void fillBatchIfEmpty() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error(this + " - Failed to fill batch", e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * No-op in this implementation.
     */
    public final void finished() {
    }

    /**
     * Reports whether another message is available, filling the batch first
     * if it is empty.
     *
     * @return {@code true} if {@link #next()} would return a message
     * @throws RuntimeException if filling the batch fails
     */
    public final synchronized boolean hasNext() {
        fillBatchIfEmpty();
        ensureIterator();
        return this.iterator.hasNext();
    }

    /**
     * Returns the next message of the batch and increments its reference
     * count, or {@code null} when the batch is empty or exhausted. The
     * returned reference is also remembered in {@code last}.
     *
     * @return the next message reference, or {@code null}
     */
    public final synchronized MessageReference next() {
        MessageReference result = null;
        if (!this.batchList.isEmpty()) {
            // The iterator may be null after release()/gc() or when hasNext()
            // was not called first; create it rather than dereference null.
            ensureIterator();
            if (this.iterator.hasNext()) {
                result = this.iterator.next();
            }
        }
        last = result;
        if (result != null) {
            result.incrementReferenceCount();
        }
        return result;
    }

    /**
     * Accounts for a message appended to the destination.
     * <p>
     * While there is space, the cache is (re-)enabled for an empty, started
     * cursor and the message is placed directly into the batch. When space
     * runs out with the cache enabled, the cache is disabled and the store
     * position is synchronised to the last cached id via {@link #setBatch}.
     * In every case the size is incremented.
     *
     * @param node the message reference that was added
     * @throws Exception propagated from {@link #recoverMessage(Message, boolean)}
     *                   or {@link #setBatch(MessageId)}
     */
    public final synchronized void addMessageLast(MessageReference node) throws Exception {
        if (hasSpace()) {
            if (!isCacheEnabled() && size == 0 && isStarted() && useCache) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace(this + " - enabling cache for empty store " + node.getMessageId());
                }
                setCacheEnabled(true);
            }
            if (isCacheEnabled()) {
                recoverMessage(node.getMessage(), true);
                lastCachedId = node.getMessageId();
            }
        } else if (isCacheEnabled()) {
            setCacheEnabled(false);
            // sync with store on disabling the cache
            if (lastCachedId != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace(this + " - disabling cache"
                            + ", lastCachedId: " + lastCachedId
                            + " current node Id: " + node.getMessageId());
                }
                setBatch(lastCachedId);
                lastCachedId = null;
            }
        }
        this.storeHasMessages = true;
        size++;
    }

    /**
     * Hook for subclasses to position the store batch after the given id.
     * The default implementation does nothing.
     *
     * @param messageId id of the last message that was served from the cache
     * @throws Exception declared for overriding subclasses
     */
    protected void setBatch(MessageId messageId) throws Exception {
    }

    /**
     * Accounts for a message put back at the head of the destination: the
     * cache is disabled and the size incremented. The node itself is not
     * added to the in-memory batch.
     *
     * @param node the message reference that was added
     * @throws Exception declared by the signature; not thrown here
     */
    public final synchronized void addMessageFirst(MessageReference node) throws Exception {
        setCacheEnabled(false);
        size++;
    }

    /**
     * Removes the message last returned by the iterator from the batch,
     * decrements the size and releases the reference held on {@code last}.
     */
    public final synchronized void remove() {
        size--;
        if (iterator != null) {
            iterator.remove();
        }
        if (last != null) {
            last.decrementReferenceCount();
        }
    }

    /**
     * Removes a specific message from the batch, decrements the size and
     * disables the cache.
     *
     * @param node the message reference to remove
     */
    public final synchronized void remove(MessageReference node) {
        size--;
        setCacheEnabled(false);
        batchList.remove(node);
    }

    /**
     * Releases the in-memory batch; equivalent to {@link #gc()}.
     */
    public final synchronized void clear() {
        gc();
    }

    /**
     * Releases every message held in the batch: each one is passed to the
     * inherited {@code rollback} and has its reference count decremented.
     * Afterwards the batch and iterator are cleared, a batch reset is
     * requested for the next fill and the cache is disabled.
     */
    public synchronized void gc() {
        for (Iterator<MessageReference> i = batchList.iterator(); i.hasNext();) {
            MessageReference msg = i.next();
            rollback(msg.getMessageId());
            msg.decrementReferenceCount();
        }
        batchList.clear();
        clearIterator(false);
        batchResetNeeded = true;
        setCacheEnabled(false);
    }

    /**
     * Delegates to the superclass and remembers the answer for
     * {@link #fillBatch()}.
     *
     * @return the value returned by {@code super.hasSpace()}
     */
    @Override
    public boolean hasSpace() {
        hadSpace = super.hasSpace();
        return hadSpace;
    }

    /**
     * Refills the batch from the store when it is empty and the store is
     * believed to hold messages. Performs a pending batch reset first.
     *
     * @throws RuntimeException wrapping any exception from {@link #doFillBatch()}
     */
    protected final synchronized void fillBatch() {
        if (LOG.isTraceEnabled()) {
            LOG.trace(this + " - fillBatch");
        }
        if (batchResetNeeded) {
            resetBatch();
            this.batchResetNeeded = false;
        }
        if (this.batchList.isEmpty() && this.storeHasMessages && this.size > 0) {
            // Assume the store is drained until the fill proves otherwise.
            this.storeHasMessages = false;
            try {
                doFillBatch();
            } catch (Exception e) {
                LOG.error(this + " - Failed to fill batch", e);
                throw new RuntimeException(e);
            }
            // Keep the hint set if something was read, or if the last
            // hasSpace() check was false (presumably the fill was limited
            // by memory rather than by an empty store - confirm).
            if (!this.batchList.isEmpty() || !hadSpace) {
                this.storeHasMessages = true;
            }
        }
    }

    /**
     * Reports whether the tracked size is exactly zero. A negative counter is
     * not treated as empty; {@link #size()} is what re-reads it from the store.
     *
     * @return {@code true} if the tracked size is zero
     */
    public final synchronized boolean isEmpty() {
        return size == 0;
    }

    /**
     * @return {@code true} if the in-memory batch currently holds messages
     */
    public final synchronized boolean hasMessagesBufferedToDeliver() {
        return !batchList.isEmpty();
    }

    /**
     * Returns the tracked size, re-reading it from the store first when the
     * counter has gone negative.
     *
     * @return the number of pending messages
     */
    public final synchronized int size() {
        if (size < 0) {
            this.size = getStoreSize();
        }
        return size;
    }

    /**
     * Describes the cursor for log messages. Safe to call when the cursor was
     * created with a {@code null} destination; the name is then printed as
     * {@code null}.
     */
    @Override
    public String toString() {
        String physicalName = null;
        if (regionDestination != null) {
            physicalName = regionDestination.getActiveMQDestination().getPhysicalName();
        }
        return physicalName + ",batchResetNeeded=" + batchResetNeeded
                + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled();
    }

    /**
     * Reads the next batch from the store; implementations are expected to
     * deliver messages through {@link #recoverMessage(Message)}.
     *
     * @throws Exception if the store cannot be read
     */
    protected abstract void doFillBatch() throws Exception;

    /** Resets the store read position. */
    protected abstract void resetBatch();

    /** @return the number of messages the store reports for this cursor */
    protected abstract int getStoreSize();

    /** @return {@code true} if the store holds no messages for this cursor */
    protected abstract boolean isStoreEmpty();
}