summaryrefslogtreecommitdiff
path: root/Lib/bsddb/test
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/bsddb/test')
-rw-r--r--Lib/bsddb/test/test_all.py14
-rw-r--r--Lib/bsddb/test/test_basics.py10
-rw-r--r--Lib/bsddb/test/test_cursor_pget_bug.py65
-rw-r--r--Lib/bsddb/test/test_dbtables.py10
-rw-r--r--Lib/bsddb/test/test_sequence.py112
5 files changed, 205 insertions, 6 deletions
diff --git a/Lib/bsddb/test/test_all.py b/Lib/bsddb/test/test_all.py
index abfaf47e27..ad8b1e9e94 100644
--- a/Lib/bsddb/test/test_all.py
+++ b/Lib/bsddb/test/test_all.py
@@ -4,6 +4,12 @@
import sys
import os
import unittest
+try:
+ # For Pythons w/distutils pybsddb
+ from bsddb3 import db
+except ImportError:
+ # For Python 2.3
+ from bsddb import db
verbose = 0
if 'verbose' in sys.argv:
@@ -16,12 +22,6 @@ if 'silent' in sys.argv: # take care of old flag, just in case
def print_versions():
- try:
- # For Pythons w/distutils pybsddb
- from bsddb3 import db
- except ImportError:
- # For Python 2.3
- from bsddb import db
print
print '-=' * 38
print db.DB_VERSION_STRING
@@ -69,6 +69,8 @@ def suite():
'test_queue',
'test_recno',
'test_thread',
+ 'test_sequence',
+ 'test_cursor_pget_bug',
]
alltests = unittest.TestSuite()
diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py
index 7e8f835f4a..bec5da36c3 100644
--- a/Lib/bsddb/test/test_basics.py
+++ b/Lib/bsddb/test/test_basics.py
@@ -659,12 +659,22 @@ class BasicTransactionTestCase(BasicTestCase):
except db.DBIncompleteError:
pass
+ if db.version() >= (4,0):
+ statDict = self.env.log_stat(0);
+ assert statDict.has_key('magic')
+ assert statDict.has_key('version')
+ assert statDict.has_key('cur_file')
+ assert statDict.has_key('region_nowait')
+
# must have at least one log file present:
logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
assert logs != None
for log in logs:
if verbose:
print 'log file: ' + log
+ if db.version >= (4,2):
+ logs = self.env.log_archive(db.DB_ARCH_REMOVE)
+ assert not logs
self.txn = self.env.txn_begin()
diff --git a/Lib/bsddb/test/test_cursor_pget_bug.py b/Lib/bsddb/test/test_cursor_pget_bug.py
new file mode 100644
index 0000000000..de47e6d046
--- /dev/null
+++ b/Lib/bsddb/test/test_cursor_pget_bug.py
@@ -0,0 +1,65 @@
+import unittest
+import sys, os, glob
+
+try:
+ # For Pythons w/distutils pybsddb
+ from bsddb3 import db
+except ImportError:
+ # For Python 2.3
+ from bsddb import db
+
+
+#----------------------------------------------------------------------
+
+class pget_bugTestCase(unittest.TestCase):
+ """Verify that cursor.pget works properly"""
+ db_name = 'test-cursor_pget.db'
+
+ def setUp(self):
+ self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
+ try:
+ os.mkdir(self.homeDir)
+ except os.error:
+ pass
+ self.env = db.DBEnv()
+ self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
+ self.primary_db = db.DB(self.env)
+ self.primary_db.open(self.db_name, 'primary', db.DB_BTREE, db.DB_CREATE)
+ self.secondary_db = db.DB(self.env)
+ self.secondary_db.set_flags(db.DB_DUP)
+ self.secondary_db.open(self.db_name, 'secondary', db.DB_BTREE, db.DB_CREATE)
+ self.primary_db.associate(self.secondary_db, lambda key, data: data)
+ self.primary_db.put('salad', 'eggs')
+ self.primary_db.put('spam', 'ham')
+ self.primary_db.put('omelet', 'eggs')
+
+
+ def tearDown(self):
+ self.secondary_db.close()
+ self.primary_db.close()
+ self.env.close()
+ del self.secondary_db
+ del self.primary_db
+ del self.env
+ for file in glob.glob(os.path.join(self.homeDir, '*')):
+ os.remove(file)
+ os.removedirs(self.homeDir)
+
+ def test_pget(self):
+ cursor = self.secondary_db.cursor()
+
+ self.assertEquals(('eggs', 'salad', 'eggs'), cursor.pget(key='eggs', flags=db.DB_SET))
+ self.assertEquals(('eggs', 'omelet', 'eggs'), cursor.pget(db.DB_NEXT_DUP))
+ self.assertEquals(None, cursor.pget(db.DB_NEXT_DUP))
+
+ self.assertEquals(('ham', 'spam', 'ham'), cursor.pget('ham', 'spam', flags=db.DB_SET))
+ self.assertEquals(None, cursor.pget(db.DB_NEXT_DUP))
+
+ cursor.close()
+
+
+def test_suite():
+ return unittest.makeSuite(pget_bugTestCase)
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
diff --git a/Lib/bsddb/test/test_dbtables.py b/Lib/bsddb/test/test_dbtables.py
index 1128a5a44e..26e3d3650b 100644
--- a/Lib/bsddb/test/test_dbtables.py
+++ b/Lib/bsddb/test/test_dbtables.py
@@ -339,6 +339,16 @@ class TableDBTestCase(unittest.TestCase):
conditions={'Name': dbtables.LikeCond('%')},
mappings={'Access': increment_access})
+ try:
+ self.tdb.Modify(tabname,
+ conditions={'Name': dbtables.LikeCond('%')},
+ mappings={'Access': 'What is your quest?'})
+ except TypeError:
+ # success, the string value in mappings isn't callable
+ pass
+ else:
+ raise RuntimeError, "why was TypeError not raised for bad callable?"
+
# Delete key in select conditions
values = self.tdb.Select(
tabname, None,
diff --git a/Lib/bsddb/test/test_sequence.py b/Lib/bsddb/test/test_sequence.py
new file mode 100644
index 0000000000..979f858c4a
--- /dev/null
+++ b/Lib/bsddb/test/test_sequence.py
@@ -0,0 +1,112 @@
+import unittest
+import os
+import sys
+import tempfile
+import glob
+
+try:
+ # For Pythons w/distutils pybsddb
+ from bsddb3 import db
+except ImportError:
+ from bsddb import db
+
+from test_all import verbose
+
+
+class DBSequenceTest(unittest.TestCase):
+ def setUp(self):
+ self.int_32_max = 0x100000000
+ self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
+ try:
+ os.mkdir(self.homeDir)
+ except os.error:
+ pass
+ tempfile.tempdir = self.homeDir
+ self.filename = os.path.split(tempfile.mktemp())[1]
+ tempfile.tempdir = None
+
+ self.dbenv = db.DBEnv()
+ self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666)
+ self.d = db.DB(self.dbenv)
+ self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666)
+
+ def tearDown(self):
+ if hasattr(self, 'seq'):
+ self.seq.close()
+ del self.seq
+ if hasattr(self, 'd'):
+ self.d.close()
+ del self.d
+ if hasattr(self, 'dbenv'):
+ self.dbenv.close()
+ del self.dbenv
+
+ files = glob.glob(os.path.join(self.homeDir, '*'))
+ for file in files:
+ os.remove(file)
+
+ def test_get(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ start_value = 10 * self.int_32_max
+ self.assertEqual(0xA00000000, start_value)
+ self.assertEquals(None, self.seq.init_value(start_value))
+ self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(start_value, self.seq.get(5))
+ self.assertEquals(start_value + 5, self.seq.get())
+
+ def test_remove(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(None, self.seq.remove(txn=None, flags=0))
+ del self.seq
+
+ def test_get_key(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ key = 'foo'
+ self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE))
+ self.assertEquals(key, self.seq.get_key())
+
+ def test_get_dbp(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(self.d, self.seq.get_dbp())
+
+ def test_cachesize(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ cashe_size = 10
+ self.assertEquals(None, self.seq.set_cachesize(cashe_size))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(cashe_size, self.seq.get_cachesize())
+
+ def test_flags(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ flag = db.DB_SEQ_WRAP;
+ self.assertEquals(None, self.seq.set_flags(flag))
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(flag, self.seq.get_flags() & flag)
+
+ def test_range(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1)
+ self.assertEquals(None, self.seq.set_range(seq_range))
+ self.seq.init_value(seq_range[0])
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
+ self.assertEquals(seq_range, self.seq.get_range())
+
+ def test_stat(self):
+ self.seq = db.DBSequence(self.d, flags=0)
+ self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE))
+ stat = self.seq.stat()
+ for param in ('nowait', 'min', 'max', 'value', 'current',
+ 'flags', 'cache_size', 'last_value', 'wait'):
+ self.assertTrue(param in stat, "parameter %s isn't in stat info" % param)
+
+def test_suite():
+ suite = unittest.TestSuite()
+ if db.version() >= (4,3):
+ suite.addTest(unittest.makeSuite(DBSequenceTest))
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')