summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Shepelev <temotor@gmail.com>2016-02-12 15:36:10 +0500
committerSergey Shepelev <temotor@gmail.com>2016-02-12 19:01:07 +0500
commit385e6f6051f44856fd4f9389b312ac809493aab7 (patch)
tree78fd298f455898c06dc33879d544cf815e7b4312
parent3a767d9d08947dc8d57ab99600786b5bf96c282a (diff)
downloadeventlet-moving-on.tar.gz
tpool: yield after setup() and before killall()moving-on
Fixes surprising errors in artificial IO-dry scenarios (like tests). Won't hurt in real environment.
-rw-r--r--eventlet/tpool.py29
1 files changed, 22 insertions, 7 deletions
diff --git a/eventlet/tpool.py b/eventlet/tpool.py
index a6533c8..618c377 100644
--- a/eventlet/tpool.py
+++ b/eventlet/tpool.py
@@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import atexit
import imp
import os
import sys
import traceback
+import eventlet
from eventlet import event, greenio, greenthread, patcher, timeout
from eventlet.support import six
@@ -39,7 +41,7 @@ if six.PY3:
Empty = Queue_module.Empty
Queue = Queue_module.Queue
-_bytetosend = ' '.encode()
+_bytetosend = b' '
_coro = None
_nthreads = int(os.environ.get('EVENTLET_THREADPOOL_SIZE', 20))
_reqq = _rspq = None
@@ -54,6 +56,7 @@ def tpool_trampoline():
try:
_c = _rsock.recv(1)
assert _c
+ # FIXME: this is probably redundant since using sockets instead of pipe now
except ValueError:
break # will be raised when pipe is closed
while not _rspq.empty():
@@ -250,7 +253,7 @@ class Proxy(object):
def setup():
- global _rsock, _wsock, _threads, _coro, _setup_already, _rspq, _reqq
+ global _rsock, _wsock, _coro, _setup_already, _rspq, _reqq
if _setup_already:
return
else:
@@ -271,7 +274,9 @@ def setup():
sock.listen(1)
csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
csock.connect(sock.getsockname())
+ csock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
_wsock, _addr = sock.accept()
+ _wsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
sock.close()
_rsock = greenio.GreenSocket(csock)
@@ -283,12 +288,20 @@ def setup():
_threads.append(t)
_coro = greenthread.spawn_n(tpool_trampoline)
+ # This yield fixes subtle error with GreenSocket.__del__
+ eventlet.sleep(0)
+# Avoid ResourceWarning unclosed socket on Python3.2+
+@atexit.register
def killall():
global _setup_already, _rspq, _rsock, _wsock
if not _setup_already:
return
+
+ # This yield fixes freeze in some scenarios
+ eventlet.sleep(0)
+
for thr in _threads:
_reqq.put(None)
for thr in _threads:
@@ -296,7 +309,7 @@ def killall():
del _threads[:]
# return any remaining results
- while not _rspq.empty():
+ while (_rspq is not None) and not _rspq.empty():
try:
(e, rv) = _rspq.get(block=False)
e.send(rv)
@@ -306,10 +319,12 @@ def killall():
if _coro is not None:
greenthread.kill(_coro)
- _rsock.close()
- _wsock.close()
- _rsock = None
- _wsock = None
+ if _rsock is not None:
+ _rsock.close()
+ _rsock = None
+ if _wsock is not None:
+ _wsock.close()
+ _wsock = None
_rspq = None
_setup_already = False