001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.broker.scheduler; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.File; 022 import java.io.IOException; 023 import java.util.ArrayList; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.Iterator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Set; 030 import java.util.Map.Entry; 031 import org.apache.activemq.util.IOHelper; 032 import org.apache.activemq.util.ServiceStopper; 033 import org.apache.activemq.util.ServiceSupport; 034 import org.slf4j.Logger; 035 import org.slf4j.LoggerFactory; 036 import org.apache.kahadb.index.BTreeIndex; 037 import org.apache.kahadb.journal.Journal; 038 import org.apache.kahadb.journal.Location; 039 import org.apache.kahadb.page.Page; 040 import org.apache.kahadb.page.PageFile; 041 import org.apache.kahadb.page.Transaction; 042 import org.apache.kahadb.util.ByteSequence; 043 import org.apache.kahadb.util.IntegerMarshaller; 044 import org.apache.kahadb.util.LockFile; 045 import org.apache.kahadb.util.StringMarshaller; 046 import org.apache.kahadb.util.VariableMarshaller; 047 048 public class JobSchedulerStore extends ServiceSupport { 049 static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class); 050 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 051 052 public static final int CLOSED_STATE = 1; 053 public static final int OPEN_STATE = 2; 054 055 private File directory; 056 PageFile pageFile; 057 private Journal journal; 058 private LockFile lockFile; 059 private boolean failIfDatabaseIsLocked; 060 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 061 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 062 private boolean enableIndexWriteAsync = false; 063 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 064 MetaData metaData = new MetaData(this); 065 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); 066 Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>(); 067 068 protected class MetaData { 069 protected MetaData(JobSchedulerStore store) { 070 this.store = store; 071 } 072 private final JobSchedulerStore store; 073 Page<MetaData> page; 074 BTreeIndex<Integer, Integer> journalRC; 075 BTreeIndex<String, JobSchedulerImpl> storedSchedulers; 076 077 void createIndexes(Transaction tx) throws IOException { 078 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId()); 079 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId()); 080 } 081 082 void load(Transaction tx) throws IOException { 083 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 084 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 085 this.storedSchedulers.load(tx); 086 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 087 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 088 this.journalRC.load(tx); 089 } 090 091 void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException { 092 for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { 093 Entry<String, JobSchedulerImpl> entry = i.next(); 094 entry.getValue().load(tx); 095 schedulers.put(entry.getKey(), entry.getValue()); 096 } 097 } 098 099 public void read(DataInput is) throws IOException { 100 this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong()); 101 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 102 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 103 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong()); 104 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 105 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 106 } 107 108 public void write(DataOutput os) throws IOException { 109 os.writeLong(this.storedSchedulers.getPageId()); 110 os.writeLong(this.journalRC.getPageId()); 111 112 } 113 } 114 115 class MetaDataMarshaller extends VariableMarshaller<MetaData> { 116 private final JobSchedulerStore store; 117 118 MetaDataMarshaller(JobSchedulerStore store) { 119 this.store = store; 120 } 121 public MetaData readPayload(DataInput dataIn) throws IOException { 122 MetaData rc = new MetaData(this.store); 123 rc.read(dataIn); 124 return rc; 125 } 126 127 public void writePayload(MetaData object, DataOutput dataOut) throws IOException { 128 object.write(dataOut); 129 } 130 } 131 132 class ValueMarshaller extends VariableMarshaller<List<JobLocation>> { 133 public List<JobLocation> readPayload(DataInput dataIn) throws IOException { 134 List<JobLocation> result = new ArrayList<JobLocation>(); 135 int size = dataIn.readInt(); 136 for (int i = 0; i < size; i++) { 137 JobLocation jobLocation = new JobLocation(); 138 jobLocation.readExternal(dataIn); 139 result.add(jobLocation); 140 } 141 return result; 142 } 143 144 public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { 145 dataOut.writeInt(value.size()); 146 for (JobLocation jobLocation : value) { 147 jobLocation.writeExternal(dataOut); 148 } 149 } 150 } 151 152 class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> { 153 private final JobSchedulerStore store; 154 JobSchedulerMarshaller(JobSchedulerStore store) { 155 this.store = store; 156 } 157 public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException { 158 JobSchedulerImpl result = new JobSchedulerImpl(this.store); 159 result.read(dataIn); 160 return result; 161 } 162 163 public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException { 164 js.write(dataOut); 165 } 166 } 167 168 public File getDirectory() { 169 return directory; 170 } 171 172 public void setDirectory(File directory) { 173 this.directory = directory; 174 } 175 176 public long size() { 177 if ( !isStarted() ) { 178 return 0; 179 } 180 try { 181 return journal.getDiskSize() + pageFile.getDiskSize(); 182 } catch (IOException e) { 183 throw new RuntimeException(e); 184 } 185 } 186 187 public JobScheduler getJobScheduler(final String name) throws Exception { 188 JobSchedulerImpl result = this.schedulers.get(name); 189 if (result == null) { 190 final JobSchedulerImpl js = new JobSchedulerImpl(this); 191 js.setName(name); 192 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 193 public void execute(Transaction tx) throws IOException { 194 js.createIndexes(tx); 195 js.load(tx); 196 metaData.storedSchedulers.put(tx, name, js); 197 } 198 }); 199 result = js; 200 this.schedulers.put(name, js); 201 if (isStarted()) { 202 result.start(); 203 } 204 this.pageFile.flush(); 205 } 206 return result; 207 } 208 209 synchronized public boolean removeJobScheduler(final String name) throws Exception { 210 boolean result = false; 211 final JobSchedulerImpl js = this.schedulers.remove(name); 212 result = js != null; 213 if (result) { 214 js.stop(); 215 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 216 public void execute(Transaction tx) throws IOException { 217 metaData.storedSchedulers.remove(tx, name); 218 js.destroy(tx); 219 } 220 }); 221 } 222 return result; 223 } 224 225 @Override 226 protected synchronized void doStart() throws Exception { 227 if (this.directory == null) { 228 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); 229 } 230 IOHelper.mkdirs(this.directory); 231 lock(); 232 this.journal = new Journal(); 233 this.journal.setDirectory(directory); 234 this.journal.setMaxFileLength(getJournalMaxFileLength()); 235 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); 236 this.journal.start(); 237 this.pageFile = new PageFile(directory, "scheduleDB"); 238 this.pageFile.load(); 239 240 this.pageFile.tx().execute(new Transaction.Closure<IOException>() { 241 public void execute(Transaction tx) throws IOException { 242 if (pageFile.getPageCount() == 0) { 243 Page<MetaData> page = tx.allocate(); 244 assert page.getPageId() == 0; 245 page.set(metaData); 246 metaData.page = page; 247 metaData.createIndexes(tx); 248 tx.store(metaData.page, metaDataMarshaller, true); 249 250 } else { 251 Page<MetaData> page = tx.load(0, metaDataMarshaller); 252 metaData = page.get(); 253 metaData.page = page; 254 } 255 metaData.load(tx); 256 metaData.loadScheduler(tx, schedulers); 257 for (JobSchedulerImpl js :schedulers.values()) { 258 try { 259 js.start(); 260 } catch (Exception e) { 261 JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e); 262 } 263 } 264 } 265 }); 266 267 this.pageFile.flush(); 268 LOG.info(this + " started"); 269 } 270 271 @Override 272 protected synchronized void doStop(ServiceStopper stopper) throws Exception { 273 for (JobSchedulerImpl js : this.schedulers.values()) { 274 js.stop(); 275 } 276 if (this.pageFile != null) { 277 this.pageFile.unload(); 278 } 279 if (this.journal != null) { 280 journal.close(); 281 } 282 if (this.lockFile != null) { 283 this.lockFile.unlock(); 284 } 285 this.lockFile = null; 286 LOG.info(this + " stopped"); 287 288 } 289 290 synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { 291 int logId = location.getDataFileId(); 292 Integer val = this.metaData.journalRC.get(tx, logId); 293 int refCount = val != null ? val.intValue() + 1 : 1; 294 this.metaData.journalRC.put(tx, logId, refCount); 295 296 } 297 298 synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { 299 int logId = location.getDataFileId(); 300 int refCount = this.metaData.journalRC.get(tx, logId); 301 refCount--; 302 if (refCount <= 0) { 303 this.metaData.journalRC.remove(tx, logId); 304 Set<Integer> set = new HashSet<Integer>(); 305 set.add(logId); 306 this.journal.removeDataFiles(set); 307 } else { 308 this.metaData.journalRC.put(tx, logId, refCount); 309 } 310 311 } 312 313 synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 314 ByteSequence result = null; 315 result = this.journal.read(location); 316 return result; 317 } 318 319 synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { 320 return this.journal.write(payload, sync); 321 } 322 323 private void lock() throws IOException { 324 if (lockFile == null) { 325 File lockFileName = new File(directory, "lock"); 326 lockFile = new LockFile(lockFileName, true); 327 if (failIfDatabaseIsLocked) { 328 lockFile.lock(); 329 } else { 330 while (true) { 331 try { 332 lockFile.lock(); 333 break; 334 } catch (IOException e) { 335 LOG.info("Database " + lockFileName + " is locked... waiting " 336 + (DATABASE_LOCKED_WAIT_DELAY / 1000) 337 + " seconds for the database to be unlocked. Reason: " + e); 338 try { 339 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 340 } catch (InterruptedException e1) { 341 } 342 } 343 } 344 } 345 } 346 } 347 348 PageFile getPageFile() { 349 this.pageFile.isLoaded(); 350 return this.pageFile; 351 } 352 353 public boolean isFailIfDatabaseIsLocked() { 354 return failIfDatabaseIsLocked; 355 } 356 357 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 358 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 359 } 360 361 public int getJournalMaxFileLength() { 362 return journalMaxFileLength; 363 } 364 365 public void setJournalMaxFileLength(int journalMaxFileLength) { 366 this.journalMaxFileLength = journalMaxFileLength; 367 } 368 369 public int getJournalMaxWriteBatchSize() { 370 return journalMaxWriteBatchSize; 371 } 372 373 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 374 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 375 } 376 377 public boolean isEnableIndexWriteAsync() { 378 return enableIndexWriteAsync; 379 } 380 381 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 382 this.enableIndexWriteAsync = enableIndexWriteAsync; 383 } 384 385 @Override 386 public String toString() { 387 return "JobSchedulerStore:" + this.directory; 388 } 389 390 }