diff options
author | David Cullen <david.cullen@technicolor.com> | 2016-06-28 20:48:56 -0400 |
---|---|---|
committer | David Cullen <david.cullen@technicolor.com> | 2016-06-28 20:48:56 -0400 |
commit | d4d2a36e1dc7cc356ba841e6aeb79ab62d9d8c96 (patch) | |
tree | c28c8f36307e19784a485a9223fb7aca14b3f526 /tests | |
parent | fd24b95a5d33262f21301c996be65a48ad39d671 (diff) | |
download | pexpect-git-d4d2a36e1dc7cc356ba841e6aeb79ab62d9d8c96.tar.gz |
Make socket unit test self contained.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_socket.py | 121 |
1 files changed, 90 insertions, 31 deletions
diff --git a/tests/test_socket.py b/tests/test_socket.py index 24d62b8..3f417fc 100644 --- a/tests/test_socket.py +++ b/tests/test_socket.py @@ -27,38 +27,99 @@ import os import signal import socket import time +import errno class ExpectTestCase(PexpectTestCase.PexpectTestCase): + def setUp(self): print(self.id()) PexpectTestCase.PexpectTestCase.setUp(self) + self.host = '127.0.0.1' + self.port = 49152 + 10000 self.motd = b"""\ ------------------------------------------------------------------------------ -* Welcome to THE WEATHER UNDERGROUND telnet service! * +* Welcome to the SOCKET UNIT TEST code! * ------------------------------------------------------------------------------ * * -* National Weather Service information provided by Alden Electronics, Inc. * -* and updated each minute as reports come in over our data feed. * +* This unit test code is our best effort at testing the ability of the * +* pexpect library to handle sockets. We need some text to test buffer size * +* handling. * +* * +* A page is 1024 bytes or 1K. 80 x 24 = 1920. So a standard terminal window * +* contains more than one page. We actually want more than a page for our * +* tests. * +* * +* This is the twelfth line, and we need 24. So we need a few more paragraphs.* +* We can keep them short and just put lines between them. * * * -* **Note: If you cannot get past this opening screen, you must use a * -* different version of the "telnet" program--some of the ones for IBM * -* compatible PC's have a bug that prevents proper connection. * +* The 80 x 24 terminal size comes from the ancient past when computers were * +* only able to display text in cuneiform writing. * * * -* comments: jmasters@wunderground.com * +* The cunieform writing system used the edge of a reed to make marks on clay * +* tablets. * +* * +* It was the forerunner of the style of handwriting used by doctors to write * +* prescriptions. Thus the name: pre (before) script (writing) ion (charged * +* particle). * ------------------------------------------------------------------------------ """.replace(b'\n', b'\n\r') + b"\r\n" + self.prompt1 = 'Press Return to continue:' + self.prompt2 = 'Rate this unit test>' + self.prompt3 = 'Press X to exit:' + self.server_up = multiprocessing.Event() + self.server_process = multiprocessing.Process(target=self.socket_server, args=(self.server_up,)) + self.server_process.daemon = True + self.server_process.start() + while not self.server_up.is_set(): + time.sleep(0.250) + + def tearDown(self): + os.kill(self.server_process.pid, signal.SIGINT) + self.server_process.join(timeout=5.0) + PexpectTestCase.PexpectTestCase.tearDown(self) + + def socket_server(self, server_up): + sock = None + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((self.host, self.port)) + sock.listen(5) + server_up.set() + while True: + (conn, addr) = sock.accept() + conn.send(self.motd) + conn.send(self.prompt1) + result = conn.recv(1024) + if result != '\r\n': + break + conn.send(self.prompt2) + result = conn.recv(1024) + if result != '\r\n': + break + conn.send(self.prompt3) + result = conn.recv(1024) + if result.startswith('X'): + conn.shutdown(socket.SHUT_RDWR) + conn.close() + except KeyboardInterrupt: + pass + if sock is not None: + sock.shutdown(socket.SHUT_RDWR) + sock.close() + exit(0) def test_socket(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('rainmaker.wunderground.com', 23)) + sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) - session.expect('Press Return to continue:') + session.expect(self.prompt1) self.assertEqual(session.before, self.motd) session.send('\r\n') - session.expect('or enter 3 letter forecast city code--') + session.expect(self.prompt2) session.send('\r\n') - session.expect('Selection:') + session.expect(self.prompt3) session.send('X\r\n') session.expect(pexpect.EOF) self.assertEqual(session.before, b'') @@ -74,55 +135,53 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): def test_timeout(self): with self.assertRaises(pexpect.TIMEOUT): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('rainmaker.wunderground.com', 23)) + sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock, timeout=10) session.expect(b'Bogus response') def test_interrupt(self): - def socket_fn(self): + def socket_fn(self, timed_out): result = 0 try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('rainmaker.wunderground.com', 23)) + sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock, timeout=10) # Get all data from server session.read_nonblocking(size=4096) # This read should timeout session.read_nonblocking(size=4096) except pexpect.TIMEOUT: - result = 1 + timed_out.set() + result = errno.ETIMEDOUT exit(result) - test_proc = multiprocessing.Process(target=socket_fn, args=(self,)) + timed_out = multiprocessing.Event() + test_proc = multiprocessing.Process(target=socket_fn, args=(self, timed_out)) test_proc.daemon = True test_proc.start() - time.sleep(5.0) - while True: - if test_proc.is_alive(): - os.kill(test_proc.pid, signal.SIGWINCH) - else: - break + while not timed_out.is_set(): time.sleep(0.250) - test_proc.join() - self.assertEqual(test_proc.exitcode, 1) + os.kill(test_proc.pid, signal.SIGWINCH) + test_proc.join(timeout=5.0) + self.assertEqual(test_proc.exitcode, errno.ETIMEDOUT) def test_maxread(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('rainmaker.wunderground.com', 23)) + sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) session.maxread = 1100 - session.expect('Press Return to continue:') + session.expect(self.prompt1) self.assertEqual(session.before, self.motd) session.send('\r\n') - session.expect('or enter 3 letter forecast city code--') + session.expect(self.prompt2) session.send('\r\n') - session.expect('Selection:') + session.expect(self.prompt3) session.send('X\r\n') session.expect(pexpect.EOF) self.assertEqual(session.before, b'') def test_fd_isalive (self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('rainmaker.wunderground.com', 23)) + sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) assert session.isalive() sock.close() @@ -130,14 +189,14 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase): def test_fd_isatty (self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('rainmaker.wunderground.com', 23)) + sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock.fileno(), timeout=10) assert not session.isatty() session.close() def test_fileobj(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('rainmaker.wunderground.com', 23)) + sock.connect((self.host, self.port)) session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket session.expect('Press Return to continue:') session.close() |