summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTapple <mfulmer@cisco.com>2023-02-21 13:41:20 -0500
committerTapple <mfulmer@cisco.com>2023-02-21 13:41:20 -0500
commit693a34d026ef940b7728c78bfae8818083f29ff2 (patch)
treeff16c3134dd694cbc3159cd688cf6d087330e84e
parentdce06b3ce546dfd5a090233ef0a639c3c5ce6fe6 (diff)
downloadpexpect-693a34d026ef940b7728c78bfae8818083f29ff2.tar.gz
socketspawn basic test case, copied from test_fdspawn
-rw-r--r--pexpect/socket_spawn.py23
-rw-r--r--tests/PexpectTestCase.py35
-rw-r--r--tests/test_socketspawn.py72
3 files changed, 100 insertions, 30 deletions
diff --git a/pexpect/socket_spawn.py b/pexpect/socket_spawn.py
index 760dca8..5bee9b1 100644
--- a/pexpect/socket_spawn.py
+++ b/pexpect/socket_spawn.py
@@ -20,12 +20,11 @@ PEXPECT LICENSE
"""
-import os
import socket
from contextlib import contextmanager
-from exceptions import TIMEOUT
-from spawnbase import SpawnBase
+from .exceptions import TIMEOUT, EOF
+from .spawnbase import SpawnBase
__all__ = ["SocketSpawn"]
@@ -82,16 +81,8 @@ class SocketSpawn(SpawnBase):
self.closed = True
def isalive(self):
- """This checks if the socket is still valid. If :func:`os.fstat`
- does not raise an exception then we assume it is alive."""
-
- if self.child_fd == -1:
- return False
- try:
- os.fstat(self.child_fd)
- return True
- except OSError:
- return False
+ """ Alive if the fileno is valid """
+ return self.socket.fileno() >= 0
def send(self, s) -> int:
"""Write to socket, return number of bytes written"""
@@ -145,6 +136,10 @@ class SocketSpawn(SpawnBase):
timeout = self.timeout
try:
with self._timeout(timeout):
- return self.socket.recv(size)
+ s = self.socket.recv(size)
+ if s == b'':
+ self.flag_eof = True
+ raise EOF("Socket closed")
+ return s
except socket.timeout:
raise TIMEOUT("Timeout exceeded.")
diff --git a/tests/PexpectTestCase.py b/tests/PexpectTestCase.py
index 5d7a168..a762d8f 100644
--- a/tests/PexpectTestCase.py
+++ b/tests/PexpectTestCase.py
@@ -49,22 +49,25 @@ class PexpectTestCase(unittest.TestCase):
print('\n', self.id(), end=' ')
sys.stdout.flush()
- # some build agents will ignore SIGHUP and SIGINT, which python
- # inherits. This causes some of the tests related to terminate()
- # to fail. We set them to the default handlers that they should
- # be, and restore them back to their SIG_IGN value on tearDown.
- #
- # I'm not entirely convinced they need to be restored, only our
- # test runner is affected.
- self.restore_ignored_signals = [
- value for value in (signal.SIGHUP, signal.SIGINT,)
- if signal.getsignal(value) == signal.SIG_IGN]
- if signal.SIGHUP in self.restore_ignored_signals:
- # sighup should be set to default handler
- signal.signal(signal.SIGHUP, signal.SIG_DFL)
- if signal.SIGINT in self.restore_ignored_signals:
- # SIGINT should be set to signal.default_int_handler
- signal.signal(signal.SIGINT, signal.default_int_handler)
+ if sys.platform != 'win32':
+ # some build agents will ignore SIGHUP and SIGINT, which python
+ # inherits. This causes some of the tests related to terminate()
+ # to fail. We set them to the default handlers that they should
+ # be, and restore them back to their SIG_IGN value on tearDown.
+ #
+ # I'm not entirely convinced they need to be restored, only our
+ # test runner is affected.
+ self.restore_ignored_signals = [
+ value for value in (signal.SIGHUP, signal.SIGINT,)
+ if signal.getsignal(value) == signal.SIG_IGN]
+ if signal.SIGHUP in self.restore_ignored_signals:
+ # sighup should be set to default handler
+ signal.signal(signal.SIGHUP, signal.SIG_DFL)
+ if signal.SIGINT in self.restore_ignored_signals:
+ # SIGINT should be set to signal.default_int_handler
+ signal.signal(signal.SIGINT, signal.default_int_handler)
+ else:
+ self.restore_ignored_signals = []
unittest.TestCase.setUp(self)
def tearDown(self):
diff --git a/tests/test_socketspawn.py b/tests/test_socketspawn.py
new file mode 100644
index 0000000..2db5e79
--- /dev/null
+++ b/tests/test_socketspawn.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+'''
+PEXPECT LICENSE
+
+ This license is approved by the OSI and FSF as GPL-compatible.
+ http://opensource.org/licenses/isc-license.txt
+
+ Copyright (c) 2012, Noah Spurrier <noah@noah.org>
+ PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
+ PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
+ COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''
+import pexpect
+from pexpect import socket_spawn
+import unittest
+from . import PexpectTestCase
+import socket
+
+def open_file_socket(filename):
+ read_socket, write_socket = socket.socketpair()
+ with open(filename, "rb") as file:
+ write_socket.sendall(file.read())
+ write_socket.close()
+ return read_socket
+
+class ExpectTestCase(PexpectTestCase.PexpectTestCase):
+ def setUp(self):
+ print(self.id())
+ PexpectTestCase.PexpectTestCase.setUp(self)
+
+ def test_socket (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_spawn.SocketSpawn(socket)
+ s.expect(b'This is the end of test data:')
+ s.expect(pexpect.EOF)
+ self.assertEqual(s.before, b' END\n')
+
+ def test_maxread (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_spawn.SocketSpawn(socket)
+ s.maxread = 100
+ s.expect('2')
+ s.expect ('This is the end of test data:')
+ s.expect (pexpect.EOF)
+ self.assertEqual(s.before, b' END\n')
+
+ def test_socket_isalive (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_spawn.SocketSpawn(socket)
+ assert s.isalive()
+ s.close()
+ assert not s.isalive(), "Should not be alive after close()"
+
+ def test_socket_isatty (self):
+ socket = open_file_socket('TESTDATA.txt')
+ s = socket_spawn.SocketSpawn(socket)
+ assert not s.isatty()
+ s.close()
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+suite = unittest.TestLoader().loadTestsFromTestCase(ExpectTestCase)