summaryrefslogtreecommitdiff
path: root/gear/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'gear/__init__.py')
-rw-r--r--gear/__init__.py78
1 files changed, 21 insertions, 57 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index 55f804f..f7fc767 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -1490,7 +1490,6 @@ class Client(BaseClient):
self.sendPacket(packet, conn)
except Exception:
# Error handling is all done by sendPacket
- self.log.info("Sending packet failed")
continue
complete = task.wait(timeout)
if not complete:
@@ -2708,38 +2707,6 @@ class ServerConnection(NonBlockingConnection):
id(self), self.client_id, self.host, self.port)
-class Poller(object):
- """A poller using Epoll if available and Poll on non-linux systems.
-
- :arg bool use_epoll: If epoll should be used (needs to be also supported
- by the OS)
- """
-
- def __init__(self, use_epoll=True):
- self.use_epoll = use_epoll and hasattr(select, 'epoll')
-
- self.POLL_EDGE = select.EPOLLET if self.use_epoll else 0
- self.POLL_IN = select.EPOLLIN if self.use_epoll else select.POLLIN
- self.POLL_OUT = select.EPOLLOUT if self.use_epoll else select.POLLOUT
- self.POLL_HUP = select.EPOLLHUP if self.use_epoll else select.POLLHUP
- self.POLL_PRI = 0 if self.use_epoll else select.POLLPRI
- self.POLL_ERR = select.EPOLLERR if self.use_epoll else select.POLLERR
-
- if self.use_epoll:
- self._poll = select.epoll()
- else:
- self._poll = select.poll()
-
- def register(self, fd, event_mask):
- self._poll.register(fd, event_mask)
-
- def unregister(self, fd):
- self._poll.unregister(fd)
-
- def poll(self):
- return self._poll.poll(0)
-
-
class Server(BaseClientServer):
"""A simple gearman server implementation for testing
(not for production use).
@@ -2763,15 +2730,17 @@ class Server(BaseClientServer):
:arg int tcp_keepidle: Idle time after which to start keepalives sending
:arg int tcp_keepintvl: Interval in seconds between TCP keepalives
:arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
- :arg bool use_epoll: If epoll should be used (needs to be also supported
- by the OS)
"""
+ edge_bitmask = select.EPOLLET
+ error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask)
+ read_bitmask = (select.EPOLLIN | error_bitmask)
+ readwrite_bitmask = (select.EPOLLOUT | read_bitmask)
+
def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None,
statsd_host=None, statsd_port=8125, statsd_prefix=None,
server_id=None, acl=None, host=None, keepalive=False,
- tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9,
- use_epoll=True):
+ tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9):
self.port = port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
@@ -2787,15 +2756,10 @@ class Server(BaseClientServer):
self.max_handle = 0
self.acl = acl
self.connect_wake_read, self.connect_wake_write = os.pipe()
- self.poller = Poller(use_epoll)
+ self.poll = select.epoll()
+ # Reverse mapping of fd -> connection
self.connection_map = {}
- self.edge_bitmask = self.poller.POLL_EDGE
- self.error_bitmask = (self.poller.POLL_ERR | self.poller.POLL_HUP
- | self.edge_bitmask)
- self.read_bitmask = (self.poller.POLL_IN | self.error_bitmask)
- self.readwrite_bitmask = (self.poller.POLL_OUT | self.read_bitmask)
-
self.use_ssl = False
if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
self.use_ssl = True
@@ -2847,7 +2811,7 @@ class Server(BaseClientServer):
# Register the wake pipe so that we can break if we need to
# reconfigure connections
- self.poller.register(self.wake_read, self.read_bitmask)
+ self.poll.register(self.wake_read, self.read_bitmask)
if server_id:
self.log = logging.getLogger("gear.Server.%s" % (self.client_id,))
@@ -2877,7 +2841,8 @@ class Server(BaseClientServer):
poll = select.poll()
bitmask = (select.POLLIN | select.POLLERR |
select.POLLHUP | select.POLLNVAL)
- # Register the wake pipe so that we can break if we need to shutdown.
+ # Register the wake pipe so that we can break if we need to
+ # shutdown.
poll.register(self.connect_wake_read, bitmask)
poll.register(self.socket.fileno(), bitmask)
while self.running:
@@ -2938,11 +2903,11 @@ class Server(BaseClientServer):
# The exception handlers here can raise exceptions and if they
# do, it's okay, the poll loop will be restarted.
try:
- if event & (self.poller.POLL_ERR | self.poller.POLL_HUP):
- self.log.debug("Received error event on %s: %s" %
- (conn, event))
+ if event & (select.EPOLLERR | select.EPOLLHUP):
+ self.log.debug("Received error event on %s: %s" % (
+ conn, event))
raise DisconnectError()
- if event & (self.poller.POLL_IN | self.poller.POLL_OUT):
+ if event & (select.POLLIN | select.POLLOUT):
self.readFromConnection(conn)
self.writeToConnection(conn)
except socket.error as e:
@@ -2974,16 +2939,15 @@ class Server(BaseClientServer):
# loop and therefore the list in guaranteed never to shrink.
connections = self.active_connections[:]
for conn in connections:
- self._processPollEvent(conn,
- self.poller.POLL_IN | self.poller.POLL_OUT)
+ self._processPollEvent(conn, select.POLLIN | select.POLLOUT)
def _doPollLoop(self):
# Outer run method of poll thread.
while self.running:
try:
self._pollLoop()
- except Exception as e:
- self.log.exception("Exception in poll loop: %s" % str(e))
+ except Exception:
+ self.log.exception("Exception in poll loop:")
def _pollLoop(self):
# Inner method of poll loop.
@@ -2993,7 +2957,7 @@ class Server(BaseClientServer):
while self.running:
self.log.debug("Polling %s connections" %
len(self.active_connections))
- ret = self.poller.poll()
+ ret = self.poll.poll()
# Since we're using edge-triggering, we need to make sure
# that every file descriptor in 'ret' is processed.
for fd, event in ret:
@@ -3025,7 +2989,7 @@ class Server(BaseClientServer):
# Call while holding the connection condition
self.log.debug("Registering %s" % conn)
self.connection_map[conn.conn.fileno()] = conn
- self.poller.register(conn.conn.fileno(), self.readwrite_bitmask)
+ self.poll.register(conn.conn.fileno(), self.readwrite_bitmask)
def _unregisterConnection(self, conn):
# Unregister the connection with the poll object
@@ -3035,7 +2999,7 @@ class Server(BaseClientServer):
if fd not in self.connection_map:
return
try:
- self.poller.unregister(fd)
+ self.poll.unregister(fd)
except KeyError:
pass
try: