From eaa56b7d721182541bab087a6d61304c778f7ea9 Mon Sep 17 00:00:00 2001 From: Chayim Date: Thu, 28 Oct 2021 09:57:03 +0300 Subject: redis timeseries support (#1652) --- redis/commands/helpers.py | 14 + redis/commands/json/__init__.py | 4 - redis/commands/redismodules.py | 16 +- redis/commands/timeseries/__init__.py | 61 +++ redis/commands/timeseries/commands.py | 775 ++++++++++++++++++++++++++++++++++ redis/commands/timeseries/info.py | 82 ++++ redis/commands/timeseries/utils.py | 49 +++ setup.py | 1 + tests/test_timeseries.py | 593 ++++++++++++++++++++++++++ 9 files changed, 1590 insertions(+), 5 deletions(-) create mode 100644 redis/commands/timeseries/__init__.py create mode 100644 redis/commands/timeseries/commands.py create mode 100644 redis/commands/timeseries/info.py create mode 100644 redis/commands/timeseries/utils.py create mode 100644 tests/test_timeseries.py diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py index b012621..a92c025 100644 --- a/redis/commands/helpers.py +++ b/redis/commands/helpers.py @@ -23,3 +23,17 @@ def nativestr(x): def delist(x): """Given a list of binaries, return the stringified version.""" return [nativestr(obj) for obj in x] + + +def parse_to_list(response): + """Optimistally parse the response to a list. + """ + res = [] + for item in response: + try: + res.append(int(item)) + except ValueError: + res.append(nativestr(item)) + except TypeError: + res.append(None) + return res diff --git a/redis/commands/json/__init__.py b/redis/commands/json/__init__.py index 2e26de3..9783705 100644 --- a/redis/commands/json/__init__.py +++ b/redis/commands/json/__init__.py @@ -1,12 +1,8 @@ -# from typing import Optional from json import JSONDecoder, JSONEncoder -# # from redis.client import Redis - from .helpers import bulk_of_jsons from ..helpers import nativestr, delist from .commands import JSONCommands -# from ..feature import AbstractFeature class JSON(JSONCommands): diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py index 3ecce29..457a69e 100644 --- a/redis/commands/redismodules.py +++ b/redis/commands/redismodules.py @@ -27,8 +27,22 @@ class RedisModuleCommands: try: modversion = self.loaded_modules['search'] except IndexError: - raise ModuleError("rejson is not a loaded in the redis instance.") + raise ModuleError("search is not a loaded in the redis instance.") from .search import Search s = Search(client=self, version=modversion, index_name=index_name) return s + + def ts(self, index_name="idx"): + """Access the timeseries namespace, providing support for + redis timeseries data. + """ + try: + modversion = self.loaded_modules['timeseries'] + except IndexError: + raise ModuleError("timeseries is not a loaded in " + "the redis instance.") + + from .timeseries import TimeSeries + s = TimeSeries(client=self, version=modversion, index_name=index_name) + return s diff --git a/redis/commands/timeseries/__init__.py b/redis/commands/timeseries/__init__.py new file mode 100644 index 0000000..db9c3a5 --- /dev/null +++ b/redis/commands/timeseries/__init__.py @@ -0,0 +1,61 @@ +from redis.client import bool_ok + +from .utils import ( + parse_range, + parse_get, + parse_m_range, + parse_m_get, +) +from .info import TSInfo +from ..helpers import parse_to_list +from .commands import ( + ALTER_CMD, + CREATE_CMD, + CREATERULE_CMD, + DELETERULE_CMD, + DEL_CMD, + GET_CMD, + INFO_CMD, + MGET_CMD, + MRANGE_CMD, + MREVRANGE_CMD, + QUERYINDEX_CMD, + RANGE_CMD, + REVRANGE_CMD, + TimeSeriesCommands, +) + + +class TimeSeries(TimeSeriesCommands): + """ + This class subclasses redis-py's `Redis` and implements RedisTimeSeries's + commands (prefixed with "ts"). + The client allows to interact with RedisTimeSeries and use all of it's + functionality. + """ + + def __init__(self, client=None, version=None, **kwargs): + """Create a new RedisTimeSeries client.""" + # Set the module commands' callbacks + MODULE_CALLBACKS = { + CREATE_CMD: bool_ok, + ALTER_CMD: bool_ok, + CREATERULE_CMD: bool_ok, + DEL_CMD: int, + DELETERULE_CMD: bool_ok, + RANGE_CMD: parse_range, + REVRANGE_CMD: parse_range, + MRANGE_CMD: parse_m_range, + MREVRANGE_CMD: parse_m_range, + GET_CMD: parse_get, + MGET_CMD: parse_m_get, + INFO_CMD: TSInfo, + QUERYINDEX_CMD: parse_to_list, + } + + self.client = client + self.execute_command = client.execute_command + self.MODULE_VERSION = version + + for k in MODULE_CALLBACKS: + self.client.set_response_callback(k, MODULE_CALLBACKS[k]) diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py new file mode 100644 index 0000000..3b9ee0f --- /dev/null +++ b/redis/commands/timeseries/commands.py @@ -0,0 +1,775 @@ +from redis.exceptions import DataError + + +ADD_CMD = "TS.ADD" +ALTER_CMD = "TS.ALTER" +CREATERULE_CMD = "TS.CREATERULE" +CREATE_CMD = "TS.CREATE" +DECRBY_CMD = "TS.DECRBY" +DELETERULE_CMD = "TS.DELETERULE" +DEL_CMD = "TS.DEL" +GET_CMD = "TS.GET" +INCRBY_CMD = "TS.INCRBY" +INFO_CMD = "TS.INFO" +MADD_CMD = "TS.MADD" +MGET_CMD = "TS.MGET" +MRANGE_CMD = "TS.MRANGE" +MREVRANGE_CMD = "TS.MREVRANGE" +QUERYINDEX_CMD = "TS.QUERYINDEX" +RANGE_CMD = "TS.RANGE" +REVRANGE_CMD = "TS.REVRANGE" + + +class TimeSeriesCommands: + """RedisTimeSeries Commands.""" + + def create(self, key, **kwargs): + """ + Create a new time-series. + For more information see + `TS.CREATE `_. # noqa + + Args: + + key: + time-series key + retention_msecs: + Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: + Since RedisTimeSeries v1.2, both timestamps and values are + compressed by default. + Adding this flag will keep data in an uncompressed form. + Compression not only saves + memory but usually improve performance due to lower number + of memory accesses. + labels: + Set of label-value pairs that represent metadata labels of the key. + chunk_size: + Each time-serie uses chunks of memory of fixed size for + time series samples. + You can alter the default TSDB chunk size by passing the + chunk_size argument (in Bytes). + duplicate_policy: + Since RedisTimeSeries v1.4 you can specify the duplicate sample policy + ( Configure what to do on duplicate sample. ) + Can be one of: + - 'block': an error will occur for any out of order sample. + - 'first': ignore the new value. + - 'last': override with latest value. + - 'min': only override if the value is lower than the existing value. + - 'max': only override if the value is higher than the existing value. + When this is not set, the server-wide default will be used. + """ + retention_msecs = kwargs.get("retention_msecs", None) + uncompressed = kwargs.get("uncompressed", False) + labels = kwargs.get("labels", {}) + chunk_size = kwargs.get("chunk_size", None) + duplicate_policy = kwargs.get("duplicate_policy", None) + params = [key] + self._appendRetention(params, retention_msecs) + self._appendUncompressed(params, uncompressed) + self._appendChunkSize(params, chunk_size) + self._appendDuplicatePolicy(params, CREATE_CMD, duplicate_policy) + self._appendLabels(params, labels) + + return self.execute_command(CREATE_CMD, *params) + + def alter(self, key, **kwargs): + """ + Update the retention, labels of an existing key. + For more information see + `TS.ALTER `_. # noqa + + The parameters are the same as TS.CREATE. + """ + retention_msecs = kwargs.get("retention_msecs", None) + labels = kwargs.get("labels", {}) + duplicate_policy = kwargs.get("duplicate_policy", None) + params = [key] + self._appendRetention(params, retention_msecs) + self._appendDuplicatePolicy(params, ALTER_CMD, duplicate_policy) + self._appendLabels(params, labels) + + return self.execute_command(ALTER_CMD, *params) + + def add(self, key, timestamp, value, **kwargs): + """ + Append (or create and append) a new sample to the series. + For more information see + `TS.ADD `_. # noqa + + Args: + + key: + time-series key + timestamp: + Timestamp of the sample. * can be used for automatic timestamp (using the system clock). + value: + Numeric data value of the sample + retention_msecs: + Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: + Since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses. + labels: + Set of label-value pairs that represent metadata labels of the key. + chunk_size: + Each time-serie uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + duplicate_policy: + Since RedisTimeSeries v1.4 you can specify the duplicate sample policy + (Configure what to do on duplicate sample). + Can be one of: + - 'block': an error will occur for any out of order sample. + - 'first': ignore the new value. + - 'last': override with latest value. + - 'min': only override if the value is lower than the existing value. + - 'max': only override if the value is higher than the existing value. + When this is not set, the server-wide default will be used. + """ + retention_msecs = kwargs.get("retention_msecs", None) + uncompressed = kwargs.get("uncompressed", False) + labels = kwargs.get("labels", {}) + chunk_size = kwargs.get("chunk_size", None) + duplicate_policy = kwargs.get("duplicate_policy", None) + params = [key, timestamp, value] + self._appendRetention(params, retention_msecs) + self._appendUncompressed(params, uncompressed) + self._appendChunkSize(params, chunk_size) + self._appendDuplicatePolicy(params, ADD_CMD, duplicate_policy) + self._appendLabels(params, labels) + + return self.execute_command(ADD_CMD, *params) + + def madd(self, ktv_tuples): + """ + Append (or create and append) a new `value` to series + `key` with `timestamp`. + Expects a list of `tuples` as (`key`,`timestamp`, `value`). + Return value is an array with timestamps of insertions. + For more information see + `TS.MADD `_. # noqa + """ + params = [] + for ktv in ktv_tuples: + for item in ktv: + params.append(item) + + return self.execute_command(MADD_CMD, *params) + + def incrby(self, key, value, **kwargs): + """ + Increment (or create an time-series and increment) the latest + sample's of a series. + This command can be used as a counter or gauge that automatically gets + history as a time series. + For more information see + `TS.INCRBY `_. # noqa + + Args: + + key: + time-series key + value: + Numeric data value of the sample + timestamp: + Timestamp of the sample. None can be used for automatic timestamp (using the system clock). + retention_msecs: + Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: + Since RedisTimeSeries v1.2, both timestamps and values are compressed by default. + Adding this flag will keep data in an uncompressed form. Compression not only saves + memory but usually improve performance due to lower number of memory accesses. + labels: + Set of label-value pairs that represent metadata labels of the key. + chunk_size: + Each time-series uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + """ + timestamp = kwargs.get("timestamp", None) + retention_msecs = kwargs.get("retention_msecs", None) + uncompressed = kwargs.get("uncompressed", False) + labels = kwargs.get("labels", {}) + chunk_size = kwargs.get("chunk_size", None) + params = [key, value] + self._appendTimestamp(params, timestamp) + self._appendRetention(params, retention_msecs) + self._appendUncompressed(params, uncompressed) + self._appendChunkSize(params, chunk_size) + self._appendLabels(params, labels) + + return self.execute_command(INCRBY_CMD, *params) + + def decrby(self, key, value, **kwargs): + """ + Decrement (or create an time-series and decrement) the + latest sample's of a series. + This command can be used as a counter or gauge that + automatically gets history as a time series. + For more information see + `TS.DECRBY `_. # noqa + + Args: + + key: + time-series key + value: + Numeric data value of the sample + timestamp: + Timestamp of the sample. None can be used for automatic + timestamp (using the system clock). + retention_msecs: + Maximum age for samples compared to last event time (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + uncompressed: + Since RedisTimeSeries v1.2, both timestamps and values are + compressed by default. + Adding this flag will keep data in an uncompressed form. + Compression not only saves + memory but usually improve performance due to lower number + of memory accesses. + labels: + Set of label-value pairs that represent metadata labels of the key. + chunk_size: + Each time-series uses chunks of memory of fixed size for time series samples. + You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + """ + timestamp = kwargs.get("timestamp", None) + retention_msecs = kwargs.get("retention_msecs", None) + uncompressed = kwargs.get("uncompressed", False) + labels = kwargs.get("labels", {}) + chunk_size = kwargs.get("chunk_size", None) + params = [key, value] + self._appendTimestamp(params, timestamp) + self._appendRetention(params, retention_msecs) + self._appendUncompressed(params, uncompressed) + self._appendChunkSize(params, chunk_size) + self._appendLabels(params, labels) + + return self.execute_command(DECRBY_CMD, *params) + + def delete(self, key, from_time, to_time): + """ + Delete data points for a given timeseries and interval range + in the form of start and end delete timestamps. + The given timestamp interval is closed (inclusive), meaning start + and end data points will also be deleted. + Return the count for deleted items. + For more information see + `TS.DEL `_. # noqa + + Args: + + key: + time-series key. + from_time: + Start timestamp for the range deletion. + to_time: + End timestamp for the range deletion. + """ + return self.execute_command(DEL_CMD, key, from_time, to_time) + + def createrule( + self, + source_key, + dest_key, + aggregation_type, + bucket_size_msec + ): + """ + Create a compaction rule from values added to `source_key` into `dest_key`. + Aggregating for `bucket_size_msec` where an `aggregation_type` can be + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, + `std.p`, `std.s`, `var.p`, `var.s`] + For more information see + `TS.CREATERULE `_. # noqa + """ + params = [source_key, dest_key] + self._appendAggregation(params, aggregation_type, bucket_size_msec) + + return self.execute_command(CREATERULE_CMD, *params) + + def deleterule(self, source_key, dest_key): + """ + Delete a compaction rule. + For more information see + `TS.DELETERULE `_. # noqa + """ + return self.execute_command(DELETERULE_CMD, source_key, dest_key) + + def __range_params( + self, + key, + from_time, + to_time, + count, + aggregation_type, + bucket_size_msec, + filter_by_ts, + filter_by_min_value, + filter_by_max_value, + align, + ): + """Create TS.RANGE and TS.REVRANGE arguments.""" + params = [key, from_time, to_time] + self._appendFilerByTs(params, filter_by_ts) + self._appendFilerByValue( + params, + filter_by_min_value, + filter_by_max_value + ) + self._appendCount(params, count) + self._appendAlign(params, align) + self._appendAggregation(params, aggregation_type, bucket_size_msec) + + return params + + def range( + self, + key, + from_time, + to_time, + count=None, + aggregation_type=None, + bucket_size_msec=0, + filter_by_ts=None, + filter_by_min_value=None, + filter_by_max_value=None, + align=None, + ): + """ + Query a range in forward direction for a specific time-serie. + For more information see + `TS.RANGE `_. # noqa + + Args: + + key: + Key name for timeseries. + from_time: + Start timestamp for the range query. - can be used to express + the minimum possible timestamp (0). + to_time: + End timestamp for range query, + can be used to express the + maximum possible timestamp. + count: + Optional maximum number of returned results. + aggregation_type: + Optional aggregation type. Can be one of + [`avg`, `sum`, `min`, `max`, `range`, `count`, + `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also filter + by_max_value). + filter_by_max_value: + Filter result by maximum value (must mention also filter + by_min_value). + align: + Timestamp for alignment control for aggregation. + """ + params = self.__range_params( + key, + from_time, + to_time, + count, + aggregation_type, + bucket_size_msec, + filter_by_ts, + filter_by_min_value, + filter_by_max_value, + align, + ) + return self.execute_command(RANGE_CMD, *params) + + def revrange( + self, + key, + from_time, + to_time, + count=None, + aggregation_type=None, + bucket_size_msec=0, + filter_by_ts=None, + filter_by_min_value=None, + filter_by_max_value=None, + align=None, + ): + """ + Query a range in reverse direction for a specific time-series. + For more information see + `TS.REVRANGE `_. # noqa + + **Note**: This command is only available since RedisTimeSeries >= v1.4 + + Args: + + key: + Key name for timeseries. + from_time: + Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). + to_time: + End timestamp for range query, + can be used to express the maximum possible timestamp. + count: + Optional maximum number of returned results. + aggregation_type: + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`, + `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also filter_by_max_value). + filter_by_max_value: + Filter result by maximum value (must mention also filter_by_min_value). + align: + Timestamp for alignment control for aggregation. + """ + params = self.__range_params( + key, + from_time, + to_time, + count, + aggregation_type, + bucket_size_msec, + filter_by_ts, + filter_by_min_value, + filter_by_max_value, + align, + ) + return self.execute_command(REVRANGE_CMD, *params) + + def __mrange_params( + self, + aggregation_type, + bucket_size_msec, + count, + filters, + from_time, + to_time, + with_labels, + filter_by_ts, + filter_by_min_value, + filter_by_max_value, + groupby, + reduce, + select_labels, + align, + ): + """Create TS.MRANGE and TS.MREVRANGE arguments.""" + params = [from_time, to_time] + self._appendFilerByTs(params, filter_by_ts) + self._appendFilerByValue( + params, + filter_by_min_value, + filter_by_max_value + ) + self._appendCount(params, count) + self._appendAlign(params, align) + self._appendAggregation(params, aggregation_type, bucket_size_msec) + self._appendWithLabels(params, with_labels, select_labels) + params.extend(["FILTER"]) + params += filters + self._appendGroupbyReduce(params, groupby, reduce) + return params + + def mrange( + self, + from_time, + to_time, + filters, + count=None, + aggregation_type=None, + bucket_size_msec=0, + with_labels=False, + filter_by_ts=None, + filter_by_min_value=None, + filter_by_max_value=None, + groupby=None, + reduce=None, + select_labels=None, + align=None, + ): + """ + Query a range across multiple time-series by filters in forward direction. + For more information see + `TS.MRANGE `_. # noqa + + Args: + + from_time: + Start timestamp for the range query. `-` can be used to + express the minimum possible timestamp (0). + to_time: + End timestamp for range query, `+` can be used to express + the maximum possible timestamp. + filters: + filter to match the time-series labels. + count: + Optional maximum number of returned results. + aggregation_type: + Optional aggregation type. Can be one of + [`avg`, `sum`, `min`, `max`, `range`, `count`, + `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + with_labels: + Include in the reply the label-value pairs that represent metadata + labels of the time-series. + If this argument is not set, by default, an empty Array will be + replied on the labels array position. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also + filter_by_max_value). + filter_by_max_value: + Filter result by maximum value (must mention also + filter_by_min_value). + groupby: + Grouping by fields the results (must mention also reduce). + reduce: + Applying reducer functions on each group. Can be one + of [`sum`, `min`, `max`]. + select_labels: + Include in the reply only a subset of the key-value + pair labels of a series. + align: + Timestamp for alignment control for aggregation. + """ + params = self.__mrange_params( + aggregation_type, + bucket_size_msec, + count, + filters, + from_time, + to_time, + with_labels, + filter_by_ts, + filter_by_min_value, + filter_by_max_value, + groupby, + reduce, + select_labels, + align, + ) + + return self.execute_command(MRANGE_CMD, *params) + + def mrevrange( + self, + from_time, + to_time, + filters, + count=None, + aggregation_type=None, + bucket_size_msec=0, + with_labels=False, + filter_by_ts=None, + filter_by_min_value=None, + filter_by_max_value=None, + groupby=None, + reduce=None, + select_labels=None, + align=None, + ): + """ + Query a range across multiple time-series by filters in reverse direction. + For more information see + `TS.MREVRANGE `_. # noqa + + Args: + + from_time: + Start timestamp for the range query. - can be used to express + the minimum possible timestamp (0). + to_time: + End timestamp for range query, + can be used to express + the maximum possible timestamp. + filters: + Filter to match the time-series labels. + count: + Optional maximum number of returned results. + aggregation_type: + Optional aggregation type. Can be one of + [`avg`, `sum`, `min`, `max`, `range`, `count`, + `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + bucket_size_msec: + Time bucket for aggregation in milliseconds. + with_labels: + Include in the reply the label-value pairs that represent + metadata labels + of the time-series. + If this argument is not set, by default, an empty Array + will be replied + on the labels array position. + filter_by_ts: + List of timestamps to filter the result by specific timestamps. + filter_by_min_value: + Filter result by minimum value (must mention also filter + by_max_value). + filter_by_max_value: + Filter result by maximum value (must mention also filter + by_min_value). + groupby: + Grouping by fields the results (must mention also reduce). + reduce: + Applying reducer functions on each group. Can be one + of [`sum`, `min`, `max`]. + select_labels: + Include in the reply only a subset of the key-value pair + labels of a series. + align: + Timestamp for alignment control for aggregation. + """ + params = self.__mrange_params( + aggregation_type, + bucket_size_msec, + count, + filters, + from_time, + to_time, + with_labels, + filter_by_ts, + filter_by_min_value, + filter_by_max_value, + groupby, + reduce, + select_labels, + align, + ) + + return self.execute_command(MREVRANGE_CMD, *params) + + def get(self, key): + """ # noqa + Get the last sample of `key`. + For more information see `TS.GET `_. + """ + return self.execute_command(GET_CMD, key) + + def mget(self, filters, with_labels=False): + """ # noqa + Get the last samples matching the specific `filter`. + For more information see `TS.MGET `_. + """ + params = [] + self._appendWithLabels(params, with_labels) + params.extend(["FILTER"]) + params += filters + return self.execute_command(MGET_CMD, *params) + + def info(self, key): + """ # noqa + Get information of `key`. + For more information see `TS.INFO `_. + """ + return self.execute_command(INFO_CMD, key) + + def queryindex(self, filters): + """ # noqa + Get all the keys matching the `filter` list. + For more information see `TS.QUERYINDEX `_. + """ + return self.execute_command(QUERYINDEX_CMD, *filters) + + @staticmethod + def _appendUncompressed(params, uncompressed): + """Append UNCOMPRESSED tag to params.""" + if uncompressed: + params.extend(["UNCOMPRESSED"]) + + @staticmethod + def _appendWithLabels(params, with_labels, select_labels=None): + """Append labels behavior to params.""" + if with_labels and select_labels: + raise DataError( + "with_labels and select_labels cannot be provided together." + ) + + if with_labels: + params.extend(["WITHLABELS"]) + if select_labels: + params.extend(["SELECTED_LABELS", *select_labels]) + + @staticmethod + def _appendGroupbyReduce(params, groupby, reduce): + """Append GROUPBY REDUCE property to params.""" + if groupby is not None and reduce is not None: + params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()]) + + @staticmethod + def _appendRetention(params, retention): + """Append RETENTION property to params.""" + if retention is not None: + params.extend(["RETENTION", retention]) + + @staticmethod + def _appendLabels(params, labels): + """Append LABELS property to params.""" + if labels: + params.append("LABELS") + for k, v in labels.items(): + params.extend([k, v]) + + @staticmethod + def _appendCount(params, count): + """Append COUNT property to params.""" + if count is not None: + params.extend(["COUNT", count]) + + @staticmethod + def _appendTimestamp(params, timestamp): + """Append TIMESTAMP property to params.""" + if timestamp is not None: + params.extend(["TIMESTAMP", timestamp]) + + @staticmethod + def _appendAlign(params, align): + """Append ALIGN property to params.""" + if align is not None: + params.extend(["ALIGN", align]) + + @staticmethod + def _appendAggregation(params, aggregation_type, bucket_size_msec): + """Append AGGREGATION property to params.""" + if aggregation_type is not None: + params.append("AGGREGATION") + params.extend([aggregation_type, bucket_size_msec]) + + @staticmethod + def _appendChunkSize(params, chunk_size): + """Append CHUNK_SIZE property to params.""" + if chunk_size is not None: + params.extend(["CHUNK_SIZE", chunk_size]) + + @staticmethod + def _appendDuplicatePolicy(params, command, duplicate_policy): + """Append DUPLICATE_POLICY property to params on CREATE + and ON_DUPLICATE on ADD. + """ + if duplicate_policy is not None: + if command == "TS.ADD": + params.extend(["ON_DUPLICATE", duplicate_policy]) + else: + params.extend(["DUPLICATE_POLICY", duplicate_policy]) + + @staticmethod + def _appendFilerByTs(params, ts_list): + """Append FILTER_BY_TS property to params.""" + if ts_list is not None: + params.extend(["FILTER_BY_TS", *ts_list]) + + @staticmethod + def _appendFilerByValue(params, min_value, max_value): + """Append FILTER_BY_VALUE property to params.""" + if min_value is not None and max_value is not None: + params.extend(["FILTER_BY_VALUE", min_value, max_value]) diff --git a/redis/commands/timeseries/info.py b/redis/commands/timeseries/info.py new file mode 100644 index 0000000..3b89503 --- /dev/null +++ b/redis/commands/timeseries/info.py @@ -0,0 +1,82 @@ +from .utils import list_to_dict +from ..helpers import nativestr + + +class TSInfo(object): + """ + Hold information and statistics on the time-series. + Can be created using ``tsinfo`` command + https://oss.redis.com/redistimeseries/commands/#tsinfo. + """ + + rules = [] + labels = [] + sourceKey = None + chunk_count = None + memory_usage = None + total_samples = None + retention_msecs = None + last_time_stamp = None + first_time_stamp = None + + max_samples_per_chunk = None + chunk_size = None + duplicate_policy = None + + def __init__(self, args): + """ + Hold information and statistics on the time-series. + + The supported params that can be passed as args: + + rules: + A list of compaction rules of the time series. + sourceKey: + Key name for source time series in case the current series + is a target of a rule. + chunkCount: + Number of Memory Chunks used for the time series. + memoryUsage: + Total number of bytes allocated for the time series. + totalSamples: + Total number of samples in the time series. + labels: + A list of label-value pairs that represent the metadata + labels of the time series. + retentionTime: + Retention time, in milliseconds, for the time series. + lastTimestamp: + Last timestamp present in the time series. + firstTimestamp: + First timestamp present in the time series. + maxSamplesPerChunk: + Deprecated. + chunkSize: + Amount of memory, in bytes, allocated for data. + duplicatePolicy: + Policy that will define handling of duplicate samples. + + Can read more about on + https://oss.redis.com/redistimeseries/configuration/#duplicate_policy + """ + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.rules = response["rules"] + self.source_key = response["sourceKey"] + self.chunk_count = response["chunkCount"] + self.memory_usage = response["memoryUsage"] + self.total_samples = response["totalSamples"] + self.labels = list_to_dict(response["labels"]) + self.retention_msecs = response["retentionTime"] + self.lastTimeStamp = response["lastTimestamp"] + self.first_time_stamp = response["firstTimestamp"] + if "maxSamplesPerChunk" in response: + self.max_samples_per_chunk = response["maxSamplesPerChunk"] + self.chunk_size = ( + self.max_samples_per_chunk * 16 + ) # backward compatible changes + if "chunkSize" in response: + self.chunk_size = response["chunkSize"] + if "duplicatePolicy" in response: + self.duplicate_policy = response["duplicatePolicy"] + if type(self.duplicate_policy) == bytes: + self.duplicate_policy = self.duplicate_policy.decode() diff --git a/redis/commands/timeseries/utils.py b/redis/commands/timeseries/utils.py new file mode 100644 index 0000000..c33b7c5 --- /dev/null +++ b/redis/commands/timeseries/utils.py @@ -0,0 +1,49 @@ +from ..helpers import nativestr + + +def list_to_dict(aList): + return { + nativestr(aList[i][0]): nativestr(aList[i][1]) + for i in range(len(aList))} + + +def parse_range(response): + """Parse range response. Used by TS.RANGE and TS.REVRANGE.""" + return [tuple((r[0], float(r[1]))) for r in response] + + +def parse_m_range(response): + """Parse multi range response. Used by TS.MRANGE and TS.MREVRANGE.""" + res = [] + for item in response: + res.append( + {nativestr(item[0]): + [list_to_dict(item[1]), parse_range(item[2])]}) + return sorted(res, key=lambda d: list(d.keys())) + + +def parse_get(response): + """Parse get response. Used by TS.GET.""" + if not response: + return None + return int(response[0]), float(response[1]) + + +def parse_m_get(response): + """Parse multi get response. Used by TS.MGET.""" + res = [] + for item in response: + if not item[2]: + res.append( + {nativestr(item[0]): [list_to_dict(item[1]), None, None]}) + else: + res.append( + { + nativestr(item[0]): [ + list_to_dict(item[1]), + int(item[2][0]), + float(item[2][1]), + ] + } + ) + return sorted(res, key=lambda d: list(d.keys())) diff --git a/setup.py b/setup.py index 9788d2e..d0c81b4 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ setup( "redis.commands", "redis.commands.json", "redis.commands.search", + "redis.commands.timeseries", ] ), url="https://github.com/redis/redis-py", diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py new file mode 100644 index 0000000..b2df3fe --- /dev/null +++ b/tests/test_timeseries.py @@ -0,0 +1,593 @@ +import pytest +import time +from time import sleep +from .conftest import skip_ifmodversion_lt + + +@pytest.fixture +def client(modclient): + modclient.flushdb() + return modclient + + +@pytest.mark.redismod +def testCreate(client): + assert client.ts().create(1) + assert client.ts().create(2, retention_msecs=5) + assert client.ts().create(3, labels={"Redis": "Labs"}) + assert client.ts().create(4, retention_msecs=20, labels={"Time": "Series"}) + info = client.ts().info(4) + assert 20 == info.retention_msecs + assert "Series" == info.labels["Time"] + + # Test for a chunk size of 128 Bytes + assert client.ts().create("time-serie-1", chunk_size=128) + info = client.ts().info("time-serie-1") + assert 128, info.chunk_size + + +@pytest.mark.redismod +@skip_ifmodversion_lt("1.4.0", "timeseries") +def testCreateDuplicatePolicy(client): + # Test for duplicate policy + for duplicate_policy in ["block", "last", "first", "min", "max"]: + ts_name = "time-serie-ooo-{0}".format(duplicate_policy) + assert client.ts().create(ts_name, duplicate_policy=duplicate_policy) + info = client.ts().info(ts_name) + assert duplicate_policy == info.duplicate_policy + + +@pytest.mark.redismod +def testAlter(client): + assert client.ts().create(1) + assert 0 == client.ts().info(1).retention_msecs + assert client.ts().alter(1, retention_msecs=10) + assert {} == client.ts().info(1).labels + assert 10, client.ts().info(1).retention_msecs + assert client.ts().alter(1, labels={"Time": "Series"}) + assert "Series" == client.ts().info(1).labels["Time"] + assert 10 == client.ts().info(1).retention_msecs + + +# pipe = client.ts().pipeline() +# assert pipe.create(2) + + +@pytest.mark.redismod +@skip_ifmodversion_lt("1.4.0", "timeseries") +def testAlterDiplicatePolicy(client): + assert client.ts().create(1) + info = client.ts().info(1) + assert info.duplicate_policy is None + assert client.ts().alter(1, duplicate_policy="min") + info = client.ts().info(1) + assert "min" == info.duplicate_policy + + +@pytest.mark.redismod +def testAdd(client): + assert 1 == client.ts().add(1, 1, 1) + assert 2 == client.ts().add(2, 2, 3, retention_msecs=10) + assert 3 == client.ts().add(3, 3, 2, labels={"Redis": "Labs"}) + assert 4 == client.ts().add( + 4, 4, 2, retention_msecs=10, labels={"Redis": "Labs", "Time": "Series"} + ) + assert round(time.time()) == \ + round(float(client.ts().add(5, "*", 1)) / 1000) + + info = client.ts().info(4) + assert 10 == info.retention_msecs + assert "Labs" == info.labels["Redis"] + + # Test for a chunk size of 128 Bytes on TS.ADD + assert client.ts().add("time-serie-1", 1, 10.0, chunk_size=128) + info = client.ts().info("time-serie-1") + assert 128 == info.chunk_size + + +@pytest.mark.redismod +@skip_ifmodversion_lt("1.4.0", "timeseries") +def testAddDuplicatePolicy(client): + + # Test for duplicate policy BLOCK + assert 1 == client.ts().add("time-serie-add-ooo-block", 1, 5.0) + with pytest.raises(Exception): + client.ts().add( + "time-serie-add-ooo-block", + 1, + 5.0, + duplicate_policy="block" + ) + + # Test for duplicate policy LAST + assert 1 == client.ts().add("time-serie-add-ooo-last", 1, 5.0) + assert 1 == client.ts().add( + "time-serie-add-ooo-last", 1, 10.0, duplicate_policy="last" + ) + assert 10.0 == client.ts().get("time-serie-add-ooo-last")[1] + + # Test for duplicate policy FIRST + assert 1 == client.ts().add("time-serie-add-ooo-first", 1, 5.0) + assert 1 == client.ts().add( + "time-serie-add-ooo-first", 1, 10.0, duplicate_policy="first" + ) + assert 5.0 == client.ts().get("time-serie-add-ooo-first")[1] + + # Test for duplicate policy MAX + assert 1 == client.ts().add("time-serie-add-ooo-max", 1, 5.0) + assert 1 == client.ts().add( + "time-serie-add-ooo-max", 1, 10.0, duplicate_policy="max" + ) + assert 10.0 == client.ts().get("time-serie-add-ooo-max")[1] + + # Test for duplicate policy MIN + assert 1 == client.ts().add("time-serie-add-ooo-min", 1, 5.0) + assert 1 == client.ts().add( + "time-serie-add-ooo-min", 1, 10.0, duplicate_policy="min" + ) + assert 5.0 == client.ts().get("time-serie-add-ooo-min")[1] + + +@pytest.mark.redismod +def testMAdd(client): + client.ts().create("a") + assert [1, 2, 3] == \ + client.ts().madd([("a", 1, 5), ("a", 2, 10), ("a", 3, 15)]) + + +@pytest.mark.redismod +def testIncrbyDecrby(client): + for _ in range(100): + assert client.ts().incrby(1, 1) + sleep(0.001) + assert 100 == client.ts().get(1)[1] + for _ in range(100): + assert client.ts().decrby(1, 1) + sleep(0.001) + assert 0 == client.ts().get(1)[1] + + assert client.ts().incrby(2, 1.5, timestamp=5) + assert (5, 1.5) == client.ts().get(2) + assert client.ts().incrby(2, 2.25, timestamp=7) + assert (7, 3.75) == client.ts().get(2) + assert client.ts().decrby(2, 1.5, timestamp=15) + assert (15, 2.25) == client.ts().get(2) + + # Test for a chunk size of 128 Bytes on TS.INCRBY + assert client.ts().incrby("time-serie-1", 10, chunk_size=128) + info = client.ts().info("time-serie-1") + assert 128 == info.chunk_size + + # Test for a chunk size of 128 Bytes on TS.DECRBY + assert client.ts().decrby("time-serie-2", 10, chunk_size=128) + info = client.ts().info("time-serie-2") + assert 128 == info.chunk_size + + +@pytest.mark.redismod +def testCreateAndDeleteRule(client): + # test rule creation + time = 100 + client.ts().create(1) + client.ts().create(2) + client.ts().createrule(1, 2, "avg", 100) + for i in range(50): + client.ts().add(1, time + i * 2, 1) + client.ts().add(1, time + i * 2 + 1, 2) + client.ts().add(1, time * 2, 1.5) + assert round(client.ts().get(2)[1], 5) == 1.5 + info = client.ts().info(1) + assert info.rules[0][1] == 100 + + # test rule deletion + client.ts().deleterule(1, 2) + info = client.ts().info(1) + assert not info.rules + + +@pytest.mark.redismod +@skip_ifmodversion_lt("99.99.99", "timeseries") +def testDelRange(client): + try: + client.ts().delete("test", 0, 100) + except Exception as e: + assert e.__str__() != "" + + for i in range(100): + client.ts().add(1, i, i % 7) + assert 22 == client.ts().delete(1, 0, 21) + assert [] == client.ts().range(1, 0, 21) + assert [(22, 1.0)] == client.ts().range(1, 22, 22) + + +@pytest.mark.redismod +def testRange(client): + for i in range(100): + client.ts().add(1, i, i % 7) + assert 100 == len(client.ts().range(1, 0, 200)) + for i in range(100): + client.ts().add(1, i + 200, i % 7) + assert 200 == len(client.ts().range(1, 0, 500)) + # last sample isn't returned + assert 20 == len( + client.ts().range( + 1, + 0, + 500, + aggregation_type="avg", + bucket_size_msec=10 + ) + ) + assert 10 == len(client.ts().range(1, 0, 500, count=10)) + + +@pytest.mark.redismod +@skip_ifmodversion_lt("99.99.99", "timeseries") +def testRangeAdvanced(client): + for i in range(100): + client.ts().add(1, i, i % 7) + client.ts().add(1, i + 200, i % 7) + + assert 2 == len( + client.ts().range( + 1, + 0, + 500, + filter_by_ts=[i for i in range(10, 20)], + filter_by_min_value=1, + filter_by_max_value=2, + ) + ) + assert [(0, 10.0), (10, 1.0)] == client.ts().range( + 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align="+" + ) + assert [(-5, 5.0), (5, 6.0)] == client.ts().range( + 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align=5 + ) + + +@pytest.mark.redismod +@skip_ifmodversion_lt("99.99.99", "timeseries") +def testRevRange(client): + for i in range(100): + client.ts().add(1, i, i % 7) + assert 100 == len(client.ts().range(1, 0, 200)) + for i in range(100): + client.ts().add(1, i + 200, i % 7) + assert 200 == len(client.ts().range(1, 0, 500)) + # first sample isn't returned + assert 20 == len( + client.ts().revrange( + 1, + 0, + 500, + aggregation_type="avg", + bucket_size_msec=10 + ) + ) + assert 10 == len(client.ts().revrange(1, 0, 500, count=10)) + assert 2 == len( + client.ts().revrange( + 1, + 0, + 500, + filter_by_ts=[i for i in range(10, 20)], + filter_by_min_value=1, + filter_by_max_value=2, + ) + ) + assert [(10, 1.0), (0, 10.0)] == client.ts().revrange( + 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align="+" + ) + assert [(1, 10.0), (-9, 1.0)] == client.ts().revrange( + 1, 0, 10, aggregation_type="count", bucket_size_msec=10, align=1 + ) + + +@pytest.mark.redismod +def testMultiRange(client): + client.ts().create(1, labels={"Test": "This", "team": "ny"}) + client.ts().create( + 2, + labels={"Test": "This", "Taste": "That", "team": "sf"} + ) + for i in range(100): + client.ts().add(1, i, i % 7) + client.ts().add(2, i, i % 11) + + res = client.ts().mrange(0, 200, filters=["Test=This"]) + assert 2 == len(res) + assert 100 == len(res[0]["1"][1]) + + res = client.ts().mrange(0, 200, filters=["Test=This"], count=10) + assert 10 == len(res[0]["1"][1]) + + for i in range(100): + client.ts().add(1, i + 200, i % 7) + res = client.ts().mrange( + 0, + 500, + filters=["Test=This"], + aggregation_type="avg", + bucket_size_msec=10 + ) + assert 2 == len(res) + assert 20 == len(res[0]["1"][1]) + + # test withlabels + assert {} == res[0]["1"][0] + res = client.ts().mrange(0, 200, filters=["Test=This"], with_labels=True) + assert {"Test": "This", "team": "ny"} == res[0]["1"][0] + + +@pytest.mark.redismod +@skip_ifmodversion_lt("99.99.99", "timeseries") +def testMultiRangeAdvanced(client): + client.ts().create(1, labels={"Test": "This", "team": "ny"}) + client.ts().create( + 2, + labels={"Test": "This", "Taste": "That", "team": "sf"} + ) + for i in range(100): + client.ts().add(1, i, i % 7) + client.ts().add(2, i, i % 11) + + # test with selected labels + res = client.ts().mrange( + 0, + 200, + filters=["Test=This"], + select_labels=["team"] + ) + assert {"team": "ny"} == res[0]["1"][0] + assert {"team": "sf"} == res[1]["2"][0] + + # test with filterby + res = client.ts().mrange( + 0, + 200, + filters=["Test=This"], + filter_by_ts=[i for i in range(10, 20)], + filter_by_min_value=1, + filter_by_max_value=2, + ) + assert [(15, 1.0), (16, 2.0)] == res[0]["1"][1] + + # test groupby + res = client.ts().mrange( + 0, + 3, + filters=["Test=This"], + groupby="Test", + reduce="sum" + ) + assert [(0, 0.0), (1, 2.0), (2, 4.0), (3, 6.0)] == res[0]["Test=This"][1] + res = client.ts().mrange( + 0, + 3, + filters=["Test=This"], + groupby="Test", + reduce="max" + ) + assert [(0, 0.0), (1, 1.0), (2, 2.0), (3, 3.0)] == res[0]["Test=This"][1] + res = client.ts().mrange( + 0, + 3, + filters=["Test=This"], + groupby="team", + reduce="min") + assert 2 == len(res) + assert [(0, 0.0), (1, 1.0), (2, 2.0), (3, 3.0)] == res[0]["team=ny"][1] + assert [(0, 0.0), (1, 1.0), (2, 2.0), (3, 3.0)] == res[1]["team=sf"][1] + + # test align + res = client.ts().mrange( + 0, + 10, + filters=["team=ny"], + aggregation_type="count", + bucket_size_msec=10, + align="-", + ) + assert [(0, 10.0), (10, 1.0)] == res[0]["1"][1] + res = client.ts().mrange( + 0, + 10, + filters=["team=ny"], + aggregation_type="count", + bucket_size_msec=10, + align=5, + ) + assert [(-5, 5.0), (5, 6.0)] == res[0]["1"][1] + + +@pytest.mark.redismod +@skip_ifmodversion_lt("99.99.99", "timeseries") +def testMultiReverseRange(client): + client.ts().create(1, labels={"Test": "This", "team": "ny"}) + client.ts().create( + 2, + labels={"Test": "This", "Taste": "That", "team": "sf"} + ) + for i in range(100): + client.ts().add(1, i, i % 7) + client.ts().add(2, i, i % 11) + + res = client.ts().mrange(0, 200, filters=["Test=This"]) + assert 2 == len(res) + assert 100 == len(res[0]["1"][1]) + + res = client.ts().mrange(0, 200, filters=["Test=This"], count=10) + assert 10 == len(res[0]["1"][1]) + + for i in range(100): + client.ts().add(1, i + 200, i % 7) + res = client.ts().mrevrange( + 0, + 500, + filters=["Test=This"], + aggregation_type="avg", + bucket_size_msec=10 + ) + assert 2 == len(res) + assert 20 == len(res[0]["1"][1]) + assert {} == res[0]["1"][0] + + # test withlabels + res = client.ts().mrevrange( + 0, + 200, + filters=["Test=This"], + with_labels=True + ) + assert {"Test": "This", "team": "ny"} == res[0]["1"][0] + + # test with selected labels + res = client.ts().mrevrange( + 0, + 200, + filters=["Test=This"], select_labels=["team"] + ) + assert {"team": "ny"} == res[0]["1"][0] + assert {"team": "sf"} == res[1]["2"][0] + + # test filterby + res = client.ts().mrevrange( + 0, + 200, + filters=["Test=This"], + filter_by_ts=[i for i in range(10, 20)], + filter_by_min_value=1, + filter_by_max_value=2, + ) + assert [(16, 2.0), (15, 1.0)] == res[0]["1"][1] + + # test groupby + res = client.ts().mrevrange( + 0, 3, filters=["Test=This"], groupby="Test", reduce="sum" + ) + assert [(3, 6.0), (2, 4.0), (1, 2.0), (0, 0.0)] == res[0]["Test=This"][1] + res = client.ts().mrevrange( + 0, 3, filters=["Test=This"], groupby="Test", reduce="max" + ) + assert [(3, 3.0), (2, 2.0), (1, 1.0), (0, 0.0)] == res[0]["Test=This"][1] + res = client.ts().mrevrange( + 0, 3, filters=["Test=This"], groupby="team", reduce="min" + ) + assert 2 == len(res) + assert [(3, 3.0), (2, 2.0), (1, 1.0), (0, 0.0)] == res[0]["team=ny"][1] + assert [(3, 3.0), (2, 2.0), (1, 1.0), (0, 0.0)] == res[1]["team=sf"][1] + + # test align + res = client.ts().mrevrange( + 0, + 10, + filters=["team=ny"], + aggregation_type="count", + bucket_size_msec=10, + align="-", + ) + assert [(10, 1.0), (0, 10.0)] == res[0]["1"][1] + res = client.ts().mrevrange( + 0, + 10, + filters=["team=ny"], + aggregation_type="count", + bucket_size_msec=10, + align=1, + ) + assert [(1, 10.0), (-9, 1.0)] == res[0]["1"][1] + + +@pytest.mark.redismod +def testGet(client): + name = "test" + client.ts().create(name) + assert client.ts().get(name) is None + client.ts().add(name, 2, 3) + assert 2 == client.ts().get(name)[0] + client.ts().add(name, 3, 4) + assert 4 == client.ts().get(name)[1] + + +@pytest.mark.redismod +def testMGet(client): + client.ts().create(1, labels={"Test": "This"}) + client.ts().create(2, labels={"Test": "This", "Taste": "That"}) + act_res = client.ts().mget(["Test=This"]) + exp_res = [{"1": [{}, None, None]}, {"2": [{}, None, None]}] + assert act_res == exp_res + client.ts().add(1, "*", 15) + client.ts().add(2, "*", 25) + res = client.ts().mget(["Test=This"]) + assert 15 == res[0]["1"][2] + assert 25 == res[1]["2"][2] + res = client.ts().mget(["Taste=That"]) + assert 25 == res[0]["2"][2] + + # test with_labels + assert {} == res[0]["2"][0] + res = client.ts().mget(["Taste=That"], with_labels=True) + assert {"Taste": "That", "Test": "This"} == res[0]["2"][0] + + +@pytest.mark.redismod +def testInfo(client): + client.ts().create( + 1, + retention_msecs=5, + labels={"currentLabel": "currentData"} + ) + info = client.ts().info(1) + assert 5 == info.retention_msecs + assert info.labels["currentLabel"] == "currentData" + + +@pytest.mark.redismod +@skip_ifmodversion_lt("1.4.0", "timeseries") +def testInfoDuplicatePolicy(client): + client.ts().create( + 1, + retention_msecs=5, + labels={"currentLabel": "currentData"} + ) + info = client.ts().info(1) + assert info.duplicate_policy is None + + client.ts().create("time-serie-2", duplicate_policy="min") + info = client.ts().info("time-serie-2") + assert "min" == info.duplicate_policy + + +@pytest.mark.redismod +def testQueryIndex(client): + client.ts().create(1, labels={"Test": "This"}) + client.ts().create(2, labels={"Test": "This", "Taste": "That"}) + assert 2 == len(client.ts().queryindex(["Test=This"])) + assert 1 == len(client.ts().queryindex(["Taste=That"])) + assert [2] == client.ts().queryindex(["Taste=That"]) + + +# +# @pytest.mark.redismod +# @pytest.mark.pipeline +# def testPipeline(client): +# pipeline = client.ts().pipeline() +# pipeline.create("with_pipeline") +# for i in range(100): +# pipeline.add("with_pipeline", i, 1.1 * i) +# pipeline.execute() + +# info = client.ts().info("with_pipeline") +# assert info.lastTimeStamp == 99 +# assert info.total_samples == 100 +# assert client.ts().get("with_pipeline")[1] == 99 * 1.1 + + +@pytest.mark.redismod +def testUncompressed(client): + client.ts().create("compressed") + client.ts().create("uncompressed", uncompressed=True) + compressed_info = client.ts().info("compressed") + uncompressed_info = client.ts().info("uncompressed") + assert compressed_info.memory_usage != uncompressed_info.memory_usage -- cgit v1.2.1