diff options
author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2016-12-26 01:49:42 +0100 |
---|---|---|
committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2016-12-26 01:49:42 +0100 |
commit | faaef61c276f62f5b16f75caf7297400e7fc0523 (patch) | |
tree | e5af03497df75c7846b0c93a217f4adab0a19730 /lib/extras.py | |
parent | 0772d187e914929bf2666a1f6acac91639c18765 (diff) | |
parent | 17698c481566c0e8c1d5d65fe88e0cb7e4505957 (diff) | |
download | psycopg2-faaef61c276f62f5b16f75caf7297400e7fc0523.tar.gz |
Merge branch 'master' into named-callproc
Diffstat (limited to 'lib/extras.py')
-rw-r--r-- | lib/extras.py | 225 |
1 files changed, 190 insertions, 35 deletions
diff --git a/lib/extras.py b/lib/extras.py index 2713d6f..7fc853a 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -39,8 +39,28 @@ import psycopg2 from psycopg2 import extensions as _ext from psycopg2.extensions import cursor as _cursor from psycopg2.extensions import connection as _connection -from psycopg2.extensions import adapt as _A -from psycopg2.extensions import b +from psycopg2.extensions import adapt as _A, quote_ident + +from psycopg2._psycopg import ( # noqa + REPLICATION_PHYSICAL, REPLICATION_LOGICAL, + ReplicationConnection as _replicationConnection, + ReplicationCursor as _replicationCursor, + ReplicationMessage) + + +# expose the json adaptation stuff into the module +from psycopg2._json import ( # noqa + json, Json, register_json, register_default_json, register_default_jsonb) + + +# Expose range-related objects +from psycopg2._range import ( # noqa + Range, NumericRange, DateRange, DateTimeRange, DateTimeTZRange, + register_range, RangeAdapter, RangeCaster) + + +# Expose ipaddress-related objects +from psycopg2._ipaddress import register_ipaddress # noqa class DictCursorBase(_cursor): @@ -106,6 +126,7 @@ class DictConnection(_connection): kwargs.setdefault('cursor_factory', DictCursor) return super(DictConnection, self).cursor(*args, **kwargs) + class DictCursor(DictCursorBase): """A cursor that keeps a list of column name -> index mappings.""" @@ -130,6 +151,7 @@ class DictCursor(DictCursorBase): self.index[self.description[i][0]] = i self._query_executed = 0 + class DictRow(list): """A row object that allow by-column-name access to data.""" @@ -192,10 +214,10 @@ class DictRow(list): # drop the crusty Py2 methods if _sys.version_info[0] > 2: - items = iteritems; del iteritems - keys = iterkeys; del iterkeys - values = itervalues; del itervalues - del has_key + items = iteritems # noqa + keys = iterkeys # noqa + values = itervalues # noqa + del iteritems, iterkeys, itervalues, has_key class RealDictConnection(_connection): @@ -204,6 +226,7 @@ class RealDictConnection(_connection): kwargs.setdefault('cursor_factory', RealDictCursor) return super(RealDictConnection, self).cursor(*args, **kwargs) + class RealDictCursor(DictCursorBase): """A cursor that uses a real dict as the base type for rows. @@ -233,6 +256,7 @@ class RealDictCursor(DictCursorBase): self.column_mapping.append(self.description[i][0]) self._query_executed = 0 + class RealDictRow(dict): """A `!dict` subclass representing a data record.""" @@ -265,6 +289,7 @@ class NamedTupleConnection(_connection): kwargs.setdefault('cursor_factory', NamedTupleCursor) return super(NamedTupleConnection, self).cursor(*args, **kwargs) + class NamedTupleCursor(_cursor): """A cursor that generates results as `~collections.namedtuple`. @@ -369,11 +394,13 @@ class LoggingConnection(_connection): def _logtofile(self, msg, curs): msg = self.filter(msg, curs) - if msg: self._logobj.write(msg + _os.linesep) + if msg: + self._logobj.write(msg + _os.linesep) def _logtologger(self, msg, curs): msg = self.filter(msg, curs) - if msg: self._logobj.debug(msg) + if msg: + self._logobj.debug(msg) def _check(self): if not hasattr(self, '_logobj'): @@ -385,6 +412,7 @@ class LoggingConnection(_connection): kwargs.setdefault('cursor_factory', LoggingCursor) return super(LoggingConnection, self).cursor(*args, **kwargs) + class LoggingCursor(_cursor): """A cursor that logs queries using its connection logging facilities.""" @@ -425,6 +453,7 @@ class MinTimeLoggingConnection(LoggingConnection): kwargs.setdefault('cursor_factory', MinTimeLoggingCursor) return LoggingConnection.cursor(self, *args, **kwargs) + class MinTimeLoggingCursor(LoggingCursor): """The cursor sub-class companion to `MinTimeLoggingConnection`.""" @@ -437,6 +466,133 @@ class MinTimeLoggingCursor(LoggingCursor): return LoggingCursor.callproc(self, procname, vars) +class LogicalReplicationConnection(_replicationConnection): + + def __init__(self, *args, **kwargs): + kwargs['replication_type'] = REPLICATION_LOGICAL + super(LogicalReplicationConnection, self).__init__(*args, **kwargs) + + +class PhysicalReplicationConnection(_replicationConnection): + + def __init__(self, *args, **kwargs): + kwargs['replication_type'] = REPLICATION_PHYSICAL + super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) + + +class StopReplication(Exception): + """ + Exception used to break out of the endless loop in + `~ReplicationCursor.consume_stream()`. + + Subclass of `~exceptions.Exception`. Intentionally *not* inherited from + `~psycopg2.Error` as occurrence of this exception does not indicate an + error. + """ + pass + + +class ReplicationCursor(_replicationCursor): + """A cursor used for communication on replication connections.""" + + def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): + """Create streaming replication slot.""" + + command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self) + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if output_plugin is None: + raise psycopg2.ProgrammingError( + "output plugin name is required to create " + "logical replication slot") + + command += "LOGICAL %s" % quote_ident(output_plugin, self) + + elif slot_type == REPLICATION_PHYSICAL: + if output_plugin is not None: + raise psycopg2.ProgrammingError( + "cannot specify output plugin name when creating " + "physical replication slot") + + command += "PHYSICAL" + + else: + raise psycopg2.ProgrammingError( + "unrecognized replication type: %s" % repr(slot_type)) + + self.execute(command) + + def drop_replication_slot(self, slot_name): + """Drop streaming replication slot.""" + + command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self) + self.execute(command) + + def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, + timeline=0, options=None, decode=False): + """Start replication stream.""" + + command = "START_REPLICATION " + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + else: + raise psycopg2.ProgrammingError( + "slot name is required for logical replication") + + command += "LOGICAL " + + elif slot_type == REPLICATION_PHYSICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX + + else: + raise psycopg2.ProgrammingError( + "unrecognized replication type: %s" % repr(slot_type)) + + if type(start_lsn) is str: + lsn = start_lsn.split('/') + lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16)) + else: + lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, + start_lsn & 0xFFFFFFFF) + + command += lsn + + if timeline != 0: + if slot_type == REPLICATION_LOGICAL: + raise psycopg2.ProgrammingError( + "cannot specify timeline for logical replication") + + command += " TIMELINE %d" % timeline + + if options: + if slot_type == REPLICATION_PHYSICAL: + raise psycopg2.ProgrammingError( + "cannot specify output plugin options for physical replication") + + command += " (" + for k, v in options.iteritems(): + if not command.endswith('('): + command += ", " + command += "%s %s" % (quote_ident(k, self), _A(str(v))) + command += ")" + + self.start_replication_expert(command, decode=decode) + + # allows replication cursors to be used in select.select() directly + def fileno(self): + return self.connection.fileno() + + # a dbtype and adapter for Python UUID type class UUID_adapter(object): @@ -454,11 +610,12 @@ class UUID_adapter(object): return self def getquoted(self): - return b("'%s'::uuid" % self._uuid) + return ("'%s'::uuid" % self._uuid).encode('utf8') def __str__(self): return "'%s'::uuid" % self._uuid + def register_uuid(oids=None, conn_or_curs=None): """Create the UUID type and an uuid.UUID adapter. @@ -514,7 +671,7 @@ class Inet(object): obj = _A(self.addr) if hasattr(obj, 'prepare'): obj.prepare(self._conn) - return obj.getquoted() + b("::inet") + return obj.getquoted() + b"::inet" def __conform__(self, proto): if proto is _ext.ISQLQuote: @@ -523,6 +680,7 @@ class Inet(object): def __str__(self): return str(self.addr) + def register_inet(oid=None, conn_or_curs=None): """Create the INET type and an Inet adapter. @@ -532,6 +690,11 @@ def register_inet(oid=None, conn_or_curs=None): :param conn_or_curs: where to register the typecaster. If not specified, register it globally. """ + import warnings + warnings.warn( + "the inet adapter is deprecated, it's not very useful", + DeprecationWarning) + if not oid: oid1 = 869 oid2 = 1041 @@ -621,7 +784,7 @@ class HstoreAdapter(object): def _getquoted_8(self): """Use the operators available in PG pre-9.0.""" if not self.wrapped: - return b("''::hstore") + return b"''::hstore" adapt = _ext.adapt rv = [] @@ -635,23 +798,23 @@ class HstoreAdapter(object): v.prepare(self.conn) v = v.getquoted() else: - v = b('NULL') + v = b'NULL' # XXX this b'ing is painfully inefficient! - rv.append(b("(") + k + b(" => ") + v + b(")")) + rv.append(b"(" + k + b" => " + v + b")") - return b("(") + b('||').join(rv) + b(")") + return b"(" + b'||'.join(rv) + b")" def _getquoted_9(self): """Use the hstore(text[], text[]) function.""" if not self.wrapped: - return b("''::hstore") + return b"''::hstore" k = _ext.adapt(self.wrapped.keys()) k.prepare(self.conn) v = _ext.adapt(self.wrapped.values()) v.prepare(self.conn) - return b("hstore(") + k.getquoted() + b(", ") + v.getquoted() + b(")") + return b"hstore(" + k.getquoted() + b", " + v.getquoted() + b")" getquoted = _getquoted_9 @@ -742,9 +905,10 @@ WHERE typname = 'hstore'; return tuple(rv0), tuple(rv1) + def register_hstore(conn_or_curs, globally=False, unicode=False, - oid=None, array_oid=None): - """Register adapter and typecaster for `!dict`\-\ |hstore| conversions. + oid=None, array_oid=None): + r"""Register adapter and typecaster for `!dict`\-\ |hstore| conversions. :param conn_or_curs: a connection or cursor: the typecaster will be registered only on this object unless *globally* is set to `!True` @@ -822,8 +986,8 @@ class CompositeCaster(object): self.oid = oid self.array_oid = array_oid - self.attnames = [ a[0] for a in attrs ] - self.atttypes = [ a[1] for a in attrs ] + self.attnames = [a[0] for a in attrs] + self.atttypes = [a[1] for a in attrs] self._create_type(name, self.attnames) self.typecaster = _ext.new_type((oid,), name, self.parse) if array_oid: @@ -842,8 +1006,8 @@ class CompositeCaster(object): "expecting %d components for the type %s, %d found instead" % (len(self.atttypes), self.name, len(tokens))) - values = [ curs.cast(oid, token) - for oid, token in zip(self.atttypes, tokens) ] + values = [curs.cast(oid, token) + for oid, token in zip(self.atttypes, tokens)] return self.make(values) @@ -937,11 +1101,12 @@ ORDER BY attnum; type_oid = recs[0][0] array_oid = recs[0][1] - type_attrs = [ (r[2], r[3]) for r in recs ] + type_attrs = [(r[2], r[3]) for r in recs] return self(tname, type_oid, type_attrs, array_oid=array_oid, schema=schema) + def register_composite(name, conn_or_curs, globally=False, factory=None): """Register a typecaster to convert a composite type into a tuple. @@ -964,17 +1129,7 @@ def register_composite(name, conn_or_curs, globally=False, factory=None): _ext.register_type(caster.typecaster, not globally and conn_or_curs or None) if caster.array_typecaster is not None: - _ext.register_type(caster.array_typecaster, not globally and conn_or_curs or None) + _ext.register_type( + caster.array_typecaster, not globally and conn_or_curs or None) return caster - - -# expose the json adaptation stuff into the module -from psycopg2._json import json, Json, register_json -from psycopg2._json import register_default_json, register_default_jsonb - - -# Expose range-related objects -from psycopg2._range import Range, NumericRange -from psycopg2._range import DateRange, DateTimeRange, DateTimeTZRange -from psycopg2._range import register_range, RangeAdapter, RangeCaster |