summaryrefslogtreecommitdiff
path: root/lib/extras.py
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2016-12-26 01:49:42 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2016-12-26 01:49:42 +0100
commitfaaef61c276f62f5b16f75caf7297400e7fc0523 (patch)
treee5af03497df75c7846b0c93a217f4adab0a19730 /lib/extras.py
parent0772d187e914929bf2666a1f6acac91639c18765 (diff)
parent17698c481566c0e8c1d5d65fe88e0cb7e4505957 (diff)
downloadpsycopg2-faaef61c276f62f5b16f75caf7297400e7fc0523.tar.gz
Merge branch 'master' into named-callproc
Diffstat (limited to 'lib/extras.py')
-rw-r--r--lib/extras.py225
1 files changed, 190 insertions, 35 deletions
diff --git a/lib/extras.py b/lib/extras.py
index 2713d6f..7fc853a 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -39,8 +39,28 @@ import psycopg2
from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection
-from psycopg2.extensions import adapt as _A
-from psycopg2.extensions import b
+from psycopg2.extensions import adapt as _A, quote_ident
+
+from psycopg2._psycopg import ( # noqa
+ REPLICATION_PHYSICAL, REPLICATION_LOGICAL,
+ ReplicationConnection as _replicationConnection,
+ ReplicationCursor as _replicationCursor,
+ ReplicationMessage)
+
+
+# expose the json adaptation stuff into the module
+from psycopg2._json import ( # noqa
+ json, Json, register_json, register_default_json, register_default_jsonb)
+
+
+# Expose range-related objects
+from psycopg2._range import ( # noqa
+ Range, NumericRange, DateRange, DateTimeRange, DateTimeTZRange,
+ register_range, RangeAdapter, RangeCaster)
+
+
+# Expose ipaddress-related objects
+from psycopg2._ipaddress import register_ipaddress # noqa
class DictCursorBase(_cursor):
@@ -106,6 +126,7 @@ class DictConnection(_connection):
kwargs.setdefault('cursor_factory', DictCursor)
return super(DictConnection, self).cursor(*args, **kwargs)
+
class DictCursor(DictCursorBase):
"""A cursor that keeps a list of column name -> index mappings."""
@@ -130,6 +151,7 @@ class DictCursor(DictCursorBase):
self.index[self.description[i][0]] = i
self._query_executed = 0
+
class DictRow(list):
"""A row object that allow by-column-name access to data."""
@@ -192,10 +214,10 @@ class DictRow(list):
# drop the crusty Py2 methods
if _sys.version_info[0] > 2:
- items = iteritems; del iteritems
- keys = iterkeys; del iterkeys
- values = itervalues; del itervalues
- del has_key
+ items = iteritems # noqa
+ keys = iterkeys # noqa
+ values = itervalues # noqa
+ del iteritems, iterkeys, itervalues, has_key
class RealDictConnection(_connection):
@@ -204,6 +226,7 @@ class RealDictConnection(_connection):
kwargs.setdefault('cursor_factory', RealDictCursor)
return super(RealDictConnection, self).cursor(*args, **kwargs)
+
class RealDictCursor(DictCursorBase):
"""A cursor that uses a real dict as the base type for rows.
@@ -233,6 +256,7 @@ class RealDictCursor(DictCursorBase):
self.column_mapping.append(self.description[i][0])
self._query_executed = 0
+
class RealDictRow(dict):
"""A `!dict` subclass representing a data record."""
@@ -265,6 +289,7 @@ class NamedTupleConnection(_connection):
kwargs.setdefault('cursor_factory', NamedTupleCursor)
return super(NamedTupleConnection, self).cursor(*args, **kwargs)
+
class NamedTupleCursor(_cursor):
"""A cursor that generates results as `~collections.namedtuple`.
@@ -369,11 +394,13 @@ class LoggingConnection(_connection):
def _logtofile(self, msg, curs):
msg = self.filter(msg, curs)
- if msg: self._logobj.write(msg + _os.linesep)
+ if msg:
+ self._logobj.write(msg + _os.linesep)
def _logtologger(self, msg, curs):
msg = self.filter(msg, curs)
- if msg: self._logobj.debug(msg)
+ if msg:
+ self._logobj.debug(msg)
def _check(self):
if not hasattr(self, '_logobj'):
@@ -385,6 +412,7 @@ class LoggingConnection(_connection):
kwargs.setdefault('cursor_factory', LoggingCursor)
return super(LoggingConnection, self).cursor(*args, **kwargs)
+
class LoggingCursor(_cursor):
"""A cursor that logs queries using its connection logging facilities."""
@@ -425,6 +453,7 @@ class MinTimeLoggingConnection(LoggingConnection):
kwargs.setdefault('cursor_factory', MinTimeLoggingCursor)
return LoggingConnection.cursor(self, *args, **kwargs)
+
class MinTimeLoggingCursor(LoggingCursor):
"""The cursor sub-class companion to `MinTimeLoggingConnection`."""
@@ -437,6 +466,133 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars)
+class LogicalReplicationConnection(_replicationConnection):
+
+ def __init__(self, *args, **kwargs):
+ kwargs['replication_type'] = REPLICATION_LOGICAL
+ super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
+
+
+class PhysicalReplicationConnection(_replicationConnection):
+
+ def __init__(self, *args, **kwargs):
+ kwargs['replication_type'] = REPLICATION_PHYSICAL
+ super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
+
+
+class StopReplication(Exception):
+ """
+ Exception used to break out of the endless loop in
+ `~ReplicationCursor.consume_stream()`.
+
+ Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
+ `~psycopg2.Error` as occurrence of this exception does not indicate an
+ error.
+ """
+ pass
+
+
+class ReplicationCursor(_replicationCursor):
+ """A cursor used for communication on replication connections."""
+
+ def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
+ """Create streaming replication slot."""
+
+ command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)
+
+ if slot_type is None:
+ slot_type = self.connection.replication_type
+
+ if slot_type == REPLICATION_LOGICAL:
+ if output_plugin is None:
+ raise psycopg2.ProgrammingError(
+ "output plugin name is required to create "
+ "logical replication slot")
+
+ command += "LOGICAL %s" % quote_ident(output_plugin, self)
+
+ elif slot_type == REPLICATION_PHYSICAL:
+ if output_plugin is not None:
+ raise psycopg2.ProgrammingError(
+ "cannot specify output plugin name when creating "
+ "physical replication slot")
+
+ command += "PHYSICAL"
+
+ else:
+ raise psycopg2.ProgrammingError(
+ "unrecognized replication type: %s" % repr(slot_type))
+
+ self.execute(command)
+
+ def drop_replication_slot(self, slot_name):
+ """Drop streaming replication slot."""
+
+ command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
+ self.execute(command)
+
+ def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
+ timeline=0, options=None, decode=False):
+ """Start replication stream."""
+
+ command = "START_REPLICATION "
+
+ if slot_type is None:
+ slot_type = self.connection.replication_type
+
+ if slot_type == REPLICATION_LOGICAL:
+ if slot_name:
+ command += "SLOT %s " % quote_ident(slot_name, self)
+ else:
+ raise psycopg2.ProgrammingError(
+ "slot name is required for logical replication")
+
+ command += "LOGICAL "
+
+ elif slot_type == REPLICATION_PHYSICAL:
+ if slot_name:
+ command += "SLOT %s " % quote_ident(slot_name, self)
+ # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
+
+ else:
+ raise psycopg2.ProgrammingError(
+ "unrecognized replication type: %s" % repr(slot_type))
+
+ if type(start_lsn) is str:
+ lsn = start_lsn.split('/')
+ lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
+ else:
+ lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF,
+ start_lsn & 0xFFFFFFFF)
+
+ command += lsn
+
+ if timeline != 0:
+ if slot_type == REPLICATION_LOGICAL:
+ raise psycopg2.ProgrammingError(
+ "cannot specify timeline for logical replication")
+
+ command += " TIMELINE %d" % timeline
+
+ if options:
+ if slot_type == REPLICATION_PHYSICAL:
+ raise psycopg2.ProgrammingError(
+ "cannot specify output plugin options for physical replication")
+
+ command += " ("
+ for k, v in options.iteritems():
+ if not command.endswith('('):
+ command += ", "
+ command += "%s %s" % (quote_ident(k, self), _A(str(v)))
+ command += ")"
+
+ self.start_replication_expert(command, decode=decode)
+
+ # allows replication cursors to be used in select.select() directly
+ def fileno(self):
+ return self.connection.fileno()
+
+
# a dbtype and adapter for Python UUID type
class UUID_adapter(object):
@@ -454,11 +610,12 @@ class UUID_adapter(object):
return self
def getquoted(self):
- return b("'%s'::uuid" % self._uuid)
+ return ("'%s'::uuid" % self._uuid).encode('utf8')
def __str__(self):
return "'%s'::uuid" % self._uuid
+
def register_uuid(oids=None, conn_or_curs=None):
"""Create the UUID type and an uuid.UUID adapter.
@@ -514,7 +671,7 @@ class Inet(object):
obj = _A(self.addr)
if hasattr(obj, 'prepare'):
obj.prepare(self._conn)
- return obj.getquoted() + b("::inet")
+ return obj.getquoted() + b"::inet"
def __conform__(self, proto):
if proto is _ext.ISQLQuote:
@@ -523,6 +680,7 @@ class Inet(object):
def __str__(self):
return str(self.addr)
+
def register_inet(oid=None, conn_or_curs=None):
"""Create the INET type and an Inet adapter.
@@ -532,6 +690,11 @@ def register_inet(oid=None, conn_or_curs=None):
:param conn_or_curs: where to register the typecaster. If not specified,
register it globally.
"""
+ import warnings
+ warnings.warn(
+ "the inet adapter is deprecated, it's not very useful",
+ DeprecationWarning)
+
if not oid:
oid1 = 869
oid2 = 1041
@@ -621,7 +784,7 @@ class HstoreAdapter(object):
def _getquoted_8(self):
"""Use the operators available in PG pre-9.0."""
if not self.wrapped:
- return b("''::hstore")
+ return b"''::hstore"
adapt = _ext.adapt
rv = []
@@ -635,23 +798,23 @@ class HstoreAdapter(object):
v.prepare(self.conn)
v = v.getquoted()
else:
- v = b('NULL')
+ v = b'NULL'
# XXX this b'ing is painfully inefficient!
- rv.append(b("(") + k + b(" => ") + v + b(")"))
+ rv.append(b"(" + k + b" => " + v + b")")
- return b("(") + b('||').join(rv) + b(")")
+ return b"(" + b'||'.join(rv) + b")"
def _getquoted_9(self):
"""Use the hstore(text[], text[]) function."""
if not self.wrapped:
- return b("''::hstore")
+ return b"''::hstore"
k = _ext.adapt(self.wrapped.keys())
k.prepare(self.conn)
v = _ext.adapt(self.wrapped.values())
v.prepare(self.conn)
- return b("hstore(") + k.getquoted() + b(", ") + v.getquoted() + b(")")
+ return b"hstore(" + k.getquoted() + b", " + v.getquoted() + b")"
getquoted = _getquoted_9
@@ -742,9 +905,10 @@ WHERE typname = 'hstore';
return tuple(rv0), tuple(rv1)
+
def register_hstore(conn_or_curs, globally=False, unicode=False,
- oid=None, array_oid=None):
- """Register adapter and typecaster for `!dict`\-\ |hstore| conversions.
+ oid=None, array_oid=None):
+ r"""Register adapter and typecaster for `!dict`\-\ |hstore| conversions.
:param conn_or_curs: a connection or cursor: the typecaster will be
registered only on this object unless *globally* is set to `!True`
@@ -822,8 +986,8 @@ class CompositeCaster(object):
self.oid = oid
self.array_oid = array_oid
- self.attnames = [ a[0] for a in attrs ]
- self.atttypes = [ a[1] for a in attrs ]
+ self.attnames = [a[0] for a in attrs]
+ self.atttypes = [a[1] for a in attrs]
self._create_type(name, self.attnames)
self.typecaster = _ext.new_type((oid,), name, self.parse)
if array_oid:
@@ -842,8 +1006,8 @@ class CompositeCaster(object):
"expecting %d components for the type %s, %d found instead" %
(len(self.atttypes), self.name, len(tokens)))
- values = [ curs.cast(oid, token)
- for oid, token in zip(self.atttypes, tokens) ]
+ values = [curs.cast(oid, token)
+ for oid, token in zip(self.atttypes, tokens)]
return self.make(values)
@@ -937,11 +1101,12 @@ ORDER BY attnum;
type_oid = recs[0][0]
array_oid = recs[0][1]
- type_attrs = [ (r[2], r[3]) for r in recs ]
+ type_attrs = [(r[2], r[3]) for r in recs]
return self(tname, type_oid, type_attrs,
array_oid=array_oid, schema=schema)
+
def register_composite(name, conn_or_curs, globally=False, factory=None):
"""Register a typecaster to convert a composite type into a tuple.
@@ -964,17 +1129,7 @@ def register_composite(name, conn_or_curs, globally=False, factory=None):
_ext.register_type(caster.typecaster, not globally and conn_or_curs or None)
if caster.array_typecaster is not None:
- _ext.register_type(caster.array_typecaster, not globally and conn_or_curs or None)
+ _ext.register_type(
+ caster.array_typecaster, not globally and conn_or_curs or None)
return caster
-
-
-# expose the json adaptation stuff into the module
-from psycopg2._json import json, Json, register_json
-from psycopg2._json import register_default_json, register_default_jsonb
-
-
-# Expose range-related objects
-from psycopg2._range import Range, NumericRange
-from psycopg2._range import DateRange, DateTimeRange, DateTimeTZRange
-from psycopg2._range import register_range, RangeAdapter, RangeCaster