Source code for rpyc.utils.server

"""
rpyc plug-in server (threaded or forking)
"""
import sys
import os
import socket
import time
import threading
import errno
import logging
from contextlib import closing
try:
    import Queue
except ImportError:
    import queue as Queue
from rpyc.core import SocketStream, Channel
from rpyc.utils.registry import UDPRegistryClient
from rpyc.utils.authenticators import AuthenticationError
from rpyc.lib import safe_import, spawn, spawn_waitready, MissingModule
from rpyc.lib.compat import poll, get_exc_errno
signal = safe_import("signal")
gevent = MissingModule("gevent") # disabled in Debian



[docs]class Server(object): """Base server implementation :param service: the :class:`~rpyc.core.service.Service` to expose :param hostname: the host to bind to. Default is IPADDR_ANY, but you may want to restrict it only to ``localhost`` in some setups :param ipv6: whether to create an IPv6 or IPv4 socket. The default is IPv4 :param port: the TCP port to bind to :param backlog: the socket's backlog (passed to ``listen()``) :param reuse_addr: whether or not to create the socket with the ``SO_REUSEADDR`` option set. :param authenticator: the :ref:`api-authenticators` to use. If ``None``, no authentication is performed. :param registrar: the :class:`~rpyc.utils.registry.RegistryClient` to use. If ``None``, a default :class:`~rpyc.utils.registry.UDPRegistryClient` will be used :param auto_register: whether or not to register using the *registrar*. By default, the server will attempt to register only if a registrar was explicitly given. :param protocol_config: the :data:`configuration dictionary <rpyc.core.protocol.DEFAULT_CONFIG>` that is passed to the RPyC connection :param logger: the ``logger`` to use (of the built-in ``logging`` module). If ``None``, a default logger will be created. :param listener_timeout: the timeout of the listener socket; set to ``None`` to disable (e.g. on embedded platforms with limited battery) """ def __init__(self, service, hostname = "", ipv6 = False, port = 0, backlog = 10, reuse_addr = True, authenticator = None, registrar = None, auto_register = None, protocol_config = {}, logger = None, listener_timeout = 0.5, socket_path = None): self.active = False self._closed = False self.service = service self.authenticator = authenticator self.backlog = backlog if auto_register is None: self.auto_register = bool(registrar) else: self.auto_register = auto_register self.protocol_config = protocol_config self.clients = set() if socket_path is not None: if hostname != "" or port != 0 or ipv6 != False: raise ValueError("socket_path is mutually exclusive with: hostname, port, ipv6") self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.listener.bind(socket_path) # set the self.port to the path as it's used for the registry and logging self.host, self.port = "", socket_path else: if ipv6: if hostname == "localhost" and sys.platform != "win32": # on windows, you should bind to localhost even for ipv6 hostname = "localhost6" self.listener = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) else: self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if reuse_addr and sys.platform != "win32": # warning: reuseaddr is not what you'd expect on windows! # it allows you to bind an already bound port, resulting in # "unexpected behavior" (quoting MSDN) self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.listener.bind((hostname, port)) self.listener.settimeout(listener_timeout) # hack for IPv6 (the tuple can be longer than 2) sockname = self.listener.getsockname() self.host, self.port = sockname[0], sockname[1] if logger is None: logger = logging.getLogger("%s/%s" % (self.service.get_service_name(), self.port)) self.logger = logger if "logger" not in self.protocol_config: self.protocol_config["logger"] = self.logger if registrar is None: registrar = UDPRegistryClient(logger = self.logger) self.registrar = registrar
[docs] def close(self): """Closes (terminates) the server and all of its clients. If applicable, also unregisters from the registry server""" if self._closed: return self._closed = True self.active = False if self.auto_register: try: self.registrar.unregister(self.port) except Exception: self.logger.exception("error unregistering services") try: self.listener.shutdown(socket.SHUT_RDWR) except (EnvironmentError, socket.error): pass self.listener.close() self.logger.info("listener closed") for c in set(self.clients): try: c.shutdown(socket.SHUT_RDWR) except Exception: pass c.close() self.clients.clear()
[docs] def fileno(self): """returns the listener socket's file descriptor""" return self.listener.fileno()
[docs] def accept(self): """accepts an incoming socket connection (blocking)""" while self.active: try: sock, addrinfo = self.listener.accept() except socket.timeout: pass except socket.error: ex = sys.exc_info()[1] if get_exc_errno(ex) in (errno.EINTR, errno.EAGAIN): pass else: raise EOFError() else: break if not self.active: return sock.setblocking(True) self.logger.info("accepted %s with fd %s", addrinfo, sock.fileno()) self.clients.add(sock) self._accept_method(sock)
def _accept_method(self, sock): """this method should start a thread, fork a child process, or anything else in order to serve the client. once the mechanism has been created, it should invoke _authenticate_and_serve_client with `sock` as the argument""" raise NotImplementedError def _authenticate_and_serve_client(self, sock): try: if self.authenticator: addrinfo = sock.getpeername() try: sock2, credentials = self.authenticator(sock) except AuthenticationError: self.logger.info("%s failed to authenticate, rejecting connection", addrinfo) return else: self.logger.info("%s authenticated successfully", addrinfo) else: credentials = None sock2 = sock try: self._serve_client(sock2, credentials) except Exception: self.logger.exception("client connection terminated abruptly") raise finally: try: sock.shutdown(socket.SHUT_RDWR) except Exception: pass sock.close() self.clients.discard(sock) def _serve_client(self, sock, credentials): addrinfo = sock.getpeername() if credentials: self.logger.info("welcome %s (%r)", addrinfo, credentials) else: self.logger.info("welcome %s", addrinfo) try: config = dict(self.protocol_config, credentials = credentials, endpoints = (sock.getsockname(), addrinfo), logger = self.logger) conn = self.service._connect(Channel(SocketStream(sock)), config) self._handle_connection(conn) finally: self.logger.info("goodbye %s", addrinfo) def _handle_connection(self, conn): """This methoed should implement the server's logic.""" conn.serve_all() def _bg_register(self): interval = self.registrar.REREGISTER_INTERVAL self.logger.info("started background auto-register thread " "(interval = %s)", interval) tnext = 0 try: while self.active: t = time.time() if t >= tnext: did_register = False aliases = self.service.get_service_aliases() try: did_register = self.registrar.register(aliases, self.port, interface = self.host) except Exception: self.logger.exception("error registering services") # If registration worked out, retry to register again after # interval time. Otherwise, try to register soon again. if did_register: tnext = t + interval else: self.logger.info("registering services did not work - retry") time.sleep(1) finally: if not self._closed: self.logger.info("background auto-register thread finished") def _listen(self): if self.active: return self.listener.listen(self.backlog) # On Jython, if binding to port 0, we can get the correct port only # once `listen()` was called, see #156: if not self.port: # Note that for AF_UNIX the following won't work (but we are safe # since we already saved the socket_path into self.port): self.port = self.listener.getsockname()[1] self.logger.info("server started on [%s]:%s", self.host, self.port) self.active = True def _register(self): if self.auto_register: self.auto_register = False spawn(self._bg_register)
[docs] def start(self): """Starts the server (blocking). Use :meth:`close` to stop""" self._listen() self._register() try: while self.active: self.accept() except EOFError: pass # server closed by another thread except KeyboardInterrupt: print("") self.logger.warn("keyboard interrupt!") finally: self.logger.info("server has terminated") self.close()
def _start_in_thread(self): """ Start the server in a thread, returns when when server is listening and ready to accept incoming connections. Used for testing, API could change anytime! Do not use!""" return spawn_waitready(self._listen, self.start)[0]
[docs]class OneShotServer(Server): """ A server that handles a single connection (blockingly), and terminates after that Parameters: see :class:`Server` """ def _accept_method(self, sock): with closing(sock): self._authenticate_and_serve_client(sock)
[docs]class ThreadedServer(Server): """ A server that spawns a thread for each connection. Works on any platform that supports threads. Parameters: see :class:`Server` """ def _accept_method(self, sock): spawn(self._authenticate_and_serve_client, sock)
[docs]class ThreadPoolServer(Server): """This server is threaded like the ThreadedServer but reuses threads so that recreation is not necessary for each request. The pool of threads has a fixed size that can be set with the 'nbThreads' argument. The default size is 20. The server dispatches request to threads by batch, that is a given thread may process up to request_batch_size requests from the same connection in one go, before it goes to the next connection with pending requests. By default, self.request_batch_size is set to 10 and it can be overwritten in the constructor arguments. Contributed by *@sponce* Parameters: see :class:`Server` """ def __init__(self, *args, **kwargs): '''Initializes a ThreadPoolServer. In particular, instantiate the thread pool.''' # get the number of threads in the pool self.nbthreads = kwargs.pop('nbThreads', 20) self.request_batch_size = kwargs.pop('requestBatchSize', 10) # init the parent Server.__init__(self, *args, **kwargs) # a queue of connections having something to process self._active_connection_queue = Queue.Queue() # a dictionary fd -> connection self.fd_to_conn = {} # a polling object to be used be the polling thread self.poll_object = poll() def _listen(self): if self.active: return super(ThreadPoolServer, self)._listen() # setup the thread pool for handling requests self.workers = [] for i in range(self.nbthreads): t = spawn(self._serve_clients) t.setName('Worker%i' % i) self.workers.append(t) # setup a thread for polling inactive connections self.polling_thread = spawn(self._poll_inactive_clients) self.polling_thread.setName('PollingThread')
[docs] def close(self): '''closes a ThreadPoolServer. In particular, joins the thread pool.''' # close parent server Server.close(self) # stop producer thread self.polling_thread.join() # cleanup thread pool : first fill the pool with None fds so that all threads exit # the blocking get on the queue of active connections. Then join the threads for _ in range(len(self.workers)): self._active_connection_queue.put(None) for w in self.workers: w.join()
def _remove_from_inactive_connection(self, fd): '''removes a connection from the set of inactive ones''' # unregister the connection in the polling object try: self.poll_object.unregister(fd) except KeyError: # the connection has already been unregistered pass def _drop_connection(self, fd): '''removes a connection by closing it and removing it from internal structs''' conn = None # cleanup fd_to_conn dictionnary try: conn = self.fd_to_conn[fd] del self.fd_to_conn[fd] except KeyError: # the active connection has already been removed pass # close connection self.logger.info("Closing connection for fd %d", fd) if conn: conn.close() def _add_inactive_connection(self, fd): '''adds a connection to the set of inactive ones''' self.poll_object.register(fd, "reh") def _handle_poll_result(self, connlist): '''adds a connection to the set of inactive ones''' for fd, evt in connlist: try: # remove connection from the inactive ones self._remove_from_inactive_connection(fd) # Is it an error ? if "e" in evt or "n" in evt or "h" in evt: # it was an error, connection was closed. Do the same on our side self._drop_connection(fd) else: # connection has data, let's add it to the active queue self._active_connection_queue.put(fd) except KeyError: # the connection has already been dropped. Give up pass def _poll_inactive_clients(self): '''Main method run by the polling thread of the thread pool. Check whether inactive clients have become active''' while self.active: try: # the actual poll, with a timeout of 0.1s so that we can exit in case # we re not active anymore active_clients = self.poll_object.poll(0.1) # for each client that became active, put them in the active queue self._handle_poll_result(active_clients) except Exception: ex = sys.exc_info()[1] # "Caught exception in Worker thread" message self.logger.warning("Failed to poll clients, caught exception : %s", str(ex)) # wait a bit so that we do not loop too fast in case of error time.sleep(0.2) def _serve_requests(self, fd): '''Serves requests from the given connection and puts it back to the appropriate queue''' # serve a maximum of RequestBatchSize requests for this connection for _ in range(self.request_batch_size): try: if not self.fd_to_conn[fd].poll(): # note that poll serves the request # we could not find a request, so we put this connection back to the inactive set self._add_inactive_connection(fd) return except EOFError: # the connection has been closed by the remote end. Close it on our side and return self._drop_connection(fd) return except Exception: # put back the connection to active queue in doubt and raise the exception to the upper level self._active_connection_queue.put(fd) raise # we've processed the maximum number of requests. Put back the connection in the active queue self._active_connection_queue.put(fd) def _serve_clients(self): '''Main method run by the processing threads of the thread pool. Loops forever, handling requests read from the connections present in the active_queue''' while self.active: try: # note that we do not use a timeout here. This is because the implementation of # the timeout version performs badly. So we block forever, and exit by filling # the queue with None fds fd = self._active_connection_queue.get(True) # fd may be None (case where we want to exit the blocking get to close the service) if fd: # serve the requests of this connection self._serve_requests(fd) except Queue.Empty: # we've timed out, let's just retry. We only use the timeout so that this # thread can stop even if there is nothing in the queue pass except Exception: # "Caught exception in Worker thread" message self.logger.exception("failed to serve client, caught exception") # wait a bit so that we do not loop too fast in case of error time.sleep(0.2) def _authenticate_and_build_connection(self, sock): '''Authenticate a client and if it succees, wraps the socket in a connection object. Note that this code is cut and paste from the rpyc internals and may have to be changed if rpyc evolves''' # authenticate if self.authenticator: h, p = sock.getpeername() sock, credentials = self.authenticator(sock) else: credentials = None # build a connection h, p = sock.getpeername() config = dict(self.protocol_config, credentials=credentials, connid="%s:%d"%(h, p), endpoints=(sock.getsockname(), (h, p))) return self.service._connect(Channel(SocketStream(sock)), config) def _accept_method(self, sock): '''Implementation of the accept method : only pushes the work to the internal queue. In case the queue is full, raises an AsynResultTimeout error''' try: # authenticate and build connection object conn = self._authenticate_and_build_connection(sock) # put the connection in the active queue h, p = sock.getpeername() fd = conn.fileno() self.logger.debug("Created connection to %s:%d with fd %d", h, p, fd) self.fd_to_conn[fd] = conn self._add_inactive_connection(fd) self.clients.clear() except Exception: h, p = sock.getpeername() self.logger.exception("Failed to serve client for %s:%d, caught exception", h, p) sock.close()
[docs]class ForkingServer(Server): """ A server that forks a child process for each connection. Available on POSIX compatible systems only. Parameters: see :class:`Server` """ def __init__(self, *args, **kwargs): if not signal: raise OSError("ForkingServer not supported on this platform") Server.__init__(self, *args, **kwargs) # setup sigchld handler self._prevhandler = signal.signal(signal.SIGCHLD, self._handle_sigchld)
[docs] def close(self): Server.close(self) signal.signal(signal.SIGCHLD, self._prevhandler)
@classmethod def _handle_sigchld(cls, signum, unused): try: while True: pid, dummy = os.waitpid(-1, os.WNOHANG) if pid <= 0: break except OSError: pass # re-register signal handler (see man signal(2), under Portability) signal.signal(signal.SIGCHLD, cls._handle_sigchld) def _accept_method(self, sock): pid = os.fork() if pid == 0: # child try: self.logger.debug("child process created") signal.signal(signal.SIGCHLD, self._prevhandler) #76: call signal.siginterrupt(False) in forked child signal.siginterrupt(signal.SIGCHLD, False) self.listener.close() self.clients.clear() self._authenticate_and_serve_client(sock) except: self.logger.exception("child process terminated abnormally") else: self.logger.debug("child process terminated") finally: self.logger.debug("child terminated") os._exit(0) else: # parent sock.close()
[docs]class GeventServer(Server): """gevent based Server. Requires using ``gevent.monkey.patch_all()``.""" def _register(self): if self.auto_register: self.auto_register = False gevent.spawn(self._bg_register) def _accept_method(self, sock): gevent.spawn(self._authenticate_and_serve_client, sock)