diff options
author | Wouter Bolsterlee <uws@xs4all.nl> | 2013-11-08 22:13:23 +0100 |
---|---|---|
committer | Wouter Bolsterlee <uws@xs4all.nl> | 2013-11-08 22:13:23 +0100 |
commit | aea6980ea181912c0c8cdd61763bc5110c28a445 (patch) | |
tree | 665fa7ab4d3e5f0a88577160f51da430cf29cba3 | |
parent | d972cabf1575dfbf83d518fb886c5f58c4c4a543 (diff) | |
download | happybase-aea6980ea181912c0c8cdd61763bc5110c28a445.tar.gz |
Add support for retrieving sorted columns in HBase >= 0.96
This is possible with the HBase 0.96 Thrift API. This feature uses a new
'sorted_columns' argument to Table.scan(). Fixes issue #39.
-rw-r--r-- | happybase/table.py | 46 | ||||
-rw-r--r-- | tests/test_api.py | 19 |
2 files changed, 59 insertions, 6 deletions
diff --git a/happybase/table.py b/happybase/table.py index cb41b9e..6db20ca 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -8,7 +8,7 @@ from operator import attrgetter from struct import Struct from .hbase.ttypes import TScan -from .util import thrift_type_to_dict, str_increment +from .util import thrift_type_to_dict, str_increment, OrderedDict from .batch import Batch logger = logging.getLogger(__name__) @@ -24,6 +24,14 @@ def make_row(cell_map, include_timestamp): return dict((cn, cellfn(cell)) for cn, cell in cell_map.iteritems()) +def make_ordered_row(sorted_columns, include_timestamp): + """Make a row dict for sorted column results from scans.""" + cellfn = include_timestamp and make_cell_timestamp or make_cell + return OrderedDict( + (column.columnName, cellfn(column.cell)) + for column in sorted_columns) + + class Table(object): """HBase table abstraction class. @@ -206,7 +214,8 @@ class Table(object): def scan(self, row_start=None, row_stop=None, row_prefix=None, columns=None, filter=None, timestamp=None, - include_timestamp=False, batch_size=1000, limit=None): + include_timestamp=False, batch_size=1000, limit=None, + sorted_columns=False): """Create a scanner for data in the table. This method returns an iterable that can be used for looping over the @@ -236,14 +245,26 @@ class Table(object): If `limit` is given, at most `limit` results will be returned. + If `sorted_columns` is `True`, the columns in the rows returned + by this scanner will be retrieved in sorted_columns order, and + the data will be stored in `OrderedDict` instances. + The `batch_size` argument specifies how many results should be retrieved per batch when retrieving results from the scanner. Only set this to a low value (or even 1) if your data is large, since a low batch size results in added round-trips to the server. - **Compatibility note:** The `filter` argument is only available when - using HBase 0.92 (or up). In HBase 0.90 compatibility mode, specifying - a `filter` raises an exception. + **Compatibility notes:** + + * The `filter` argument is only available when using HBase 0.92 + (or up). In HBase 0.90 compatibility mode, specifying + a `filter` raises an exception. + + * The `sorted_columns` argument is only available when using + HBase 0.96 (or up). + + .. versionadded:: 0.7 + `sorted_columns` parameter :param str row_start: the row key to start at (inclusive) :param str row_stop: the row key to stop at (exclusive) @@ -263,6 +284,10 @@ class Table(object): if limit is not None and limit < 1: raise ValueError("'limit' must be >= 1") + if sorted_columns and self.connection.compat < '0.96': + raise NotImplementedError( + "'sorted_columns' is not supported in HBase >= 0.96") + if row_prefix is not None: if row_start is not None or row_stop is not None: raise TypeError( @@ -312,6 +337,7 @@ class Table(object): caching=batch_size, filterString=filter, batchSize=batch_size, + sortColumns=sorted_columns, ) scan_id = self.connection.client.scannerOpenWithScan( self.name, scan, {}) @@ -335,7 +361,15 @@ class Table(object): n_fetched += len(items) for n_returned, item in enumerate(items, n_returned + 1): - yield item.row, make_row(item.columns, include_timestamp) + row_key = item.row + if sorted_columns: + row = make_ordered_row(item.sortedColumns, + include_timestamp) + else: + row = make_row(item.columns, include_timestamp) + + yield row_key, row + if limit is not None and n_returned == limit: return diff --git a/tests/test_api.py b/tests/test_api.py index bf6fda0..2d22717 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -14,6 +14,7 @@ from nose.tools import ( assert_in, assert_is_instance, assert_is_not_none, + assert_list_equal, assert_not_in, assert_raises, assert_true, @@ -427,6 +428,24 @@ def test_scan(): next(scanner) +def test_scan_sorting(): + if connection.compat < '0.96': + return # not supported + + input_row = {} + for i in xrange(100): + input_row['cf1:col-%03d' % i] = '' + input_key = 'row-scan-sorted' + table.put(input_key, input_row) + + scan = table.scan(row_start=input_key, sorted_columns=True) + key, row = next(scan) + assert_equal(key, input_key) + assert_list_equal( + sorted(input_row.items()), + row.items()) + + def test_delete(): row_key = 'row-test-delete' data = {'cf1:col1': 'v1', |