diff options
Diffstat (limited to 'gear/__init__.py')
-rw-r--r-- | gear/__init__.py | 78 |
1 files changed, 21 insertions, 57 deletions
diff --git a/gear/__init__.py b/gear/__init__.py index 55f804f..f7fc767 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -1490,7 +1490,6 @@ class Client(BaseClient): self.sendPacket(packet, conn) except Exception: # Error handling is all done by sendPacket - self.log.info("Sending packet failed") continue complete = task.wait(timeout) if not complete: @@ -2708,38 +2707,6 @@ class ServerConnection(NonBlockingConnection): id(self), self.client_id, self.host, self.port) -class Poller(object): - """A poller using Epoll if available and Poll on non-linux systems. - - :arg bool use_epoll: If epoll should be used (needs to be also supported - by the OS) - """ - - def __init__(self, use_epoll=True): - self.use_epoll = use_epoll and hasattr(select, 'epoll') - - self.POLL_EDGE = select.EPOLLET if self.use_epoll else 0 - self.POLL_IN = select.EPOLLIN if self.use_epoll else select.POLLIN - self.POLL_OUT = select.EPOLLOUT if self.use_epoll else select.POLLOUT - self.POLL_HUP = select.EPOLLHUP if self.use_epoll else select.POLLHUP - self.POLL_PRI = 0 if self.use_epoll else select.POLLPRI - self.POLL_ERR = select.EPOLLERR if self.use_epoll else select.POLLERR - - if self.use_epoll: - self._poll = select.epoll() - else: - self._poll = select.poll() - - def register(self, fd, event_mask): - self._poll.register(fd, event_mask) - - def unregister(self, fd): - self._poll.unregister(fd) - - def poll(self): - return self._poll.poll(0) - - class Server(BaseClientServer): """A simple gearman server implementation for testing (not for production use). @@ -2763,15 +2730,17 @@ class Server(BaseClientServer): :arg int tcp_keepidle: Idle time after which to start keepalives sending :arg int tcp_keepintvl: Interval in seconds between TCP keepalives :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect - :arg bool use_epoll: If epoll should be used (needs to be also supported - by the OS) """ + edge_bitmask = select.EPOLLET + error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask) + read_bitmask = (select.EPOLLIN | error_bitmask) + readwrite_bitmask = (select.EPOLLOUT | read_bitmask) + def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, statsd_host=None, statsd_port=8125, statsd_prefix=None, server_id=None, acl=None, host=None, keepalive=False, - tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9, - use_epoll=True): + tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9): self.port = port self.ssl_key = ssl_key self.ssl_cert = ssl_cert @@ -2787,15 +2756,10 @@ class Server(BaseClientServer): self.max_handle = 0 self.acl = acl self.connect_wake_read, self.connect_wake_write = os.pipe() - self.poller = Poller(use_epoll) + self.poll = select.epoll() + # Reverse mapping of fd -> connection self.connection_map = {} - self.edge_bitmask = self.poller.POLL_EDGE - self.error_bitmask = (self.poller.POLL_ERR | self.poller.POLL_HUP - | self.edge_bitmask) - self.read_bitmask = (self.poller.POLL_IN | self.error_bitmask) - self.readwrite_bitmask = (self.poller.POLL_OUT | self.read_bitmask) - self.use_ssl = False if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): self.use_ssl = True @@ -2847,7 +2811,7 @@ class Server(BaseClientServer): # Register the wake pipe so that we can break if we need to # reconfigure connections - self.poller.register(self.wake_read, self.read_bitmask) + self.poll.register(self.wake_read, self.read_bitmask) if server_id: self.log = logging.getLogger("gear.Server.%s" % (self.client_id,)) @@ -2877,7 +2841,8 @@ class Server(BaseClientServer): poll = select.poll() bitmask = (select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL) - # Register the wake pipe so that we can break if we need to shutdown. + # Register the wake pipe so that we can break if we need to + # shutdown. poll.register(self.connect_wake_read, bitmask) poll.register(self.socket.fileno(), bitmask) while self.running: @@ -2938,11 +2903,11 @@ class Server(BaseClientServer): # The exception handlers here can raise exceptions and if they # do, it's okay, the poll loop will be restarted. try: - if event & (self.poller.POLL_ERR | self.poller.POLL_HUP): - self.log.debug("Received error event on %s: %s" % - (conn, event)) + if event & (select.EPOLLERR | select.EPOLLHUP): + self.log.debug("Received error event on %s: %s" % ( + conn, event)) raise DisconnectError() - if event & (self.poller.POLL_IN | self.poller.POLL_OUT): + if event & (select.POLLIN | select.POLLOUT): self.readFromConnection(conn) self.writeToConnection(conn) except socket.error as e: @@ -2974,16 +2939,15 @@ class Server(BaseClientServer): # loop and therefore the list in guaranteed never to shrink. connections = self.active_connections[:] for conn in connections: - self._processPollEvent(conn, - self.poller.POLL_IN | self.poller.POLL_OUT) + self._processPollEvent(conn, select.POLLIN | select.POLLOUT) def _doPollLoop(self): # Outer run method of poll thread. while self.running: try: self._pollLoop() - except Exception as e: - self.log.exception("Exception in poll loop: %s" % str(e)) + except Exception: + self.log.exception("Exception in poll loop:") def _pollLoop(self): # Inner method of poll loop. @@ -2993,7 +2957,7 @@ class Server(BaseClientServer): while self.running: self.log.debug("Polling %s connections" % len(self.active_connections)) - ret = self.poller.poll() + ret = self.poll.poll() # Since we're using edge-triggering, we need to make sure # that every file descriptor in 'ret' is processed. for fd, event in ret: @@ -3025,7 +2989,7 @@ class Server(BaseClientServer): # Call while holding the connection condition self.log.debug("Registering %s" % conn) self.connection_map[conn.conn.fileno()] = conn - self.poller.register(conn.conn.fileno(), self.readwrite_bitmask) + self.poll.register(conn.conn.fileno(), self.readwrite_bitmask) def _unregisterConnection(self, conn): # Unregister the connection with the poll object @@ -3035,7 +2999,7 @@ class Server(BaseClientServer): if fd not in self.connection_map: return try: - self.poller.unregister(fd) + self.poll.unregister(fd) except KeyError: pass try: |