diff options
author | Federico Di Gregorio <fog@initd.org> | 2010-04-14 08:57:30 +0200 |
---|---|---|
committer | Federico Di Gregorio <fog@initd.org> | 2010-04-14 08:57:30 +0200 |
commit | a90935930bac87d0147d753366b2ff1bd265f9f0 (patch) | |
tree | 6509f32b1c74d9eba957e953ac0b1df7ada65dbe | |
parent | eaa97def731faa6fb9b17cfe9005d327f6a70341 (diff) | |
parent | 431920b3676ffecc80fd7f0bc25d774d6e5a2aa1 (diff) | |
download | psycopg2-a90935930bac87d0147d753366b2ff1bd265f9f0.tar.gz |
Merge remote branch 'piro/fix22' into python2
-rw-r--r-- | ChangeLog | 6 | ||||
-rw-r--r-- | README.async | 8 | ||||
-rw-r--r-- | doc/src/advanced.rst | 135 | ||||
-rw-r--r-- | doc/src/connection.rst | 34 | ||||
-rw-r--r-- | doc/src/cursor.rst | 66 | ||||
-rw-r--r-- | doc/src/extensions.rst | 93 | ||||
-rw-r--r-- | doc/src/faq.rst | 14 | ||||
-rw-r--r-- | doc/src/module.rst | 13 | ||||
-rw-r--r-- | lib/extensions.py | 5 | ||||
-rw-r--r-- | psycopg/connection_type.c | 3 | ||||
-rwxr-xr-x[-rw-r--r--] | tests/test_async.py | 45 |
11 files changed, 283 insertions, 139 deletions
@@ -1,3 +1,9 @@ +2010-04-13 Daniele Varrazzo <daniele.varrazzo@gmail.com> + + * lib/extensions.py: DECIMAL typecaster imported from _psycopg. + + * lib/extensions.py: PY* and MX* time typecaster imported from _psycopg. + 2010-04-05 Federico Di Gregorio <fog@initd.org> * Fixed problem with asynchronous NOTIFYs. diff --git a/README.async b/README.async deleted file mode 100644 index ce29424..0000000 --- a/README.async +++ /dev/null @@ -1,8 +0,0 @@ -Asynchronous support in psycopg -******************************* - -The support for asynchronous queries in psycopg is only half-backed and -won't work unless a good deal of time is put into fixing bugs. Currently -I (the author) have no need for asycnhronous queries but patches are -welcome. Also welcome are short tests that show how the async path fails -(miserably) so that I can at least fix obvious problems. diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index 2801a2d..7657014 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -246,88 +246,83 @@ Running the script and executing the command :sql:`NOTIFY test` in a separate .. index:: - double: Asynchronous; Query + double: Asynchronous; Connection -.. _asynchronous-queries: +.. _async-support: -Asynchronous queries +Asynchronous support -------------------- -.. warning:: - - Psycopg support for asynchronous queries is still experimental and the - informations reported here may be out of date. - - Discussion, testing and suggestions are welcome. - -Program code can initiate an asynchronous query by passing an ``async=1`` flag -to the `~cursor.execute()` or `~cursor.callproc()` cursor methods. A -very simple example, from the connection to the query:: - - conn = psycopg2.connect(database='test') +.. versionadded:: 2.2.0 + +Psycopg can issue asynchronous queries to a Postgresql database. An asynchronous +communication style is estabilished passing the parameter *async*\=1 to the +`~psycopg2.connect()` function: the returned connection will work in +asynchronous mode. + +In asynchronous mode, a Psycopg connection will rely on the caller to poll for +the socket file descriptor ready to accept data or a query result ready to be +read from the server. The caller can use the method `~cursor.fileno()` to get +the connection file descriptor and `~cursor.poll()` to make communication +proceed. An application can use a loop like the one below to transfer data +between the client and the server:: + + def wait(conn_or_cur): + while 1: + state = conn_or_cur.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [conn_or_cur.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + select.select([conn_or_cur.fileno()], [], []) + else: + raise psycopg2.OperationalError("poll() returned %s" % state) + +After `!poll()` has returned `~psycopg2.extensions.POLL_OK`, the results are +available in the cursor for regular reading:: + + curs.execute("SELECT * FROM foo;") + wait(curs) + for record in curs: + # use it... + +The same loop should also be used to accomplish a connection with the server: +the connection is usable only after `connection.poll()` has returned `!POLL_OK`. +The `!connection` has a `~connection.fileno()` method too, so it is possible to +use the same interface for the wait loop:: + + conn = psycopg2.connect(database='test', async=1) + wait(conn) + # Now you can have a cursor. curs = conn.cursor() - curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1) - -From then on any query on other cursors derived from the same connection is -doomed to fail (and raise an exception) until the original cursor (the one -executing the query) complete the asynchronous operation. This can happen in -a number of different ways: - -1) one of the `!fetch*()` methods is called, effectively blocking until - data has been sent from the backend to the client, terminating the query. - -2) `connection.cancel()` is called. This method tries to abort the - current query and will block until the query is aborted or fully executed. - The return value is ``True`` if the query was successfully aborted or - ``False`` if it was executed. Query result are discarded in both cases. - -3) `~cursor.execute()` is called again on the same cursor - (`!execute()` on a different cursor will simply raise an exception). - This waits for the complete execution of the current query, discard any - data and execute the new one. - -Note that calling `!execute()` two times in a row will not abort the -former query and will temporarily go to synchronous mode until the first of -the two queries is executed. -Cursors now have some extra methods that make them useful during -asynchronous queries: +Notice that there are a few other requirements to be met in order to have a +completely non-blocking connection attempt: see the libpq documentation for +|PQconnectStart|_. -`~cursor.fileno()` - Returns the file descriptor associated with the current connection and - make possible to use a cursor in a context where a file object would be - expected (like in a `select()` call). +.. |PQconnectStart| replace:: `!PQconnectStart()` +.. _PQconnectStart: http://www.postgresql.org/docs/8.4/static/libpq-connect.html#AEN33199 -`~cursor.isready()` - Returns ``False`` if the backend is still processing the query or ``True`` - if data is ready to be fetched (by one of the `!fetch*()` methods). - -.. index:: - single: Example; Asynchronous query - -A code snippet that shows how to use the cursor object in a `!select()` -call:: - - import psycopg2 - import select - - conn = psycopg2.connect(database='test') - curs = conn.cursor() - curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1) +When an asynchronous query is being executed, `connection.executing()` returns +`True`. Two cursors can't execute concurrent queries on the same asynchronous +connection. - # wait for input with a maximum timeout of 5 seconds - query_ended = False - while not query_ended: - rread, rwrite, rspec = select([curs, another_file], [], [], 5) +There are several limitations in using asynchronous connections: the connection +is always in :ref:`autocommit <autocommit>` mode and it is not possible to +change it using `~connection.set_isolation_level()`. So transaction are not +started at each query and is not possible to use methods `~connection.commit()` +and `~connection.rollback()`: you can manually control transactions using +`~cursor.execute()` to send commands :sql:`BEGIN`, :sql:`COMMIT` and +:sql:`ROLLBACK`. - if curs.isready(): - query_ended = True +With asynchronous connections it is also not possible to use +`~connection.set_client_encoding()`, `~cursor.executemany()`, :ref:`large +objects <large-objects>`, :ref:`named cursors <server-side-cursors>`. - # manage input from other sources like other_file, etc. +:ref:`COPY commands <copy>` are not supported either in asynchronous mode, but +this will be probably implemented in a future release. - print "Query Results:" - for row in curs: - print row .. testcode:: diff --git a/doc/src/connection.rst b/doc/src/connection.rst index 1e65b29..bf091bf 100644 --- a/doc/src/connection.rst +++ b/doc/src/connection.rst @@ -314,3 +314,37 @@ The ``connection`` class .. _lo_import: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36307 .. versionadded:: 2.0.8 + + + + .. rubric:: Methods related to asynchronous support. + + .. seealso:: :ref:`Asynchronous support <async-support>`. + + + .. method:: issync() + + Return `True` if the connection is synchronous, `False` if asynchronous. + + + .. method:: poll() + + Used during an asynchronous connection attempt, make communication + proceed if it wouldn't block. + + Return one of the constants defined in :ref:`poll-constants`. If it + returns `~psycopg2.extensions.POLL_OK` the connection has been + estabilished. Otherwise wait until the file descriptor is ready as + explained in :ref:`async-support`. + + + .. method:: fileno() + + Return the file descriptor associated with the connection to read the + status during asynchronous communication. + + + .. method:: executing() + + Return `True` if the connection is executing an asynchronous operation. + diff --git a/doc/src/cursor.rst b/doc/src/cursor.rst index b31f2e7..333739d 100644 --- a/doc/src/cursor.rst +++ b/doc/src/cursor.rst @@ -109,7 +109,7 @@ The ``cursor`` class .. rubric:: Commands execution methods - .. method:: execute(operation [, parameters] [, async]) + .. method:: execute(operation [, parameters]) Prepare and execute a database operation (query or command). @@ -121,16 +121,6 @@ The ``cursor`` class The method returns `None`. If a query was executed, the returned values can be retrieved using |fetch*|_ methods. - If `async` is ``True``, query execution will be asynchronous: - the function returns immediately while the query is executed by the - backend. Use the `~cursor.isready()` method to see if the data is - ready for return via |fetch*|_ methods. See - :ref:`asynchronous-queries`. - - .. extension:: - - The `async` parameter is a Psycopg extension to the |DBAPI|. - .. method:: mogrify(operation [, parameters]) @@ -159,7 +149,7 @@ The ``cursor`` class the `~cursor.execute()` method. - .. method:: callproc(procname [, parameters] [, async]) + .. method:: callproc(procname [, parameters]) Call a stored database procedure with the given name. The sequence of parameters must contain one entry for each argument that the procedure @@ -170,16 +160,6 @@ The ``cursor`` class The procedure may also provide a result set as output. This must then be made available through the standard |fetch*|_ methods. - If `async` is ``True``, procedure execution will be asynchronous: - the function returns immediately while the procedure is executed by - the backend. Use the `~cursor.isready()` method to see if the - data is ready for return via |fetch*|_ methods. See - :ref:`asynchronous-queries`. - - .. extension:: - - The `async` parameter is a Psycopg extension to the |DBAPI|. - .. method:: setinputsizes(sizes) @@ -400,37 +380,41 @@ The ``cursor`` class |DBAPI|. - .. method:: isready() + .. attribute:: tzinfo_factory - Return ``False`` if the backend is still processing an asynchronous - query or ``True`` if data is ready to be fetched by one of the - |fetch*|_ methods. See :ref:`asynchronous-queries`. + The time zone factory used to handle data types such as + :sql:`TIMESTAMP WITH TIME ZONE`. It should be a |tzinfo|_ object. + See also the `psycopg2.tz` module. - .. extension:: + .. |tzinfo| replace:: `!tzinfo` + .. _tzinfo: http://docs.python.org/library/datetime.html#tzinfo-objects - The `isready()` method is a Psycopg extension to the |DBAPI|. - .. method:: fileno() + .. rubric:: Methods related to asynchronous support. - Return the file descriptor associated with the current connection and - make possible to use a cursor in a context where a file object would - be expected (like in a `select()` call). See - :ref:`asynchronous-queries`. + .. extension:: - .. extension:: + :ref:`Asynchronous support <async-support>` is a Psycopg extension to + the |DBAPI|. - The `fileno()` method is a Psycopg extension to the |DBAPI|. + .. method:: poll() - .. attribute:: tzinfo_factory + Used during asynchronous queries, make asynchronous communication + proceed if it wouldn't block. - The time zone factory used to handle data types such as - :sql:`TIMESTAMP WITH TIME ZONE`. It should be a |tzinfo|_ object. - See also the `psycopg2.tz` module. + Return `~psycopg2.extensions.POLL_OK` if the query has been fully + processed, `~psycopg2.extensions.POLL_READ` if the query has been sent + and the application should be waiting for the result to arrive or + `~psycopg2.extensions.POLL_WRITE` is the query is still being sent. - .. |tzinfo| replace:: `!tzinfo` - .. _tzinfo: http://docs.python.org/library/datetime.html#tzinfo-objects + + .. method:: fileno() + + Return the file descriptor associated with the current connection to + make possible to use a cursor in a context where a file object would + be expected (like in a `select()` call). diff --git a/doc/src/extensions.rst b/doc/src/extensions.rst index 436bb7a..4093eff 100644 --- a/doc/src/extensions.rst +++ b/doc/src/extensions.rst @@ -21,6 +21,8 @@ functionalities defined by the |DBAPI|_. `!connect()` function using the `connection_factory` parameter. See also :ref:`subclassing-connection`. + Subclasses should have constructor signature :samp:`({dsn}, {async}=0)`. + For a complete description of the class, see `connection`. .. class:: cursor @@ -80,13 +82,13 @@ functionalities defined by the |DBAPI|_. .. method:: truncate(len=0) - .. versionadded:: 2.0.15 + .. versionadded:: 2.2.0 Truncate the lobject to the given size. - The method will only be available if psycopg has been built against libpq - from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers - running these versions. It uses the |lo_truncate|_ libpq function. + The method will only be available if Psycopg has been built against libpq + from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers + running these versions. It uses the |lo_truncate|_ libpq function. .. |lo_truncate| replace:: `!lo_truncate()` .. _lo_truncate: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36420 @@ -197,7 +199,7 @@ deal with Python objects adaptation: .. versionchanged:: 2.0.14 previously the adapter was not exposed by the `extensions` - module. In older version it can be imported from the implementation + module. In older versions it can be imported from the implementation module `!psycopg2._psycopg`. @@ -443,6 +445,40 @@ can be read from the `~connection.status` attribute. +.. index:: + pair: Poll status; Constants + +.. _poll-constants: + +Poll constants +-------------- + +.. versionadded:: 2.2.0 + +These values can be returned by `connection.poll()` and `cursor.poll()` during +asynchronous communication. See :ref:`async-support`. + +.. data:: POLL_OK + + The data is available (or the file descriptor is ready for writing): there + is no need to block anymore. + +.. data:: POLL_READ + + Upon receiving this value, the callback should wait for the connection + file descriptor to be ready *for reading*. For example:: + + select.select([conn.fileno()], [], []) + +.. data:: POLL_WRITE + + Upon receiving this value, the callback should wait for the connection + file descriptor to be ready *for writing*. For example:: + + select.select([], [conn.fileno()], []) + + + Additional database types ------------------------- @@ -453,25 +489,56 @@ Python objects. All the typecasters are automatically registered, except `register_type()` in order to receive Unicode objects instead of strings from the database. See :ref:`unicode-handling` for details. +.. data:: BOOLEAN + DATE + DECIMAL + FLOAT + INTEGER + INTERVAL + LONGINTEGER + TIME + UNICODE + + Typecasters for basic types. Notice that a few other ones (`~psycopg2.BINARY`, + `~psycopg2.DATETIME`, `~psycopg2.NUMBER`, `~psycopg2.ROWID`, + `~psycopg2.STRING`) are exposed by the `psycopg2` module for |DBAPI|_ + compliance. + .. data:: BINARYARRAY - BOOLEAN BOOLEANARRAY - DATE DATEARRAY DATETIMEARRAY DECIMALARRAY - FLOAT FLOATARRAY - INTEGER INTEGERARRAY - INTERVAL INTERVALARRAY - LONGINTEGER LONGINTEGERARRAY ROWIDARRAY STRINGARRAY - TIME TIMEARRAY - UNICODE UNICODEARRAY + Typecasters to convert arrays of sql types into Python lists. + +.. data:: PYDATE + PYDATETIME + PYINTERVAL + PYTIME + + Typecasters to convert time-related data types to Python `!datetime` + objects. + +.. data:: MXDATE + MXDATETIME + MXINTERVAL + MXTIME + + Typecasters to convert time-related data types to `mx.DateTime`_ objects. + Only available if Psycopg was compiled with `!mx` support. + +.. versionchanged:: 2.2.0 + previously the `DECIMAL` typecaster and the specific time-related + typecasters (`!PY*` and `!MX*`) were not exposed by the `extensions` + module. In older versions they can be imported from the implementation + module `!psycopg2._psycopg`. + diff --git a/doc/src/faq.rst b/doc/src/faq.rst index b45d103..8015864 100644 --- a/doc/src/faq.rst +++ b/doc/src/faq.rst @@ -70,6 +70,20 @@ My database is Unicode, but I receive all the strings as UTF-8 `str`. Can I rece See :ref:`unicode-handling` for the gory details. +Psycopg converts :sql:`decimal`\/\ :sql:`numeric` database types into Python `!Decimal` objects. Can I have `!float` instead? + You can register the `~psycopg2.extensions.FLOAT` typecaster to be used in + place of `~psycopg2.extensions.DECIMAL`:: + + DEC2FLOAT = psycopg2.extensions.new_type( + psycopg2.extensions.DECIMAL.values, + 'DEC2FLOAT', + psycopg2.extensions.FLOAT) + psycopg2.extensions.register_type(DEC2FLOAT) + + See :ref:`type-casting-from-sql-to-python` to read the relevant + documentation. If you find `!psycopg2.extensions.DECIMAL` not avalable, use + `!psycopg2._psycopg.DECIMAL` instead. + I can't compile `!psycopg2`: the compiler says *error: Python.h: No such file or directory*. What am I missing? You need to install a Python development package: it is usually called ``python-dev``. diff --git a/doc/src/module.rst b/doc/src/module.rst index df06703..db7586e 100644 --- a/doc/src/module.rst +++ b/doc/src/module.rst @@ -16,7 +16,7 @@ The module interface respects the standard defined in the |DBAPI|_. single: Port; Connection single: DSN (Database Source Name) -.. function:: connect(dsn or params[, connection_factory]) +.. function:: connect(dsn or params [, connection_factory] [, async=0]) Create a new database session and return a new `connection` object. @@ -40,15 +40,18 @@ The module interface respects the standard defined in the |DBAPI|_. .. __: http://www.postgresql.org/docs/8.4/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS - Using the `connection_factory` parameter a different class or + Using the *connection_factory* parameter a different class or connections factory can be specified. It should be a callable object - taking a `dsn` argument. See :ref:`subclassing-connection` for + taking a *dsn* argument. See :ref:`subclassing-connection` for details. + Using *async*\=1 an asynchronous connection will be created: see + :ref:`async-support` to know about advantages and limitations. + .. extension:: - The `connection_factory` parameter is a Psycopg extension to the - |DBAPI|. + The parameters *connection_factory* and *async* are Psycopg extensions + to the |DBAPI|. .. data:: apilevel diff --git a/lib/extensions.py b/lib/extensions.py index edffee9..20f78e6 100644 --- a/lib/extensions.py +++ b/lib/extensions.py @@ -33,7 +33,7 @@ This module holds all the extensions to the DBAPI-2.0 provided by psycopg. # License for more details. from _psycopg import UNICODE, INTEGER, LONGINTEGER, BOOLEAN, FLOAT -from _psycopg import TIME, DATE, INTERVAL +from _psycopg import TIME, DATE, INTERVAL, DECIMAL from _psycopg import BINARYARRAY, BOOLEANARRAY, DATEARRAY, DATETIMEARRAY from _psycopg import DECIMALARRAY, FLOATARRAY, INTEGERARRAY, INTERVALARRAY from _psycopg import LONGINTEGERARRAY, ROWIDARRAY, STRINGARRAY, TIMEARRAY @@ -41,11 +41,14 @@ from _psycopg import UNICODEARRAY from _psycopg import Binary, Boolean, Float, QuotedString, AsIs try: + from _psycopg import MXDATE, MXDATETIME, MXINTERVAL, MXTIME from _psycopg import DateFromMx, TimeFromMx, TimestampFromMx from _psycopg import IntervalFromMx except: pass + try: + from _psycopg import PYDATE, PYDATETIME, PYINTERVAL, PYTIME from _psycopg import DateFromPy, TimeFromPy, TimestampFromPy from _psycopg import IntervalFromPy except: diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 8e6758a..3fd08a4 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -757,8 +757,9 @@ connection_init(PyObject *obj, PyObject *args, PyObject *kwds) { const char *dsn; long int async = 0; + static char *kwlist[] = {"dsn", "async", NULL}; - if (!PyArg_ParseTuple(args, "s|l", &dsn, &async)) + if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|l", kwlist, &dsn, &async)) return -1; return connection_setup((connectionObject *)obj, dsn, async); diff --git a/tests/test_async.py b/tests/test_async.py index 5aad8d3..da084bd 100644..100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -14,6 +14,21 @@ else: import py3tests as tests +class PollableStub(object): + """A 'pollable' wrapper allowing analysis of the `poll()` calls.""" + def __init__(self, pollable): + self.pollable = pollable + self.polls = [] + + def fileno(self): + return self.pollable.fileno() + + def poll(self): + rv = self.pollable.poll() + self.polls.append(rv) + return rv + + class AsyncTests(unittest.TestCase): def setUp(self): @@ -274,5 +289,35 @@ class AsyncTests(unittest.TestCase): # it should be the result of the second query self.assertEquals(cur.fetchone()[0], "b" * 10000) + def test_async_subclass(self): + class MyConn(psycopg2.extensions.connection): + def __init__(self, dsn, async=0): + psycopg2.extensions.connection.__init__(self, dsn, async=async) + + conn = psycopg2.connect(tests.dsn, connection_factory=MyConn, async=True) + self.assert_(isinstance(conn, MyConn)) + self.assert_(not conn.issync()) + conn.close() + + + def test_flush_on_write(self): + # a very large query requires a flush loop to be sent to the backend + curs = self.conn.cursor() + for mb in 1, 5, 10, 20, 50: + size = mb * 1024 * 1024 + print "\nplease wait: sending", mb, "MB query to the server", + stub = PollableStub(curs) + curs.execute("select %s;", ('x' * size,)) + self.wait(stub) + self.assertEqual(size, len(curs.fetchone()[0])) + if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1: + return + + self.fail("sending a large query didn't trigger block on write.") + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) + +if __name__ == "__main__": + unittest.main() + |