summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorTomasz Bursztyka <tomasz.bursztyka@linux.intel.com>2014-05-12 16:10:14 +0300
committerPatrik Flykt <patrik.flykt@linux.intel.com>2014-05-13 10:19:07 +0300
commit4141c90712858c7fd6a1dd2ee0e80acbab2e19c1 (patch)
tree78cafc43c17cbb57ee05bf95afeb2e781a93ff96 /test
parente96a2cb241debd7a65971d30f28e903b9a1da49b (diff)
downloadconnman-4141c90712858c7fd6a1dd2ee0e80acbab2e19c1.tar.gz
test: Improve p2p-on-supplicant test script for P2P connection
- Provide a device name for the local p2p device - join or negociate for a group according to distant peer - GroupFinished signal handler added by Eduardo Abinader <eduardo.abinader@openbossa.org>
Diffstat (limited to 'test')
-rwxr-xr-xtest/p2p-on-supplicant175
1 files changed, 156 insertions, 19 deletions
diff --git a/test/p2p-on-supplicant b/test/p2p-on-supplicant
index b59c11cc..e845c885 100755
--- a/test/p2p-on-supplicant
+++ b/test/p2p-on-supplicant
@@ -13,9 +13,12 @@ WPA_INTF='fi.w1.wpa_supplicant1'
WPA_PATH='/fi/w1/wpa_supplicant1'
WPA_IF_INTF = WPA_INTF + '.Interface'
WPA_P2P_INTF = WPA_IF_INTF + '.P2PDevice'
+WPA_GROUP_INTF = WPA_INTF + '.Group'
WPA_PEER_INTF = WPA_INTF + '.Peer'
DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties'
+P2P_GROUP_CAPAB_GROUP_OWNER = 1 << 0
+
class InputLine:
def __init__(self, handler):
self.line = ''
@@ -69,12 +72,35 @@ def checkarg(nb_args = 0, min_args = False):
return wrapper
return under
+def print_dict(d):
+ for k in d:
+ try:
+ if type(d[k]) is dbus.Byte:
+ print 'Key %s --> 0x%x' % (k, d[k])
+ else:
+ print 'Key %s --> %s' % (k, d[k])
+ except:
+ print "Error: Key %s content cannot be printed" % k
+ pass
+
+def print_tuple(t):
+ for e in t:
+ if type(e) is dbus.Dictionary:
+ print_dict(e)
+ else:
+ print 'Element: %s' % e
+
class Wpa_s:
def __init__(self, iface_name = None):
self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF)
bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH,
member_keyword='signal')
+ bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH,
+ signal_name='InterfaceAdded')
+ bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH,
+ signal_name='InterfaceRemoved')
self.__reset()
+ self.debug = False
self.line_in = InputLine(self.__command)
@@ -84,6 +110,8 @@ class Wpa_s:
def help(self, args):
print 'Commands:'
print 'quit'
+ print 'enable_debug'
+ print 'disable_debug'
print 'create_if <iface_name>'
print 'get_if <iface_name>'
print 'del_if'
@@ -93,9 +121,11 @@ class Wpa_s:
print 'p2p_flush'
print 'p2p_group_add'
print 'p2p_group_remove'
+ print 'p2p_group'
print 'p2p_peers'
print 'p2p_peer <p2p device name>'
print 'p2p_connect <p2p device name>'
+ print 'p2p_disconnect <p2p device name>'
print 'p2p_serv_disc_req'
print 'p2p_serv_disc_cancel_req <identifier>'
print 'p2p_service_add <service type> <version/query> <service/response>'
@@ -120,11 +150,19 @@ class Wpa_s:
print 'WPA - Signal: %s' % kwargs.get('signal')
def __if_property_changed(*args, **kwargs):
- print 'IF - Signal: %s' % kwargs.get('signal')
+ signal = kwargs.get('signal')
+ print 'IF - Signal: %s' % signal
+
+ if signal == 'BSSAdded':
+ return
+
+ if args[0].debug:
+ print_tuple(args[1:])
def __p2p_property_changed(*args, **kwargs):
- signal = kwargs.get('signal')
- print 'IF P2P - Signal: %s' % signal
+ print 'IF P2P - Signal: %s' % kwargs.get('signal')
+ if args[0].debug:
+ print_tuple(args[1:])
"""
It should be: __DeviceFound(self, object_path, properties)
@@ -143,18 +181,56 @@ class Wpa_s:
if object_path in self.peers:
del self.peers[object_path]
- """
- "Of course"... properties are not the group's properties,
- but a bunch of informations related to the group:
- - object path of the interface object
- - it's role
- - object path of the group object
- """
+ def __PeerJoined(self, object_path):
+ print 'Peer %s joined' % object_path
+
+ def __PeerDisconnected(self, object_path):
+ print 'Peer %s dictonnected' % object_path
+
+ def __group_if_property_changed(*args, **kwargs):
+ print 'Group - ',
+ args[0].__if_property_changed(*args, **kwargs)
+
+ def __group_if_p2p_property_changed(*args, **kwargs):
+ print 'Group - ',
+ args[0].__p2p_property_changed(*args, **kwargs)
+
+ def __GroupFinished(self, ifname, role):
+ print 'Group running on %s is being removed' % ifname
+ self.group_obj = self.group_if = self.group_iface_path = None
+
def __GroupStarted(self, properties):
- self.group = properties
+ self.group_obj = properties['group_object']
+ bus.add_signal_receiver(self.__PeerJoined,
+ dbus_interface=WPA_GROUP_INTF,
+ path=self.group_obj,
+ signal_name='PeerJoined')
+ bus.add_signal_receiver(self.__PeerDisconnected,
+ dbus_interface=WPA_GROUP_INTF,
+ path=self.group_obj,
+ signal_name='PeerDisconnected')
+
+ self.group_iface_path = properties['interface_object']
self.group_if = dbus.Interface(bus.get_object(WPA_INTF,
- self.group['interface_object']),
+ self.group_iface_path),
WPA_P2P_INTF)
+ bus.add_signal_receiver(self.__group_if_property_changed,
+ dbus_interface=WPA_IF_INTF,
+ path=self.group_iface_path,
+ member_keyword='signal')
+ bus.add_signal_receiver(self.__group_if_p2p_property_changed,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.group_iface_path,
+ member_keyword='signal')
+ bus.add_signal_receiver(self.__GroupFinished,
+ dbus_interface=WPA_P2P_INTF,
+ path=self.group_iface_path,
+ member_keyword='signal')
+
+ if self.debug:
+ group = dbus.Interface(bus.get_object(WPA_INTF, self.group_obj),
+ DBUS_PROPERTIES_INTF)
+ print_dict(group.GetAll(WPA_GROUP_INTF))
def __ServiceDiscoveryResponse(self, response):
peer = response['peer_object']
@@ -162,6 +238,17 @@ class Wpa_s:
print 'Peer %s has this TLVs:' % (self.peers[peer]['DeviceName'])
print response['tlvs']
+ def __InterfaceAdded(self, path, properties):
+ print 'Interface %s Added (%s)' % (properties['Ifname'], path)
+ if self.debug:
+ print_dict(properties)
+ p2p = dbus.Interface(bus.get_object(WPA_INTF,
+ path), DBUS_PROPERTIES_INTF)
+ print_dict(p2p.GetAll(WPA_P2P_INTF))
+
+ def __InterfaceRemoved(self, path):
+ print 'Interface Removed (%s)' % (path)
+
def __listen_if_signals(self):
bus.add_signal_receiver(self.__if_property_changed,
dbus_interface=WPA_IF_INTF,
@@ -190,7 +277,7 @@ class Wpa_s:
def __reset(self):
self.iface_path = self.iface_name = self.iface = None
- self.p2p = self.group = self.group_if = None
+ self.p2p = self.group_if = self.group_obj = None
self.peers = {}
def __set_if(self, iface_name):
@@ -199,10 +286,22 @@ class Wpa_s:
self.p2p = dbus.Interface(bus.get_object(WPA_INTF,
self.iface_path), WPA_P2P_INTF)
+ p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF)
+ p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig',
+ dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' },
+ signature='sv'))
print 'Interface %s: %s' % (iface_name, self.iface_path)
self.iface_name = iface_name
self.__listen_if_signals()
+ @checkarg()
+ def enable_debug(self, args):
+ self.debug = True
+
+ @checkarg()
+ def disable_debug(serf, args):
+ self.debug = False
+
@checkarg(nb_args=1)
def create_if(self, args):
self.__reset()
@@ -282,21 +381,50 @@ class Wpa_s:
def p2p_peer(self, args):
peer = self.__find_peer(args[0])
if peer:
- for k in peer:
- print '%s = %s' % (k, peer[k])
+ print_dict(peer)
@checkarg(nb_args = 1)
def p2p_connect(self, args):
if not self.p2p:
return
- peer = self.__find_peer(args[0], True)
- if peer:
- pin = self.p2p.Connect(({ 'peer' : peer,
- 'wps_method' : 'pbc'}))
+ peer = self.__find_peer(args[0])
+ if not peer:
+ return
+
+ peer_path = self.__find_peer(args[0], True)
+
+ if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER ==
+ P2P_GROUP_CAPAB_GROUP_OWNER):
+ print 'Joining an existing P2P group'
+ pin = self.p2p.Connect(({ 'peer' : peer_path,
+ 'wps_method' : 'pbc',
+ 'join' : True,
+ 'go_intent' : 0 }))
+ else:
+ print 'Associating with another P2P device'
+ pin = self.p2p.Connect(({ 'peer' : peer_path,
+ 'wps_method' : 'pbc',
+ 'join' : False,
+ 'go_intent' : 7 }))
if not pin:
print 'WPS PIN in use: %s' % pin
+ @checkarg(nb_args = 1)
+ def p2p_disconnect(self, args):
+ if not self.p2p:
+ return
+
+ peer = self.__find_peer(args[0])
+ if not peer:
+ return
+
+ if not self.group_if:
+ print 'Peer %s is not connected' % (peer['DeviceName'])
+ return
+
+ self.group_if.Disconnect()
+
@checkarg()
def p2p_group_add(self, args):
if not self.p2p:
@@ -312,6 +440,15 @@ class Wpa_s:
self.group_if.Disconnect()
@checkarg()
+ def p2p_group(self, args):
+ if not self.group_obj:
+ return
+
+ group = dbus.Interface(bus.get_object(WPA_INTF,
+ self.group_obj), DBUS_PROPERTIES_INTF)
+ print_dict(group.GetAll(WPA_GROUP_INTF))
+
+ @checkarg()
def p2p_flush(self, args):
if not self.p2p:
return