[Python-checkins] r79292 - in python/trunk/Lib/bsddb/test: test_db.py test_dbenv.py test_fileid.py

jesus.cea python-checkins at python.org
Mon Mar 22 16:18:46 CET 2010


Author: jesus.cea
Date: Mon Mar 22 16:18:46 2010
New Revision: 79292

Log:
Missing testsuite files

Added:
   python/trunk/Lib/bsddb/test/test_db.py
   python/trunk/Lib/bsddb/test/test_dbenv.py
   python/trunk/Lib/bsddb/test/test_fileid.py

Added: python/trunk/Lib/bsddb/test/test_db.py
==============================================================================
--- (empty file)
+++ python/trunk/Lib/bsddb/test/test_db.py	Mon Mar 22 16:18:46 2010
@@ -0,0 +1,142 @@
+import unittest
+import os, glob
+
+from test_all import db, test_support, get_new_environment_path, \
+        get_new_database_path
+
+#----------------------------------------------------------------------
+
+class DB(unittest.TestCase):  # Base fixture: a fresh, unopened db.DB() handle for every test
+    import sys
+    if sys.version_info < (2, 4) :  # On interpreters older than 2.4, provide assertTrue via failUnless
+        def assertTrue(self, expr, msg=None):
+            self.failUnless(expr,msg=msg)
+
+    def setUp(self):
+        self.path = get_new_database_path()
+        self.db = db.DB()  # Created without an environment; setUp never opens it
+
+    def tearDown(self):
+        self.db.close()
+        del self.db
+        test_support.rmtree(self.path)
+
+class DB_general(DB) :  # Set/get round-trip checks on the plain handle from DB.setUp
+    if db.version() >= (4, 2) :  # Tests below are only defined when db.version() is 4.2 or newer
+        def test_bt_minkey(self) :
+            for i in [17, 108, 1030] :
+                self.db.set_bt_minkey(i)
+                self.assertEqual(i, self.db.get_bt_minkey())
+
+        def test_lorder(self) :
+            self.db.set_lorder(1234)  # presumably 1234/4321 select little/big endian byte order -- TODO confirm
+            self.assertEqual(1234, self.db.get_lorder())
+            self.db.set_lorder(4321)
+            self.assertEqual(4321, self.db.get_lorder())
+            self.assertRaises(db.DBInvalidArgError, self.db.set_lorder, 9182)  # Any other value must be rejected
+
+    if db.version() >= (4, 6) :  # Priority test is only defined for 4.6 or newer
+        def test_priority(self) :
+            flags = [db.DB_PRIORITY_VERY_LOW, db.DB_PRIORITY_LOW,
+                    db.DB_PRIORITY_DEFAULT, db.DB_PRIORITY_HIGH,
+                    db.DB_PRIORITY_VERY_HIGH]
+            for flag in flags :  # Each priority constant must be reported back unchanged
+                self.db.set_priority(flag)
+                self.assertEqual(flag, self.db.get_priority())
+
+class DB_hash(DB) :  # Round-trip checks for h_ffactor, h_nelem and pagesize
+    if db.version() >= (4, 2) :  # Tests below are only defined when db.version() is 4.2 or newer
+        def test_h_ffactor(self) :
+            for ffactor in [4, 16, 256] :
+                self.db.set_h_ffactor(ffactor)
+                self.assertEqual(ffactor, self.db.get_h_ffactor())
+
+        def test_h_nelem(self) :
+            for nelem in [1, 2, 4] :
+                nelem = nelem*1024*1024  # Scale to multiples of 2**20 (roughly millions)
+                self.db.set_h_nelem(nelem)
+                self.assertEqual(nelem, self.db.get_h_nelem())
+
+        def test_pagesize(self) :
+            for i in xrange(9, 17) :  # Exponents 9..16, i.e. sizes from 512 to 65536
+                i = 1<<i
+                self.db.set_pagesize(i)
+                self.assertEqual(i, self.db.get_pagesize())
+
+            # Valid values go from 512 to 65536; anything outside must be rejected.
+            # 131072 bytes is one power of two above the maximum...
+            self.assertRaises(db.DBInvalidArgError, self.db.set_pagesize, 1<<17)
+            # ...and 256 bytes is one power of two below the minimum.
+            self.assertRaises(db.DBInvalidArgError, self.db.set_pagesize, 1<<8)
+
+class DB_txn(DB) :  # Same kind of checks, but on a handle bound to a transactional environment
+    def setUp(self) :  # Replaces DB.setUp entirely: no self.path, and the handle is created inside self.env
+        self.homeDir = get_new_environment_path()
+        self.env = db.DBEnv()
+        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
+                db.DB_INIT_LOG | db.DB_INIT_TXN)
+        self.db = db.DB(self.env)
+
+    def tearDown(self) :
+        self.db.close()  # Close the database handle before its environment
+        del self.db
+        self.env.close()
+        del self.env
+        test_support.rmtree(self.homeDir)
+
+    if db.version() >= (4, 2) :  # Flag test is only defined when db.version() is 4.2 or newer
+        def test_flags(self) :
+            self.db.set_flags(db.DB_CHKSUM)
+            self.assertEqual(db.DB_CHKSUM, self.db.get_flags())
+            self.db.set_flags(db.DB_TXN_NOT_DURABLE)  # Expected to accumulate: both bits are reported afterwards
+            self.assertEqual(db.DB_TXN_NOT_DURABLE | db.DB_CHKSUM,
+                    self.db.get_flags())
+
+class DB_recno(DB) :  # Round-trip checks for re_pad, re_delim and re_source
+    if db.version() >= (4, 2) :  # Tests below are only defined when db.version() is 4.2 or newer
+        def test_re_pad(self) :
+            for i in [' ', '*'] :  # One-character strings are expected back as their ordinal
+                self.db.set_re_pad(i)
+                self.assertEqual(ord(i), self.db.get_re_pad())
+            for i in [97, 65] :  # Integers are expected back unchanged
+                self.db.set_re_pad(i)
+                self.assertEqual(i, self.db.get_re_pad())
+
+        def test_re_delim(self) :
+            for i in [' ', '*'] :  # One-character strings are expected back as their ordinal
+                self.db.set_re_delim(i)
+                self.assertEqual(ord(i), self.db.get_re_delim())
+            for i in [97, 65] :  # Integers are expected back unchanged
+                self.db.set_re_delim(i)
+                self.assertEqual(i, self.db.get_re_delim())
+
+        def test_re_source(self) :
+            for i in ["test", "test2", "test3"] :  # Only the stored name is checked; nothing is opened
+                self.db.set_re_source(i)
+                self.assertEqual(i, self.db.get_re_source())
+
+class DB_queue(DB) :  # Round-trip checks for re_len and q_extentsize
+    if db.version() >= (4, 2) :  # Tests below are only defined when db.version() is 4.2 or newer
+        def test_re_len(self) :
+            for i in [33, 65, 300, 2000] :  # Each length must be reported back unchanged
+                self.db.set_re_len(i)
+                self.assertEqual(i, self.db.get_re_len())
+
+        def test_q_extentsize(self) :
+            for i in [1, 60, 100] :  # Each extent size must be reported back unchanged
+                self.db.set_q_extentsize(i)
+                self.assertEqual(i, self.db.get_q_extentsize())
+
+def test_suite():  # Build one suite from the concrete test classes of this module
+    suite = unittest.TestSuite()
+
+    suite.addTest(unittest.makeSuite(DB_general))  # The DB base class itself is not added
+    suite.addTest(unittest.makeSuite(DB_txn))
+    suite.addTest(unittest.makeSuite(DB_hash))
+    suite.addTest(unittest.makeSuite(DB_recno))
+    suite.addTest(unittest.makeSuite(DB_queue))
+
+    return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')

