summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJim McCusker <jmccusker@5amsolutions.com>2015-05-25 23:25:00 -0400
committerJim McCusker <jmccusker@5amsolutions.com>2015-05-25 23:25:00 -0400
commit96551f9289f0cf15ecb88a83ef8de082649b1f46 (patch)
treef1256addfb10e108096ad72c38eb75fe74dd8db3
parentfe3fa522b48e787fa87dc1156e1a10bd6671b62c (diff)
downloadrdflib-96551f9289f0cf15ecb88a83ef8de082649b1f46.tar.gz
SPARQLUpdateStore writes an HTTP transaction for each and every triple addition or deletion. When adding significant amounts of data via this API, this results in a lot of network chatter and significant delays while the transactions are completed. This change batches edits up until the user calls commit() or until a read operation is performed. The result should always be the same behavior from any given client's perspective, since all edits are applied before a read occurs.
-rw-r--r--rdflib/plugins/stores/sparqlstore.py68
1 files changed, 48 insertions, 20 deletions
diff --git a/rdflib/plugins/stores/sparqlstore.py b/rdflib/plugins/stores/sparqlstore.py
index f528b1e8..77773810 100644
--- a/rdflib/plugins/stores/sparqlstore.py
+++ b/rdflib/plugins/stores/sparqlstore.py
@@ -12,6 +12,9 @@ OFFSET = 'OFFSET'
ORDERBY = 'ORDER BY'
import re
+import collections
+import urllib2
+
# import warnings
try:
from SPARQLWrapper import SPARQLWrapper, XML, POST, GET, URLENCODED, POSTDIRECTLY
@@ -525,6 +528,24 @@ class SPARQLUpdateStore(SPARQLStore):
queryEndpoint, bNodeAsURI, sparql11, context_aware, updateEndpoint=update_endpoint)
self.postAsEncoded = postAsEncoded
+ self.transaction_aware = True
+ self._edits = None
+
+ def query(self,*args, **kwargs):
+ self.commit()
+ return SPARQLStore.query(self,*args, **kwargs)
+
+ def triples(self,*args, **kwargs):
+ self.commit()
+ return SPARQLStore.triples(self,*args, **kwargs)
+
+ def contexts(self,*args, **kwargs):
+ self.commit()
+ return SPARQLStore.contexts(self,*args, **kwargs)
+
+ def __len__(self,*args, **kwargs):
+ self.commit()
+ return SPARQLStore.__len__(self,*args, **kwargs)
def open(self, configuration, create=False):
"""
@@ -547,6 +568,11 @@ class SPARQLUpdateStore(SPARQLStore):
if not self.updateEndpoint:
self.updateEndpoint = self.endpoint
+ def _transaction(self):
+ if self._edits == None:
+ self._edits = []
+ return self._edits
+
def __set_update_endpoint(self, update_endpoint):
self.updateEndpoint = update_endpoint
@@ -561,10 +587,18 @@ class SPARQLUpdateStore(SPARQLStore):
# Transactional interfaces
def commit(self):
- raise TypeError('The SPARQL Update store is not transaction aware!')
+ """ add(), addN(), and remove() are transactional to reduce overhead of many small edits.
+ Read and update() calls will automatically commit any outstanding edits.
+ This should behave as expected most of the time, except that alternating writes
+ and reads can degenerate to the original call-per-triple situation that originally existed.
+ """
+ if self._edits and len(self._edits) > 0:
+ r = self._do_update('\n;\n'.join(self._edits))
+ self._edits = None
+ return r
def rollback(self):
- raise TypeError('The SPARQL Update store is not transaction aware')
+ self._edits = None
def add(self, spo, context=None, quoted=False):
""" Add a triple to the store of triples. """
@@ -588,28 +622,21 @@ class SPARQLUpdateStore(SPARQLStore):
context.identifier.n3(), triple)
else:
q = "INSERT DATA { %s }" % triple
- self._do_update(q)
+ self._transaction().append(q)
def addN(self, quads):
""" Add a list of quads to the store. """
if not self.endpoint:
raise Exception("UpdateEndpoint is not set - call 'open'")
- data = ""
- for spoc in quads:
- (subject, predicate, obj, context) = spoc
-
- if ( isinstance(subject, BNode) or
- isinstance(predicate, BNode) or
- isinstance(obj, BNode) ):
- raise Exception("SPARQLStore does not support Bnodes! "
- "See http://www.w3.org/TR/sparql11-query/#BGPsparqlBNodes")
-
-
- triple = "%s %s %s ." % (subject.n3(), predicate.n3(), obj.n3())
- data += "INSERT DATA { GRAPH <%s> { %s } }\n" % (
- context.identifier, triple)
- self._do_update(data)
+ contexts = collections.defaultdict(list)
+ for subject, predicate, obj, context in quads:
+ contexts[context].append((subject,predicate,obj))
+ data = []
+ for context in contexts:
+ triples = ["%s %s %s ." % (x[0].n3(), x[1].n3(), x[2].n3()) for x in contexts[context]]
+ data.append("INSERT DATA { GRAPH <%s> { %s } }\n" % (context.identifier, '\n'.join(triples)))
+ self._transaction().extend(data)
def remove(self, spo, context):
""" Remove a triple from the store """
@@ -631,9 +658,10 @@ class SPARQLUpdateStore(SPARQLStore):
context.identifier.n3(), triple)
else:
q = "DELETE { %s } WHERE { %s } " % (triple, triple)
- self._do_update(q)
+ self._transaction().append(q)
def _do_update(self, update):
+ print update
self.resetQuery()
self.setQuery(update)
self.setMethod(POST)
@@ -703,7 +731,7 @@ class SPARQLUpdateStore(SPARQLStore):
query = self.where_pattern.sub("WHERE { " + values, query)
- self._do_update(query)
+ self._transaction().append(query)
def _insert_named_graph(self, query, query_graph):
"""