summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES3
-rw-r--r--lib/sqlalchemy/orm/mapper.py5
-rw-r--r--lib/sqlalchemy/schema.py6
-rw-r--r--lib/sqlalchemy/util.py5
-rw-r--r--test/engine/reflection.py81
5 files changed, 100 insertions, 0 deletions
diff --git a/CHANGES b/CHANGES
index 369f875ec..5e35e4eaa 100644
--- a/CHANGES
+++ b/CHANGES
@@ -28,6 +28,9 @@
explicitly set to False
- TypeEngine objects now have methods to deal with copying and comparing
values of their specific type. Currently used by the ORM, see below.
+ - fixed condition that occurred during reflection when a primary key
+ column was explciitly overridden, where the PrimaryKeyConstraint would
+ get both the reflected and the programmatic column doubled up
- Connections/Pooling/Execution:
- connection pool tracks open cursors and automatically closes them
if connection is returned to pool with cursors still opened. Can be
diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py
index fd912e617..af54f6df8 100644
--- a/lib/sqlalchemy/orm/mapper.py
+++ b/lib/sqlalchemy/orm/mapper.py
@@ -343,6 +343,10 @@ class Mapper(object):
except KeyError:
l = self.pks_by_table.setdefault(t, util.OrderedSet())
for k in t.primary_key:
+ #if k.key not in t.c and k._label not in t.c:
+ # this is a condition that was occurring when table reflection was doubling up primary keys
+ # that were overridden in the Table constructor
+ # raise exceptions.AssertionError("Column " + str(k) + " not located in the column set of table " + str(t))
l.add(k)
if len(self.pks_by_table[self.mapped_table]) == 0:
@@ -392,6 +396,7 @@ class Mapper(object):
self.__log("adding ColumnProperty %s" % (column.key))
elif isinstance(prop, ColumnProperty):
prop.columns.append(column)
+ self.__log("appending to existing ColumnProperty %s" % (column.key))
else:
if not self.allow_column_override:
raise exceptions.ArgumentError("WARNING: column '%s' not being added due to property '%s'. Specify 'allow_column_override=True' to mapper() to ignore this condition." % (column.key, repr(prop)))
diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py
index 3dab1f867..72109379d 100644
--- a/lib/sqlalchemy/schema.py
+++ b/lib/sqlalchemy/schema.py
@@ -440,6 +440,8 @@ class Column(SchemaItem, sql.ColumnClause):
else:
return self.name
return self.name
+ else:
+ return self.name
def _derived_metadata(self):
return self.table.metadata
@@ -764,6 +766,10 @@ class PrimaryKeyConstraint(Constraint):
def accept_schema_visitor(self, visitor):
visitor.visit_primary_key_constraint(self)
def append(self, col):
+ # TODO: change "columns" to a key-sensitive set ?
+ for c in self.columns:
+ if c.key == col.key:
+ self.columns.remove(c)
self.columns.append(col)
col.primary_key=True
def copy(self):
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
index 3664b8e6d..91e295435 100644
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -102,6 +102,11 @@ class OrderedProperties(object):
raise AttributeError(key)
def __contains__(self, key):
return key in self.__data
+ def get(self, key, default=None):
+ if self.has_key(key):
+ return self[key]
+ else:
+ return default
def keys(self):
return self.__data.keys()
def has_key(self, key):
diff --git a/test/engine/reflection.py b/test/engine/reflection.py
index c31e5454f..2e2c50127 100644
--- a/test/engine/reflection.py
+++ b/test/engine/reflection.py
@@ -100,6 +100,86 @@ class ReflectionTest(PersistTest):
finally:
addresses.drop()
users.drop()
+
+ def testoverridecolumns(self):
+ """test that you can override columns which contain foreign keys to other reflected tables"""
+ meta = BoundMetaData(testbase.db)
+ users = Table('users', meta,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30)))
+ addresses = Table('addresses', meta,
+ Column('id', Integer, primary_key=True),
+ Column('street', String(30)),
+ Column('user_id', Integer))
+
+ meta.create_all()
+ try:
+ meta2 = BoundMetaData(testbase.db)
+ a2 = Table('addresses', meta2,
+ Column('user_id', Integer, ForeignKey('users.id')),
+ autoload=True)
+ u2 = Table('users', meta2, autoload=True)
+
+ assert a2.c.user_id.foreign_key is not None
+ assert a2.c.user_id.foreign_key.parent is a2.c.user_id
+ assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
+
+ meta3 = BoundMetaData(testbase.db)
+ u3 = Table('users', meta3, autoload=True)
+ a3 = Table('addresses', meta3,
+ Column('user_id', Integer, ForeignKey('users.id')),
+ autoload=True)
+
+ assert u3.join(a3).onclause == u3.c.id==a3.c.user_id
+
+ finally:
+ meta.drop_all()
+
+ def testoverridecolumns2(self):
+ """test that you can override columns which contain foreign keys to other reflected tables,
+ where the foreign key column is also a primary key column"""
+ meta = BoundMetaData(testbase.db)
+ users = Table('users', meta,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30)))
+ addresses = Table('addresses', meta,
+ Column('id', Integer, primary_key=True),
+ Column('street', String(30)))
+
+
+ meta.create_all()
+ try:
+ meta2 = BoundMetaData(testbase.db)
+ a2 = Table('addresses', meta2,
+ Column('id', Integer, ForeignKey('users.id'), primary_key=True, ),
+ autoload=True)
+ u2 = Table('users', meta2, autoload=True)
+
+ assert list(a2.primary_key) == [a2.c.id]
+ assert list(u2.primary_key) == [u2.c.id]
+ assert u2.join(a2).onclause == u2.c.id==a2.c.id
+
+ # heres what was originally failing, because a2's primary key
+ # had two "id" columns, one of which was not part of a2's "c" collection
+ #class Address(object):pass
+ #mapper(Address, a2)
+ #add1 = Address()
+ #sess = create_session()
+ #sess.save(add1)
+ #sess.flush()
+
+ meta3 = BoundMetaData(testbase.db)
+ u3 = Table('users', meta3, autoload=True)
+ a3 = Table('addresses', meta3,
+ Column('id', Integer, ForeignKey('users.id'), primary_key=True),
+ autoload=True)
+
+ assert list(a3.primary_key) == [a3.c.id]
+ assert list(u3.primary_key) == [u3.c.id]
+ assert u3.join(a3).onclause == u3.c.id==a3.c.id
+
+ finally:
+ meta.drop_all()
@testbase.supported('mysql')
def testmysqltypes(self):
@@ -146,6 +226,7 @@ class ReflectionTest(PersistTest):
t.drop()
def testmultipk(self):
+ """test that creating a table checks for a sequence before creating it"""
table = Table(
'engine_multi', testbase.db,
Column('multi_id', Integer, Sequence('multi_id_seq'), primary_key=True),