Added: python/trunk/Lib/bsddb/test/test_dbenv.py
==============================================================================
--- (empty file)
+++ python/trunk/Lib/bsddb/test/test_dbenv.py	Mon Mar 22 16:18:46 2010
@@ -0,0 +1,513 @@
+import unittest
+import os, glob
+
+from test_all import db, test_support, get_new_environment_path, \
+        get_new_database_path
+
+#----------------------------------------------------------------------
+
+class DBEnv(unittest.TestCase):  # Base fixture: a fresh, unopened db.DBEnv() plus a home directory path
+    import sys
+    if sys.version_info < (2, 4) :  # On interpreters older than 2.4, provide assertTrue/assertFalse
+        def assertTrue(self, expr, msg=None):
+            self.failUnless(expr,msg=msg)
+
+        def assertFalse(self, expr, msg=None):
+            self.failIf(expr,msg=msg)
+
+    def setUp(self):
+        self.homeDir = get_new_environment_path()
+        self.env = db.DBEnv()  # Not opened here; subclasses or individual tests open it when needed
+
+    def tearDown(self):
+        self.env.close()
+        del self.env
+        test_support.rmtree(self.homeDir)
+
+class DBEnv_general(DBEnv) :  # Set/get checks on an environment that starts out unopened
+    if db.version() >= (4, 7) :  # Each group below is only defined from the given db.version() upwards
+        def test_lk_partitions(self) :
+            for i in [10, 20, 40] :
+                self.env.set_lk_partitions(i)
+                self.assertEqual(i, self.env.get_lk_partitions())
+
+    if db.version() >= (4, 6) :
+        def test_thread(self) :
+            for i in [16, 100, 1000] :
+                self.env.set_thread_count(i)
+                self.assertEqual(i, self.env.get_thread_count())
+
+        def test_cache_max(self) :
+            for size in [64, 128] :
+                size = size*1024*1024  # Megabytes
+                self.env.set_cache_max(0, size)
+                size2 = self.env.get_cache_max()
+                self.assertEqual(0, size2[0])
+                self.assertTrue(size <= size2[1])  # Accept any reported value in [size, 2*size)
+                self.assertTrue(2*size > size2[1])
+
+    if db.version() >= (4, 4) :
+        def test_mutex_stat(self) :
+            self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
+                    db.DB_INIT_LOCK)
+            stat = self.env.mutex_stat()
+            self.assertTrue("mutex_inuse_max" in stat)  # Only the presence of this one key is checked
+
+        def test_lg_filemode(self) :
+            for i in [0600, 0660, 0666] :  # Octal file modes
+                self.env.set_lg_filemode(i)
+                self.assertEqual(i, self.env.get_lg_filemode())
+
+    if db.version() >= (4, 3) :
+        def test_mp_max_openfd(self) :
+            for i in [17, 31, 42] :
+                self.env.set_mp_max_openfd(i)
+                self.assertEqual(i, self.env.get_mp_max_openfd())
+
+        def test_mp_max_write(self) :
+            for i in [100, 200, 300] :
+                for j in [1, 2, 3] :
+                    j *= 1000000
+                    self.env.set_mp_max_write(i, j)
+                    v=self.env.get_mp_max_write()
+                    self.assertEqual((i, j), v)  # The getter returns both values as a pair
+
+    if db.version() >= (4, 2) :
+        def test_invalid_txn(self) :
+            # This environment doesn't support transactions
+            self.assertRaises(db.DBInvalidArgError, self.env.txn_begin)
+
+        def test_mp_mmapsize(self) :
+            for i in [16, 32, 64] :
+                i *= 1024*1024
+                self.env.set_mp_mmapsize(i)
+                self.assertEqual(i, self.env.get_mp_mmapsize())
+
+        def test_tmp_dir(self) :
+            for i in ["a", "bb", "ccc"] :
+                self.env.set_tmp_dir(i)
+                self.assertEqual(i, self.env.get_tmp_dir())
+
+        def test_flags(self) :
+            self.env.set_flags(db.DB_AUTO_COMMIT, 1)  # Second argument 1 sets the flag, 0 clears it
+            self.assertEqual(db.DB_AUTO_COMMIT, self.env.get_flags())
+            self.env.set_flags(db.DB_TXN_NOSYNC, 1)
+            self.assertEqual(db.DB_AUTO_COMMIT | db.DB_TXN_NOSYNC,
+                    self.env.get_flags())
+            self.env.set_flags(db.DB_AUTO_COMMIT, 0)
+            self.assertEqual(db.DB_TXN_NOSYNC, self.env.get_flags())
+            self.env.set_flags(db.DB_TXN_NOSYNC, 0)
+            self.assertEqual(0, self.env.get_flags())
+
+        def test_lk_max_objects(self) :
+            for i in [1000, 2000, 3000] :
+                self.env.set_lk_max_objects(i)
+                self.assertEqual(i, self.env.get_lk_max_objects())
+
+        def test_lk_max_locks(self) :
+            for i in [1000, 2000, 3000] :
+                self.env.set_lk_max_locks(i)
+                self.assertEqual(i, self.env.get_lk_max_locks())
+
+        def test_lk_max_lockers(self) :
+            for i in [1000, 2000, 3000] :
+                self.env.set_lk_max_lockers(i)
+                self.assertEqual(i, self.env.get_lk_max_lockers())
+
+        def test_lg_regionmax(self) :
+            for i in [128, 256, 1024] :
+                i = i*1024*1024
+                self.env.set_lg_regionmax(i)
+                j = self.env.get_lg_regionmax()
+                self.assertTrue(i <= j)  # Accept any reported value in [i, 2*i)
+                self.assertTrue(2*i > j)
+
+        def test_lk_detect(self) :
+            flags= [db.DB_LOCK_DEFAULT, db.DB_LOCK_EXPIRE, db.DB_LOCK_MAXLOCKS,
+                    db.DB_LOCK_MINLOCKS, db.DB_LOCK_MINWRITE,
+                    db.DB_LOCK_OLDEST, db.DB_LOCK_RANDOM, db.DB_LOCK_YOUNGEST]
+
+            if db.version() >= (4, 3) :  # DB_LOCK_MAXWRITE is only exercised from 4.3 upwards
+                flags.append(db.DB_LOCK_MAXWRITE)
+
+            for i in flags :
+                self.env.set_lk_detect(i)
+                self.assertEqual(i, self.env.get_lk_detect())
+
+        def test_lg_dir(self) :
+            for i in ["a", "bb", "ccc", "dddd"] :
+                self.env.set_lg_dir(i)
+                self.assertEqual(i, self.env.get_lg_dir())
+
+        def test_lg_bsize(self) :
+            log_size = 70*1024
+            self.env.set_lg_bsize(log_size)
+            self.assertTrue(self.env.get_lg_bsize() >= log_size)  # Reported size may exceed the request
+            self.assertTrue(self.env.get_lg_bsize() < 4*log_size)
+            self.env.set_lg_bsize(4*log_size)
+            self.assertTrue(self.env.get_lg_bsize() >= 4*log_size)
+
+        def test_setget_data_dirs(self) :
+            dirs = ("a", "b", "c", "d")
+            for i in dirs :
+                self.env.set_data_dir(i)
+            self.assertEqual(dirs, self.env.get_data_dirs())  # All added dirs come back as a tuple, in order
+
+        def test_setget_cachesize(self) :
+            cachesize = (0, 512*1024*1024, 3)
+            self.env.set_cachesize(*cachesize)
+            self.assertEqual(cachesize, self.env.get_cachesize())
+
+            cachesize = (0, 1*1024*1024, 5)
+            self.env.set_cachesize(*cachesize)
+            cachesize2 = self.env.get_cachesize()
+            self.assertEqual(cachesize[0], cachesize2[0])
+            self.assertEqual(cachesize[2], cachesize2[2])
+            # Berkeley DB expands the cache 25% accounting overhead,
+            # if the cache is small.
+            self.assertEqual(125, int(100.0*cachesize2[1]/cachesize[1]))
+
+            # You can not change configuration after opening
+            # the environment.
+            self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
+            cachesize = (0, 2*1024*1024, 1)
+            self.assertRaises(db.DBInvalidArgError,
+                self.env.set_cachesize, *cachesize)
+            self.assertEqual(cachesize2, self.env.get_cachesize())  # The rejected call left the old value in place
+
+        def test_set_cachesize_dbenv_db(self) :
+            # You can not configure the cachesize using
+            # the database handle, if you are using an environment.
+            d = db.DB(self.env)
+            self.assertRaises(db.DBInvalidArgError,
+                d.set_cachesize, 0, 1024*1024, 1)
+
+        def test_setget_shm_key(self) :
+            shm_key=137
+            self.env.set_shm_key(shm_key)
+            self.assertEqual(shm_key, self.env.get_shm_key())
+            self.env.set_shm_key(shm_key+1)
+            self.assertEqual(shm_key+1, self.env.get_shm_key())
+
+            # You can not change configuration after opening
+            # the environment.
+            self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
+            # Changing the shm key after opening the environment must be
+            # rejected (the original note warned of a core dump here).
+            self.assertRaises(db.DBInvalidArgError,
+                self.env.set_shm_key, shm_key)
+            self.assertEqual(shm_key+1, self.env.get_shm_key())
+
+    if db.version() >= (4, 4) :
+        def test_mutex_setget_max(self) :
+            v = self.env.mutex_get_max()
+            v2 = v*2+1  # Any value different from the current one
+
+            self.env.mutex_set_max(v2)
+            self.assertEqual(v2, self.env.mutex_get_max())
+
+            self.env.mutex_set_max(v)
+            self.assertEqual(v, self.env.mutex_get_max())
+
+            # You can not change configuration after opening
+            # the environment.
+            self.env.open(self.homeDir, db.DB_CREATE)
+            self.assertRaises(db.DBInvalidArgError,
+                    self.env.mutex_set_max, v2)
+
+        def test_mutex_setget_increment(self) :
+            v = self.env.mutex_get_increment()
+            v2 = 127
+
+            self.env.mutex_set_increment(v2)
+            self.assertEqual(v2, self.env.mutex_get_increment())
+
+            self.env.mutex_set_increment(v)
+            self.assertEqual(v, self.env.mutex_get_increment())
+
+            # You can not change configuration after opening
+            # the environment.
+            self.env.open(self.homeDir, db.DB_CREATE)
+            self.assertRaises(db.DBInvalidArgError,
+                    self.env.mutex_set_increment, v2)
+
+        def test_mutex_setget_tas_spins(self) :
+            self.env.mutex_set_tas_spins(0)  # Default = BDB decides
+            v = self.env.mutex_get_tas_spins()
+            v2 = v*2+1  # Any value different from the current one
+
+            self.env.mutex_set_tas_spins(v2)
+            self.assertEqual(v2, self.env.mutex_get_tas_spins())
+
+            self.env.mutex_set_tas_spins(v)
+            self.assertEqual(v, self.env.mutex_get_tas_spins())
+
+            # In this case, you can change configuration
+            # after opening the environment.
+            self.env.open(self.homeDir, db.DB_CREATE)
+            self.env.mutex_set_tas_spins(v2)  # Passes as long as this call does not raise
+
+        def test_mutex_setget_align(self) :
+            v = self.env.mutex_get_align()
+            v2 = 64
+            if v == 64 :  # Make sure v2 differs from the current alignment
+                v2 = 128
+
+            self.env.mutex_set_align(v2)
+            self.assertEqual(v2, self.env.mutex_get_align())
+
+            # Requires a nonzero power of two
+            self.assertRaises(db.DBInvalidArgError,
+                    self.env.mutex_set_align, 0)
+            self.assertRaises(db.DBInvalidArgError,
+                    self.env.mutex_set_align, 17)
+
+            self.env.mutex_set_align(2*v2)
+            self.assertEqual(2*v2, self.env.mutex_get_align())
+
+            # You can not change configuration after opening
+            # the environment.
+            self.env.open(self.homeDir, db.DB_CREATE)
+            self.assertRaises(db.DBInvalidArgError,
+                    self.env.mutex_set_align, v2)
+
+
+class DBEnv_log(DBEnv) :  # Logging tests on an environment opened with logging but without transactions
+    def setUp(self):
+        DBEnv.setUp(self)
+        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOG)
+
+    def test_log_file(self) :
+        log_file = self.env.log_file((1, 1))
+        self.assertEqual("log.0000000001", log_file[-14:])  # Compare only the 14-character name at the end
+
+    if db.version() >= (4, 4) :
+        # The version with transactions is checked in other test object
+        def test_log_printf(self) :
+            msg = "This is a test..."
+            self.env.log_printf(msg)
+            logc = self.env.log_cursor()
+            self.assertTrue(msg in (logc.last()[1]))  # The message must appear in the last log record's data
+
+    if db.version() >= (4, 7) :
+        def test_log_config(self) :
+            self.env.log_set_config(db.DB_LOG_DSYNC | db.DB_LOG_ZERO, 1)  # Turn both options on at once
+            self.assertTrue(self.env.log_get_config(db.DB_LOG_DSYNC))
+            self.assertTrue(self.env.log_get_config(db.DB_LOG_ZERO))
+            self.env.log_set_config(db.DB_LOG_ZERO, 0)  # Turning one off must leave the other on
+            self.assertTrue(self.env.log_get_config(db.DB_LOG_DSYNC))
+            self.assertFalse(self.env.log_get_config(db.DB_LOG_ZERO))
+
+
+class DBEnv_log_txn(DBEnv) :  # Logging tests on an environment opened with transaction support
+    def setUp(self):
+        DBEnv.setUp(self)
+        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
+                db.DB_INIT_LOG | db.DB_INIT_TXN)
+
+    if db.version() >= (4, 5) :
+        def test_tx_max(self) :
+            txns=[]
+            def tx() :  # Begin get_tx_max() transactions and keep them all open
+                for i in xrange(self.env.get_tx_max()) :
+                    txns.append(self.env.txn_begin())
+
+            tx()
+            self.assertRaises(MemoryError, tx)  # A second full round is expected to fail
+
+            # Abort the transactions before garbage collection,
+            # to avoid "warnings".
+            for i in txns :
+                i.abort()
+
+    if db.version() >= (4, 4) :
+        # The version without transactions is checked in other test object
+        def test_log_printf(self) :
+            msg = "This is a test..."
+            txn = self.env.txn_begin()
+            self.env.log_printf(msg, txn=txn)
+            txn.commit()
+            logc = self.env.log_cursor()
+            logc.last()  # Skip the commit
+            self.assertTrue(msg in (logc.prev()[1]))
+
+            msg = "This is another test..."
+            txn = self.env.txn_begin()
+            self.env.log_printf(msg, txn=txn)
+            txn.abort()  # Do not store the new message
+            logc.last()  # Skip the abort
+            self.assertTrue(msg not in (logc.prev()[1]))
+
+            msg = "This is a third test..."
+            txn = self.env.txn_begin()
+            self.env.log_printf(msg, txn=txn)
+            txn.commit()  # Store the new message
+            logc.last()  # Skip the commit
+            self.assertTrue(msg in (logc.prev()[1]))
+
+
+class DBEnv_memp(DBEnv):
+    def setUp(self):
+        DBEnv.setUp(self)
+        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOG)
+        self.db = db.DB(self.env)
+        self.db.open("test", db.DB_HASH, db.DB_CREATE, 0660)
+
+    def tearDown(self):
+        self.db.close()
+        del self.db
+        DBEnv.tearDown(self)
+
+    def test_memp_1_trickle(self) :
+        self.db.put("hi", "bye")
+        self.assertTrue(self.env.memp_trickle(100) > 0)
+
+# Preserve the order, do "memp_trickle" test first
+    def test_memp_2_sync(self) :
+        self.db.put("hi", "bye")
+        self.env.memp_sync()  # Full flush
+        # Nothing to do...
+        self.assertTrue(self.env.memp_trickle(100) == 0)
+
+        self.db.put("hi", "bye2")
+        self.env.memp_sync((1, 0))  # NOP, probably
+        # Something to do... or not
+        self.assertTrue(self.env.memp_trickle(100) >= 0)
+
+        self.db.put("hi", "bye3")
+        self.env.memp_sync((123, 99))  # Full flush
+        # Nothing to do...
+        self.assertTrue(self.env.memp_trickle(100) == 0)
+
+    def test_memp_stat_1(self) :
+        stats = self.env.memp_stat()  # No param
+        self.assertTrue(len(stats)==2)
+        self.assertTrue("cache_miss" in stats[0])
+        stats = self.env.memp_stat(db.DB_STAT_CLEAR)  # Positional param
+        self.assertTrue("cache_miss" in stats[0])
+        stats = self.env.memp_stat(flags=0)  # Keyword param
+        self.assertTrue("cache_miss" in stats[0])
+
+    def test_memp_stat_2(self) :
+        stats=self.env.memp_stat()[1]
+        self.assertTrue(len(stats))==1
+        self.assertTrue("test" in stats)
+        self.assertTrue("page_in" in stats["test"])
+
+class DBEnv_logcursor(DBEnv):  # Log-cursor navigation tests over a log filled by a few committed puts
+    def setUp(self):
+        DBEnv.setUp(self)
+        self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL |
+                db.DB_INIT_LOG | db.DB_INIT_TXN)
+        txn = self.env.txn_begin()
+        self.db = db.DB(self.env)
+        self.db.open("test", db.DB_HASH, db.DB_CREATE, 0660, txn=txn)
+        txn.commit()
+        for i in ["2", "8", "20"] :  # One committed transaction per record
+            txn = self.env.txn_begin()
+            self.db.put(key = i, data = i*int(i), txn=txn)  # data is the key string repeated int(key) times
+            txn.commit()
+
+    def tearDown(self):
+        self.db.close()
+        del self.db
+        DBEnv.tearDown(self)
+
+    def _check_return(self, value) :  # Expected shape: ((int, int), str)
+        self.assertTrue(isinstance(value, tuple))
+        self.assertEqual(len(value), 2)
+        self.assertTrue(isinstance(value[0], tuple))
+        self.assertEqual(len(value[0]), 2)
+        self.assertTrue(isinstance(value[0][0], int))
+        self.assertTrue(isinstance(value[0][1], int))
+        self.assertTrue(isinstance(value[1], str))
+
+    # Preserve test order
+    def test_1_first(self) :
+        logc = self.env.log_cursor()
+        v = logc.first()
+        self._check_return(v)
+        self.assertTrue((1, 1) < v[0])
+        self.assertTrue(len(v[1])>0)
+
+    def test_2_last(self) :
+        logc = self.env.log_cursor()
+        lsn_first = logc.first()[0]
+        v = logc.last()
+        self._check_return(v)
+        self.assertTrue(lsn_first < v[0])
+
+    def test_3_next(self) :
+        logc = self.env.log_cursor()
+        lsn_last = logc.last()[0]
+        self.assertEqual(logc.next(), None)  # Nothing follows the last record
+        lsn_first = logc.first()[0]
+        v = logc.next()
+        self._check_return(v)
+        self.assertTrue(lsn_first < v[0])
+        self.assertTrue(lsn_last > v[0])
+
+        v2 = logc.next()  # Each further step must move strictly forward, still before the end
+        self.assertTrue(v2[0] > v[0])
+        self.assertTrue(lsn_last > v2[0])
+
+        v3 = logc.next()
+        self.assertTrue(v3[0] > v2[0])
+        self.assertTrue(lsn_last > v3[0])
+
+    def test_4_prev(self) :
+        logc = self.env.log_cursor()
+        lsn_first = logc.first()[0]
+        self.assertEqual(logc.prev(), None)  # Nothing precedes the first record
+        lsn_last = logc.last()[0]
+        v = logc.prev()
+        self._check_return(v)
+        self.assertTrue(lsn_first < v[0])
+        self.assertTrue(lsn_last > v[0])
+
+        v2 = logc.prev()  # Each further step must move strictly backward, still after the start
+        self.assertTrue(v2[0] < v[0])
+        self.assertTrue(lsn_first < v2[0])
+
+        v3 = logc.prev()
+        self.assertTrue(v3[0] < v2[0])
+        self.assertTrue(lsn_first < v3[0])
+
+    def test_5_current(self) :
+        logc = self.env.log_cursor()
+        logc.first()
+        v = logc.next()
+        self.assertEqual(v, logc.current())  # current() must repeat what the last move returned
+
+    def test_6_set(self) :
+        logc = self.env.log_cursor()
+        logc.first()
+        v = logc.next()
+        self.assertNotEqual(v, logc.next())
+        self.assertNotEqual(v, logc.next())
+        self.assertEqual(v, logc.set(v[0]))  # Jumping back to a saved position returns the same record
+
+    def test_explicit_close(self) :
+        logc = self.env.log_cursor()
+        logc.close()
+        self.assertRaises(db.DBCursorClosedError, logc.next)
+
+    def test_implicit_close(self) :
+        logc =  [self.env.log_cursor() for i in xrange(10)]
+        self.env.close()  # Closing the environment should also close all of its cursors
+        for i in logc :
+            self.assertRaises(db.DBCursorClosedError, i.next)
+
+def test_suite():  # Build one suite from the concrete test classes of this module
+    suite = unittest.TestSuite()
+
+    suite.addTest(unittest.makeSuite(DBEnv_general))  # The DBEnv base class itself is not added
+    suite.addTest(unittest.makeSuite(DBEnv_memp))
+    suite.addTest(unittest.makeSuite(DBEnv_logcursor))
+    suite.addTest(unittest.makeSuite(DBEnv_log))
+    suite.addTest(unittest.makeSuite(DBEnv_log_txn))
+
+    return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')

