summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFederico Di Gregorio <fog@initd.org>2010-04-14 08:57:30 +0200
committerFederico Di Gregorio <fog@initd.org>2010-04-14 08:57:30 +0200
commita90935930bac87d0147d753366b2ff1bd265f9f0 (patch)
tree6509f32b1c74d9eba957e953ac0b1df7ada65dbe
parenteaa97def731faa6fb9b17cfe9005d327f6a70341 (diff)
parent431920b3676ffecc80fd7f0bc25d774d6e5a2aa1 (diff)
downloadpsycopg2-a90935930bac87d0147d753366b2ff1bd265f9f0.tar.gz
Merge remote branch 'piro/fix22' into python2
-rw-r--r--ChangeLog6
-rw-r--r--README.async8
-rw-r--r--doc/src/advanced.rst135
-rw-r--r--doc/src/connection.rst34
-rw-r--r--doc/src/cursor.rst66
-rw-r--r--doc/src/extensions.rst93
-rw-r--r--doc/src/faq.rst14
-rw-r--r--doc/src/module.rst13
-rw-r--r--lib/extensions.py5
-rw-r--r--psycopg/connection_type.c3
-rwxr-xr-x[-rw-r--r--]tests/test_async.py45
11 files changed, 283 insertions, 139 deletions
diff --git a/ChangeLog b/ChangeLog
index 20af440..23ec09b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+2010-04-13 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+
+ * lib/extensions.py: DECIMAL typecaster imported from _psycopg.
+
+ * lib/extensions.py: PY* and MX* time typecaster imported from _psycopg.
+
2010-04-05 Federico Di Gregorio <fog@initd.org>
* Fixed problem with asynchronous NOTIFYs.
diff --git a/README.async b/README.async
deleted file mode 100644
index ce29424..0000000
--- a/README.async
+++ /dev/null
@@ -1,8 +0,0 @@
-Asynchronous support in psycopg
-*******************************
-
-The support for asynchronous queries in psycopg is only half-backed and
-won't work unless a good deal of time is put into fixing bugs. Currently
-I (the author) have no need for asycnhronous queries but patches are
-welcome. Also welcome are short tests that show how the async path fails
-(miserably) so that I can at least fix obvious problems.
diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst
index 2801a2d..7657014 100644
--- a/doc/src/advanced.rst
+++ b/doc/src/advanced.rst
@@ -246,88 +246,83 @@ Running the script and executing the command :sql:`NOTIFY test` in a separate
.. index::
- double: Asynchronous; Query
+ double: Asynchronous; Connection
-.. _asynchronous-queries:
+.. _async-support:
-Asynchronous queries
+Asynchronous support
--------------------
-.. warning::
-
- Psycopg support for asynchronous queries is still experimental and the
- informations reported here may be out of date.
-
- Discussion, testing and suggestions are welcome.
-
-Program code can initiate an asynchronous query by passing an ``async=1`` flag
-to the `~cursor.execute()` or `~cursor.callproc()` cursor methods. A
-very simple example, from the connection to the query::
-
- conn = psycopg2.connect(database='test')
+.. versionadded:: 2.2.0
+
+Psycopg can issue asynchronous queries to a Postgresql database. An asynchronous
+communication style is estabilished passing the parameter *async*\=1 to the
+`~psycopg2.connect()` function: the returned connection will work in
+asynchronous mode.
+
+In asynchronous mode, a Psycopg connection will rely on the caller to poll for
+the socket file descriptor ready to accept data or a query result ready to be
+read from the server. The caller can use the method `~cursor.fileno()` to get
+the connection file descriptor and `~cursor.poll()` to make communication
+proceed. An application can use a loop like the one below to transfer data
+between the client and the server::
+
+ def wait(conn_or_cur):
+ while 1:
+ state = conn_or_cur.poll()
+ if state == psycopg2.extensions.POLL_OK:
+ break
+ elif state == psycopg2.extensions.POLL_WRITE:
+ select.select([], [conn_or_cur.fileno()], [])
+ elif state == psycopg2.extensions.POLL_READ:
+ select.select([conn_or_cur.fileno()], [], [])
+ else:
+ raise psycopg2.OperationalError("poll() returned %s" % state)
+
+After `!poll()` has returned `~psycopg2.extensions.POLL_OK`, the results are
+available in the cursor for regular reading::
+
+ curs.execute("SELECT * FROM foo;")
+ wait(curs)
+ for record in curs:
+ # use it...
+
+The same loop should also be used to accomplish a connection with the server:
+the connection is usable only after `connection.poll()` has returned `!POLL_OK`.
+The `!connection` has a `~connection.fileno()` method too, so it is possible to
+use the same interface for the wait loop::
+
+ conn = psycopg2.connect(database='test', async=1)
+ wait(conn)
+ # Now you can have a cursor.
curs = conn.cursor()
- curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1)
-
-From then on any query on other cursors derived from the same connection is
-doomed to fail (and raise an exception) until the original cursor (the one
-executing the query) complete the asynchronous operation. This can happen in
-a number of different ways:
-
-1) one of the `!fetch*()` methods is called, effectively blocking until
- data has been sent from the backend to the client, terminating the query.
-
-2) `connection.cancel()` is called. This method tries to abort the
- current query and will block until the query is aborted or fully executed.
- The return value is ``True`` if the query was successfully aborted or
- ``False`` if it was executed. Query result are discarded in both cases.
-
-3) `~cursor.execute()` is called again on the same cursor
- (`!execute()` on a different cursor will simply raise an exception).
- This waits for the complete execution of the current query, discard any
- data and execute the new one.
-
-Note that calling `!execute()` two times in a row will not abort the
-former query and will temporarily go to synchronous mode until the first of
-the two queries is executed.
-Cursors now have some extra methods that make them useful during
-asynchronous queries:
+Notice that there are a few other requirements to be met in order to have a
+completely non-blocking connection attempt: see the libpq documentation for
+|PQconnectStart|_.
-`~cursor.fileno()`
- Returns the file descriptor associated with the current connection and
- make possible to use a cursor in a context where a file object would be
- expected (like in a `select()` call).
+.. |PQconnectStart| replace:: `!PQconnectStart()`
+.. _PQconnectStart: http://www.postgresql.org/docs/8.4/static/libpq-connect.html#AEN33199
-`~cursor.isready()`
- Returns ``False`` if the backend is still processing the query or ``True``
- if data is ready to be fetched (by one of the `!fetch*()` methods).
-
-.. index::
- single: Example; Asynchronous query
-
-A code snippet that shows how to use the cursor object in a `!select()`
-call::
-
- import psycopg2
- import select
-
- conn = psycopg2.connect(database='test')
- curs = conn.cursor()
- curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1)
+When an asynchronous query is being executed, `connection.executing()` returns
+`True`. Two cursors can't execute concurrent queries on the same asynchronous
+connection.
- # wait for input with a maximum timeout of 5 seconds
- query_ended = False
- while not query_ended:
- rread, rwrite, rspec = select([curs, another_file], [], [], 5)
+There are several limitations in using asynchronous connections: the connection
+is always in :ref:`autocommit <autocommit>` mode and it is not possible to
+change it using `~connection.set_isolation_level()`. So transaction are not
+started at each query and is not possible to use methods `~connection.commit()`
+and `~connection.rollback()`: you can manually control transactions using
+`~cursor.execute()` to send commands :sql:`BEGIN`, :sql:`COMMIT` and
+:sql:`ROLLBACK`.
- if curs.isready():
- query_ended = True
+With asynchronous connections it is also not possible to use
+`~connection.set_client_encoding()`, `~cursor.executemany()`, :ref:`large
+objects <large-objects>`, :ref:`named cursors <server-side-cursors>`.
- # manage input from other sources like other_file, etc.
+:ref:`COPY commands <copy>` are not supported either in asynchronous mode, but
+this will be probably implemented in a future release.
- print "Query Results:"
- for row in curs:
- print row
.. testcode::
diff --git a/doc/src/connection.rst b/doc/src/connection.rst
index 1e65b29..bf091bf 100644
--- a/doc/src/connection.rst
+++ b/doc/src/connection.rst
@@ -314,3 +314,37 @@ The ``connection`` class
.. _lo_import: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36307
.. versionadded:: 2.0.8
+
+
+
+ .. rubric:: Methods related to asynchronous support.
+
+ .. seealso:: :ref:`Asynchronous support <async-support>`.
+
+
+ .. method:: issync()
+
+ Return `True` if the connection is synchronous, `False` if asynchronous.
+
+
+ .. method:: poll()
+
+ Used during an asynchronous connection attempt, make communication
+ proceed if it wouldn't block.
+
+ Return one of the constants defined in :ref:`poll-constants`. If it
+ returns `~psycopg2.extensions.POLL_OK` the connection has been
+ estabilished. Otherwise wait until the file descriptor is ready as
+ explained in :ref:`async-support`.
+
+
+ .. method:: fileno()
+
+ Return the file descriptor associated with the connection to read the
+ status during asynchronous communication.
+
+
+ .. method:: executing()
+
+ Return `True` if the connection is executing an asynchronous operation.
+
diff --git a/doc/src/cursor.rst b/doc/src/cursor.rst
index b31f2e7..333739d 100644
--- a/doc/src/cursor.rst
+++ b/doc/src/cursor.rst
@@ -109,7 +109,7 @@ The ``cursor`` class
.. rubric:: Commands execution methods
- .. method:: execute(operation [, parameters] [, async])
+ .. method:: execute(operation [, parameters])
Prepare and execute a database operation (query or command).
@@ -121,16 +121,6 @@ The ``cursor`` class
The method returns `None`. If a query was executed, the returned
values can be retrieved using |fetch*|_ methods.
- If `async` is ``True``, query execution will be asynchronous:
- the function returns immediately while the query is executed by the
- backend. Use the `~cursor.isready()` method to see if the data is
- ready for return via |fetch*|_ methods. See
- :ref:`asynchronous-queries`.
-
- .. extension::
-
- The `async` parameter is a Psycopg extension to the |DBAPI|.
-
.. method:: mogrify(operation [, parameters])
@@ -159,7 +149,7 @@ The ``cursor`` class
the `~cursor.execute()` method.
- .. method:: callproc(procname [, parameters] [, async])
+ .. method:: callproc(procname [, parameters])
Call a stored database procedure with the given name. The sequence of
parameters must contain one entry for each argument that the procedure
@@ -170,16 +160,6 @@ The ``cursor`` class
The procedure may also provide a result set as output. This must then
be made available through the standard |fetch*|_ methods.
- If `async` is ``True``, procedure execution will be asynchronous:
- the function returns immediately while the procedure is executed by
- the backend. Use the `~cursor.isready()` method to see if the
- data is ready for return via |fetch*|_ methods. See
- :ref:`asynchronous-queries`.
-
- .. extension::
-
- The `async` parameter is a Psycopg extension to the |DBAPI|.
-
.. method:: setinputsizes(sizes)
@@ -400,37 +380,41 @@ The ``cursor`` class
|DBAPI|.
- .. method:: isready()
+ .. attribute:: tzinfo_factory
- Return ``False`` if the backend is still processing an asynchronous
- query or ``True`` if data is ready to be fetched by one of the
- |fetch*|_ methods. See :ref:`asynchronous-queries`.
+ The time zone factory used to handle data types such as
+ :sql:`TIMESTAMP WITH TIME ZONE`. It should be a |tzinfo|_ object.
+ See also the `psycopg2.tz` module.
- .. extension::
+ .. |tzinfo| replace:: `!tzinfo`
+ .. _tzinfo: http://docs.python.org/library/datetime.html#tzinfo-objects
- The `isready()` method is a Psycopg extension to the |DBAPI|.
- .. method:: fileno()
+ .. rubric:: Methods related to asynchronous support.
- Return the file descriptor associated with the current connection and
- make possible to use a cursor in a context where a file object would
- be expected (like in a `select()` call). See
- :ref:`asynchronous-queries`.
+ .. extension::
- .. extension::
+ :ref:`Asynchronous support <async-support>` is a Psycopg extension to
+ the |DBAPI|.
- The `fileno()` method is a Psycopg extension to the |DBAPI|.
+ .. method:: poll()
- .. attribute:: tzinfo_factory
+ Used during asynchronous queries, make asynchronous communication
+ proceed if it wouldn't block.
- The time zone factory used to handle data types such as
- :sql:`TIMESTAMP WITH TIME ZONE`. It should be a |tzinfo|_ object.
- See also the `psycopg2.tz` module.
+ Return `~psycopg2.extensions.POLL_OK` if the query has been fully
+ processed, `~psycopg2.extensions.POLL_READ` if the query has been sent
+ and the application should be waiting for the result to arrive or
+ `~psycopg2.extensions.POLL_WRITE` is the query is still being sent.
- .. |tzinfo| replace:: `!tzinfo`
- .. _tzinfo: http://docs.python.org/library/datetime.html#tzinfo-objects
+
+ .. method:: fileno()
+
+ Return the file descriptor associated with the current connection to
+ make possible to use a cursor in a context where a file object would
+ be expected (like in a `select()` call).
diff --git a/doc/src/extensions.rst b/doc/src/extensions.rst
index 436bb7a..4093eff 100644
--- a/doc/src/extensions.rst
+++ b/doc/src/extensions.rst
@@ -21,6 +21,8 @@ functionalities defined by the |DBAPI|_.
`!connect()` function using the `connection_factory` parameter.
See also :ref:`subclassing-connection`.
+ Subclasses should have constructor signature :samp:`({dsn}, {async}=0)`.
+
For a complete description of the class, see `connection`.
.. class:: cursor
@@ -80,13 +82,13 @@ functionalities defined by the |DBAPI|_.
.. method:: truncate(len=0)
- .. versionadded:: 2.0.15
+ .. versionadded:: 2.2.0
Truncate the lobject to the given size.
- The method will only be available if psycopg has been built against libpq
- from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers
- running these versions. It uses the |lo_truncate|_ libpq function.
+ The method will only be available if Psycopg has been built against libpq
+ from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers
+ running these versions. It uses the |lo_truncate|_ libpq function.
.. |lo_truncate| replace:: `!lo_truncate()`
.. _lo_truncate: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36420
@@ -197,7 +199,7 @@ deal with Python objects adaptation:
.. versionchanged:: 2.0.14
previously the adapter was not exposed by the `extensions`
- module. In older version it can be imported from the implementation
+ module. In older versions it can be imported from the implementation
module `!psycopg2._psycopg`.
@@ -443,6 +445,40 @@ can be read from the `~connection.status` attribute.
+.. index::
+ pair: Poll status; Constants
+
+.. _poll-constants:
+
+Poll constants
+--------------
+
+.. versionadded:: 2.2.0
+
+These values can be returned by `connection.poll()` and `cursor.poll()` during
+asynchronous communication. See :ref:`async-support`.
+
+.. data:: POLL_OK
+
+ The data is available (or the file descriptor is ready for writing): there
+ is no need to block anymore.
+
+.. data:: POLL_READ
+
+ Upon receiving this value, the callback should wait for the connection
+ file descriptor to be ready *for reading*. For example::
+
+ select.select([conn.fileno()], [], [])
+
+.. data:: POLL_WRITE
+
+ Upon receiving this value, the callback should wait for the connection
+ file descriptor to be ready *for writing*. For example::
+
+ select.select([], [conn.fileno()], [])
+
+
+
Additional database types
-------------------------
@@ -453,25 +489,56 @@ Python objects. All the typecasters are automatically registered, except
`register_type()` in order to receive Unicode objects instead of strings
from the database. See :ref:`unicode-handling` for details.
+.. data:: BOOLEAN
+ DATE
+ DECIMAL
+ FLOAT
+ INTEGER
+ INTERVAL
+ LONGINTEGER
+ TIME
+ UNICODE
+
+ Typecasters for basic types. Notice that a few other ones (`~psycopg2.BINARY`,
+ `~psycopg2.DATETIME`, `~psycopg2.NUMBER`, `~psycopg2.ROWID`,
+ `~psycopg2.STRING`) are exposed by the `psycopg2` module for |DBAPI|_
+ compliance.
+
.. data:: BINARYARRAY
- BOOLEAN
BOOLEANARRAY
- DATE
DATEARRAY
DATETIMEARRAY
DECIMALARRAY
- FLOAT
FLOATARRAY
- INTEGER
INTEGERARRAY
- INTERVAL
INTERVALARRAY
- LONGINTEGER
LONGINTEGERARRAY
ROWIDARRAY
STRINGARRAY
- TIME
TIMEARRAY
- UNICODE
UNICODEARRAY
+ Typecasters to convert arrays of sql types into Python lists.
+
+.. data:: PYDATE
+ PYDATETIME
+ PYINTERVAL
+ PYTIME
+
+ Typecasters to convert time-related data types to Python `!datetime`
+ objects.
+
+.. data:: MXDATE
+ MXDATETIME
+ MXINTERVAL
+ MXTIME
+
+ Typecasters to convert time-related data types to `mx.DateTime`_ objects.
+ Only available if Psycopg was compiled with `!mx` support.
+
+.. versionchanged:: 2.2.0
+ previously the `DECIMAL` typecaster and the specific time-related
+ typecasters (`!PY*` and `!MX*`) were not exposed by the `extensions`
+ module. In older versions they can be imported from the implementation
+ module `!psycopg2._psycopg`.
+
diff --git a/doc/src/faq.rst b/doc/src/faq.rst
index b45d103..8015864 100644
--- a/doc/src/faq.rst
+++ b/doc/src/faq.rst
@@ -70,6 +70,20 @@ My database is Unicode, but I receive all the strings as UTF-8 `str`. Can I rece
See :ref:`unicode-handling` for the gory details.
+Psycopg converts :sql:`decimal`\/\ :sql:`numeric` database types into Python `!Decimal` objects. Can I have `!float` instead?
+ You can register the `~psycopg2.extensions.FLOAT` typecaster to be used in
+ place of `~psycopg2.extensions.DECIMAL`::
+
+ DEC2FLOAT = psycopg2.extensions.new_type(
+ psycopg2.extensions.DECIMAL.values,
+ 'DEC2FLOAT',
+ psycopg2.extensions.FLOAT)
+ psycopg2.extensions.register_type(DEC2FLOAT)
+
+ See :ref:`type-casting-from-sql-to-python` to read the relevant
+ documentation. If you find `!psycopg2.extensions.DECIMAL` not avalable, use
+ `!psycopg2._psycopg.DECIMAL` instead.
+
I can't compile `!psycopg2`: the compiler says *error: Python.h: No such file or directory*. What am I missing?
You need to install a Python development package: it is usually called
``python-dev``.
diff --git a/doc/src/module.rst b/doc/src/module.rst
index df06703..db7586e 100644
--- a/doc/src/module.rst
+++ b/doc/src/module.rst
@@ -16,7 +16,7 @@ The module interface respects the standard defined in the |DBAPI|_.
single: Port; Connection
single: DSN (Database Source Name)
-.. function:: connect(dsn or params[, connection_factory])
+.. function:: connect(dsn or params [, connection_factory] [, async=0])
Create a new database session and return a new `connection` object.
@@ -40,15 +40,18 @@ The module interface respects the standard defined in the |DBAPI|_.
.. __: http://www.postgresql.org/docs/8.4/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS
- Using the `connection_factory` parameter a different class or
+ Using the *connection_factory* parameter a different class or
connections factory can be specified. It should be a callable object
- taking a `dsn` argument. See :ref:`subclassing-connection` for
+ taking a *dsn* argument. See :ref:`subclassing-connection` for
details.
+ Using *async*\=1 an asynchronous connection will be created: see
+ :ref:`async-support` to know about advantages and limitations.
+
.. extension::
- The `connection_factory` parameter is a Psycopg extension to the
- |DBAPI|.
+ The parameters *connection_factory* and *async* are Psycopg extensions
+ to the |DBAPI|.
.. data:: apilevel
diff --git a/lib/extensions.py b/lib/extensions.py
index edffee9..20f78e6 100644
--- a/lib/extensions.py
+++ b/lib/extensions.py
@@ -33,7 +33,7 @@ This module holds all the extensions to the DBAPI-2.0 provided by psycopg.
# License for more details.
from _psycopg import UNICODE, INTEGER, LONGINTEGER, BOOLEAN, FLOAT
-from _psycopg import TIME, DATE, INTERVAL
+from _psycopg import TIME, DATE, INTERVAL, DECIMAL
from _psycopg import BINARYARRAY, BOOLEANARRAY, DATEARRAY, DATETIMEARRAY
from _psycopg import DECIMALARRAY, FLOATARRAY, INTEGERARRAY, INTERVALARRAY
from _psycopg import LONGINTEGERARRAY, ROWIDARRAY, STRINGARRAY, TIMEARRAY
@@ -41,11 +41,14 @@ from _psycopg import UNICODEARRAY
from _psycopg import Binary, Boolean, Float, QuotedString, AsIs
try:
+ from _psycopg import MXDATE, MXDATETIME, MXINTERVAL, MXTIME
from _psycopg import DateFromMx, TimeFromMx, TimestampFromMx
from _psycopg import IntervalFromMx
except:
pass
+
try:
+ from _psycopg import PYDATE, PYDATETIME, PYINTERVAL, PYTIME
from _psycopg import DateFromPy, TimeFromPy, TimestampFromPy
from _psycopg import IntervalFromPy
except:
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 8e6758a..3fd08a4 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -757,8 +757,9 @@ connection_init(PyObject *obj, PyObject *args, PyObject *kwds)
{
const char *dsn;
long int async = 0;
+ static char *kwlist[] = {"dsn", "async", NULL};
- if (!PyArg_ParseTuple(args, "s|l", &dsn, &async))
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|l", kwlist, &dsn, &async))
return -1;
return connection_setup((connectionObject *)obj, dsn, async);
diff --git a/tests/test_async.py b/tests/test_async.py
index 5aad8d3..da084bd 100644..100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -14,6 +14,21 @@ else:
import py3tests as tests
+class PollableStub(object):
+ """A 'pollable' wrapper allowing analysis of the `poll()` calls."""
+ def __init__(self, pollable):
+ self.pollable = pollable
+ self.polls = []
+
+ def fileno(self):
+ return self.pollable.fileno()
+
+ def poll(self):
+ rv = self.pollable.poll()
+ self.polls.append(rv)
+ return rv
+
+
class AsyncTests(unittest.TestCase):
def setUp(self):
@@ -274,5 +289,35 @@ class AsyncTests(unittest.TestCase):
# it should be the result of the second query
self.assertEquals(cur.fetchone()[0], "b" * 10000)
+ def test_async_subclass(self):
+ class MyConn(psycopg2.extensions.connection):
+ def __init__(self, dsn, async=0):
+ psycopg2.extensions.connection.__init__(self, dsn, async=async)
+
+ conn = psycopg2.connect(tests.dsn, connection_factory=MyConn, async=True)
+ self.assert_(isinstance(conn, MyConn))
+ self.assert_(not conn.issync())
+ conn.close()
+
+
+ def test_flush_on_write(self):
+ # a very large query requires a flush loop to be sent to the backend
+ curs = self.conn.cursor()
+ for mb in 1, 5, 10, 20, 50:
+ size = mb * 1024 * 1024
+ print "\nplease wait: sending", mb, "MB query to the server",
+ stub = PollableStub(curs)
+ curs.execute("select %s;", ('x' * size,))
+ self.wait(stub)
+ self.assertEqual(size, len(curs.fetchone()[0]))
+ if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
+ return
+
+ self.fail("sending a large query didn't trigger block on write.")
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
+