OpenWalnut 1.2.5
|
00001 //--------------------------------------------------------------------------- 00002 // 00003 // Project: OpenWalnut ( http://www.openwalnut.org ) 00004 // 00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS 00006 // For more information see http://www.openwalnut.org/copying 00007 // 00008 // This file is part of OpenWalnut. 00009 // 00010 // OpenWalnut is free software: you can redistribute it and/or modify 00011 // it under the terms of the GNU Lesser General Public License as published by 00012 // the Free Software Foundation, either version 3 of the License, or 00013 // (at your option) any later version. 00014 // 00015 // OpenWalnut is distributed in the hope that it will be useful, 00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 // GNU Lesser General Public License for more details. 00019 // 00020 // You should have received a copy of the GNU Lesser General Public License 00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>. 00022 // 00023 //--------------------------------------------------------------------------- 00024 00025 #include <list> 00026 #include <set> 00027 #include <vector> 00028 #include <string> 00029 #include <sstream> 00030 #include <algorithm> 00031 #include <utility> 00032 00033 #include "../common/WLogger.h" 00034 #include "../common/WThreadedRunner.h" 00035 #include "WBatchLoader.h" 00036 #include "WKernel.h" 00037 #include "WModule.h" 00038 #include "WModuleCombiner.h" 00039 #include "WModuleFactory.h" 00040 #include "WModuleInputConnector.h" 00041 #include "WModuleOutputConnector.h" 00042 #include "WModuleTypes.h" 00043 #include "combiner/WApplyCombiner.h" 00044 #include "exceptions/WModuleAlreadyAssociated.h" 00045 #include "exceptions/WModuleSignalSubscriptionFailed.h" 00046 #include "exceptions/WModuleUninitialized.h" 00047 #include "WDataModule.h" 00048 00049 #include "WModuleContainer.h" 00050 00051 WModuleContainer::WModuleContainer( std::string name, std::string description ): 00052 WModule(), 00053 m_name( name ), 00054 m_description( description ), 00055 m_crashIfModuleCrashes( true ) 00056 { 00057 WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG ); 00058 // initialize members 00059 } 00060 00061 WModuleContainer::~WModuleContainer() 00062 { 00063 // cleanup 00064 } 00065 00066 void WModuleContainer::moduleMain() 00067 { 00068 // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it. 00069 // Only set the ready flag. 00070 ready(); 00071 } 00072 00073 boost::shared_ptr< WModule > WModuleContainer::factory() const 00074 { 00075 // this factory is not used actually. 00076 return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) ); 00077 } 00078 00079 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run ) 00080 { 00081 if( !module ) 00082 { 00083 // just ignore NULL Pointer 00084 return; 00085 } 00086 00087 WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." , 00088 "ModuleContainer (" + getName() + ")", LL_INFO ); 00089 00090 if( !module->isInitialized()() ) 00091 { 00092 std::ostringstream s; 00093 s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized."; 00094 00095 throw WModuleUninitialized( s.str() ); 00096 } 00097 00098 // already associated with this container? 00099 if( module->getAssociatedContainer() == shared_from_this() ) 00100 { 00101 WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." , 00102 "ModuleContainer (" + getName() + ")", LL_INFO ); 00103 return; 00104 } 00105 00106 // is this module already associated? 00107 if( module->isAssociated()() ) 00108 { 00109 module->getAssociatedContainer()->remove( module ); 00110 } 00111 00112 // get write lock 00113 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket(); 00114 wlock->get().insert( module ); 00115 wlock.reset(); 00116 00117 module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) ); 00118 WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")", 00119 LL_INFO ); 00120 00121 // now module->isUsable() is true 00122 00123 // Connect the error handler and all default handlers: 00124 ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket(); 00125 00126 // connect the containers signal handler explicitly 00127 t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 ); 00128 boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func ); 00129 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00130 00131 // connect default notifiers: 00132 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock ); 00133 for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter) 00134 { 00135 signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) ); 00136 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00137 } 00138 slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock ); 00139 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter) 00140 { 00141 // call associated notifier 00142 ( *iter )( module ); 00143 } 00144 slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock ); 00145 for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin(); 00146 iter != m_connectorEstablishedNotifiers.end(); ++iter ) 00147 { 00148 // subscribe on each input 00149 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins ) 00150 { 00151 signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) ); 00152 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00153 } 00154 } 00155 for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin(); 00156 iter != m_connectorClosedNotifiers.end(); ++iter ) 00157 { 00158 // subscribe on each input 00159 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins ) 00160 { 00161 signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) ); 00162 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00163 } 00164 } 00165 slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock ); 00166 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter) 00167 { 00168 signalCon = module->subscribeSignal( WM_READY, ( *iter ) ); 00169 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00170 } 00171 slock.unlock(); 00172 00173 // free the subscriptions lock 00174 subscriptionsLock.reset(); 00175 00176 // add the modules progress to local progress combiner 00177 m_progress->addSubProgress( module->getRootProgressCombiner() ); 00178 00179 // run it 00180 if( run ) 00181 { 00182 module->run(); 00183 } 00184 } 00185 00186 void WModuleContainer::remove( boost::shared_ptr< WModule > module ) 00187 { 00188 // simple flat removal. 00189 00190 WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")", 00191 LL_DEBUG ); 00192 00193 if( module->getAssociatedContainer() != shared_from_this() ) 00194 { 00195 return; 00196 } 00197 00198 // remove connections inside this container 00199 module->disconnect(); 00200 00201 // remove progress combiner 00202 m_progress->removeSubProgress( module->getRootProgressCombiner() ); 00203 00204 // remove signal subscriptions to this containers default notifiers 00205 ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket(); 00206 00207 // find all subscriptions for this module 00208 std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module ); 00209 for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it ) 00210 { 00211 // disconnect subscription. 00212 ( *it ).second.disconnect(); 00213 } 00214 // erase them 00215 subscriptionsLock->get().erase( subscriptions.first, subscriptions.second ); 00216 subscriptionsLock.reset(); 00217 00218 // get write lock 00219 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket(); 00220 wlock->get().erase( module ); 00221 wlock.reset(); 00222 00223 module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); 00224 00225 // tell all interested about removal 00226 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock ); 00227 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter) 00228 { 00229 // call associated notifier 00230 ( *iter )( module ); 00231 } 00232 slock.unlock(); 00233 } 00234 00235 void WModuleContainer::removeDeep( boost::shared_ptr< WModule > module ) 00236 { 00237 WLogger::getLogger()->addLogMessage( "Deep removal of modules is not yet implemented.", "ModuleContainer (" + getName() + ")", LL_WARNING ); 00238 00239 // at least, remove the module itself 00240 remove( module ); 00241 } 00242 00243 WModuleContainer::DataModuleListType WModuleContainer::getDataModules() 00244 { 00245 DataModuleListType l; 00246 00247 // lock, unlocked if l looses focus 00248 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket(); 00249 00250 // iterate module list 00251 for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter ) 00252 { 00253 // is this module a data module? 00254 if( ( *iter )->getType() == MODULE_DATA ) 00255 { 00256 boost::shared_ptr< WDataModule > dm = boost::shared_static_cast< WDataModule >( *iter ); 00257 00258 // now check the contained dataset ( isTexture and whether it is ready ) 00259 if( dm->isReady()() ) 00260 { 00261 l.insert( dm ); 00262 } 00263 } 00264 } 00265 00266 return l; 00267 } 00268 00269 void WModuleContainer::stop() 00270 { 00271 WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO ); 00272 00273 // read lock 00274 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock ); 00275 for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end(); 00276 ++listIter ) 00277 { 00278 ( *listIter )->wait( true ); 00279 } 00280 slock.unlock(); 00281 00282 WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO ); 00283 00284 // lock, unlocked if l looses focus 00285 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket(); 00286 00287 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter ) 00288 { 00289 WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." , 00290 "ModuleContainer (" + getName() + ")", LL_INFO ); 00291 ( *listIter )->wait( true ); 00292 ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); // remove last refs to this container inside the module 00293 } 00294 lock.reset(); 00295 00296 // get write lock 00297 // lock, unlocked if l looses focus 00298 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket(); 00299 wlock->get().clear(); 00300 } 00301 00302 const std::string WModuleContainer::getName() const 00303 { 00304 return m_name; 00305 } 00306 00307 const std::string WModuleContainer::getDescription() const 00308 { 00309 return m_description; 00310 } 00311 00312 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier ) 00313 { 00314 boost::unique_lock<boost::shared_mutex> lock; 00315 switch (signal) 00316 { 00317 case WM_ASSOCIATED: 00318 lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock ); 00319 m_associatedNotifiers.push_back( notifier ); 00320 lock.unlock(); 00321 break; 00322 case WM_READY: 00323 lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock ); 00324 m_readyNotifiers.push_back( notifier ); 00325 lock.unlock(); 00326 break; 00327 case WM_REMOVED: 00328 lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock ); 00329 m_removedNotifiers.push_back( notifier ); 00330 lock.unlock(); 00331 break; 00332 default: 00333 std::ostringstream s; 00334 s << "Could not subscribe to unknown signal."; 00335 throw WModuleSignalSubscriptionFailed( s.str() ); 00336 break; 00337 } 00338 } 00339 00340 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier ) 00341 { 00342 boost::unique_lock<boost::shared_mutex> lock; 00343 switch (signal) 00344 { 00345 case WM_ERROR: 00346 lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock ); 00347 m_errorNotifiers.push_back( notifier ); 00348 lock.unlock(); 00349 break; 00350 default: 00351 std::ostringstream s; 00352 s << "Could not subscribe to unknown signal."; 00353 throw WModuleSignalSubscriptionFailed( s.str() ); 00354 break; 00355 } 00356 } 00357 00358 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier ) 00359 { 00360 boost::unique_lock<boost::shared_mutex> lock; 00361 switch (signal) 00362 { 00363 case CONNECTION_ESTABLISHED: 00364 lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock ); 00365 m_connectorEstablishedNotifiers.push_back( notifier ); 00366 lock.unlock(); 00367 break; 00368 case CONNECTION_CLOSED: 00369 lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock ); 00370 m_connectorClosedNotifiers.push_back( notifier ); 00371 lock.unlock(); 00372 break; 00373 default: 00374 std::ostringstream s; 00375 s << "Could not subscribe to unknown signal."; 00376 throw WModuleSignalSubscriptionFailed( s.str() ); 00377 break; 00378 } 00379 } 00380 00381 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly ) 00382 { 00383 boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >(); 00384 if( tryOnly ) 00385 { 00386 // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception 00387 prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what ); 00388 if( !prototype ) 00389 { 00390 return prototype; 00391 } 00392 } 00393 else 00394 { 00395 prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what ); 00396 } 00397 00398 return applyModule( applyOn, prototype ); 00399 } 00400 00401 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, 00402 boost::shared_ptr< WModule > prototype ) 00403 { 00404 // is this module already associated with another container? 00405 if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) ) 00406 { 00407 throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() + 00408 std::string( "\" is associated with another container." ) ); 00409 } 00410 00411 // create a new initialized instance of the module 00412 boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype ); 00413 00414 // add it 00415 add( m, true ); 00416 applyOn->isReadyOrCrashed().wait(); 00417 m->isReadyOrCrashed().wait(); 00418 00419 // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a 00420 // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here. 00421 00422 // get offered outputs 00423 WModule::InputConnectorList ins = m->getInputConnectors(); 00424 // get offered inputs 00425 WModule::OutputConnectorList outs = applyOn->getOutputConnectors(); 00426 00427 // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners. 00428 if( !ins.empty() && !outs.empty() ) 00429 { 00430 ( *ins.begin() )->connect( ( *outs.begin() ) ); 00431 } 00432 00433 return m; 00434 } 00435 00436 boost::shared_ptr< WBatchLoader > WModuleContainer::loadDataSets( std::vector< std::string > fileNames ) 00437 { 00438 // create thread which actually loads the data 00439 boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames, 00440 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) ) 00441 ); 00442 t->run(); 00443 return t; 00444 } 00445 00446 void WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > fileNames ) 00447 { 00448 // create thread which actually loads the data 00449 boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames, 00450 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) ) 00451 ); 00452 t->run(); 00453 t->wait(); 00454 } 00455 00456 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread ) 00457 { 00458 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock ); 00459 m_pendingThreads.insert( thread ); 00460 lock.unlock(); 00461 } 00462 00463 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread ) 00464 { 00465 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock ); 00466 m_pendingThreads.erase( thread ); 00467 lock.unlock(); 00468 } 00469 00470 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception ) 00471 { 00472 errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container."; 00473 00474 // simply forward it to the other signal handler 00475 signal_error( module, exception ); 00476 00477 if( m_crashIfModuleCrashes ) 00478 { 00479 infoLog() << "Crash caused this container to shutdown."; 00480 requestStop(); 00481 m_isCrashed( true ); 00482 } 00483 } 00484 00485 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed ) 00486 { 00487 m_crashIfModuleCrashes = crashIfCrashed; 00488 } 00489 00490 WModuleContainer::ModuleSharedContainerType::ReadTicket WModuleContainer::getModules() const 00491 { 00492 return m_modules.getReadTicket(); 00493 } 00494 00495 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module ) 00496 { 00497 WCombinerTypes::WCompatiblesList complist; 00498 00499 if( !module ) 00500 { 00501 // be nice in case of a null pointer 00502 return complist; 00503 } 00504 00505 // read lock the container 00506 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket(); 00507 00508 // handle each module 00509 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter ) 00510 { 00511 WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) ); 00512 00513 if( lComp.size() != 0 ) 00514 { 00515 complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) ); 00516 } 00517 } 00518 00519 // sort the compatibles 00520 std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort ); 00521 00522 return complist; 00523 } 00524