wibble 0.1.28
|
00001 // -*- C++ -*- (c) 2008 Petr Rockai <me@mornfall.net> 00002 00003 #include <wibble/sys/macros.h> 00004 00005 #ifdef POSIX 00006 #include <fcntl.h> 00007 #include <sys/select.h> 00008 00009 #include <deque> 00010 #include <cerrno> 00011 00012 #include <wibble/exception.h> 00013 #include <wibble/sys/thread.h> 00014 #include <wibble/sys/mutex.h> 00015 00016 #ifndef WIBBLE_SYS_PIPE_H 00017 #define WIBBLE_SYS_PIPE_H 00018 00019 namespace wibble { 00020 namespace sys { 00021 00022 namespace wexcept = wibble::exception; 00023 00024 struct Pipe { 00025 00026 struct Writer : wibble::sys::Thread { 00027 int fd; 00028 bool close; 00029 std::string data; 00030 bool running; 00031 bool closed; 00032 wibble::sys::Mutex mutex; 00033 00034 Writer() : fd( -1 ), close( false ), running( false ) {} 00035 00036 void *main() { 00037 do { 00038 int wrote = 0; 00039 00040 { 00041 wibble::sys::MutexLock __l( mutex ); 00042 wrote = ::write( fd, data.c_str(), data.length() ); 00043 if ( wrote > 0 ) 00044 data.erase( data.begin(), data.begin() + wrote ); 00045 } 00046 00047 if ( wrote == -1 ) { 00048 if ( errno == EAGAIN || errno == EWOULDBLOCK ) 00049 sched_yield(); 00050 else 00051 throw wexcept::System( "writing to pipe" ); 00052 } 00053 } while ( !done() ); 00054 00055 if ( close ) 00056 ::close( fd ); 00057 00058 return 0; 00059 } 00060 00061 bool done() { 00062 wibble::sys::MutexLock __l( mutex ); 00063 if ( data.empty() ) 00064 running = false; 00065 return !running; 00066 } 00067 00068 void run( int _fd, std::string what ) { 00069 wibble::sys::MutexLock __l( mutex ); 00070 00071 if ( running ) 00072 assert_eq( _fd, fd ); 00073 fd = _fd; 00074 assert_neq( fd, -1 ); 00075 00076 data += what; 00077 if ( running ) 00078 return; 00079 running = true; 00080 start(); 00081 } 00082 }; 00083 00084 typedef std::deque< char > Buffer; 00085 Buffer buffer; 00086 int fd; 00087 bool _eof; 00088 Writer writer; 00089 00090 Pipe( int p ) : fd( p ), _eof( false ) 00091 { 00092 if ( p == -1 ) 00093 return; 00094 if ( fcntl( fd, F_SETFL, O_NONBLOCK ) == -1 ) 00095 throw wexcept::System( "fcntl on a pipe" ); 00096 } 00097 Pipe() : fd( -1 ), _eof( false ) {} 00098 00099 /* Writes data to the pipe, asynchronously. */ 00100 void write( std::string what ) { 00101 writer.run( fd, what ); 00102 } 00103 00104 void close() { 00105 writer.close = true; 00106 writer.run( fd, "" ); 00107 } 00108 00109 bool valid() { 00110 return fd != -1; 00111 } 00112 00113 bool active() { 00114 return valid() && !eof(); 00115 } 00116 00117 bool eof() { 00118 return _eof; 00119 } 00120 00121 int readMore() { 00122 assert( valid() ); 00123 char _buffer[1024]; 00124 int r = ::read( fd, _buffer, 1023 ); 00125 if ( r == -1 && errno != EAGAIN && errno != EWOULDBLOCK ) 00126 throw wexcept::System( "reading from pipe" ); 00127 else if ( r == -1 ) 00128 return 0; 00129 if ( r == 0 ) 00130 _eof = true; 00131 else 00132 std::copy( _buffer, _buffer + r, std::back_inserter( buffer ) ); 00133 return r; 00134 } 00135 00136 std::string nextChunk() { 00137 std::string line( buffer.begin(), buffer.end() ); 00138 buffer.clear(); 00139 return line; 00140 } 00141 00142 std::string nextLine() { 00143 assert( valid() ); 00144 Buffer::iterator nl = 00145 std::find( buffer.begin(), buffer.end(), '\n' ); 00146 while ( nl == buffer.end() ) { 00147 if ( !readMore() ) 00148 return ""; // would block, so give up 00149 nl = std::find( buffer.begin(), buffer.end(), '\n' ); 00150 } 00151 std::string line( buffer.begin(), nl ); 00152 00153 if ( nl != buffer.end() ) 00154 ++ nl; 00155 buffer.erase( buffer.begin(), nl ); 00156 00157 return line; 00158 } 00159 00160 /* Only returns on eof() or when data is buffered. */ 00161 void wait() { 00162 assert( valid() ); 00163 fd_set fds; 00164 FD_ZERO( &fds ); 00165 while ( buffer.empty() && !eof() ) { 00166 if ( readMore() ) 00167 return; 00168 if ( eof() ) 00169 return; 00170 FD_SET( fd, &fds ); 00171 select( fd + 1, &fds, 0, 0, 0 ); 00172 } 00173 } 00174 std::string nextLineBlocking() { 00175 assert( valid() ); 00176 std::string l; 00177 while ( !eof() ) { 00178 l = nextLine(); 00179 if ( !l.empty() ) 00180 return l; 00181 if ( eof() ) 00182 return std::string( buffer.begin(), buffer.end() ); 00183 wait(); 00184 } 00185 return l; 00186 } 00187 00188 }; 00189 00190 } 00191 } 00192 #endif 00193 #endif