diff options
| -rw-r--r-- | psycopg/lobject.h | 21 | ||||
| -rw-r--r-- | psycopg/lobject_int.c | 182 | ||||
| -rw-r--r-- | psycopg/lobject_type.c | 6 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 2 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 2 | 
5 files changed, 145 insertions, 68 deletions
| diff --git a/psycopg/lobject.h b/psycopg/lobject.h index 4b84021..fabba2d 100644 --- a/psycopg/lobject.h +++ b/psycopg/lobject.h @@ -26,6 +26,7 @@  #include <libpq-fe.h>  #include <libpq/libpq-fs.h> +#include "psycopg/config.h"  #include "psycopg/connection.h"  #ifdef __cplusplus @@ -48,19 +49,19 @@ typedef struct {      Oid oid;                 /* the oid for this lobject */      int fd;                  /* the file descriptor for file-like ops */  } lobjectObject; -     +  /* functions exported from lobject_int.c */ -extern int lobject_open(lobjectObject *self, connectionObject *conn, +HIDDEN int lobject_open(lobjectObject *self, connectionObject *conn,                           Oid oid, int mode, Oid new_oid, char *new_file); -extern int lobject_unlink(lobjectObject *self); -extern int lobject_export(lobjectObject *self, char *filename); - -extern size_t lobject_read(lobjectObject *self, char *buf, size_t len); -extern size_t lobject_write(lobjectObject *self, char *buf, size_t len); -extern int lobject_seek(lobjectObject *self, int pos, int whence); -extern int lobject_tell(lobjectObject *self); -extern void lobject_close(lobjectObject *self); +HIDDEN int lobject_unlink(lobjectObject *self); +HIDDEN int lobject_export(lobjectObject *self, char *filename); + +HIDDEN Py_ssize_t lobject_read(lobjectObject *self, char *buf, size_t len); +HIDDEN Py_ssize_t lobject_write(lobjectObject *self, char *buf, size_t len); +HIDDEN int lobject_seek(lobjectObject *self, int pos, int whence); +HIDDEN int lobject_tell(lobjectObject *self); +HIDDEN int lobject_close(lobjectObject *self);  /* exception-raising macros */ diff --git a/psycopg/lobject_int.c b/psycopg/lobject_int.c index d4cc38f..386c945 100644 --- a/psycopg/lobject_int.c +++ b/psycopg/lobject_int.c @@ -31,16 +31,31 @@  #ifdef PSYCOPG_EXTENSIONS +static void +collect_error(connectionObject *conn, char **error) +{ +    const char *msg = PQerrorMessage(conn->pgconn); + +    if (msg) +        *error = strdup(msg); +} +  /* lobject_open - create a new/open an existing lo */  int  lobject_open(lobjectObject *self, connectionObject *conn,                Oid oid, int mode, Oid new_oid, char *new_file)  { +    int retvalue = -1; +    PGresult *pgres = NULL; +    char *error = NULL; +      Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&(self->conn->lock)); -    pq_begin(self->conn); +    retvalue = pq_begin_locked(self->conn, &pgres, &error); +    if (retvalue < 0) +        goto end;      /* if the oid is InvalidOid we create a new lob before opening it         or we import a file from the FS, depending on the value of @@ -54,8 +69,12 @@ lobject_open(lobjectObject *self, connectionObject *conn,          Dprintf("lobject_open: large object created with oid = %d",                  self->oid); -        if (self->oid == InvalidOid) goto end; -         +        if (self->oid == InvalidOid) { +            collect_error(self->conn, &error); +            retvalue = -1; +            goto end; +        } +          mode = INV_WRITE;      }      else { @@ -69,114 +88,158 @@ lobject_open(lobjectObject *self, connectionObject *conn,          self->fd = lo_open(self->conn->pgconn, self->oid, mode);          Dprintf("lobject_open: large object opened with fd = %d",              self->fd); + +        if (self->fd == -1) { +            collect_error(self->conn, &error); +            retvalue = -1; +            goto end; +        }      }      else {          /* this is necessary to make sure no function that needs and  	   fd is called on unopened lobjects */          self->closed = 1;      } +    /* set the mode for future reference */ +    self->mode = mode; +    switch (mode) { +    case -1: +        self->smode = "n"; break; +    case INV_READ: +        self->smode = "r"; break; +    case INV_WRITE: +        self->smode = "w"; break; +    case INV_READ+INV_WRITE: +        self->smode = "rw"; break; +    } +    retvalue = 0;   end:      pthread_mutex_unlock(&(self->conn->lock));      Py_END_ALLOW_THREADS; -    /* here we check for errors before returning 0 */ -    if ((self->fd == -1 && mode != -1) || self->oid == InvalidOid) { -        pq_raise(conn, NULL, NULL, NULL); -        return -1; -    } -    else { -        /* set the mode for future reference and return */ -	self->mode = mode; -	switch (mode) { -            case -1: -                self->smode = "n"; break; -            case INV_READ: -                self->smode = "r"; break; -            case INV_WRITE: -                self->smode = "w"; break; -            case INV_READ+INV_WRITE: -                self->smode = "rw"; break; -	} -        return 0; -    } +    if (retvalue < 0) +        pq_complete_error(self->conn, &pgres, &error); +    return retvalue;  } -/* lobject_unlink - remove an lo from database */ +/* lobject_close - close an existing lo */ + +static int +lobject_close_locked(lobjectObject *self, char **error) +{ +    int retvalue; + +    if (self->conn->isolation_level == 0 || +        self->conn->mark != self->mark || +        self->fd == -1) +        return 0; + +    retvalue = lo_close(self->conn->pgconn, self->fd); +    self->fd = -1; +    if (retvalue < 0) +        collect_error(self->conn, error); + +    return retvalue; +}  int -lobject_unlink(lobjectObject *self) +lobject_close(lobjectObject *self)  { -    int res; +    char *error = NULL; +    int retvalue; -    /* first we make sure the lobject is closed and then we unlink */ -    lobject_close(self); -          Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&(self->conn->lock)); -    pq_begin(self->conn); - -    res = lo_unlink(self->conn->pgconn, self->oid); +    retvalue = lobject_close_locked(self, &error);      pthread_mutex_unlock(&(self->conn->lock));      Py_END_ALLOW_THREADS; -    if (res == -1) -        pq_raise(self->conn, NULL, NULL, NULL); -    return res; +    if (retvalue < 0) +        pq_complete_error(self->conn, NULL, &error); +    return retvalue;  } -/* lobject_close - close an existing lo */ +/* lobject_unlink - remove an lo from database */ -void -lobject_close(lobjectObject *self) +int +lobject_unlink(lobjectObject *self)  { -    if (self->conn->isolation_level > 0 -        && self->conn->mark == self->mark) { -        if (self->fd != -1) -            lo_close(self->conn->pgconn, self->fd); -    } +    PGresult *pgres = NULL; +    char *error = NULL; +    int retvalue = -1; + +    Py_BEGIN_ALLOW_THREADS; +    pthread_mutex_lock(&(self->conn->lock)); + +    retvalue = pq_begin_locked(self->conn, &pgres, &error); +    if (retvalue < 0) +        goto end; + +    /* first we make sure the lobject is closed and then we unlink */ +    retvalue = lobject_close_locked(self, &error); +    if (retvalue < 0) +        goto end; + +    retvalue = lo_unlink(self->conn->pgconn, self->oid); +    if (retvalue < 0) +        collect_error(self->conn, &error); + + end: +    pthread_mutex_unlock(&(self->conn->lock)); +    Py_END_ALLOW_THREADS; + +    if (retvalue < 0) +        pq_complete_error(self->conn, &pgres, &error); +    return retvalue;  }  /* lobject_write - write bytes to a lo */ -size_t +Py_ssize_t  lobject_write(lobjectObject *self, char *buf, size_t len)  { -    size_t written; +    Py_ssize_t written; +    char *error = NULL;      Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&(self->conn->lock));      written = lo_write(self->conn->pgconn, self->fd, buf, len); +    if (written < 0) +        collect_error(self->conn, &error);      pthread_mutex_unlock(&(self->conn->lock));      Py_END_ALLOW_THREADS;      if (written < 0) -        pq_raise(self->conn, NULL, NULL, NULL); +        pq_complete_error(self->conn, NULL, &error);      return written;  }  /* lobject_read - read bytes from a lo */ -size_t +Py_ssize_t  lobject_read(lobjectObject *self, char *buf, size_t len)  { -    size_t readed; +    Py_ssize_t n_read; +    char *error = NULL;      Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&(self->conn->lock)); -    readed = lo_read(self->conn->pgconn, self->fd, buf, len); +    n_read = lo_read(self->conn->pgconn, self->fd, buf, len); +    if (n_read < 0) +        collect_error(self->conn, &error);      pthread_mutex_unlock(&(self->conn->lock));      Py_END_ALLOW_THREADS; -    if (readed < 0) -        pq_raise(self->conn, NULL, NULL, NULL); -    return readed; +    if (n_read < 0) +        pq_complete_error(self->conn, NULL, &error); +    return n_read;  }  /* lobject_seek - move the current position in the lo */ @@ -185,17 +248,20 @@ int  lobject_seek(lobjectObject *self, int pos, int whence)  {      int where; +    char *error = NULL;      Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&(self->conn->lock));      where = lo_lseek(self->conn->pgconn, self->fd, pos, whence); +    if (where < 0) +        collect_error(self->conn, &error);      pthread_mutex_unlock(&(self->conn->lock));      Py_END_ALLOW_THREADS;      if (where < 0) -        pq_raise(self->conn, NULL, NULL, NULL); +        pq_complete_error(self->conn, NULL, &error);      return where;  } @@ -205,17 +271,20 @@ int  lobject_tell(lobjectObject *self)  {      int where; +    char *error = NULL;      Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&(self->conn->lock));      where = lo_tell(self->conn->pgconn, self->fd); +    if (where < 0) +        collect_error(self->conn, &error);      pthread_mutex_unlock(&(self->conn->lock));      Py_END_ALLOW_THREADS;      if (where < 0) -        pq_raise(self->conn, NULL, NULL, NULL); +        pq_complete_error(self->conn, NULL, &error);      return where;  } @@ -225,17 +294,20 @@ int  lobject_export(lobjectObject *self, char *filename)  {      int res; +    char *error = NULL;      Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&(self->conn->lock));      res = lo_export(self->conn->pgconn, self->oid, filename); +    if (res < 0) +        collect_error(self->conn, &error);      pthread_mutex_unlock(&(self->conn->lock));      Py_END_ALLOW_THREADS;      if (res < 0) -        pq_raise(self->conn, NULL, NULL, NULL); +        pq_complete_error(self->conn, NULL, &error);      return res;  } diff --git a/psycopg/lobject_type.c b/psycopg/lobject_type.c index 8d299e9..b559a58 100644 --- a/psycopg/lobject_type.c +++ b/psycopg/lobject_type.c @@ -57,7 +57,8 @@ psyco_lobj_close(lobjectObject *self, PyObject *args)  	&& self->conn->mark == self->mark)      {          self->closed = 1; -        lobject_close(self); +        if (lobject_close(self) < 0) +            return NULL;          Dprintf("psyco_lobj_close: lobject at %p closed", self);      } @@ -276,7 +277,8 @@ lobject_dealloc(PyObject* obj)  {      lobjectObject *self = (lobjectObject *)obj; -    lobject_close(self); +    if (lobject_close(self) < 0) +        PyErr_Print();      Py_XDECREF((PyObject*)self->conn);      Dprintf("lobject_dealloc: deleted lobject object at %p, refcnt = %d", diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 16703b3..afab3b1 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -368,7 +368,7 @@ pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)     On error, -1 is returned, and the pgres argument will hold the     relevant result structure.   */ -static int +int  pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error)  {      const char *query[] = { diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index ee14fab..8503c12 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -33,6 +33,8 @@  /* exported functions */  HIDDEN int pq_fetch(cursorObject *curs);  HIDDEN int pq_execute(cursorObject *curs, const char *query, int async); +HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres, +                           char **error);  HIDDEN int pq_commit(connectionObject *conn);  HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres,                             char **error); | 
