summaryrefslogtreecommitdiff
path: root/redis
diff options
context:
space:
mode:
authordvora-h <67596500+dvora-h@users.noreply.github.com>2022-07-28 14:24:17 +0300
committerGitHub <noreply@github.com>2022-07-28 14:24:17 +0300
commitc94821c11e74d360ae3ccdcf9581cfe24e120a07 (patch)
treecb8dae16e14be25d443bd67480e3e4a9dae201ae /redis
parentf9f9d06c9951f8536bf9321dcebc96759eae03e0 (diff)
downloadredis-py-c94821c11e74d360ae3ccdcf9581cfe24e120a07.tar.gz
Add support for async GRAPH module (#2273)
* Add support for async graph * linters * fix docstring * Use retry mechanism in async version of Connection objects (#2271) * fix is_connected (#2278) * fix: workaround asyncio bug on connection reset by peer (#2259) Fixes #2237 * Fix crash: key expire while search (#2270) * fix expire while search * sleep * docs: Fix a few typos (#2274) * docs: Fix a few typos There are small typos in: - redis/cluster.py - redis/commands/core.py - redis/ocsp.py - tests/test_cluster.py Fixes: - Should read `validity` rather than `valididy`. - Should read `reinitialize` rather than `reinitilize`. - Should read `farthest` rather than `farest`. - Should read `commands` rather than `comamnds`. * Update core.py * async_cluster: fix concurrent pipeline (#2280) - each pipeline should create separate stacks for each node * Add support for TIMESERIES 1.8 (#2296) * Add support for timeseries 1.8 * fix info * linters * linters * fix info test * type hints * linters * Remove verbose logging from `redis-py/redis/cluster.py` (#2238) * removed the logging module and its corresponding methods * updated CHANGES * except block for RedisClusterException and BusyLoadingError removed * removed unused import (redis.exceptions.BusyLoadingError) * empty commit to re-trigger Actions workflow * replaced BaseException with Exception * empty commit to re-trigger Actions workflow * empty commit to re-trigger Actions workflow * redundant logic removed * re-trigger pipeline * reverted changes * re-trigger pipeline * except logic changed * redis stream example (#2269) * redis stream example * redis stream example on docs/examples.rst Co-authored-by: pedro.frazao <perl.pf@netcf.org> * Fix: `start_id` type for `XAUTOCLAIM` (#2257) * Changed start_id type for xautoclaim * Added to changes Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> * Doc add timeseries example (#2267) * DOC add timeseries example * DOC add timeseries examples * Apply suggestions * Fix typo Detention period => Retention period Co-authored-by: Gauthier Imbert <gauthier@PC17> * Fix warnings and resource usage problems in asyncio unittests (#2258) * Use pytest-asyncio in auto mode Remove overly genereric `pytestmark=pytest.mark.asyncio` causing lots of warning noise * Use "Factories as Fixtures" test pattern for the `create_redis` fixture this fixture is now async, avoiding teardown problems with missing event loops. * Fix sporadic error on fast event loops, such as `--uvloop` * Close connection, even if "username" was in kwargs This fixes a resource usage warning in the async unittests. * Do async cleanup of acl passwords via a fixture * Remove unused import, fix whitespace * Fix test with missing "await" * Close pubsub objects after use in unittest Use a simple fixture where possible, otherwise manually call pubsub.close() * re-introduce `pytestmark=pytest.mark.asyncio` for python 3.6 * Use context manager to clean up connections in connection pool for unit tests * Provide asynccontextmanager for python 3.6 * make `test_late_subscribe()` more robuste * Catch a couple of additional leaked resources * Graph - add counters for removed labels and properties (#2292) * grpah - add counters for removed labels and properties * added mock graph result set statistics * docstrings for graph result set statistics * format * isort * moved docstrings into functions * cleaning up the readme and moving docs into readthedocs (#2291) * cleaning up the readme and moving docs into readthedocs * examples at the end as per pr comments * async_cluster: fix max_connections/ssl & improve args (#2217) * async_cluster: fix max_connections/ssl & improve args - set proper connection_class if ssl = True - pass max_connections/connection_class to ClusterNode - recreate startup_nodes to properly initialize - pass parser_class to Connection instead of changing it in on_connect - only pass redis_connect_func if read_from_replicas = True - add connection_error_retry_attempts parameter - skip is_connected check in acquire_connection as it is already checked in send_packed_command BREAKING: - RedisCluster args except host & port are kw-only now - RedisCluster will no longer accept unknown arguments - RedisCluster will no longer accept url as an argument. Use RedisCluster.from_url - RedisCluster.require_full_coverage defaults to True - ClusterNode args except host, port, & server_type are kw-only now * async_cluster: remove kw-only requirement from client Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> * fix review comments * fix * fix review comments * fix review comments Co-authored-by: Chayim <chayim@users.noreply.github.com> Co-authored-by: szumka <106675199+szumka@users.noreply.github.com> Co-authored-by: Mehdi ABAAKOUK <sileht@sileht.net> Co-authored-by: Tim Gates <tim.gates@iress.com> Co-authored-by: Utkarsh Gupta <utkarshgupta137@gmail.com> Co-authored-by: Nial Daly <34862917+nialdaly@users.noreply.github.com> Co-authored-by: pedrofrazao <603718+pedrofrazao@users.noreply.github.com> Co-authored-by: pedro.frazao <perl.pf@netcf.org> Co-authored-by: Антон Безденежных <gamer392@yandex.ru> Co-authored-by: Iglesys <g.imbert34@gmail.com> Co-authored-by: Gauthier Imbert <gauthier@PC17> Co-authored-by: Kristján Valur Jónsson <sweskman@gmail.com> Co-authored-by: DvirDukhan <dvir@redis.com>
Diffstat (limited to 'redis')
-rw-r--r--redis/commands/graph/__init__.py129
-rw-r--r--redis/commands/graph/commands.py146
-rw-r--r--redis/commands/graph/query_result.py371
-rw-r--r--redis/commands/redismodules.py14
4 files changed, 525 insertions, 135 deletions
diff --git a/redis/commands/graph/__init__.py b/redis/commands/graph/__init__.py
index 3736195..a882dd5 100644
--- a/redis/commands/graph/__init__.py
+++ b/redis/commands/graph/__init__.py
@@ -1,9 +1,13 @@
from ..helpers import quote_string, random_string, stringify_param_value
-from .commands import GraphCommands
+from .commands import AsyncGraphCommands, GraphCommands
from .edge import Edge # noqa
from .node import Node # noqa
from .path import Path # noqa
+DB_LABELS = "DB.LABELS"
+DB_RAELATIONSHIPTYPES = "DB.RELATIONSHIPTYPES"
+DB_PROPERTYKEYS = "DB.PROPERTYKEYS"
+
class Graph(GraphCommands):
"""
@@ -44,25 +48,19 @@ class Graph(GraphCommands):
lbls = self.labels()
# Unpack data.
- self._labels = [None] * len(lbls)
- for i, l in enumerate(lbls):
- self._labels[i] = l[0]
+ self._labels = [l[0] for _, l in enumerate(lbls)]
def _refresh_relations(self):
rels = self.relationship_types()
# Unpack data.
- self._relationship_types = [None] * len(rels)
- for i, r in enumerate(rels):
- self._relationship_types[i] = r[0]
+ self._relationship_types = [r[0] for _, r in enumerate(rels)]
def _refresh_attributes(self):
props = self.property_keys()
# Unpack data.
- self._properties = [None] * len(props)
- for i, p in enumerate(props):
- self._properties[i] = p[0]
+ self._properties = [p[0] for _, p in enumerate(props)]
def get_label(self, idx):
"""
@@ -108,12 +106,12 @@ class Graph(GraphCommands):
The index of the property
"""
try:
- propertie = self._properties[idx]
+ p = self._properties[idx]
except IndexError:
# Refresh properties.
self._refresh_attributes()
- propertie = self._properties[idx]
- return propertie
+ p = self._properties[idx]
+ return p
def add_node(self, node):
"""
@@ -133,6 +131,8 @@ class Graph(GraphCommands):
self.edges.append(edge)
def _build_params_header(self, params):
+ if params is None:
+ return ""
if not isinstance(params, dict):
raise TypeError("'params' must be a dict")
# Header starts with "CYPHER"
@@ -147,16 +147,109 @@ class Graph(GraphCommands):
q = f"CALL {procedure}({','.join(args)})"
y = kwagrs.get("y", None)
- if y:
- q += f" YIELD {','.join(y)}"
+ if y is not None:
+ q += f"YIELD {','.join(y)}"
return self.query(q, read_only=read_only)
def labels(self):
- return self.call_procedure("db.labels", read_only=True).result_set
+ return self.call_procedure(DB_LABELS, read_only=True).result_set
def relationship_types(self):
- return self.call_procedure("db.relationshipTypes", read_only=True).result_set
+ return self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True).result_set
def property_keys(self):
- return self.call_procedure("db.propertyKeys", read_only=True).result_set
+ return self.call_procedure(DB_PROPERTYKEYS, read_only=True).result_set
+
+
+class AsyncGraph(Graph, AsyncGraphCommands):
+ """Async version for Graph"""
+
+ async def _refresh_labels(self):
+ lbls = await self.labels()
+
+ # Unpack data.
+ self._labels = [l[0] for _, l in enumerate(lbls)]
+
+ async def _refresh_attributes(self):
+ props = await self.property_keys()
+
+ # Unpack data.
+ self._properties = [p[0] for _, p in enumerate(props)]
+
+ async def _refresh_relations(self):
+ rels = await self.relationship_types()
+
+ # Unpack data.
+ self._relationship_types = [r[0] for _, r in enumerate(rels)]
+
+ async def get_label(self, idx):
+ """
+ Returns a label by it's index
+
+ Args:
+
+ idx:
+ The index of the label
+ """
+ try:
+ label = self._labels[idx]
+ except IndexError:
+ # Refresh labels.
+ await self._refresh_labels()
+ label = self._labels[idx]
+ return label
+
+ async def get_property(self, idx):
+ """
+ Returns a property by it's index
+
+ Args:
+
+ idx:
+ The index of the property
+ """
+ try:
+ p = self._properties[idx]
+ except IndexError:
+ # Refresh properties.
+ await self._refresh_attributes()
+ p = self._properties[idx]
+ return p
+
+ async def get_relation(self, idx):
+ """
+ Returns a relationship type by it's index
+
+ Args:
+
+ idx:
+ The index of the relation
+ """
+ try:
+ relationship_type = self._relationship_types[idx]
+ except IndexError:
+ # Refresh relationship types.
+ await self._refresh_relations()
+ relationship_type = self._relationship_types[idx]
+ return relationship_type
+
+ async def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
+ args = [quote_string(arg) for arg in args]
+ q = f"CALL {procedure}({','.join(args)})"
+
+ y = kwagrs.get("y", None)
+ if y is not None:
+ f"YIELD {','.join(y)}"
+ return await self.query(q, read_only=read_only)
+
+ async def labels(self):
+ return ((await self.call_procedure(DB_LABELS, read_only=True))).result_set
+
+ async def property_keys(self):
+ return (await self.call_procedure(DB_PROPERTYKEYS, read_only=True)).result_set
+
+ async def relationship_types(self):
+ return (
+ await self.call_procedure(DB_RAELATIONSHIPTYPES, read_only=True)
+ ).result_set
diff --git a/redis/commands/graph/commands.py b/redis/commands/graph/commands.py
index fe4224b..762ab42 100644
--- a/redis/commands/graph/commands.py
+++ b/redis/commands/graph/commands.py
@@ -3,7 +3,16 @@ from redis.exceptions import ResponseError
from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
-from .query_result import QueryResult
+from .query_result import AsyncQueryResult, QueryResult
+
+PROFILE_CMD = "GRAPH.PROFILE"
+RO_QUERY_CMD = "GRAPH.RO_QUERY"
+QUERY_CMD = "GRAPH.QUERY"
+DELETE_CMD = "GRAPH.DELETE"
+SLOWLOG_CMD = "GRAPH.SLOWLOG"
+CONFIG_CMD = "GRAPH.CONFIG"
+LIST_CMD = "GRAPH.LIST"
+EXPLAIN_CMD = "GRAPH.EXPLAIN"
class GraphCommands:
@@ -52,33 +61,28 @@ class GraphCommands:
query = q
# handle query parameters
- if params is not None:
- query = self._build_params_header(params) + query
+ query = self._build_params_header(params) + query
# construct query command
# ask for compact result-set format
# specify known graph version
if profile:
- cmd = "GRAPH.PROFILE"
+ cmd = PROFILE_CMD
else:
- cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
+ cmd = RO_QUERY_CMD if read_only else QUERY_CMD
command = [cmd, self.name, query, "--compact"]
# include timeout is specified
- if timeout:
- if not isinstance(timeout, int):
- raise Exception("Timeout argument must be a positive integer")
- command += ["timeout", timeout]
+ if isinstance(timeout, int):
+ command.extend(["timeout", timeout])
+ elif timeout is not None:
+ raise Exception("Timeout argument must be a positive integer")
# issue query
try:
response = self.execute_command(*command)
return QueryResult(self, response, profile)
except ResponseError as e:
- if "wrong number of arguments" in str(e):
- print(
- "Note: RedisGraph Python requires server version 2.2.8 or above"
- ) # noqa
if "unknown command" in str(e) and read_only:
# `GRAPH.RO_QUERY` is unavailable in older versions.
return self.query(q, params, timeout, read_only=False)
@@ -106,7 +110,7 @@ class GraphCommands:
For more information see `DELETE <https://redis.io/commands/graph.delete>`_. # noqa
"""
self._clear_schema()
- return self.execute_command("GRAPH.DELETE", self.name)
+ return self.execute_command(DELETE_CMD, self.name)
# declared here, to override the built in redis.db.flush()
def flush(self):
@@ -146,7 +150,7 @@ class GraphCommands:
3. The issued query.
4. The amount of time needed for its execution, in milliseconds.
"""
- return self.execute_command("GRAPH.SLOWLOG", self.name)
+ return self.execute_command(SLOWLOG_CMD, self.name)
def config(self, name, value=None, set=False):
"""
@@ -170,14 +174,14 @@ class GraphCommands:
raise DataError(
"``value`` can be provided only when ``set`` is True"
) # noqa
- return self.execute_command("GRAPH.CONFIG", *params)
+ return self.execute_command(CONFIG_CMD, *params)
def list_keys(self):
"""
Lists all graph keys in the keyspace.
For more information see `GRAPH.LIST <https://redis.io/commands/graph.list>`_. # noqa
"""
- return self.execute_command("GRAPH.LIST")
+ return self.execute_command(LIST_CMD)
def execution_plan(self, query, params=None):
"""
@@ -188,10 +192,9 @@ class GraphCommands:
query: the query that will be executed
params: query parameters
"""
- if params is not None:
- query = self._build_params_header(params) + query
+ query = self._build_params_header(params) + query
- plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
+ plan = self.execute_command(EXPLAIN_CMD, self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)
@@ -206,8 +209,105 @@ class GraphCommands:
query: the query that will be executed
params: query parameters
"""
- if params is not None:
- query = self._build_params_header(params) + query
+ query = self._build_params_header(params) + query
+
+ plan = self.execute_command(EXPLAIN_CMD, self.name, query)
+ return ExecutionPlan(plan)
+
+
+class AsyncGraphCommands(GraphCommands):
+ async def query(self, q, params=None, timeout=None, read_only=False, profile=False):
+ """
+ Executes a query against the graph.
+ For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa
+
+ Args:
+
+ q : str
+ The query.
+ params : dict
+ Query parameters.
+ timeout : int
+ Maximum runtime for read queries in milliseconds.
+ read_only : bool
+ Executes a readonly query if set to True.
+ profile : bool
+ Return details on results produced by and time
+ spent in each operation.
+ """
+
+ # maintain original 'q'
+ query = q
+
+ # handle query parameters
+ query = self._build_params_header(params) + query
+
+ # construct query command
+ # ask for compact result-set format
+ # specify known graph version
+ if profile:
+ cmd = PROFILE_CMD
+ else:
+ cmd = RO_QUERY_CMD if read_only else QUERY_CMD
+ command = [cmd, self.name, query, "--compact"]
+
+ # include timeout is specified
+ if isinstance(timeout, int):
+ command.extend(["timeout", timeout])
+ elif timeout is not None:
+ raise Exception("Timeout argument must be a positive integer")
+
+ # issue query
+ try:
+ response = await self.execute_command(*command)
+ return await AsyncQueryResult().initialize(self, response, profile)
+ except ResponseError as e:
+ if "unknown command" in str(e) and read_only:
+ # `GRAPH.RO_QUERY` is unavailable in older versions.
+ return await self.query(q, params, timeout, read_only=False)
+ raise e
+ except VersionMismatchException as e:
+ # client view over the graph schema is out of sync
+ # set client version and refresh local schema
+ self.version = e.version
+ self._refresh_schema()
+ # re-issue query
+ return await self.query(q, params, timeout, read_only)
+
+ async def execution_plan(self, query, params=None):
+ """
+ Get the execution plan for given query,
+ GRAPH.EXPLAIN returns an array of operations.
+
+ Args:
+ query: the query that will be executed
+ params: query parameters
+ """
+ query = self._build_params_header(params) + query
- plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
+ plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
+ if isinstance(plan[0], bytes):
+ plan = [b.decode() for b in plan]
+ return "\n".join(plan)
+
+ async def explain(self, query, params=None):
+ """
+ Get the execution plan for given query,
+ GRAPH.EXPLAIN returns ExecutionPlan object.
+
+ Args:
+ query: the query that will be executed
+ params: query parameters
+ """
+ query = self._build_params_header(params) + query
+
+ plan = await self.execute_command(EXPLAIN_CMD, self.name, query)
return ExecutionPlan(plan)
+
+ async def flush(self):
+ """
+ Commit the graph and reset the edges and the nodes to zero length.
+ """
+ await self.commit()
+ self.nodes = {}
+ self.edges = []
diff --git a/redis/commands/graph/query_result.py b/redis/commands/graph/query_result.py
index 3ffa664..b88b4b6 100644
--- a/redis/commands/graph/query_result.py
+++ b/redis/commands/graph/query_result.py
@@ -1,4 +1,6 @@
+import sys
from collections import OrderedDict
+from distutils.util import strtobool
# from prettytable import PrettyTable
from redis import ResponseError
@@ -90,6 +92,9 @@ class QueryResult:
self.parse_results(response)
def _check_for_errors(self, response):
+ """
+ Check if the response contains an error.
+ """
if isinstance(response[0], ResponseError):
error = response[0]
if str(error) == "version mismatch":
@@ -103,6 +108,9 @@ class QueryResult:
raise response[-1]
def parse_results(self, raw_result_set):
+ """
+ Parse the query execution result returned from the server.
+ """
self.header = self.parse_header(raw_result_set)
# Empty header.
@@ -112,6 +120,9 @@ class QueryResult:
self.result_set = self.parse_records(raw_result_set)
def parse_statistics(self, raw_statistics):
+ """
+ Parse the statistics returned in the response.
+ """
self.statistics = {}
# decode statistics
@@ -125,31 +136,31 @@ class QueryResult:
self.statistics[s] = v
def parse_header(self, raw_result_set):
+ """
+ Parse the header of the result.
+ """
# An array of column name/column type pairs.
header = raw_result_set[0]
return header
def parse_records(self, raw_result_set):
- records = []
- result_set = raw_result_set[1]
- for row in result_set:
- record = []
- for idx, cell in enumerate(row):
- if self.header[idx][0] == ResultSetColumnTypes.COLUMN_SCALAR: # noqa
- record.append(self.parse_scalar(cell))
- elif self.header[idx][0] == ResultSetColumnTypes.COLUMN_NODE: # noqa
- record.append(self.parse_node(cell))
- elif (
- self.header[idx][0] == ResultSetColumnTypes.COLUMN_RELATION
- ): # noqa
- record.append(self.parse_edge(cell))
- else:
- print("Unknown column type.\n")
- records.append(record)
+ """
+ Parses the result set and returns a list of records.
+ """
+ records = [
+ [
+ self.parse_record_types[self.header[idx][0]](cell)
+ for idx, cell in enumerate(row)
+ ]
+ for row in raw_result_set[1]
+ ]
return records
def parse_entity_properties(self, props):
+ """
+ Parse node / edge properties.
+ """
# [[name, value type, value] X N]
properties = {}
for prop in props:
@@ -160,6 +171,9 @@ class QueryResult:
return properties
def parse_string(self, cell):
+ """
+ Parse the cell as a string.
+ """
if isinstance(cell, bytes):
return cell.decode()
elif not isinstance(cell, str):
@@ -168,6 +182,9 @@ class QueryResult:
return cell
def parse_node(self, cell):
+ """
+ Parse the cell to a node.
+ """
# Node ID (integer),
# [label string offset (integer)],
# [[name, value type, value] X N]
@@ -182,6 +199,9 @@ class QueryResult:
return Node(node_id=node_id, label=labels, properties=properties)
def parse_edge(self, cell):
+ """
+ Parse the cell to an edge.
+ """
# Edge ID (integer),
# reltype string offset (integer),
# src node ID offset (integer),
@@ -198,11 +218,17 @@ class QueryResult:
)
def parse_path(self, cell):
+ """
+ Parse the cell to a path.
+ """
nodes = self.parse_scalar(cell[0])
edges = self.parse_scalar(cell[1])
return Path(nodes, edges)
def parse_map(self, cell):
+ """
+ Parse the cell as a map.
+ """
m = OrderedDict()
n_entries = len(cell)
@@ -216,6 +242,9 @@ class QueryResult:
return m
def parse_point(self, cell):
+ """
+ Parse the cell to point.
+ """
p = {}
# A point is received an array of the form: [latitude, longitude]
# It is returned as a map of the form: {"latitude": latitude, "longitude": longitude} # noqa
@@ -223,94 +252,63 @@ class QueryResult:
p["longitude"] = float(cell[1])
return p
- def parse_scalar(self, cell):
- scalar_type = int(cell[0])
- value = cell[1]
- scalar = None
-
- if scalar_type == ResultSetScalarTypes.VALUE_NULL:
- scalar = None
-
- elif scalar_type == ResultSetScalarTypes.VALUE_STRING:
- scalar = self.parse_string(value)
-
- elif scalar_type == ResultSetScalarTypes.VALUE_INTEGER:
- scalar = int(value)
-
- elif scalar_type == ResultSetScalarTypes.VALUE_BOOLEAN:
- value = value.decode() if isinstance(value, bytes) else value
- if value == "true":
- scalar = True
- elif value == "false":
- scalar = False
- else:
- print("Unknown boolean type\n")
-
- elif scalar_type == ResultSetScalarTypes.VALUE_DOUBLE:
- scalar = float(value)
-
- elif scalar_type == ResultSetScalarTypes.VALUE_ARRAY:
- # array variable is introduced only for readability
- scalar = array = value
- for i in range(len(array)):
- scalar[i] = self.parse_scalar(array[i])
+ def parse_null(self, cell):
+ """
+ Parse a null value.
+ """
+ return None
- elif scalar_type == ResultSetScalarTypes.VALUE_NODE:
- scalar = self.parse_node(value)
+ def parse_integer(self, cell):
+ """
+ Parse the integer value from the cell.
+ """
+ return int(cell)
- elif scalar_type == ResultSetScalarTypes.VALUE_EDGE:
- scalar = self.parse_edge(value)
+ def parse_boolean(self, value):
+ """
+ Parse the cell value as a boolean.
+ """
+ value = value.decode() if isinstance(value, bytes) else value
+ try:
+ scalar = strtobool(value)
+ except ValueError:
+ sys.stderr.write("unknown boolean type\n")
+ scalar = None
+ return scalar
- elif scalar_type == ResultSetScalarTypes.VALUE_PATH:
- scalar = self.parse_path(value)
+ def parse_double(self, cell):
+ """
+ Parse the cell as a double.
+ """
+ return float(cell)
- elif scalar_type == ResultSetScalarTypes.VALUE_MAP:
- scalar = self.parse_map(value)
+ def parse_array(self, value):
+ """
+ Parse an array of values.
+ """
+ scalar = [self.parse_scalar(value[i]) for i in range(len(value))]
+ return scalar
- elif scalar_type == ResultSetScalarTypes.VALUE_POINT:
- scalar = self.parse_point(value)
+ def parse_unknown(self, cell):
+ """
+ Parse a cell of unknown type.
+ """
+ sys.stderr.write("Unknown type\n")
+ return None
- elif scalar_type == ResultSetScalarTypes.VALUE_UNKNOWN:
- print("Unknown scalar type\n")
+ def parse_scalar(self, cell):
+ """
+ Parse a scalar value from a cell in the result set.
+ """
+ scalar_type = int(cell[0])
+ value = cell[1]
+ scalar = self.parse_scalar_types[scalar_type](value)
return scalar
def parse_profile(self, response):
self.result_set = [x[0 : x.index(",")].strip() for x in response]
- # """Prints the data from the query response:
- # 1. First row result_set contains the columns names.
- # Thus the first row in PrettyTable will contain the
- # columns.
- # 2. The row after that will contain the data returned,
- # or 'No Data returned' if there is none.
- # 3. Prints the statistics of the query.
- # """
-
- # def pretty_print(self):
- # if not self.is_empty():
- # header = [col[1] for col in self.header]
- # tbl = PrettyTable(header)
-
- # for row in self.result_set:
- # record = []
- # for idx, cell in enumerate(row):
- # if type(cell) is Node:
- # record.append(cell.to_string())
- # elif type(cell) is Edge:
- # record.append(cell.to_string())
- # else:
- # record.append(cell)
- # tbl.add_row(record)
-
- # if len(self.result_set) == 0:
- # tbl.add_row(['No data returned.'])
-
- # print(str(tbl) + '\n')
-
- # for stat in self.statistics:
- # print("%s %s" % (stat, self.statistics[stat]))
-
def is_empty(self):
return len(self.result_set) == 0
@@ -384,3 +382,192 @@ class QueryResult:
def run_time_ms(self):
"""Returns the server execution time of the query"""
return self._get_stat(INTERNAL_EXECUTION_TIME)
+
+ @property
+ def parse_scalar_types(self):
+ return {
+ ResultSetScalarTypes.VALUE_NULL: self.parse_null,
+ ResultSetScalarTypes.VALUE_STRING: self.parse_string,
+ ResultSetScalarTypes.VALUE_INTEGER: self.parse_integer,
+ ResultSetScalarTypes.VALUE_BOOLEAN: self.parse_boolean,
+ ResultSetScalarTypes.VALUE_DOUBLE: self.parse_double,
+ ResultSetScalarTypes.VALUE_ARRAY: self.parse_array,
+ ResultSetScalarTypes.VALUE_NODE: self.parse_node,
+ ResultSetScalarTypes.VALUE_EDGE: self.parse_edge,
+ ResultSetScalarTypes.VALUE_PATH: self.parse_path,
+ ResultSetScalarTypes.VALUE_MAP: self.parse_map,
+ ResultSetScalarTypes.VALUE_POINT: self.parse_point,
+ ResultSetScalarTypes.VALUE_UNKNOWN: self.parse_unknown,
+ }
+
+ @property
+ def parse_record_types(self):
+ return {
+ ResultSetColumnTypes.COLUMN_SCALAR: self.parse_scalar,
+ ResultSetColumnTypes.COLUMN_NODE: self.parse_node,
+ ResultSetColumnTypes.COLUMN_RELATION: self.parse_edge,
+ ResultSetColumnTypes.COLUMN_UNKNOWN: self.parse_unknown,
+ }
+
+
+class AsyncQueryResult(QueryResult):
+ """
+ Async version for the QueryResult class - a class that
+ represents a result of the query operation.
+ """
+
+ def __init__(self):
+ """
+ To init the class you must call self.initialize()
+ """
+ pass
+
+ async def initialize(self, graph, response, profile=False):
+ """
+ Initializes the class.
+ Args:
+
+ graph:
+ The graph on which the query was executed.
+ response:
+ The response from the server.
+ profile:
+ A boolean indicating if the query command was "GRAPH.PROFILE"
+ """
+ self.graph = graph
+ self.header = []
+ self.result_set = []
+
+ # in case of an error an exception will be raised
+ self._check_for_errors(response)
+
+ if len(response) == 1:
+ self.parse_statistics(response[0])
+ elif profile:
+ self.parse_profile(response)
+ else:
+ # start by parsing statistics, matches the one we have
+ self.parse_statistics(response[-1]) # Last element.
+ await self.parse_results(response)
+
+ return self
+
+ async def parse_node(self, cell):
+ """
+ Parses a node from the cell.
+ """
+ # Node ID (integer),
+ # [label string offset (integer)],
+ # [[name, value type, value] X N]
+
+ labels = None
+ if len(cell[1]) > 0:
+ labels = []
+ for inner_label in cell[1]:
+ labels.append(await self.graph.get_label(inner_label))
+ properties = await self.parse_entity_properties(cell[2])
+ node_id = int(cell[0])
+ return Node(node_id=node_id, label=labels, properties=properties)
+
+ async def parse_scalar(self, cell):
+ """
+ Parses a scalar value from the server response.
+ """
+ scalar_type = int(cell[0])
+ value = cell[1]
+ try:
+ scalar = await self.parse_scalar_types[scalar_type](value)
+ except TypeError:
+ # Not all of the functions are async
+ scalar = self.parse_scalar_types[scalar_type](value)
+
+ return scalar
+
+ async def parse_records(self, raw_result_set):
+ """
+ Parses the result set and returns a list of records.
+ """
+ records = []
+ for row in raw_result_set[1]:
+ record = [
+ await self.parse_record_types[self.header[idx][0]](cell)
+ for idx, cell in enumerate(row)
+ ]
+ records.append(record)
+
+ return records
+
+ async def parse_results(self, raw_result_set):
+ """
+ Parse the query execution result returned from the server.
+ """
+ self.header = self.parse_header(raw_result_set)
+
+ # Empty header.
+ if len(self.header) == 0:
+ return
+
+ self.result_set = await self.parse_records(raw_result_set)
+
+ async def parse_entity_properties(self, props):
+ """
+ Parse node / edge properties.
+ """
+ # [[name, value type, value] X N]
+ properties = {}
+ for prop in props:
+ prop_name = await self.graph.get_property(prop[0])
+ prop_value = await self.parse_scalar(prop[1:])
+ properties[prop_name] = prop_value
+
+ return properties
+
+ async def parse_edge(self, cell):
+ """
+ Parse the cell to an edge.
+ """
+ # Edge ID (integer),
+ # reltype string offset (integer),
+ # src node ID offset (integer),
+ # dest node ID offset (integer),
+ # [[name, value, value type] X N]
+
+ edge_id = int(cell[0])
+ relation = await self.graph.get_relation(cell[1])
+ src_node_id = int(cell[2])
+ dest_node_id = int(cell[3])
+ properties = await self.parse_entity_properties(cell[4])
+ return Edge(
+ src_node_id, relation, dest_node_id, edge_id=edge_id, properties=properties
+ )
+
+ async def parse_path(self, cell):
+ """
+ Parse the cell to a path.
+ """
+ nodes = await self.parse_scalar(cell[0])
+ edges = await self.parse_scalar(cell[1])
+ return Path(nodes, edges)
+
+ async def parse_map(self, cell):
+ """
+ Parse the cell to a map.
+ """
+ m = OrderedDict()
+ n_entries = len(cell)
+
+ # A map is an array of key value pairs.
+ # 1. key (string)
+ # 2. array: (value type, value)
+ for i in range(0, n_entries, 2):
+ key = self.parse_string(cell[i])
+ m[key] = await self.parse_scalar(cell[i + 1])
+
+ return m
+
+ async def parse_array(self, value):
+ """
+ Parse array value.
+ """
+ scalar = [await self.parse_scalar(value[i]) for i in range(len(value))]
+ return scalar
diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py
index 875f3fc..7e2045a 100644
--- a/redis/commands/redismodules.py
+++ b/redis/commands/redismodules.py
@@ -73,8 +73,8 @@ class RedisModuleCommands:
return tdigest
def graph(self, index_name="idx"):
- """Access the timeseries namespace, providing support for
- redis timeseries data.
+ """Access the graph namespace, providing support for
+ redis graph data.
"""
from .graph import Graph
@@ -91,3 +91,13 @@ class AsyncRedisModuleCommands(RedisModuleCommands):
s = AsyncSearch(client=self, index_name=index_name)
return s
+
+ def graph(self, index_name="idx"):
+ """Access the graph namespace, providing support for
+ redis graph data.
+ """
+
+ from .graph import AsyncGraph
+
+ g = AsyncGraph(client=self, name=index_name)
+ return g