diff options
Diffstat (limited to 'gear/__init__.py')
-rw-r--r-- | gear/__init__.py | 78 |
1 files changed, 57 insertions, 21 deletions
diff --git a/gear/__init__.py b/gear/__init__.py index d436274..38b037a 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -1487,6 +1487,7 @@ class Client(BaseClient): self.sendPacket(packet, conn) except Exception: # Error handling is all done by sendPacket + self.log.info("Sending packet failed") continue complete = task.wait(timeout) if not complete: @@ -2704,6 +2705,38 @@ class ServerConnection(NonBlockingConnection): id(self), self.client_id, self.host, self.port) +class Poller(object): + """A poller using Epoll if available and Poll on non-linux systems. + + :arg bool use_epoll: If epoll should be used (needs to be also supported + by the OS) + """ + + def __init__(self, use_epoll=True): + self.use_epoll = use_epoll and hasattr(select, 'epoll') + + self.POLL_EDGE = select.EPOLLET if self.use_epoll else 0 + self.POLL_IN = select.EPOLLIN if self.use_epoll else select.POLLIN + self.POLL_OUT = select.EPOLLOUT if self.use_epoll else select.POLLOUT + self.POLL_HUP = select.EPOLLHUP if self.use_epoll else select.POLLHUP + self.POLL_PRI = 0 if self.use_epoll else select.POLLPRI + self.POLL_ERR = select.EPOLLERR if self.use_epoll else select.POLLERR + + if self.use_epoll: + self._poll = select.epoll() + else: + self._poll = select.poll() + + def register(self, fd, event_mask): + self._poll.register(fd, event_mask) + + def unregister(self, fd): + self._poll.unregister(fd) + + def poll(self): + return self._poll.poll(0) + + class Server(BaseClientServer): """A simple gearman server implementation for testing (not for production use). @@ -2727,17 +2760,15 @@ class Server(BaseClientServer): :arg int tcp_keepidle: Idle time after which to start keepalives sending :arg int tcp_keepintvl: Interval in seconds between TCP keepalives :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect + :arg bool use_epoll: If epoll should be used (needs to be also supported + by the OS) """ - edge_bitmask = select.EPOLLET - error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask) - read_bitmask = (select.EPOLLIN | error_bitmask) - readwrite_bitmask = (select.EPOLLOUT | read_bitmask) - def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, statsd_host=None, statsd_port=8125, statsd_prefix=None, server_id=None, acl=None, host=None, keepalive=False, - tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9): + tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9, + use_epoll=True): self.port = port self.ssl_key = ssl_key self.ssl_cert = ssl_cert @@ -2753,10 +2784,15 @@ class Server(BaseClientServer): self.max_handle = 0 self.acl = acl self.connect_wake_read, self.connect_wake_write = os.pipe() - self.poll = select.epoll() - # Reverse mapping of fd -> connection + self.poller = Poller(use_epoll) self.connection_map = {} + self.edge_bitmask = self.poller.POLL_EDGE + self.error_bitmask = (self.poller.POLL_ERR | self.poller.POLL_HUP + | self.edge_bitmask) + self.read_bitmask = (self.poller.POLL_IN | self.error_bitmask) + self.readwrite_bitmask = (self.poller.POLL_OUT | self.read_bitmask) + self.use_ssl = False if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): self.use_ssl = True @@ -2805,7 +2841,7 @@ class Server(BaseClientServer): # Register the wake pipe so that we can break if we need to # reconfigure connections - self.poll.register(self.wake_read, self.read_bitmask) + self.poller.register(self.wake_read, self.read_bitmask) if server_id: self.log = logging.getLogger("gear.Server.%s" % (self.client_id,)) @@ -2835,8 +2871,7 @@ class Server(BaseClientServer): poll = select.poll() bitmask = (select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) - # Register the wake pipe so that we can break if we need to - # shutdown. + # Register the wake pipe so that we can break if we need to shutdown. poll.register(self.connect_wake_read, bitmask) poll.register(self.socket.fileno(), bitmask) while self.running: @@ -2897,11 +2932,11 @@ class Server(BaseClientServer): # The exception handlers here can raise exceptions and if they # do, it's okay, the poll loop will be restarted. try: - if event & (select.EPOLLERR | select.EPOLLHUP): - self.log.debug("Received error event on %s: %s" % ( - conn, event)) + if event & (self.poller.POLL_ERR | self.poller.POLL_HUP): + self.log.debug("Received error event on %s: %s" % + (conn, event)) raise DisconnectError() - if event & (select.POLLIN | select.POLLOUT): + if event & (self.poller.POLL_IN | self.poller.POLL_OUT): self.readFromConnection(conn) self.writeToConnection(conn) except socket.error as e: @@ -2933,15 +2968,16 @@ class Server(BaseClientServer): # loop and therefore the list in guaranteed never to shrink. connections = self.active_connections[:] for conn in connections: - self._processPollEvent(conn, select.POLLIN | select.POLLOUT) + self._processPollEvent(conn, + self.poller.POLL_IN | self.poller.POLL_OUT) def _doPollLoop(self): # Outer run method of poll thread. while self.running: try: self._pollLoop() - except Exception: - self.log.exception("Exception in poll loop:") + except Exception as e: + self.log.exception("Exception in poll loop: %s" % str(e)) def _pollLoop(self): # Inner method of poll loop. @@ -2951,7 +2987,7 @@ class Server(BaseClientServer): while self.running: self.log.debug("Polling %s connections" % len(self.active_connections)) - ret = self.poll.poll() + ret = self.poller.poll() # Since we're using edge-triggering, we need to make sure # that every file descriptor in 'ret' is processed. for fd, event in ret: @@ -2983,7 +3019,7 @@ class Server(BaseClientServer): # Call while holding the connection condition self.log.debug("Registering %s" % conn) self.connection_map[conn.conn.fileno()] = conn - self.poll.register(conn.conn.fileno(), self.readwrite_bitmask) + self.poller.register(conn.conn.fileno(), self.readwrite_bitmask) def _unregisterConnection(self, conn): # Unregister the connection with the poll object @@ -2993,7 +3029,7 @@ class Server(BaseClientServer): if fd not in self.connection_map: return try: - self.poll.unregister(fd) + self.poller.unregister(fd) except KeyError: pass try: |