summaryrefslogtreecommitdiff
path: root/src/zope/interface/interface.py
diff options
context:
space:
mode:
authorChristian Theune <ct@gocept.com>2007-05-03 21:59:32 +0000
committerChristian Theune <ct@gocept.com>2007-05-03 21:59:32 +0000
commita025dca5f2dc2e74575ebd43ea616d6f67e6026e (patch)
tree811d75a6d333355932ea0bfe63e2765bc795aaca /src/zope/interface/interface.py
parent6e287d3cffe6087dad31fc2310b3de45d324d396 (diff)
downloadzope-interface-a025dca5f2dc2e74575ebd43ea616d6f67e6026e.tar.gz
Moving code to satellite.
Diffstat (limited to 'src/zope/interface/interface.py')
-rw-r--r--src/zope/interface/interface.py811
1 files changed, 811 insertions, 0 deletions
diff --git a/src/zope/interface/interface.py b/src/zope/interface/interface.py
new file mode 100644
index 0000000..4e18f7d
--- /dev/null
+++ b/src/zope/interface/interface.py
@@ -0,0 +1,811 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Interface object implementation
+
+$Id$
+"""
+
+from __future__ import generators
+
+import sys
+import warnings
+import weakref
+from types import FunctionType
+from ro import ro
+from zope.interface.exceptions import Invalid
+
+
+CO_VARARGS = 4
+CO_VARKEYWORDS = 8
+TAGGED_DATA = '__interface_tagged_values__'
+
+_decorator_non_return = object()
+
+def invariant(call):
+ f_locals = sys._getframe(1).f_locals
+ tags = f_locals.setdefault(TAGGED_DATA, {})
+ invariants = tags.setdefault('invariants', [])
+ invariants.append(call)
+ return _decorator_non_return
+
+
+def taggedValue(key, value):
+ """Attaches a tagged value to an interface at definition time."""
+ f_locals = sys._getframe(1).f_locals
+ tagged_values = f_locals.setdefault(TAGGED_DATA, {})
+ tagged_values[key] = value
+ return _decorator_non_return
+
+
+class Element(object):
+
+ # We can't say this yet because we don't have enough
+ # infrastructure in place.
+ #
+ #implements(IElement)
+
+ def __init__(self, __name__, __doc__=''):
+ """Create an 'attribute' description
+ """
+ if not __doc__ and __name__.find(' ') >= 0:
+ __doc__ = __name__
+ __name__ = None
+
+ self.__name__=__name__
+ self.__doc__=__doc__
+ self.__tagged_values = {}
+
+ def getName(self):
+ """ Returns the name of the object. """
+ return self.__name__
+
+ def getDoc(self):
+ """ Returns the documentation for the object. """
+ return self.__doc__
+
+ def getTaggedValue(self, tag):
+ """ Returns the value associated with 'tag'. """
+ return self.__tagged_values[tag]
+
+ def queryTaggedValue(self, tag, default=None):
+ """ Returns the value associated with 'tag'. """
+ return self.__tagged_values.get(tag, default)
+
+ def getTaggedValueTags(self):
+ """ Returns a list of all tags. """
+ return self.__tagged_values.keys()
+
+ def setTaggedValue(self, tag, value):
+ """ Associates 'value' with 'key'. """
+ self.__tagged_values[tag] = value
+
+class SpecificationBasePy(object):
+
+ def providedBy(self, ob):
+ """Is the interface implemented by an object
+
+ >>> from zope.interface import *
+ >>> class I1(Interface):
+ ... pass
+ >>> class C(object):
+ ... implements(I1)
+ >>> c = C()
+ >>> class X(object):
+ ... pass
+ >>> x = X()
+ >>> I1.providedBy(x)
+ False
+ >>> I1.providedBy(C)
+ False
+ >>> I1.providedBy(c)
+ True
+ >>> directlyProvides(x, I1)
+ >>> I1.providedBy(x)
+ True
+ >>> directlyProvides(C, I1)
+ >>> I1.providedBy(C)
+ True
+
+ """
+ spec = providedBy(ob)
+ return self in spec._implied
+
+ def implementedBy(self, cls):
+ """Test whether the specification is implemented by a class or factory.
+ Raise TypeError if argument is neither a class nor a callable."""
+ spec = implementedBy(cls)
+ return self in spec._implied
+
+ def isOrExtends(self, interface):
+ """Is the interface the same as or extend the given interface
+
+ Examples::
+
+ >>> from zope.interface import Interface
+ >>> from zope.interface.declarations import Declaration
+ >>> class I1(Interface): pass
+ ...
+ >>> class I2(I1): pass
+ ...
+ >>> class I3(Interface): pass
+ ...
+ >>> class I4(I3): pass
+ ...
+ >>> spec = Declaration()
+ >>> int(spec.extends(Interface))
+ 1
+ >>> spec = Declaration(I2)
+ >>> int(spec.extends(Interface))
+ 1
+ >>> int(spec.extends(I1))
+ 1
+ >>> int(spec.extends(I2))
+ 1
+ >>> int(spec.extends(I3))
+ 0
+ >>> int(spec.extends(I4))
+ 0
+
+ """
+ return interface in self._implied
+
+ __call__ = isOrExtends
+
+SpecificationBase = SpecificationBasePy
+
+_marker = object()
+class InterfaceBasePy(object):
+ """Base class that wants to be replaced with a C base :)
+ """
+
+ def __call__(self, obj, alternate=_marker):
+ """Adapt an object to the interface
+ """
+ conform = getattr(obj, '__conform__', None)
+ if conform is not None:
+ adapter = self._call_conform(conform)
+ if adapter is not None:
+ return adapter
+
+ adapter = self.__adapt__(obj)
+
+ if adapter is not None:
+ return adapter
+ elif alternate is not _marker:
+ return alternate
+ else:
+ raise TypeError("Could not adapt", obj, self)
+
+ def __adapt__(self, obj):
+ """Adapt an object to the reciever
+ """
+ if self.providedBy(obj):
+ return obj
+
+ for hook in adapter_hooks:
+ adapter = hook(self, obj)
+ if adapter is not None:
+ return adapter
+
+InterfaceBase = InterfaceBasePy
+
+adapter_hooks = []
+
+try:
+ import _zope_interface_coptimizations
+except ImportError:
+ pass
+else:
+ from _zope_interface_coptimizations import SpecificationBase
+ from _zope_interface_coptimizations import InterfaceBase, adapter_hooks
+
+class Specification(SpecificationBase):
+ """Specifications
+
+ An interface specification is used to track interface declarations
+ and component registrations.
+
+ This class is a base class for both interfaces themselves and for
+ interface specifications (declarations).
+
+ Specifications are mutable. If you reassign their cases, their
+ relations with other specifications are adjusted accordingly.
+
+ For example:
+
+ >>> from zope.interface import Interface
+ >>> class I1(Interface):
+ ... pass
+ >>> class I2(I1):
+ ... pass
+ >>> class I3(I2):
+ ... pass
+
+ >>> [i.__name__ for i in I1.__bases__]
+ ['Interface']
+
+ >>> [i.__name__ for i in I2.__bases__]
+ ['I1']
+
+ >>> I3.extends(I1)
+ 1
+
+ >>> I2.__bases__ = (Interface, )
+
+ >>> [i.__name__ for i in I2.__bases__]
+ ['Interface']
+
+ >>> I3.extends(I1)
+ 0
+
+ """
+
+ # Copy some base class methods for speed
+ isOrExtends = SpecificationBase.isOrExtends
+ providedBy = SpecificationBase.providedBy
+
+ def __init__(self, bases=()):
+ self._implied = {}
+ self.dependents = weakref.WeakKeyDictionary()
+ self.__bases__ = tuple(bases)
+
+ def subscribe(self, dependent):
+ self.dependents[dependent] = self.dependents.get(dependent, 0) + 1
+
+ def unsubscribe(self, dependent):
+ n = self.dependents.get(dependent, 0) - 1
+ if not n:
+ del self.dependents[dependent]
+ elif n > 0:
+ self.dependents[dependent] = n
+ else:
+ raise KeyError(dependent)
+
+ def __setBases(self, bases):
+ # Register ourselves as a dependent of our old bases
+ for b in self.__bases__:
+ b.unsubscribe(self)
+
+ # Register ourselves as a dependent of our bases
+ self.__dict__['__bases__'] = bases
+ for b in bases:
+ b.subscribe(self)
+
+ self.changed(self)
+
+ __bases__ = property(
+
+ lambda self: self.__dict__.get('__bases__', ()),
+ __setBases,
+ )
+
+ def changed(self, originally_changed):
+ """We, or something we depend on, have changed
+ """
+
+ implied = self._implied
+ implied.clear()
+
+ ancestors = ro(self)
+
+ try:
+ if Interface not in ancestors:
+ ancestors.append(Interface)
+ except NameError:
+ pass # defining Interface itself
+
+ self.__sro__ = tuple(ancestors)
+ self.__iro__ = tuple([ancestor for ancestor in ancestors
+ if isinstance(ancestor, InterfaceClass)
+ ])
+
+ for ancestor in ancestors:
+ # We directly imply our ancestors:
+ implied[ancestor] = ()
+
+ # Now, advise our dependents of change:
+ for dependent in self.dependents.keys():
+ dependent.changed(originally_changed)
+
+
+ def interfaces(self):
+ """Return an iterator for the interfaces in the specification
+
+ for example::
+
+ >>> from zope.interface import Interface
+ >>> class I1(Interface): pass
+ ...
+ >>> class I2(I1): pass
+ ...
+ >>> class I3(Interface): pass
+ ...
+ >>> class I4(I3): pass
+ ...
+ >>> spec = Specification((I2, I3))
+ >>> spec = Specification((I4, spec))
+ >>> i = spec.interfaces()
+ >>> i.next().getName()
+ 'I4'
+ >>> i.next().getName()
+ 'I2'
+ >>> i.next().getName()
+ 'I3'
+ >>> list(i)
+ []
+ """
+ seen = {}
+ for base in self.__bases__:
+ for interface in base.interfaces():
+ if interface not in seen:
+ seen[interface] = 1
+ yield interface
+
+
+ def extends(self, interface, strict=True):
+ """Does the specification extend the given interface?
+
+ Test whether an interface in the specification extends the
+ given interface
+
+ Examples::
+
+ >>> from zope.interface import Interface
+ >>> from zope.interface.declarations import Declaration
+ >>> class I1(Interface): pass
+ ...
+ >>> class I2(I1): pass
+ ...
+ >>> class I3(Interface): pass
+ ...
+ >>> class I4(I3): pass
+ ...
+ >>> spec = Declaration()
+ >>> int(spec.extends(Interface))
+ 1
+ >>> spec = Declaration(I2)
+ >>> int(spec.extends(Interface))
+ 1
+ >>> int(spec.extends(I1))
+ 1
+ >>> int(spec.extends(I2))
+ 1
+ >>> int(spec.extends(I3))
+ 0
+ >>> int(spec.extends(I4))
+ 0
+ >>> I2.extends(I2)
+ 0
+ >>> I2.extends(I2, False)
+ 1
+ >>> I2.extends(I2, strict=False)
+ 1
+
+ """
+ return ((interface in self._implied)
+ and
+ ((not strict) or (self != interface))
+ )
+
+ def weakref(self, callback=None):
+ return weakref.ref(self, callback)
+
+ def get(self, name, default=None):
+ """Query for an attribute description
+ """
+ try:
+ attrs = self._v_attrs
+ except AttributeError:
+ attrs = self._v_attrs = {}
+ attr = attrs.get(name)
+ if attr is None:
+ for iface in self.__iro__:
+ attr = iface.direct(name)
+ if attr is not None:
+ attrs[name] = attr
+ break
+
+ if attr is None:
+ return default
+ else:
+ return attr
+
+class InterfaceClass(Element, InterfaceBase, Specification):
+ """Prototype (scarecrow) Interfaces Implementation."""
+
+ # We can't say this yet because we don't have enough
+ # infrastructure in place.
+ #
+ #implements(IInterface)
+
+ def __init__(self, name, bases=(), attrs=None, __doc__=None,
+ __module__=None):
+
+ if attrs is None:
+ attrs = {}
+
+ if __module__ is None:
+ __module__ = attrs.get('__module__')
+ if isinstance(__module__, str):
+ del attrs['__module__']
+ else:
+ try:
+ # Figure out what module defined the interface.
+ # This is how cPython figures out the module of
+ # a class, but of course it does it in C. :-/
+ __module__ = sys._getframe(1).f_globals['__name__']
+ except (AttributeError, KeyError):
+ pass
+
+ self.__module__ = __module__
+
+ d = attrs.get('__doc__')
+ if d is not None:
+ if not isinstance(d, Attribute):
+ if __doc__ is None:
+ __doc__ = d
+ del attrs['__doc__']
+
+ if __doc__ is None:
+ __doc__ = ''
+
+ Element.__init__(self, name, __doc__)
+
+ tagged_data = attrs.pop(TAGGED_DATA, None)
+ if tagged_data is not None:
+ for key, val in tagged_data.items():
+ self.setTaggedValue(key, val)
+
+ for base in bases:
+ if not isinstance(base, InterfaceClass):
+ raise TypeError('Expected base interfaces')
+
+ Specification.__init__(self, bases)
+
+ # Make sure that all recorded attributes (and methods) are of type
+ # `Attribute` and `Method`
+ for name, attr in attrs.items():
+ if isinstance(attr, Attribute):
+ attr.interface = self
+ if not attr.__name__:
+ attr.__name__ = name
+ elif isinstance(attr, FunctionType):
+ attrs[name] = fromFunction(attr, self, name=name)
+ elif attr is _decorator_non_return:
+ del attrs[name]
+ else:
+ raise InvalidInterface("Concrete attribute, " + name)
+
+ self.__attrs = attrs
+
+ self.__identifier__ = "%s.%s" % (self.__module__, self.__name__)
+
+ def interfaces(self):
+ """Return an iterator for the interfaces in the specification
+
+ for example::
+
+ >>> from zope.interface import Interface
+ >>> class I1(Interface): pass
+ ...
+ >>>
+ >>> i = I1.interfaces()
+ >>> i.next().getName()
+ 'I1'
+ >>> list(i)
+ []
+ """
+ yield self
+
+ def getBases(self):
+ return self.__bases__
+
+ def isEqualOrExtendedBy(self, other):
+ """Same interface or extends?"""
+ return self == other or other.extends(self)
+
+ def names(self, all=False):
+ """Return the attribute names defined by the interface."""
+ if not all:
+ return self.__attrs.keys()
+
+ r = self.__attrs.copy()
+
+ for base in self.__bases__:
+ r.update(dict.fromkeys(base.names(all)))
+
+ return r.keys()
+
+ def __iter__(self):
+ return iter(self.names(all=True))
+
+ def namesAndDescriptions(self, all=False):
+ """Return attribute names and descriptions defined by interface."""
+ if not all:
+ return self.__attrs.items()
+
+ r = {}
+ for base in self.__bases__[::-1]:
+ r.update(dict(base.namesAndDescriptions(all)))
+
+ r.update(self.__attrs)
+
+ return r.items()
+
+ def getDescriptionFor(self, name):
+ """Return the attribute description for the given name."""
+ r = self.get(name)
+ if r is not None:
+ return r
+
+ raise KeyError(name)
+
+ __getitem__ = getDescriptionFor
+
+ def __contains__(self, name):
+ return self.get(name) is not None
+
+ def direct(self, name):
+ return self.__attrs.get(name)
+
+ def queryDescriptionFor(self, name, default=None):
+ return self.get(name, default)
+
+ def deferred(self):
+ """Return a defered class corresponding to the interface."""
+ if hasattr(self, "_deferred"): return self._deferred
+
+ klass={}
+ exec "class %s: pass" % self.__name__ in klass
+ klass=klass[self.__name__]
+
+ self.__d(klass.__dict__)
+
+ self._deferred=klass
+
+ return klass
+
+ def validateInvariants(self, obj, errors=None):
+ """validate object to defined invariants."""
+ for call in self.queryTaggedValue('invariants', []):
+ try:
+ call(obj)
+ except Invalid, e:
+ if errors is None:
+ raise
+ else:
+ errors.append(e)
+ for base in self.__bases__:
+ try:
+ base.validateInvariants(obj, errors)
+ except Invalid:
+ if errors is None:
+ raise
+ if errors:
+ raise Invalid(errors)
+
+ def _getInterface(self, ob, name):
+ """Retrieve a named interface."""
+ return None
+
+ def __d(self, dict):
+
+ for k, v in self.__attrs.items():
+ if isinstance(v, Method) and not (k in dict):
+ dict[k]=v
+
+ for b in self.__bases__:
+ b.__d(dict)
+
+ def __repr__(self):
+ try:
+ return self._v_repr
+ except AttributeError:
+ name = self.__name__
+ m = self.__module__
+ if m:
+ name = '%s.%s' % (m, name)
+ r = "<%s %s>" % (self.__class__.__name__, name)
+ self._v_repr = r
+ return r
+
+ def _call_conform(self, conform):
+ try:
+ return conform(self)
+ except TypeError:
+ # We got a TypeError. It might be an error raised by
+ # the __conform__ implementation, or *we* may have
+ # made the TypeError by calling an unbound method
+ # (object is a class). In the later case, we behave
+ # as though there is no __conform__ method. We can
+ # detect this case by checking whether there is more
+ # than one traceback object in the traceback chain:
+ if sys.exc_info()[2].tb_next is not None:
+ # There is more than one entry in the chain, so
+ # reraise the error:
+ raise
+ # This clever trick is from Phillip Eby
+
+ return None
+
+ def __reduce__(self):
+ return self.__name__
+
+ def __cmp(self, o1, o2):
+ # Yes, I did mean to name this __cmp, rather than __cmp__.
+ # It is a private method used by __lt__ and __gt__.
+ # I don't want to override __eq__ because I want the default
+ # __eq__, which is really fast.
+ """Make interfaces sortable
+
+ TODO: It would ne nice if:
+
+ More specific interfaces should sort before less specific ones.
+ Otherwise, sort on name and module.
+
+ But this is too complicated, and we're going to punt on it
+ for now.
+
+ For now, sort on interface and module name.
+
+ None is treated as a pseudo interface that implies the loosest
+ contact possible, no contract. For that reason, all interfaces
+ sort before None.
+
+ """
+ if o1 == o2:
+ return 0
+
+ if o1 is None:
+ return 1
+ if o2 is None:
+ return -1
+
+ n1 = (getattr(o1, '__name__', ''),
+ getattr(getattr(o1, '__module__', None), '__name__', ''))
+ n2 = (getattr(o2, '__name__', ''),
+ getattr(getattr(o2, '__module__', None), '__name__', ''))
+
+ return cmp(n1, n2)
+
+ def __lt__(self, other):
+ c = self.__cmp(self, other)
+ #print '<', self, other, c < 0, c
+ return c < 0
+
+ def __gt__(self, other):
+ c = self.__cmp(self, other)
+ #print '>', self, other, c > 0, c
+ return c > 0
+
+
+Interface = InterfaceClass("Interface", __module__ = 'zope.interface')
+
+class Attribute(Element):
+ """Attribute descriptions
+ """
+
+ # We can't say this yet because we don't have enough
+ # infrastructure in place.
+ #
+ # implements(IAttribute)
+
+ interface = None
+
+
+class Method(Attribute):
+ """Method interfaces
+
+ The idea here is that you have objects that describe methods.
+ This provides an opportunity for rich meta-data.
+ """
+
+ # We can't say this yet because we don't have enough
+ # infrastructure in place.
+ #
+ # implements(IMethod)
+
+ def __call__(self, *args, **kw):
+ raise BrokenImplementation(self.interface, self.__name__)
+
+ def getSignatureInfo(self):
+ return {'positional': self.positional,
+ 'required': self.required,
+ 'optional': self.optional,
+ 'varargs': self.varargs,
+ 'kwargs': self.kwargs,
+ }
+
+ def getSignatureString(self):
+ sig = []
+ for v in self.positional:
+ sig.append(v)
+ if v in self.optional.keys():
+ sig[-1] += "=" + `self.optional[v]`
+ if self.varargs:
+ sig.append("*" + self.varargs)
+ if self.kwargs:
+ sig.append("**" + self.kwargs)
+
+ return "(%s)" % ", ".join(sig)
+
+
+def fromFunction(func, interface=None, imlevel=0, name=None):
+ name = name or func.__name__
+ method = Method(name, func.__doc__)
+ defaults = func.func_defaults or ()
+ code = func.func_code
+ # Number of positional arguments
+ na = code.co_argcount-imlevel
+ names = code.co_varnames[imlevel:]
+ opt = {}
+ # Number of required arguments
+ nr = na-len(defaults)
+ if nr < 0:
+ defaults=defaults[-nr:]
+ nr = 0
+
+ # Determine the optional arguments.
+ opt.update(dict(zip(names[nr:], defaults)))
+
+ method.positional = names[:na]
+ method.required = names[:nr]
+ method.optional = opt
+
+ argno = na
+
+ # Determine the function's variable argument's name (i.e. *args)
+ if code.co_flags & CO_VARARGS:
+ method.varargs = names[argno]
+ argno = argno + 1
+ else:
+ method.varargs = None
+
+ # Determine the function's keyword argument's name (i.e. **kw)
+ if code.co_flags & CO_VARKEYWORDS:
+ method.kwargs = names[argno]
+ else:
+ method.kwargs = None
+
+ method.interface = interface
+
+ for key, value in func.__dict__.items():
+ method.setTaggedValue(key, value)
+
+ return method
+
+
+def fromMethod(meth, interface=None, name=None):
+ func = meth.im_func
+ return fromFunction(func, interface, imlevel=1, name=name)
+
+
+# Now we can create the interesting interfaces and wire them up:
+def _wire():
+ from zope.interface.declarations import classImplements
+
+ from zope.interface.interfaces import IAttribute
+ classImplements(Attribute, IAttribute)
+
+ from zope.interface.interfaces import IMethod
+ classImplements(Method, IMethod)
+
+ from zope.interface.interfaces import IInterface, ISpecification
+ classImplements(InterfaceClass, IInterface)
+ classImplements(Specification, ISpecification)
+
+# We import this here to deal with module dependencies.
+from zope.interface.declarations import providedBy, implementedBy
+from zope.interface.exceptions import InvalidInterface
+from zope.interface.exceptions import BrokenImplementation