diff options
| author | Fabio Tranchitella <kobold@debian.org> | 2006-09-19 03:50:35 +0000 |
|---|---|---|
| committer | Fabio Tranchitella <kobold@debian.org> | 2006-09-19 03:50:35 +0000 |
| commit | 146f64e71d323c1e4acb0d396db78775882e31b5 (patch) | |
| tree | 762a1eaddc3c7b58f7f9d66f0cd517476d9a74b1 /psycopg2da/tests.py | |
| parent | 31189ef0df68bc42c829300719f6685d7c209345 (diff) | |
| download | psycopg2-146f64e71d323c1e4acb0d396db78775882e31b5.tar.gz | |
Added psycopg2da, the zope3 database adapter for psycopg2.
Diffstat (limited to 'psycopg2da/tests.py')
| -rw-r--r-- | psycopg2da/tests.py | 389 |
1 files changed, 389 insertions, 0 deletions
diff --git a/psycopg2da/tests.py b/psycopg2da/tests.py new file mode 100644 index 0000000..4a75f4e --- /dev/null +++ b/psycopg2da/tests.py @@ -0,0 +1,389 @@ +# psycopg2da +# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +"""Unit tests for Psycopg2DA.""" + +from unittest import TestCase, TestSuite, main, makeSuite +from datetime import tzinfo, timedelta + +import psycopg2 +import psycopg2.extensions + + +class Stub(object): + + def __init__(self, **kw): + self.__dict__.update(kw) + +class TZStub(tzinfo): + + def __init__(self, h, m): + self.offset = h * 60 + m + + def utcoffset(self, dt): + return timedelta(minutes=self.offset) + + def dst(self, dt): + return 0 + + def tzname(self, dt): + return '' + + def __repr__(self): + return 'tzinfo(%d)' % self.offset + + def __reduce__(self): + return type(self), (), self.__dict__ + +class ConnectionStub(object): + + def set_isolation_level(self, level): + pass + +class Psycopg2Stub(object): + + __shared_state = {} # 'Borg' design pattern + + DATE = psycopg2.extensions.DATE + TIME = psycopg2.extensions.TIME + DATETIME = psycopg2.DATETIME + INTERVAL = psycopg2.extensions.INTERVAL + ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE + + def __init__(self): + self.__dict__ = self.__shared_state + self.types = {} + + def connect(self, connection_string): + self.last_connection_string = connection_string + return ConnectionStub() + + def new_type(self, values, name, converter): + return Stub(name=name, values=values) + + def register_type(self, type): + for typeid in type.values: + self.types[typeid] = type + + def getExtensions(self): + return self + extensions = property(getExtensions) + + +class TestPsycopg2TypeConversion(TestCase): + + def test_conv_date(self): + from psycopg2da.adapter import _conv_date + from datetime import date + def c(s): + return _conv_date(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('2001-03-02'), date(2001, 3, 2)) + + def test_conv_time(self): + from psycopg2da.adapter import _conv_time + from datetime import time + def c(s): + return _conv_time(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('23:17:57'), time(23, 17, 57)) + self.assertEquals(c('23:17:57.037'), time(23, 17, 57, 37000)) + + def test_conv_timetz(self): + from psycopg2da.adapter import _conv_timetz + from datetime import time + def c(s): + return _conv_timetz(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('12:44:01+01:00'), time(12, 44, 01, 0, TZStub(1,0))) + self.assertEquals(c('12:44:01.037-00:30'), time(12, 44, 01, 37000, TZStub(0,-30))) + + def test_conv_timestamp(self): + from psycopg2da.adapter import _conv_timestamp + from datetime import datetime + def c(s): + return _conv_timestamp(s, None) + self.assertEquals(c(''), None) + self.assertEquals(c('2001-03-02 12:44:01'), + datetime(2001, 3, 2, 12, 44, 01)) + self.assertEquals(c('2001-03-02 12:44:01.037'), + datetime(2001, 3, 2, 12, 44, 01, 37000)) + self.assertEquals(c('2001-03-02 12:44:01.000001'), + datetime(2001, 3, 2, 12, 44, 01, 1)) + + def test_conv_timestamptz(self): + from psycopg2da.adapter import _conv_timestamptz + from datetime import datetime + def c(s): + return _conv_timestamptz(s, None) + self.assertEquals(c(''), None) + + self.assertEquals(c('2001-03-02 12:44:01+01:00'), + datetime(2001, 3, 2, 12, 44, 01, 0, TZStub(1,0))) + self.assertEquals(c('2001-03-02 12:44:01.037-00:30'), + datetime(2001, 3, 2, 12, 44, 01, 37000, TZStub(0,-30))) + self.assertEquals(c('2001-03-02 12:44:01.000001+12:00'), + datetime(2001, 3, 2, 12, 44, 01, 1, TZStub(12,0))) + self.assertEquals(c('2001-06-25 12:14:00-07'), + datetime(2001, 6, 25, 12, 14, 00, 0, TZStub(-7,0))) + + def test_conv_interval(self): + from psycopg2da.adapter import _conv_interval + from datetime import timedelta + def c(s): + return _conv_interval(s, None) + + self.assertEquals(c(''), None) + self.assertEquals(c('01:00'), timedelta(hours=1)) + self.assertEquals(c('00:15'), timedelta(minutes=15)) + self.assertEquals(c('00:00:47'), timedelta(seconds=47)) + self.assertEquals(c('00:00:00.037'), timedelta(microseconds=37000)) + self.assertEquals(c('00:00:00.111111'), timedelta(microseconds=111111)) + self.assertEquals(c('1 day'), timedelta(days=1)) + self.assertEquals(c('2 days'), timedelta(days=2)) + self.assertEquals(c('374 days'), timedelta(days=374)) + self.assertEquals(c('2 days 03:20:15.123456'), + timedelta(days=2, hours=3, minutes=20, + seconds=15, microseconds=123456)) + # XXX There's a problem with years and months. Currently timedelta + # cannot represent them accurately + self.assertEquals(c('1 month'), '1 month') + self.assertEquals(c('2 months'), '2 months') + self.assertEquals(c('1 mon'), '1 mon') + self.assertEquals(c('2 mons'), '2 mons') + self.assertEquals(c('1 year'), '1 year') + self.assertEquals(c('3 years'), '3 years') + # Later we might be able to use + ## self.assertEquals(c('1 month'), timedelta(months=1)) + ## self.assertEquals(c('2 months'), timedelta(months=2)) + ## self.assertEquals(c('1 year'), timedelta(years=1)) + ## self.assertEquals(c('3 years'), timedelta(years=3)) + + self.assertRaises(ValueError, c, '2 day') + self.assertRaises(ValueError, c, '2days') + self.assertRaises(ValueError, c, '123') + + def test_conv_string(self): + from psycopg2da.adapter import _get_string_conv + _conv_string = _get_string_conv("utf-8") + def c(s): + return _conv_string(s, None) + self.assertEquals(c(None), None) + self.assertEquals(c(''), u'') + self.assertEquals(c('c'), u'c') + self.assertEquals(c('\xc3\x82\xc2\xa2'), u'\xc2\xa2') + self.assertEquals(c('c\xc3\x82\xc2\xa2'), u'c\xc2\xa2') + +class TestPsycopg2Adapter(TestCase): + + def setUp(self): + import psycopg2da.adapter as adapter + self.real_psycopg2 = adapter.psycopg2 + adapter.psycopg2 = self.psycopg2_stub = Psycopg2Stub() + + def tearDown(self): + import psycopg2da.adapter as adapter + adapter.psycopg2 = self.real_psycopg2 + + def test_connection_factory(self): + from psycopg2da.adapter import Psycopg2Adapter + a = Psycopg2Adapter('dbi://username:password@hostname:port/dbname;junk=ignored') + c = a._connection_factory() + args = self.psycopg2_stub.last_connection_string.split() + args.sort() + self.assertEquals(args, ['dbname=dbname', 'host=hostname', + 'password=password', 'port=port', + 'user=username']) + + def test_registerTypes(self): + import psycopg2da.adapter as adapter + from psycopg2da.adapter import Psycopg2Adapter + a = Psycopg2Adapter('dbi://') + a.registerTypes() + for typename in ('DATE', 'TIME', 'TIMETZ', 'TIMESTAMP', + 'TIMESTAMPTZ', 'INTERVAL'): + typeid = getattr(adapter, '%s_OID' % typename) + result = self.psycopg2_stub.types.get(typeid, None) + if not result: + # comparing None with psycopg2.type object segfaults + self.fail("did not register %s (%d): got None, not Z%s" + % (typename, typeid, typename)) + else: + result_name = getattr(result, 'name', 'None') + self.assertEquals(result, getattr(adapter, typename), + "did not register %s (%d): got %s, not Z%s" + % (typename, typeid, result_name, typename)) + + +class TestISODateTime(TestCase): + + # Test if date/time parsing functions accept a sensible subset of ISO-8601 + # compliant date/time strings. + # + # Resources: + # http://www.cl.cam.ac.uk/~mgk25/iso-time.html + # http://www.mcs.vuw.ac.nz/technical/software/SGML/doc/iso8601/ISO8601.html + # http://www.w3.org/TR/NOTE-datetime + # http://www.ietf.org/rfc/rfc3339.txt + + basic_dates = (('20020304', (2002, 03, 04)), + ('20000229', (2000, 02, 29))) + + extended_dates = (('2002-03-04', (2002, 03, 04)), + ('2000-02-29', (2000, 02, 29))) + + basic_times = (('12', (12, 0, 0)), + ('1234', (12, 34, 0)), + ('123417', (12, 34, 17)), + ('123417.5', (12, 34, 17.5)), + ('123417,5', (12, 34, 17.5))) + + extended_times = (('12', (12, 0, 0)), + ('12:34', (12, 34, 0)), + ('12:34:17', (12, 34, 17)), + ('12:34:17.5', (12, 34, 17.5)), + ('12:34:17,5', (12, 34, 17.5))) + + basic_tzs = (('Z', 0), + ('+02', 2*60), + ('+1130', 11*60+30), + ('-05', -5*60), + ('-0030', -30)) + + extended_tzs = (('Z', 0), + ('+02', 2*60), + ('+11:30', 11*60+30), + ('-05', -5*60), + ('-00:30', -30)) + + time_separators = (' ', 'T') + + bad_dates = ('', 'foo', 'XXXXXXXX', 'XXXX-XX-XX', '2001-2-29', + '1990/13/14') + + bad_times = ('', 'foo', 'XXXXXX', '12:34,5', '12:34:56,') + + bad_timetzs = ('12+12 34', '15:45 +1234', '18:00-12:34:56', '18:00+123', '18:00Q') + + bad_datetimes = ('', 'foo', '2002-03-0412:33') + + bad_datetimetzs = ('', 'foo', '2002-03-04T12:33 +1200') + + exception_type = ValueError + + # We need the following funcions: + # parse_date -> (year, month, day) + # parse_time -> (hour, minute, second) + # parse_timetz -> (hour, minute, second, tzoffset) + # parse_datetime -> (year, month, day, hour, minute, second) + # parse_datetimetz -> (year, month, day, hour, minute, second, tzoffset) + # second can be a float, all other values are ints + # tzoffset is offset in minutes east of UTC + + def setUp(self): + from psycopg2da.adapter import parse_date, parse_time, \ + parse_timetz, parse_datetime, parse_datetimetz + self.parse_date = parse_date + self.parse_time = parse_time + self.parse_timetz = parse_timetz + self.parse_datetime = parse_datetime + self.parse_datetimetz = parse_datetimetz + + def test_basic_date(self): + for s, d in self.basic_dates: + self.assertEqual(self.parse_date(s), d) + + def test_extended_date(self): + for s, d in self.extended_dates: + self.assertEqual(self.parse_date(s), d) + + def test_bad_date(self): + for s in self.bad_dates: + self.assertRaises(self.exception_type, self.parse_date, s) + + def test_basic_time(self): + for s, t in self.basic_times: + self.assertEqual(self.parse_time(s), t) + + def test_extended_time(self): + for s, t in self.extended_times: + self.assertEqual(self.parse_time(s), t) + + def test_bad_time(self): + for s in self.bad_times: + self.assertRaises(self.exception_type, self.parse_time, s) + + def test_basic_timetz(self): + for s, t in self.basic_times: + for tz, off in self.basic_tzs: + self.assertEqual(self.parse_timetz(s+tz), t + (off,)) + + def test_extended_timetz(self): + for s, t in self.extended_times: + for tz, off in self.extended_tzs: + self.assertEqual(self.parse_timetz(s+tz), t + (off,)) + + def test_bad_timetzs(self): + for s in self.bad_timetzs: + self.assertRaises(self.exception_type, self.parse_timetz, s) + + def test_basic_datetime(self): + for ds, d in self.basic_dates: + for ts, t in self.basic_times: + for sep in self.time_separators: + self.assertEqual(self.parse_datetime(ds+sep+ts), d + t) + + def test_extended_datetime(self): + for ds, d in self.extended_dates: + for ts, t in self.extended_times: + for sep in self.time_separators: + self.assertEqual(self.parse_datetime(ds+sep+ts), d + t) + + def test_bad_datetimes(self): + for s in self.bad_datetimes: + self.assertRaises(self.exception_type, self.parse_datetime, s) + + def test_basic_datetimetz(self): + for ds, d in self.basic_dates: + for ts, t in self.basic_times: + for tz, off in self.basic_tzs: + for sep in self.time_separators: + self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz), + d + t + (off,)) + + def test_extended_datetimetz(self): + for ds, d in self.extended_dates: + for ts, t in self.extended_times: + for tz, off in self.extended_tzs: + for sep in self.time_separators: + self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz), + d + t + (off,)) + + def test_bad_datetimetzs(self): + for s in self.bad_datetimetzs: + self.assertRaises(self.exception_type, self.parse_datetimetz, s) + + +def test_suite(): + return TestSuite(( + makeSuite(TestPsycopg2TypeConversion), + makeSuite(TestPsycopg2Adapter), + makeSuite(TestISODateTime), + )) + +if __name__=='__main__': + main(defaultTest='test_suite') |
