diff options
Diffstat (limited to 'Lib/bsddb/test')
-rw-r--r-- | Lib/bsddb/test/test_all.py | 14 | ||||
-rw-r--r-- | Lib/bsddb/test/test_basics.py | 10 | ||||
-rw-r--r-- | Lib/bsddb/test/test_cursor_pget_bug.py | 65 | ||||
-rw-r--r-- | Lib/bsddb/test/test_dbtables.py | 10 | ||||
-rw-r--r-- | Lib/bsddb/test/test_sequence.py | 112 |
5 files changed, 205 insertions, 6 deletions
diff --git a/Lib/bsddb/test/test_all.py b/Lib/bsddb/test/test_all.py index abfaf47e27..ad8b1e9e94 100644 --- a/Lib/bsddb/test/test_all.py +++ b/Lib/bsddb/test/test_all.py @@ -4,6 +4,12 @@ import sys import os import unittest +try: + # For Pythons w/distutils pybsddb + from bsddb3 import db +except ImportError: + # For Python 2.3 + from bsddb import db verbose = 0 if 'verbose' in sys.argv: @@ -16,12 +22,6 @@ if 'silent' in sys.argv: # take care of old flag, just in case def print_versions(): - try: - # For Pythons w/distutils pybsddb - from bsddb3 import db - except ImportError: - # For Python 2.3 - from bsddb import db print print '-=' * 38 print db.DB_VERSION_STRING @@ -69,6 +69,8 @@ def suite(): 'test_queue', 'test_recno', 'test_thread', + 'test_sequence', + 'test_cursor_pget_bug', ] alltests = unittest.TestSuite() diff --git a/Lib/bsddb/test/test_basics.py b/Lib/bsddb/test/test_basics.py index 7e8f835f4a..bec5da36c3 100644 --- a/Lib/bsddb/test/test_basics.py +++ b/Lib/bsddb/test/test_basics.py @@ -659,12 +659,22 @@ class BasicTransactionTestCase(BasicTestCase): except db.DBIncompleteError: pass + if db.version() >= (4,0): + statDict = self.env.log_stat(0); + assert statDict.has_key('magic') + assert statDict.has_key('version') + assert statDict.has_key('cur_file') + assert statDict.has_key('region_nowait') + # must have at least one log file present: logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG) assert logs != None for log in logs: if verbose: print 'log file: ' + log + if db.version >= (4,2): + logs = self.env.log_archive(db.DB_ARCH_REMOVE) + assert not logs self.txn = self.env.txn_begin() diff --git a/Lib/bsddb/test/test_cursor_pget_bug.py b/Lib/bsddb/test/test_cursor_pget_bug.py new file mode 100644 index 0000000000..de47e6d046 --- /dev/null +++ b/Lib/bsddb/test/test_cursor_pget_bug.py @@ -0,0 +1,65 @@ +import unittest +import sys, os, glob + +try: + # For Pythons w/distutils pybsddb + from bsddb3 import db +except ImportError: + # For Python 2.3 + from bsddb import db + + +#---------------------------------------------------------------------- + +class pget_bugTestCase(unittest.TestCase): + """Verify that cursor.pget works properly""" + db_name = 'test-cursor_pget.db' + + def setUp(self): + self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home') + try: + os.mkdir(self.homeDir) + except os.error: + pass + self.env = db.DBEnv() + self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL) + self.primary_db = db.DB(self.env) + self.primary_db.open(self.db_name, 'primary', db.DB_BTREE, db.DB_CREATE) + self.secondary_db = db.DB(self.env) + self.secondary_db.set_flags(db.DB_DUP) + self.secondary_db.open(self.db_name, 'secondary', db.DB_BTREE, db.DB_CREATE) + self.primary_db.associate(self.secondary_db, lambda key, data: data) + self.primary_db.put('salad', 'eggs') + self.primary_db.put('spam', 'ham') + self.primary_db.put('omelet', 'eggs') + + + def tearDown(self): + self.secondary_db.close() + self.primary_db.close() + self.env.close() + del self.secondary_db + del self.primary_db + del self.env + for file in glob.glob(os.path.join(self.homeDir, '*')): + os.remove(file) + os.removedirs(self.homeDir) + + def test_pget(self): + cursor = self.secondary_db.cursor() + + self.assertEquals(('eggs', 'salad', 'eggs'), cursor.pget(key='eggs', flags=db.DB_SET)) + self.assertEquals(('eggs', 'omelet', 'eggs'), cursor.pget(db.DB_NEXT_DUP)) + self.assertEquals(None, cursor.pget(db.DB_NEXT_DUP)) + + self.assertEquals(('ham', 'spam', 'ham'), cursor.pget('ham', 'spam', flags=db.DB_SET)) + self.assertEquals(None, cursor.pget(db.DB_NEXT_DUP)) + + cursor.close() + + +def test_suite(): + return unittest.makeSuite(pget_bugTestCase) + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') diff --git a/Lib/bsddb/test/test_dbtables.py b/Lib/bsddb/test/test_dbtables.py index 1128a5a44e..26e3d3650b 100644 --- a/Lib/bsddb/test/test_dbtables.py +++ b/Lib/bsddb/test/test_dbtables.py @@ -339,6 +339,16 @@ class TableDBTestCase(unittest.TestCase): conditions={'Name': dbtables.LikeCond('%')}, mappings={'Access': increment_access}) + try: + self.tdb.Modify(tabname, + conditions={'Name': dbtables.LikeCond('%')}, + mappings={'Access': 'What is your quest?'}) + except TypeError: + # success, the string value in mappings isn't callable + pass + else: + raise RuntimeError, "why was TypeError not raised for bad callable?" + # Delete key in select conditions values = self.tdb.Select( tabname, None, diff --git a/Lib/bsddb/test/test_sequence.py b/Lib/bsddb/test/test_sequence.py new file mode 100644 index 0000000000..979f858c4a --- /dev/null +++ b/Lib/bsddb/test/test_sequence.py @@ -0,0 +1,112 @@ +import unittest +import os +import sys +import tempfile +import glob + +try: + # For Pythons w/distutils pybsddb + from bsddb3 import db +except ImportError: + from bsddb import db + +from test_all import verbose + + +class DBSequenceTest(unittest.TestCase): + def setUp(self): + self.int_32_max = 0x100000000 + self.homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home') + try: + os.mkdir(self.homeDir) + except os.error: + pass + tempfile.tempdir = self.homeDir + self.filename = os.path.split(tempfile.mktemp())[1] + tempfile.tempdir = None + + self.dbenv = db.DBEnv() + self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666) + self.d = db.DB(self.dbenv) + self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666) + + def tearDown(self): + if hasattr(self, 'seq'): + self.seq.close() + del self.seq + if hasattr(self, 'd'): + self.d.close() + del self.d + if hasattr(self, 'dbenv'): + self.dbenv.close() + del self.dbenv + + files = glob.glob(os.path.join(self.homeDir, '*')) + for file in files: + os.remove(file) + + def test_get(self): + self.seq = db.DBSequence(self.d, flags=0) + start_value = 10 * self.int_32_max + self.assertEqual(0xA00000000, start_value) + self.assertEquals(None, self.seq.init_value(start_value)) + self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE)) + self.assertEquals(start_value, self.seq.get(5)) + self.assertEquals(start_value + 5, self.seq.get()) + + def test_remove(self): + self.seq = db.DBSequence(self.d, flags=0) + self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) + self.assertEquals(None, self.seq.remove(txn=None, flags=0)) + del self.seq + + def test_get_key(self): + self.seq = db.DBSequence(self.d, flags=0) + key = 'foo' + self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE)) + self.assertEquals(key, self.seq.get_key()) + + def test_get_dbp(self): + self.seq = db.DBSequence(self.d, flags=0) + self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) + self.assertEquals(self.d, self.seq.get_dbp()) + + def test_cachesize(self): + self.seq = db.DBSequence(self.d, flags=0) + cashe_size = 10 + self.assertEquals(None, self.seq.set_cachesize(cashe_size)) + self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) + self.assertEquals(cashe_size, self.seq.get_cachesize()) + + def test_flags(self): + self.seq = db.DBSequence(self.d, flags=0) + flag = db.DB_SEQ_WRAP; + self.assertEquals(None, self.seq.set_flags(flag)) + self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) + self.assertEquals(flag, self.seq.get_flags() & flag) + + def test_range(self): + self.seq = db.DBSequence(self.d, flags=0) + seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1) + self.assertEquals(None, self.seq.set_range(seq_range)) + self.seq.init_value(seq_range[0]) + self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) + self.assertEquals(seq_range, self.seq.get_range()) + + def test_stat(self): + self.seq = db.DBSequence(self.d, flags=0) + self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) + stat = self.seq.stat() + for param in ('nowait', 'min', 'max', 'value', 'current', + 'flags', 'cache_size', 'last_value', 'wait'): + self.assertTrue(param in stat, "parameter %s isn't in stat info" % param) + +def test_suite(): + suite = unittest.TestSuite() + if db.version() >= (4,3): + suite.addTest(unittest.makeSuite(DBSequenceTest)) + return suite + + +if __name__ == '__main__': + unittest.main(defaultTest='test_suite') |