Drizzled Public API Documentation

CSThread.h
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh (H&G2JCtL)
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-05-20
00023  *
00024  * CORE SYSTEM:
00025  * An independently running thread.
00026  *
00027  */
00028 
00029 #pragma once
00030 #ifndef __CSTHREAD_H__
00031 #define __CSTHREAD_H__
00032 
00033 #include <pthread.h>
00034 #include <setjmp.h>
00035 
00036 #include "CSDefs.h"
00037 #include "CSMutex.h"
00038 #include "CSException.h"
00039 #include "CSObject.h"
00040 #include "CSStorage.h"
00041 
00042 #define CS_THREAD_TYPE        int /* Expands to int; presumably the type used to hold the thread-type constants below -- TODO confirm. */
00043 
00044 /* Types of threads (NOTE(review): the distinction between the two values is not visible in this header): */
00045 #define CS_ANY_THREAD       0
00046 #define CS_THREAD         1
00047 
00048 typedef struct CSCallStack { /* Element type of the CSThread::callStack array. */
00049   const char* cs_func; /* presumably the name of the called function -- TODO confirm */
00050   const char* cs_file; /* presumably the source file of the call site -- TODO confirm */
00051   int     cs_line; /* presumably the source line of the call site -- TODO confirm */
00052 } CSCallStack, *CSCallStackPtr;
00053 
00054 /* 
00055  * The release stack contains objects that need to be
00056  * released when an exception occurs. The constants below name the kinds of entry.
00057  */
00058 #define CS_RELEASE_OBJECT   1 /* presumably an entry using CSRelease::x.r_object -- TODO confirm */
00059 #define CS_RELEASE_MUTEX    2 /* presumably an entry using CSRelease::x.r_mutex -- TODO confirm */
00060 #define CS_RELEASE_POOLED   3 /* presumably an entry using CSRelease::x.r_pooled -- TODO confirm */
00061 #define CS_RELEASE_MEM      4 /* presumably an entry using CSRelease::x.r_mem -- TODO confirm */
00062 #define CS_RELEASE_OBJECT_PTR 5 /* presumably an entry using CSRelease::x.r_objectPtr -- TODO confirm */
00063 
00064 typedef struct CSRelease { /* Element type of the CSThread::relStack array. */
00065   int           r_type; /* presumably one of the CS_RELEASE_* values, selecting the union member in use -- TODO confirm */
00066   union {
00067     CSObject      *r_object;          /* The object to be released. */
00068     CSMutex       *r_mutex;         /* The mutex to be unlocked! */
00069     CSPooled      *r_pooled; /* presumably used with CS_RELEASE_POOLED -- TODO confirm */
00070     void        *r_mem; /* presumably used with CS_RELEASE_MEM -- TODO confirm */
00071     CSObject      **r_objectPtr; /* presumably used with CS_RELEASE_OBJECT_PTR -- TODO confirm */
00072   } x;
00073 } CSReleaseRec, *CSReleasePtr;
00074 
00075 typedef struct CSJumpBuf { /* Element type of the CSThread::jumpEnv array (the long jump stack). */
00076   CSReleasePtr      jb_res_top; /* presumably the value of CSThread::relTop saved with this jump entry -- TODO confirm */
00077   int           jb_call_top; /* presumably the value of CSThread::callTop saved with this jump entry -- TODO confirm */
00078   jmp_buf         jb_buffer; /* <setjmp.h> jump buffer for this entry */
00079 } CSJumpBufRec, *CSJumpBufPtr;
00080 
00081 class CSThreadList: public CSLinkedList, public CSMutex { /* A thread list: inherits from both CSLinkedList and CSMutex. */
00082 public:
00083   CSThreadList():
00084     CSLinkedList(),
00085     CSMutex()
00086   {
00087   }
00088 
00089   virtual ~CSThreadList() {
00090     stopAllThreads(); /* The destructor calls stopAllThreads() before the base classes are destroyed. */
00091   }
00092 
00096   void signalAllThreads(int sig); /* NOTE(review): presumably sends 'sig' to every thread in the list -- confirm against the implementation. */
00097 
00098   void quitAllThreads(); /* NOTE(review): implementation not visible here; presumably asks every thread to quit -- confirm. */
00099 
00100   void stopAllThreads(); /* Called from the destructor; implementation not visible here. */
00101 };
00102 
00103 typedef void *(*ThreadRunFunc)(); /* Type of CSThread::iRunFunc and of the run_func argument of CSThread::newThread(). */
00104 
00105 class CSThread : public CSRefObject {
00106 public:
00107   /* The name of the thread (released by the destructor when non-NULL). */
00108   CSString    *threadName;
00109   CSThreadList  *myThreadList;        /* The thread list that this thread belongs to. */
00110 
00111   /* If this value is non-zero, this signal is pending and
00112    * must be thrown.
00113    *
00114    * SIGTERM, SIGQUIT - Means the thread has been terminated.
00115    * SIGINT - Means the thread has been interrupted.
00116    *
00117    * When a signal is thrown it clears this value. This includes
00118    * the case when system calls return an error due to an interrupt.
00119    */
00120   int       signalPending;
00121   bool      ignoreSignals;
00122 
00123   /* Set to true once the thread is running (never reset!). */
00124   bool      isRunning;
00125 
00126   /* Set to true when the thread must quit (never reset!): */
00127   bool      myMustQuit; 
00128   
00129   CSException   myException;
00130 #if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
00131 
00132   /* Set to true when this thread was initialized through the internal PBMS API. */
00133   /* When this is the case then it must only be freed via the API as well. */
00134   bool      pbms_api_owner;
00135 
00136   /* Transaction references. */
00137 #ifdef DRIZZLED
00138   CSSortedList  mySavePoints;
00139 #endif
00140   uint32_t    myTID;      // Current transaction ID
00141   uint32_t    myTransRef;   // Reference to the current transaction cache index
00142   bool      myIsAutoCommit; // Is the current transaction in auto commit mode.
00143   uint32_t    myCacheVersion; // The last transaction cache version checked. Used during overflow.
00144   bool      myStartTxn;   // A flag to indicate the start of a new transaction.
00145   uint32_t    myStmtCount;  // Counts the number of statements in the current transaction.
00146   uint32_t    myStartStmt;  // The myStmtCount at the start of the last logical statement. (An update is 2 statements but only 1 logical statement.)
00147   void      *myInfo;
00148 #endif
00149   
00150   /* The call stack */
00151   int       callTop;
00152   CSCallStack   callStack[CS_CALL_STACK_SIZE];
00153 
00154   /* The long jump stack: */
00155   int       jumpDepth;              /* The current jump depth */
00156   CSJumpBufRec  jumpEnv[CS_JUMP_STACK_SIZE];    /* The process environment to be restored on exception */
00157 
00158   /* The release stack */
00159   CSReleasePtr  relTop;               /* The top of the resource stack (reference next free space). */
00160   CSReleaseRec  relStack[CS_RELEASE_STACK_SIZE];  /* Temporary data to be freed if an exception occurs. */
00161 
00162   CSThread(CSThreadList *list): /* Stores 'list' in myThreadList; relTop starts at relStack, both stack counters at 0. iThread is not initialized here. */
00163     CSRefObject(),
00164     threadName(NULL),
00165     myThreadList(list),
00166     signalPending(0),
00167     ignoreSignals(false),
00168     isRunning(false),
00169     myMustQuit(false),
00170 #if defined(MYSQL_SERVER) ||  defined(DRIZZLED)
00171     pbms_api_owner(false),
00172     myTID(0),
00173     myTransRef(0),
00174     myIsAutoCommit(true),
00175     myCacheVersion(0),
00176     myStartTxn(true),
00177     myStmtCount(0),
00178     myStartStmt(0),
00179     myInfo(NULL),
00180 #endif
00181     callTop(0),
00182     jumpDepth(0),
00183     relTop(relStack),
00184     iIsMain(false),
00185     isDetached(false),
00186     iRunFunc(NULL),
00187     iNextLink(NULL),
00188     iPrevLink(NULL)
00189   {
00190   }
00191 
00192   virtual ~CSThread() {
00193     if (threadName)
00194       threadName->release(); /* Drop this object's reference to the name string. */
00195   }
00196 
00202   virtual void *run(); /* Overridden by CSDaemon; presumably the thread's main routine -- TODO confirm. */
00203 
00209   void start(bool detached = false); /* 'detached' defaults to false; presumably recorded in isDetached -- TODO confirm. */
00210 
00211   /*
00212    * Stop execution of the thread.
00213    */
00214   virtual void stop();
00215 
00221   void *join(); /* NOTE(review): presumably waits for the thread to end and returns its result -- confirm. */
00222 
00227   void signal(unsigned int); /* NOTE(review): presumably sends the given signal to this thread -- confirm. */
00228 
00229   void setSignalPending(unsigned int); /* NOTE(review): presumably records the given signal in signalPending -- confirm. */
00230 
00237   void interrupted() { if (signalPending) throwSignal(); } /* Calls throwSignal() only when signalPending is non-zero. */
00238   void throwSignal();
00239 
00240   /* Log the stack to the specified depth along with the message. */
00241   void logStack(int depth, const char *msg);
00242 
00243   /* Log the exception, and the current stack. */
00244   void logException();
00245   
00246   /* NOTE(review): the original comment here duplicated the one on logException(); presumably this logs only the exception message -- confirm. */
00247   void logMessage();
00248   
00249   /*
00250    * Return true if this is the main thread.
00251    */
00252   bool isMain();
00253 
00254   /*
00255    * Throwing exceptions:
00256    */
00257   void releaseObjects(CSReleasePtr top);
00258   void throwException();
00259   void caught();
00260   bool isMe(CSThread *me) { return (pthread_equal(me->iThread,iThread) != 0);} /* True when 'me' and this object hold equal pthread IDs; 'me' must not be NULL. */
00261   /* Make this object linkable: */
00262   virtual CSObject *getNextLink() { return iNextLink; }
00263   virtual CSObject *getPrevLink() { return iPrevLink; }
00264   virtual void setNextLink(CSObject *link) { iNextLink = link; }
00265   virtual void setPrevLink(CSObject *link) { iPrevLink = link; }
00266 
00267   friend class CSDaemon; /* Gives CSDaemon access to the private members below. */
00268 
00269 private:
00270   pthread_t   iThread; /* pthread ID compared in isMe(); not set by the constructor. */
00271   bool      iIsMain; /* presumably the value reported by isMain() -- TODO confirm */
00272   bool      isDetached;
00273   ThreadRunFunc iRunFunc; /* NULL after construction; presumably set by newThread() -- TODO confirm */
00274   CSObject    *iNextLink; /* Accessed via getNextLink()/setNextLink(). */
00275   CSObject    *iPrevLink; /* Accessed via getPrevLink()/setPrevLink(). */
00276 
00277   void addToList();
00278   void removeFromList();
00279 
00280 public:
00281   /* Each thread stores its thread object in this key: */
00282   static pthread_key_t sThreadKey;
00283 
00293   static void sleep(unsigned long timeout); /* NOTE(review): the unit of 'timeout' is not visible in this header -- confirm. */
00294 
00295   /* Do static initialization and de-initialization. */
00296   static bool isUp;
00297   static bool startUp();
00298   static void shutDown();
00299 
00300   /* Attach and detach an already running thread: */
00301   static bool attach(CSThread *thread);
00302   static void detach(CSThread *thread);
00303 
00308   static CSThread *getSelf(); /* presumably returns the thread object stored under sThreadKey for the caller -- TODO confirm */
00309   static bool setSelf(CSThread *self); /* presumably stores 'self' under sThreadKey for the caller -- TODO confirm */
00310 
00311   static CSThread *newCSThread();
00312   static CSThread *newThread(CSString *name, ThreadRunFunc run_func, CSThreadList *list);
00313 
00314   /* Called for a newly created thread. */
00315   static void *dispatch(void *arg);
00316 
00317 };
00318 
00319 class CSDaemon : public CSThread, public CSSync { /* A CSThread that also inherits CSSync and overrides run() and stop(). */
00320 public:
00321   time_t      myWaitTime;         /* Wait time in milliseconds */
00322 
00323   CSDaemon(time_t wait_time, CSThreadList *list);
00324   CSDaemon(CSThreadList *list);
00325   virtual ~CSDaemon() { }
00326 
00327   virtual void *run();
00328 
00329   /* Return false if startup failed, and the thread must quit. The default implementation returns true. */
00330   virtual bool initializeWork() { return true; };
00331 
00332   /* Return true if the thread should sleep before doing more work. */
00333   virtual bool doWork();
00334 
00335   virtual void *completeWork() { return NULL; }; /* The default implementation returns NULL. */
00336 
00337   /* Return false if the exception is not handled and the thread must quit.
00338    * Set must_sleep to true if the thread should pause before doing work
00339    * again. NOTE(review): must_sleep is not a parameter of this method -- confirm what it refers to.
00340    */
00341   virtual bool handleException();
00342 
00343   virtual void stop();
00344 
00345   void wakeup();
00346 
00347   void suspend();
00348 
00349   bool isSuspend() { return (iSuspendCount != 0);} // Don't use iSuspended, we are interested in if suspend() was called.
00350 
00351   void resume();
00352 
00353   virtual void returnToPool() { /* Calls resume() and then release(), in that order. */
00354     resume();
00355     release();
00356   }
00357 
00358   void suspended();
00359 
00360   void suspendedWait();
00361 
00362   void suspendedWait(time_t milli_sec);
00363 
00364 private:
00365   void    try_Run(CSThread *self, const bool must_sleep);
00366   bool    iSuspended;
00367   uint32_t  iSuspendCount; /* Non-zero makes isSuspend() return true; presumably changed by suspend()/resume() -- TODO confirm. */
00368 };
00369 
00370 #endif