001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.plist;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.Map.Entry;
031    import org.apache.activemq.util.IOHelper;
032    import org.apache.activemq.util.ServiceStopper;
033    import org.apache.activemq.util.ServiceSupport;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    import org.apache.kahadb.index.BTreeIndex;
037    import org.apache.kahadb.journal.Journal;
038    import org.apache.kahadb.journal.Location;
039    import org.apache.kahadb.page.Page;
040    import org.apache.kahadb.page.PageFile;
041    import org.apache.kahadb.page.Transaction;
042    import org.apache.kahadb.util.ByteSequence;
043    import org.apache.kahadb.util.IntegerMarshaller;
044    import org.apache.kahadb.util.LockFile;
045    import org.apache.kahadb.util.StringMarshaller;
046    import org.apache.kahadb.util.VariableMarshaller;
047    
048    /**
049     * @org.apache.xbean.XBean
050     */
051    public class PListStore extends ServiceSupport {
052        static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
053        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
054    
055        static final int CLOSED_STATE = 1;
056        static final int OPEN_STATE = 2;
057    
058        private File directory;
059        PageFile pageFile;
060        private Journal journal;
061        private LockFile lockFile;
062        private boolean failIfDatabaseIsLocked;
063        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
064        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
065        private boolean enableIndexWriteAsync = false;
066        private boolean initialized = false;
067        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
068        MetaData metaData = new MetaData(this);
069        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
070        Map<String, PList> persistentLists = new HashMap<String, PList>();
071        final Object indexLock = new Object();
072    
073        public Object getIndexLock() {
074            return indexLock;
075        }
076    
077        protected class MetaData {
078            protected MetaData(PListStore store) {
079                this.store = store;
080            }
081    
082            private final PListStore store;
083            Page<MetaData> page;
084            BTreeIndex<Integer, Integer> journalRC;
085            BTreeIndex<String, PList> storedSchedulers;
086    
087            void createIndexes(Transaction tx) throws IOException {
088                this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
089                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
090            }
091    
092            void load(Transaction tx) throws IOException {
093                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
094                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
095                this.storedSchedulers.load(tx);
096                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
097                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
098                this.journalRC.load(tx);
099            }
100    
101            void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
102                for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
103                    Entry<String, PList> entry = i.next();
104                    entry.getValue().load(tx);
105                    schedulers.put(entry.getKey(), entry.getValue());
106                }
107            }
108    
109            public void read(DataInput is) throws IOException {
110                this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
111                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
112                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
113                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
114                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
115                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
116            }
117    
118            public void write(DataOutput os) throws IOException {
119                os.writeLong(this.storedSchedulers.getPageId());
120                os.writeLong(this.journalRC.getPageId());
121    
122            }
123        }
124    
125        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
126            private final PListStore store;
127    
128            MetaDataMarshaller(PListStore store) {
129                this.store = store;
130            }
131            public MetaData readPayload(DataInput dataIn) throws IOException {
132                MetaData rc = new MetaData(this.store);
133                rc.read(dataIn);
134                return rc;
135            }
136    
137            public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
138                object.write(dataOut);
139            }
140        }
141    
142        class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
143            public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
144                List<EntryLocation> result = new ArrayList<EntryLocation>();
145                int size = dataIn.readInt();
146                for (int i = 0; i < size; i++) {
147                    EntryLocation jobLocation = new EntryLocation();
148                    jobLocation.readExternal(dataIn);
149                    result.add(jobLocation);
150                }
151                return result;
152            }
153    
154            public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
155                dataOut.writeInt(value.size());
156                for (EntryLocation jobLocation : value) {
157                    jobLocation.writeExternal(dataOut);
158                }
159            }
160        }
161    
162        class JobSchedulerMarshaller extends VariableMarshaller<PList> {
163            private final PListStore store;
164            JobSchedulerMarshaller(PListStore store) {
165                this.store = store;
166            }
167            public PList readPayload(DataInput dataIn) throws IOException {
168                PList result = new PList(this.store);
169                result.read(dataIn);
170                return result;
171            }
172    
173            public void writePayload(PList js, DataOutput dataOut) throws IOException {
174                js.write(dataOut);
175            }
176        }
177    
178        public File getDirectory() {
179            return directory;
180        }
181    
182        public void setDirectory(File directory) {
183            this.directory = directory;
184        }
185    
186        public long size() {
187            synchronized (this) {
188                if (!initialized) {
189                    return 0;
190                }
191            }
192            try {
193                return journal.getDiskSize() + pageFile.getDiskSize();
194            } catch (IOException e) {
195                throw new RuntimeException(e);
196            }
197        }
198    
199        synchronized public PList getPList(final String name) throws Exception {
200            if (!isStarted()) {
201                throw new IllegalStateException("Not started");
202            }
203            intialize();
204            PList result = this.persistentLists.get(name);
205            if (result == null) {
206                final PList pl = new PList(this);
207                pl.setName(name);
208                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
209                    public void execute(Transaction tx) throws IOException {
210                        pl.setRootId(tx.allocate().getPageId());
211                        pl.load(tx);
212                        metaData.storedSchedulers.put(tx, name, pl);
213                    }
214                });
215                result = pl;
216                this.persistentLists.put(name, pl);
217            }
218            final PList load = result;
219            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
220                public void execute(Transaction tx) throws IOException {
221                    load.load(tx);
222                }
223            });
224    
225            return result;
226        }
227    
228        synchronized public boolean removePList(final String name) throws Exception {
229            boolean result = false;
230            final PList pl = this.persistentLists.remove(name);
231            result = pl != null;
232            if (result) {
233                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
234                    public void execute(Transaction tx) throws IOException {
235                        metaData.storedSchedulers.remove(tx, name);
236                        pl.destroy(tx);
237                    }
238                });
239            }
240            return result;
241        }
242    
243        protected synchronized void intialize() throws Exception {
244            if (isStarted()) {
245                if (this.initialized == false) {
246                    this.initialized = true;
247                    if (this.directory == null) {
248                        this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
249                    }
250                    IOHelper.mkdirs(this.directory);
251                    lock();
252                    this.journal = new Journal();
253                    this.journal.setDirectory(directory);
254                    this.journal.setMaxFileLength(getJournalMaxFileLength());
255                    this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
256                    this.journal.start();
257                    this.pageFile = new PageFile(directory, "tmpDB");
258                    this.pageFile.load();
259    
260                    this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
261                        public void execute(Transaction tx) throws IOException {
262                            if (pageFile.getPageCount() == 0) {
263                                Page<MetaData> page = tx.allocate();
264                                assert page.getPageId() == 0;
265                                page.set(metaData);
266                                metaData.page = page;
267                                metaData.createIndexes(tx);
268                                tx.store(metaData.page, metaDataMarshaller, true);
269    
270                            } else {
271                                Page<MetaData> page = tx.load(0, metaDataMarshaller);
272                                metaData = page.get();
273                                metaData.page = page;
274                            }
275                            metaData.load(tx);
276                            metaData.loadLists(tx, persistentLists);
277                        }
278                    });
279    
280                    this.pageFile.flush();
281                    LOG.info(this + " initialized");
282                }
283            }
284        }
285    
286        @Override
287        protected synchronized void doStart() throws Exception {
288            LOG.info(this + " started");
289        }
290    
291        @Override
292        protected synchronized void doStop(ServiceStopper stopper) throws Exception {
293            for (PList pl : this.persistentLists.values()) {
294                pl.unload();
295            }
296            if (this.pageFile != null) {
297                this.pageFile.unload();
298            }
299            if (this.journal != null) {
300                journal.close();
301            }
302            if (this.lockFile != null) {
303                this.lockFile.unlock();
304            }
305            this.lockFile = null;
306            this.initialized = false;
307            LOG.info(this + " stopped");
308    
309        }
310    
311        synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
312            int logId = location.getDataFileId();
313            Integer val = this.metaData.journalRC.get(tx, logId);
314            int refCount = val != null ? val.intValue() + 1 : 1;
315            this.metaData.journalRC.put(tx, logId, refCount);
316    
317        }
318    
319        synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
320            int logId = location.getDataFileId();
321            if (logId != Location.NOT_SET) {
322                int refCount = this.metaData.journalRC.get(tx, logId);
323                refCount--;
324                if (refCount <= 0) {
325                    this.metaData.journalRC.remove(tx, logId);
326                    Set<Integer> set = new HashSet<Integer>();
327                    set.add(logId);
328                    this.journal.removeDataFiles(set);
329                } else {
330                    this.metaData.journalRC.put(tx, logId, refCount);
331                }
332            }
333        }
334    
335        synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
336            ByteSequence result = null;
337            result = this.journal.read(location);
338            return result;
339        }
340    
341        synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
342            return this.journal.write(payload, sync);
343        }
344    
345        private void lock() throws IOException {
346            if (lockFile == null) {
347                File lockFileName = new File(directory, "lock");
348                lockFile = new LockFile(lockFileName, true);
349                if (failIfDatabaseIsLocked) {
350                    lockFile.lock();
351                } else {
352                    while (true) {
353                        try {
354                            lockFile.lock();
355                            break;
356                        } catch (IOException e) {
357                            LOG.info("Database " + lockFileName + " is locked... waiting "
358                                    + (DATABASE_LOCKED_WAIT_DELAY / 1000)
359                                    + " seconds for the database to be unlocked. Reason: " + e);
360                            try {
361                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
362                            } catch (InterruptedException e1) {
363                            }
364                        }
365                    }
366                }
367            }
368        }
369    
370        PageFile getPageFile() {
371            this.pageFile.isLoaded();
372            return this.pageFile;
373        }
374    
375        public boolean isFailIfDatabaseIsLocked() {
376            return failIfDatabaseIsLocked;
377        }
378    
379        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
380            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
381        }
382    
383        public int getJournalMaxFileLength() {
384            return journalMaxFileLength;
385        }
386    
387        public void setJournalMaxFileLength(int journalMaxFileLength) {
388            this.journalMaxFileLength = journalMaxFileLength;
389        }
390    
391        public int getJournalMaxWriteBatchSize() {
392            return journalMaxWriteBatchSize;
393        }
394    
395        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
396            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
397        }
398    
399        public boolean isEnableIndexWriteAsync() {
400            return enableIndexWriteAsync;
401        }
402    
403        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
404            this.enableIndexWriteAsync = enableIndexWriteAsync;
405        }
406    
407        @Override
408        public String toString() {
409            return "PListStore:" + this.directory;
410        }
411    
412    }