summaryrefslogtreecommitdiff
path: root/Lib/test/test_sqlite3/test_dbapi.py
diff options
context:
space:
mode:
authorErlend Egeberg Aasland <erlend.aasland@innova.no>2022-04-15 02:02:56 +0200
committerGitHub <noreply@github.com>2022-04-14 17:02:56 -0700
commitee475430d431814cbb6eb5e8a6c0ae51943349d4 (patch)
treee9464bb51a6304a77da9461d28564b00689edfc1 /Lib/test/test_sqlite3/test_dbapi.py
parentc9d41bcd68dbe339396523e931904930a87819b9 (diff)
downloadcpython-git-ee475430d431814cbb6eb5e8a6c0ae51943349d4.tar.gz
gh-69093: Support basic incremental I/O to blobs in `sqlite3` (GH-30680)
Authored-by: Aviv Palivoda <palaviv@gmail.com> Co-authored-by: Erlend E. Aasland <erlend.aasland@innova.no> Co-authored-by: palaviv <palaviv@gmail.com> Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
Diffstat (limited to 'Lib/test/test_sqlite3/test_dbapi.py')
-rw-r--r--Lib/test/test_sqlite3/test_dbapi.py157
1 files changed, 156 insertions, 1 deletions
diff --git a/Lib/test/test_sqlite3/test_dbapi.py b/Lib/test/test_sqlite3/test_dbapi.py
index 2d2e58a3d4..faaa3713cb 100644
--- a/Lib/test/test_sqlite3/test_dbapi.py
+++ b/Lib/test/test_sqlite3/test_dbapi.py
@@ -33,6 +33,8 @@ from test.support import (
check_disallow_instantiation,
threading_helper,
)
+from _testcapi import INT_MAX
+from os import SEEK_SET, SEEK_CUR, SEEK_END
from test.support.os_helper import TESTFN, unlink, temp_dir
@@ -1041,11 +1043,163 @@ class CursorTests(unittest.TestCase):
self.assertEqual(cu.fetchall(), [(1,)])
+class BlobTests(unittest.TestCase):
+ def setUp(self):
+ self.cx = sqlite.connect(":memory:")
+ self.cx.execute("create table test(b blob)")
+ self.data = b"this blob data string is exactly fifty bytes long!"
+ self.cx.execute("insert into test(b) values (?)", (self.data,))
+ self.blob = self.cx.blobopen("test", "b", 1)
+
+ def tearDown(self):
+ self.blob.close()
+ self.cx.close()
+
+ def test_blob_seek_and_tell(self):
+ self.blob.seek(10)
+ self.assertEqual(self.blob.tell(), 10)
+
+ self.blob.seek(10, SEEK_SET)
+ self.assertEqual(self.blob.tell(), 10)
+
+ self.blob.seek(10, SEEK_CUR)
+ self.assertEqual(self.blob.tell(), 20)
+
+ self.blob.seek(-10, SEEK_END)
+ self.assertEqual(self.blob.tell(), 40)
+
+ def test_blob_seek_error(self):
+ msg_oor = "offset out of blob range"
+ msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END"
+ msg_of = "seek offset results in overflow"
+
+ dataset = (
+ (ValueError, msg_oor, lambda: self.blob.seek(1000)),
+ (ValueError, msg_oor, lambda: self.blob.seek(-10)),
+ (ValueError, msg_orig, lambda: self.blob.seek(10, -1)),
+ (ValueError, msg_orig, lambda: self.blob.seek(10, 3)),
+ )
+ for exc, msg, fn in dataset:
+ with self.subTest(exc=exc, msg=msg, fn=fn):
+ self.assertRaisesRegex(exc, msg, fn)
+
+ # Force overflow errors
+ self.blob.seek(1, SEEK_SET)
+ with self.assertRaisesRegex(OverflowError, msg_of):
+ self.blob.seek(INT_MAX, SEEK_CUR)
+ with self.assertRaisesRegex(OverflowError, msg_of):
+ self.blob.seek(INT_MAX, SEEK_END)
+
+ def test_blob_read(self):
+ buf = self.blob.read()
+ self.assertEqual(buf, self.data)
+
+ def test_blob_read_oversized(self):
+ buf = self.blob.read(len(self.data) * 2)
+ self.assertEqual(buf, self.data)
+
+ def test_blob_read_advance_offset(self):
+ n = 10
+ buf = self.blob.read(n)
+ self.assertEqual(buf, self.data[:n])
+ self.assertEqual(self.blob.tell(), n)
+
+ def test_blob_read_at_offset(self):
+ self.blob.seek(10)
+ self.assertEqual(self.blob.read(10), self.data[10:20])
+
+ def test_blob_read_error_row_changed(self):
+ self.cx.execute("update test set b='aaaa' where rowid=1")
+ with self.assertRaises(sqlite.OperationalError):
+ self.blob.read()
+
+ def test_blob_write(self):
+ new_data = b"new data".ljust(50)
+ self.blob.write(new_data)
+ row = self.cx.execute("select b from test").fetchone()
+ self.assertEqual(row[0], new_data)
+
+ def test_blob_write_at_offset(self):
+ new_data = b"c" * 25
+ self.blob.seek(25)
+ self.blob.write(new_data)
+ row = self.cx.execute("select b from test").fetchone()
+ self.assertEqual(row[0], self.data[:25] + new_data)
+
+ def test_blob_write_advance_offset(self):
+ self.blob.write(b"d"*10)
+ self.assertEqual(self.blob.tell(), 10)
+
+ def test_blob_write_error_length(self):
+ with self.assertRaisesRegex(ValueError, "data longer than blob"):
+ self.blob.write(b"a" * 1000)
+
+ def test_blob_write_error_row_changed(self):
+ self.cx.execute("update test set b='aaaa' where rowid=1")
+ with self.assertRaises(sqlite.OperationalError):
+ self.blob.write(b"aaa")
+
+ def test_blob_write_error_readonly(self):
+ ro_blob = self.cx.blobopen("test", "b", 1, readonly=True)
+ with self.assertRaisesRegex(sqlite.OperationalError, "readonly"):
+ ro_blob.write(b"aaa")
+ ro_blob.close()
+
+ def test_blob_open_error(self):
+ dataset = (
+ (("test", "b", 1), {"name": "notexisting"}),
+ (("notexisting", "b", 1), {}),
+ (("test", "notexisting", 1), {}),
+ (("test", "b", 2), {}),
+ )
+ regex = "no such"
+ for args, kwds in dataset:
+ with self.subTest(args=args, kwds=kwds):
+ with self.assertRaisesRegex(sqlite.OperationalError, regex):
+ self.cx.blobopen(*args, **kwds)
+
+ def test_blob_sequence_not_supported(self):
+ with self.assertRaises(TypeError):
+ self.blob + self.blob
+ with self.assertRaises(TypeError):
+ self.blob * 5
+ with self.assertRaises(TypeError):
+ b"a" in self.blob
+
+ def test_blob_closed(self):
+ with memory_database() as cx:
+ cx.execute("create table test(b blob)")
+ cx.execute("insert into test values (zeroblob(100))")
+ blob = cx.blobopen("test", "b", 1)
+ blob.close()
+
+ msg = "Cannot operate on a closed blob"
+ with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
+ blob.read()
+ with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
+ blob.write(b"")
+ with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
+ blob.seek(0)
+ with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
+ blob.tell()
+
+ def test_blob_closed_db_read(self):
+ with memory_database() as cx:
+ cx.execute("create table test(b blob)")
+ cx.execute("insert into test(b) values (zeroblob(100))")
+ blob = cx.blobopen("test", "b", 1)
+ cx.close()
+ self.assertRaisesRegex(sqlite.ProgrammingError,
+ "Cannot operate on a closed database",
+ blob.read)
+
+
class ThreadTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.cur = self.con.cursor()
- self.cur.execute("create table test(name text)")
+ self.cur.execute("create table test(name text, b blob)")
+ self.cur.execute("insert into test values('blob', zeroblob(1))")
def tearDown(self):
self.cur.close()
@@ -1080,6 +1234,7 @@ class ThreadTests(unittest.TestCase):
lambda: self.con.create_collation("foo", None),
lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1),
lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH),
+ lambda: self.con.blobopen("test", "b", 1),
]
if hasattr(sqlite.Connection, "serialize"):
fns.append(lambda: self.con.serialize())