/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and therefore
 * access to it should be externally synchronized.
 *
 * The file has 3 parts:
 * Metadata Space (4k): a reserved area used to store persistent configuration about the file.
 * Recovery Buffer Space (page size * 1000): a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: the pages in the page file.
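 *
 * A minimal usage sketch (error handling omitted; the Transaction methods shown
 * are assumed from this package's Transaction API):
 * <pre>
 * PageFile pf = new PageFile(new File("data"), "example");
 * pf.load();                 // open (or create) the backing files
 * Transaction tx = pf.tx();
 * Page page = tx.allocate(); // grab a free page
 * tx.commit();
 * pf.flush();                // push buffered writes to disk
 * pf.unload();               // clean shutdown; records the last tx id
 * </pre>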
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "" + 1024 * 4));
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", "" + 1000));
    // Recovery file header holds (long nextTxId, long checksum, int pageCount).
    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory...
    private File directory;
    // ...and the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RandomAccessFile readFile;
    // File handle used for writing pages..
    private RandomAccessFile writeFile;
    // File handle used for writing to the recovery buffer..
    private RandomAccessFile recoveryFile;

    // The size of pages.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to. It may exceed the max,
    // but the file will be resized back down to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we aim to write every time we write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is the read page cache enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = 100;

    // Should we first log the page write to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true.
    private AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    /**
     * Used to keep track of updated pages which have not yet been committed.
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        // Move the current bytes to the disk-bound slot; a later setCurrent()
        // can stage a newer version while this one is being written.
        void begin() {
            diskBound = current;
            current = null;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBound = null;
            return current == null;
        }

        boolean isDone() {
            return diskBound == null && current == null;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
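     * Two copies of it are kept in the 4k metadata header, each stamped with a
     * metaDataTxId. On load, when the two copies disagree, the second one is preferred,
     * since a disagreement means the rewrite of the first copy may not have completed
     * cleanly (see loadMetaData()).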
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
        public String getFileTypeVersion() {
            return fileTypeVersion;
        }
        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }
        public long getMetaDataTxId() {
            return metaDataTxId;
        }
        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }
        public int getPageSize() {
            return pageSize;
        }
        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }
        public boolean isCleanShutdown() {
            return cleanShutdown;
        }
        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }
        public long getLastTxId() {
            return lastTxId;
        }
        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }
        public long getFreePages() {
            return freePages;
        }
        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by name.
     *
     * @param directory
     * @param name
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object. This method can only be used when this object is not loaded.
     *
     * @throws IOException
     *         if the files cannot be deleted.
     * @throws IllegalStateException
     *         if this PageFile is loaded.
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    /**
     * @param file
     * @throws IOException
     */
    private void delete(File file) throws IOException {
        if (file.exists()) {
            if (!file.delete()) {
                throw new IOException("Could not delete: " + file.getPath());
            }
        }
    }
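    // The three companion files: <name>.data holds the metadata header and the pages,
    // <name>.free holds the free list persisted by a clean shutdown, and <name>.redo is
    // the recovery (double write) buffer.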
    /**
     * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException
     *         If the page file cannot be loaded. This could be because the existing page file is corrupt, is a bad
     *         version, or there was a disk error.
     * @throws IllegalStateException
     *         If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RandomAccessFile(file, "rw");
            readFile = new RandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting, since that can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting, since that can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
            }

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());

                // Scan all pages to find the free ones.
                freeList = new SequenceSet();
                for (Iterator i = tx().iterator(true); i.hasNext(); ) {
                    Page page = (Page) i.next();
                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
            startWriter();

        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    /**
     * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException
     *         if a disk error occurred while closing down the page file.
     * @throws IllegalStateException
     *         if the PageFile is not loaded.
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException
     *         If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Set up a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }

    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }
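    // Pages live at fixed offsets after the 4k page file header:
    // offset = PAGE_FILE_HEADER_SIZE + pageId * pageSize. For example, with the default
    // 4k pages, page 3 starts at byte 4096 + 3 * 4096 = 16384.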
    private long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second since the first is probably a partial..
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format.
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with spaces...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it two times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.getFD().sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.getFD().sync();
    }
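    // The free list is only persisted by a clean unload(); load() deletes the file again,
    // and after an unclean shutdown the list is rebuilt by scanning every page instead.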
    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes? Enabled by default.
     *
     * @return true if the recovery buffer is enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes. Enabled by default. Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     *
     * @param syncWrites
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException
     *         once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled.
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching.
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
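     * Note: the read cache is created as an LRUCache when load() runs, so it must be
     * configured before the page file is loaded.
     *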
     * @param pageCacheSize the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile.
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        assertNotLoaded();
        this.writeBatchSize = writeBatchSize;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }
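    // Allocation first tries to reuse a contiguous run of pages from the free list; only
    // when none is available does it extend the file by minting brand new free pages.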
    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded.
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;
            while (c > 0) {
                Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
                page.makeFree(getNextWriteTransactionId());

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
                c--;
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            public Long getKey() {
                return write.getPage().getPageId();
            }
            public PageWrite getValue() {
                return write;
            }
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[] { entry };
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    write.setCurrent(value.page, value.current);
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            if (canStartWriteBatch()) {
                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }
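    // Note that back-to-back updates to the same page are coalesced above: the write map
    // is keyed by page id, so a pending PageWrite simply has its current bytes replaced.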
    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // would be nice to figure out how to auto tune that value. Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(Page page) {
        if (enablePageCaching) {
            pageCache.remove(page.getPageId());
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal double write implementation follows...
    ///////////////////////////////////////////////////////////////////
    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            releaseCheckpointWaiter();
        }
    }

    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write; this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch, since once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        try {
            if (enableRecoveryFile) {

                // Using Adler-32 instead of CRC-32 because it's much faster, and its
                // weakness for short messages of a few hundred bytes is not a factor
                // here since we know our write batches are going to be much larger.
                Checksum checksum = new Adler32();
                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.diskBound, 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                    }
                }
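                // Each record in the recovery buffer is the page id (a long) followed by the
                // raw page bytes; note the checksum above covers only the page bytes.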
                // Can we shrink the recovery buffer?
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow.
                recoveryFile.writeInt(batch.size());

                // Write the pages.
                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.diskBound, 0, pageSize);
                }

                if (enableDiskSyncs) {
                    // Sync to make sure recovery buffer writes land on disk..
                    recoveryFile.getFD().sync();
                }

                recoveryPageCount = batch.size();
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.diskBound, 0, pageSize);
                w.done();
            }

            // Sync again.
            if (enableDiskSyncs) {
                writeFile.getFD().sync();
            }

        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }
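    // The recovery file needs the 4k header plus (8-byte page id + page data) per record.
    // E.g. with 4k pages, 1000 pages need 4096 + (4096 + 8) * 1000 = 4,108,096 bytes.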
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     * @throws IOException
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data. Probably
                    // due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred, it was because the redo buffer was not fully written
            // out correctly.. so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid, then the recovery buffer was only partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.getFD().sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

}