Drizzled Public API Documentation

temp_log_ms.cc
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-07-03
00023  *
00024  * H&G2JCtL
00025  *
00026  * Network interface.
00027  *
00028  */
00029 
00030 #include "cslib/CSConfig.h"
00031 
00032 #include <stddef.h>
00033 
00034 #include "defs_ms.h"
00035 
00036 #include "cslib/CSGlobal.h"
00037 #include "cslib/CSStrUtil.h"
00038 #include "cslib/CSStorage.h"
00039 
00040 #include "temp_log_ms.h"
00041 #include "open_table_ms.h"
00042 #include "trans_log_ms.h"
00043 #include "transaction_ms.h"
00044 #include "parameters_ms.h"
00045 
00046 
00047 // Search the transaction log for a MS_ReferenceTxn record for the given BLOB.
00048 // Just search the log file and not the cache. Seaching the cache may be faster but
00049 // it would require locks that could block the writers or reader threads and in the worse
00050 // case it will still require the reading of the log anyway.
00051 //
00052 // This search doesn't distinguish between transactions that are still running and
00053 // transactions that are rolled back.
00054 class SearchTXNLog : ReadTXNLog {
00055   public:
00056   SearchTXNLog(uint32_t db_id, MSTrans *log): ReadTXNLog(log), st_db_id(db_id) {}
00057   
00058   bool  st_found;
00059   bool  st_terminated;
00060   bool  st_commited;
00061   uint32_t st_tid; 
00062   uint32_t st_db_id; 
00063   uint32_t st_tab_id; 
00064   uint64_t st_blob_id;
00065   
00066   virtual bool rl_CanContinue() { return ((!st_found) || !st_terminated);}
00067   virtual void rl_Load(uint64_t log_position, MSTransPtr rec) 
00068   {
00069     (void) log_position;
00070     
00071     if ( !st_found && (TRANS_TYPE(rec->tr_type) != MS_ReferenceTxn))
00072       return;
00073     
00074     if (!st_found) {
00075       if  ((rec->tr_db_id == st_db_id) && (rec->tr_tab_id == st_tab_id) && (rec->tr_blob_id == st_blob_id)) {
00076         st_found = true;
00077         st_tid = rec->tr_id;
00078       } else
00079         return;
00080     }
00081     st_terminated = TRANS_IS_TERMINATED(rec->tr_type);
00082     if (st_terminated)
00083       st_commited = (TRANS_IS_AUTOCOMMIT(rec->tr_type) || (TRANS_TYPE(rec->tr_type) == MS_CommitTxn));
00084   }
00085   
00086   bool st_FindBlobRef(bool *committed, uint32_t tab_id, uint64_t blob_id)
00087   {
00088     enter_();
00089     st_found = st_terminated = st_commited = false;
00090     st_tab_id = tab_id;
00091     st_blob_id = blob_id; 
00092     
00093     rl_ReadLog(rl_log->txn_GetStartPosition(), false);
00094     *committed = st_commited;
00095     return_(st_found);
00096   }
00097 };
00098 
00099 MSTempLogFile::MSTempLogFile():
00100 CSReadBufferedFile(),
00101 myTempLogID(0),
00102 myTempLog(NULL)
00103 {
00104 }
00105 
00106 MSTempLogFile::~MSTempLogFile()
00107 {
00108   close();
00109   if (myTempLog)
00110     myTempLog->release();
00111 }
00112 
00113 MSTempLogFile *MSTempLogFile::newTempLogFile(uint32_t id, MSTempLog *temp_log, CSFile *file)
00114 {
00115   MSTempLogFile *f;
00116   enter_();
00117   
00118   push_(temp_log);
00119   push_(file);
00120   
00121   if (!(f = new MSTempLogFile())) 
00122     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00123 
00124   f->myTempLogID = id;
00125   
00126   pop_(file);
00127   f->setFile(file);
00128   
00129   pop_(temp_log);
00130   f->myTempLog = temp_log;
00131   return_(f);
00132 }
00133 
00134 MSTempLog::MSTempLog(uint32_t id, MSDatabase *db, off64_t file_size):
00135 CSRefObject(),
00136 myLogID(id),
00137 myTempLogSize(file_size),
00138 myTemplogRecSize(0),
00139 myTempLogHeadSize(0),
00140 iLogDatabase(db),
00141 iDeleteLog(false)
00142 {
00143 }
00144 
00145 MSTempLog::~MSTempLog()
00146 {
00147   enter_();
00148   if (iDeleteLog) {
00149     CSPath *path;
00150 
00151     path = getLogPath();
00152     push_(path);
00153     path->removeFile();
00154     release_(path);
00155   }
00156   exit_();
00157 }
00158 
00159 void MSTempLog::deleteLog()
00160 {
00161   iDeleteLog = true;
00162 }
00163 
00164 CSPath *MSTempLog::getLogPath()
00165 {
00166   char file_name[120];
00167 
00168   cs_strcpy(120, file_name, "bs-logs");
00169   cs_add_dir_char(120, file_name);
00170   cs_strcat(120, file_name, "temp-");
00171   cs_strcat(120, file_name, myLogID);
00172   cs_strcat(120, file_name, ".bs");
00173   return CSPath::newPath(RETAIN(iLogDatabase->myDatabasePath), file_name);
00174 }
00175 
00176 MSTempLogFile *MSTempLog::openTempLog()
00177 {
00178   CSPath      *path;
00179   MSTempLogFile *fh;
00180 
00181   enter_();
00182   path = getLogPath();
00183   retain();
00184   fh = MSTempLogFile::newTempLogFile(myLogID, this, CSFile::newFile(path));
00185   push_(fh);
00186   if (myTempLogSize)
00187     fh->open(CSFile::DEFAULT);
00188   else
00189     fh->open(CSFile::CREATE);
00190   if (!myTempLogHeadSize) {
00191     MSTempLogHeadRec  head;
00192 
00193     lock_(iLogDatabase->myTempLogArray);
00194     /* Check again after locking: */
00195     if (!myTempLogHeadSize) {
00196       size_t rem;
00197 
00198       if (fh->read(&head, 0, offsetof(MSTempLogHeadRec, th_reserved_4), 0) < offsetof(MSTempLogHeadRec, th_reserved_4)) {
00199         CS_SET_DISK_4(head.th_magic_4, MS_TEMP_LOG_MAGIC);
00200         CS_SET_DISK_2(head.th_version_2, MS_TEMP_LOG_VERSION);
00201         CS_SET_DISK_2(head.th_head_size_2, MS_TEMP_LOG_HEAD_SIZE);
00202         CS_SET_DISK_2(head.th_rec_size_2, sizeof(MSTempLogItemRec));
00203         CS_SET_DISK_4(head.th_reserved_4, 0);
00204         fh->write(&head, 0, sizeof(MSTempLogHeadRec));
00205         fh->flush();
00206       }
00207       
00208       /* Check the file header: */
00209       if (CS_GET_DISK_4(head.th_magic_4) != MS_TEMP_LOG_MAGIC)
00210         CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
00211       if (CS_GET_DISK_2(head.th_version_2) > MS_TEMP_LOG_VERSION)
00212         CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
00213 
00214       /* Load the header details: */
00215       myTempLogHeadSize = CS_GET_DISK_2(head.th_head_size_2);
00216       myTemplogRecSize = CS_GET_DISK_2(head.th_rec_size_2);
00217 
00218       /* File size, cannot be less than header size, adjust to correct offset: */
00219       if (myTempLogSize < myTempLogHeadSize)
00220         myTempLogSize = myTempLogHeadSize;
00221       if ((rem = (myTempLogSize - myTempLogHeadSize) % myTemplogRecSize))
00222         myTempLogSize += myTemplogRecSize - rem;
00223     }
00224     unlock_(iLogDatabase->myTempLogArray);
00225   }
00226   pop_(fh);
00227   return_(fh);
00228 }
00229 
00230 time_t MSTempLog::adjustWaitTime(time_t then, time_t now)
00231 {
00232   time_t wait;
00233 
00234   if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
00235     wait = ((then + PBMSParameters::getTempBlobTimeout() - now) * 1000);
00236     if (wait < 2000)
00237       wait = 2000;
00238     else if (wait > 120 * 1000)
00239       wait = 120 * 1000;
00240   }
00241   else
00242     wait = 1;
00243       
00244   return wait;
00245 }
00246 
00247 /*
00248  * ---------------------------------------------------------------
00249  * TEMP LOG THREAD
00250  */
00251 
00252 MSTempLogThread::MSTempLogThread(time_t wait_time, MSDatabase *db):
00253 CSDaemon(wait_time, NULL),
00254 iTempLogDatabase(db),
00255 iTempLogFile(NULL),
00256 iLogRecSize(0),
00257 iLogOffset(0)
00258 {
00259 }
00260 
00261 
00262 void MSTempLogThread::close()
00263 {
00264   if (iTempLogFile) {
00265     iTempLogFile->release();
00266     iTempLogFile = NULL;
00267   }
00268 }
00269 
00270 bool MSTempLogThread::try_ReleaseBLOBReference(CSThread *self, CSStringBuffer *buffer, uint32_t tab_id, int type, uint64_t blob_id, uint32_t auth_code)
00271 {
00272   volatile bool rtc = true;
00273   try_(a) {
00274     /* Release the BLOB reference. */
00275     MSOpenTable *otab;
00276 
00277     if (type == MS_TL_REPO_REF) {
00278       MSRepoFile  *repo_file;
00279 
00280       if ((repo_file = iTempLogDatabase->getRepoFileFromPool(tab_id, true))) {
00281         frompool_(repo_file);
00282         repo_file->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
00283         backtopool_(repo_file);
00284       }
00285     }
00286     else {
00287       if ((otab = MSTableList::getOpenTableByID(iTempLogDatabase->myDatabaseID, tab_id))) {
00288         frompool_(otab);
00289         if (type == MS_TL_BLOB_REF) {
00290           otab->checkBlob(buffer, blob_id, auth_code, iTempLogFile->myTempLogID, iLogOffset);
00291           backtopool_(otab);
00292         }
00293         else {
00294           ASSERT(type == MS_TL_TABLE_REF);
00295           if ((type == MS_TL_TABLE_REF) && otab->deleteReferences(iTempLogFile->myTempLogID, iLogOffset, &myMustQuit)) {
00296             /* Delete the file now... */
00297             MSTable     *tab;
00298             CSPath      *from_path;
00299             MSOpenTablePool *tab_pool;
00300 
00301             tab = otab->getDBTable();
00302             from_path = otab->getDBTable()->getTableFile();
00303 
00304             pop_(otab);
00305 
00306             push_(from_path);
00307             tab->retain();
00308             push_(tab);
00309 
00310             tab_pool = MSTableList::lockTablePoolForDeletion(otab); // This returns otab to the pool.
00311             frompool_(tab_pool);
00312 
00313             from_path->removeFile();
00314             tab->myDatabase->removeTable(tab);
00315 
00316             backtopool_(tab_pool); // The will unlock and close the table pool freeing all tables in it.
00317             pop_(tab);        // Returning the pool will have released this. (YUK!)
00318             release_(from_path);
00319           }
00320           else 
00321             backtopool_(otab);
00322         }
00323       }
00324     }
00325     
00326     rtc = false;
00327   }
00328   
00329   catch_(a);
00330   cont_(a);
00331   return rtc;
00332 }
00333 
00334 bool MSTempLogThread::doWork()
00335 {
00336   size_t        tfer;
00337   MSTempLogItemRec  log_item;
00338   CSStringBuffer    *buffer;
00339   SearchTXNLog    txn_log(iTempLogDatabase->myDatabaseID, MSTransactionManager::tm_Log);
00340 
00341   enter_();
00342   new_(buffer, CSStringBuffer(20));
00343   push_(buffer);
00344   while (!myMustQuit) {
00345     if (!iTempLogFile) {
00346       size_t head_size;
00347       if (!(iTempLogFile = iTempLogDatabase->openTempLogFile(0, &iLogRecSize, &head_size))) {
00348         release_(buffer);
00349         return_(true);
00350       }
00351       iLogOffset = head_size;
00352     }
00353 
00354     tfer = iTempLogFile->read(&log_item, iLogOffset, sizeof(MSTempLogItemRec), 0);
00355     if (tfer == 0) {
00356       /* No more data to be read: */
00357 
00358       /* Check to see if there is a log after this: */
00359       if (iTempLogDatabase->getTempLogCount() <= 1) {
00360         /* The next log does not yet exist. We wait for
00361          * it to be created before we delete and
00362          * close the current log.
00363          */
00364         myWaitTime = PBMSParameters::getTempBlobTimeout() * 1000;
00365         break;
00366       }
00367 
00368       iTempLogFile->myTempLog->deleteLog();
00369       iTempLogDatabase->removeTempLog(iTempLogFile->myTempLogID);
00370       close();
00371     }
00372     else if (tfer == sizeof(MSTempLogItemRec)) {
00373       /* We have a record: */
00374       int   type;
00375       uint32_t tab_id;
00376       uint64_t blob_id= 0;
00377       uint32_t auth_code;
00378       uint32_t then;
00379       time_t  now;
00380 
00381       /*
00382        * Items in the temp log are never updated.
00383        * If a temp operation is canceled then the object 
00384        * records this itself and when the temp operation 
00385        * is attempted it will recognize by the templog
00386        * id and offset that it is no longer a valid 
00387        * operation.
00388        */
00389       tab_id = CS_GET_DISK_4(log_item.ti_table_id_4);
00390         
00391       type = CS_GET_DISK_1(log_item.ti_type_1);
00392       blob_id = CS_GET_DISK_6(log_item.ti_blob_id_6);
00393       auth_code = CS_GET_DISK_4(log_item.ti_auth_code_4);
00394       then = CS_GET_DISK_4(log_item.ti_time_4);
00395 
00396       now = time(NULL);
00397       if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
00398         /* Time has not yet exired, adjust wait time: */
00399         myWaitTime = MSTempLog::adjustWaitTime(then, now);
00400         break;
00401       }
00402     
00403       if (try_ReleaseBLOBReference(self, buffer, tab_id, type, blob_id, auth_code)) {
00404         int err = self->myException.getErrorCode();
00405         
00406         if (err == MS_ERR_TABLE_LOCKED) {
00407           throw_();
00408         }
00409         else if (err == MS_ERR_REMOVING_REPO) {
00410           /* Wait for the compactor to finish: */
00411           myWaitTime = 2 * 1000;
00412           release_(buffer);
00413           return_(true);
00414         }
00415         else if ((err == MS_ERR_UNKNOWN_TABLE) || (err == MS_ERR_DATABASE_DELETED))
00416           ;
00417         else
00418           self->myException.log(NULL);
00419       }
00420 
00421     }
00422     else {
00423       // Only part of the data read, don't wait very long to try again:
00424       myWaitTime = 2 * 1000;
00425       break;
00426     }
00427     iLogOffset += iLogRecSize;
00428   }
00429 
00430   release_(buffer);
00431   return_(true);
00432 }
00433 
00434 void *MSTempLogThread::completeWork()
00435 {
00436   close();
00437   return NULL;
00438 }
00439