summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbubbleboy14 <mario.balibrera@gmail.com>2022-08-24 11:07:09 -0700
committerGitHub <noreply@github.com>2022-08-24 18:07:09 +0000
commit43eb3446cdff6f092f20c00a5de012d32fc0336d (patch)
tree16051ddb651857c8ff30883951f88b0c4ddcefdb
parent61171591b08ee031e02cc6cb129952259062f502 (diff)
downloadwebsocket-client-43eb3446cdff6f092f20c00a5de012d32fc0336d.tar.gz
reconnect (#838)
* rel example * tweaked rel example for linter * added rel note to examples.rst * slightly more compact example * added example header * matched wording * _socket.recv(): _recv() except socket.error - changed or to and; added except TimeoutError - raise WebSocketTimeoutException * _app - custom dispatcher check_callback() integration (and fixed pyevent compatibility): WrappedDispatcher (for use with generic event dispatchers such as pyevent and rel); create_dispatcher() accepts dispatcher kwarg (default None), and if it is specified, returns a WrappedDispatcher; use create_dispatcher() (passing specified dispatcher if any) every time (regardless of dispatcher specification) * Add clarifying comment, rerun CI tests * Add space to make linter happy * working reconnect * rmed logs * added _logging.warning() disconnected/reconnected notifications to handleDisconnect() * moved connect notification and dispatcher.read() (if doread kwarg [default False] is True) to setSock() (prevents those lines from running on ConnectionRefusedError) * run_forever(): reconnect kwarg now specifies sleep() time (defualt 5) * handleDisconnect(): fixed log msg * run_forever() refactor: stabilized stack frame count (at least in rel mode); added stack frame count to disconnect (warning) log; grossly oversimplified ;) * dispatcher simplification via DispatcherBase and DispatcherBase/WrappedDispatcher.timeout() * _logging: info(); enableTrace() supports level kwarg (default "DEBUG") * handleDisconnect() uses info() log * Fix linting errors Co-authored-by: engn33r <engn33r@users.noreply.github.com>
-rw-r--r--websocket/_app.py163
-rw-r--r--websocket/_logging.py8
2 files changed, 99 insertions, 72 deletions
diff --git a/websocket/_app.py b/websocket/_app.py
index da49ec7..3a2c8b7 100644
--- a/websocket/_app.py
+++ b/websocket/_app.py
@@ -1,3 +1,4 @@
+import inspect
import selectors
import sys
import threading
@@ -30,14 +31,19 @@ limitations under the License.
__all__ = ["WebSocketApp"]
-class Dispatcher:
+class DispatcherBase:
"""
- Dispatcher
+ DispatcherBase
"""
def __init__(self, app, ping_timeout):
self.app = app
self.ping_timeout = ping_timeout
+
+class Dispatcher(DispatcherBase):
+ """
+ Dispatcher
+ """
def read(self, sock, read_callback, check_callback):
while self.app.keep_running:
sel = selectors.DefaultSelector()
@@ -50,15 +56,15 @@ class Dispatcher:
check_callback()
sel.close()
+ def timeout(self, seconds, callback):
+ time.sleep(seconds)
+ callback()
-class SSLDispatcher:
+
+class SSLDispatcher(DispatcherBase):
"""
SSLDispatcher
"""
- def __init__(self, app, ping_timeout):
- self.app = app
- self.ping_timeout = ping_timeout
-
def read(self, sock, read_callback, check_callback):
while self.app.keep_running:
r = self.select()
@@ -95,6 +101,9 @@ class WrappedDispatcher:
self.dispatcher.read(sock, read_callback)
self.ping_timeout and self.dispatcher.timeout(self.ping_timeout, check_callback)
+ def timeout(self, seconds, callback):
+ self.dispatcher.timeout(seconds, callback)
+
class WebSocketApp:
"""
@@ -230,7 +239,7 @@ class WebSocketApp:
http_no_proxy=None, http_proxy_auth=None,
skip_utf8_validation=False,
host=None, origin=None, dispatcher=None,
- suppress_origin=False, proxy_type=None):
+ suppress_origin=False, proxy_type=None, reconnect=5):
"""
Run event loop for WebSocket framework.
@@ -317,84 +326,98 @@ class WebSocketApp:
# Finally call the callback AFTER all teardown is complete
self._callback(self.on_close, close_status_code, close_reason)
- try:
+ def setSock():
self.sock = WebSocket(
self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
fire_cont_frame=self.on_cont_message is not None,
skip_utf8_validation=skip_utf8_validation,
enable_multithread=True)
self.sock.settimeout(getdefaulttimeout())
- self.sock.connect(
- self.url, header=self.header, cookie=self.cookie,
- http_proxy_host=http_proxy_host,
- http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
- http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
- host=host, origin=origin, suppress_origin=suppress_origin,
- proxy_type=proxy_type, socket=self.prepared_socket)
- dispatcher = self.create_dispatcher(ping_timeout, dispatcher)
-
- self._callback(self.on_open)
-
- if ping_interval:
- event = threading.Event()
- thread = threading.Thread(
- target=self._send_ping, args=(ping_interval, event, ping_payload))
- thread.daemon = True
- thread.start()
-
- def read():
- if not self.keep_running:
- return teardown()
+ try:
+ self.sock.connect(
+ self.url, header=self.header, cookie=self.cookie,
+ http_proxy_host=http_proxy_host,
+ http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
+ http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
+ host=host, origin=origin, suppress_origin=suppress_origin,
+ proxy_type=proxy_type, socket=self.prepared_socket)
+
+ self._callback(self.on_open)
+
+ _logging.warning("websocket connected")
+ dispatcher.read(self.sock.sock, read, check)
+ except (Exception, ConnectionRefusedError, KeyboardInterrupt, SystemExit) as e:
+ handleDisconnect(e)
+
+ def read():
+ if not self.keep_running:
+ return teardown()
+ try:
op_code, frame = self.sock.recv_data_frame(True)
- if op_code == ABNF.OPCODE_CLOSE:
- return teardown(frame)
- elif op_code == ABNF.OPCODE_PING:
- self._callback(self.on_ping, frame.data)
- elif op_code == ABNF.OPCODE_PONG:
- self.last_pong_tm = time.time()
- self._callback(self.on_pong, frame.data)
- elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
- self._callback(self.on_data, frame.data,
- frame.opcode, frame.fin)
- self._callback(self.on_cont_message,
- frame.data, frame.fin)
- else:
- data = frame.data
- if op_code == ABNF.OPCODE_TEXT:
- data = data.decode("utf-8")
- self._callback(self.on_data, data, frame.opcode, True)
- self._callback(self.on_message, data)
-
- return True
-
- def check():
- if (ping_timeout):
- has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
- has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
- has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout
-
- if (self.last_ping_tm and
- has_timeout_expired and
- (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
- raise WebSocketTimeoutException("ping/pong timed out")
- return True
-
- dispatcher.read(self.sock.sock, read, check)
- return False
- except (Exception, KeyboardInterrupt, SystemExit) as e:
+ except WebSocketConnectionClosedException as e:
+ _logging.error("WebSocketConnectionClosedException - %s" % (reconnect and "reconnecting" or "goodbye"))
+ return handleDisconnect(e)
+ if op_code == ABNF.OPCODE_CLOSE:
+ return teardown(frame)
+ elif op_code == ABNF.OPCODE_PING:
+ self._callback(self.on_ping, frame.data)
+ elif op_code == ABNF.OPCODE_PONG:
+ self.last_pong_tm = time.time()
+ self._callback(self.on_pong, frame.data)
+ elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
+ self._callback(self.on_data, frame.data,
+ frame.opcode, frame.fin)
+ self._callback(self.on_cont_message,
+ frame.data, frame.fin)
+ else:
+ data = frame.data
+ if op_code == ABNF.OPCODE_TEXT:
+ data = data.decode("utf-8")
+ self._callback(self.on_data, data, frame.opcode, True)
+ self._callback(self.on_message, data)
+
+ return True
+
+ def check():
+ if (ping_timeout):
+ has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
+ has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
+ has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout
+
+ if (self.last_ping_tm and
+ has_timeout_expired and
+ (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
+ raise WebSocketTimeoutException("ping/pong timed out")
+ return True
+
+ def handleDisconnect(e):
self._callback(self.on_error, e)
if isinstance(e, SystemExit):
# propagate SystemExit further
raise
- teardown()
- return not isinstance(e, KeyboardInterrupt)
+ if reconnect and not isinstance(e, KeyboardInterrupt):
+ _logging.info("websocket disconnected (retrying in %s seconds) [%s frames in stack]" % (reconnect, len(inspect.stack())))
+ dispatcher.timeout(reconnect, setSock)
+ else:
+ teardown()
+
+ dispatcher = self.create_dispatcher(ping_timeout, dispatcher, not not sslopt)
+
+ if ping_interval:
+ event = threading.Event()
+ thread = threading.Thread(
+ target=self._send_ping, args=(ping_interval, event, ping_payload))
+ thread.daemon = True
+ thread.start()
+
+ setSock()
- def create_dispatcher(self, ping_timeout, dispatcher=None):
+ def create_dispatcher(self, ping_timeout, dispatcher=None, is_ssl=False):
if dispatcher: # If custom dispatcher is set, use WrappedDispatcher
return WrappedDispatcher(self, ping_timeout, dispatcher)
timeout = ping_timeout or 10
- if self.sock.is_ssl():
+ if is_ssl:
return SSLDispatcher(self, timeout)
return Dispatcher(self, timeout)
diff --git a/websocket/_logging.py b/websocket/_logging.py
index df690dc..3921111 100644
--- a/websocket/_logging.py
+++ b/websocket/_logging.py
@@ -35,7 +35,7 @@ __all__ = ["enableTrace", "dump", "error", "warning", "debug", "trace",
"isEnabledForError", "isEnabledForDebug", "isEnabledForTrace"]
-def enableTrace(traceable, handler=logging.StreamHandler()):
+def enableTrace(traceable, handler=logging.StreamHandler(), level="DEBUG"):
"""
Turn on/off the traceability.
@@ -48,7 +48,7 @@ def enableTrace(traceable, handler=logging.StreamHandler()):
_traceEnabled = traceable
if traceable:
_logger.addHandler(handler)
- _logger.setLevel(logging.DEBUG)
+ _logger.setLevel(getattr(logging, level))
def dump(title, message):
@@ -70,6 +70,10 @@ def debug(msg):
_logger.debug(msg)
+def info(msg):
+ _logger.info(msg)
+
+
def trace(msg):
if _traceEnabled:
_logger.debug(msg)