summaryrefslogtreecommitdiff
path: root/ttystatus
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2010-07-12 19:13:58 +1200
committerLars Wirzenius <liw@liw.fi>2010-07-12 19:13:58 +1200
commite4f843d0ab595a97f30ed6d480814b86891e0c3d (patch)
tree3ead57697bb80e9bda24cef4b46bfcf885a4bb5a /ttystatus
parent1615ecf57b7b8b5a6a717e22cf23bdf3c5b9a5c5 (diff)
downloadpython-ttystatus-e4f843d0ab595a97f30ed6d480814b86891e0c3d.tar.gz
Add ByteSpeed.
Diffstat (limited to 'ttystatus')
-rw-r--r--ttystatus/__init__.py1
-rw-r--r--ttystatus/bytespeed.py61
-rw-r--r--ttystatus/bytespeed_tests.py54
3 files changed, 116 insertions, 0 deletions
diff --git a/ttystatus/__init__.py b/ttystatus/__init__.py
index d3b473b..64c1a6a 100644
--- a/ttystatus/__init__.py
+++ b/ttystatus/__init__.py
@@ -30,3 +30,4 @@ from percent import PercentDone
from progressbar import ProgressBar
from remtime import RemainingTime
from elapsed import ElapsedTime
+from bytespeed import ByteSpeed
diff --git a/ttystatus/bytespeed.py b/ttystatus/bytespeed.py
new file mode 100644
index 0000000..7f54b86
--- /dev/null
+++ b/ttystatus/bytespeed.py
@@ -0,0 +1,61 @@
+# Copyright 2010 Lars Wirzenius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import time
+
+import ttystatus
+
+
+class ByteSpeed(ttystatus.Widget):
+
+ '''Display data size in bytes, KiB, etc.'''
+
+ def __init__(self, name):
+ self.name = name
+ self.interesting_keys = [name]
+ self._bytes = 0
+ self._started = None
+
+ def now(self):
+ '''Wrapper around time.time for unit tests to overrride.'''
+
+ return time.time()
+
+ def format(self):
+ units = (
+ (1024**4, 2, 'TiB/s'),
+ (1024**3, 2, 'GiB/s'),
+ (1024**2, 2, 'MiB/s'),
+ (1024**1, 1, 'KiB/s'),
+ )
+
+ if self._started is None:
+ return '0 B/s'
+
+ duration = self.now() - self._started
+ speed = self._bytes / duration
+
+ for factor, decimals, unit in units:
+ if speed >= factor:
+ return '%.*f %s' % (decimals,
+ float(speed) / float(factor),
+ unit)
+ return '%.0f B/s' % speed
+
+ def update(self, master, width):
+ if self._started is None:
+ self._started = self.now()
+ self._bytes = master[self.name]
diff --git a/ttystatus/bytespeed_tests.py b/ttystatus/bytespeed_tests.py
new file mode 100644
index 0000000..4c481fe
--- /dev/null
+++ b/ttystatus/bytespeed_tests.py
@@ -0,0 +1,54 @@
+# Copyright 2010 Lars Wirzenius
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import unittest
+
+import ttystatus
+
+
+class ByteSpeedTests(unittest.TestCase):
+
+ def setUp(self):
+ self.w = ttystatus.ByteSpeed('foo')
+
+ def test_formats_zero_speed_without_update(self):
+ self.assertEqual(str(self.w), '0 B/s')
+
+ def test_formats_zero_bytes_correctly(self):
+ self.w.update({ 'foo': 0 }, 999)
+ self.assertEqual(str(self.w), '0 B/s')
+
+ def test_formats_one_byte_per_second_correctly(self):
+ self.w.now = lambda: 1
+ self.w.update({ 'foo': 0 }, 999)
+ self.w.now = lambda: 2
+ self.w.update({ 'foo': 1 }, 999)
+ self.assertEqual(str(self.w), '1 B/s')
+
+ def test_formats_ten_bytes_per_second_correctly(self):
+ self.w.now = lambda: 1
+ self.w.update({ 'foo': 0 }, 999)
+ self.w.now = lambda: 11
+ self.w.update({ 'foo': 100 }, 999)
+ self.assertEqual(str(self.w), '10 B/s')
+
+ def test_formats_ten_tibs_per_second_correctly(self):
+ self.w.now = lambda: 1
+ self.w.update({ 'foo': 0 }, 999)
+ self.w.now = lambda: 2
+ self.w.update({ 'foo': 10 * 1024**4 }, 999)
+ self.assertEqual(str(self.w), '10.00 TiB/s')
+