Added: python/trunk/Lib/bsddb/test/test_fileid.py
==============================================================================
--- (empty file)
+++ python/trunk/Lib/bsddb/test/test_fileid.py	Mon Mar 22 16:18:46 2010
@@ -0,0 +1,61 @@
+"""TestCase for resetting File ID.
+"""
+
+import os
+import shutil
+import unittest
+
+from test_all import db, test_support, get_new_environment_path, get_new_database_path
+
+class FileidResetTestCase(unittest.TestCase):
+    def setUp(self):
+        self.db_path_1 = get_new_database_path()
+        self.db_path_2 = get_new_database_path()
+        self.db_env_path = get_new_environment_path()
+
+    def test_fileid_reset(self):
+        # create DB 1
+        self.db1 = db.DB()
+        self.db1.open(self.db_path_1, dbtype=db.DB_HASH, flags=(db.DB_CREATE|db.DB_EXCL))
+        self.db1.put('spam', 'eggs')
+        self.db1.close()
+
+        shutil.copy(self.db_path_1, self.db_path_2)
+
+        self.db2 = db.DB()
+        self.db2.open(self.db_path_2, dbtype=db.DB_HASH)
+        self.db2.put('spam', 'spam')
+        self.db2.close()
+
+        self.db_env = db.DBEnv()
+        self.db_env.open(self.db_env_path, db.DB_CREATE|db.DB_INIT_MPOOL)
+
+        # use fileid_reset() here
+        self.db_env.fileid_reset(self.db_path_2)
+
+        self.db1 = db.DB(self.db_env)
+        self.db1.open(self.db_path_1, dbtype=db.DB_HASH, flags=db.DB_RDONLY)
+        self.assertEquals(self.db1.get('spam'), 'eggs')
+
+        self.db2 = db.DB(self.db_env)
+        self.db2.open(self.db_path_2, dbtype=db.DB_HASH, flags=db.DB_RDONLY)
+        self.assertEquals(self.db2.get('spam'), 'spam')
+
+        self.db1.close()
+        self.db2.close()
+
+        self.db_env.close()
+
+    def tearDown(self):
+        test_support.unlink(self.db_path_1)
+        test_support.unlink(self.db_path_2)
+        test_support.rmtree(self.db_env_path)
+
+def test_suite():  # Build the suite for this module
+    suite = unittest.TestSuite()
+    if db.version() >= (4, 4):  # The test case is only registered when db.version() is 4.4 or newer
+        suite.addTest(unittest.makeSuite(FileidResetTestCase))
+    return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')


More information about the Python-checkins mailing list