diff options
author | liris <liris.pp@gmail.com> | 2014-05-30 12:23:55 +0900 |
---|---|---|
committer | liris <liris.pp@gmail.com> | 2014-05-30 12:23:55 +0900 |
commit | 048d683da1063d67aede619dd6d552394f345f78 (patch) | |
tree | e32a7da747fe25dea63b147da723947c33ec27fb | |
parent | b459b289fdcb14f3d1f2673cf366204c974e13ec (diff) | |
download | websocket-client-048d683da1063d67aede619dd6d552394f345f78.tar.gz |
- refactoring
- split to multiple file
-rw-r--r-- | tests/test_websocket.py | 36 | ||||
-rw-r--r-- | websocket/__init__.py | 1102 | ||||
-rw-r--r-- | websocket/_abnf.py | 166 | ||||
-rw-r--r-- | websocket/_app.py | 190 | ||||
-rw-r--r-- | websocket/_core.py | 807 | ||||
-rw-r--r-- | websocket/_exceptions.py | 46 |
6 files changed, 1231 insertions, 1116 deletions
diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 671ffd6..26dd59c 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -24,9 +24,12 @@ import uuid # websocket-client import websocket as ws +from websocket._core import _parse_url, _create_sec_websocket_key + # Skip test to access the internet. TEST_WITH_INTERNET = False +# TEST_WITH_INTERNET = True # Skip Secure WebSocket test. TEST_SECURE_WS = False @@ -82,91 +85,91 @@ class WebSocketTest(unittest.TestCase): ws.setdefaulttimeout(None) def testParseUrl(self): - p = ws._parse_url("ws://www.example.com/r") + p = _parse_url("ws://www.example.com/r") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 80) self.assertEqual(p[2], "/r") self.assertEqual(p[3], False) - p = ws._parse_url("ws://www.example.com/r/") + p = _parse_url("ws://www.example.com/r/") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 80) self.assertEqual(p[2], "/r/") self.assertEqual(p[3], False) - p = ws._parse_url("ws://www.example.com/") + p = _parse_url("ws://www.example.com/") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 80) self.assertEqual(p[2], "/") self.assertEqual(p[3], False) - p = ws._parse_url("ws://www.example.com") + p = _parse_url("ws://www.example.com") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 80) self.assertEqual(p[2], "/") self.assertEqual(p[3], False) - p = ws._parse_url("ws://www.example.com:8080/r") + p = _parse_url("ws://www.example.com:8080/r") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 8080) self.assertEqual(p[2], "/r") self.assertEqual(p[3], False) - p = ws._parse_url("ws://www.example.com:8080/") + p = _parse_url("ws://www.example.com:8080/") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 8080) self.assertEqual(p[2], "/") self.assertEqual(p[3], False) - p = ws._parse_url("ws://www.example.com:8080") + p = _parse_url("ws://www.example.com:8080") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 8080) self.assertEqual(p[2], "/") self.assertEqual(p[3], False) - p = ws._parse_url("wss://www.example.com:8080/r") + p = _parse_url("wss://www.example.com:8080/r") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 8080) self.assertEqual(p[2], "/r") self.assertEqual(p[3], True) - p = ws._parse_url("wss://www.example.com:8080/r?key=value") + p = _parse_url("wss://www.example.com:8080/r?key=value") self.assertEqual(p[0], "www.example.com") self.assertEqual(p[1], 8080) self.assertEqual(p[2], "/r?key=value") self.assertEqual(p[3], True) - self.assertRaises(ValueError, ws._parse_url, "http://www.example.com/r") + self.assertRaises(ValueError, _parse_url, "http://www.example.com/r") if sys.version_info[0] == 2 and sys.version_info[1] < 7: return - p = ws._parse_url("ws://[2a03:4000:123:83::3]/r") + p = _parse_url("ws://[2a03:4000:123:83::3]/r") self.assertEqual(p[0], "2a03:4000:123:83::3") self.assertEqual(p[1], 80) self.assertEqual(p[2], "/r") self.assertEqual(p[3], False) - p = ws._parse_url("ws://[2a03:4000:123:83::3]:8080/r") + p = _parse_url("ws://[2a03:4000:123:83::3]:8080/r") self.assertEqual(p[0], "2a03:4000:123:83::3") self.assertEqual(p[1], 8080) self.assertEqual(p[2], "/r") self.assertEqual(p[3], False) - p = ws._parse_url("wss://[2a03:4000:123:83::3]/r") + p = _parse_url("wss://[2a03:4000:123:83::3]/r") self.assertEqual(p[0], "2a03:4000:123:83::3") self.assertEqual(p[1], 443) self.assertEqual(p[2], "/r") self.assertEqual(p[3], True) - p = ws._parse_url("wss://[2a03:4000:123:83::3]:8080/r") + p = _parse_url("wss://[2a03:4000:123:83::3]:8080/r") self.assertEqual(p[0], "2a03:4000:123:83::3") self.assertEqual(p[1], 8080) self.assertEqual(p[2], "/r") self.assertEqual(p[3], True) def testWSKey(self): - key = ws._create_sec_websocket_key() + key = _create_sec_websocket_key() self.assertTrue(key != 24) self.assertTrue(six.u("¥n") not in key) @@ -389,7 +392,7 @@ class WebSocketTest(unittest.TestCase): def testUUID4(self): """ WebSocket key should be a UUID4. """ - key = ws._create_sec_websocket_key() + key = _create_sec_websocket_key() u = uuid.UUID(bytes=base64.b64decode(key)) self.assertEqual(4, u.version) @@ -423,6 +426,7 @@ class WebSocketAppTest(unittest.TestCase): close the connection. """ WebSocketAppTest.keep_running_open = self.keep_running + self.close() def on_close(self, *args, **kwargs): diff --git a/websocket/__init__.py b/websocket/__init__.py index d78dc70..fa4b98b 100644 --- a/websocket/__init__.py +++ b/websocket/__init__.py @@ -18,1103 +18,5 @@ Copyright (C) 2010 Hiroki Ohtani(liris) Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ -from __future__ import print_function - - -import six -import socket - -try: - import ssl - from ssl import SSLError - if hasattr(ssl, "match_hostname"): - from ssl import match_hostname - else: - from backports.ssl_match_hostname import match_hostname - - HAVE_SSL = True -except ImportError: - # dummy class of SSLError for ssl none-support environment. - class SSLError(Exception): - pass - - HAVE_SSL = False - -from six.moves.urllib.parse import urlparse -if six.PY3: - from base64 import encodebytes as base64encode -else: - from base64 import encodestring as base64encode - -import os -import array -import struct -import uuid -import hashlib -import threading -import time -import logging -import traceback -import sys -import select - -""" -websocket python client. -========================= - -This version support only hybi-13. -Please see http://tools.ietf.org/html/rfc6455 for protocol. -""" - - -# websocket supported version. -VERSION = 13 - -# closing frame status codes. -STATUS_NORMAL = 1000 -STATUS_GOING_AWAY = 1001 -STATUS_PROTOCOL_ERROR = 1002 -STATUS_UNSUPPORTED_DATA_TYPE = 1003 -STATUS_STATUS_NOT_AVAILABLE = 1005 -STATUS_ABNORMAL_CLOSED = 1006 -STATUS_INVALID_PAYLOAD = 1007 -STATUS_POLICY_VIOLATION = 1008 -STATUS_MESSAGE_TOO_BIG = 1009 -STATUS_INVALID_EXTENSION = 1010 -STATUS_UNEXPECTED_CONDITION = 1011 -STATUS_TLS_HANDSHAKE_ERROR = 1015 - -DEFAULT_SOCKET_OPTION = [(socket.SOL_TCP, socket.TCP_NODELAY, 1),] -if hasattr(socket, "SO_KEEPALIVE"): - DEFAULT_SOCKET_OPTION.append((socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)) -if hasattr(socket, "TCP_KEEPIDLE"): - DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPIDLE, 30)) -if hasattr(socket, "TCP_KEEPINTVL"): - DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)) -if hasattr(socket, "TCP_KEEPCNT"): - DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPCNT, 3)) - -logger = logging.getLogger() - - -class WebSocketException(Exception): - """ - websocket exeception class. - """ - pass - - -class WebSocketConnectionClosedException(WebSocketException): - """ - If remote host closed the connection or some network error happened, - this exception will be raised. - """ - pass - -class WebSocketTimeoutException(WebSocketException): - """ - WebSocketTimeoutException will be raised at socket timeout during read/write data. - """ - pass - -default_timeout = None -traceEnabled = False - - -def enableTrace(tracable): - """ - turn on/off the tracability. - - tracable: boolean value. if set True, tracability is enabled. - """ - global traceEnabled - traceEnabled = tracable - if tracable: - if not logger.handlers: - logger.addHandler(logging.StreamHandler()) - logger.setLevel(logging.DEBUG) - - -def setdefaulttimeout(timeout): - """ - Set the global timeout setting to connect. - - timeout: default socket timeout time. This value is second. - """ - global default_timeout - default_timeout = timeout - - -def getdefaulttimeout(): - """ - Return the global timeout setting(second) to connect. - """ - return default_timeout - - -def _parse_url(url): - """ - parse url and the result is tuple of - (hostname, port, resource path and the flag of secure mode) - - url: url string. - """ - if ":" not in url: - raise ValueError("url is invalid") - - scheme, url = url.split(":", 1) - - parsed = urlparse(url, scheme="ws") - if parsed.hostname: - hostname = parsed.hostname - else: - raise ValueError("hostname is invalid") - port = 0 - if parsed.port: - port = parsed.port - - is_secure = False - if scheme == "ws": - if not port: - port = 80 - elif scheme == "wss": - is_secure = True - if not port: - port = 443 - else: - raise ValueError("scheme %s is invalid" % scheme) - - if parsed.path: - resource = parsed.path - else: - resource = "/" - - if parsed.query: - resource += "?" + parsed.query - - return (hostname, port, resource, is_secure) - - -def create_connection(url, timeout=None, **options): - """ - connect to url and return websocket object. - - Connect to url and return the WebSocket object. - Passing optional timeout parameter will set the timeout on the socket. - If no timeout is supplied, the global default timeout setting returned by getdefauttimeout() is used. - You can customize using 'options'. - If you set "header" list object, you can set your own custom header. - - >>> conn = create_connection("ws://echo.websocket.org/", - ... header=["User-Agent: MyProgram", - ... "x-custom: header"]) - - - timeout: socket timeout time. This value is integer. - if you set None for this value, it means "use default_timeout value" - - - options: "header" -> custom http header list. - "cookie" -> cookie value. - "http_proxy_host" - http proxy host name. - "http_proxy_port" - http proxy port. If not set, set to 80. - """ - sockopt = options.get("sockopt", []) - sslopt = options.get("sslopt", {}) - fire_cont_frame = options.get("fire_cont_frame", False) - websock = WebSocket(sockopt=sockopt, sslopt=sslopt, fire_cont_frame = fire_cont_frame) - websock.settimeout(timeout if timeout is not None else default_timeout) - websock.connect(url, **options) - return websock - -_MAX_INTEGER = (1 << 32) -1 -_AVAILABLE_KEY_CHARS = list(range(0x21, 0x2f + 1)) + list(range(0x3a, 0x7e + 1)) -_MAX_CHAR_BYTE = (1<<8) -1 - -# ref. Websocket gets an update, and it breaks stuff. -# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html - - -def _create_sec_websocket_key(): - uid = uuid.uuid4() - return base64encode(uid.bytes).decode('utf-8').strip() - - -_HEADERS_TO_CHECK = { - "upgrade": "websocket", - "connection": "upgrade", - } - - -class ABNF(object): - """ - ABNF frame class. - see http://tools.ietf.org/html/rfc5234 - and http://tools.ietf.org/html/rfc6455#section-5.2 - """ - - # operation code values. - OPCODE_CONT = 0x0 - OPCODE_TEXT = 0x1 - OPCODE_BINARY = 0x2 - OPCODE_CLOSE = 0x8 - OPCODE_PING = 0x9 - OPCODE_PONG = 0xa - - # available operation code value tuple - OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, - OPCODE_PING, OPCODE_PONG) - - # opcode human readable string - OPCODE_MAP = { - OPCODE_CONT: "cont", - OPCODE_TEXT: "text", - OPCODE_BINARY: "binary", - OPCODE_CLOSE: "close", - OPCODE_PING: "ping", - OPCODE_PONG: "pong" - } - - # data length threashold. - LENGTH_7 = 0x7e - LENGTH_16 = 1 << 16 - LENGTH_63 = 1 << 63 - - def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0, - opcode=OPCODE_TEXT, mask=1, data=""): - """ - Constructor for ABNF. - please check RFC for arguments. - """ - self.fin = fin - self.rsv1 = rsv1 - self.rsv2 = rsv2 - self.rsv3 = rsv3 - self.opcode = opcode - self.mask = mask - self.data = data - self.get_mask_key = os.urandom - - def __str__(self): - return "fin=" + str(self.fin) \ - + " opcode=" + str(self.opcode) \ - + " data=" + str(self.data) - - @staticmethod - def create_frame(data, opcode, fin=1): - """ - create frame to send text, binary and other data. - - data: data to send. This is string value(byte array). - if opcode is OPCODE_TEXT and this value is uniocde, - data value is conveted into unicode string, automatically. - - opcode: operation code. please see OPCODE_XXX. - - fin: fin flag. if set to 0, create continue fragmentation. - """ - if opcode == ABNF.OPCODE_TEXT and isinstance(data, six.text_type): - data = data.encode("utf-8") - # mask must be set if send data from client - return ABNF(fin, 0, 0, 0, opcode, 1, data) - - def format(self): - """ - format this object to string(byte array) to send data to server. - """ - if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]): - raise ValueError("not 0 or 1") - if self.opcode not in ABNF.OPCODES: - raise ValueError("Invalid OPCODE") - length = len(self.data) - if length >= ABNF.LENGTH_63: - raise ValueError("data is too long") - - frame_header = chr(self.fin << 7 - | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 - | self.opcode) - if length < ABNF.LENGTH_7: - frame_header += chr(self.mask << 7 | length) - frame_header = six.b(frame_header) - elif length < ABNF.LENGTH_16: - frame_header += chr(self.mask << 7 | 0x7e) - frame_header = six.b(frame_header) - frame_header += struct.pack("!H", length) - else: - frame_header += chr(self.mask << 7 | 0x7f) - frame_header = six.b(frame_header) - frame_header += struct.pack("!Q", length) - - if not self.mask: - return frame_header + self.data - else: - mask_key = self.get_mask_key(4) - return frame_header + self._get_masked(mask_key) - - def _get_masked(self, mask_key): - s = ABNF.mask(mask_key, self.data) - - if isinstance(mask_key, six.text_type): - mask_key = mask_key.encode('utf-8') - - return mask_key + s - - @staticmethod - def mask(mask_key, data): - """ - mask or unmask data. Just do xor for each byte - - mask_key: 4 byte string(byte). - - data: data to mask/unmask. - """ - - if isinstance(mask_key, six.text_type): - mask_key = six.b(mask_key) - - if isinstance(data, six.text_type): - data = six.b(data) - - _m = array.array("B", mask_key) - _d = array.array("B", data) - for i in range(len(_d)): - _d[i] ^= _m[i % 4] - - if six.PY3: - return _d.tobytes() - else: - return _d.tostring() - - -class WebSocket(object): - """ - Low level WebSocket interface. - This class is based on - The WebSocket protocol draft-hixie-thewebsocketprotocol-76 - http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 - - We can connect to the websocket server and send/recieve data. - The following example is a echo client. - - >>> import websocket - >>> ws = websocket.WebSocket() - >>> ws.connect("ws://echo.websocket.org") - >>> ws.send("Hello, Server") - >>> ws.recv() - 'Hello, Server' - >>> ws.close() - - get_mask_key: a callable to produce new mask keys, see the set_mask_key - function's docstring for more details - sockopt: values for socket.setsockopt. - sockopt must be tuple and each element is argument of sock.setscokopt. - sslopt: dict object for ssl socket option. - fire_cont_frame: fire recv event for each cont frame. default is False - """ - - def __init__(self, get_mask_key=None, sockopt=None, sslopt=None, - fire_cont_frame=False): - """ - Initalize WebSocket object. - """ - if sockopt is None: - sockopt = [] - if sslopt is None: - sslopt = {} - self.connected = False - self.sock = None - self._timeout = None - self.sockopt = sockopt - self.sslopt = sslopt - self.get_mask_key = get_mask_key - self.fire_cont_frame = fire_cont_frame - # Buffers over the packets from the layer beneath until desired amount - # bytes of bytes are received. - self._recv_buffer = [] - # These buffer over the build-up of a single frame. - self._frame_header = None - self._frame_length = None - self._frame_mask = None - self._cont_data = None - - def fileno(self): - return self.sock.fileno() - - def set_mask_key(self, func): - """ - set function to create musk key. You can custumize mask key generator. - Mainly, this is for testing purpose. - - func: callable object. the fuct must 1 argument as integer. - The argument means length of mask key. - This func must be return string(byte array), - which length is argument specified. - """ - self.get_mask_key = func - - def gettimeout(self): - """ - Get the websocket timeout(second). - """ - return self._timeout - - def settimeout(self, timeout): - """ - Set the timeout to the websocket. - - timeout: timeout time(second). - """ - self._timeout = timeout - - timeout = property(gettimeout, settimeout) - - def connect(self, url, **options): - """ - Connect to url. url is websocket url scheme. ie. ws://host:port/resource - You can customize using 'options'. - If you set "header" list object, you can set your own custom header. - - >>> ws = WebSocket() - >>> ws.connect("ws://echo.websocket.org/", - ... header=["User-Agent: MyProgram", - ... "x-custom: header"]) - - timeout: socket timeout time. This value is integer. - if you set None for this value, - it means "use default_timeout value" - - options: "header" -> custom http header list. - "cookie" -> cookie value. - "http_proxy_host" - http proxy host name. - "http_proxy_port" - http proxy port. If not set, set to 80. - - """ - - hostname, port, resource, is_secure = _parse_url(url) - proxy_host, proxy_port = options.get("http_proxy_host", None), options.get("http_proxy_port", 0) - if not proxy_host: - addrinfo_list = socket.getaddrinfo(hostname, port, 0, 0, socket.SOL_TCP) - else: - proxy_port = proxy_port and proxy_port or 80 - addrinfo_list = socket.getaddrinfo(proxy_host, proxy_port, 0, 0, socket.SOL_TCP) - - if not addrinfo_list: - raise WebSocketException("Host not found.: " + hostname + ":" + str(port)) - - family = addrinfo_list[0][0] - self.sock = socket.socket(family) - self.sock.settimeout(self.timeout) - for opts in DEFAULT_SOCKET_OPTION: - self.sock.setsockopt(*opts) - for opts in self.sockopt: - self.sock.setsockopt(*opts) - # TODO: we need to support proxy - address = addrinfo_list[0][4] - self.sock.connect(address) - - if proxy_host: - self._tunnel(hostname, port) - - if is_secure: - if HAVE_SSL: - sslopt = dict(cert_reqs=ssl.CERT_REQUIRED, - ca_certs=os.path.join(os.path.dirname(__file__), "cacert.pem")) - sslopt.update(self.sslopt) - self.sock = ssl.wrap_socket(self.sock, **sslopt) - match_hostname(self.sock.getpeercert(), hostname) - else: - raise WebSocketException("SSL not available.") - - self._handshake(hostname, port, resource, **options) - - def _tunnel(self, host, port): - logger.debug("Connecting proxy...") - connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port) - connect_header += "\r\n" - if traceEnabled: - logger.debug("--- request header ---") - logger.debug(connect_header) - logger.debug("-----------------------") - - self._send(connect_header) - - status, resp_headers = self._read_headers() - if status != 200: - raise WebSocketException("failed CONNECT via proxy") - - def _handshake(self, host, port, resource, **options): - headers = [] - headers.append("GET %s HTTP/1.1" % resource) - headers.append("Upgrade: websocket") - headers.append("Connection: Upgrade") - if port == 80: - hostport = host - else: - hostport = "%s:%d" % (host, port) - headers.append("Host: %s" % hostport) - - if "origin" in options: - headers.append("Origin: %s" % options["origin"]) - else: - headers.append("Origin: http://%s" % hostport) - - key = _create_sec_websocket_key() - headers.append("Sec-WebSocket-Key: %s" % key) - headers.append("Sec-WebSocket-Version: %s" % VERSION) - - if "header" in options: - headers.extend(options["header"]) - cookie = options.get("cookie", None) - if cookie: - headers.append("Cookie: %s" % cookie) - - headers.append("") - headers.append("") - - header_str = "\r\n".join(headers) - self._send(header_str) - if traceEnabled: - logger.debug("--- request header ---") - logger.debug(header_str) - logger.debug("-----------------------") - - status, resp_headers = self._read_headers() - if status != 101: - self.close() - raise WebSocketException("Handshake Status %d" % status) - - success = self._validate_header(resp_headers, key) - if not success: - self.close() - raise WebSocketException("Invalid WebSocket Header") - - self.connected = True - - def _validate_header(self, headers, key): - for k, v in _HEADERS_TO_CHECK.items(): - r = headers.get(k, None) - if not r: - return False - r = r.lower() - if v != r: - return False - - result = headers.get("sec-websocket-accept", None) - if not result: - return False - result = result.lower() - - if isinstance(result, six.text_type): - result = result.encode('utf-8') - - value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8') - hashed = base64encode(hashlib.sha1(value).digest()).strip().lower() - return hashed == result - - def _read_headers(self): - status = None - headers = {} - if traceEnabled: - logger.debug("--- response header ---") - - while True: - line = self._recv_line() - line = line.decode('utf-8') - if line == "\r\n" or line == "\n": - break - line = line.strip() - if traceEnabled: - logger.debug(line) - if not status: - status_info = line.split(" ", 2) - status = int(status_info[1]) - else: - kv = line.split(":", 1) - if len(kv) == 2: - key, value = kv - headers[key.lower()] = value.strip().lower() - else: - raise WebSocketException("Invalid header") - - if traceEnabled: - logger.debug("-----------------------") - - return status, headers - - def send(self, payload, opcode=ABNF.OPCODE_TEXT): - """ - Send the data as string. - - payload: Payload must be utf-8 string or unicoce, - if the opcode is OPCODE_TEXT. - Otherwise, it must be string(byte array) - - opcode: operation code to send. Please see OPCODE_XXX. - """ - - frame = ABNF.create_frame(payload, opcode) - return self.send_frame(frame) - - def send_frame(self, frame): - """ - Send the data frame. - - frame: frame data created by ABNF.create_frame - - >>> ws = create_connection("ws://echo.websocket.org/") - >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT) - >>> ws.send_frame(frame) - >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0) - >>> ws.send_frame(frame) - >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1) - >>> ws.send_frame(frame) - - """ - if self.get_mask_key: - frame.get_mask_key = self.get_mask_key - data = frame.format() - length = len(data) - if traceEnabled: - logger.debug("send: " + repr(data)) - while data: - l = self._send(data) - data = data[l:] - return length - - - def send_binary(self, payload): - return self.send(payload, ABNF.OPCODE_BINARY) - - def ping(self, payload=""): - """ - send ping data. - - payload: data payload to send server. - """ - self.send(payload, ABNF.OPCODE_PING) - - def pong(self, payload): - """ - send pong data. - - payload: data payload to send server. - """ - self.send(payload, ABNF.OPCODE_PONG) - - def recv(self): - """ - Receive string data(byte array) from the server. - - return value: string(byte array) value. - """ - opcode, data = self.recv_data() - if six.PY3 and opcode == ABNF.OPCODE_TEXT: - return data.decode("utf-8") - return data - - def recv_data(self, control_frame=False): - """ - Recieve data with operation code. - - control_frame: a boolean flag indicating whether to return control frame - data, defaults to False - - return value: tuple of operation code and string(byte array) value. - """ - while True: - frame = self.recv_frame() - if not frame: - # handle error: - # 'NoneType' object has no attribute 'opcode' - raise WebSocketException("Not a valid frame %s" % frame) - elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): - if frame.opcode == ABNF.OPCODE_CONT and not self._cont_data: - raise WebSocketException("Illegal frame") - if self._cont_data: - self._cont_data[1] += frame.data - else: - self._cont_data = [frame.opcode, frame.data] - - if frame.fin or self.fire_cont_frame: - data = self._cont_data - self._cont_data = None - return data - elif frame.opcode == ABNF.OPCODE_CLOSE: - self.send_close() - return (frame.opcode, frame.data) - elif frame.opcode == ABNF.OPCODE_PING: - self.pong(frame.data) - if control_frame: - return (frame.opcode, frame.data) - elif frame.opcode == ABNF.OPCODE_PONG: - if control_frame: - return (frame.opcode, frame.data) - - def recv_data_frame(self, control_frame=False): - """ - Recieve data with operation code. - - control_frame: a boolean flag indicating whether to return control frame - data, defaults to False - - return value: tuple of operation code and string(byte array) value. - """ - while True: - frame = self.recv_frame() - if not frame: - # handle error: - # 'NoneType' object has no attribute 'opcode' - raise WebSocketException("Not a valid frame %s" % frame) - elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): - if frame.opcode == ABNF.OPCODE_CONT and not self._cont_data: - raise WebSocketException("Illegal frame") - if self._cont_data: - self._cont_data[1].data += frame.data - else: - self._cont_data = [frame.opcode, frame] - - if frame.fin or self.fire_cont_frame: - data = self._cont_data - self._cont_data = None - return data - elif frame.opcode == ABNF.OPCODE_CLOSE: - self.send_close() - return (frame.opcode, frame) - elif frame.opcode == ABNF.OPCODE_PING: - self.pong(frame.data) - if control_frame: - return (frame.opcode, frame) - elif frame.opcode == ABNF.OPCODE_PONG: - if control_frame: - return (frame.opcode, frame) - - def recv_frame(self): - """ - recieve data as frame from server. - - return value: ABNF frame object. - """ - - # Header - if self._frame_header is None: - self._frame_header = self._recv_strict(2) - - b1 = self._frame_header[0] - - if six.PY2: - b1 = ord(b1) - - fin = b1 >> 7 & 1 - rsv1 = b1 >> 6 & 1 - rsv2 = b1 >> 5 & 1 - rsv3 = b1 >> 4 & 1 - opcode = b1 & 0xf - b2 = self._frame_header[1] - - if six.PY2: - b2 = ord(b2) - - has_mask = b2 >> 7 & 1 - - # Frame length - if self._frame_length is None: - length_bits = b2 & 0x7f - if length_bits == 0x7e: - length_data = self._recv_strict(2) - self._frame_length = struct.unpack("!H", length_data)[0] - elif length_bits == 0x7f: - length_data = self._recv_strict(8) - self._frame_length = struct.unpack("!Q", length_data)[0] - else: - self._frame_length = length_bits - - # Mask - if self._frame_mask is None: - self._frame_mask = self._recv_strict(4) if has_mask else "" - - # Payload - payload = self._recv_strict(self._frame_length) - if has_mask: - payload = ABNF.mask(self._frame_mask, payload) - - # Reset for next frame - self._frame_header = None - self._frame_length = None - self._frame_mask = None - - return ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload) - - - def send_close(self, status=STATUS_NORMAL, reason=""): - """ - send close data to the server. - - status: status code to send. see STATUS_XXX. - - reason: the reason to close. This must be string. - """ - if status < 0 or status >= ABNF.LENGTH_16: - raise ValueError("code is invalid range") - self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) - - def close(self, status=STATUS_NORMAL, reason=""): - """ - Close Websocket object - - status: status code to send. see STATUS_XXX. - - reason: the reason to close. This must be string. - """ - if self.connected: - if status < 0 or status >= ABNF.LENGTH_16: - raise ValueError("code is invalid range") - - try: - self.connected = False - self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) - timeout = self.sock.gettimeout() - self.sock.settimeout(3) - try: - frame = self.recv_frame() - if logger.isEnabledFor(logging.ERROR): - recv_status = struct.unpack("!H", frame.data)[0] - if recv_status != STATUS_NORMAL: - logger.error("close status: " + repr(recv_status)) - except: - pass - self.sock.settimeout(timeout) - self.sock.shutdown(socket.SHUT_RDWR) - except: - pass - self._closeInternal() - - def _closeInternal(self): - self.sock.close() - - def _send(self, data): - - if isinstance(data, six.text_type): - data = data.encode('utf-8') - - try: - return self.sock.send(data) - except socket.timeout as e: - message = getattr(e, 'strerror', getattr(e, 'message', '')) - raise WebSocketTimeoutException(message) - except Exception as e: - message = getattr(e, 'strerror', getattr(e, 'message', '')) - if "timed out" in message: - raise WebSocketTimeoutException(message) - else: - raise - - def _recv(self, bufsize): - try: - bytes = self.sock.recv(bufsize) - except socket.timeout as e: - message = getattr(e, 'strerror', getattr(e, 'message', '')) - raise WebSocketTimeoutException(message) - except SSLError as e: - message = getattr(e, 'strerror', getattr(e, 'message', '')) - if message == "The read operation timed out": - raise WebSocketTimeoutException(message) - else: - raise - - if not bytes: - raise WebSocketConnectionClosedException() - return bytes - - def _recv_strict(self, bufsize): - shortage = bufsize - sum(len(x) for x in self._recv_buffer) - while shortage > 0: - bytes = self._recv(shortage) - self._recv_buffer.append(bytes) - shortage -= len(bytes) - - unified = six.b("").join(self._recv_buffer) - - if shortage == 0: - self._recv_buffer = [] - return unified - else: - self._recv_buffer = [unified[bufsize:]] - return unified[:bufsize] - - - def _recv_line(self): - line = [] - while True: - c = self._recv(1) - line.append(c) - if c == six.b("\n"): - break - return six.b("").join(line) - - -class WebSocketApp(object): - """ - Higher level of APIs are provided. - The interface is like JavaScript WebSocket object. - """ - def __init__(self, url, header=[], - on_open=None, on_message=None, on_error=None, - on_close=None, on_ping=None, on_pong=None, - on_cont_message=None, - keep_running=True, get_mask_key=None, cookie=None): - """ - url: websocket url. - header: custom header for websocket handshake. - on_open: callable object which is called at opening websocket. - this function has one argument. The arugment is this class object. - on_message: callbale object which is called when recieved data. - on_message has 2 arguments. - The 1st arugment is this class object. - The passing 2nd arugment is utf-8 string which we get from the server. - on_error: callable object which is called when we get error. - on_error has 2 arguments. - The 1st arugment is this class object. - The passing 2nd arugment is exception object. - on_close: callable object which is called when closed the connection. - this function has one argument. The arugment is this class object. - on_cont_message: callback object which is called when recieve continued frame data. - on_message has 3 arguments. - The 1st arugment is this class object. - The passing 2nd arugment is utf-8 string which we get from the server. - The 3rd arugment is continue flag. if 0, the data continue to next frame data - keep_running: a boolean flag indicating whether the app's main loop should - keep running, defaults to True - get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's - docstring for more information - """ - self.url = url - self.header = header - self.cookie = cookie - self.on_open = on_open - self.on_message = on_message - self.on_error = on_error - self.on_close = on_close - self.on_ping = on_ping - self.on_pong = on_pong - self.on_cont_message = on_cont_message - self.keep_running = keep_running - self.get_mask_key = get_mask_key - self.sock = None - self.last_ping_tm = 0 - - def send(self, data, opcode=ABNF.OPCODE_TEXT): - """ - send message. - data: message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode. - opcode: operation code of data. default is OPCODE_TEXT. - """ - - if self.sock.send(data, opcode) == 0: - raise WebSocketConnectionClosedException() - - def close(self): - """ - close websocket connection. - """ - self.keep_running = False - self.sock.close() - - def _send_ping(self, interval, event): - while not event.wait(interval): - self.last_ping_tm = time.time() - self.sock.ping() - - def run_forever(self, sockopt=None, sslopt=None, ping_interval=0, ping_timeout=None, - http_proxy_host=None, http_proxy_port=None): - """ - run event loop for WebSocket framework. - This loop is infinite loop and is alive during websocket is available. - sockopt: values for socket.setsockopt. - sockopt must be tuple and each element is argument of sock.setscokopt. - sslopt: ssl socket optional dict. - ping_interval: automatically send "ping" command every specified period(second) - if set to 0, not send automatically. - ping_timeout: timeout(second) if the pong message is not recieved. - http_proxy_host: http proxy host name. - http_proxy_port: http proxy port. If not set, set to 80. - """ - - if not ping_timeout or ping_timeout<=0: - ping_timeout = None - if sockopt is None: - sockopt = [] - if sslopt is None: - sslopt = {} - if self.sock: - raise WebSocketException("socket is already opened") - thread = None - - try: - self.sock = WebSocket(self.get_mask_key, sockopt=sockopt, sslopt=sslopt, - fire_cont_frame=self.on_cont_message and True or False) - self.sock.settimeout(default_timeout) - self.sock.connect(self.url, header=self.header, cookie=self.cookie, - http_proxy_host=http_proxy_host, http_proxy_port=http_proxy_port) - self._callback(self.on_open) - - if ping_interval: - event = threading.Event() - thread = threading.Thread(target=self._send_ping, args=(ping_interval, event)) - thread.setDaemon(True) - thread.start() - - while True: - r, w, e = select.select((self.sock.sock, ), (), (), ping_timeout) - if not self.keep_running: - break - if ping_timeout and self.last_ping_tm and time.time() - self.last_ping_tm > ping_timeout: - self.last_ping_tm = 0 - raise WebSocketTimeoutException() - - if r: - op_code, frame = self.sock.recv_data_frame(True) - if op_code == ABNF.OPCODE_CLOSE: - break - elif op_code == ABNF.OPCODE_PING: - self._callback(self.on_ping, frame.data) - elif op_code == ABNF.OPCODE_PONG: - self._callback(self.on_pong, frame.data) - elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: - self._callback(self.on_cont_message, frame.data, frame.fin) - else: - data = frame.data - if six.PY3 and frame.opcode == ABNF.OPCODE_TEXT: - data = data.decode("utf-8") - self._callback(self.on_message, data) - except Exception as e: - self._callback(self.on_error, e) - finally: - if thread: - event.set() - thread.join() - self.keep_running = False - self.sock.close() - self._callback(self.on_close) - self.sock = None - - def _callback(self, callback, *args): - if callback: - try: - callback(self, *args) - except Exception as e: - logger.error(e) - if logger.isEnabledFor(logging.DEBUG): - _, _, tb = sys.exc_info() - traceback.print_tb(tb) - - -if __name__ == "__main__": - enableTrace(True) - ws = create_connection("ws://echo.websocket.org/") - print("Sending 'Hello, World'...") - ws.send("Hello, World") - print("Sent") - print("Receiving...") - result = ws.recv() - print("Received '%s'" % result) - ws.close() +from ._core import * +from ._app import WebSocketApp diff --git a/websocket/_abnf.py b/websocket/_abnf.py new file mode 100644 index 0000000..a92f0d1 --- /dev/null +++ b/websocket/_abnf.py @@ -0,0 +1,166 @@ +""" +websocket - WebSocket client library for Python + +Copyright (C) 2010 Hiroki Ohtani(liris) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +import six +import array +import struct +import os + + + + +class ABNF(object): + """ + ABNF frame class. + see http://tools.ietf.org/html/rfc5234 + and http://tools.ietf.org/html/rfc6455#section-5.2 + """ + + # operation code values. + OPCODE_CONT = 0x0 + OPCODE_TEXT = 0x1 + OPCODE_BINARY = 0x2 + OPCODE_CLOSE = 0x8 + OPCODE_PING = 0x9 + OPCODE_PONG = 0xa + + # available operation code value tuple + OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, + OPCODE_PING, OPCODE_PONG) + + # opcode human readable string + OPCODE_MAP = { + OPCODE_CONT: "cont", + OPCODE_TEXT: "text", + OPCODE_BINARY: "binary", + OPCODE_CLOSE: "close", + OPCODE_PING: "ping", + OPCODE_PONG: "pong" + } + + # data length threashold. + LENGTH_7 = 0x7e + LENGTH_16 = 1 << 16 + LENGTH_63 = 1 << 63 + + def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0, + opcode=OPCODE_TEXT, mask=1, data=""): + """ + Constructor for ABNF. + please check RFC for arguments. + """ + self.fin = fin + self.rsv1 = rsv1 + self.rsv2 = rsv2 + self.rsv3 = rsv3 + self.opcode = opcode + self.mask = mask + self.data = data + self.get_mask_key = os.urandom + + def __str__(self): + return "fin=" + str(self.fin) \ + + " opcode=" + str(self.opcode) \ + + " data=" + str(self.data) + + @staticmethod + def create_frame(data, opcode, fin=1): + """ + create frame to send text, binary and other data. + + data: data to send. This is string value(byte array). + if opcode is OPCODE_TEXT and this value is uniocde, + data value is conveted into unicode string, automatically. + + opcode: operation code. please see OPCODE_XXX. + + fin: fin flag. if set to 0, create continue fragmentation. + """ + if opcode == ABNF.OPCODE_TEXT and isinstance(data, six.text_type): + data = data.encode("utf-8") + # mask must be set if send data from client + return ABNF(fin, 0, 0, 0, opcode, 1, data) + + def format(self): + """ + format this object to string(byte array) to send data to server. + """ + if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]): + raise ValueError("not 0 or 1") + if self.opcode not in ABNF.OPCODES: + raise ValueError("Invalid OPCODE") + length = len(self.data) + if length >= ABNF.LENGTH_63: + raise ValueError("data is too long") + + frame_header = chr(self.fin << 7 + | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 + | self.opcode) + if length < ABNF.LENGTH_7: + frame_header += chr(self.mask << 7 | length) + frame_header = six.b(frame_header) + elif length < ABNF.LENGTH_16: + frame_header += chr(self.mask << 7 | 0x7e) + frame_header = six.b(frame_header) + frame_header += struct.pack("!H", length) + else: + frame_header += chr(self.mask << 7 | 0x7f) + frame_header = six.b(frame_header) + frame_header += struct.pack("!Q", length) + + if not self.mask: + return frame_header + self.data + else: + mask_key = self.get_mask_key(4) + return frame_header + self._get_masked(mask_key) + + def _get_masked(self, mask_key): + s = ABNF.mask(mask_key, self.data) + + if isinstance(mask_key, six.text_type): + mask_key = mask_key.encode('utf-8') + + return mask_key + s + + @staticmethod + def mask(mask_key, data): + """ + mask or unmask data. Just do xor for each byte + + mask_key: 4 byte string(byte). + + data: data to mask/unmask. + """ + + if isinstance(mask_key, six.text_type): + mask_key = six.b(mask_key) + + if isinstance(data, six.text_type): + data = six.b(data) + + _m = array.array("B", mask_key) + _d = array.array("B", data) + for i in range(len(_d)): + _d[i] ^= _m[i % 4] + + if six.PY3: + return _d.tobytes() + else: + return _d.tostring() diff --git a/websocket/_app.py b/websocket/_app.py new file mode 100644 index 0000000..e5efe66 --- /dev/null +++ b/websocket/_app.py @@ -0,0 +1,190 @@ +""" +websocket - WebSocket client library for Python + +Copyright (C) 2010 Hiroki Ohtani(liris) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" + +""" +WebSocketApp provides higher level APIs. +""" +import threading +import time +import traceback +import sys +import select + + +from ._core import WebSocket, getdefaulttimeout, logger +from ._exceptions import * +from websocket._abnf import ABNF + +class WebSocketApp(object): + """ + Higher level of APIs are provided. + The interface is like JavaScript WebSocket object. + """ + def __init__(self, url, header=[], + on_open=None, on_message=None, on_error=None, + on_close=None, on_ping=None, on_pong=None, + on_cont_message=None, + keep_running=True, get_mask_key=None, cookie=None): + """ + url: websocket url. + header: custom header for websocket handshake. + on_open: callable object which is called at opening websocket. + this function has one argument. The arugment is this class object. + on_message: callbale object which is called when recieved data. + on_message has 2 arguments. + The 1st arugment is this class object. + The passing 2nd arugment is utf-8 string which we get from the server. + on_error: callable object which is called when we get error. + on_error has 2 arguments. + The 1st arugment is this class object. + The passing 2nd arugment is exception object. + on_close: callable object which is called when closed the connection. + this function has one argument. The arugment is this class object. + on_cont_message: callback object which is called when recieve continued frame data. + on_message has 3 arguments. + The 1st arugment is this class object. + The passing 2nd arugment is utf-8 string which we get from the server. + The 3rd arugment is continue flag. if 0, the data continue to next frame data + keep_running: a boolean flag indicating whether the app's main loop should + keep running, defaults to True + get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's + docstring for more information + """ + self.url = url + self.header = header + self.cookie = cookie + self.on_open = on_open + self.on_message = on_message + self.on_error = on_error + self.on_close = on_close + self.on_ping = on_ping + self.on_pong = on_pong + self.on_cont_message = on_cont_message + self.keep_running = keep_running + self.get_mask_key = get_mask_key + self.sock = None + self.last_ping_tm = 0 + + def send(self, data, opcode=ABNF.OPCODE_TEXT): + """ + send message. + data: message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode. + opcode: operation code of data. default is OPCODE_TEXT. + """ + + if self.sock.send(data, opcode) == 0: + raise WebSocketConnectionClosedException() + + def close(self): + """ + close websocket connection. + """ + self.keep_running = False + if self.sock: + self.sock.close() + + def _send_ping(self, interval, event): + while not event.wait(interval): + self.last_ping_tm = time.time() + self.sock.ping() + + def run_forever(self, sockopt=None, sslopt=None, ping_interval=0, ping_timeout=None, + http_proxy_host=None, http_proxy_port=None): + """ + run event loop for WebSocket framework. + This loop is infinite loop and is alive during websocket is available. + sockopt: values for socket.setsockopt. + sockopt must be tuple and each element is argument of sock.setscokopt. + sslopt: ssl socket optional dict. + ping_interval: automatically send "ping" command every specified period(second) + if set to 0, not send automatically. + ping_timeout: timeout(second) if the pong message is not recieved. + http_proxy_host: http proxy host name. + http_proxy_port: http proxy port. If not set, set to 80. + """ + + if not ping_timeout or ping_timeout<=0: + ping_timeout = None + if sockopt is None: + sockopt = [] + if sslopt is None: + sslopt = {} + if self.sock: + raise WebSocketException("socket is already opened") + thread = None + + try: + self.sock = WebSocket(self.get_mask_key, sockopt=sockopt, sslopt=sslopt, + fire_cont_frame=self.on_cont_message and True or False) + self.sock.settimeout(getdefaulttimeout()) + self.sock.connect(self.url, header=self.header, cookie=self.cookie, + http_proxy_host=http_proxy_host, http_proxy_port=http_proxy_port) + self._callback(self.on_open) + + if ping_interval: + event = threading.Event() + thread = threading.Thread(target=self._send_ping, args=(ping_interval, event)) + thread.setDaemon(True) + thread.start() + + while self.sock.connected: + r, w, e = select.select((self.sock.sock, ), (), (), ping_timeout) + if not self.keep_running: + break + if ping_timeout and self.last_ping_tm and time.time() - self.last_ping_tm > ping_timeout: + self.last_ping_tm = 0 + raise WebSocketTimeoutException() + + if r: + op_code, frame = self.sock.recv_data_frame(True) + if op_code == ABNF.OPCODE_CLOSE: + break + elif op_code == ABNF.OPCODE_PING: + self._callback(self.on_ping, frame.data) + elif op_code == ABNF.OPCODE_PONG: + self._callback(self.on_pong, frame.data) + elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: + self._callback(self.on_cont_message, frame.data, frame.fin) + else: + data = frame.data + if six.PY3 and frame.opcode == ABNF.OPCODE_TEXT: + data = data.decode("utf-8") + self._callback(self.on_message, data) + except Exception as e: + self._callback(self.on_error, e) + finally: + if thread: + event.set() + thread.join() + self.keep_running = False + self.sock.close() + self._callback(self.on_close) + self.sock = None + + def _callback(self, callback, *args): + if callback: + try: + callback(self, *args) + except Exception as e: + logger.error(e) + if logger.isEnabledFor(logging.DEBUG): + _, _, tb = sys.exc_info() + traceback.print_tb(tb) diff --git a/websocket/_core.py b/websocket/_core.py new file mode 100644 index 0000000..04d0c02 --- /dev/null +++ b/websocket/_core.py @@ -0,0 +1,807 @@ +""" +websocket - WebSocket client library for Python + +Copyright (C) 2010 Hiroki Ohtani(liris) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +from __future__ import print_function + + +import six +import socket + +try: + import ssl + from ssl import SSLError + if hasattr(ssl, "match_hostname"): + from ssl import match_hostname + else: + from backports.ssl_match_hostname import match_hostname + + HAVE_SSL = True +except ImportError: + # dummy class of SSLError for ssl none-support environment. + class SSLError(Exception): + pass + + HAVE_SSL = False + +from six.moves.urllib.parse import urlparse +if six.PY3: + from base64 import encodebytes as base64encode +else: + from base64 import encodestring as base64encode + +import os +import struct +import uuid +import hashlib +import logging + +# websocket modules +from ._exceptions import * +from ._abnf import ABNF + +""" +websocket python client. +========================= + +This version support only hybi-13. +Please see http://tools.ietf.org/html/rfc6455 for protocol. +""" + + +# websocket supported version. +VERSION = 13 + +# closing frame status codes. +STATUS_NORMAL = 1000 +STATUS_GOING_AWAY = 1001 +STATUS_PROTOCOL_ERROR = 1002 +STATUS_UNSUPPORTED_DATA_TYPE = 1003 +STATUS_STATUS_NOT_AVAILABLE = 1005 +STATUS_ABNORMAL_CLOSED = 1006 +STATUS_INVALID_PAYLOAD = 1007 +STATUS_POLICY_VIOLATION = 1008 +STATUS_MESSAGE_TOO_BIG = 1009 +STATUS_INVALID_EXTENSION = 1010 +STATUS_UNEXPECTED_CONDITION = 1011 +STATUS_TLS_HANDSHAKE_ERROR = 1015 + +DEFAULT_SOCKET_OPTION = [(socket.SOL_TCP, socket.TCP_NODELAY, 1),] +if hasattr(socket, "SO_KEEPALIVE"): + DEFAULT_SOCKET_OPTION.append((socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)) +if hasattr(socket, "TCP_KEEPIDLE"): + DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPIDLE, 30)) +if hasattr(socket, "TCP_KEEPINTVL"): + DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPINTVL, 10)) +if hasattr(socket, "TCP_KEEPCNT"): + DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPCNT, 3)) + +logger = logging.getLogger() + + + +default_timeout = None +traceEnabled = False + + +def enableTrace(tracable): + """ + turn on/off the tracability. + + tracable: boolean value. if set True, tracability is enabled. + """ + global traceEnabled + traceEnabled = tracable + if tracable: + if not logger.handlers: + logger.addHandler(logging.StreamHandler()) + logger.setLevel(logging.DEBUG) + + +def setdefaulttimeout(timeout): + """ + Set the global timeout setting to connect. + + timeout: default socket timeout time. This value is second. + """ + global default_timeout + default_timeout = timeout + + +def getdefaulttimeout(): + """ + Return the global timeout setting(second) to connect. + """ + return default_timeout + + +def _parse_url(url): + """ + parse url and the result is tuple of + (hostname, port, resource path and the flag of secure mode) + + url: url string. + """ + if ":" not in url: + raise ValueError("url is invalid") + + scheme, url = url.split(":", 1) + + parsed = urlparse(url, scheme="ws") + if parsed.hostname: + hostname = parsed.hostname + else: + raise ValueError("hostname is invalid") + port = 0 + if parsed.port: + port = parsed.port + + is_secure = False + if scheme == "ws": + if not port: + port = 80 + elif scheme == "wss": + is_secure = True + if not port: + port = 443 + else: + raise ValueError("scheme %s is invalid" % scheme) + + if parsed.path: + resource = parsed.path + else: + resource = "/" + + if parsed.query: + resource += "?" + parsed.query + + return (hostname, port, resource, is_secure) + + +def create_connection(url, timeout=None, **options): + """ + connect to url and return websocket object. + + Connect to url and return the WebSocket object. + Passing optional timeout parameter will set the timeout on the socket. + If no timeout is supplied, the global default timeout setting returned by getdefauttimeout() is used. + You can customize using 'options'. + If you set "header" list object, you can set your own custom header. + + >>> conn = create_connection("ws://echo.websocket.org/", + ... header=["User-Agent: MyProgram", + ... "x-custom: header"]) + + + timeout: socket timeout time. This value is integer. + if you set None for this value, it means "use default_timeout value" + + + options: "header" -> custom http header list. + "cookie" -> cookie value. + "http_proxy_host" - http proxy host name. + "http_proxy_port" - http proxy port. If not set, set to 80. + """ + sockopt = options.get("sockopt", []) + sslopt = options.get("sslopt", {}) + fire_cont_frame = options.get("fire_cont_frame", False) + websock = WebSocket(sockopt=sockopt, sslopt=sslopt, fire_cont_frame = fire_cont_frame) + websock.settimeout(timeout if timeout is not None else default_timeout) + websock.connect(url, **options) + return websock + +_MAX_INTEGER = (1 << 32) -1 +_AVAILABLE_KEY_CHARS = list(range(0x21, 0x2f + 1)) + list(range(0x3a, 0x7e + 1)) +_MAX_CHAR_BYTE = (1<<8) -1 + +# ref. Websocket gets an update, and it breaks stuff. +# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html + + +def _create_sec_websocket_key(): + uid = uuid.uuid4() + return base64encode(uid.bytes).decode('utf-8').strip() + + +_HEADERS_TO_CHECK = { + "upgrade": "websocket", + "connection": "upgrade", + } + + + + +class WebSocket(object): + """ + Low level WebSocket interface. + This class is based on + The WebSocket protocol draft-hixie-thewebsocketprotocol-76 + http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 + + We can connect to the websocket server and send/recieve data. + The following example is a echo client. + + >>> import websocket + >>> ws = websocket.WebSocket() + >>> ws.connect("ws://echo.websocket.org") + >>> ws.send("Hello, Server") + >>> ws.recv() + 'Hello, Server' + >>> ws.close() + + get_mask_key: a callable to produce new mask keys, see the set_mask_key + function's docstring for more details + sockopt: values for socket.setsockopt. + sockopt must be tuple and each element is argument of sock.setscokopt. + sslopt: dict object for ssl socket option. + fire_cont_frame: fire recv event for each cont frame. default is False + """ + + def __init__(self, get_mask_key=None, sockopt=None, sslopt=None, + fire_cont_frame=False): + """ + Initalize WebSocket object. + """ + if sockopt is None: + sockopt = [] + if sslopt is None: + sslopt = {} + self.connected = False + self.sock = None + self._timeout = None + self.sockopt = sockopt + self.sslopt = sslopt + self.get_mask_key = get_mask_key + self.fire_cont_frame = fire_cont_frame + # Buffers over the packets from the layer beneath until desired amount + # bytes of bytes are received. + self._recv_buffer = [] + # These buffer over the build-up of a single frame. + self._frame_header = None + self._frame_length = None + self._frame_mask = None + self._cont_data = None + + def fileno(self): + return self.sock.fileno() + + def set_mask_key(self, func): + """ + set function to create musk key. You can custumize mask key generator. + Mainly, this is for testing purpose. + + func: callable object. the fuct must 1 argument as integer. + The argument means length of mask key. + This func must be return string(byte array), + which length is argument specified. + """ + self.get_mask_key = func + + def gettimeout(self): + """ + Get the websocket timeout(second). + """ + return self._timeout + + def settimeout(self, timeout): + """ + Set the timeout to the websocket. + + timeout: timeout time(second). + """ + self._timeout = timeout + + timeout = property(gettimeout, settimeout) + + def connect(self, url, **options): + """ + Connect to url. url is websocket url scheme. ie. ws://host:port/resource + You can customize using 'options'. + If you set "header" list object, you can set your own custom header. + + >>> ws = WebSocket() + >>> ws.connect("ws://echo.websocket.org/", + ... header=["User-Agent: MyProgram", + ... "x-custom: header"]) + + timeout: socket timeout time. This value is integer. + if you set None for this value, + it means "use default_timeout value" + + options: "header" -> custom http header list. + "cookie" -> cookie value. + "http_proxy_host" - http proxy host name. + "http_proxy_port" - http proxy port. If not set, set to 80. + + """ + + hostname, port, resource, is_secure = _parse_url(url) + proxy_host, proxy_port = options.get("http_proxy_host", None), options.get("http_proxy_port", 0) + if not proxy_host: + addrinfo_list = socket.getaddrinfo(hostname, port, 0, 0, socket.SOL_TCP) + else: + proxy_port = proxy_port and proxy_port or 80 + addrinfo_list = socket.getaddrinfo(proxy_host, proxy_port, 0, 0, socket.SOL_TCP) + + if not addrinfo_list: + raise WebSocketException("Host not found.: " + hostname + ":" + str(port)) + + family = addrinfo_list[0][0] + self.sock = socket.socket(family) + self.sock.settimeout(self.timeout) + for opts in DEFAULT_SOCKET_OPTION: + self.sock.setsockopt(*opts) + for opts in self.sockopt: + self.sock.setsockopt(*opts) + # TODO: we need to support proxy + address = addrinfo_list[0][4] + self.sock.connect(address) + + if proxy_host: + self._tunnel(hostname, port) + + if is_secure: + if HAVE_SSL: + sslopt = dict(cert_reqs=ssl.CERT_REQUIRED, + ca_certs=os.path.join(os.path.dirname(__file__), "cacert.pem")) + sslopt.update(self.sslopt) + self.sock = ssl.wrap_socket(self.sock, **sslopt) + match_hostname(self.sock.getpeercert(), hostname) + else: + raise WebSocketException("SSL not available.") + + self._handshake(hostname, port, resource, **options) + + def _tunnel(self, host, port): + logger.debug("Connecting proxy...") + connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port) + connect_header += "\r\n" + if traceEnabled: + logger.debug("--- request header ---") + logger.debug(connect_header) + logger.debug("-----------------------") + + self._send(connect_header) + + status, resp_headers = self._read_headers() + if status != 200: + raise WebSocketException("failed CONNECT via proxy") + + def _handshake(self, host, port, resource, **options): + headers = [] + headers.append("GET %s HTTP/1.1" % resource) + headers.append("Upgrade: websocket") + headers.append("Connection: Upgrade") + if port == 80: + hostport = host + else: + hostport = "%s:%d" % (host, port) + headers.append("Host: %s" % hostport) + + if "origin" in options: + headers.append("Origin: %s" % options["origin"]) + else: + headers.append("Origin: http://%s" % hostport) + + key = _create_sec_websocket_key() + headers.append("Sec-WebSocket-Key: %s" % key) + headers.append("Sec-WebSocket-Version: %s" % VERSION) + + if "header" in options: + headers.extend(options["header"]) + cookie = options.get("cookie", None) + if cookie: + headers.append("Cookie: %s" % cookie) + + headers.append("") + headers.append("") + + header_str = "\r\n".join(headers) + self._send(header_str) + if traceEnabled: + logger.debug("--- request header ---") + logger.debug(header_str) + logger.debug("-----------------------") + + status, resp_headers = self._read_headers() + if status != 101: + self.close() + raise WebSocketException("Handshake Status %d" % status) + + success = self._validate_header(resp_headers, key) + if not success: + self.close() + raise WebSocketException("Invalid WebSocket Header") + + self.connected = True + + def _validate_header(self, headers, key): + for k, v in _HEADERS_TO_CHECK.items(): + r = headers.get(k, None) + if not r: + return False + r = r.lower() + if v != r: + return False + + result = headers.get("sec-websocket-accept", None) + if not result: + return False + result = result.lower() + + if isinstance(result, six.text_type): + result = result.encode('utf-8') + + value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8') + hashed = base64encode(hashlib.sha1(value).digest()).strip().lower() + return hashed == result + + def _read_headers(self): + status = None + headers = {} + if traceEnabled: + logger.debug("--- response header ---") + + while True: + line = self._recv_line() + line = line.decode('utf-8') + if line == "\r\n" or line == "\n": + break + line = line.strip() + if traceEnabled: + logger.debug(line) + if not status: + status_info = line.split(" ", 2) + status = int(status_info[1]) + else: + kv = line.split(":", 1) + if len(kv) == 2: + key, value = kv + headers[key.lower()] = value.strip().lower() + else: + raise WebSocketException("Invalid header") + + if traceEnabled: + logger.debug("-----------------------") + + return status, headers + + def send(self, payload, opcode=ABNF.OPCODE_TEXT): + """ + Send the data as string. + + payload: Payload must be utf-8 string or unicoce, + if the opcode is OPCODE_TEXT. + Otherwise, it must be string(byte array) + + opcode: operation code to send. Please see OPCODE_XXX. + """ + + frame = ABNF.create_frame(payload, opcode) + return self.send_frame(frame) + + def send_frame(self, frame): + """ + Send the data frame. + + frame: frame data created by ABNF.create_frame + + >>> ws = create_connection("ws://echo.websocket.org/") + >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT) + >>> ws.send_frame(frame) + >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0) + >>> ws.send_frame(frame) + >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1) + >>> ws.send_frame(frame) + + """ + if self.get_mask_key: + frame.get_mask_key = self.get_mask_key + data = frame.format() + length = len(data) + if traceEnabled: + logger.debug("send: " + repr(data)) + while data: + l = self._send(data) + data = data[l:] + return length + + + def send_binary(self, payload): + return self.send(payload, ABNF.OPCODE_BINARY) + + def ping(self, payload=""): + """ + send ping data. + + payload: data payload to send server. + """ + self.send(payload, ABNF.OPCODE_PING) + + def pong(self, payload): + """ + send pong data. + + payload: data payload to send server. + """ + self.send(payload, ABNF.OPCODE_PONG) + + def recv(self): + """ + Receive string data(byte array) from the server. + + return value: string(byte array) value. + """ + opcode, data = self.recv_data() + if six.PY3 and opcode == ABNF.OPCODE_TEXT: + return data.decode("utf-8") + return data + + def recv_data(self, control_frame=False): + """ + Recieve data with operation code. + + control_frame: a boolean flag indicating whether to return control frame + data, defaults to False + + return value: tuple of operation code and string(byte array) value. + """ + while True: + frame = self.recv_frame() + if not frame: + # handle error: + # 'NoneType' object has no attribute 'opcode' + raise WebSocketException("Not a valid frame %s" % frame) + elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): + if frame.opcode == ABNF.OPCODE_CONT and not self._cont_data: + raise WebSocketException("Illegal frame") + if self._cont_data: + self._cont_data[1] += frame.data + else: + self._cont_data = [frame.opcode, frame.data] + + if frame.fin or self.fire_cont_frame: + data = self._cont_data + self._cont_data = None + return data + elif frame.opcode == ABNF.OPCODE_CLOSE: + self.send_close() + return (frame.opcode, frame.data) + elif frame.opcode == ABNF.OPCODE_PING: + self.pong(frame.data) + if control_frame: + return (frame.opcode, frame.data) + elif frame.opcode == ABNF.OPCODE_PONG: + if control_frame: + return (frame.opcode, frame.data) + + def recv_data_frame(self, control_frame=False): + """ + Recieve data with operation code. + + control_frame: a boolean flag indicating whether to return control frame + data, defaults to False + + return value: tuple of operation code and string(byte array) value. + """ + while True: + frame = self.recv_frame() + if not frame: + # handle error: + # 'NoneType' object has no attribute 'opcode' + raise WebSocketException("Not a valid frame %s" % frame) + elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): + if frame.opcode == ABNF.OPCODE_CONT and not self._cont_data: + raise WebSocketException("Illegal frame") + if self._cont_data: + self._cont_data[1].data += frame.data + else: + self._cont_data = [frame.opcode, frame] + + if frame.fin or self.fire_cont_frame: + data = self._cont_data + self._cont_data = None + return data + elif frame.opcode == ABNF.OPCODE_CLOSE: + self.send_close() + return (frame.opcode, frame) + elif frame.opcode == ABNF.OPCODE_PING: + self.pong(frame.data) + if control_frame: + return (frame.opcode, frame) + elif frame.opcode == ABNF.OPCODE_PONG: + if control_frame: + return (frame.opcode, frame) + + def recv_frame(self): + """ + recieve data as frame from server. + + return value: ABNF frame object. + """ + + # Header + if self._frame_header is None: + self._frame_header = self._recv_strict(2) + + b1 = self._frame_header[0] + + if six.PY2: + b1 = ord(b1) + + fin = b1 >> 7 & 1 + rsv1 = b1 >> 6 & 1 + rsv2 = b1 >> 5 & 1 + rsv3 = b1 >> 4 & 1 + opcode = b1 & 0xf + b2 = self._frame_header[1] + + if six.PY2: + b2 = ord(b2) + + has_mask = b2 >> 7 & 1 + + # Frame length + if self._frame_length is None: + length_bits = b2 & 0x7f + if length_bits == 0x7e: + length_data = self._recv_strict(2) + self._frame_length = struct.unpack("!H", length_data)[0] + elif length_bits == 0x7f: + length_data = self._recv_strict(8) + self._frame_length = struct.unpack("!Q", length_data)[0] + else: + self._frame_length = length_bits + + # Mask + if self._frame_mask is None: + self._frame_mask = self._recv_strict(4) if has_mask else "" + + # Payload + payload = self._recv_strict(self._frame_length) + if has_mask: + payload = ABNF.mask(self._frame_mask, payload) + + # Reset for next frame + self._frame_header = None + self._frame_length = None + self._frame_mask = None + + return ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload) + + + def send_close(self, status=STATUS_NORMAL, reason=""): + """ + send close data to the server. + + status: status code to send. see STATUS_XXX. + + reason: the reason to close. This must be string. + """ + if status < 0 or status >= ABNF.LENGTH_16: + raise ValueError("code is invalid range") + self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) + + def close(self, status=STATUS_NORMAL, reason=""): + """ + Close Websocket object + + status: status code to send. see STATUS_XXX. + + reason: the reason to close. This must be string. + """ + if self.connected: + if status < 0 or status >= ABNF.LENGTH_16: + raise ValueError("code is invalid range") + + try: + self.connected = False + self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) + timeout = self.sock.gettimeout() + self.sock.settimeout(3) + try: + frame = self.recv_frame() + if logger.isEnabledFor(logging.ERROR): + recv_status = struct.unpack("!H", frame.data)[0] + if recv_status != STATUS_NORMAL: + logger.error("close status: " + repr(recv_status)) + except: + pass + self.sock.settimeout(timeout) + self.sock.shutdown(socket.SHUT_RDWR) + except: + pass + + self._closeInternal() + + def _closeInternal(self): + self.sock.close() + + def _send(self, data): + if isinstance(data, six.text_type): + data = data.encode('utf-8') + + try: + return self.sock.send(data) + except socket.timeout as e: + message = getattr(e, 'strerror', getattr(e, 'message', '')) + raise WebSocketTimeoutException(message) + except Exception as e: + message = getattr(e, 'strerror', getattr(e, 'message', '')) + if "timed out" in message: + raise WebSocketTimeoutException(message) + else: + raise + + def _recv(self, bufsize): + try: + bytes = self.sock.recv(bufsize) + except socket.timeout as e: + message = getattr(e, 'strerror', getattr(e, 'message', '')) + raise WebSocketTimeoutException(message) + except SSLError as e: + message = getattr(e, 'strerror', getattr(e, 'message', '')) + if message == "The read operation timed out": + raise WebSocketTimeoutException(message) + else: + raise + + if not bytes: + raise WebSocketConnectionClosedException() + return bytes + + def _recv_strict(self, bufsize): + shortage = bufsize - sum(len(x) for x in self._recv_buffer) + while shortage > 0: + bytes = self._recv(shortage) + self._recv_buffer.append(bytes) + shortage -= len(bytes) + + unified = six.b("").join(self._recv_buffer) + + if shortage == 0: + self._recv_buffer = [] + return unified + else: + self._recv_buffer = [unified[bufsize:]] + return unified[:bufsize] + + + def _recv_line(self): + line = [] + while True: + c = self._recv(1) + line.append(c) + if c == six.b("\n"): + break + return six.b("").join(line) + + + + +if __name__ == "__main__": + enableTrace(True) + ws = create_connection("ws://echo.websocket.org/") + print("Sending 'Hello, World'...") + ws.send("Hello, World") + print("Sent") + print("Receiving...") + result = ws.recv() + print("Received '%s'" % result) + ws.close() diff --git a/websocket/_exceptions.py b/websocket/_exceptions.py new file mode 100644 index 0000000..b0be191 --- /dev/null +++ b/websocket/_exceptions.py @@ -0,0 +1,46 @@ +""" +websocket - WebSocket client library for Python + +Copyright (C) 2010 Hiroki Ohtani(liris) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" + + +""" +define websocket exceptions +""" + +class WebSocketException(Exception): + """ + websocket exeception class. + """ + pass + + +class WebSocketConnectionClosedException(WebSocketException): + """ + If remote host closed the connection or some network error happened, + this exception will be raised. + """ + pass + +class WebSocketTimeoutException(WebSocketException): + """ + WebSocketTimeoutException will be raised at socket timeout during read/write data. + """ + pass + |