001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.scheduler;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.Map.Entry;
031    import org.apache.activemq.util.IOHelper;
032    import org.apache.activemq.util.ServiceStopper;
033    import org.apache.activemq.util.ServiceSupport;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    import org.apache.kahadb.index.BTreeIndex;
037    import org.apache.kahadb.journal.Journal;
038    import org.apache.kahadb.journal.Location;
039    import org.apache.kahadb.page.Page;
040    import org.apache.kahadb.page.PageFile;
041    import org.apache.kahadb.page.Transaction;
042    import org.apache.kahadb.util.ByteSequence;
043    import org.apache.kahadb.util.IntegerMarshaller;
044    import org.apache.kahadb.util.LockFile;
045    import org.apache.kahadb.util.StringMarshaller;
046    import org.apache.kahadb.util.VariableMarshaller;
047    
048    public class JobSchedulerStore extends ServiceSupport {
049        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
050        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
051    
052        public static final int CLOSED_STATE = 1;
053        public static final int OPEN_STATE = 2;
054    
055        private File directory;
056        PageFile pageFile;
057        private Journal journal;
058        private LockFile lockFile;
059        private boolean failIfDatabaseIsLocked;
060        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
061        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
062        private boolean enableIndexWriteAsync = false;
063        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
064        MetaData metaData = new MetaData(this);
065        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
066        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
067    
068        protected class MetaData {
069            protected MetaData(JobSchedulerStore store) {
070                this.store = store;
071            }
072            private final JobSchedulerStore store;
073            Page<MetaData> page;
074            BTreeIndex<Integer, Integer> journalRC;
075            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
076    
077            void createIndexes(Transaction tx) throws IOException {
078                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
079                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
080            }
081    
082            void load(Transaction tx) throws IOException {
083                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
084                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
085                this.storedSchedulers.load(tx);
086                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
087                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
088                this.journalRC.load(tx);
089            }
090    
091            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
092                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
093                    Entry<String, JobSchedulerImpl> entry = i.next();
094                    entry.getValue().load(tx);
095                    schedulers.put(entry.getKey(), entry.getValue());
096                }
097            }
098    
099            public void read(DataInput is) throws IOException {
100                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
101                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
102                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
103                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
104                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
105                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
106            }
107    
108            public void write(DataOutput os) throws IOException {
109                os.writeLong(this.storedSchedulers.getPageId());
110                os.writeLong(this.journalRC.getPageId());
111    
112            }
113        }
114    
115        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
116            private final JobSchedulerStore store;
117    
118            MetaDataMarshaller(JobSchedulerStore store) {
119                this.store = store;
120            }
121            public MetaData readPayload(DataInput dataIn) throws IOException {
122                MetaData rc = new MetaData(this.store);
123                rc.read(dataIn);
124                return rc;
125            }
126    
127            public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
128                object.write(dataOut);
129            }
130        }
131    
132        class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
133            public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
134                List<JobLocation> result = new ArrayList<JobLocation>();
135                int size = dataIn.readInt();
136                for (int i = 0; i < size; i++) {
137                    JobLocation jobLocation = new JobLocation();
138                    jobLocation.readExternal(dataIn);
139                    result.add(jobLocation);
140                }
141                return result;
142            }
143    
144            public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
145                dataOut.writeInt(value.size());
146                for (JobLocation jobLocation : value) {
147                    jobLocation.writeExternal(dataOut);
148                }
149            }
150        }
151    
152        class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
153            private final JobSchedulerStore store;
154            JobSchedulerMarshaller(JobSchedulerStore store) {
155                this.store = store;
156            }
157            public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
158                JobSchedulerImpl result = new JobSchedulerImpl(this.store);
159                result.read(dataIn);
160                return result;
161            }
162    
163            public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
164                js.write(dataOut);
165            }
166        }
167    
168        public File getDirectory() {
169            return directory;
170        }
171    
172        public void setDirectory(File directory) {
173            this.directory = directory;
174        }
175        
176        public long size() {
177            if ( !isStarted() ) {
178                return 0;
179            }
180            try {
181                return journal.getDiskSize() + pageFile.getDiskSize();
182            } catch (IOException e) {
183                throw new RuntimeException(e);
184            }
185        }
186    
187        public JobScheduler getJobScheduler(final String name) throws Exception {
188            JobSchedulerImpl result = this.schedulers.get(name);
189            if (result == null) {
190                final JobSchedulerImpl js = new JobSchedulerImpl(this);
191                js.setName(name);
192                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
193                    public void execute(Transaction tx) throws IOException {
194                        js.createIndexes(tx);
195                        js.load(tx);
196                        metaData.storedSchedulers.put(tx, name, js);
197                    }
198                });
199                result = js;
200                this.schedulers.put(name, js);
201                if (isStarted()) {
202                    result.start();
203                }
204                this.pageFile.flush();
205            }
206            return result;
207        }
208    
209        synchronized public boolean removeJobScheduler(final String name) throws Exception {
210            boolean result = false;
211            final JobSchedulerImpl js = this.schedulers.remove(name);
212            result = js != null;
213            if (result) {
214                js.stop();
215                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
216                    public void execute(Transaction tx) throws IOException {
217                        metaData.storedSchedulers.remove(tx, name);
218                        js.destroy(tx);
219                    }
220                });
221            }
222            return result;
223        }
224    
225        @Override
226        protected synchronized void doStart() throws Exception {
227            if (this.directory == null) {
228                this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
229            }
230            IOHelper.mkdirs(this.directory);
231            lock();
232            this.journal = new Journal();
233            this.journal.setDirectory(directory);
234            this.journal.setMaxFileLength(getJournalMaxFileLength());
235            this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
236            this.journal.start();
237            this.pageFile = new PageFile(directory, "scheduleDB");
238            this.pageFile.load();
239    
240            this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
241                public void execute(Transaction tx) throws IOException {
242                    if (pageFile.getPageCount() == 0) {
243                        Page<MetaData> page = tx.allocate();
244                        assert page.getPageId() == 0;
245                        page.set(metaData);
246                        metaData.page = page;
247                        metaData.createIndexes(tx);
248                        tx.store(metaData.page, metaDataMarshaller, true);
249    
250                    } else {
251                        Page<MetaData> page = tx.load(0, metaDataMarshaller);
252                        metaData = page.get();
253                        metaData.page = page;
254                    }
255                    metaData.load(tx);
256                    metaData.loadScheduler(tx, schedulers);
257                    for (JobSchedulerImpl js :schedulers.values()) {
258                        try {
259                            js.start();
260                        } catch (Exception e) {
261                            JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
262                        }
263                   }
264                }
265            });
266    
267            this.pageFile.flush();
268            LOG.info(this + " started");
269        }
270        
271        @Override
272        protected synchronized void doStop(ServiceStopper stopper) throws Exception {
273            for (JobSchedulerImpl js : this.schedulers.values()) {
274                js.stop();
275            }
276            if (this.pageFile != null) {
277                this.pageFile.unload();
278            }
279            if (this.journal != null) {
280                journal.close();
281            }
282            if (this.lockFile != null) {
283                this.lockFile.unlock();
284            }
285            this.lockFile = null;
286            LOG.info(this + " stopped");
287    
288        }
289    
290        synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
291            int logId = location.getDataFileId();
292            Integer val = this.metaData.journalRC.get(tx, logId);
293            int refCount = val != null ? val.intValue() + 1 : 1;
294            this.metaData.journalRC.put(tx, logId, refCount);
295    
296        }
297    
298        synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
299            int logId = location.getDataFileId();
300            int refCount = this.metaData.journalRC.get(tx, logId);
301            refCount--;
302            if (refCount <= 0) {
303                this.metaData.journalRC.remove(tx, logId);
304                Set<Integer> set = new HashSet<Integer>();
305                set.add(logId);
306                this.journal.removeDataFiles(set);
307            } else {
308                this.metaData.journalRC.put(tx, logId, refCount);
309            }
310    
311        }
312    
313        synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
314            ByteSequence result = null;
315            result = this.journal.read(location);
316            return result;
317        }
318    
319        synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
320            return this.journal.write(payload, sync);
321        }
322    
323        private void lock() throws IOException {
324            if (lockFile == null) {
325                File lockFileName = new File(directory, "lock");
326                lockFile = new LockFile(lockFileName, true);
327                if (failIfDatabaseIsLocked) {
328                    lockFile.lock();
329                } else {
330                    while (true) {
331                        try {
332                            lockFile.lock();
333                            break;
334                        } catch (IOException e) {
335                            LOG.info("Database " + lockFileName + " is locked... waiting "
336                                    + (DATABASE_LOCKED_WAIT_DELAY / 1000)
337                                    + " seconds for the database to be unlocked. Reason: " + e);
338                            try {
339                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
340                            } catch (InterruptedException e1) {
341                            }
342                        }
343                    }
344                }
345            }
346        }
347    
348        PageFile getPageFile() {
349            this.pageFile.isLoaded();
350            return this.pageFile;
351        }
352    
353        public boolean isFailIfDatabaseIsLocked() {
354            return failIfDatabaseIsLocked;
355        }
356    
357        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
358            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
359        }
360    
361        public int getJournalMaxFileLength() {
362            return journalMaxFileLength;
363        }
364    
365        public void setJournalMaxFileLength(int journalMaxFileLength) {
366            this.journalMaxFileLength = journalMaxFileLength;
367        }
368    
369        public int getJournalMaxWriteBatchSize() {
370            return journalMaxWriteBatchSize;
371        }
372    
373        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
374            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
375        }
376    
377        public boolean isEnableIndexWriteAsync() {
378            return enableIndexWriteAsync;
379        }
380    
381        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
382            this.enableIndexWriteAsync = enableIndexWriteAsync;
383        }
384    
385        @Override
386        public String toString() {
387            return "JobSchedulerStore:" + this.directory;
388        }
389    
390    }