001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.util.HashMap; 022 import java.util.HashSet; 023 import java.util.Iterator; 024 import java.util.Map; 025 import java.util.Set; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 import java.util.concurrent.atomic.AtomicInteger; 028 import java.util.concurrent.atomic.AtomicLong; 029 030 import org.apache.activemq.broker.ConnectionContext; 031 import org.apache.activemq.command.ActiveMQDestination; 032 import org.apache.activemq.command.ActiveMQQueue; 033 import org.apache.activemq.command.ActiveMQTopic; 034 import org.apache.activemq.command.MessageId; 035 import org.apache.activemq.command.SubscriptionInfo; 036 import org.apache.activemq.command.TransactionId; 037 import org.apache.activemq.kaha.CommandMarshaller; 038 import org.apache.activemq.kaha.ListContainer; 039 import org.apache.activemq.kaha.MapContainer; 040 import org.apache.activemq.kaha.MessageIdMarshaller; 041 import org.apache.activemq.kaha.Store; 042 import org.apache.activemq.kaha.StoreFactory; 043 import org.apache.activemq.kaha.impl.index.hash.HashIndex; 044 import org.apache.activemq.store.MessageStore; 045 import org.apache.activemq.store.ReferenceStore; 046 import org.apache.activemq.store.ReferenceStoreAdapter; 047 import org.apache.activemq.store.TopicMessageStore; 048 import org.apache.activemq.store.TopicReferenceStore; 049 import org.apache.activemq.store.amq.AMQTx; 050 import org.apache.activemq.util.IOHelper; 051 import org.slf4j.Logger; 052 import org.slf4j.LoggerFactory; 053 054 public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter { 055 056 057 058 private static final Logger LOG = LoggerFactory.getLogger(KahaReferenceStoreAdapter.class); 059 private static final String STORE_STATE = "store-state"; 060 private static final String QUEUE_DATA = "queue-data"; 061 private static final String INDEX_VERSION_NAME = "INDEX_VERSION"; 062 private static final Integer INDEX_VERSION = new Integer(7); 063 private static final String RECORD_REFERENCES = "record-references"; 064 private static final String TRANSACTIONS = "transactions-state"; 065 private MapContainer stateMap; 066 private MapContainer<TransactionId, AMQTx> preparedTransactions; 067 private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>(); 068 private ListContainer<SubscriptionInfo> durableSubscribers; 069 private boolean storeValid; 070 private Store stateStore; 071 private boolean persistentIndex = true; 072 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; 073 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; 074 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; 075 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; 076 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; 077 078 079 public KahaReferenceStoreAdapter(AtomicLong size){ 080 super(size); 081 } 082 083 public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 084 throw new RuntimeException("Use createQueueReferenceStore instead"); 085 } 086 087 public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) 088 throws IOException { 089 throw new RuntimeException("Use createTopicReferenceStore instead"); 090 } 091 092 @Override 093 public synchronized void start() throws Exception { 094 super.start(); 095 Store store = getStateStore(); 096 boolean empty = store.getMapContainerIds().isEmpty(); 097 stateMap = store.getMapContainer("state", STORE_STATE); 098 stateMap.load(); 099 storeValid=true; 100 if (!empty) { 101 AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE); 102 if (status != null) { 103 storeValid = status.get(); 104 } 105 106 if (storeValid) { 107 //check what version the indexes are at 108 Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME); 109 if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) { 110 storeValid = false; 111 LOG.warn("Indexes at an older version - need to regenerate"); 112 } 113 } 114 if (storeValid) { 115 if (stateMap.containsKey(RECORD_REFERENCES)) { 116 recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES); 117 } 118 } 119 } 120 stateMap.put(STORE_STATE, new AtomicBoolean()); 121 stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION); 122 durableSubscribers = store.getListContainer("durableSubscribers"); 123 durableSubscribers.setMarshaller(new CommandMarshaller()); 124 preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false); 125 // need to set the Marshallers here 126 preparedTransactions.setKeyMarshaller(Store.COMMAND_MARSHALLER); 127 preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat)); 128 } 129 130 @Override 131 public synchronized void stop() throws Exception { 132 stateMap.put(RECORD_REFERENCES, recordReferences); 133 stateMap.put(STORE_STATE, new AtomicBoolean(true)); 134 stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION); 135 if (this.stateStore != null) { 136 this.stateStore.close(); 137 this.stateStore = null; 138 this.stateMap = null; 139 } 140 super.stop(); 141 } 142 143 public void commitTransaction(ConnectionContext context) throws IOException { 144 //we don;t need to force on a commit - as the reference store 145 //is rebuilt on a non clean shutdown 146 } 147 148 public boolean isStoreValid() { 149 return storeValid; 150 } 151 152 public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { 153 ReferenceStore rc = (ReferenceStore)queues.get(destination); 154 if (rc == null) { 155 rc = new KahaReferenceStore(this, getMapReferenceContainer(destination, QUEUE_DATA), 156 destination); 157 messageStores.put(destination, rc); 158 // if(transactionStore!=null){ 159 // rc=transactionStore.proxy(rc); 160 // } 161 queues.put(destination, rc); 162 } 163 return rc; 164 } 165 166 public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException { 167 TopicReferenceStore rc = (TopicReferenceStore)topics.get(destination); 168 if (rc == null) { 169 Store store = getStore(); 170 MapContainer messageContainer = getMapReferenceContainer(destination.getPhysicalName(), "topic-data"); 171 MapContainer subsContainer = getSubsMapContainer(destination.getPhysicalName() + "-Subscriptions", "blob"); 172 ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.getPhysicalName(), "topic-acks"); 173 ackContainer.setMarshaller(new TopicSubAckMarshaller()); 174 rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer, 175 destination); 176 messageStores.put(destination, rc); 177 // if(transactionStore!=null){ 178 // rc=transactionStore.proxy(rc); 179 // } 180 topics.put(destination, rc); 181 } 182 return rc; 183 } 184 185 public void removeReferenceStore(KahaReferenceStore referenceStore) { 186 ActiveMQDestination destination = referenceStore.getDestination(); 187 if (destination.isQueue()) { 188 queues.remove(destination); 189 try { 190 getStore().deleteMapContainer(destination, QUEUE_DATA); 191 } catch (IOException e) { 192 LOG.error("Failed to delete " + QUEUE_DATA + " map container for destination: " + destination, e); 193 } 194 } else { 195 topics.remove(destination); 196 } 197 messageStores.remove(destination); 198 } 199 /* 200 public void buildReferenceFileIdsInUse() throws IOException { 201 recordReferences = new HashMap<Integer, AtomicInteger>(); 202 Set<ActiveMQDestination> destinations = getDestinations(); 203 for (ActiveMQDestination destination : destinations) { 204 if (destination.isQueue()) { 205 KahaReferenceStore store = (KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination); 206 store.addReferenceFileIdsInUse(); 207 } else { 208 KahaTopicReferenceStore store = (KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination); 209 store.addReferenceFileIdsInUse(); 210 } 211 } 212 } 213 */ 214 215 protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id, 216 String containerName) 217 throws IOException { 218 Store store = getStore(); 219 MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex); 220 container.setIndexBinSize(getIndexBinSize()); 221 container.setIndexKeySize(getIndexKeySize()); 222 container.setIndexPageSize(getIndexPageSize()); 223 container.setIndexMaxBinSize(getIndexMaxBinSize()); 224 container.setIndexLoadFactor(getIndexLoadFactor()); 225 container.setKeyMarshaller(new MessageIdMarshaller()); 226 container.setValueMarshaller(new ReferenceRecordMarshaller()); 227 container.load(); 228 return container; 229 } 230 231 synchronized void addInterestInRecordFile(int recordNumber) { 232 Integer key = Integer.valueOf(recordNumber); 233 AtomicInteger rr = recordReferences.get(key); 234 if (rr == null) { 235 rr = new AtomicInteger(); 236 recordReferences.put(key, rr); 237 } 238 rr.incrementAndGet(); 239 } 240 241 synchronized void removeInterestInRecordFile(int recordNumber) { 242 Integer key = Integer.valueOf(recordNumber); 243 AtomicInteger rr = recordReferences.get(key); 244 if (rr != null && rr.decrementAndGet() <= 0) { 245 recordReferences.remove(key); 246 } 247 } 248 249 /** 250 * @return 251 * @throws IOException 252 * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse() 253 */ 254 public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException { 255 Set inUse = new HashSet<Integer>(recordReferences.keySet()); 256 257 Iterator<Map.Entry<Integer, Set<Integer>>> ackReferences = ackMessageFileMap.entrySet().iterator(); 258 while (ackReferences.hasNext()) { 259 Map.Entry<Integer, Set<Integer>> ackReference = ackReferences.next(); 260 if (!inUse.contains(ackReference.getKey())) { 261 // should we keep this data file 262 for (Integer referencedFileId : ackReference.getValue()) { 263 if (inUse.contains(referencedFileId)) { 264 // keep this ack file 265 inUse.add(ackReference.getKey()); 266 LOG.debug("not removing data file: " + ackReference.getKey() 267 + " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue()); 268 break; 269 } 270 } 271 } 272 if (!inUse.contains(ackReference.getKey())) { 273 ackReferences.remove(); 274 } 275 } 276 277 return inUse; 278 } 279 280 Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 281 public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) { 282 Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackDataFileId)); 283 if (referenceFileIds == null) { 284 referenceFileIds = new HashSet<Integer>(); 285 referenceFileIds.add(Integer.valueOf(messageFileId)); 286 ackMessageFileMap.put(Integer.valueOf(ackDataFileId), referenceFileIds); 287 } else { 288 Integer id = Integer.valueOf(messageFileId); 289 if (!referenceFileIds.contains(id)) { 290 referenceFileIds.add(id); 291 } 292 } 293 } 294 295 /** 296 * 297 * @throws IOException 298 * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages() 299 */ 300 public void clearMessages() throws IOException { 301 //don't delete messages as it will clear state - call base 302 //class method to clear out the data instead 303 super.deleteAllMessages(); 304 } 305 306 /** 307 * 308 * @throws IOException 309 * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState() 310 */ 311 312 public void recoverState() throws IOException { 313 Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers); 314 for (SubscriptionInfo info:set) { 315 LOG.info("Recovering subscriber state for durable subscriber: " + info); 316 TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination()); 317 ts.addSubsciption(info, false); 318 } 319 } 320 321 public void recoverSubscription(SubscriptionInfo info) throws IOException { 322 TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination()); 323 LOG.info("Recovering subscriber state for durable subscriber: " + info); 324 ts.addSubsciption(info, false); 325 } 326 327 328 public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException { 329 Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>(); 330 preparedTransactions.load(); 331 for (Iterator<TransactionId> i = preparedTransactions.keySet().iterator(); i.hasNext();) { 332 TransactionId key = i.next(); 333 AMQTx value = preparedTransactions.get(key); 334 result.put(key, value); 335 } 336 return result; 337 } 338 339 public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException { 340 preparedTransactions.clear(); 341 for (Iterator<Map.Entry<TransactionId, AMQTx>> iter = map.entrySet().iterator(); iter.hasNext();) { 342 Map.Entry<TransactionId, AMQTx> entry = iter.next(); 343 preparedTransactions.put(entry.getKey(), entry.getValue()); 344 } 345 } 346 347 @Override 348 public synchronized void setDirectory(File directory) { 349 File file = new File(directory, "data"); 350 super.setDirectory(file); 351 this.stateStore = createStateStore(directory); 352 } 353 354 protected synchronized Store getStateStore() throws IOException { 355 if (this.stateStore == null) { 356 File stateDirectory = new File(getDirectory(), "kr-state"); 357 IOHelper.mkdirs(stateDirectory); 358 this.stateStore = createStateStore(getDirectory()); 359 } 360 return this.stateStore; 361 } 362 363 public void deleteAllMessages() throws IOException { 364 super.deleteAllMessages(); 365 if (stateStore != null) { 366 if (stateStore.isInitialized()) { 367 stateStore.clear(); 368 } else { 369 stateStore.delete(); 370 } 371 } else { 372 File stateDirectory = new File(getDirectory(), "kr-state"); 373 StoreFactory.delete(stateDirectory); 374 } 375 } 376 377 public boolean isPersistentIndex() { 378 return persistentIndex; 379 } 380 381 public void setPersistentIndex(boolean persistentIndex) { 382 this.persistentIndex = persistentIndex; 383 } 384 385 private Store createStateStore(File directory) { 386 File stateDirectory = new File(directory, "state"); 387 try { 388 IOHelper.mkdirs(stateDirectory); 389 return StoreFactory.open(stateDirectory, "rw"); 390 } catch (IOException e) { 391 LOG.error("Failed to create the state store", e); 392 } 393 return null; 394 } 395 396 protected void addSubscriberState(SubscriptionInfo info) throws IOException { 397 durableSubscribers.add(info); 398 } 399 400 protected void removeSubscriberState(SubscriptionInfo info) { 401 durableSubscribers.remove(info); 402 } 403 404 public int getIndexBinSize() { 405 return indexBinSize; 406 } 407 408 public void setIndexBinSize(int indexBinSize) { 409 this.indexBinSize = indexBinSize; 410 } 411 412 public int getIndexKeySize() { 413 return indexKeySize; 414 } 415 416 public void setIndexKeySize(int indexKeySize) { 417 this.indexKeySize = indexKeySize; 418 } 419 420 public int getIndexPageSize() { 421 return indexPageSize; 422 } 423 424 public void setIndexPageSize(int indexPageSize) { 425 this.indexPageSize = indexPageSize; 426 } 427 428 public int getIndexMaxBinSize() { 429 return indexMaxBinSize; 430 } 431 432 public void setIndexMaxBinSize(int maxBinSize) { 433 this.indexMaxBinSize = maxBinSize; 434 } 435 436 /** 437 * @return the loadFactor 438 */ 439 public int getIndexLoadFactor() { 440 return indexLoadFactor; 441 } 442 443 /** 444 * @param loadFactor the loadFactor to set 445 */ 446 public void setIndexLoadFactor(int loadFactor) { 447 this.indexLoadFactor = loadFactor; 448 } 449 450 451 }