summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorDavid Cullen <david.cullen@technicolor.com>2016-06-28 20:48:56 -0400
committerDavid Cullen <david.cullen@technicolor.com>2016-06-28 20:48:56 -0400
commitd4d2a36e1dc7cc356ba841e6aeb79ab62d9d8c96 (patch)
treec28c8f36307e19784a485a9223fb7aca14b3f526 /tests
parentfd24b95a5d33262f21301c996be65a48ad39d671 (diff)
downloadpexpect-git-d4d2a36e1dc7cc356ba841e6aeb79ab62d9d8c96.tar.gz
Make socket unit test self contained.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_socket.py121
1 files changed, 90 insertions, 31 deletions
diff --git a/tests/test_socket.py b/tests/test_socket.py
index 24d62b8..3f417fc 100644
--- a/tests/test_socket.py
+++ b/tests/test_socket.py
@@ -27,38 +27,99 @@ import os
import signal
import socket
import time
+import errno
class ExpectTestCase(PexpectTestCase.PexpectTestCase):
+
def setUp(self):
print(self.id())
PexpectTestCase.PexpectTestCase.setUp(self)
+ self.host = '127.0.0.1'
+ self.port = 49152 + 10000
self.motd = b"""\
------------------------------------------------------------------------------
-* Welcome to THE WEATHER UNDERGROUND telnet service! *
+* Welcome to the SOCKET UNIT TEST code! *
------------------------------------------------------------------------------
* *
-* National Weather Service information provided by Alden Electronics, Inc. *
-* and updated each minute as reports come in over our data feed. *
+* This unit test code is our best effort at testing the ability of the *
+* pexpect library to handle sockets. We need some text to test buffer size *
+* handling. *
+* *
+* A page is 1024 bytes or 1K. 80 x 24 = 1920. So a standard terminal window *
+* contains more than one page. We actually want more than a page for our *
+* tests. *
+* *
+* This is the twelfth line, and we need 24. So we need a few more paragraphs.*
+* We can keep them short and just put lines between them. *
* *
-* **Note: If you cannot get past this opening screen, you must use a *
-* different version of the "telnet" program--some of the ones for IBM *
-* compatible PC's have a bug that prevents proper connection. *
+* The 80 x 24 terminal size comes from the ancient past when computers were *
+* only able to display text in cuneiform writing. *
* *
-* comments: jmasters@wunderground.com *
+* The cunieform writing system used the edge of a reed to make marks on clay *
+* tablets. *
+* *
+* It was the forerunner of the style of handwriting used by doctors to write *
+* prescriptions. Thus the name: pre (before) script (writing) ion (charged *
+* particle). *
------------------------------------------------------------------------------
""".replace(b'\n', b'\n\r') + b"\r\n"
+ self.prompt1 = 'Press Return to continue:'
+ self.prompt2 = 'Rate this unit test>'
+ self.prompt3 = 'Press X to exit:'
+ self.server_up = multiprocessing.Event()
+ self.server_process = multiprocessing.Process(target=self.socket_server, args=(self.server_up,))
+ self.server_process.daemon = True
+ self.server_process.start()
+ while not self.server_up.is_set():
+ time.sleep(0.250)
+
+ def tearDown(self):
+ os.kill(self.server_process.pid, signal.SIGINT)
+ self.server_process.join(timeout=5.0)
+ PexpectTestCase.PexpectTestCase.tearDown(self)
+
+ def socket_server(self, server_up):
+ sock = None
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((self.host, self.port))
+ sock.listen(5)
+ server_up.set()
+ while True:
+ (conn, addr) = sock.accept()
+ conn.send(self.motd)
+ conn.send(self.prompt1)
+ result = conn.recv(1024)
+ if result != '\r\n':
+ break
+ conn.send(self.prompt2)
+ result = conn.recv(1024)
+ if result != '\r\n':
+ break
+ conn.send(self.prompt3)
+ result = conn.recv(1024)
+ if result.startswith('X'):
+ conn.shutdown(socket.SHUT_RDWR)
+ conn.close()
+ except KeyboardInterrupt:
+ pass
+ if sock is not None:
+ sock.shutdown(socket.SHUT_RDWR)
+ sock.close()
+ exit(0)
def test_socket(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('rainmaker.wunderground.com', 23))
+ sock.connect((self.host, self.port))
session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
- session.expect('Press Return to continue:')
+ session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
session.send('\r\n')
- session.expect('or enter 3 letter forecast city code--')
+ session.expect(self.prompt2)
session.send('\r\n')
- session.expect('Selection:')
+ session.expect(self.prompt3)
session.send('X\r\n')
session.expect(pexpect.EOF)
self.assertEqual(session.before, b'')
@@ -74,55 +135,53 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_timeout(self):
with self.assertRaises(pexpect.TIMEOUT):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('rainmaker.wunderground.com', 23))
+ sock.connect((self.host, self.port))
session = fdpexpect.fdspawn(sock, timeout=10)
session.expect(b'Bogus response')
def test_interrupt(self):
- def socket_fn(self):
+ def socket_fn(self, timed_out):
result = 0
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('rainmaker.wunderground.com', 23))
+ sock.connect((self.host, self.port))
session = fdpexpect.fdspawn(sock, timeout=10)
# Get all data from server
session.read_nonblocking(size=4096)
# This read should timeout
session.read_nonblocking(size=4096)
except pexpect.TIMEOUT:
- result = 1
+ timed_out.set()
+ result = errno.ETIMEDOUT
exit(result)
- test_proc = multiprocessing.Process(target=socket_fn, args=(self,))
+ timed_out = multiprocessing.Event()
+ test_proc = multiprocessing.Process(target=socket_fn, args=(self, timed_out))
test_proc.daemon = True
test_proc.start()
- time.sleep(5.0)
- while True:
- if test_proc.is_alive():
- os.kill(test_proc.pid, signal.SIGWINCH)
- else:
- break
+ while not timed_out.is_set():
time.sleep(0.250)
- test_proc.join()
- self.assertEqual(test_proc.exitcode, 1)
+ os.kill(test_proc.pid, signal.SIGWINCH)
+ test_proc.join(timeout=5.0)
+ self.assertEqual(test_proc.exitcode, errno.ETIMEDOUT)
def test_maxread(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('rainmaker.wunderground.com', 23))
+ sock.connect((self.host, self.port))
session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
session.maxread = 1100
- session.expect('Press Return to continue:')
+ session.expect(self.prompt1)
self.assertEqual(session.before, self.motd)
session.send('\r\n')
- session.expect('or enter 3 letter forecast city code--')
+ session.expect(self.prompt2)
session.send('\r\n')
- session.expect('Selection:')
+ session.expect(self.prompt3)
session.send('X\r\n')
session.expect(pexpect.EOF)
self.assertEqual(session.before, b'')
def test_fd_isalive (self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('rainmaker.wunderground.com', 23))
+ sock.connect((self.host, self.port))
session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
assert session.isalive()
sock.close()
@@ -130,14 +189,14 @@ class ExpectTestCase(PexpectTestCase.PexpectTestCase):
def test_fd_isatty (self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('rainmaker.wunderground.com', 23))
+ sock.connect((self.host, self.port))
session = fdpexpect.fdspawn(sock.fileno(), timeout=10)
assert not session.isatty()
session.close()
def test_fileobj(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('rainmaker.wunderground.com', 23))
+ sock.connect((self.host, self.port))
session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket
session.expect('Press Return to continue:')
session.close()