summaryrefslogtreecommitdiff
path: root/extra/usb_power/powerlog.py
diff options
context:
space:
mode:
Diffstat (limited to 'extra/usb_power/powerlog.py')
-rwxr-xr-xextra/usb_power/powerlog.py130
1 files changed, 103 insertions, 27 deletions
diff --git a/extra/usb_power/powerlog.py b/extra/usb_power/powerlog.py
index 81d050bb91..6128ca2027 100755
--- a/extra/usb_power/powerlog.py
+++ b/extra/usb_power/powerlog.py
@@ -46,7 +46,19 @@ class Spower(object):
_write_ep: pyUSB write endpoint for this interface
"""
- INA231 = 1
+ # INA interface type.
+ INA_POWER = 1
+ INA_BUSV = 2
+ INA_CURRENT = 3
+ INA_SHUNTV = 4
+
+ # usb power commands
+ CMD_RESET = 0x0000
+ CMD_STOP = 0x0001
+ CMD_ADDINA = 0x0002
+ CMD_START = 0x0003
+ CMD_NEXT = 0x0004
+ CMD_SETTIME = 0x0005
# Map between header channel number (0-47)
# and INA I2C bus/addr on sweetberry.
@@ -180,7 +192,8 @@ class Spower(object):
""" Clear INA description struct."""
self._inas = []
- def append_ina_struct(self, name, rs, port, addr, data=None):
+ def append_ina_struct(self, name, rs, port, addr,
+ data=None, ina_type=INA_POWER):
"""Add an INA descriptor into the list of active INAs.
Args:
@@ -189,18 +202,22 @@ class Spower(object):
port: I2C channel this INA is connected to.
addr: I2C addr of this INA.
data: Misc data for special handling, board specific.
+ ina_type: INA function to use, power, voltage, etc.
"""
ina = {}
ina['name'] = name
ina['rs'] = rs
ina['port'] = port
ina['addr'] = addr
+ ina['type'] = ina_type
# Calculate INA231 Calibration register
# (see INA231 spec p.15)
# CurrentLSB = uA per div = 80mV / (Rsh * 2^15)
# CurrentLSB uA = 80000000nV / (Rsh mOhm * 0x8000)
ina['uAscale'] = 80000000. / (rs * 0x8000);
ina['uWscale'] = 25. * ina['uAscale'];
+ ina['mVscale'] = 1.25
+ ina['uVscale'] = 2.5
ina['data'] = data
self._inas.append(ina)
@@ -265,7 +282,7 @@ class Spower(object):
def send_reset(self):
"""Reset the power interface on the stm32"""
- cmd = struct.pack("<H", 0x0000)
+ cmd = struct.pack("<H", self.CMD_RESET)
ret = self.wr_command(cmd, rtimeout=50, wtimeout=50)
debuglog("Command RESET: %s" % "success" if ret == 0 else "failure")
@@ -292,7 +309,7 @@ class Spower(object):
def stop(self):
"""Stop any active data acquisition."""
- cmd = struct.pack("<H", 0x0001)
+ cmd = struct.pack("<H", self.CMD_STOP)
ret = self.wr_command(cmd)
debuglog("Command STOP: %s" % "success" if ret == 0 else "failure")
@@ -306,22 +323,23 @@ class Spower(object):
Returns:
actual sampling interval in ms.
"""
- cmd = struct.pack("<HI", 0x0003, integration_us)
+ cmd = struct.pack("<HI", self.CMD_START, integration_us)
read = self.wr_command(cmd, read_count=5)
actual_us = 0
if len(read) == 5:
ret, actual_us = struct.unpack("<BI", read)
- debuglog("Command START: %s %dus" % ("success" if ret == 0 else "failure", actual_us))
+ debuglog("Command START: %s %dus" % (
+ "success" if ret == 0 else "failure", actual_us))
else:
debuglog("Command START: FAIL")
return actual_us
- def add_ina_name(self, name):
+ def add_ina_name(self, name_tuple):
"""Add INA from board config.
Args:
- name: readable name of power rail in board config.
+ name_tuple: name and type of power rail in board config.
Returns:
True if INA added, False if the INA is not on this board.
@@ -329,6 +347,8 @@ class Spower(object):
Raises:
Exception on unexpected failure.
"""
+ name, ina_type = name_tuple
+
for datum in self._brdcfg:
if datum["name"] == name:
channel = int(datum["channel"])
@@ -337,7 +357,7 @@ class Spower(object):
if board == self._board:
port, addr = self.CHMAP[channel]
- self.add_ina(port, self.INA231, addr, 0, rs, data=datum)
+ self.add_ina(port, ina_type, addr, 0, rs, data=datum)
return True
else:
return False
@@ -350,7 +370,7 @@ class Spower(object):
timestamp_us: host timestmap in us.
"""
# 0x0005 , 8 byte timestamp
- cmd = struct.pack("<HQ", 0x0005, timestamp_us)
+ cmd = struct.pack("<HQ", self.CMD_SETTIME, timestamp_us)
ret = self.wr_command(cmd)
debuglog("Command SETTIME: %s" % "success" if ret == 0 else "failure")
@@ -360,20 +380,22 @@ class Spower(object):
Args:
bus: which i2c bus the INA is on. Same ordering as Si2c.
- ina_type: which model INA. 0x1 -> INA231
+ ina_type: Ina interface: INA_POWER/BUSV/etc.
addr: 7 bit i2c addr of this INA
extra: extra data for nonstandard configs.
resistance: int, shunt resistance in mOhm
"""
# 0x0002, 1B: bus, 1B:INA type, 1B: INA addr, 1B: extra, 4B: Rs
- cmd = struct.pack("<HBBBBI", 0x0002, bus, ina_type, addr, extra, resistance)
+ cmd = struct.pack("<HBBBBI", self.CMD_ADDINA,
+ bus, ina_type, addr, extra, resistance)
ret = self.wr_command(cmd)
if ret == 0:
if data:
name = data['name']
else:
name = "ina%d_%02x" % (bus, addr)
- self.append_ina_struct(name, resistance, bus, addr, data=data)
+ self.append_ina_struct(name, resistance, bus, addr,
+ data=data, ina_type=ina_type)
debuglog("Command ADD_INA: %s" % "success" if ret == 0 else "failure")
def report_header_size(self):
@@ -396,12 +418,12 @@ class Spower(object):
"""Read a line of data from the setup INAs
Returns:
- list of dicts of the values read, otherwise None.
- [{ts:100, vbat:450}, {ts:200, vbat:440}]
+ list of dicts of the values read by ina/type tuple, otherwise None.
+ [{ts:100, (vbat, power):450}, {ts:200, (vbat, power):440}]
"""
try:
expected_bytes = self.report_size(len(self._inas))
- cmd = struct.pack("<H", 0x0004)
+ cmd = struct.pack("<H", self.CMD_NEXT)
bytesread = self.wr_command(cmd, read_count=expected_bytes)
except usb.core.USBError as e:
print("READ LINE FAILED %s" % e)
@@ -455,11 +477,23 @@ class Spower(object):
for i in range(0, size):
idx = self.report_header_size() + 2*i
- raw_w = struct.unpack("<H", data[idx:idx+2])[0]
- uw = raw_w * self._inas[i]['uWscale']
name = self._inas[i]['name']
- debuglog("READ %d %s: %fs: %fuW" % (i, name, ftimestamp, uw))
- record[self._inas[i]['name']] = uw
+ name_tuple = (self._inas[i]['name'], self._inas[i]['type'])
+
+ raw_val = struct.unpack("<h", data[idx:idx+2])[0]
+
+ if self._inas[i]['type'] == Spower.INA_POWER:
+ val = raw_val * self._inas[i]['uWscale']
+ elif self._inas[i]['type'] == Spower.INA_BUSV:
+ val = raw_val * self._inas[i]['mVscale']
+ elif self._inas[i]['type'] == Spower.INA_CURRENT:
+ val = raw_val * self._inas[i]['uAscale']
+ elif self._inas[i]['type'] == Spower.INA_SHUNTV:
+ val = raw_val * self._inas[i]['uVscale']
+
+ debuglog("READ %d %s: %fs: 0x%04x %f" % (i,
+ name, ftimestamp, raw_val, val))
+ record[name_tuple] = val
return record
@@ -532,7 +566,7 @@ class powerlog(object):
with open(cfgfile) as data_file:
names = json.load(data_file)
- self._names = names
+ self._names = self.process_scenario(names)
for key in self._pwr:
self._pwr[key].load_board(brdfile)
@@ -562,6 +596,37 @@ class powerlog(object):
else:
self._pwr[key].set_time(0)
+ def process_scenario(self, name_list):
+ """Return list of tuples indicating name and type.
+
+ Args:
+ json originated list of names, or [name, type]
+ Returns:
+ list of tuples of (name, type) defaulting to type "POWER"
+ Raises: exception, invalid INA type.
+ """
+ names = []
+ for entry in name_list:
+ if isinstance(entry, list):
+ name = entry[0]
+ if entry[1] == "POWER":
+ type = Spower.INA_POWER
+ elif entry[1] == "BUSV":
+ type = Spower.INA_BUSV
+ elif entry[1] == "CURRENT":
+ type = Spower.INA_CURRENT
+ elif entry[1] == "SHUNTV":
+ type = Spower.INA_SHUNTV
+ else:
+ raise Exception("Invalid INA type", "Type of %s [%s] not recognized,"
+ " try one of POWER, BUSV, CURRENT" % (entry[0], entry[1]))
+ else:
+ name = entry
+ type = Spower.INA_POWER
+
+ names.append((name, type))
+ return names
+
def start(self, integration_us_request, seconds, sync_speed=.8):
"""Starts sampling.
@@ -585,8 +650,18 @@ class powerlog(object):
# CSV header
if self._print_raw_data:
title = "ts:%dus" % integration_us
- for name in self._names:
- unit = "mW" if self._use_mW else "uW"
+ for name_tuple in self._names:
+ name, ina_type = name_tuple
+
+ if ina_type == Spower.INA_POWER:
+ unit = "mW" if self._use_mW else "uW"
+ elif ina_type == Spower.INA_BUSV:
+ unit = "mV"
+ elif ina_type == Spower.INA_CURRENT:
+ unit = "uA"
+ elif ina_type == Spower.INA_SHUNTV:
+ unit = "uV"
+
title += ", %s %s" % (name, unit)
title += ", status"
logoutput(title)
@@ -625,10 +700,11 @@ class powerlog(object):
csv = "%f" % aggregate_record["ts"]
for name in self._names:
if name in aggregate_record:
- multiplier = 0.001 if self._use_mW else 1
- power = aggregate_record[name] * multiplier
- csv += ", %.2f" % power
- self._data.AddValue(name, power)
+ multiplier = 0.001 if (self._use_mW and
+ name[1]==Spower.INA_POWER) else 1
+ value = aggregate_record[name] * multiplier
+ csv += ", %.2f" % value
+ self._data.AddValue(name, value)
else:
csv += ", "
csv += ", %d" % aggregate_record["status"]