1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# coding=utf8
# mainloop/jm_tests.py -- testcases for JSON-parsing state machine abstraction
#
# Copyright (C) 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import contextlib
import logging
import socket
import unittest
import distbuild
class JsonMachineTests(unittest.TestCase):

    '''Tests that feed raw socket input to a distbuild JSON machine.'''

    def setUp(self):
        # A fresh main loop per test hosts the state machines under test.
        self.loop = distbuild.MainLoop()

    @contextlib.contextmanager
    def listening_json_machine(self, port):
        '''Yield a writable socket connected to an example JSON machine.

        This example uses the code path that listens for incoming build jobs
        from remote machines running `morph distbuild`.

        A ListenServer is registered with the test's main loop on
        localhost:`port`, a client socket is connected to it, and that
        client socket is handed to the `with` body. On exit the client
        socket is shut down and closed, the listener is closed, and the
        main loop is run.

        '''

        # NOTE(review): [None, None] looks like the extra constructor
        # arguments passed to each InitiatorConnection -- confirm against
        # distbuild.ListenServer.
        listener = distbuild.ListenServer(
            'localhost', port, distbuild.InitiatorConnection, [None, None])
        self.loop.add_state_machine(listener)

        listen_socket = socket.socket()
        try:
            listen_socket.connect(('localhost', port))
            # Presumably gives the listener a chance to accept the new
            # connection before the caller starts writing -- TODO confirm.
            self.loop._run_once()
            yield listen_socket
        finally:
            logging.info('Shutting down test connection handler')
            try:
                listen_socket.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # shutdown() fails if connect() never succeeded or the peer
                # already dropped the connection. Raising here would mask
                # the original error and skip the listener teardown below.
                logging.warning(
                    'Could not shut down test socket cleanly', exc_info=True)
            finally:
                # Always release the file descriptor; the original code
                # leaked it.
                listen_socket.close()
            listener.close()
            self.loop.run()

    def test_invalid_input(self):
        '''Distbuild processes must be robust against invalid input.

        Anyone can write to the port that listens for connections from the
        `morph distbuild` command. It should not be possible to break the
        distbuild controller by sending it unexpected input.

        The test passes if every message can be sent and processed without
        an exception escaping.

        '''

        def send_and_process(string):
            # `sock` is bound by the `with` statement below before this
            # helper is first called.
            sock.sendall(string + '\n')
            self.loop._run_once()

        invalid_inputs = [
            # Not JSON at all.
            'fooin',
            # Well-formed JSON, but not a message the machine expects.
            '{ "foo": "bar" }',
            # Arbitrary bytes, including one outside the ASCII range.
            bytearray([1, 185, 5, 53]),
        ]

        with self.listening_json_machine(7979) as sock:
            for message in invalid_inputs:
                send_and_process(message)
|