diff options
Diffstat (limited to 'extra/usb_power/powerlog.py')
-rwxr-xr-x | extra/usb_power/powerlog.py | 130 |
1 files changed, 103 insertions, 27 deletions
diff --git a/extra/usb_power/powerlog.py b/extra/usb_power/powerlog.py index 81d050bb91..6128ca2027 100755 --- a/extra/usb_power/powerlog.py +++ b/extra/usb_power/powerlog.py @@ -46,7 +46,19 @@ class Spower(object): _write_ep: pyUSB write endpoint for this interface """ - INA231 = 1 + # INA interface type. + INA_POWER = 1 + INA_BUSV = 2 + INA_CURRENT = 3 + INA_SHUNTV = 4 + + # usb power commands + CMD_RESET = 0x0000 + CMD_STOP = 0x0001 + CMD_ADDINA = 0x0002 + CMD_START = 0x0003 + CMD_NEXT = 0x0004 + CMD_SETTIME = 0x0005 # Map between header channel number (0-47) # and INA I2C bus/addr on sweetberry. @@ -180,7 +192,8 @@ class Spower(object): """ Clear INA description struct.""" self._inas = [] - def append_ina_struct(self, name, rs, port, addr, data=None): + def append_ina_struct(self, name, rs, port, addr, + data=None, ina_type=INA_POWER): """Add an INA descriptor into the list of active INAs. Args: @@ -189,18 +202,22 @@ class Spower(object): port: I2C channel this INA is connected to. addr: I2C addr of this INA. data: Misc data for special handling, board specific. + ina_type: INA function to use, power, voltage, etc. """ ina = {} ina['name'] = name ina['rs'] = rs ina['port'] = port ina['addr'] = addr + ina['type'] = ina_type # Calculate INA231 Calibration register # (see INA231 spec p.15) # CurrentLSB = uA per div = 80mV / (Rsh * 2^15) # CurrentLSB uA = 80000000nV / (Rsh mOhm * 0x8000) ina['uAscale'] = 80000000. / (rs * 0x8000); ina['uWscale'] = 25. * ina['uAscale']; + ina['mVscale'] = 1.25 + ina['uVscale'] = 2.5 ina['data'] = data self._inas.append(ina) @@ -265,7 +282,7 @@ class Spower(object): def send_reset(self): """Reset the power interface on the stm32""" - cmd = struct.pack("<H", 0x0000) + cmd = struct.pack("<H", self.CMD_RESET) ret = self.wr_command(cmd, rtimeout=50, wtimeout=50) debuglog("Command RESET: %s" % "success" if ret == 0 else "failure") @@ -292,7 +309,7 @@ class Spower(object): def stop(self): """Stop any active data acquisition.""" - cmd = struct.pack("<H", 0x0001) + cmd = struct.pack("<H", self.CMD_STOP) ret = self.wr_command(cmd) debuglog("Command STOP: %s" % "success" if ret == 0 else "failure") @@ -306,22 +323,23 @@ class Spower(object): Returns: actual sampling interval in ms. """ - cmd = struct.pack("<HI", 0x0003, integration_us) + cmd = struct.pack("<HI", self.CMD_START, integration_us) read = self.wr_command(cmd, read_count=5) actual_us = 0 if len(read) == 5: ret, actual_us = struct.unpack("<BI", read) - debuglog("Command START: %s %dus" % ("success" if ret == 0 else "failure", actual_us)) + debuglog("Command START: %s %dus" % ( + "success" if ret == 0 else "failure", actual_us)) else: debuglog("Command START: FAIL") return actual_us - def add_ina_name(self, name): + def add_ina_name(self, name_tuple): """Add INA from board config. Args: - name: readable name of power rail in board config. + name_tuple: name and type of power rail in board config. Returns: True if INA added, False if the INA is not on this board. @@ -329,6 +347,8 @@ class Spower(object): Raises: Exception on unexpected failure. """ + name, ina_type = name_tuple + for datum in self._brdcfg: if datum["name"] == name: channel = int(datum["channel"]) @@ -337,7 +357,7 @@ class Spower(object): if board == self._board: port, addr = self.CHMAP[channel] - self.add_ina(port, self.INA231, addr, 0, rs, data=datum) + self.add_ina(port, ina_type, addr, 0, rs, data=datum) return True else: return False @@ -350,7 +370,7 @@ class Spower(object): timestamp_us: host timestmap in us. """ # 0x0005 , 8 byte timestamp - cmd = struct.pack("<HQ", 0x0005, timestamp_us) + cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us) ret = self.wr_command(cmd) debuglog("Command SETTIME: %s" % "success" if ret == 0 else "failure") @@ -360,20 +380,22 @@ class Spower(object): Args: bus: which i2c bus the INA is on. Same ordering as Si2c. - ina_type: which model INA. 0x1 -> INA231 + ina_type: Ina interface: INA_POWER/BUSV/etc. addr: 7 bit i2c addr of this INA extra: extra data for nonstandard configs. resistance: int, shunt resistance in mOhm """ # 0x0002, 1B: bus, 1B:INA type, 1B: INA addr, 1B: extra, 4B: Rs - cmd = struct.pack("<HBBBBI", 0x0002, bus, ina_type, addr, extra, resistance) + cmd = struct.pack("<HBBBBI", self.CMD_ADDINA, + bus, ina_type, addr, extra, resistance) ret = self.wr_command(cmd) if ret == 0: if data: name = data['name'] else: name = "ina%d_%02x" % (bus, addr) - self.append_ina_struct(name, resistance, bus, addr, data=data) + self.append_ina_struct(name, resistance, bus, addr, + data=data, ina_type=ina_type) debuglog("Command ADD_INA: %s" % "success" if ret == 0 else "failure") def report_header_size(self): @@ -396,12 +418,12 @@ class Spower(object): """Read a line of data from the setup INAs Returns: - list of dicts of the values read, otherwise None. - [{ts:100, vbat:450}, {ts:200, vbat:440}] + list of dicts of the values read by ina/type tuple, otherwise None. + [{ts:100, (vbat, power):450}, {ts:200, (vbat, power):440}] """ try: expected_bytes = self.report_size(len(self._inas)) - cmd = struct.pack("<H", 0x0004) + cmd = struct.pack("<H", self.CMD_NEXT) bytesread = self.wr_command(cmd, read_count=expected_bytes) except usb.core.USBError as e: print("READ LINE FAILED %s" % e) @@ -455,11 +477,23 @@ class Spower(object): for i in range(0, size): idx = self.report_header_size() + 2*i - raw_w = struct.unpack("<H", data[idx:idx+2])[0] - uw = raw_w * self._inas[i]['uWscale'] name = self._inas[i]['name'] - debuglog("READ %d %s: %fs: %fuW" % (i, name, ftimestamp, uw)) - record[self._inas[i]['name']] = uw + name_tuple = (self._inas[i]['name'], self._inas[i]['type']) + + raw_val = struct.unpack("<h", data[idx:idx+2])[0] + + if self._inas[i]['type'] == Spower.INA_POWER: + val = raw_val * self._inas[i]['uWscale'] + elif self._inas[i]['type'] == Spower.INA_BUSV: + val = raw_val * self._inas[i]['mVscale'] + elif self._inas[i]['type'] == Spower.INA_CURRENT: + val = raw_val * self._inas[i]['uAscale'] + elif self._inas[i]['type'] == Spower.INA_SHUNTV: + val = raw_val * self._inas[i]['uVscale'] + + debuglog("READ %d %s: %fs: 0x%04x %f" % (i, + name, ftimestamp, raw_val, val)) + record[name_tuple] = val return record @@ -532,7 +566,7 @@ class powerlog(object): with open(cfgfile) as data_file: names = json.load(data_file) - self._names = names + self._names = self.process_scenario(names) for key in self._pwr: self._pwr[key].load_board(brdfile) @@ -562,6 +596,37 @@ class powerlog(object): else: self._pwr[key].set_time(0) + def process_scenario(self, name_list): + """Return list of tuples indicating name and type. + + Args: + json originated list of names, or [name, type] + Returns: + list of tuples of (name, type) defaulting to type "POWER" + Raises: exception, invalid INA type. + """ + names = [] + for entry in name_list: + if isinstance(entry, list): + name = entry[0] + if entry[1] == "POWER": + type = Spower.INA_POWER + elif entry[1] == "BUSV": + type = Spower.INA_BUSV + elif entry[1] == "CURRENT": + type = Spower.INA_CURRENT + elif entry[1] == "SHUNTV": + type = Spower.INA_SHUNTV + else: + raise Exception("Invalid INA type", "Type of %s [%s] not recognized," + " try one of POWER, BUSV, CURRENT" % (entry[0], entry[1])) + else: + name = entry + type = Spower.INA_POWER + + names.append((name, type)) + return names + def start(self, integration_us_request, seconds, sync_speed=.8): """Starts sampling. @@ -585,8 +650,18 @@ class powerlog(object): # CSV header if self._print_raw_data: title = "ts:%dus" % integration_us - for name in self._names: - unit = "mW" if self._use_mW else "uW" + for name_tuple in self._names: + name, ina_type = name_tuple + + if ina_type == Spower.INA_POWER: + unit = "mW" if self._use_mW else "uW" + elif ina_type == Spower.INA_BUSV: + unit = "mV" + elif ina_type == Spower.INA_CURRENT: + unit = "uA" + elif ina_type == Spower.INA_SHUNTV: + unit = "uV" + title += ", %s %s" % (name, unit) title += ", status" logoutput(title) @@ -625,10 +700,11 @@ class powerlog(object): csv = "%f" % aggregate_record["ts"] for name in self._names: if name in aggregate_record: - multiplier = 0.001 if self._use_mW else 1 - power = aggregate_record[name] * multiplier - csv += ", %.2f" % power - self._data.AddValue(name, power) + multiplier = 0.001 if (self._use_mW and + name[1]==Spower.INA_POWER) else 1 + value = aggregate_record[name] * multiplier + csv += ", %.2f" % value + self._data.AddValue(name, value) else: csv += ", " csv += ", %d" % aggregate_record["status"] |