1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
#!/usr/bin/env python
from dbus.mainloop.glib import DBusGMainLoop
import dbus
import dbus.bus
import dbus.service
import dbus.server
import gobject
import os, sys
class TestService(dbus.service.Object):
NAME = 'org.freedesktop.dbus.TestService'
PATH = '/org/freedesktop/dbus/TestService'
def __init__(self, conn, path=PATH):
super(TestService, self).__init__(conn, path)
@dbus.service.method(dbus_interface=NAME,
out_signature='s',
in_signature='s')
def reverse(self, text):
text = list(text)
text.reverse()
return ''.join(text)
class TestServer(dbus.server.Server):
def __init__(self, *args, **kwargs):
super(TestServer, self).__init__(*args, **kwargs)
self.__connections = list()
def _on_new_connection(self, conn):
print 'new connection: %r' % conn
self.__connections.append(conn)
TestService(conn)
pin, pout = os.pipe()
child = os.fork()
if 0 == child:
DBusGMainLoop(set_as_default=True)
server = TestServer('unix:tmpdir=/tmp')
os.write(pout, server.address)
os.close(pout)
os.close(pin)
print 'server running: %s' % server.address
gobject.MainLoop().run()
print 'server quit'
print 'done?'
else:
os.waitpid(child, os.WNOHANG)
os.close(pout)
address = os.read(pin, 128)
os.close(pin)
client = dbus.bus.Connection(address)
proxy = client.get_object(TestService.NAME, TestService.PATH)
object = dbus.Interface(proxy, TestService.NAME)
while True:
line = sys.stdin.readline()
if not line: break
text = line.strip()
print 'reverse(%s): %s' % (text, object.reverse(text))
|