author     dvora-h <67596500+dvora-h@users.noreply.github.com>  2022-07-28 14:24:17 +0300
committer  GitHub <noreply@github.com>  2022-07-28 14:24:17 +0300
commit     c94821c11e74d360ae3ccdcf9581cfe24e120a07 (patch)
tree       cb8dae16e14be25d443bd67480e3e4a9dae201ae /redis
parent     f9f9d06c9951f8536bf9321dcebc96759eae03e0 (diff)
download   redis-py-c94821c11e74d360ae3ccdcf9581cfe24e120a07.tar.gz
Add support for async GRAPH module (#2273)
* Add support for async graph
* linters
* fix docstring
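The new async path shares its command construction with the sync `query()` in `redis/commands/graph/commands.py`. A minimal sketch of that construction, pulled out into a standalone helper (`build_graph_command` is a hypothetical name for illustration, not part of redis-py):

```python
def build_graph_command(name, query, timeout=None, read_only=False, profile=False):
    """Hypothetical helper mirroring how the GRAPH query commands are assembled."""
    if profile:
        cmd = "GRAPH.PROFILE"
    else:
        cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
    # Ask the server for the compact result-set format.
    command = [cmd, name, query, "--compact"]
    # A timeout, when given, must be an integer (milliseconds).
    if isinstance(timeout, int):
        command.extend(["timeout", timeout])
    elif timeout is not None:
        raise Exception("Timeout argument must be a positive integer")
    return command


command = build_graph_command("social", "MATCH (n) RETURN n", read_only=True)
# command == ["GRAPH.RO_QUERY", "social", "MATCH (n) RETURN n", "--compact"]
```

The async version sends the same command list; only the transport (`await self.execute_command(...)`) and result parsing differ.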
* Use retry mechanism in async version of Connection objects (#2271)
* fix is_connected (#2278)
* fix: workaround asyncio bug on connection reset by peer (#2259)
Fixes #2237
* Fix crash: key expire while search (#2270)
* fix expire while search
* sleep
* docs: Fix a few typos (#2274)
* docs: Fix a few typos
There are small typos in:
- redis/cluster.py
- redis/commands/core.py
- redis/ocsp.py
- tests/test_cluster.py
Fixes:
- Should read `validity` rather than `valididy`.
- Should read `reinitialize` rather than `reinitilize`.
- Should read `farthest` rather than `farest`.
- Should read `commands` rather than `comamnds`.
* Update core.py
* async_cluster: fix concurrent pipeline (#2280)
- each pipeline should create separate stacks for each node
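The concurrency fix means each pipeline instance owns its own per-node command stacks instead of sharing them across pipelines. A rough illustration of the idea (class and method names invented for the sketch, not redis-py API):

```python
from collections import defaultdict


class PipelineSketch:
    """Illustrative only: each pipeline keeps private per-node stacks,
    so two pipelines executing concurrently cannot interleave commands."""

    def __init__(self):
        # node name -> list of queued commands, private to this instance
        self._stacks = defaultdict(list)

    def queue(self, node, *command):
        self._stacks[node].append(command)


p1, p2 = PipelineSketch(), PipelineSketch()
p1.queue("node-a", "SET", "k", "1")
p2.queue("node-a", "SET", "k", "2")
# The stacks are independent: p2's command never appears in p1's stack.
```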
* Add support for TIMESERIES 1.8 (#2296)
* Add support for timeseries 1.8
* fix info
* linters
* linters
* fix info test
* type hints
* linters
* Remove verbose logging from `redis-py/redis/cluster.py` (#2238)
* removed the logging module and its corresponding methods
* updated CHANGES
* except block for RedisClusterException and BusyLoadingError removed
* removed unused import (redis.exceptions.BusyLoadingError)
* empty commit to re-trigger Actions workflow
* replaced BaseException with Exception
* empty commit to re-trigger Actions workflow
* empty commit to re-trigger Actions workflow
* redundant logic removed
* re-trigger pipeline
* reverted changes
* re-trigger pipeline
* except logic changed
* redis stream example (#2269)
* redis stream example
* redis stream example on docs/examples.rst
Co-authored-by: pedro.frazao <perl.pf@netcf.org>
* Fix: `start_id` type for `XAUTOCLAIM` (#2257)
* Changed start_id type for xautoclaim
* Added to changes
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
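The fix widened the accepted type of `start_id`: a stream ID can arrive as an integer (milliseconds) or as an `"ms-seq"` string. A hypothetical normalizer, written only to show the shapes involved (this helper does not exist in redis-py):

```python
def normalize_stream_id(start_id):
    """Hypothetical sketch: accept the shapes a stream ID can take
    (int milliseconds, or an "ms-seq" string/bytes) and return the
    string form a stream command would send."""
    if isinstance(start_id, bool):
        raise TypeError("start_id must be an int or an 'ms-seq' string")
    if isinstance(start_id, int):
        return f"{start_id}-0"
    if isinstance(start_id, bytes):
        return start_id.decode()
    if isinstance(start_id, str):
        return start_id
    raise TypeError("start_id must be an int or an 'ms-seq' string")
```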
* Doc add timeseries example (#2267)
* DOC add timeseries example
* DOC add timeseries examples
* Apply suggestions
* Fix typo
Detention period => Retention period
Co-authored-by: Gauthier Imbert <gauthier@PC17>
* Fix warnings and resource usage problems in asyncio unittests (#2258)
* Use pytest-asyncio in auto mode
Remove overly generic `pytestmark=pytest.mark.asyncio` causing lots of warning noise
* Use "Factories as Fixtures" test pattern for the `create_redis` fixture
this fixture is now async, avoiding teardown problems with missing event loops.
* Fix sporadic error on fast event loops, such as `--uvloop`
* Close connection, even if "username" was in kwargs
This fixes a resource usage warning in the async unittests.
* Do async cleanup of acl passwords via a fixture
* Remove unused import, fix whitespace
* Fix test with missing "await"
* Close pubsub objects after use in unittest
Use a simple fixture where possible, otherwise manually call pubsub.close()
* re-introduce `pytestmark=pytest.mark.asyncio` for python 3.6
* Use context manager to clean up connections in connection pool for unit tests
* Provide asynccontextmanager for python 3.6
* make `test_late_subscribe()` more robust
* Catch a couple of additional leaked resources
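Several of these cleanups rely on a context manager so that a connection is closed even when the test body fails. A generic sketch of the pattern (the `FakeConnection` class is invented for illustration; the real tests manage redis-py connection objects):

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for a real connection, illustration only."""

    def __init__(self):
        self.closed = False

    async def disconnect(self):
        self.closed = True


@asynccontextmanager
async def managed_connection():
    conn = FakeConnection()
    try:
        yield conn
    finally:
        # Runs on success *and* on test failure, so nothing leaks.
        await conn.disconnect()


async def demo():
    async with managed_connection() as conn:
        assert not conn.closed
        return conn


conn = asyncio.run(demo())
print(conn.closed)  # True: cleaned up on exit
```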
* Graph - add counters for removed labels and properties (#2292)
* graph - add counters for removed labels and properties
* added mock graph result set statistics
* docstrings for graph result set statistics
* format
* isort
* moved docstrings into functions
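The new counters ("Labels removed", "Properties removed") come back from the server as textual statistics lines that the client splits into name/value pairs. A simplified standalone sketch of that parsing (the helper name is invented; the real logic lives in `QueryResult.parse_statistics`):

```python
def parse_graph_statistics(raw_statistics):
    """Hypothetical standalone version of the statistics parsing:
    each entry is a "Name: value" string; values stay strings here."""
    stats = {}
    for s in raw_statistics:
        s = s.decode() if isinstance(s, bytes) else s
        name, _, value = s.partition(": ")
        stats[name] = value
    return stats


stats = parse_graph_statistics([b"Labels removed: 2", "Properties removed: 5"])
```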
* cleaning up the readme and moving docs into readthedocs (#2291)
* cleaning up the readme and moving docs into readthedocs
* examples at the end as per pr comments
* async_cluster: fix max_connections/ssl & improve args (#2217)
* async_cluster: fix max_connections/ssl & improve args
- set proper connection_class if ssl = True
- pass max_connections/connection_class to ClusterNode
- recreate startup_nodes to properly initialize
- pass parser_class to Connection instead of changing it in on_connect
- only pass redis_connect_func if read_from_replicas = True
- add connection_error_retry_attempts parameter
- skip is_connected check in acquire_connection as it is already checked in send_packed_command
BREAKING:
- RedisCluster args except host & port are kw-only now
- RedisCluster will no longer accept unknown arguments
- RedisCluster will no longer accept url as an argument. Use RedisCluster.from_url
- RedisCluster.require_full_coverage defaults to True
- ClusterNode args except host, port, & server_type are kw-only now
* async_cluster: remove kw-only requirement from client
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
* fix review comments
* fix
* fix review comments
* fix review comments
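The breaking change above makes everything after host and port keyword-only, which in Python is expressed with a bare `*` in the signature. A toy sketch of the effect (class name and defaults invented; not the real `RedisCluster` signature):

```python
class ClusterClientSketch:
    """Illustrative signature only: host and port stay positional,
    every other option must be passed by keyword."""

    def __init__(self, host="localhost", port=6379, *, ssl=False, max_connections=32):
        self.host, self.port = host, port
        self.ssl, self.max_connections = ssl, max_connections


ClusterClientSketch("127.0.0.1", 7000, ssl=True)  # OK
try:
    ClusterClientSketch("127.0.0.1", 7000, True)  # positional ssl
except TypeError as e:
    print("rejected:", e)
```

Unknown keyword arguments now also raise `TypeError` instead of being silently ignored, which is what "will no longer accept unknown arguments" refers to.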
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: szumka <106675199+szumka@users.noreply.github.com>
Co-authored-by: Mehdi ABAAKOUK <sileht@sileht.net>
Co-authored-by: Tim Gates <tim.gates@iress.com>
Co-authored-by: Utkarsh Gupta <utkarshgupta137@gmail.com>
Co-authored-by: Nial Daly <34862917+nialdaly@users.noreply.github.com>
Co-authored-by: pedrofrazao <603718+pedrofrazao@users.noreply.github.com>
Co-authored-by: pedro.frazao <perl.pf@netcf.org>
Co-authored-by: Антон Безденежных <gamer392@yandex.ru>
Co-authored-by: Iglesys <g.imbert34@gmail.com>
Co-authored-by: Gauthier Imbert <gauthier@PC17>
Co-authored-by: Kristján Valur Jónsson <sweskman@gmail.com>
Co-authored-by: DvirDukhan <dvir@redis.com>
Diffstat (limited to 'redis')
-rw-r--r--  redis/commands/graph/__init__.py      129
-rw-r--r--  redis/commands/graph/commands.py      146
-rw-r--r--  redis/commands/graph/query_result.py  371
-rw-r--r--  redis/commands/redismodules.py         14
4 files changed, 525 insertions, 135 deletions
diff --git a/redis/commands/graph/__init__.py b/redis/commands/graph/__init__.py
index 3736195..a882dd5 100644
--- a/redis/commands/graph/__init__.py
+++ b/redis/commands/graph/__init__.py
@@ -1,9 +1,13 @@
 from ..helpers import quote_string, random_string, stringify_param_value
-from .commands import GraphCommands
+from .commands import AsyncGraphCommands, GraphCommands
 from .edge import Edge  # noqa
 from .node import Node  # noqa
 from .path import Path  # noqa
 
+DB_LABELS = "DB.LABELS"
+DB_RAELATIONSHIPTYPES = "DB.RELATIONSHIPTYPES"
+DB_PROPERTYKEYS = "DB.PROPERTYKEYS"
+
 
 class Graph(GraphCommands):
     """
@@ -44,25 +48,19 @@ class Graph(GraphCommands):
         lbls = self.labels()
 
         # Unpack data.
-        self._labels = [None] * len(lbls)
-        for i, l in enumerate(lbls):
-            self._labels[i] = l[0]
+        self._labels = [l[0] for _, l in enumerate(lbls)]
 
     def _refresh_relations(self):
         rels = self.relationship_types()
 
         # Unpack data.
-        self._relationship_types = [None] * len(rels)
-        for i, r in enumerate(rels):
-            self._relationship_types[i] = r[0]
+        self._relationship_types = [r[0] for _, r in enumerate(rels)]
 
     def _refresh_attributes(self):
         props = self.property_keys()
 
         # Unpack data.
-        self._properties = [None] * len(props)
-        for i, p in enumerate(props):
-            self._properties[i] = p[0]
+        self._properties = [p[0] for _, p in enumerate(props)]
 
     def get_label(self, idx):
         """
@@ -108,12 +106,12 @@ class Graph(GraphCommands):
             The index of the property
         """
         try:
-            propertie = self._properties[idx]
+            p = self._properties[idx]
         except IndexError:
            # Refresh properties.
             self._refresh_attributes()
-            propertie = self._properties[idx]
-        return propertie
+            p = self._properties[idx]
+        return p
 
     def add_node(self, node):
         """
@@ -133,6 +131,8 @@ class Graph(GraphCommands):
         self.edges.append(edge)
 
     def _build_params_header(self, params):
+        if params is None:
+            return ""
         if not isinstance(params, dict):
             raise TypeError("'params' must be a dict")
 
         # Header starts with "CYPHER"
@@ -147,16 +147,109 @@ class Graph(GraphCommands):
         q = f"CALL {procedure}({','.join(args)})"
 
         y = kwagrs.get("y", None)
-        if y:
-            q += f" YIELD {','.join(y)}"
+        if y is not None:
+            q += f"YIELD {','.join(y)}"
 
         return self.query(q, read_only=read_only)
 
     def labels(self):
-        return self.call_procedure("db.labels", read_only=True).result_set
+        return self.call_procedure(DB_LABELS, read_only=True).result_set
 
     def relationship_types(self):
-        return self.call_procedure("db.relationshipTypes", read_only=True).result_set
+        return self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True).result_set
 
     def property_keys(self):
-        return self.call_procedure("db.propertyKeys", read_only=True).result_set
+        return self.call_procedure(DB_PROPERTYKEYS, read_only=True).result_set
+
+
+class AsyncGraph(Graph, AsyncGraphCommands):
+    """Async version for Graph"""
+
+    async def _refresh_labels(self):
+        lbls = await self.labels()
+
+        # Unpack data.
+        self._labels = [l[0] for _, l in enumerate(lbls)]
+
+    async def _refresh_attributes(self):
+        props = await self.property_keys()
+
+        # Unpack data.
+        self._properties = [p[0] for _, p in enumerate(props)]
+
+    async def _refresh_relations(self):
+        rels = await self.relationship_types()
+
+        # Unpack data.
+        self._relationship_types = [r[0] for _, r in enumerate(rels)]
+
+    async def get_label(self, idx):
+        """
+        Returns a label by it's index
+
+        Args:
+
+        idx:
+            The index of the label
+        """
+        try:
+            label = self._labels[idx]
+        except IndexError:
+            # Refresh labels.
+            await self._refresh_labels()
+            label = self._labels[idx]
+        return label
+
+    async def get_property(self, idx):
+        """
+        Returns a property by it's index
+
+        Args:
+
+        idx:
+            The index of the property
+        """
+        try:
+            p = self._properties[idx]
+        except IndexError:
+            # Refresh properties.
+            await self._refresh_attributes()
+            p = self._properties[idx]
+        return p
+
+    async def get_relation(self, idx):
+        """
+        Returns a relationship type by it's index
+
+        Args:
+
+        idx:
+            The index of the relation
+        """
+        try:
+            relationship_type = self._relationship_types[idx]
+        except IndexError:
+            # Refresh relationship types.
+            await self._refresh_relations()
+            relationship_type = self._relationship_types[idx]
+        return relationship_type
+
+    async def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
+        args = [quote_string(arg) for arg in args]
+        q = f"CALL {procedure}({','.join(args)})"
+
+        y = kwagrs.get("y", None)
+        if y is not None:
+            f"YIELD {','.join(y)}"
+        return await self.query(q, read_only=read_only)
+
+    async def labels(self):
+        return ((await self.call_procedure(DB_LABELS, read_only=True))).result_set
+
+    async def property_keys(self):
+        return (await self.call_procedure(DB_PROPERTYKEYS, read_only=True)).result_set
+
+    async def relationship_types(self):
+        return (
+            await self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True)
+        ).result_set
diff --git a/redis/commands/graph/commands.py b/redis/commands/graph/commands.py
index fe4224b..762ab42 100644
--- a/redis/commands/graph/commands.py
+++ b/redis/commands/graph/commands.py
@@ -3,7 +3,16 @@ from redis.exceptions import ResponseError
 
 from .exceptions import VersionMismatchException
 from .execution_plan import ExecutionPlan
-from .query_result import QueryResult
+from .query_result import AsyncQueryResult, QueryResult
+
+PROFILE_CMD = "GRAPH.PROFILE"
+RO_QUERY_CMD = "GRAPH.RO_QUERY"
+QUERY_CMD = "GRAPH.QUERY"
+DELETE_CMD = "GRAPH.DELETE"
+SLOWLOG_CMD = "GRAPH.SLOWLOG"
+CONFIG_CMD = "GRAPH.CONFIG"
+LIST_CMD = "GRAPH.LIST"
+EXPLAIN_CMD = "GRAPH.EXPLAIN"
 
 
 class GraphCommands:
@@ -52,33 +61,28 @@ class GraphCommands:
         query = q
 
         # handle query parameters
-        if params is not None:
-            query = self._build_params_header(params) + query
+        query = self._build_params_header(params) + query
 
         # construct query command
         # ask for compact result-set format
         # specify known graph version
         if profile:
-            cmd = "GRAPH.PROFILE"
+            cmd = PROFILE_CMD
         else:
-            cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
+            cmd = RO_QUERY_CMD if read_only else QUERY_CMD
         command = [cmd, self.name, query, "--compact"]
 
         # include timeout is specified
-        if timeout:
-            if not isinstance(timeout, int):
-                raise Exception("Timeout argument must be a positive integer")
-            command += ["timeout", timeout]
+        if isinstance(timeout, int):
+            command.extend(["timeout", timeout])
+        elif timeout is not None:
+            raise Exception("Timeout argument must be a positive integer")
 
         # issue query
         try:
             response = self.execute_command(*command)
             return QueryResult(self, response, profile)
         except ResponseError as e:
-            if "wrong number of arguments" in str(e):
-                print(
-                    "Note: RedisGraph Python requires server version 2.2.8 or above"
-                )  # noqa
             if "unknown command" in str(e) and read_only:
                 # `GRAPH.RO_QUERY` is unavailable in older versions.
                 return self.query(q, params, timeout, read_only=False)
@@ -106,7 +110,7 @@ class GraphCommands:
         For more information see `DELETE <https://redis.io/commands/graph.delete>`_. # noqa
         """
         self._clear_schema()
-        return self.execute_command("GRAPH.DELETE", self.name)
+        return self.execute_command(DELETE_CMD, self.name)
 
     # declared here, to override the built in redis.db.flush()
     def flush(self):
@@ -146,7 +150,7 @@ class GraphCommands:
         3. The issued query.
         4. The amount of time needed for its execution, in milliseconds.
         """
-        return self.execute_command("GRAPH.SLOWLOG", self.name)
+        return self.execute_command(SLOWLOG_CMD, self.name)
 
     def config(self, name, value=None, set=False):
         """
@@ -170,14 +174,14 @@ class GraphCommands:
             raise DataError(
                 "``value`` can be provided only when ``set`` is True"
             )  # noqa
-        return self.execute_command("GRAPH.CONFIG", *params)
+        return self.execute_command(CONFIG_CMD, *params)
 
     def list_keys(self):
         """
         Lists all graph keys in the keyspace.
         For more information see `GRAPH.LIST <https://redis.io/commands/graph.list>`_. # noqa
         """
-        return self.execute_command("GRAPH.LIST")
+        return self.execute_command(LIST_CMD)
 
     def execution_plan(self, query, params=None):
         """
@@ -188,10 +192,9 @@ class GraphCommands:
             query: the query that will be executed
             params: query parameters
         """
-        if params is not None:
-            query = self._build_params_header(params) + query
+        query = self._build_params_header(params) + query
 
-        plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
+        plan = self.execute_command(EXPLAIN_CMD, self.name, query)
         if isinstance(plan[0], bytes):
             plan = [b.decode() for b in plan]
         return "\n".join(plan)
@@ -206,8 +209,105 @@ class GraphCommands:
             query: the query that will be executed
             params: query parameters
         """
-        if params is not None:
-            query = self._build_params_header(params) + query
+        query = self._build_params_header(params) + query
+
+        plan = self.execute_command(EXPLAIN_CMD, self.name, query)
+        return ExecutionPlan(plan)
+
+
+class AsyncGraphCommands(GraphCommands):
+    async def query(self, q, params=None, timeout=None, read_only=False, profile=False):
+        """
+        Executes a query against the graph.
+        For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa
+
+        Args:
+
+        q : str
+            The query.
+        params : dict
+            Query parameters.
+        timeout : int
+            Maximum runtime for read queries in milliseconds.
+        read_only : bool
+            Executes a readonly query if set to True.
+        profile : bool
+            Return details on results produced by and time
+            spent in each operation.
+        """
+
+        # maintain original 'q'
+        query = q
+
+        # handle query parameters
+        query = self._build_params_header(params) + query
+
+        # construct query command
+        # ask for compact result-set format
+        # specify known graph version
+        if profile:
+            cmd = PROFILE_CMD
+        else:
+            cmd = RO_QUERY_CMD if read_only else QUERY_CMD
+        command = [cmd, self.name, query, "--compact"]
+
+        # include timeout is specified
+        if isinstance(timeout, int):
+            command.extend(["timeout", timeout])
+        elif timeout is not None:
+            raise Exception("Timeout argument must be a positive integer")
+
+        # issue query
+        try:
+            response = await self.execute_command(*command)
+            return await AsyncQueryResult().initialize(self, response, profile)
+        except ResponseError as e:
+            if "unknown command" in str(e) and read_only:
+                # `GRAPH.RO_QUERY` is unavailable in older versions.
+                return await self.query(q, params, timeout, read_only=False)
+            raise e
+        except VersionMismatchException as e:
+            # client view over the graph schema is out of sync
+            # set client version and refresh local schema
+            self.version = e.version
+            self._refresh_schema()
+            # re-issue query
+            return await self.query(q, params, timeout, read_only)
+
+    async def execution_plan(self, query, params=None):
+        """
+        Get the execution plan for given query,
+        GRAPH.EXPLAIN returns an array of operations.
+
+        Args:
+
+            query: the query that will be executed
+            params: query parameters
+        """
+        query = self._build_params_header(params) + query
-        plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
+        plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
+        if isinstance(plan[0], bytes):
+            plan = [b.decode() for b in plan]
+        return "\n".join(plan)
+
+    async def explain(self, query, params=None):
+        """
+        Get the execution plan for given query,
+        GRAPH.EXPLAIN returns ExecutionPlan object.
+
+        Args:
+
+            query: the query that will be executed
+            params: query parameters
+        """
+        query = self._build_params_header(params) + query
+
+        plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
         return ExecutionPlan(plan)
+
+    async def flush(self):
+        """
+        Commit the graph and reset the edges and the nodes to zero length.
+        """
+        await self.commit()
+        self.nodes = {}
+        self.edges = []
diff --git a/redis/commands/graph/query_result.py b/redis/commands/graph/query_result.py
index 3ffa664..b88b4b6 100644
--- a/redis/commands/graph/query_result.py
+++ b/redis/commands/graph/query_result.py
@@ -1,4 +1,6 @@
+import sys
 from collections import OrderedDict
+from distutils.util import strtobool
 
 # from prettytable import PrettyTable
 from redis import ResponseError
@@ -90,6 +92,9 @@ class QueryResult:
         self.parse_results(response)
 
     def _check_for_errors(self, response):
+        """
+        Check if the response contains an error.
+        """
         if isinstance(response[0], ResponseError):
             error = response[0]
             if str(error) == "version mismatch":
@@ -103,6 +108,9 @@ class QueryResult:
             raise response[-1]
 
     def parse_results(self, raw_result_set):
+        """
+        Parse the query execution result returned from the server.
+        """
         self.header = self.parse_header(raw_result_set)
 
         # Empty header.
@@ -112,6 +120,9 @@ class QueryResult:
         self.result_set = self.parse_records(raw_result_set)
 
     def parse_statistics(self, raw_statistics):
+        """
+        Parse the statistics returned in the response.
+        """
         self.statistics = {}
 
         # decode statistics
@@ -125,31 +136,31 @@ class QueryResult:
             self.statistics[s] = v
 
     def parse_header(self, raw_result_set):
+        """
+        Parse the header of the result.
+        """
         # An array of column name/column type pairs.
         header = raw_result_set[0]
         return header
 
     def parse_records(self, raw_result_set):
-        records = []
-        result_set = raw_result_set[1]
-        for row in result_set:
-            record = []
-            for idx, cell in enumerate(row):
-                if self.header[idx][0] == ResultSetColumnTypes.COLUMN_SCALAR:  # noqa
-                    record.append(self.parse_scalar(cell))
-                elif self.header[idx][0] == ResultSetColumnTypes.COLUMN_NODE:  # noqa
-                    record.append(self.parse_node(cell))
-                elif (
-                    self.header[idx][0] == ResultSetColumnTypes.COLUMN_RELATION
-                ):  # noqa
-                    record.append(self.parse_edge(cell))
-                else:
-                    print("Unknown column type.\n")
-            records.append(record)
+        """
+        Parses the result set and returns a list of records.
+        """
+        records = [
+            [
+                self.parse_record_types[self.header[idx][0]](cell)
+                for idx, cell in enumerate(row)
+            ]
+            for row in raw_result_set[1]
+        ]
 
         return records
 
     def parse_entity_properties(self, props):
+        """
+        Parse node / edge properties.
+        """
         # [[name, value type, value] X N]
         properties = {}
         for prop in props:
@@ -160,6 +171,9 @@ class QueryResult:
         return properties
 
     def parse_string(self, cell):
+        """
+        Parse the cell as a string.
+        """
         if isinstance(cell, bytes):
             return cell.decode()
         elif not isinstance(cell, str):
@@ -168,6 +182,9 @@ class QueryResult:
         return cell
 
     def parse_node(self, cell):
+        """
+        Parse the cell to a node.
+        """
         # Node ID (integer),
         # [label string offset (integer)],
         # [[name, value type, value] X N]
@@ -182,6 +199,9 @@ class QueryResult:
         return Node(node_id=node_id, label=labels, properties=properties)
 
     def parse_edge(self, cell):
+        """
+        Parse the cell to an edge.
+        """
         # Edge ID (integer),
         # reltype string offset (integer),
         # src node ID offset (integer),
@@ -198,11 +218,17 @@ class QueryResult:
         )
 
     def parse_path(self, cell):
+        """
+        Parse the cell to a path.
+        """
         nodes = self.parse_scalar(cell[0])
         edges = self.parse_scalar(cell[1])
 
         return Path(nodes, edges)
 
     def parse_map(self, cell):
+        """
+        Parse the cell as a map.
+        """
         m = OrderedDict()
         n_entries = len(cell)
 
@@ -216,6 +242,9 @@ class QueryResult:
         return m
 
     def parse_point(self, cell):
+        """
+        Parse the cell to point.
+        """
         p = {}
         # A point is received an array of the form: [latitude, longitude]
         # It is returned as a map of the form: {"latitude": latitude, "longitude": longitude}  # noqa
         p["longitude"] = float(cell[1])
         return p
 
-    def parse_scalar(self, cell):
-        scalar_type = int(cell[0])
-        value = cell[1]
-        scalar = None
-
-        if scalar_type == ResultSetScalarTypes.VALUE_NULL:
-            scalar = None
-
-        elif scalar_type == ResultSetScalarTypes.VALUE_STRING:
-            scalar = self.parse_string(value)
-
-        elif scalar_type == ResultSetScalarTypes.VALUE_INTEGER:
-            scalar = int(value)
-
-        elif scalar_type == ResultSetScalarTypes.VALUE_BOOLEAN:
-            value = value.decode() if isinstance(value, bytes) else value
-            if value == "true":
-                scalar = True
-            elif value == "false":
-                scalar = False
-            else:
-                print("Unknown boolean type\n")
-
-        elif scalar_type == ResultSetScalarTypes.VALUE_DOUBLE:
-            scalar = float(value)
-
-        elif scalar_type == ResultSetScalarTypes.VALUE_ARRAY:
-            # array variable is introduced only for readability
-            scalar = array = value
-            for i in range(len(array)):
-                scalar[i] = self.parse_scalar(array[i])
+    def parse_null(self, cell):
+        """
+        Parse a null value.
+        """
+        return None
 
-        elif scalar_type == ResultSetScalarTypes.VALUE_NODE:
-            scalar = self.parse_node(value)
+    def parse_integer(self, cell):
+        """
+        Parse the integer value from the cell.
+        """
+        return int(cell)
 
-        elif scalar_type == ResultSetScalarTypes.VALUE_EDGE:
-            scalar = self.parse_edge(value)
+    def parse_boolean(self, value):
+        """
+        Parse the cell value as a boolean.
+        """
+        value = value.decode() if isinstance(value, bytes) else value
+        try:
+            scalar = strtobool(value)
+        except ValueError:
+            sys.stderr.write("unknown boolean type\n")
+            scalar = None
+        return scalar
 
-        elif scalar_type == ResultSetScalarTypes.VALUE_PATH:
-            scalar = self.parse_path(value)
+    def parse_double(self, cell):
+        """
+        Parse the cell as a double.
+        """
+        return float(cell)
 
-        elif scalar_type == ResultSetScalarTypes.VALUE_MAP:
-            scalar = self.parse_map(value)
+    def parse_array(self, value):
+        """
+        Parse an array of values.
+        """
+        scalar = [self.parse_scalar(value[i]) for i in range(len(value))]
+        return scalar
 
-        elif scalar_type == ResultSetScalarTypes.VALUE_POINT:
-            scalar = self.parse_point(value)
+    def parse_unknown(self, cell):
+        """
+        Parse a cell of unknown type.
+        """
+        sys.stderr.write("Unknown type\n")
+        return None
 
-        elif scalar_type == ResultSetScalarTypes.VALUE_UNKNOWN:
-            print("Unknown scalar type\n")
+    def parse_scalar(self, cell):
+        """
+        Parse a scalar value from a cell in the result set.
+        """
+        scalar_type = int(cell[0])
+        value = cell[1]
+        scalar = self.parse_scalar_types[scalar_type](value)
 
         return scalar
 
     def parse_profile(self, response):
         self.result_set = [x[0 : x.index(",")].strip() for x in response]
 
-    # """Prints the data from the query response:
-    #     1. First row result_set contains the columns names.
-    #        Thus the first row in PrettyTable will contain the
-    #        columns.
-    #     2. The row after that will contain the data returned,
-    #        or 'No Data returned' if there is none.
-    #     3. Prints the statistics of the query.
-    # """
-
-    # def pretty_print(self):
-    #     if not self.is_empty():
-    #         header = [col[1] for col in self.header]
-    #         tbl = PrettyTable(header)
-
-    #         for row in self.result_set:
-    #             record = []
-    #             for idx, cell in enumerate(row):
-    #                 if type(cell) is Node:
-    #                     record.append(cell.to_string())
-    #                 elif type(cell) is Edge:
-    #                     record.append(cell.to_string())
-    #                 else:
-    #                     record.append(cell)
-    #             tbl.add_row(record)
-
-    #         if len(self.result_set) == 0:
-    #             tbl.add_row(['No data returned.'])
-
-    #         print(str(tbl) + '\n')
-
-    #     for stat in self.statistics:
-    #         print("%s %s" % (stat, self.statistics[stat]))
-
     def is_empty(self):
         return len(self.result_set) == 0
 
@@ -384,3 +382,192 @@ class QueryResult:
     def run_time_ms(self):
         """Returns the server execution time of the query"""
         return self._get_stat(INTERNAL_EXECUTION_TIME)
+
+    @property
+    def parse_scalar_types(self):
+        return {
+            ResultSetScalarTypes.VALUE_NULL: self.parse_null,
+            ResultSetScalarTypes.VALUE_STRING: self.parse_string,
+            ResultSetScalarTypes.VALUE_INTEGER: self.parse_integer,
+            ResultSetScalarTypes.VALUE_BOOLEAN: self.parse_boolean,
+            ResultSetScalarTypes.VALUE_DOUBLE: self.parse_double,
+            ResultSetScalarTypes.VALUE_ARRAY: self.parse_array,
+            ResultSetScalarTypes.VALUE_NODE: self.parse_node,
+            ResultSetScalarTypes.VALUE_EDGE: self.parse_edge,
+            ResultSetScalarTypes.VALUE_PATH: self.parse_path,
+            ResultSetScalarTypes.VALUE_MAP: self.parse_map,
+            ResultSetScalarTypes.VALUE_POINT: self.parse_point,
+            ResultSetScalarTypes.VALUE_UNKNOWN: self.parse_unknown,
+        }
+
+    @property
+    def parse_record_types(self):
+        return {
+            ResultSetColumnTypes.COLUMN_SCALAR: self.parse_scalar,
+            ResultSetColumnTypes.COLUMN_NODE: self.parse_node,
+            ResultSetColumnTypes.COLUMN_RELATION: self.parse_edge,
+            ResultSetColumnTypes.COLUMN_UNKNOWN: self.parse_unknown,
+        }
+
+
+class AsyncQueryResult(QueryResult):
+    """
+    Async version for the QueryResult class - a class that
+    represents a result of the query operation.
+    """
+
+    def __init__(self):
+        """
+        To init the class you must call self.initialize()
+        """
+        pass
+
+    async def initialize(self, graph, response, profile=False):
+        """
+        Initializes the class.
+        Args:
+
+        graph:
+            The graph on which the query was executed.
+        response:
+            The response from the server.
+        profile:
+            A boolean indicating if the query command was "GRAPH.PROFILE"
+        """
+        self.graph = graph
+        self.header = []
+        self.result_set = []
+
+        # in case of an error an exception will be raised
+        self._check_for_errors(response)
+
+        if len(response) == 1:
+            self.parse_statistics(response[0])
+        elif profile:
+            self.parse_profile(response)
+        else:
+            # start by parsing statistics, matches the one we have
+            self.parse_statistics(response[-1])  # Last element.
+            await self.parse_results(response)
+
+        return self
+
+    async def parse_node(self, cell):
+        """
+        Parses a node from the cell.
+        """
+        # Node ID (integer),
+        # [label string offset (integer)],
+        # [[name, value type, value] X N]
+
+        labels = None
+        if len(cell[1]) > 0:
+            labels = []
+            for inner_label in cell[1]:
+                labels.append(await self.graph.get_label(inner_label))
+        properties = await self.parse_entity_properties(cell[2])
+        node_id = int(cell[0])
+        return Node(node_id=node_id, label=labels, properties=properties)
+
+    async def parse_scalar(self, cell):
+        """
+        Parses a scalar value from the server response.
+        """
+        scalar_type = int(cell[0])
+        value = cell[1]
+        try:
+            scalar = await self.parse_scalar_types[scalar_type](value)
+        except TypeError:
+            # Not all of the functions are async
+            scalar = self.parse_scalar_types[scalar_type](value)
+
+        return scalar
+
+    async def parse_records(self, raw_result_set):
+        """
+        Parses the result set and returns a list of records.
+        """
+        records = []
+        for row in raw_result_set[1]:
+            record = [
+                await self.parse_record_types[self.header[idx][0]](cell)
+                for idx, cell in enumerate(row)
+            ]
+            records.append(record)
+
+        return records
+
+    async def parse_results(self, raw_result_set):
+        """
+        Parse the query execution result returned from the server.
+        """
+        self.header = self.parse_header(raw_result_set)
+
+        # Empty header.
+        if len(self.header) == 0:
+            return
+
+        self.result_set = await self.parse_records(raw_result_set)
+
+    async def parse_entity_properties(self, props):
+        """
+        Parse node / edge properties.
+        """
+        # [[name, value type, value] X N]
+        properties = {}
+        for prop in props:
+            prop_name = await self.graph.get_property(prop[0])
+            prop_value = await self.parse_scalar(prop[1:])
+            properties[prop_name] = prop_value
+
+        return properties
+
+    async def parse_edge(self, cell):
+        """
+        Parse the cell to an edge.
+        """
+        # Edge ID (integer),
+        # reltype string offset (integer),
+        # src node ID offset (integer),
+        # dest node ID offset (integer),
+        # [[name, value, value type] X N]
+
+        edge_id = int(cell[0])
+        relation = await self.graph.get_relation(cell[1])
+        src_node_id = int(cell[2])
+        dest_node_id = int(cell[3])
+        properties = await self.parse_entity_properties(cell[4])
+        return Edge(
+            src_node_id, relation, dest_node_id, edge_id=edge_id, properties=properties
+        )
+
+    async def parse_path(self, cell):
+        """
+        Parse the cell to a path.
+        """
+        nodes = await self.parse_scalar(cell[0])
+        edges = await self.parse_scalar(cell[1])
+
+        return Path(nodes, edges)
+
+    async def parse_map(self, cell):
+        """
+        Parse the cell to a map.
+        """
+        m = OrderedDict()
+        n_entries = len(cell)
+
+        # A map is an array of key value pairs.
+        # 1. key (string)
+        # 2. array: (value type, value)
+        for i in range(0, n_entries, 2):
+            key = self.parse_string(cell[i])
+            m[key] = await self.parse_scalar(cell[i + 1])
+
+        return m
+
+    async def parse_array(self, value):
+        """
+        Parse array value.
+        """
+        scalar = [await self.parse_scalar(value[i]) for i in range(len(value))]
+        return scalar
diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py
index 875f3fc..7e2045a 100644
--- a/redis/commands/redismodules.py
+++ b/redis/commands/redismodules.py
@@ -73,8 +73,8 @@ class RedisModuleCommands:
         return tdigest
 
     def graph(self, index_name="idx"):
-        """Access the timeseries namespace, providing support for
-        redis timeseries data.
+        """Access the graph namespace, providing support for
+        redis graph data.
         """
 
         from .graph import Graph
@@ -91,3 +91,13 @@ class AsyncRedisModuleCommands(RedisModuleCommands):
         s = AsyncSearch(client=self, index_name=index_name)
 
         return s
+
+    def graph(self, index_name="idx"):
+        """Access the graph namespace, providing support for
+        redis graph data.
+        """
+
+        from .graph import AsyncGraph
+
+        g = AsyncGraph(client=self, name=index_name)
+        return g
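The query_result.py rewrite above replaces a long if/elif chain over scalar types with a dispatch table mapping type codes to parser methods. The shape of that refactor, reduced to a standalone sketch (the type-code constants here are placeholders, not the real `ResultSetScalarTypes` values):

```python
# Placeholder type codes for the sketch; the real values live in
# redis/commands/graph/query_result.py as ResultSetScalarTypes.
VALUE_NULL, VALUE_STRING, VALUE_INTEGER, VALUE_DOUBLE, VALUE_ARRAY = range(5)


class ScalarParserSketch:
    def parse_null(self, value):
        return None

    def parse_string(self, value):
        return value.decode() if isinstance(value, bytes) else str(value)

    def parse_integer(self, value):
        return int(value)

    def parse_double(self, value):
        return float(value)

    def parse_array(self, value):
        # Arrays recurse through parse_scalar, as in the real code.
        return [self.parse_scalar(v) for v in value]

    @property
    def parse_scalar_types(self):
        # One lookup replaces the old if/elif chain.
        return {
            VALUE_NULL: self.parse_null,
            VALUE_STRING: self.parse_string,
            VALUE_INTEGER: self.parse_integer,
            VALUE_DOUBLE: self.parse_double,
            VALUE_ARRAY: self.parse_array,
        }

    def parse_scalar(self, cell):
        # A cell is a [type code, value] pair.
        scalar_type, value = int(cell[0]), cell[1]
        return self.parse_scalar_types[scalar_type](value)


p = ScalarParserSketch()
result = p.parse_scalar([VALUE_ARRAY, [[VALUE_INTEGER, "1"], [VALUE_STRING, b"x"]]])
# result == [1, "x"]
```

The same table is what lets `AsyncQueryResult.parse_scalar` mix sync and async parsers: it awaits the looked-up parser and falls back to a plain call on `TypeError`.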