OpenWalnut 1.2.5
|
00001 //--------------------------------------------------------------------------- 00002 // 00003 // Project: OpenWalnut ( http://www.openwalnut.org ) 00004 // 00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS 00006 // For more information see http://www.openwalnut.org/copying 00007 // 00008 // This file is part of OpenWalnut. 00009 // 00010 // OpenWalnut is free software: you can redistribute it and/or modify 00011 // it under the terms of the GNU Lesser General Public License as published by 00012 // the Free Software Foundation, either version 3 of the License, or 00013 // (at your option) any later version. 00014 // 00015 // OpenWalnut is distributed in the hope that it will be useful, 00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 // GNU Lesser General Public License for more details. 00019 // 00020 // You should have received a copy of the GNU Lesser General Public License 00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>. 00022 // 00023 //--------------------------------------------------------------------------- 00024 00025 #ifndef WTHREADEDFUNCTION_H 00026 #define WTHREADEDFUNCTION_H 00027 00028 #include <memory.h> 00029 #include <iostream> 00030 00031 #include <string> 00032 #include <vector> 00033 #include <boost/thread.hpp> 00034 00035 #include "WAssert.h" 00036 #include "WWorkerThread.h" 00037 #include "WSharedObject.h" 00038 #include "WExportCommon.h" 00039 00040 /** 00041 * An enum indicating the status of a multithreaded computation 00042 */ 00043 enum WThreadedFunctionStatus 00044 { 00045 W_THREADS_INITIALIZED, //! the status after constructing the function 00046 W_THREADS_RUNNING, //! the threads were started 00047 W_THREADS_STOP_REQUESTED, //! a stop was requested and not all threads have stopped yet 00048 W_THREADS_ABORTED, //! at least one thread was aborted due to a stop request or an exception 00049 W_THREADS_FINISHED //! all threads completed their work successfully 00050 }; 00051 00052 /** 00053 * An enum indicating the number of threads used 00054 */ 00055 enum WThreadedFunctionNbThreads 00056 { 00057 W_AUTOMATIC_NB_THREADS = 0 //!< Use half the available cores as number of threads 00058 }; 00059 00060 /** 00061 * \class WThreadedFunctionBase 00062 * 00063 * A virtual base class for threaded functions (see below). 00064 */ 00065 class OWCOMMON_EXPORT WThreadedFunctionBase // NOLINT 00066 { 00067 //! a type for exception signals 00068 typedef boost::signal< void ( WException const& ) > ExceptionSignal; 00069 00070 public: 00071 00072 //! a type for exception callbacks 00073 typedef boost::function< void ( WException const& ) > ExceptionFunction; 00074 00075 /** 00076 * Standard constructor. 00077 */ 00078 WThreadedFunctionBase(); 00079 00080 /** 00081 * Destroys the thread pool and stops all threads, if any one of them is still running. 00082 * 00083 * \note Of course, the client has to make sure the threads do not work endlessly on a single job. 00084 */ 00085 virtual ~WThreadedFunctionBase(); 00086 00087 /** 00088 * Starts the threads. 00089 */ 00090 virtual void run() = 0; 00091 00092 /** 00093 * Request all threads to stop. Returns immediately, so you might 00094 * have to wait() for the threads to actually finish. 00095 */ 00096 virtual void stop() = 0; 00097 00098 /** 00099 * Wait for all threads to stop. 00100 */ 00101 virtual void wait() = 0; 00102 00103 /** 00104 * Get the status of the threads. 00105 * 00106 * \return The current status. 00107 */ 00108 WThreadedFunctionStatus status(); 00109 00110 /** 00111 * Returns a condition that gets fired when all threads have finished. 00112 * 00113 * \return The condition indicating all threads are done. 00114 */ 00115 boost::shared_ptr< WCondition > getThreadsDoneCondition(); 00116 00117 /** 00118 * Subscribe a function to an exception signal. 00119 * 00120 * \param func The function to subscribe. 00121 */ 00122 void subscribeExceptionSignal( ExceptionFunction func ); 00123 00124 protected: 00125 /** 00126 * WThreadedFunctionBase is non-copyable, so the copy constructor is not implemented. 00127 */ 00128 WThreadedFunctionBase( WThreadedFunctionBase const& ); // NOLINT 00129 00130 /** 00131 * WThreadedFunctionBase is non-copyable, so the copy operator is not implemented. 00132 * 00133 * \return this function 00134 */ 00135 WThreadedFunctionBase& operator = ( WThreadedFunctionBase const& ); 00136 00137 //! a condition that gets notified when the work is complete 00138 boost::shared_ptr< WCondition > m_doneCondition; 00139 00140 //! a signal for exceptions 00141 ExceptionSignal m_exceptionSignal; 00142 00143 //! the current status 00144 WSharedObject< WThreadedFunctionStatus > m_status; 00145 }; 00146 00147 /** 00148 * \class WThreadedFunction 00149 * 00150 * Creates threads that computes a function in a multithreaded fashion. The template parameter 00151 * is an object that provides a function to execute. The following function needs to be implemented: 00152 * 00153 * void operator ( std::size_t id, std::size_t mx, WBoolFlag const& s ); 00154 * 00155 * Here, 'id' is the number of the thread currently executing the function, ranging from 00156 * 0 to mx - 1, where 'mx' is the number of threads running. 's' is a flag that indicates 00157 * if the execution should be stopped. Make sure to check the flag often, so that the threads 00158 * can be stopped when needed. 00159 * 00160 * This class itself is NOT thread-safe, do not access it from different threads simultaneously. 00161 * Also, make sure any resources used by your function are accessed in a threadsafe manner, 00162 * as all threads share the same function object. 00163 * 00164 * Any exception thrown by your function will be caught and forwarded via the exception 00165 * signal. Beware that the signal function will be called in the executing threads, as opposed 00166 * to in your module thread. This means that the exception handler bound to the exception 00167 * signal must be threadsafe too. 00168 * 00169 * The status of the execution can be checked via the status() function. Also, when all threads 00170 * finish (due to throwing exceptions or actually successfully finishing computation ), a condition 00171 * will be notified. 00172 * 00173 * \ingroup common 00174 */ 00175 template< class Function_T > 00176 class WThreadedFunction : public WThreadedFunctionBase 00177 { 00178 //! a type for exception signals 00179 typedef boost::signal< void ( WException const& ) > ExceptionSignal; 00180 00181 public: 00182 00183 //! a type for exception callbacks 00184 typedef boost::function< void ( WException const& ) > ExceptionFunction; 00185 00186 /** 00187 * Creates the thread pool with a given number of threads. 00188 * 00189 * \param numThreads The number of threads to create. 00190 * \param function The function object. 00191 * 00192 * \note If the number of threads equals 0, a good number of threads will be determined by the threadpool. 00193 */ 00194 WThreadedFunction( std::size_t numThreads, boost::shared_ptr< Function_T > function ); 00195 00196 /** 00197 * Destroys the thread pool and stops all threads, if any one of them is still running. 00198 * 00199 * \note Of course, the client has to make sure the threads do not work endlessly on a single job. 00200 */ 00201 virtual ~WThreadedFunction(); 00202 00203 /** 00204 * Starts the threads. 00205 */ 00206 virtual void run(); 00207 00208 /** 00209 * Request all threads to stop. Returns immediately, so you might 00210 * have to wait() for the threads to actually finish. 00211 */ 00212 virtual void stop(); 00213 00214 /** 00215 * Wait for all threads to stop. 00216 */ 00217 virtual void wait(); 00218 00219 private: 00220 /** 00221 * WThreadedFunction is non-copyable, so the copy constructor is not implemented. 00222 */ 00223 WThreadedFunction( WThreadedFunction const& ); // NOLINT 00224 00225 /** 00226 * WThreadedFunction is non-copyable, so the copy operator is not implemented. 00227 * 00228 * \return this function 00229 */ 00230 WThreadedFunction& operator = ( WThreadedFunction const& ); 00231 00232 /** 00233 * This function gets subscribed to the threads' stop signals. 00234 */ 00235 void handleThreadDone(); 00236 00237 /** 00238 * This function handles exceptions thrown in the worker threads. 00239 * 00240 * \param e The exception that was thrown. 00241 */ 00242 void handleThreadException( WException const& e ); 00243 00244 //! the number of threads to manage 00245 std::size_t m_numThreads; 00246 00247 //! the threads 00248 // use shared_ptr here, because WWorkerThread is non-copyable 00249 std::vector< boost::shared_ptr< WWorkerThread< Function_T > > > m_threads; 00250 00251 //! the function object 00252 boost::shared_ptr< Function_T > m_func; 00253 00254 //! a counter that keeps track of how many threads have finished 00255 WSharedObject< std::size_t > m_threadsDone; 00256 }; 00257 00258 template< class Function_T > 00259 WThreadedFunction< Function_T >::WThreadedFunction( std::size_t numThreads, boost::shared_ptr< Function_T > function ) 00260 : WThreadedFunctionBase(), 00261 m_numThreads( numThreads ), 00262 m_threads(), 00263 m_func( function ), 00264 m_threadsDone() 00265 { 00266 if( !m_func ) 00267 { 00268 throw WException( std::string( "No valid thread function pointer." ) ); 00269 } 00270 00271 // find a suitable number of threads 00272 if( m_numThreads == W_AUTOMATIC_NB_THREADS ) 00273 { 00274 m_numThreads = 1; 00275 while( m_numThreads < boost::thread::hardware_concurrency() / 2 && m_numThreads < 1024 ) 00276 { 00277 m_numThreads *= 2; 00278 } 00279 } 00280 00281 // set number of finished threads to 0 00282 m_threadsDone.getWriteTicket()->get() = 0; 00283 00284 // create threads 00285 for( std::size_t k = 0; k < m_numThreads; ++k ) 00286 { 00287 boost::shared_ptr< WWorkerThread< Function_T > > t( new WWorkerThread< Function_T >( m_func, k, m_numThreads ) ); 00288 t->subscribeStopSignal( boost::bind( &WThreadedFunction::handleThreadDone, this ) ); 00289 t->subscribeExceptionSignal( boost::bind( &WThreadedFunction::handleThreadException, this, _1 ) ); 00290 m_threads.push_back( t ); 00291 } 00292 } 00293 00294 template< class Function_T > 00295 WThreadedFunction< Function_T >::~WThreadedFunction() 00296 { 00297 stop(); 00298 } 00299 00300 template< class Function_T > 00301 void WThreadedFunction< Function_T >::run() 00302 { 00303 // set the number of finished threads to 0 00304 m_threadsDone.getWriteTicket()->get() = 0; 00305 // change status 00306 m_status.getWriteTicket()->get() = W_THREADS_RUNNING; 00307 // start threads 00308 for( std::size_t k = 0; k < m_numThreads; ++k ) 00309 { 00310 m_threads[ k ]->run(); 00311 } 00312 } 00313 00314 template< class Function_T > 00315 void WThreadedFunction< Function_T >::stop() 00316 { 00317 // change status 00318 m_status.getWriteTicket()->get() = W_THREADS_STOP_REQUESTED; 00319 00320 typename std::vector< boost::shared_ptr< WWorkerThread< Function_T > > >::iterator it; 00321 // tell the threads to stop 00322 for( it = m_threads.begin(); it != m_threads.end(); ++it ) 00323 { 00324 ( *it )->requestStop(); 00325 } 00326 } 00327 00328 template< class Function_T > 00329 void WThreadedFunction< Function_T >::wait() 00330 { 00331 typename std::vector< boost::shared_ptr< WWorkerThread< Function_T > > >::iterator it; 00332 // wait for the threads to stop 00333 for( it = m_threads.begin(); it != m_threads.end(); ++it ) 00334 { 00335 ( *it )->wait(); 00336 } 00337 } 00338 00339 template< class Function_T > 00340 void WThreadedFunction< Function_T >::handleThreadDone() 00341 { 00342 typedef typename WSharedObject< std::size_t >::WriteTicket WT; 00343 00344 WT t = m_threadsDone.getWriteTicket(); 00345 WAssert( t->get() < m_numThreads, "" ); 00346 ++t->get(); 00347 std::size_t k = t->get(); 00348 t = WT(); 00349 00350 if( m_numThreads == k ) 00351 { 00352 typedef typename WSharedObject< WThreadedFunctionStatus >::WriteTicket ST; 00353 ST s = m_status.getWriteTicket(); 00354 if( s->get() == W_THREADS_RUNNING ) 00355 { 00356 s->get() = W_THREADS_FINISHED; 00357 } 00358 else if( s->get() == W_THREADS_STOP_REQUESTED ) 00359 { 00360 s->get() = W_THREADS_ABORTED; 00361 } 00362 else 00363 { 00364 throw WException( std::string( "Invalid status change." ) ); 00365 } 00366 m_doneCondition->notify(); 00367 } 00368 } 00369 00370 template< class Function_T > 00371 void WThreadedFunction< Function_T >::handleThreadException( WException const& e ) 00372 { 00373 // change status 00374 typedef typename WSharedObject< WThreadedFunctionStatus >::WriteTicket WT; 00375 WT w = m_status.getWriteTicket(); 00376 WAssert( w->get() != W_THREADS_FINISHED && 00377 w->get() != W_THREADS_ABORTED, "" ); 00378 if( w->get() == W_THREADS_RUNNING ) 00379 { 00380 w->get() = W_THREADS_STOP_REQUESTED; 00381 } 00382 // force destruction of the write ticket 00383 w = WT(); 00384 // update the number of finished threads 00385 handleThreadDone(); 00386 00387 m_exceptionSignal( e ); 00388 } 00389 00390 #endif // WTHREADEDFUNCTION_H