diff options
author | Tomasz Bursztyka <tomasz.bursztyka@linux.intel.com> | 2014-05-12 16:10:14 +0300 |
---|---|---|
committer | Patrik Flykt <patrik.flykt@linux.intel.com> | 2014-05-13 10:19:07 +0300 |
commit | 4141c90712858c7fd6a1dd2ee0e80acbab2e19c1 (patch) | |
tree | 78cafc43c17cbb57ee05bf95afeb2e781a93ff96 /test | |
parent | e96a2cb241debd7a65971d30f28e903b9a1da49b (diff) | |
download | connman-4141c90712858c7fd6a1dd2ee0e80acbab2e19c1.tar.gz |
test: Improve p2p-on-supplicant test script for P2P connection
- Provide a device name for the local p2p device
- join or negociate for a group according to distant peer
- GroupFinished signal handler added by
Eduardo Abinader <eduardo.abinader@openbossa.org>
Diffstat (limited to 'test')
-rwxr-xr-x | test/p2p-on-supplicant | 175 |
1 files changed, 156 insertions, 19 deletions
diff --git a/test/p2p-on-supplicant b/test/p2p-on-supplicant index b59c11cc..e845c885 100755 --- a/test/p2p-on-supplicant +++ b/test/p2p-on-supplicant @@ -13,9 +13,12 @@ WPA_INTF='fi.w1.wpa_supplicant1' WPA_PATH='/fi/w1/wpa_supplicant1' WPA_IF_INTF = WPA_INTF + '.Interface' WPA_P2P_INTF = WPA_IF_INTF + '.P2PDevice' +WPA_GROUP_INTF = WPA_INTF + '.Group' WPA_PEER_INTF = WPA_INTF + '.Peer' DBUS_PROPERTIES_INTF = 'org.freedesktop.DBus.Properties' +P2P_GROUP_CAPAB_GROUP_OWNER = 1 << 0 + class InputLine: def __init__(self, handler): self.line = '' @@ -69,12 +72,35 @@ def checkarg(nb_args = 0, min_args = False): return wrapper return under +def print_dict(d): + for k in d: + try: + if type(d[k]) is dbus.Byte: + print 'Key %s --> 0x%x' % (k, d[k]) + else: + print 'Key %s --> %s' % (k, d[k]) + except: + print "Error: Key %s content cannot be printed" % k + pass + +def print_tuple(t): + for e in t: + if type(e) is dbus.Dictionary: + print_dict(e) + else: + print 'Element: %s' % e + class Wpa_s: def __init__(self, iface_name = None): self.wpa = dbus.Interface(bus.get_object(WPA_NAME, WPA_PATH), WPA_INTF) bus.add_signal_receiver(self.__wpa_property_changed, path=WPA_PATH, member_keyword='signal') + bus.add_signal_receiver(self.__InterfaceAdded, path=WPA_PATH, + signal_name='InterfaceAdded') + bus.add_signal_receiver(self.__InterfaceRemoved, path=WPA_PATH, + signal_name='InterfaceRemoved') self.__reset() + self.debug = False self.line_in = InputLine(self.__command) @@ -84,6 +110,8 @@ class Wpa_s: def help(self, args): print 'Commands:' print 'quit' + print 'enable_debug' + print 'disable_debug' print 'create_if <iface_name>' print 'get_if <iface_name>' print 'del_if' @@ -93,9 +121,11 @@ class Wpa_s: print 'p2p_flush' print 'p2p_group_add' print 'p2p_group_remove' + print 'p2p_group' print 'p2p_peers' print 'p2p_peer <p2p device name>' print 'p2p_connect <p2p device name>' + print 'p2p_disconnect <p2p device name>' print 'p2p_serv_disc_req' print 'p2p_serv_disc_cancel_req <identifier>' print 'p2p_service_add <service type> <version/query> <service/response>' @@ -120,11 +150,19 @@ class Wpa_s: print 'WPA - Signal: %s' % kwargs.get('signal') def __if_property_changed(*args, **kwargs): - print 'IF - Signal: %s' % kwargs.get('signal') + signal = kwargs.get('signal') + print 'IF - Signal: %s' % signal + + if signal == 'BSSAdded': + return + + if args[0].debug: + print_tuple(args[1:]) def __p2p_property_changed(*args, **kwargs): - signal = kwargs.get('signal') - print 'IF P2P - Signal: %s' % signal + print 'IF P2P - Signal: %s' % kwargs.get('signal') + if args[0].debug: + print_tuple(args[1:]) """ It should be: __DeviceFound(self, object_path, properties) @@ -143,18 +181,56 @@ class Wpa_s: if object_path in self.peers: del self.peers[object_path] - """ - "Of course"... properties are not the group's properties, - but a bunch of informations related to the group: - - object path of the interface object - - it's role - - object path of the group object - """ + def __PeerJoined(self, object_path): + print 'Peer %s joined' % object_path + + def __PeerDisconnected(self, object_path): + print 'Peer %s dictonnected' % object_path + + def __group_if_property_changed(*args, **kwargs): + print 'Group - ', + args[0].__if_property_changed(*args, **kwargs) + + def __group_if_p2p_property_changed(*args, **kwargs): + print 'Group - ', + args[0].__p2p_property_changed(*args, **kwargs) + + def __GroupFinished(self, ifname, role): + print 'Group running on %s is being removed' % ifname + self.group_obj = self.group_if = self.group_iface_path = None + def __GroupStarted(self, properties): - self.group = properties + self.group_obj = properties['group_object'] + bus.add_signal_receiver(self.__PeerJoined, + dbus_interface=WPA_GROUP_INTF, + path=self.group_obj, + signal_name='PeerJoined') + bus.add_signal_receiver(self.__PeerDisconnected, + dbus_interface=WPA_GROUP_INTF, + path=self.group_obj, + signal_name='PeerDisconnected') + + self.group_iface_path = properties['interface_object'] self.group_if = dbus.Interface(bus.get_object(WPA_INTF, - self.group['interface_object']), + self.group_iface_path), WPA_P2P_INTF) + bus.add_signal_receiver(self.__group_if_property_changed, + dbus_interface=WPA_IF_INTF, + path=self.group_iface_path, + member_keyword='signal') + bus.add_signal_receiver(self.__group_if_p2p_property_changed, + dbus_interface=WPA_P2P_INTF, + path=self.group_iface_path, + member_keyword='signal') + bus.add_signal_receiver(self.__GroupFinished, + dbus_interface=WPA_P2P_INTF, + path=self.group_iface_path, + member_keyword='signal') + + if self.debug: + group = dbus.Interface(bus.get_object(WPA_INTF, self.group_obj), + DBUS_PROPERTIES_INTF) + print_dict(group.GetAll(WPA_GROUP_INTF)) def __ServiceDiscoveryResponse(self, response): peer = response['peer_object'] @@ -162,6 +238,17 @@ class Wpa_s: print 'Peer %s has this TLVs:' % (self.peers[peer]['DeviceName']) print response['tlvs'] + def __InterfaceAdded(self, path, properties): + print 'Interface %s Added (%s)' % (properties['Ifname'], path) + if self.debug: + print_dict(properties) + p2p = dbus.Interface(bus.get_object(WPA_INTF, + path), DBUS_PROPERTIES_INTF) + print_dict(p2p.GetAll(WPA_P2P_INTF)) + + def __InterfaceRemoved(self, path): + print 'Interface Removed (%s)' % (path) + def __listen_if_signals(self): bus.add_signal_receiver(self.__if_property_changed, dbus_interface=WPA_IF_INTF, @@ -190,7 +277,7 @@ class Wpa_s: def __reset(self): self.iface_path = self.iface_name = self.iface = None - self.p2p = self.group = self.group_if = None + self.p2p = self.group_if = self.group_obj = None self.peers = {} def __set_if(self, iface_name): @@ -199,10 +286,22 @@ class Wpa_s: self.p2p = dbus.Interface(bus.get_object(WPA_INTF, self.iface_path), WPA_P2P_INTF) + p2p_if = dbus.Interface(self.p2p, DBUS_PROPERTIES_INTF) + p2p_if.Set(WPA_P2P_INTF, 'P2PDeviceConfig', + dbus.Dictionary({ 'DeviceName' : 'ConnManP2P' }, + signature='sv')) print 'Interface %s: %s' % (iface_name, self.iface_path) self.iface_name = iface_name self.__listen_if_signals() + @checkarg() + def enable_debug(self, args): + self.debug = True + + @checkarg() + def disable_debug(serf, args): + self.debug = False + @checkarg(nb_args=1) def create_if(self, args): self.__reset() @@ -282,21 +381,50 @@ class Wpa_s: def p2p_peer(self, args): peer = self.__find_peer(args[0]) if peer: - for k in peer: - print '%s = %s' % (k, peer[k]) + print_dict(peer) @checkarg(nb_args = 1) def p2p_connect(self, args): if not self.p2p: return - peer = self.__find_peer(args[0], True) - if peer: - pin = self.p2p.Connect(({ 'peer' : peer, - 'wps_method' : 'pbc'})) + peer = self.__find_peer(args[0]) + if not peer: + return + + peer_path = self.__find_peer(args[0], True) + + if (peer['groupcapability'] & P2P_GROUP_CAPAB_GROUP_OWNER == + P2P_GROUP_CAPAB_GROUP_OWNER): + print 'Joining an existing P2P group' + pin = self.p2p.Connect(({ 'peer' : peer_path, + 'wps_method' : 'pbc', + 'join' : True, + 'go_intent' : 0 })) + else: + print 'Associating with another P2P device' + pin = self.p2p.Connect(({ 'peer' : peer_path, + 'wps_method' : 'pbc', + 'join' : False, + 'go_intent' : 7 })) if not pin: print 'WPS PIN in use: %s' % pin + @checkarg(nb_args = 1) + def p2p_disconnect(self, args): + if not self.p2p: + return + + peer = self.__find_peer(args[0]) + if not peer: + return + + if not self.group_if: + print 'Peer %s is not connected' % (peer['DeviceName']) + return + + self.group_if.Disconnect() + @checkarg() def p2p_group_add(self, args): if not self.p2p: @@ -312,6 +440,15 @@ class Wpa_s: self.group_if.Disconnect() @checkarg() + def p2p_group(self, args): + if not self.group_obj: + return + + group = dbus.Interface(bus.get_object(WPA_INTF, + self.group_obj), DBUS_PROPERTIES_INTF) + print_dict(group.GetAll(WPA_GROUP_INTF)) + + @checkarg() def p2p_flush(self, args): if not self.p2p: return |