summaryrefslogtreecommitdiff
path: root/psycopg2da/tests.py
diff options
context:
space:
mode:
authorFabio Tranchitella <kobold@debian.org>2006-09-19 03:50:35 +0000
committerFabio Tranchitella <kobold@debian.org>2006-09-19 03:50:35 +0000
commit146f64e71d323c1e4acb0d396db78775882e31b5 (patch)
tree762a1eaddc3c7b58f7f9d66f0cd517476d9a74b1 /psycopg2da/tests.py
parent31189ef0df68bc42c829300719f6685d7c209345 (diff)
downloadpsycopg2-146f64e71d323c1e4acb0d396db78775882e31b5.tar.gz
Added psycopg2da, the zope3 database adapter for psycopg2.
Diffstat (limited to 'psycopg2da/tests.py')
-rw-r--r--psycopg2da/tests.py389
1 files changed, 389 insertions, 0 deletions
diff --git a/psycopg2da/tests.py b/psycopg2da/tests.py
new file mode 100644
index 0000000..4a75f4e
--- /dev/null
+++ b/psycopg2da/tests.py
@@ -0,0 +1,389 @@
+# psycopg2da
+# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+"""Unit tests for Psycopg2DA."""
+
+from unittest import TestCase, TestSuite, main, makeSuite
+from datetime import tzinfo, timedelta
+
+import psycopg2
+import psycopg2.extensions
+
+
+class Stub(object):
+
+ def __init__(self, **kw):
+ self.__dict__.update(kw)
+
+class TZStub(tzinfo):
+
+ def __init__(self, h, m):
+ self.offset = h * 60 + m
+
+ def utcoffset(self, dt):
+ return timedelta(minutes=self.offset)
+
+ def dst(self, dt):
+ return 0
+
+ def tzname(self, dt):
+ return ''
+
+ def __repr__(self):
+ return 'tzinfo(%d)' % self.offset
+
+ def __reduce__(self):
+ return type(self), (), self.__dict__
+
+class ConnectionStub(object):
+
+ def set_isolation_level(self, level):
+ pass
+
+class Psycopg2Stub(object):
+
+ __shared_state = {} # 'Borg' design pattern
+
+ DATE = psycopg2.extensions.DATE
+ TIME = psycopg2.extensions.TIME
+ DATETIME = psycopg2.DATETIME
+ INTERVAL = psycopg2.extensions.INTERVAL
+ ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE
+
+ def __init__(self):
+ self.__dict__ = self.__shared_state
+ self.types = {}
+
+ def connect(self, connection_string):
+ self.last_connection_string = connection_string
+ return ConnectionStub()
+
+ def new_type(self, values, name, converter):
+ return Stub(name=name, values=values)
+
+ def register_type(self, type):
+ for typeid in type.values:
+ self.types[typeid] = type
+
+ def getExtensions(self):
+ return self
+ extensions = property(getExtensions)
+
+
+class TestPsycopg2TypeConversion(TestCase):
+
+ def test_conv_date(self):
+ from psycopg2da.adapter import _conv_date
+ from datetime import date
+ def c(s):
+ return _conv_date(s, None)
+ self.assertEquals(c(''), None)
+ self.assertEquals(c('2001-03-02'), date(2001, 3, 2))
+
+ def test_conv_time(self):
+ from psycopg2da.adapter import _conv_time
+ from datetime import time
+ def c(s):
+ return _conv_time(s, None)
+ self.assertEquals(c(''), None)
+ self.assertEquals(c('23:17:57'), time(23, 17, 57))
+ self.assertEquals(c('23:17:57.037'), time(23, 17, 57, 37000))
+
+ def test_conv_timetz(self):
+ from psycopg2da.adapter import _conv_timetz
+ from datetime import time
+ def c(s):
+ return _conv_timetz(s, None)
+ self.assertEquals(c(''), None)
+ self.assertEquals(c('12:44:01+01:00'), time(12, 44, 01, 0, TZStub(1,0)))
+ self.assertEquals(c('12:44:01.037-00:30'), time(12, 44, 01, 37000, TZStub(0,-30)))
+
+ def test_conv_timestamp(self):
+ from psycopg2da.adapter import _conv_timestamp
+ from datetime import datetime
+ def c(s):
+ return _conv_timestamp(s, None)
+ self.assertEquals(c(''), None)
+ self.assertEquals(c('2001-03-02 12:44:01'),
+ datetime(2001, 3, 2, 12, 44, 01))
+ self.assertEquals(c('2001-03-02 12:44:01.037'),
+ datetime(2001, 3, 2, 12, 44, 01, 37000))
+ self.assertEquals(c('2001-03-02 12:44:01.000001'),
+ datetime(2001, 3, 2, 12, 44, 01, 1))
+
+ def test_conv_timestamptz(self):
+ from psycopg2da.adapter import _conv_timestamptz
+ from datetime import datetime
+ def c(s):
+ return _conv_timestamptz(s, None)
+ self.assertEquals(c(''), None)
+
+ self.assertEquals(c('2001-03-02 12:44:01+01:00'),
+ datetime(2001, 3, 2, 12, 44, 01, 0, TZStub(1,0)))
+ self.assertEquals(c('2001-03-02 12:44:01.037-00:30'),
+ datetime(2001, 3, 2, 12, 44, 01, 37000, TZStub(0,-30)))
+ self.assertEquals(c('2001-03-02 12:44:01.000001+12:00'),
+ datetime(2001, 3, 2, 12, 44, 01, 1, TZStub(12,0)))
+ self.assertEquals(c('2001-06-25 12:14:00-07'),
+ datetime(2001, 6, 25, 12, 14, 00, 0, TZStub(-7,0)))
+
+ def test_conv_interval(self):
+ from psycopg2da.adapter import _conv_interval
+ from datetime import timedelta
+ def c(s):
+ return _conv_interval(s, None)
+
+ self.assertEquals(c(''), None)
+ self.assertEquals(c('01:00'), timedelta(hours=1))
+ self.assertEquals(c('00:15'), timedelta(minutes=15))
+ self.assertEquals(c('00:00:47'), timedelta(seconds=47))
+ self.assertEquals(c('00:00:00.037'), timedelta(microseconds=37000))
+ self.assertEquals(c('00:00:00.111111'), timedelta(microseconds=111111))
+ self.assertEquals(c('1 day'), timedelta(days=1))
+ self.assertEquals(c('2 days'), timedelta(days=2))
+ self.assertEquals(c('374 days'), timedelta(days=374))
+ self.assertEquals(c('2 days 03:20:15.123456'),
+ timedelta(days=2, hours=3, minutes=20,
+ seconds=15, microseconds=123456))
+ # XXX There's a problem with years and months. Currently timedelta
+ # cannot represent them accurately
+ self.assertEquals(c('1 month'), '1 month')
+ self.assertEquals(c('2 months'), '2 months')
+ self.assertEquals(c('1 mon'), '1 mon')
+ self.assertEquals(c('2 mons'), '2 mons')
+ self.assertEquals(c('1 year'), '1 year')
+ self.assertEquals(c('3 years'), '3 years')
+ # Later we might be able to use
+ ## self.assertEquals(c('1 month'), timedelta(months=1))
+ ## self.assertEquals(c('2 months'), timedelta(months=2))
+ ## self.assertEquals(c('1 year'), timedelta(years=1))
+ ## self.assertEquals(c('3 years'), timedelta(years=3))
+
+ self.assertRaises(ValueError, c, '2 day')
+ self.assertRaises(ValueError, c, '2days')
+ self.assertRaises(ValueError, c, '123')
+
+ def test_conv_string(self):
+ from psycopg2da.adapter import _get_string_conv
+ _conv_string = _get_string_conv("utf-8")
+ def c(s):
+ return _conv_string(s, None)
+ self.assertEquals(c(None), None)
+ self.assertEquals(c(''), u'')
+ self.assertEquals(c('c'), u'c')
+ self.assertEquals(c('\xc3\x82\xc2\xa2'), u'\xc2\xa2')
+ self.assertEquals(c('c\xc3\x82\xc2\xa2'), u'c\xc2\xa2')
+
+class TestPsycopg2Adapter(TestCase):
+
+ def setUp(self):
+ import psycopg2da.adapter as adapter
+ self.real_psycopg2 = adapter.psycopg2
+ adapter.psycopg2 = self.psycopg2_stub = Psycopg2Stub()
+
+ def tearDown(self):
+ import psycopg2da.adapter as adapter
+ adapter.psycopg2 = self.real_psycopg2
+
+ def test_connection_factory(self):
+ from psycopg2da.adapter import Psycopg2Adapter
+ a = Psycopg2Adapter('dbi://username:password@hostname:port/dbname;junk=ignored')
+ c = a._connection_factory()
+ args = self.psycopg2_stub.last_connection_string.split()
+ args.sort()
+ self.assertEquals(args, ['dbname=dbname', 'host=hostname',
+ 'password=password', 'port=port',
+ 'user=username'])
+
+ def test_registerTypes(self):
+ import psycopg2da.adapter as adapter
+ from psycopg2da.adapter import Psycopg2Adapter
+ a = Psycopg2Adapter('dbi://')
+ a.registerTypes()
+ for typename in ('DATE', 'TIME', 'TIMETZ', 'TIMESTAMP',
+ 'TIMESTAMPTZ', 'INTERVAL'):
+ typeid = getattr(adapter, '%s_OID' % typename)
+ result = self.psycopg2_stub.types.get(typeid, None)
+ if not result:
+ # comparing None with psycopg2.type object segfaults
+ self.fail("did not register %s (%d): got None, not Z%s"
+ % (typename, typeid, typename))
+ else:
+ result_name = getattr(result, 'name', 'None')
+ self.assertEquals(result, getattr(adapter, typename),
+ "did not register %s (%d): got %s, not Z%s"
+ % (typename, typeid, result_name, typename))
+
+
+class TestISODateTime(TestCase):
+
+ # Test if date/time parsing functions accept a sensible subset of ISO-8601
+ # compliant date/time strings.
+ #
+ # Resources:
+ # http://www.cl.cam.ac.uk/~mgk25/iso-time.html
+ # http://www.mcs.vuw.ac.nz/technical/software/SGML/doc/iso8601/ISO8601.html
+ # http://www.w3.org/TR/NOTE-datetime
+ # http://www.ietf.org/rfc/rfc3339.txt
+
+ basic_dates = (('20020304', (2002, 03, 04)),
+ ('20000229', (2000, 02, 29)))
+
+ extended_dates = (('2002-03-04', (2002, 03, 04)),
+ ('2000-02-29', (2000, 02, 29)))
+
+ basic_times = (('12', (12, 0, 0)),
+ ('1234', (12, 34, 0)),
+ ('123417', (12, 34, 17)),
+ ('123417.5', (12, 34, 17.5)),
+ ('123417,5', (12, 34, 17.5)))
+
+ extended_times = (('12', (12, 0, 0)),
+ ('12:34', (12, 34, 0)),
+ ('12:34:17', (12, 34, 17)),
+ ('12:34:17.5', (12, 34, 17.5)),
+ ('12:34:17,5', (12, 34, 17.5)))
+
+ basic_tzs = (('Z', 0),
+ ('+02', 2*60),
+ ('+1130', 11*60+30),
+ ('-05', -5*60),
+ ('-0030', -30))
+
+ extended_tzs = (('Z', 0),
+ ('+02', 2*60),
+ ('+11:30', 11*60+30),
+ ('-05', -5*60),
+ ('-00:30', -30))
+
+ time_separators = (' ', 'T')
+
+ bad_dates = ('', 'foo', 'XXXXXXXX', 'XXXX-XX-XX', '2001-2-29',
+ '1990/13/14')
+
+ bad_times = ('', 'foo', 'XXXXXX', '12:34,5', '12:34:56,')
+
+ bad_timetzs = ('12+12 34', '15:45 +1234', '18:00-12:34:56', '18:00+123', '18:00Q')
+
+ bad_datetimes = ('', 'foo', '2002-03-0412:33')
+
+ bad_datetimetzs = ('', 'foo', '2002-03-04T12:33 +1200')
+
+ exception_type = ValueError
+
+ # We need the following funcions:
+ # parse_date -> (year, month, day)
+ # parse_time -> (hour, minute, second)
+ # parse_timetz -> (hour, minute, second, tzoffset)
+ # parse_datetime -> (year, month, day, hour, minute, second)
+ # parse_datetimetz -> (year, month, day, hour, minute, second, tzoffset)
+ # second can be a float, all other values are ints
+ # tzoffset is offset in minutes east of UTC
+
+ def setUp(self):
+ from psycopg2da.adapter import parse_date, parse_time, \
+ parse_timetz, parse_datetime, parse_datetimetz
+ self.parse_date = parse_date
+ self.parse_time = parse_time
+ self.parse_timetz = parse_timetz
+ self.parse_datetime = parse_datetime
+ self.parse_datetimetz = parse_datetimetz
+
+ def test_basic_date(self):
+ for s, d in self.basic_dates:
+ self.assertEqual(self.parse_date(s), d)
+
+ def test_extended_date(self):
+ for s, d in self.extended_dates:
+ self.assertEqual(self.parse_date(s), d)
+
+ def test_bad_date(self):
+ for s in self.bad_dates:
+ self.assertRaises(self.exception_type, self.parse_date, s)
+
+ def test_basic_time(self):
+ for s, t in self.basic_times:
+ self.assertEqual(self.parse_time(s), t)
+
+ def test_extended_time(self):
+ for s, t in self.extended_times:
+ self.assertEqual(self.parse_time(s), t)
+
+ def test_bad_time(self):
+ for s in self.bad_times:
+ self.assertRaises(self.exception_type, self.parse_time, s)
+
+ def test_basic_timetz(self):
+ for s, t in self.basic_times:
+ for tz, off in self.basic_tzs:
+ self.assertEqual(self.parse_timetz(s+tz), t + (off,))
+
+ def test_extended_timetz(self):
+ for s, t in self.extended_times:
+ for tz, off in self.extended_tzs:
+ self.assertEqual(self.parse_timetz(s+tz), t + (off,))
+
+ def test_bad_timetzs(self):
+ for s in self.bad_timetzs:
+ self.assertRaises(self.exception_type, self.parse_timetz, s)
+
+ def test_basic_datetime(self):
+ for ds, d in self.basic_dates:
+ for ts, t in self.basic_times:
+ for sep in self.time_separators:
+ self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
+
+ def test_extended_datetime(self):
+ for ds, d in self.extended_dates:
+ for ts, t in self.extended_times:
+ for sep in self.time_separators:
+ self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
+
+ def test_bad_datetimes(self):
+ for s in self.bad_datetimes:
+ self.assertRaises(self.exception_type, self.parse_datetime, s)
+
+ def test_basic_datetimetz(self):
+ for ds, d in self.basic_dates:
+ for ts, t in self.basic_times:
+ for tz, off in self.basic_tzs:
+ for sep in self.time_separators:
+ self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
+ d + t + (off,))
+
+ def test_extended_datetimetz(self):
+ for ds, d in self.extended_dates:
+ for ts, t in self.extended_times:
+ for tz, off in self.extended_tzs:
+ for sep in self.time_separators:
+ self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
+ d + t + (off,))
+
+ def test_bad_datetimetzs(self):
+ for s in self.bad_datetimetzs:
+ self.assertRaises(self.exception_type, self.parse_datetimetz, s)
+
+
+def test_suite():
+ return TestSuite((
+ makeSuite(TestPsycopg2TypeConversion),
+ makeSuite(TestPsycopg2Adapter),
+ makeSuite(TestISODateTime),
+ ))
+
+if __name__=='__main__':
+ main(defaultTest='test_suite')