1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
"""Test client that connects and sends infinite data."""
import sys
from tulip import *
def dprint(*args):
print('source:', *args, file=sys.stderr)
class Client(Protocol):
data = b'x'*16*1024
def connection_made(self, tr):
dprint('connecting to', tr.get_extra_info('getpeername'))
dprint('my socket is', tr.get_extra_info('getsockname'))
self.tr = tr
self.lost = False
self.loop = get_event_loop()
self.waiter = Future()
if '--stop' in sys.argv[1:]:
self.tr.write(b'stop')
self.tr.close()
else:
self.write_some_data()
def write_some_data(self):
if self.lost:
dprint('lost already')
return
dprint('writing', len(self.data), 'bytes')
self.tr.write(self.data)
self.loop.call_soon(self.write_some_data)
def connection_lost(self, exc):
dprint('lost connection', repr(exc))
self.lost = True
self.waiter.set_result(None)
@coroutine
def start(loop):
tr, pr = yield from loop.create_connection(Client, '127.0.0.1', 1111)
dprint('tr =', tr)
dprint('pr =', pr)
res = yield from pr.waiter
return res
def main():
if '--iocp' in sys.argv:
from tulip.windows_events import ProactorEventLoop
loop = ProactorEventLoop()
set_event_loop(loop)
loop = get_event_loop()
loop.run_until_complete(start(loop))
loop.close()
if __name__ == '__main__':
main()
|