summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWouter Bolsterlee <uws@xs4all.nl>2013-11-08 22:13:23 +0100
committerWouter Bolsterlee <uws@xs4all.nl>2013-11-08 22:13:23 +0100
commitaea6980ea181912c0c8cdd61763bc5110c28a445 (patch)
tree665fa7ab4d3e5f0a88577160f51da430cf29cba3
parentd972cabf1575dfbf83d518fb886c5f58c4c4a543 (diff)
downloadhappybase-aea6980ea181912c0c8cdd61763bc5110c28a445.tar.gz
Add support for retrieving sorted columns in HBase >= 0.96
This is possible with the HBase 0.96 Thrift API. This feature uses a new 'sorted_columns' argument to Table.scan(). Fixes issue #39.
-rw-r--r--happybase/table.py46
-rw-r--r--tests/test_api.py19
2 files changed, 59 insertions, 6 deletions
diff --git a/happybase/table.py b/happybase/table.py
index cb41b9e..6db20ca 100644
--- a/happybase/table.py
+++ b/happybase/table.py
@@ -8,7 +8,7 @@ from operator import attrgetter
from struct import Struct
from .hbase.ttypes import TScan
-from .util import thrift_type_to_dict, str_increment
+from .util import thrift_type_to_dict, str_increment, OrderedDict
from .batch import Batch
logger = logging.getLogger(__name__)
@@ -24,6 +24,14 @@ def make_row(cell_map, include_timestamp):
return dict((cn, cellfn(cell)) for cn, cell in cell_map.iteritems())
+def make_ordered_row(sorted_columns, include_timestamp):
+ """Make a row dict for sorted column results from scans."""
+ cellfn = include_timestamp and make_cell_timestamp or make_cell
+ return OrderedDict(
+ (column.columnName, cellfn(column.cell))
+ for column in sorted_columns)
+
+
class Table(object):
"""HBase table abstraction class.
@@ -206,7 +214,8 @@ class Table(object):
def scan(self, row_start=None, row_stop=None, row_prefix=None,
columns=None, filter=None, timestamp=None,
- include_timestamp=False, batch_size=1000, limit=None):
+ include_timestamp=False, batch_size=1000, limit=None,
+ sorted_columns=False):
"""Create a scanner for data in the table.
This method returns an iterable that can be used for looping over the
@@ -236,14 +245,26 @@ class Table(object):
If `limit` is given, at most `limit` results will be returned.
+ If `sorted_columns` is `True`, the columns in the rows returned
+ by this scanner will be retrieved in sorted_columns order, and
+ the data will be stored in `OrderedDict` instances.
+
The `batch_size` argument specifies how many results should be
retrieved per batch when retrieving results from the scanner. Only set
this to a low value (or even 1) if your data is large, since a low
batch size results in added round-trips to the server.
- **Compatibility note:** The `filter` argument is only available when
- using HBase 0.92 (or up). In HBase 0.90 compatibility mode, specifying
- a `filter` raises an exception.
+ **Compatibility notes:**
+
+ * The `filter` argument is only available when using HBase 0.92
+ (or up). In HBase 0.90 compatibility mode, specifying
+ a `filter` raises an exception.
+
+ * The `sorted_columns` argument is only available when using
+ HBase 0.96 (or up).
+
+ .. versionadded:: 0.7
+ `sorted_columns` parameter
:param str row_start: the row key to start at (inclusive)
:param str row_stop: the row key to stop at (exclusive)
@@ -263,6 +284,10 @@ class Table(object):
if limit is not None and limit < 1:
raise ValueError("'limit' must be >= 1")
+ if sorted_columns and self.connection.compat < '0.96':
+ raise NotImplementedError(
+ "'sorted_columns' is not supported in HBase >= 0.96")
+
if row_prefix is not None:
if row_start is not None or row_stop is not None:
raise TypeError(
@@ -312,6 +337,7 @@ class Table(object):
caching=batch_size,
filterString=filter,
batchSize=batch_size,
+ sortColumns=sorted_columns,
)
scan_id = self.connection.client.scannerOpenWithScan(
self.name, scan, {})
@@ -335,7 +361,15 @@ class Table(object):
n_fetched += len(items)
for n_returned, item in enumerate(items, n_returned + 1):
- yield item.row, make_row(item.columns, include_timestamp)
+ row_key = item.row
+ if sorted_columns:
+ row = make_ordered_row(item.sortedColumns,
+ include_timestamp)
+ else:
+ row = make_row(item.columns, include_timestamp)
+
+ yield row_key, row
+
if limit is not None and n_returned == limit:
return
diff --git a/tests/test_api.py b/tests/test_api.py
index bf6fda0..2d22717 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -14,6 +14,7 @@ from nose.tools import (
assert_in,
assert_is_instance,
assert_is_not_none,
+ assert_list_equal,
assert_not_in,
assert_raises,
assert_true,
@@ -427,6 +428,24 @@ def test_scan():
next(scanner)
+def test_scan_sorting():
+ if connection.compat < '0.96':
+ return # not supported
+
+ input_row = {}
+ for i in xrange(100):
+ input_row['cf1:col-%03d' % i] = ''
+ input_key = 'row-scan-sorted'
+ table.put(input_key, input_row)
+
+ scan = table.scan(row_start=input_key, sorted_columns=True)
+ key, row = next(scan)
+ assert_equal(key, input_key)
+ assert_list_equal(
+ sorted(input_row.items()),
+ row.items())
+
+
def test_delete():
row_key = 'row-test-delete'
data = {'cf1:col1': 'v1',