diff options
author | bubbleboy14 <mario.balibrera@gmail.com> | 2022-08-24 11:07:09 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-24 18:07:09 +0000 |
commit | 43eb3446cdff6f092f20c00a5de012d32fc0336d (patch) | |
tree | 16051ddb651857c8ff30883951f88b0c4ddcefdb | |
parent | 61171591b08ee031e02cc6cb129952259062f502 (diff) | |
download | websocket-client-43eb3446cdff6f092f20c00a5de012d32fc0336d.tar.gz |
reconnect (#838)
* rel example
* tweaked rel example for linter
* added rel note to examples.rst
* slightly more compact example
* added example header
* matched wording
* _socket.recv(): _recv() except socket.error - changed or to and; added except TimeoutError - raise WebSocketTimeoutException
* _app - custom dispatcher check_callback() integration (and fixed pyevent compatibility): WrappedDispatcher (for use with generic event dispatchers such as pyevent and rel); create_dispatcher() accepts dispatcher kwarg (default None), and if it is specified, returns a WrappedDispatcher; use create_dispatcher() (passing specified dispatcher if any) every time (regardless of dispatcher specification)
* Add clarifying comment, rerun CI tests
* Add space to make linter happy
* working reconnect
* rmed logs
* added _logging.warning() disconnected/reconnected notifications to handleDisconnect()
* moved connect notification and dispatcher.read() (if doread kwarg [default False] is True) to setSock() (prevents those lines from running on ConnectionRefusedError)
* run_forever(): reconnect kwarg now specifies sleep() time (defualt 5)
* handleDisconnect(): fixed log msg
* run_forever() refactor: stabilized stack frame count (at least in rel mode); added stack frame count to disconnect (warning) log; grossly oversimplified ;)
* dispatcher simplification via DispatcherBase and DispatcherBase/WrappedDispatcher.timeout()
* _logging: info(); enableTrace() supports level kwarg (default "DEBUG")
* handleDisconnect() uses info() log
* Fix linting errors
Co-authored-by: engn33r <engn33r@users.noreply.github.com>
-rw-r--r-- | websocket/_app.py | 163 | ||||
-rw-r--r-- | websocket/_logging.py | 8 |
2 files changed, 99 insertions, 72 deletions
diff --git a/websocket/_app.py b/websocket/_app.py index da49ec7..3a2c8b7 100644 --- a/websocket/_app.py +++ b/websocket/_app.py @@ -1,3 +1,4 @@ +import inspect import selectors import sys import threading @@ -30,14 +31,19 @@ limitations under the License. __all__ = ["WebSocketApp"] -class Dispatcher: +class DispatcherBase: """ - Dispatcher + DispatcherBase """ def __init__(self, app, ping_timeout): self.app = app self.ping_timeout = ping_timeout + +class Dispatcher(DispatcherBase): + """ + Dispatcher + """ def read(self, sock, read_callback, check_callback): while self.app.keep_running: sel = selectors.DefaultSelector() @@ -50,15 +56,15 @@ class Dispatcher: check_callback() sel.close() + def timeout(self, seconds, callback): + time.sleep(seconds) + callback() -class SSLDispatcher: + +class SSLDispatcher(DispatcherBase): """ SSLDispatcher """ - def __init__(self, app, ping_timeout): - self.app = app - self.ping_timeout = ping_timeout - def read(self, sock, read_callback, check_callback): while self.app.keep_running: r = self.select() @@ -95,6 +101,9 @@ class WrappedDispatcher: self.dispatcher.read(sock, read_callback) self.ping_timeout and self.dispatcher.timeout(self.ping_timeout, check_callback) + def timeout(self, seconds, callback): + self.dispatcher.timeout(seconds, callback) + class WebSocketApp: """ @@ -230,7 +239,7 @@ class WebSocketApp: http_no_proxy=None, http_proxy_auth=None, skip_utf8_validation=False, host=None, origin=None, dispatcher=None, - suppress_origin=False, proxy_type=None): + suppress_origin=False, proxy_type=None, reconnect=5): """ Run event loop for WebSocket framework. @@ -317,84 +326,98 @@ class WebSocketApp: # Finally call the callback AFTER all teardown is complete self._callback(self.on_close, close_status_code, close_reason) - try: + def setSock(): self.sock = WebSocket( self.get_mask_key, sockopt=sockopt, sslopt=sslopt, fire_cont_frame=self.on_cont_message is not None, skip_utf8_validation=skip_utf8_validation, enable_multithread=True) self.sock.settimeout(getdefaulttimeout()) - self.sock.connect( - self.url, header=self.header, cookie=self.cookie, - http_proxy_host=http_proxy_host, - http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, - http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols, - host=host, origin=origin, suppress_origin=suppress_origin, - proxy_type=proxy_type, socket=self.prepared_socket) - dispatcher = self.create_dispatcher(ping_timeout, dispatcher) - - self._callback(self.on_open) - - if ping_interval: - event = threading.Event() - thread = threading.Thread( - target=self._send_ping, args=(ping_interval, event, ping_payload)) - thread.daemon = True - thread.start() - - def read(): - if not self.keep_running: - return teardown() + try: + self.sock.connect( + self.url, header=self.header, cookie=self.cookie, + http_proxy_host=http_proxy_host, + http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, + http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols, + host=host, origin=origin, suppress_origin=suppress_origin, + proxy_type=proxy_type, socket=self.prepared_socket) + + self._callback(self.on_open) + + _logging.warning("websocket connected") + dispatcher.read(self.sock.sock, read, check) + except (Exception, ConnectionRefusedError, KeyboardInterrupt, SystemExit) as e: + handleDisconnect(e) + + def read(): + if not self.keep_running: + return teardown() + try: op_code, frame = self.sock.recv_data_frame(True) - if op_code == ABNF.OPCODE_CLOSE: - return teardown(frame) - elif op_code == ABNF.OPCODE_PING: - self._callback(self.on_ping, frame.data) - elif op_code == ABNF.OPCODE_PONG: - self.last_pong_tm = time.time() - self._callback(self.on_pong, frame.data) - elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: - self._callback(self.on_data, frame.data, - frame.opcode, frame.fin) - self._callback(self.on_cont_message, - frame.data, frame.fin) - else: - data = frame.data - if op_code == ABNF.OPCODE_TEXT: - data = data.decode("utf-8") - self._callback(self.on_data, data, frame.opcode, True) - self._callback(self.on_message, data) - - return True - - def check(): - if (ping_timeout): - has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout - has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0 - has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout - - if (self.last_ping_tm and - has_timeout_expired and - (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)): - raise WebSocketTimeoutException("ping/pong timed out") - return True - - dispatcher.read(self.sock.sock, read, check) - return False - except (Exception, KeyboardInterrupt, SystemExit) as e: + except WebSocketConnectionClosedException as e: + _logging.error("WebSocketConnectionClosedException - %s" % (reconnect and "reconnecting" or "goodbye")) + return handleDisconnect(e) + if op_code == ABNF.OPCODE_CLOSE: + return teardown(frame) + elif op_code == ABNF.OPCODE_PING: + self._callback(self.on_ping, frame.data) + elif op_code == ABNF.OPCODE_PONG: + self.last_pong_tm = time.time() + self._callback(self.on_pong, frame.data) + elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: + self._callback(self.on_data, frame.data, + frame.opcode, frame.fin) + self._callback(self.on_cont_message, + frame.data, frame.fin) + else: + data = frame.data + if op_code == ABNF.OPCODE_TEXT: + data = data.decode("utf-8") + self._callback(self.on_data, data, frame.opcode, True) + self._callback(self.on_message, data) + + return True + + def check(): + if (ping_timeout): + has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout + has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0 + has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout + + if (self.last_ping_tm and + has_timeout_expired and + (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)): + raise WebSocketTimeoutException("ping/pong timed out") + return True + + def handleDisconnect(e): self._callback(self.on_error, e) if isinstance(e, SystemExit): # propagate SystemExit further raise - teardown() - return not isinstance(e, KeyboardInterrupt) + if reconnect and not isinstance(e, KeyboardInterrupt): + _logging.info("websocket disconnected (retrying in %s seconds) [%s frames in stack]" % (reconnect, len(inspect.stack()))) + dispatcher.timeout(reconnect, setSock) + else: + teardown() + + dispatcher = self.create_dispatcher(ping_timeout, dispatcher, not not sslopt) + + if ping_interval: + event = threading.Event() + thread = threading.Thread( + target=self._send_ping, args=(ping_interval, event, ping_payload)) + thread.daemon = True + thread.start() + + setSock() - def create_dispatcher(self, ping_timeout, dispatcher=None): + def create_dispatcher(self, ping_timeout, dispatcher=None, is_ssl=False): if dispatcher: # If custom dispatcher is set, use WrappedDispatcher return WrappedDispatcher(self, ping_timeout, dispatcher) timeout = ping_timeout or 10 - if self.sock.is_ssl(): + if is_ssl: return SSLDispatcher(self, timeout) return Dispatcher(self, timeout) diff --git a/websocket/_logging.py b/websocket/_logging.py index df690dc..3921111 100644 --- a/websocket/_logging.py +++ b/websocket/_logging.py @@ -35,7 +35,7 @@ __all__ = ["enableTrace", "dump", "error", "warning", "debug", "trace", "isEnabledForError", "isEnabledForDebug", "isEnabledForTrace"] -def enableTrace(traceable, handler=logging.StreamHandler()): +def enableTrace(traceable, handler=logging.StreamHandler(), level="DEBUG"): """ Turn on/off the traceability. @@ -48,7 +48,7 @@ def enableTrace(traceable, handler=logging.StreamHandler()): _traceEnabled = traceable if traceable: _logger.addHandler(handler) - _logger.setLevel(logging.DEBUG) + _logger.setLevel(getattr(logging, level)) def dump(title, message): @@ -70,6 +70,10 @@ def debug(msg): _logger.debug(msg) +def info(msg): + _logger.info(msg) + + def trace(msg): if _traceEnabled: _logger.debug(msg) |