summaryrefslogtreecommitdiff
path: root/storage/bdb/qam/qam.c
diff options
context:
space:
mode:
Diffstat (limited to 'storage/bdb/qam/qam.c')
-rw-r--r--storage/bdb/qam/qam.c1615
1 files changed, 1615 insertions, 0 deletions
diff --git a/storage/bdb/qam/qam.c b/storage/bdb/qam/qam.c
new file mode 100644
index 00000000000..b10f8743439
--- /dev/null
+++ b/storage/bdb/qam/qam.c
@@ -0,0 +1,1615 @@
+/*-
+ * See the file LICENSE for redistribution information.
+ *
+ * Copyright (c) 1999-2002
+ * Sleepycat Software. All rights reserved.
+ */
+
+#include "db_config.h"
+
+#ifndef lint
+static const char revid[] = "$Id: qam.c,v 11.134 2002/08/13 20:46:08 ubell Exp $";
+#endif /* not lint */
+
+#ifndef NO_SYSTEM_INCLUDES
+#include <sys/types.h>
+
+#include <string.h>
+#endif
+
+#include "db_int.h"
+#include "dbinc/db_page.h"
+#include "dbinc/db_shash.h"
+#include "dbinc/btree.h"
+#include "dbinc/lock.h"
+#include "dbinc/log.h"
+#include "dbinc/qam.h"
+
+static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
+static int __qam_c_close __P((DBC *, db_pgno_t, int *));
+static int __qam_c_del __P((DBC *));
+static int __qam_c_destroy __P((DBC *));
+static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
+static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
+static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
+static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
+
+/*
+ * __qam_position --
+ * Position a queued access method cursor at a record. This returns
+ * the page locked. *exactp will be set if the record is valid.
+ * PUBLIC: int __qam_position
+ * PUBLIC: __P((DBC *, db_recno_t *, qam_position_mode, int *));
+ */
+int
+__qam_position(dbc, recnop, mode, exactp)
+ DBC *dbc; /* open cursor */
+ db_recno_t *recnop; /* pointer to recno to find */
+ qam_position_mode mode;/* locking: read or write */
+ int *exactp; /* indicate if it was found */
+{
+ QUEUE_CURSOR *cp;
+ DB *dbp;
+ QAMDATA *qp;
+ db_pgno_t pg;
+ int ret;
+
+ dbp = dbc->dbp;
+ cp = (QUEUE_CURSOR *)dbc->internal;
+
+ /* Fetch the page for this recno. */
+ pg = QAM_RECNO_PAGE(dbp, *recnop);
+
+ if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
+ DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
+ return (ret);
+ cp->page = NULL;
+ *exactp = 0;
+ if ((ret = __qam_fget(dbp, &pg,
+ mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
+ /* We did not fetch it, we can release the lock. */
+ (void)__LPUT(dbc, cp->lock);
+ if (mode != QAM_WRITE &&
+ (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
+ return (0);
+ return (ret);
+ }
+ cp->pgno = pg;
+ cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
+
+ if (PGNO(cp->page) == 0) {
+ if (F_ISSET(dbp, DB_AM_RDONLY)) {
+ *exactp = 0;
+ return (0);
+ }
+ PGNO(cp->page) = pg;
+ TYPE(cp->page) = P_QAMDATA;
+ }
+
+ qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
+ *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
+
+ return (ret);
+}
+
+/*
+ * __qam_pitem --
+ * Put an item on a queue page. Copy the data to the page and set the
+ * VALID and SET bits. If logging and the record was previously set,
+ * log that data, otherwise just log the new data.
+ *
+ * pagep must be write locked
+ *
+ * PUBLIC: int __qam_pitem
+ * PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *));
+ */
+int
+__qam_pitem(dbc, pagep, indx, recno, data)
+ DBC *dbc;
+ QPAGE *pagep;
+ u_int32_t indx;
+ db_recno_t recno;
+ DBT *data;
+{
+ DB *dbp;
+ DBT olddata, pdata, *datap;
+ QAMDATA *qp;
+ QUEUE *t;
+ u_int32_t alloced;
+ u_int8_t *dest, *p;
+ int ret;
+
+ alloced = ret = 0;
+
+ dbp = dbc->dbp;
+ t = (QUEUE *)dbp->q_internal;
+
+ if (data->size > t->re_len)
+ goto len_err;
+
+ qp = QAM_GET_RECORD(dbp, pagep, indx);
+
+ p = qp->data;
+ datap = data;
+ if (F_ISSET(data, DB_DBT_PARTIAL)) {
+ if (data->doff + data->dlen > t->re_len) {
+ alloced = data->dlen;
+ goto len_err;
+ }
+ if (data->size != data->dlen) {
+len_err: __db_err(dbp->dbenv,
+ "Length improper for fixed length record %lu",
+ (u_long)(alloced ? alloced : data->size));
+ return (EINVAL);
+ }
+ if (data->size == t->re_len)
+ goto no_partial;
+
+ /*
+ * If we are logging, then we have to build the record
+ * first, otherwise, we can simply drop the change
+ * directly on the page. After this clause, make
+ * sure that datap and p are set up correctly so that
+ * copying datap into p does the right thing.
+ *
+ * Note, I am changing this so that if the existing
+ * record is not valid, we create a complete record
+ * to log so that both this and the recovery code is simpler.
+ */
+
+ if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
+ datap = &pdata;
+ memset(datap, 0, sizeof(*datap));
+
+ if ((ret = __os_malloc(dbp->dbenv,
+ t->re_len, &datap->data)) != 0)
+ return (ret);
+ alloced = 1;
+ datap->size = t->re_len;
+
+ /*
+ * Construct the record if it's valid, otherwise set it
+ * all to the pad character.
+ */
+ dest = datap->data;
+ if (F_ISSET(qp, QAM_VALID))
+ memcpy(dest, p, t->re_len);
+ else
+ memset(dest, t->re_pad, t->re_len);
+
+ dest += data->doff;
+ memcpy(dest, data->data, data->size);
+ } else {
+ datap = data;
+ p += data->doff;
+ }
+ }
+
+no_partial:
+ if (DBC_LOGGING(dbc)) {
+ olddata.size = 0;
+ if (F_ISSET(qp, QAM_SET)) {
+ olddata.data = qp->data;
+ olddata.size = t->re_len;
+ }
+ if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
+ 0, &LSN(pagep), pagep->pgno,
+ indx, recno, datap, qp->flags,
+ olddata.size == 0 ? NULL : &olddata)) != 0)
+ goto err;
+ }
+
+ F_SET(qp, QAM_VALID | QAM_SET);
+ memcpy(p, datap->data, datap->size);
+ if (!F_ISSET(data, DB_DBT_PARTIAL))
+ memset(p + datap->size, t->re_pad, t->re_len - datap->size);
+
+err: if (alloced)
+ __os_free(dbp->dbenv, datap->data);
+
+ return (ret);
+}
+/*
+ * __qam_c_put
+ * Cursor put for queued access method.
+ * BEFORE and AFTER cannot be specified.
+ */
+static int
+__qam_c_put(dbc, key, data, flags, pgnop)
+ DBC *dbc;
+ DBT *key, *data;
+ u_int32_t flags;
+ db_pgno_t *pgnop;
+{
+ DB *dbp;
+ DB_LOCK lock;
+ DB_MPOOLFILE *mpf;
+ QMETA *meta;
+ QUEUE_CURSOR *cp;
+ db_pgno_t pg;
+ db_recno_t new_cur, new_first;
+ u_int32_t opcode;
+ int exact, ret, t_ret;
+
+ dbp = dbc->dbp;
+ mpf = dbp->mpf;
+ if (pgnop != NULL)
+ *pgnop = PGNO_INVALID;
+
+ cp = (QUEUE_CURSOR *)dbc->internal;
+
+ switch (flags) {
+ case DB_KEYFIRST:
+ case DB_KEYLAST:
+ if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
+ return (ret);
+ /* FALLTHROUGH */
+ case DB_CURRENT:
+ break;
+ default:
+ /* The interface shouldn't let anything else through. */
+ DB_ASSERT(0);
+ return (__db_ferr(dbp->dbenv, "__qam_c_put", flags));
+ }
+
+ /* Write lock the record. */
+ if ((ret = __db_lget(dbc,
+ 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
+ return (ret);
+
+ if ((ret = __qam_position(dbc,
+ &cp->recno, QAM_WRITE, &exact)) != 0) {
+ /* We could not get the page, we can release the record lock. */
+ __LPUT(dbc, lock);
+ return (ret);
+ }
+
+ /* Put the item on the page. */
+ ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
+
+ /* Doing record locking, release the page lock */
+ if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
+ ret = t_ret;
+ if ((t_ret = __qam_fput(
+ dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
+ ret = t_ret;
+ cp->page = NULL;
+ cp->lock = lock;
+ cp->lock_mode = DB_LOCK_WRITE;
+ if (ret != 0)
+ return (ret);
+
+ /* We may need to reset the head or tail of the queue. */
+ pg = ((QUEUE *)dbp->q_internal)->q_meta;
+
+ /*
+ * Get the meta page first, we don't want to write lock it while
+ * trying to pin it.
+ */
+ if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
+ return (ret);
+ if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
+ (void)mpf->put(mpf, meta, 0);
+ return (ret);
+ }
+
+ opcode = 0;
+ new_cur = new_first = 0;
+
+ /*
+ * If the put address is outside the queue, adjust the head and
+ * tail of the queue. If the order is inverted we move
+ * the one which is closer. The first case is when the
+ * queue is empty, move first and current to where the new
+ * insert is.
+ */
+
+ if (meta->first_recno == meta->cur_recno) {
+ new_first = cp->recno;
+ new_cur = cp->recno + 1;
+ if (new_cur == RECNO_OOB)
+ new_cur++;
+ opcode |= QAM_SETFIRST;
+ opcode |= QAM_SETCUR;
+ } else {
+ if (QAM_BEFORE_FIRST(meta, cp->recno) &&
+ (meta->first_recno <= meta->cur_recno ||
+ meta->first_recno - cp->recno <
+ cp->recno - meta->cur_recno)) {
+ new_first = cp->recno;
+ opcode |= QAM_SETFIRST;
+ }
+
+ if (meta->cur_recno == cp->recno ||
+ (QAM_AFTER_CURRENT(meta, cp->recno) &&
+ (meta->first_recno <= meta->cur_recno ||
+ cp->recno - meta->cur_recno <=
+ meta->first_recno - cp->recno))) {
+ new_cur = cp->recno + 1;
+ if (new_cur == RECNO_OOB)
+ new_cur++;
+ opcode |= QAM_SETCUR;
+ }
+ }
+
+ if (opcode != 0 && DBC_LOGGING(dbc)) {
+ ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn,
+ 0, opcode, meta->first_recno, new_first,
+ meta->cur_recno, new_cur, &meta->dbmeta.lsn, PGNO_BASE_MD);
+ if (ret != 0)
+ opcode = 0;
+ }
+
+ if (opcode & QAM_SETCUR)
+ meta->cur_recno = new_cur;
+ if (opcode & QAM_SETFIRST)
+ meta->first_recno = new_first;
+
+ if ((t_ret = mpf->put(
+ mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
+ ret = t_ret;
+
+ /* Don't hold the meta page long term. */
+ if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
+ ret = t_ret;
+ return (ret);
+}
+
+/*
+ * __qam_append --
+ * Perform a put(DB_APPEND) in queue.
+ *
+ * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
+ */
+int
+__qam_append(dbc, key, data)
+ DBC *dbc;
+ DBT *key, *data;
+{
+ DB *dbp;
+ DB_LOCK lock;
+ DB_MPOOLFILE *mpf;
+ QMETA *meta;
+ QPAGE *page;
+ QUEUE *qp;
+ QUEUE_CURSOR *cp;
+ db_pgno_t pg;
+ db_recno_t recno;
+ int ret, t_ret;
+
+ dbp = dbc->dbp;
+ mpf = dbp->mpf;
+ cp = (QUEUE_CURSOR *)dbc->internal;
+
+ pg = ((QUEUE *)dbp->q_internal)->q_meta;
+ /*
+ * Get the meta page first, we don't want to write lock it while
+ * trying to pin it.
+ */
+ if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
+ return (ret);
+ /* Write lock the meta page. */
+ if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
+ (void)mpf->put(mpf, meta, 0);
+ return (ret);
+ }
+
+ /* Get the next record number. */
+ recno = meta->cur_recno;
+ meta->cur_recno++;
+ if (meta->cur_recno == RECNO_OOB)
+ meta->cur_recno++;
+ if (meta->cur_recno == meta->first_recno) {
+ meta->cur_recno--;
+ if (meta->cur_recno == RECNO_OOB)
+ meta->cur_recno--;
+ (void)__LPUT(dbc, lock);
+ ret = EFBIG;
+ goto err;
+ }
+
+ if (QAM_BEFORE_FIRST(meta, recno))
+ meta->first_recno = recno;
+
+ /* Lock the record and release meta page lock. */
+ if ((ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
+ recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) {
+ (void)__LPUT(dbc, lock);
+ goto err;
+ }
+
+ /*
+ * The application may modify the data based on the selected record
+ * number.
+ */
+ if (dbc->dbp->db_append_recno != NULL &&
+ (ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
+ (void)__LPUT(dbc, lock);
+ goto err;
+ }
+
+ cp->lock = lock;
+ cp->lock_mode = DB_LOCK_WRITE;
+
+ pg = QAM_RECNO_PAGE(dbp, recno);
+
+ /* Fetch and write lock the data page. */
+ if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
+ goto err;
+ if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
+ /* We did not fetch it, we can release the lock. */
+ (void)__LPUT(dbc, lock);
+ goto err;
+ }
+
+ /* See if this is a new page. */
+ if (page->pgno == 0) {
+ page->pgno = pg;
+ page->type = P_QAMDATA;
+ }
+
+ /* Put the item on the page and log it. */
+ ret = __qam_pitem(dbc, page,
+ QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
+
+ /* Doing record locking, release the page lock */
+ if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
+ ret = t_ret;
+
+ if ((t_ret
+ = __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
+ ret = t_ret;
+
+ /* Return the record number to the user. */
+ if (ret == 0)
+ ret = __db_retcopy(dbp->dbenv, key,
+ &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
+
+ /* Position the cursor on this record. */
+ cp->recno = recno;
+
+ /* See if we are leaving the extent. */
+ qp = (QUEUE *) dbp->q_internal;
+ if (qp->page_ext != 0 &&
+ (recno % (qp->page_ext * qp->rec_page) == 0 ||
+ recno == UINT32_T_MAX)) {
+ if ((ret = __db_lget(dbc,
+ 0, ((QUEUE *)dbp->q_internal)->q_meta,
+ DB_LOCK_WRITE, 0, &lock)) != 0)
+ goto err;
+ if (!QAM_AFTER_CURRENT(meta, recno))
+ ret = __qam_fclose(dbp, pg);
+ (void)__LPUT(dbc, lock);
+ }
+
+err:
+ /* Release the meta page. */
+ if ((t_ret = mpf->put(mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
+ ret = t_ret;
+
+ return (ret);
+}
+
+/*
+ * __qam_c_del --
+ * Qam cursor->am_del function
+ */
+static int
+__qam_c_del(dbc)
+ DBC *dbc;
+{
+ DB *dbp;
+ DBT data;
+ DB_LOCK lock;
+ DB_MPOOLFILE *mpf;
+ PAGE *pagep;
+ QAMDATA *qp;
+ QMETA *meta;
+ QUEUE_CURSOR *cp;
+ db_pgno_t pg;
+ db_recno_t first;
+ int exact, ret, t_ret;
+
+ dbp = dbc->dbp;
+ mpf = dbp->mpf;
+ cp = (QUEUE_CURSOR *)dbc->internal;
+
+ pg = ((QUEUE *)dbp->q_internal)->q_meta;
+ /*
+ * Get the meta page first, we don't want to write lock it while
+ * trying to pin it.
+ */
+ if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
+ return (ret);
+ /* Write lock the meta page. */
+ if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0) {
+ (void)mpf->put(mpf, meta, 0);
+ return (ret);
+ }
+
+ if (QAM_NOT_VALID(meta, cp->recno))
+ ret = DB_NOTFOUND;
+
+ first = meta->first_recno;
+
+ /* Don't hold the meta page long term. */
+ if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
+ ret = t_ret;
+
+ if (ret != 0)
+ goto err1;
+
+ if ((ret = __db_lget(dbc,
+ 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
+ goto err1;
+
+ cp->lock_mode = DB_LOCK_WRITE;
+ /* Find the record ; delete only deletes exact matches. */
+ if ((ret = __qam_position(dbc,
+ &cp->recno, QAM_WRITE, &exact)) != 0) {
+ cp->lock = lock;
+ goto err1;
+ }
+ if (!exact) {
+ ret = DB_NOTFOUND;
+ goto err1;
+ }
+
+ pagep = cp->page;
+ qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
+
+ if (DBC_LOGGING(dbc)) {
+ if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
+ ((QUEUE *)dbp->q_internal)->re_len == 0) {
+ if ((ret = __qam_del_log(dbp,
+ dbc->txn, &LSN(pagep), 0, &LSN(pagep),
+ pagep->pgno, cp->indx, cp->recno)) != 0)
+ goto err1;
+ } else {
+ data.size = ((QUEUE *)dbp->q_internal)->re_len;
+ data.data = qp->data;
+ if ((ret = __qam_delext_log(dbp,
+ dbc->txn, &LSN(pagep), 0, &LSN(pagep),
+ pagep->pgno, cp->indx, cp->recno, &data)) != 0)
+ goto err1;
+ }
+ }
+
+ F_CLR(qp, QAM_VALID);
+
+ if (cp->recno == first) {
+ pg = ((QUEUE *)dbp->q_internal)->q_meta;
+ if ((ret =
+ __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
+ goto err1;
+ ret = __qam_consume(dbc, meta, first);
+ if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
+ ret = t_ret;
+ }
+
+err1:
+ if ((t_ret = mpf->put(mpf, meta, 0)) != 0 && ret == 0)
+ ret = t_ret;
+ if (cp->page != NULL && (t_ret = __qam_fput(dbp, cp->pgno,
+ cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
+ ret = t_ret;
+ cp->page = NULL;
+
+ /* Doing record locking, release the page lock */
+ if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
+ ret = t_ret;
+ cp->lock = lock;
+
+ return (ret);
+}
+
+#ifdef DEBUG_WOP
+#define QDEBUG
+#endif
+
+/*
+ * __qam_c_get --
+ * Queue cursor->c_get function.
+ */
+static int
+__qam_c_get(dbc, key, data, flags, pgnop)
+ DBC *dbc;
+ DBT *key, *data;
+ u_int32_t flags;
+ db_pgno_t *pgnop;
+{
+ DB *dbp;
+ DBC *dbcdup;
+ DBT tmp;
+ DB_ENV *dbenv;
+ DB_LOCK lock, pglock, metalock;
+ DB_MPOOLFILE *mpf;
+ PAGE *pg;
+ QAMDATA *qp;
+ QMETA *meta;
+ QUEUE *t;
+ QUEUE_CURSOR *cp;
+ db_lockmode_t lock_mode;
+ db_pgno_t metapno;
+ db_recno_t first;
+ qam_position_mode mode;
+ int exact, is_first, locked, ret, t_ret, wait, with_delete;
+ int put_mode, meta_dirty, retrying;
+
+ dbp = dbc->dbp;
+ dbenv = dbp->dbenv;
+ mpf = dbp->mpf;
+ cp = (QUEUE_CURSOR *)dbc->internal;
+
+ PANIC_CHECK(dbenv);
+
+ wait = 0;
+ with_delete = 0;
+ retrying = 0;
+ lock_mode = DB_LOCK_READ;
+ put_mode = 0;
+ t_ret = 0;
+ *pgnop = 0;
+ pg = NULL;
+
+ mode = QAM_READ;
+ if (F_ISSET(dbc, DBC_RMW)) {
+ lock_mode = DB_LOCK_WRITE;
+ mode = QAM_WRITE;
+ }
+
+ if (flags == DB_CONSUME_WAIT) {
+ wait = 1;
+ flags = DB_CONSUME;
+ }
+ if (flags == DB_CONSUME) {
+ if ((ret = __db_check_txn(dbp, dbc->txn, dbc->locker, 0)) != 0)
+ return (ret);
+
+ with_delete = 1;
+ flags = DB_FIRST;
+ lock_mode = DB_LOCK_WRITE;
+ mode = QAM_CONSUME;
+ }
+
+ DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
+ flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
+
+ /* Make lint and friends happy. */
+ meta_dirty = 0;
+ locked = 0;
+
+ is_first = 0;
+
+ t = (QUEUE *)dbp->q_internal;
+ metapno = t->q_meta;
+
+ /*
+ * Get the meta page first, we don't want to write lock it while
+ * trying to pin it. This is because someone my have it pinned
+ * but not locked.
+ */
+ if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0)
+ return (ret);
+ if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
+ goto err;
+ locked = 1;
+
+ first = 0;
+
+ /* Release any previous lock if not in a transaction. */
+ (void)__TLPUT(dbc, cp->lock);
+
+retry: /* Update the record number. */
+ switch (flags) {
+ case DB_CURRENT:
+ break;
+ case DB_NEXT_DUP:
+ ret = DB_NOTFOUND;
+ goto err;
+ /* NOTREACHED */
+ case DB_NEXT:
+ case DB_NEXT_NODUP:
+ if (cp->recno != RECNO_OOB) {
+ ++cp->recno;
+ /* Wrap around, skipping zero. */
+ if (cp->recno == RECNO_OOB)
+ cp->recno++;
+ break;
+ }
+ /* FALLTHROUGH */
+ case DB_FIRST:
+ flags = DB_NEXT;
+ is_first = 1;
+
+ /* get the first record number */
+ cp->recno = first = meta->first_recno;
+
+ break;
+ case DB_PREV:
+ case DB_PREV_NODUP:
+ if (cp->recno != RECNO_OOB) {
+ if (QAM_BEFORE_FIRST(meta, cp->recno) ||
+ cp->recno == meta->first_recno) {
+ ret = DB_NOTFOUND;
+ goto err;
+ }
+ --cp->recno;
+ /* Wrap around, skipping zero. */
+ if (cp->recno == RECNO_OOB)
+ --cp->recno;
+ break;
+ }
+ /* FALLTHROUGH */
+ case DB_LAST:
+ if (meta->first_recno == meta->cur_recno) {
+ ret = DB_NOTFOUND;
+ goto err;
+ }
+ cp->recno = meta->cur_recno - 1;
+ if (cp->recno == RECNO_OOB)
+ cp->recno--;
+ break;
+ case DB_SET:
+ case DB_SET_RANGE:
+ case DB_GET_BOTH:
+ case DB_GET_BOTH_RANGE:
+ if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
+ goto err;
+ break;
+ default:
+ ret = __db_unknown_flag(dbenv, "__qam_c_get", flags);
+ goto err;
+ }
+
+ /*
+ * Check to see if we are out of data. Current points to
+ * the first free slot.
+ */
+ if (cp->recno == meta->cur_recno ||
+ QAM_AFTER_CURRENT(meta, cp->recno)) {
+ ret = DB_NOTFOUND;
+ pg = NULL;
+ if (wait) {
+ flags = DB_FIRST;
+ /*
+ * If first is not set, then we skipped a
+ * locked record, go back and find it.
+ * If we find a locked record again
+ * wait for it.
+ */
+ if (first == 0) {
+ retrying = 1;
+ goto retry;
+ }
+ if (CDB_LOCKING(dbenv)) {
+ if ((ret = dbenv->lock_get(
+ dbenv, dbc->locker,
+ DB_LOCK_SWITCH, &dbc->lock_dbt,
+ DB_LOCK_WAIT, &dbc->mylock)) != 0)
+ goto err;
+ if ((ret = dbenv->lock_get(
+ dbenv, dbc->locker,
+ DB_LOCK_UPGRADE, &dbc->lock_dbt,
+ DB_LOCK_WRITE, &dbc->mylock)) != 0)
+ goto err;
+ goto retry;
+ }
+ /*
+ * Wait for someone to update the meta page.
+ * This will probably mean there is something
+ * in the queue. We then go back up and
+ * try again.
+ */
+ if (locked == 0) {
+ if ((ret = __db_lget( dbc,
+ 0, metapno, lock_mode, 0, &metalock)) != 0)
+ goto err;
+ locked = 1;
+ if (cp->recno != RECNO_OOB &&
+ !QAM_AFTER_CURRENT(meta, cp->recno))
+ goto retry;
+ }
+ if ((ret = __db_lget(dbc, 0, metapno,
+ DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0)
+ goto err;
+ if ((ret = dbenv->lock_get(dbenv, dbc->locker,
+ DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
+ &metalock)) != 0)
+ goto err;
+ locked = 1;
+ goto retry;
+ }
+
+ goto err;
+ }
+
+ /* Don't hold the meta page long term. */
+ if (locked) {
+ if ((ret = __LPUT(dbc, metalock)) != 0)
+ goto err;
+ locked = 0;
+ }
+
+ /* Lock the record. */
+ if ((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
+ (with_delete && !retrying) ?
+ DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
+ &lock)) == DB_LOCK_NOTGRANTED && with_delete) {
+#ifdef QDEBUG
+ __db_logmsg(dbenv,
+ dbc->txn, "Queue S", 0, "%x %d %d %d",
+ dbc->locker, cp->recno, first, meta->first_recno);
+#endif
+ first = 0;
+ if ((ret =
+ __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
+ goto err;
+ locked = 1;
+ goto retry;
+ }
+
+ if (ret != 0)
+ goto err;
+
+ /*
+ * In the DB_FIRST or DB_LAST cases we must wait and then start over
+ * since the first/last may have moved while we slept.
+ * We release our locks and try again.
+ */
+ if ((!with_delete && is_first) || flags == DB_LAST) {
+ if ((ret =
+ __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
+ goto err;
+ if (cp->recno !=
+ (is_first ? meta->first_recno : (meta->cur_recno - 1))) {
+ __LPUT(dbc, lock);
+ if (is_first)
+ flags = DB_FIRST;
+ locked = 1;
+ goto retry;
+ }
+ /* Don't hold the meta page long term. */
+ if ((ret = __LPUT(dbc, metalock)) != 0)
+ goto err;
+ }
+
+ /* Position the cursor on the record. */
+ if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
+ /* We cannot get the page, release the record lock. */
+ (void)__LPUT(dbc, lock);
+ goto err;
+ }
+
+ pg = cp->page;
+ pglock = cp->lock;
+ cp->lock = lock;
+ cp->lock_mode = lock_mode;
+
+ if (!exact) {
+ if (flags == DB_NEXT || flags == DB_NEXT_NODUP ||
+ flags == DB_PREV || flags == DB_PREV_NODUP ||
+ flags == DB_LAST) {
+ /* Release locks and try again. */
+ if (pg != NULL)
+ (void)__qam_fput(dbp, cp->pgno, pg, 0);
+ cp->page = pg = NULL;
+ (void)__LPUT(dbc, pglock);
+ (void)__LPUT(dbc, cp->lock);
+ if (flags == DB_LAST)
+ flags = DB_PREV;
+ if (!with_delete)
+ is_first = 0;
+ retrying = 0;
+ goto retry;
+ }
+ /* this is for the SET and SET_RANGE cases */
+ ret = DB_KEYEMPTY;
+ goto err1;
+ }
+
+ /* Return the key if the user didn't give us one. */
+ if (key != NULL) {
+ if (flags != DB_GET_BOTH && flags != DB_GET_BOTH_RANGE &&
+ flags != DB_SET && flags != DB_SET_RANGE &&
+ (ret = __db_retcopy(dbp->dbenv,
+ key, &cp->recno, sizeof(cp->recno),
+ &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
+ goto err1;
+ F_SET(key, DB_DBT_ISSET);
+ }
+
+ qp = QAM_GET_RECORD(dbp, pg, cp->indx);
+
+ /* Return the data item. */
+ if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
+ /*
+ * Need to compare
+ */
+ tmp.data = qp->data;
+ tmp.size = t->re_len;
+ if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
+ ret = DB_NOTFOUND;
+ goto err1;
+ }
+ }
+ if (data != NULL &&
+ !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
+ (ret = __db_retcopy(dbp->dbenv, data,
+ qp->data, t->re_len, &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
+ goto err1;
+
+ if (data != NULL)
+ F_SET(data, DB_DBT_ISSET);
+
+ /* Finally, if we are doing DB_CONSUME mark the record. */
+ if (with_delete) {
+ /*
+ * Assert that we're not a secondary index. Doing a DB_CONSUME
+ * on a secondary makes very little sense, since one can't
+ * DB_APPEND there; attempting one should be forbidden by
+ * the interface.
+ */
+ DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
+
+ /*
+ * Check and see if we *have* any secondary indices.
+ * If we do, we're a primary, so call __db_c_del_primary
+ * to delete the references to the item we're about to
+ * delete.
+ *
+ * Note that we work on a duplicated cursor, since the
+ * __db_ret work has already been done, so it's not safe
+ * to perform any additional ops on this cursor.
+ */
+ if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
+ if ((ret = __db_c_idup(dbc,
+ &dbcdup, DB_POSITIONI)) != 0)
+ goto err1;
+
+ if ((ret = __db_c_del_primary(dbcdup)) != 0) {
+ /*
+ * The __db_c_del_primary return is more
+ * interesting.
+ */
+ (void)dbcdup->c_close(dbcdup);
+ goto err1;
+ }
+
+ if ((ret = dbcdup->c_close(dbcdup)) != 0)
+ goto err1;
+ }
+
+ if (DBC_LOGGING(dbc)) {
+ if (t->page_ext == 0 || t->re_len == 0) {
+ if ((ret = __qam_del_log(dbp, dbc->txn,
+ &LSN(pg), 0, &LSN(pg),
+ pg->pgno, cp->indx, cp->recno)) != 0)
+ goto err1;
+ } else {
+ tmp.data = qp->data;
+ tmp.size = t->re_len;
+ if ((ret = __qam_delext_log(dbp,
+ dbc->txn, &LSN(pg), 0, &LSN(pg),
+ pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
+ goto err1;
+ }
+ }
+
+ F_CLR(qp, QAM_VALID);
+ put_mode = DB_MPOOL_DIRTY;
+
+ if ((ret = __LPUT(dbc, pglock)) != 0)
+ goto err1;
+
+ /*
+ * Now we need to update the metapage
+ * first pointer. If we have deleted
+ * the record that is pointed to by
+ * first_recno then we move it as far
+ * forward as we can without blocking.
+ * The metapage lock must be held for
+ * the whole scan otherwise someone could
+ * do a random insert behind where we are
+ * looking.
+ */
+
+ if (locked == 0 && (ret = __db_lget(
+ dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
+ goto err1;
+ locked = 1;
+
+#ifdef QDEBUG
+ __db_logmsg(dbenv,
+ dbc->txn, "Queue D", 0, "%x %d %d %d",
+ dbc->locker, cp->recno, first, meta->first_recno);
+#endif
+ /*
+ * See if we deleted the "first" record. If
+ * first is zero then we skipped something,
+ * see if first_recno has been move passed
+ * that to the record that we deleted.
+ */
+ if (first == 0)
+ first = cp->recno;
+ if (first != meta->first_recno)
+ goto done;
+
+ if ((ret = __qam_consume(dbc, meta, first)) != 0)
+ goto err1;
+ }
+
+done:
+err1: if (cp->page != NULL) {
+ t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode);
+
+ if (!ret)
+ ret = t_ret;
+ /* Doing record locking, release the page lock */
+ t_ret = __LPUT(dbc, pglock);
+ cp->page = NULL;
+ }
+
+err: if (!ret)
+ ret = t_ret;
+ if (meta) {
+
+ /* release the meta page */
+ t_ret = mpf->put(mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
+
+ if (!ret)
+ ret = t_ret;
+
+ /* Don't hold the meta page long term. */
+ if (locked)
+ t_ret = __LPUT(dbc, metalock);
+ }
+ DB_ASSERT(!LOCK_ISSET(metalock));
+
+ /*
+ * There is no need to keep the record locked if we are
+ * not in a transaction.
+ */
+ if (t_ret == 0)
+ t_ret = __TLPUT(dbc, cp->lock);
+
+ return (ret ? ret : t_ret);
+}
+
+/*
+ * __qam_consume -- try to reset the head of the queue.
+ *
+ */
+
+static int
+__qam_consume(dbc, meta, first)
+ DBC *dbc;
+ QMETA *meta;
+ db_recno_t first;
+{
+ DB *dbp;
+ DB_LOCK lock, save_lock;
+ DB_MPOOLFILE *mpf;
+ QUEUE_CURSOR *cp;
+ db_indx_t save_indx;
+ db_pgno_t save_page;
+ db_recno_t current, save_recno;
+ u_int32_t rec_extent;
+ int exact, put_mode, ret, t_ret, wrapped;
+
+ dbp = dbc->dbp;
+ mpf = dbp->mpf;
+ cp = (QUEUE_CURSOR *)dbc->internal;
+ put_mode = DB_MPOOL_DIRTY;
+ ret = t_ret = 0;
+
+ save_page = cp->pgno;
+ save_indx = cp->indx;
+ save_recno = cp->recno;
+ save_lock = cp->lock;
+
+ /*
+ * If we skipped some deleted records, we need to
+ * reposition on the first one. Get a lock
+ * in case someone is trying to put it back.
+ */
+ if (first != cp->recno) {
+ ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
+ DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
+ if (ret == DB_LOCK_NOTGRANTED) {
+ ret = 0;
+ goto done;
+ }
+ if (ret != 0)
+ goto done;
+ if ((ret =
+ __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
+ goto done;
+ cp->page = NULL;
+ put_mode = 0;
+ if ((ret = __qam_position(dbc,
+ &first, QAM_READ, &exact)) != 0 || exact != 0) {
+ (void)__LPUT(dbc, lock);
+ goto done;
+ }
+ if ((ret =__LPUT(dbc, lock)) != 0)
+ goto done;
+ if ((ret = __LPUT(dbc, cp->lock)) != 0)
+ goto done;
+ }
+
+ current = meta->cur_recno;
+ wrapped = 0;
+ if (first > current)
+ wrapped = 1;
+ rec_extent = meta->page_ext * meta->rec_page;
+
+ /* Loop until we find a record or hit current */
+ for (;;) {
+ /*
+ * Check to see if we are moving off the extent
+ * and remove the extent.
+ * If we are moving off a page we need to
+ * get rid of the buffer.
+ * Wait for the lagging readers to move off the
+ * page.
+ */
+ if (cp->page != NULL && rec_extent != 0 &&
+ ((exact = (first % rec_extent == 0)) ||
+ first % meta->rec_page == 0 ||
+ first == UINT32_T_MAX)) {
+ if (exact == 1 && (ret = __db_lget(dbc,
+ 0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
+ break;
+
+#ifdef QDEBUG
+ __db_logmsg(dbp->dbenv,
+ dbc->txn, "Queue R", 0, "%x %d %d %d",
+ dbc->locker, cp->pgno, first, meta->first_recno);
+#endif
+ put_mode |= DB_MPOOL_DISCARD;
+ if ((ret = __qam_fput(dbp,
+ cp->pgno, cp->page, put_mode)) != 0)
+ break;
+ cp->page = NULL;
+
+ if (exact == 1) {
+ ret = __qam_fremove(dbp, cp->pgno);
+ t_ret = __LPUT(dbc, cp->lock);
+ }
+ if (ret != 0)
+ break;
+ if (t_ret != 0) {
+ ret = t_ret;
+ break;
+ }
+ } else if (cp->page != NULL && (ret =
+ __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
+ break;
+ cp->page = NULL;
+ first++;
+ if (first == RECNO_OOB) {
+ wrapped = 0;
+ first++;
+ }
+
+ /*
+ * LOOP EXIT when we come move to the current
+ * pointer.
+ */
+ if (!wrapped && first >= current)
+ break;
+
+ ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
+ DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
+ if (ret == DB_LOCK_NOTGRANTED) {
+ ret = 0;
+ break;
+ }
+ if (ret != 0)
+ break;
+
+ if ((ret = __qam_position(dbc,
+ &first, QAM_READ, &exact)) != 0) {
+ (void)__LPUT(dbc, lock);
+ break;
+ }
+ put_mode = 0;
+ if ((ret =__LPUT(dbc, lock)) != 0 ||
+ (ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
+ if ((t_ret = __qam_fput(dbp, cp->pgno,
+ cp->page, put_mode)) != 0 && ret == 0)
+ ret = t_ret;
+ cp->page = NULL;
+ break;
+ }
+ }
+
+ cp->pgno = save_page;
+ cp->indx = save_indx;
+ cp->recno = save_recno;
+ cp->lock = save_lock;
+
+ /*
+ * We have advanced as far as we can.
+ * Advance first_recno to this point.
+ */
+ if (ret == 0 && meta->first_recno != first) {
+#ifdef QDEBUG
+ __db_logmsg(dbp->dbenv, dbc->txn, "Queue M",
+ 0, "%x %d %d %d", dbc->locker, cp->recno,
+ first, meta->first_recno);
+#endif
+ if (DBC_LOGGING(dbc))
+ if ((ret = __qam_incfirst_log(dbp,
+ dbc->txn, &meta->dbmeta.lsn, 0,
+ cp->recno, PGNO_BASE_MD)) != 0)
+ goto done;
+ meta->first_recno = first;
+ (void)mpf->set(mpf, meta, DB_MPOOL_DIRTY);
+ }
+
+done:
+ return (ret);
+}
+
+static int
+__qam_bulk(dbc, data, flags)
+ DBC *dbc;
+ DBT *data;
+ u_int32_t flags;
+{
+ DB *dbp;
+ DB_LOCK metalock;
+ DB_MPOOLFILE *mpf;
+ PAGE *pg;
+ QMETA *meta;
+ QAMDATA *qp;
+ QUEUE_CURSOR *cp;
+ db_indx_t indx;
+ db_pgno_t metapno;
+ qam_position_mode mode;
+ int32_t *endp, *offp;
+ u_int8_t *dbuf, *dp, *np;
+ int exact, recs, re_len, ret, t_ret, valid;
+ int is_key, need_pg, pagesize, size, space;
+
+ dbp = dbc->dbp;
+ mpf = dbp->mpf;
+ cp = (QUEUE_CURSOR *)dbc->internal;
+
+ mode = QAM_READ;
+ if (F_ISSET(dbc, DBC_RMW))
+ mode = QAM_WRITE;
+
+ pagesize = dbp->pgsize;
+ re_len = ((QUEUE *)dbp->q_internal)->re_len;
+ recs = ((QUEUE *)dbp->q_internal)->rec_page;
+ metapno = ((QUEUE *)dbp->q_internal)->q_meta;
+
+ is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
+ size = 0;
+
+ if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
+ return (ret);
+ if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
+ /* We did not fetch it, we can release the lock. */
+ (void)__LPUT(dbc, metalock);
+ return (ret);
+ }
+
+ dbuf = data->data;
+ np = dp = dbuf;
+
+ /* Keep track of space that is left. There is an termination entry */
+ space = data->ulen;
+ space -= sizeof(*offp);
+
+ /* Build the offset/size table form the end up. */
+ endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
+ endp--;
+ offp = endp;
+
+next_pg:
+ if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0)
+ goto done;
+
+ pg = cp->page;
+ indx = cp->indx;
+ need_pg = 1;
+
+ do {
+ /*
+ * If this page is a nonexistent page at the end of an
+ * extent, pg may be NULL. A NULL page has no valid records,
+ * so just keep looping as though qp exists and isn't QAM_VALID;
+ * calling QAM_GET_RECORD is unsafe.
+ */
+ valid = 0;
+
+ /* Wrap around, skipping zero. */
+ if (cp->recno == RECNO_OOB)
+ cp->recno++;
+ if (pg != NULL) {
+ qp = QAM_GET_RECORD(dbp, pg, indx);
+ if (F_ISSET(qp, QAM_VALID)) {
+ valid = 1;
+ space -= (is_key ? 3 : 2) * sizeof(*offp);
+ if (space < 0)
+ goto get_space;
+ if (need_pg) {
+ dp = np;
+ size = pagesize - QPAGE_SZ(dbp);
+ if (space < size) {
+get_space:
+ if (offp == endp) {
+ data->size =
+ ALIGN(size +
+ pagesize,
+ sizeof(u_int32_t));
+ ret = ENOMEM;
+ break;
+ }
+ if (indx != 0)
+ indx--;
+ cp->recno--;
+ break;
+ }
+ memcpy(dp,
+ (char *)pg + QPAGE_SZ(dbp), size);
+ need_pg = 0;
+ space -= size;
+ np += size;
+ }
+ if (is_key)
+ *offp-- = cp->recno;
+ *offp-- = (int32_t)((u_int8_t*)qp -
+ (u_int8_t*)pg - QPAGE_SZ(dbp) +
+ dp - dbuf + SSZA(QAMDATA, data));
+ *offp-- = re_len;
+ }
+ }
+ if (!valid && is_key == 0) {
+ *offp-- = 0;
+ *offp-- = 0;
+ }
+ cp->recno++;
+ } while (++indx < recs && indx != RECNO_OOB
+ && cp->recno != meta->cur_recno
+ && !QAM_AFTER_CURRENT(meta, cp->recno));
+
+ if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
+ ret = t_ret;
+
+ if (cp->page != NULL) {
+ if ((t_ret =
+ __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
+ ret = t_ret;
+ cp->page = NULL;
+ }
+
+ if (ret == 0
+ && (indx >= recs || indx == RECNO_OOB)
+ && cp->recno != meta->cur_recno
+ && !QAM_AFTER_CURRENT(meta, cp->recno))
+ goto next_pg;
+
+ if (is_key == 1)
+ *offp = RECNO_OOB;
+ else
+ *offp = -1;
+
+done:
+ /* release the meta page */
+ t_ret = mpf->put(mpf, meta, 0);
+
+ if (!ret)
+ ret = t_ret;
+
+ t_ret = __LPUT(dbc, metalock);
+
+ return (ret);
+}
+
+/*
+ * __qam_c_close --
+ * Close down the cursor from a single use.
+ */
+static int
+__qam_c_close(dbc, root_pgno, rmroot)
+ DBC *dbc;
+ db_pgno_t root_pgno;
+ int *rmroot;
+{
+ QUEUE_CURSOR *cp;
+
+ COMPQUIET(root_pgno, 0);
+ COMPQUIET(rmroot, NULL);
+
+ cp = (QUEUE_CURSOR *)dbc->internal;
+
+ /* Discard any locks not acquired inside of a transaction. */
+ (void)__TLPUT(dbc, cp->lock);
+ LOCK_INIT(cp->lock);
+
+ cp->page = NULL;
+ cp->pgno = PGNO_INVALID;
+ cp->indx = 0;
+ cp->lock_mode = DB_LOCK_NG;
+ cp->recno = RECNO_OOB;
+ cp->flags = 0;
+
+ return (0);
+}
+
+/*
+ * __qam_c_dup --
+ * Duplicate a queue cursor, such that the new one holds appropriate
+ * locks for the position of the original.
+ *
+ * PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
+ */
+int
+__qam_c_dup(orig_dbc, new_dbc)
+ DBC *orig_dbc, *new_dbc;
+{
+ QUEUE_CURSOR *orig, *new;
+
+ orig = (QUEUE_CURSOR *)orig_dbc->internal;
+ new = (QUEUE_CURSOR *)new_dbc->internal;
+
+ new->recno = orig->recno;
+
+ /* reget the long term lock if we are not in a xact */
+ if (orig_dbc->txn != NULL ||
+ !STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock))
+ return (0);
+
+ return (__db_lget(new_dbc,
+ 0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
+}
+
+/*
+ * __qam_c_init
+ *
+ * PUBLIC: int __qam_c_init __P((DBC *));
+ */
+int
+__qam_c_init(dbc)
+ DBC *dbc;
+{
+ QUEUE_CURSOR *cp;
+ DB *dbp;
+ int ret;
+
+ dbp = dbc->dbp;
+
+ /* Allocate the internal structure. */
+ cp = (QUEUE_CURSOR *)dbc->internal;
+ if (cp == NULL) {
+ if ((ret =
+ __os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
+ return (ret);
+ dbc->internal = (DBC_INTERNAL *)cp;
+ }
+
+ /* Initialize methods. */
+ dbc->c_close = __db_c_close;
+ dbc->c_count = __db_c_count;
+ dbc->c_del = __db_c_del;
+ dbc->c_dup = __db_c_dup;
+ dbc->c_get = dbc->c_real_get = __db_c_get;
+ dbc->c_pget = __db_c_pget;
+ dbc->c_put = __db_c_put;
+ dbc->c_am_bulk = __qam_bulk;
+ dbc->c_am_close = __qam_c_close;
+ dbc->c_am_del = __qam_c_del;
+ dbc->c_am_destroy = __qam_c_destroy;
+ dbc->c_am_get = __qam_c_get;
+ dbc->c_am_put = __qam_c_put;
+ dbc->c_am_writelock = NULL;
+
+ return (0);
+}
+
+/*
+ * __qam_c_destroy --
+ * Close a single cursor -- internal version.
+ */
+static int
+__qam_c_destroy(dbc)
+ DBC *dbc;
+{
+ /* Discard the structures. */
+ __os_free(dbc->dbp->dbenv, dbc->internal);
+
+ return (0);
+}
+
+/*
+ * __qam_getno --
+ * Check the user's record number.
+ */
+static int
+__qam_getno(dbp, key, rep)
+ DB *dbp;
+ const DBT *key;
+ db_recno_t *rep;
+{
+ if ((*rep = *(db_recno_t *)key->data) == 0) {
+ __db_err(dbp->dbenv, "illegal record number of 0");
+ return (EINVAL);
+ }
+ return (0);
+}
+
+/*
+ * __qam_truncate --
+ * Truncate a queue database
+ *
+ * PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *));
+ */
+int
+__qam_truncate(dbp, txn, countp)
+ DB *dbp;
+ DB_TXN *txn;
+ u_int32_t *countp;
+{
+ DBC *dbc;
+ DB_LOCK metalock;
+ DB_MPOOLFILE *mpf;
+ QMETA *meta;
+ db_pgno_t metapno;
+ int count, ret, t_ret;
+
+ mpf = dbp->mpf;
+
+ /* Acquire a cursor. */
+ if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0)
+ return (ret);
+
+ /* Walk the queue, counting rows. */
+ count = 0;
+ while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0)
+ count++;
+
+ if (ret == DB_NOTFOUND)
+ ret = 0;
+
+ /* Discard the cursor. */
+ if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
+ ret = t_ret;
+
+ if (ret != 0)
+ return (ret);
+
+ /* update the meta page */
+ /* get the meta page */
+ metapno = ((QUEUE *)dbp->q_internal)->q_meta;
+ if ((ret =
+ __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
+ return (ret);
+
+ if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
+ /* We did not fetch it, we can release the lock. */
+ (void)__LPUT(dbc, metalock);
+ return (ret);
+ }
+ if (DBC_LOGGING(dbc)) {
+ ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
+ QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
+ 1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
+ }
+ if (ret == 0)
+ meta->first_recno = meta->cur_recno = 1;
+
+ if ((t_ret =
+ mpf->put(mpf, meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0)
+ ret = t_ret;
+ if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
+ ret = t_ret;
+
+ *countp = count;
+
+ return (ret);
+}