diff options
author | Tapple <mfulmer@cisco.com> | 2023-02-21 13:41:20 -0500 |
---|---|---|
committer | Tapple <mfulmer@cisco.com> | 2023-02-21 13:41:20 -0500 |
commit | 693a34d026ef940b7728c78bfae8818083f29ff2 (patch) | |
tree | ff16c3134dd694cbc3159cd688cf6d087330e84e | |
parent | dce06b3ce546dfd5a090233ef0a639c3c5ce6fe6 (diff) | |
download | pexpect-693a34d026ef940b7728c78bfae8818083f29ff2.tar.gz |
socketspawn basic test case, copied from test_fdspawn
-rw-r--r-- | pexpect/socket_spawn.py | 23 | ||||
-rw-r--r-- | tests/PexpectTestCase.py | 35 | ||||
-rw-r--r-- | tests/test_socketspawn.py | 72 |
3 files changed, 100 insertions, 30 deletions
diff --git a/pexpect/socket_spawn.py b/pexpect/socket_spawn.py index 760dca8..5bee9b1 100644 --- a/pexpect/socket_spawn.py +++ b/pexpect/socket_spawn.py @@ -20,12 +20,11 @@ PEXPECT LICENSE """ -import os import socket from contextlib import contextmanager -from exceptions import TIMEOUT -from spawnbase import SpawnBase +from .exceptions import TIMEOUT, EOF +from .spawnbase import SpawnBase __all__ = ["SocketSpawn"] @@ -82,16 +81,8 @@ class SocketSpawn(SpawnBase): self.closed = True def isalive(self): - """This checks if the socket is still valid. If :func:`os.fstat` - does not raise an exception then we assume it is alive.""" - - if self.child_fd == -1: - return False - try: - os.fstat(self.child_fd) - return True - except OSError: - return False + """ Alive if the fileno is valid """ + return self.socket.fileno() >= 0 def send(self, s) -> int: """Write to socket, return number of bytes written""" @@ -145,6 +136,10 @@ class SocketSpawn(SpawnBase): timeout = self.timeout try: with self._timeout(timeout): - return self.socket.recv(size) + s = self.socket.recv(size) + if s == b'': + self.flag_eof = True + raise EOF("Socket closed") + return s except socket.timeout: raise TIMEOUT("Timeout exceeded.") diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py index 5d7a168..a762d8f 100644 --- a/tests/PexpectTestCase.py +++ b/tests/PexpectTestCase.py @@ -49,22 +49,25 @@ class PexpectTestCase(unittest.TestCase): print('\n', self.id(), end=' ') sys.stdout.flush() - # some build agents will ignore SIGHUP and SIGINT, which python - # inherits. This causes some of the tests related to terminate() - # to fail. We set them to the default handlers that they should - # be, and restore them back to their SIG_IGN value on tearDown. - # - # I'm not entirely convinced they need to be restored, only our - # test runner is affected. - self.restore_ignored_signals = [ - value for value in (signal.SIGHUP, signal.SIGINT,) - if signal.getsignal(value) == signal.SIG_IGN] - if signal.SIGHUP in self.restore_ignored_signals: - # sighup should be set to default handler - signal.signal(signal.SIGHUP, signal.SIG_DFL) - if signal.SIGINT in self.restore_ignored_signals: - # SIGINT should be set to signal.default_int_handler - signal.signal(signal.SIGINT, signal.default_int_handler) + if sys.platform != 'win32': + # some build agents will ignore SIGHUP and SIGINT, which python + # inherits. This causes some of the tests related to terminate() + # to fail. We set them to the default handlers that they should + # be, and restore them back to their SIG_IGN value on tearDown. + # + # I'm not entirely convinced they need to be restored, only our + # test runner is affected. + self.restore_ignored_signals = [ + value for value in (signal.SIGHUP, signal.SIGINT,) + if signal.getsignal(value) == signal.SIG_IGN] + if signal.SIGHUP in self.restore_ignored_signals: + # sighup should be set to default handler + signal.signal(signal.SIGHUP, signal.SIG_DFL) + if signal.SIGINT in self.restore_ignored_signals: + # SIGINT should be set to signal.default_int_handler + signal.signal(signal.SIGINT, signal.default_int_handler) + else: + self.restore_ignored_signals = [] unittest.TestCase.setUp(self) def tearDown(self): diff --git a/tests/test_socketspawn.py b/tests/test_socketspawn.py new file mode 100644 index 0000000..2db5e79 --- /dev/null +++ b/tests/test_socketspawn.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +''' +PEXPECT LICENSE + + This license is approved by the OSI and FSF as GPL-compatible. + http://opensource.org/licenses/isc-license.txt + + Copyright (c) 2012, Noah Spurrier <noah@noah.org> + PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY + PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE + COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +''' +import pexpect +from pexpect import socket_spawn +import unittest +from . import PexpectTestCase +import socket + +def open_file_socket(filename): + read_socket, write_socket = socket.socketpair() + with open(filename, "rb") as file: + write_socket.sendall(file.read()) + write_socket.close() + return read_socket + +class ExpectTestCase(PexpectTestCase.PexpectTestCase): + def setUp(self): + print(self.id()) + PexpectTestCase.PexpectTestCase.setUp(self) + + def test_socket (self): + socket = open_file_socket('TESTDATA.txt') + s = socket_spawn.SocketSpawn(socket) + s.expect(b'This is the end of test data:') + s.expect(pexpect.EOF) + self.assertEqual(s.before, b' END\n') + + def test_maxread (self): + socket = open_file_socket('TESTDATA.txt') + s = socket_spawn.SocketSpawn(socket) + s.maxread = 100 + s.expect('2') + s.expect ('This is the end of test data:') + s.expect (pexpect.EOF) + self.assertEqual(s.before, b' END\n') + + def test_socket_isalive (self): + socket = open_file_socket('TESTDATA.txt') + s = socket_spawn.SocketSpawn(socket) + assert s.isalive() + s.close() + assert not s.isalive(), "Should not be alive after close()" + + def test_socket_isatty (self): + socket = open_file_socket('TESTDATA.txt') + s = socket_spawn.SocketSpawn(socket) + assert not s.isatty() + s.close() + + +if __name__ == '__main__': + unittest.main() + +suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase) |