001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadaptor; 018 019 import java.io.IOException; 020 import java.util.HashSet; 021 import java.util.Iterator; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.Set; 025 import java.util.Map.Entry; 026 import java.util.concurrent.ConcurrentHashMap; 027 028 import org.apache.activemq.broker.ConnectionContext; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.Message; 031 import org.apache.activemq.command.MessageAck; 032 import org.apache.activemq.command.MessageId; 033 import org.apache.activemq.command.SubscriptionInfo; 034 import org.apache.activemq.kaha.ListContainer; 035 import org.apache.activemq.kaha.MapContainer; 036 import org.apache.activemq.kaha.Marshaller; 037 import org.apache.activemq.kaha.Store; 038 import org.apache.activemq.kaha.StoreEntry; 039 import org.apache.activemq.store.MessageRecoveryListener; 040 import org.apache.activemq.store.TopicReferenceStore; 041 import org.slf4j.Logger; 042 import org.slf4j.LoggerFactory; 043 044 public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore { 045 private static final Logger LOG = LoggerFactory.getLogger(KahaTopicReferenceStore.class); 046 protected ListContainer<TopicSubAck> ackContainer; 047 protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>(); 048 private MapContainer<String, SubscriptionInfo> subscriberContainer; 049 private Store store; 050 private static final String TOPIC_SUB_NAME = "tsn"; 051 052 public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter, 053 MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer, 054 MapContainer<String, SubscriptionInfo> subsContainer, ActiveMQDestination destination) 055 throws IOException { 056 super(adapter, messageContainer, destination); 057 this.store = store; 058 this.ackContainer = ackContainer; 059 subscriberContainer = subsContainer; 060 // load all the Ack containers 061 for (Iterator<SubscriptionInfo> i = subscriberContainer.values().iterator(); i.hasNext();) { 062 SubscriptionInfo info = i.next(); 063 addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName()); 064 } 065 } 066 067 public void dispose(ConnectionContext context) { 068 super.dispose(context); 069 subscriberContainer.delete(); 070 } 071 072 protected MessageId getMessageId(Object object) { 073 return new MessageId(((ReferenceRecord)object).getMessageId()); 074 } 075 076 public void addMessage(ConnectionContext context, Message message) throws IOException { 077 throw new RuntimeException("Use addMessageReference instead"); 078 } 079 080 public Message getMessage(MessageId identity) throws IOException { 081 throw new RuntimeException("Use addMessageReference instead"); 082 } 083 084 public boolean addMessageReference(final ConnectionContext context, final MessageId messageId, 085 final ReferenceData data) { 086 boolean uniqueReferenceAdded = false; 087 lock.lock(); 088 try { 089 final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); 090 final int subscriberCount = subscriberMessages.size(); 091 if (subscriberCount > 0 && !isDuplicate(messageId)) { 092 final StoreEntry messageEntry = messageContainer.place(messageId, record); 093 addInterest(record); 094 uniqueReferenceAdded = true; 095 final TopicSubAck tsa = new TopicSubAck(); 096 tsa.setCount(subscriberCount); 097 tsa.setMessageEntry(messageEntry); 098 final StoreEntry ackEntry = ackContainer.placeLast(tsa); 099 for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) { 100 final TopicSubContainer container = i.next(); 101 final ConsumerMessageRef ref = new ConsumerMessageRef(); 102 ref.setAckEntry(ackEntry); 103 ref.setMessageEntry(messageEntry); 104 ref.setMessageId(messageId); 105 container.add(ref); 106 } 107 if (LOG.isTraceEnabled()) { 108 LOG.trace(destination.getPhysicalName() + " add reference: " + messageId); 109 } 110 } else { 111 if (LOG.isTraceEnabled()) { 112 LOG.trace("no subscribers or duplicate add for: " + messageId); 113 } 114 } 115 } finally { 116 lock.unlock(); 117 } 118 return uniqueReferenceAdded; 119 } 120 121 public ReferenceData getMessageReference(final MessageId identity) throws IOException { 122 final ReferenceRecord result = messageContainer.get(identity); 123 if (result == null) { 124 return null; 125 } 126 return result.getData(); 127 } 128 129 public void addReferenceFileIdsInUse() { 130 for (StoreEntry entry = ackContainer.getFirst(); entry != null; entry = ackContainer.getNext(entry)) { 131 TopicSubAck subAck = ackContainer.get(entry); 132 if (subAck.getCount() > 0) { 133 ReferenceRecord rr = messageContainer.getValue(subAck.getMessageEntry()); 134 addInterest(rr); 135 } 136 } 137 } 138 139 140 protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException { 141 String containerName = getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName)); 142 MapContainer container = store.getMapContainer(containerName,containerName); 143 container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER); 144 Marshaller marshaller = new ConsumerMessageRefMarshaller(); 145 container.setValueMarshaller(marshaller); 146 TopicSubContainer tsc = new TopicSubContainer(container); 147 subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc); 148 return container; 149 } 150 151 public boolean acknowledgeReference(ConnectionContext context, 152 String clientId, String subscriptionName, MessageId messageId) 153 throws IOException { 154 boolean removeMessage = false; 155 lock.lock(); 156 try { 157 String key = getSubscriptionKey(clientId, subscriptionName); 158 159 TopicSubContainer container = subscriberMessages.get(key); 160 if (container != null) { 161 ConsumerMessageRef ref = null; 162 if((ref = container.remove(messageId)) != null) { 163 StoreEntry entry = ref.getAckEntry(); 164 //ensure we get up to-date pointers 165 entry = ackContainer.refresh(entry); 166 TopicSubAck tsa = ackContainer.get(entry); 167 if (tsa != null) { 168 if (tsa.decrementCount() <= 0) { 169 ackContainer.remove(entry); 170 ReferenceRecord rr = messageContainer.get(messageId); 171 if (rr != null) { 172 entry = tsa.getMessageEntry(); 173 entry = messageContainer.refresh(entry); 174 messageContainer.remove(entry); 175 removeInterest(rr); 176 removeMessage = true; 177 dispatchAudit.isDuplicate(messageId); 178 } 179 }else { 180 ackContainer.update(entry,tsa); 181 } 182 } 183 if (LOG.isTraceEnabled()) { 184 LOG.trace(destination.getPhysicalName() + " remove: " + messageId); 185 } 186 }else{ 187 if (ackContainer.isEmpty() || subscriberMessages.size() == 1 || isUnreferencedBySubscribers(key, subscriberMessages, messageId)) { 188 // no message reference held 189 removeMessage = true; 190 // ensure we don't later add a reference 191 dispatchAudit.isDuplicate(messageId); 192 if (LOG.isDebugEnabled()) { 193 LOG.debug(destination.getPhysicalName() + " remove with no outstanding reference (ack before add): " + messageId); 194 } 195 } 196 } 197 } 198 }finally { 199 lock.unlock(); 200 } 201 return removeMessage; 202 } 203 204 // verify that no subscriber has a reference to this message. In the case where the subscribers 205 // references are persisted but more than the persisted consumers get the message, the ack from the non 206 // persisted consumer would remove the message in error 207 // 208 // see: https://issues.apache.org/activemq/browse/AMQ-2123 209 private boolean isUnreferencedBySubscribers( 210 String key, Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) { 211 boolean isUnreferenced = true; 212 for (Entry<String, TopicSubContainer> entry : subscriberContainers.entrySet()) { 213 if (!key.equals(entry.getKey()) && !entry.getValue().isEmpty()) { 214 TopicSubContainer container = entry.getValue(); 215 for (Iterator i = container.iterator(); i.hasNext();) { 216 ConsumerMessageRef ref = (ConsumerMessageRef) i.next(); 217 if (messageId.equals(ref.getMessageId())) { 218 isUnreferenced = false; 219 break; 220 } 221 } 222 } 223 } 224 return isUnreferenced; 225 } 226 227 public void acknowledge(ConnectionContext context, 228 String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { 229 acknowledgeReference(context, clientId, subscriptionName, messageId); 230 } 231 232 public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { 233 String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName()); 234 lock.lock(); 235 try { 236 // if already exists - won't add it again as it causes data files 237 // to hang around 238 if (!subscriberContainer.containsKey(key)) { 239 subscriberContainer.put(key, info); 240 adapter.addSubscriberState(info); 241 } 242 // add the subscriber 243 addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName()); 244 if (retroactive) { 245 /* 246 * for(StoreEntry 247 * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ 248 * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); 249 * ConsumerMessageRef ref=new ConsumerMessageRef(); 250 * ref.setAckEntry(entry); 251 * ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); } 252 */ 253 } 254 }finally { 255 lock.unlock(); 256 } 257 } 258 259 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 260 lock.lock(); 261 try { 262 SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 263 if (info != null) { 264 adapter.removeSubscriberState(info); 265 } 266 removeSubscriberMessageContainer(clientId,subscriptionName); 267 }finally { 268 lock.unlock(); 269 } 270 } 271 272 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 273 SubscriptionInfo[] result = subscriberContainer.values() 274 .toArray(new SubscriptionInfo[subscriberContainer.size()]); 275 return result; 276 } 277 278 public int getMessageCount(String clientId, String subscriberName) throws IOException { 279 String key = getSubscriptionKey(clientId, subscriberName); 280 TopicSubContainer container = subscriberMessages.get(key); 281 return container != null ? container.size() : 0; 282 } 283 284 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 285 return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName)); 286 } 287 288 public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, 289 MessageRecoveryListener listener) throws Exception { 290 String key = getSubscriptionKey(clientId, subscriptionName); 291 lock.lock(); 292 try { 293 TopicSubContainer container = subscriberMessages.get(key); 294 if (container != null) { 295 int count = 0; 296 StoreEntry entry = container.getBatchEntry(); 297 if (entry == null) { 298 entry = container.getEntry(); 299 } else { 300 entry = container.refreshEntry(entry); 301 if (entry != null) { 302 entry = container.getNextEntry(entry); 303 } 304 } 305 306 if (entry != null) { 307 do { 308 ConsumerMessageRef consumerRef = container.get(entry); 309 ReferenceRecord msg = messageContainer.getValue(consumerRef 310 .getMessageEntry()); 311 if (msg != null) { 312 if (recoverReference(listener, msg)) { 313 count++; 314 container.setBatchEntry(msg.getMessageId(), entry); 315 } 316 } else { 317 container.reset(); 318 } 319 320 entry = container.getNextEntry(entry); 321 } while (entry != null && count < maxReturned && listener.hasSpace()); 322 } 323 } 324 }finally { 325 lock.unlock(); 326 } 327 } 328 329 public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) 330 throws Exception { 331 String key = getSubscriptionKey(clientId, subscriptionName); 332 TopicSubContainer container = subscriberMessages.get(key); 333 if (container != null) { 334 for (Iterator i = container.iterator(); i.hasNext();) { 335 ConsumerMessageRef ref = (ConsumerMessageRef)i.next(); 336 ReferenceRecord msg = messageContainer.getValue(ref.getMessageEntry()); 337 if (msg != null) { 338 if (!recoverReference(listener, msg)) { 339 break; 340 } 341 } 342 } 343 } 344 } 345 346 public void resetBatching(String clientId, String subscriptionName) { 347 lock.lock(); 348 try { 349 String key = getSubscriptionKey(clientId, subscriptionName); 350 TopicSubContainer topicSubContainer = subscriberMessages.get(key); 351 if (topicSubContainer != null) { 352 topicSubContainer.reset(); 353 } 354 }finally { 355 lock.unlock(); 356 } 357 } 358 359 public void removeAllMessages(ConnectionContext context) throws IOException { 360 lock.lock(); 361 try { 362 Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet()); 363 for (String key:tmpSet) { 364 TopicSubContainer container = subscriberMessages.get(key); 365 if (container != null) { 366 container.clear(); 367 } 368 } 369 ackContainer.clear(); 370 }finally { 371 lock.unlock(); 372 } 373 super.removeAllMessages(context); 374 } 375 376 protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException { 377 String subscriberKey = getSubscriptionKey(clientId, subscriptionName); 378 String containerName = getSubscriptionContainerName(subscriberKey); 379 subscriberContainer.remove(subscriberKey); 380 TopicSubContainer container = subscriberMessages.remove(subscriberKey); 381 if (container != null) { 382 for (Iterator i = container.iterator(); i.hasNext();) { 383 ConsumerMessageRef ref = (ConsumerMessageRef)i.next(); 384 if (ref != null) { 385 TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); 386 if (tsa != null) { 387 if (tsa.decrementCount() <= 0) { 388 ackContainer.remove(ref.getAckEntry()); 389 messageContainer.remove(tsa.getMessageEntry()); 390 } else { 391 ackContainer.update(ref.getAckEntry(), tsa); 392 } 393 } 394 } 395 } 396 } 397 store.deleteMapContainer(containerName,containerName); 398 } 399 400 protected String getSubscriptionKey(String clientId, String subscriberName) { 401 StringBuffer buffer = new StringBuffer(); 402 buffer.append(clientId).append(":"); 403 String name = subscriberName != null ? subscriberName : "NOT_SET"; 404 return buffer.append(name).toString(); 405 } 406 407 private String getSubscriptionContainerName(String subscriptionKey) { 408 StringBuffer result = new StringBuffer(TOPIC_SUB_NAME); 409 result.append(destination.getQualifiedName()); 410 result.append(subscriptionKey); 411 return result.toString(); 412 } 413 }