summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLegrandin <helderijs@gmail.com>2013-01-23 22:37:53 +0100
committerDwayne Litzenberger <dlitz@dlitz.net>2013-10-20 13:30:21 -0700
commit57104488faa9fc386ea1aee249bafb6e2a529a57 (patch)
treec9f2245c097fbf4a0b00b29356c95dab2c3841aa
parentda79b781af41ff815b812c49d9be434f5de52aa4 (diff)
downloadpycrypto-57104488faa9fc386ea1aee249bafb6e2a529a57.tar.gz
Add support for CCM mode (AES only).
[dlitz@dlitz.net: Included changes from the following commits from the author's pull request:] - [5306cf3] Added support for CCM mode (AES cipher only) - [9abe301] Added CCM tests - [f0c1395] Add MacMismatchError and ApiUsageError - [fb62fae] ApiUsageError becomes TypeError - [9c13f9c] Rename 'IV' parameter to 'nonce' for AEAD modes. - [4ec64d8] Removed last references to ApiUsageError - [80bfd35] Corrected AES-CCM examples [dlitz@dlitz.net: Removed unrelated documentation change] [dlitz@dlitz.net: Renamed 'targs' back to 'args'] [dlitz@dlitz.net: Whitespace fixed with "git rebase --whitespace=fix"]
-rw-r--r--Doc/AEAD_API.txt267
-rw-r--r--lib/Crypto/Cipher/AES.py63
-rw-r--r--lib/Crypto/Cipher/blockalgo.py441
-rw-r--r--lib/Crypto/SelfTest/Cipher/common.py340
-rw-r--r--lib/Crypto/SelfTest/Cipher/test_AES.py234
-rw-r--r--pct-speedtest.py4
6 files changed, 1281 insertions, 68 deletions
diff --git a/Doc/AEAD_API.txt b/Doc/AEAD_API.txt
new file mode 100644
index 0000000..0dcc147
--- /dev/null
+++ b/Doc/AEAD_API.txt
@@ -0,0 +1,267 @@
+AEAD (Authenticated Encryption with Associated Data) modes of operations
+for symmetric block ciphers provide authentication and integrity in addition
+to confidentiality.
+
+Traditional modes like CBC or CTR only guarantee confidentiality; one
+has to combine them with cryptographic MACs in order to have assurance of
+authenticity. It is not trivial to implement such generic composition
+without introducing subtle security flaws.
+
+AEAD modes are designed to be more secure and often more efficient than
+ad-hoc constructions. Widespread AEAD modes are GCM, CCM, EAX, SIV, OCB.
+
+Most AEAD modes allow to optionally authenticate the plaintext with some
+piece of arbitrary data that will not be encrypted, for instance a
+packet header.
+
+In this file, that is called "associated data" (AD) even though terminology
+may vary from mode to mode.
+
+=== Goals of the AEAD API
+
+1. Any piece of data (AD, ciphertext, plaintext) is passed only once.
+2. It is possible to encrypt/decrypt huge files withe little memory cost.
+ To this end, authentication, encryption, and decryption are always
+ incremental. That is, the caller may split data in any way, and the end
+ result does not depend on how splitting is done.
+ Data is processed immediately without being internally buffered.
+3. API is similar to a Crypto.Hash MAC object, to the point that when only
+ AD is present, the object behaves exactly like a MAC.
+4. API is similar to a classic cipher object, to the point that when no AD is present,
+ the object behaves exactly like a classic cipher mode (e.g. CBC).
+5. MACs are produced and handled separately from ciphertext/plaintext.
+6. The API is intuitive and hard to misuse. Exceptions are raised immediately
+ in case of misuse.
+7. Caller does not have to add or remove padding.
+
+The following state diagram shows the proposed API for AEAD modes.
+In general, it is applicable to all block ciphers in the Crypto.Cipher
+module, even though certain modes may put restrictions on certain
+cipher properties (e.g. block size).
+
+ .--------.
+ | START |
+ '--------'
+ |
+ | .new(key, mode, iv, ...)
+ v
+ .-------------.
+.-------------.-------------| INITIALIZED |---------------+-------------.
+| | '-------------' | |
+| | | | |
+| |.encrypt() |.update() |.decrypt() |
+| | | | |
+| | v | |
+| | .--------------.___ .update() | |
+| | | CLEAR HEADER |<--' | |
+| | '--------------' | |
+| | | | | | | |
+| | .encrypt()| | | |.decrypt() | |
+| v | | | | v |
+| .--------------. | | | | .--------------. |
+| | ENCRYPTING |<----' | | '---->| DECRYPTING | |
+| '--------------' | | '--------------' |
+| | ^ | | | | ^ | |
+| hex/digest()| | |.encrypt()* | | | | |.decrypt()* |
+| | ''' | | | ''' |
+| | / \ | |
+| | hex/digest()/ \hex/verify()|hex/verify() |
+|hex/digest() | / \ | |
+| | / \ | hex/verify()|
+| v / \ v |
+| .--------------. / \ .--------------. |
+'------->| FINISHED/ENC |<-- -->| FINISHED/DEC |<---------'
+ '--------------' '--------------'
+ ^ | ^ |
+ | |hex/digest() | |hex/verify()
+ ''' '''
+
+ [*] this transition is not always allowed for non-online or 2-pass modes
+
+The following states are defined:
+
+ - INITIALIZED. The cipher has been instantiated with a key, possibly
+ a nonce (or IV), and other parameters.
+ The cipher object may be used for encryption or decryption.
+
+ - CLEAR HEADER. The cipher is authenticating the associated data.
+
+ - ENCRYPTING. It has been decided that the cipher has to be used for
+ encrypting data. At least one piece of ciphertext has been produced.
+
+ - DECRYPTING. It has been decided that the cipher has to be used for
+ decrypting data. At least one piece of candidate plaintext has
+ been produced.
+
+ - FINISHED/ENC. Authenticated encryption has completed.
+ The final MAC has been produced.
+
+ - FINISHED/DEC. Authenticated decryption has completed.
+ It is now established if the associated data together the plaintext
+ are authentic.
+
+In addition to the existing Cipher methods new(), encrypt() and decrypt(),
+5 new methods are added to a Cipher object:
+
+ - update() to consume all associated data. It takes as input
+ a byte string, and it can be invoked multiple times.
+ Ciphertext / plaintext must not be passed to update().
+ For simplicity, update() can only be called before encryption
+ or decryption starts. If no associated data is used, update()
+ is not called.
+
+ - digest() to output the binary MAC. It can only be called at the end
+ of the encryption.
+
+ - hexdigest() as digest(), but the output is ASCII hexadecimal.
+
+ - verify() to evaluate if the binary MAC is correct. It can only be
+ called at the end of decryption.
+
+ - hexverify() as verify(), but the input is ASCII hexadecimal.
+
+The syntax of update(), digest(), and hexdigest() are consistent with
+the API of a Hash object.
+
+IMPORTANT: method copy() is not present. Since most AEAD modes require
+IV/nonce to never repeat, this method may be misused and lead to security
+holes.
+
+Since MAC validation (decryption mode) is subject to timing attacks,
+the (hex)verify() method accepts as input the expected MAC and raises a
+ValueError exception if it does not match.
+Internally, the method can perform the validation using time-independent code.
+
+=== Padding
+
+The proposed API only supports AEAD modes than do not require padding
+of the plaintext. Adding support for that would make the API more complex
+(for instance, an external "finished" flag to pass encrypt()).
+
+=== Pre-processing of associated data
+
+The proposed API does not support pre-processing of AD.
+
+Sometimes, a lot of encryptions/decryptions are performed with
+the same key and the same AD, and different nonces and plaintext.
+
+Certain modes like GCM allow to cache processing of AD, and reuse
+it such results across several encryptions/decryptions.
+
+=== Singe/2-pass modes and online/non-online modes
+
+The proposed API supports single-pass, online AEAD modes.
+
+A "single-pass" mode processes each block of AD or data only once.
+Compare that to "2-pass" modes, where each block of data (not AD) twice:
+once for authentication and one for enciphering.
+
+An "online" mode does not need to know the size of the message before
+encryption starts. As a consequence:
+ - memory usage doesn't depend on the plaintext/ciphertext size.
+ - when encrypting, partial ciphertexts can be immediately sent to the receiver.
+ - when decrypting, partial (unauthenticated) plaintexts can be obtained from
+ the cipher.
+
+Most single-pass modes are patented, and only the less efficient 2-pass modes
+are in widespread use.
+
+That is still OK, provided that encryption can start *before* authentication is
+completed. If that's the case (e.g. GCM, EAX) encrypt()/decrypt() will also
+take care of completing the authentication over the plaintext.
+If that's NOT the case (e.g. SIV), encrypt()/decrypt() can only be called once.
+
+A similar problem arises with non-online modes (e.g. CCM): they have to wait to see
+the end of the plaintext before encryption can start. The only way to achieve
+that is to only allow encrypt()/decrypt() to be called once.
+
+=== Associated Data
+
+Depending on the mode, the associated data AD can be defined as:
+ 1. a single binary string or
+ 2. a vector of binary strings
+
+For modes of the 1st type (e.g. CCM), the API allows the AD to be split
+in any number of segments, and fed to update() in multiple calls.
+The resulting AD does not depend on how splitting is performed.
+It is responsability of the caller to ensure that concatenation of strings
+is secure. For instance, there is no difference between:
+
+ c.update(b"builtin")
+ c.update(b"securely")
+
+and:
+
+ c.update(b"built")
+ c.update(b"insecurely")
+
+For modes of the 2nd type (e.g. SIV), the API assumes that each call
+to update() ingests one full item of the vector. The two examples above
+are now different.
+
+=== CCM mode
+Number of keys required: 1
+Compatible with ciphers: AES (may be used with others if block length
+ is 128 bits)
+Counter/IV/nonce: 1 nonce required (length 8..13 bytes)
+Single-pass: no
+Online: no
+Term for AD: "associate data" (NIST) or "additional
+ authenticated data" (RFC)
+Term for MAC: part of ciphertext (NIST) or
+ "encrypted authentication value" (RFC).
+Padding required: no
+Pre-processing of AD: not possible
+
+In order to allow encrypt()/decrypt() to be called multiple times,
+and to reduce the memory usage, the new() method of the Cipher module
+optionally accepts the following parameters:
+
+ - 'assoc_len', the total length (in bytes) of the AD
+
+ - 'msg_len', the total length (in bytes) of the plaintext or ciphertext
+
+=== GCM mode
+Number of keys required: 1
+Compatible with ciphers: AES (may be used with others if block length
+ is 128 bits)
+Counter/IV/nonce: 1 IV required (any length)
+Single-pass: no
+Online: yes
+Term for AD: "additional authenticated data"
+Term for MAC: "authentication tag" or "tag"
+Padding required: no
+Pre-processing of AD: possible
+
+Encryption can start before authentication, so encrypt()/decrypt() can always
+be called multiple times.
+
+=== EAX mode
+Number of keys required: 1
+Compatible with ciphers: any
+Counter/IV/nonce: 1 IV required (any length)
+Single-pass: no
+Online: yes
+Term for AD: "header"
+Term for MAC: "tag"
+Padding required: no
+Pre-processing of AD: possible
+
+Encryption can start before authentication, so encrypt()/decrypt() can always
+be called multiple times.
+
+=== SIV
+Number of keys required: 2
+Compatible with ciphers: AES
+Counter/IV/nonce: 1 IV required (any length)
+Single-pass: no
+Online: no
+Term for AD: "associated data" (vector)
+Term for MAC: "tag"
+Padding required: no
+Pre-processing of AD: possible
+
+Encryption can only start before authentication is ended, so encrypt()/decrypt()
+an only be called once. SIV is not suitable for big files or streaming.
+
+AD is a vector of strings. One item in the vector is the (optional) IV.
diff --git a/lib/Crypto/Cipher/AES.py b/lib/Crypto/Cipher/AES.py
index 351b954..5316700 100644
--- a/lib/Crypto/Cipher/AES.py
+++ b/lib/Crypto/Cipher/AES.py
@@ -31,15 +31,46 @@ encryption.
As an example, encryption can be done as follows:
>>> from Crypto.Cipher import AES
- >>> from Crypto import Random
+ >>> from Crypto.Random import get_random_bytes
>>>
>>> key = b'Sixteen byte key'
- >>> iv = Random.new().read(AES.block_size)
+ >>> iv = get_random_bytes(16)
>>> cipher = AES.new(key, AES.MODE_CFB, iv)
>>> msg = iv + cipher.encrypt(b'Attack at dawn')
+A more complicated example is based on CCM, (see `MODE_CCM`) an `AEAD`_ mode
+that provides both confidentiality and authentication for a message.
+It also allows message for the header to remain in the clear, whilst still
+being authenticated. The encryption is done as follows:
+
+ >>> from Crypto.Cipher import AES
+ >>> from Crypto.Random import get_random_bytes
+ >>>
+ >>>
+ >>> hdr = b'To your eyes only'
+ >>> plaintext = b'Attack at dawn'
+ >>> key = b'Sixteen byte key'
+ >>> nonce = get_random_bytes(11)
+ >>> cipher = AES.new(key, AES.MODE_CCM, nonce)
+ >>> cipher.update(hdr)
+ >>> msg = nonce, hdr, cipher.encrypt(plaintext), cipher.digest()
+
+We assume that the tuple ``msg`` is transmitted to the receiver:
+
+ >>> nonce, hdr, ciphertext, mac = msg
+ >>> key = b'Sixteen byte key'
+ >>> cipher = AES.new(key, AES.MODE_CCM, nonce)
+ >>> cipher.update(hdr)
+ >>> plaintext = cipher.decrypt(ciphertext)
+ >>> try:
+ >>> cipher.verify(mac)
+ >>> print "The message is authentic: hdr=%s, pt=%s" % (hdr, plaintext)
+ >>> except ValueError:
+ >>> print "Key incorrect or message corrupted"
+
.. __: http://en.wikipedia.org/wiki/Advanced_Encryption_Standard
.. _NIST: http://csrc.nist.gov/publications/fips/fips197/fips-197.pdf
+.. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html
:undocumented: __revision__, __package__
"""
@@ -67,7 +98,7 @@ class AESCipher (blockalgo.BlockAlgo):
def __init__(self, key, *args, **kwargs):
"""Initialize an AES cipher object
-
+
See also `new()` at the module level."""
# Check if the use_aesni was specified.
@@ -94,6 +125,8 @@ def new(key, *args, **kwargs):
The chaining mode to use for encryption or decryption.
Default is `MODE_ECB`.
IV : byte string
+ (*Only* `MODE_CBC`, `MODE_CFB`, `MODE_OFB`, `MODE_OPENPGP`).
+
The initialization vector to use for encryption or decryption.
It is ignored for `MODE_ECB` and `MODE_CTR`.
@@ -102,8 +135,17 @@ def new(key, *args, **kwargs):
and `block_size` +2 bytes for decryption (in the latter case, it is
actually the *encrypted* IV which was prefixed to the ciphertext).
It is mandatory.
-
- For all other modes, it must be `block_size` bytes longs.
+
+ For all other modes, it must be 16 bytes long.
+ nonce : byte string
+ (*Only* `MODE_CCM`).
+
+ A mandatory value that must never be reused for any other encryption.
+
+ For `MODE_CCM`, its length must be in the range ``[7..13]``.
+ 11 or 12 bytes are reasonable values in general. Bear in
+ mind that with CCM there is a trade-off between nonce length and
+ maximum message size.
counter : callable
(*Only* `MODE_CTR`). A stateful function that returns the next
*counter block*, which is a byte string of `block_size` bytes.
@@ -112,6 +154,15 @@ def new(key, *args, **kwargs):
(*Only* `MODE_CFB`).The number of bits the plaintext and ciphertext
are segmented in.
It must be a multiple of 8. If 0 or not specified, it will be assumed to be 8.
+ mac_len : integer
+ (*Only* `MODE_CCM`). Length of the MAC, in bytes. It must be even and in
+ the range ``[4..16]``. The default is 16.
+ msg_len : integer
+ (*Only* `MODE_CCM`). Length of the message to (de)cipher.
+ If not specified, ``encrypt`` or ``decrypt`` may only be called once.
+ assoc_len : integer
+ (*Only* `MODE_CCM`). Length of the associated data.
+ If not specified, all data is internally buffered.
use_aesni : boolean
Use AES-NI if available.
@@ -133,6 +184,8 @@ MODE_OFB = 5
MODE_CTR = 6
#: OpenPGP Mode. See `blockalgo.MODE_OPENPGP`.
MODE_OPENPGP = 7
+#: Counter with CBC-MAC (CCM) Mode. See `blockalgo.MODE_CCM`.
+MODE_CCM = 8
#: Size of a data block (in bytes)
block_size = 16
#: Size of a key (in bytes)
diff --git a/lib/Crypto/Cipher/blockalgo.py b/lib/Crypto/Cipher/blockalgo.py
index e20ec5a..57738c4 100644
--- a/lib/Crypto/Cipher/blockalgo.py
+++ b/lib/Crypto/Cipher/blockalgo.py
@@ -24,8 +24,15 @@
import sys
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *
+
from Crypto.Util.py3compat import *
+from binascii import unhexlify
+
+from Crypto.Util import Counter
+from Crypto.Util.strxor import strxor
+from Crypto.Util.number import long_to_bytes
+
#: *Electronic Code Book (ECB)*.
#: This is the simplest encryption mode. Each of the plaintext blocks
#: is directly encrypted into a ciphertext block, independently of
@@ -123,6 +130,61 @@ MODE_CTR = 6
#: .. _OpenPGP: http://tools.ietf.org/html/rfc4880
MODE_OPENPGP = 7
+#: *Counter with CBC-MAC (CCM)*. This is an Authenticated Encryption with
+#: Associated Data (`AEAD`_) mode. It provides both confidentiality and
+#: authenticity.
+#: The header of the message may be left in the clear, if needed, and it will
+#: still be subject to authentication. The decryption step tells the receiver
+#: if the message comes from a source that really knowns the secret key.
+#: Additionally, decryption detects if any part of the message - including the
+#: header - has been modified or corrupted.
+#:
+#: This mode requires a nonce. The nonce shall never repeat for two
+#: different messages encrypted with the same key, but it does not need
+#: to be random.
+#: Note that there is a trade-off between the size of the nonce and the
+#: maximum size of a single message you can encrypt.
+#:
+#: It is important to use a large nonce if the key is reused across several
+#: messages and the nonce is chosen randomly.
+#:
+#: It is acceptable to us a short nonce if the key is only used a few times or
+#: if the nonce is taken from a counter.
+#:
+#: The following table shows the trade-off when the nonce is chosen at
+#: random. The column on the left shows how many messages it takes
+#: for the keystream to repeat **on average**. In practice, you will want to
+#: stop using the key way before that.
+#:
+#: +--------------------+---------------+-------------------+
+#: | Avg. # of messages | nonce | Max. message |
+#: | before keystream | size | size |
+#: | repeats | (bytes) | (bytes) |
+#: +====================+===============+===================+
+#: | 2**52 | 13 | 64K |
+#: +--------------------+---------------+-------------------+
+#: | 2**48 | 12 | 16M |
+#: +--------------------+---------------+-------------------+
+#: | 2**44 | 11 | 4G |
+#: +--------------------+---------------+-------------------+
+#: | 2**40 | 10 | 1T |
+#: +--------------------+---------------+-------------------+
+#: | 2**36 | 9 | 64P |
+#: +--------------------+---------------+-------------------+
+#: | 2**32 | 8 | 16E |
+#: +--------------------+---------------+-------------------+
+#:
+#: This mode is only available for ciphers that operate on 128 bits blocks
+#: (e.g. AES but not TDES).
+#:
+#: See `NIST SP800-38C`_ or RFC3610_ .
+#:
+#: .. _`NIST SP800-38C`: http://csrc.nist.gov/publications/nistpubs/800-38C/SP800-38C.pdf
+#: .. _RFC3610: https://tools.ietf.org/html/rfc3610
+#: .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html
+MODE_CCM = 8
+
+
def _getParameter(name, index, args, kwargs, default=None):
"""Find a parameter in tuple and dictionary arguments a function receives"""
@@ -132,68 +194,220 @@ def _getParameter(name, index, args, kwargs, default=None):
raise ValueError("Parameter '%s' is specified twice" % name)
param = args[index]
return param or default
-
+
+
class BlockAlgo:
"""Class modelling an abstract block cipher."""
def __init__(self, factory, key, *args, **kwargs):
self.mode = _getParameter('mode', 0, args, kwargs, default=MODE_ECB)
self.block_size = factory.block_size
-
- if self.mode != MODE_OPENPGP:
+ self._factory = factory
+
+ if self.mode == MODE_CCM:
+ if self.block_size != 16:
+ raise ValueError("CCM mode is only available for ciphers that operate on 128 bits blocks")
+
+ self._mac_len = kwargs.get('mac_len', 16) # t
+ if self._mac_len not in (4, 6, 8, 10, 12, 14, 16):
+ raise ValueError("Parameter 'mac_len' must be even and in the range 4..16")
+
+ self.nonce = _getParameter('nonce', 1, args, kwargs) # N
+ if not (self.nonce and 7 <= len(self.nonce) <= 13):
+ raise ValueError("Length of parameter 'nonce' must be"
+ " in the range 7..13 bytes")
+
+ self._key = key
+ self._msg_len = kwargs.get('msg_len', None) # p
+ self._assoc_len = kwargs.get('assoc_len', None) # a
+
+ self._assoc_buffer = []
+ self._assoc_buffer_len = 0
+ self._cipherCBC = None # To be used for MAC
+ self._done_assoc_data = False # True when all associated data
+ # has been processed
+
+ # Allowed transitions after initialization
+ self._next = [self.update, self.encrypt, self.decrypt,
+ self.digest, self.verify]
+
+ # Try to start CCM
+ self._start_ccm()
+
+ elif self.mode == MODE_OPENPGP:
+ self._start_PGP(factory, key, *args, **kwargs)
+ else:
self._cipher = factory.new(key, *args, **kwargs)
self.IV = self._cipher.IV
+
+ def _start_PGP(self, factory, key, *args, **kwargs):
+ # OPENPGP mode. For details, see 13.9 in RCC4880.
+ #
+ # A few members are specifically created for this mode:
+ # - _encrypted_iv, set in this constructor
+ # - _done_first_block, set to True after the first encryption
+ # - _done_last_block, set to True after a partial block is processed
+
+ self._done_first_block = False
+ self._done_last_block = False
+ self.IV = _getParameter('iv', 1, args, kwargs)
+ if not self.IV:
+ raise ValueError("MODE_OPENPGP requires an IV")
+
+ # Instantiate a temporary cipher to process the IV
+ IV_cipher = factory.new(
+ key,
+ MODE_CFB,
+ b('\x00') * self.block_size, # IV for CFB
+ segment_size=self.block_size * 8)
+
+ # The cipher will be used for...
+ if len(self.IV) == self.block_size:
+ # ... encryption
+ self._encrypted_IV = IV_cipher.encrypt(
+ self.IV + self.IV[-2:] + # Plaintext
+ b('\x00') * (self.block_size - 2) # Padding
+ )[:self.block_size + 2]
+ elif len(self.IV) == self.block_size + 2:
+ # ... decryption
+ self._encrypted_IV = self.IV
+ self.IV = IV_cipher.decrypt(
+ self.IV + # Ciphertext
+ b('\x00') * (self.block_size - 2) # Padding
+ )[:self.block_size + 2]
+ if self.IV[-2:] != self.IV[-4:-2]:
+ raise ValueError("Failed integrity check for OPENPGP IV")
+ self.IV = self.IV[:-2]
else:
- # OPENPGP mode. For details, see 13.9 in RCC4880.
- #
- # A few members are specifically created for this mode:
- # - _encrypted_iv, set in this constructor
- # - _done_first_block, set to True after the first encryption
- # - _done_last_block, set to True after a partial block is processed
-
- self._done_first_block = False
- self._done_last_block = False
- self.IV = _getParameter('iv', 1, args, kwargs)
- if not self.IV:
- raise ValueError("MODE_OPENPGP requires an IV")
-
- # Instantiate a temporary cipher to process the IV
- IV_cipher = factory.new(key, MODE_CFB,
- b('\x00')*self.block_size, # IV for CFB
- segment_size=self.block_size*8)
-
- # The cipher will be used for...
- if len(self.IV) == self.block_size:
- # ... encryption
- self._encrypted_IV = IV_cipher.encrypt(
- self.IV + self.IV[-2:] + # Plaintext
- b('\x00')*(self.block_size-2) # Padding
- )[:self.block_size+2]
- elif len(self.IV) == self.block_size+2:
- # ... decryption
- self._encrypted_IV = self.IV
- self.IV = IV_cipher.decrypt(self.IV + # Ciphertext
- b('\x00')*(self.block_size-2) # Padding
- )[:self.block_size+2]
- if self.IV[-2:] != self.IV[-4:-2]:
- raise ValueError("Failed integrity check for OPENPGP IV")
- self.IV = self.IV[:-2]
+ raise ValueError("Length of IV must be %d or %d bytes for MODE_OPENPGP"
+ % (self.block_size, self.block_size+2))
+
+ # Instantiate the cipher for the real PGP data
+ self._cipher = factory.new(
+ key,
+ MODE_CFB,
+ self._encrypted_IV[-self.block_size:],
+ segment_size=self.block_size * 8
+ )
+
+ def _start_ccm(self, assoc_len=None, msg_len=None):
+ # CCM mode. This method creates the 2 ciphers used for the MAC
+ # (self._cipherCBC) and for the encryption/decryption (self._cipher).
+ #
+ # Member _assoc_buffer may already contain user data that needs to be
+ # authenticated.
+
+ if self._cipherCBC:
+ # Already started
+ return
+ if assoc_len is not None:
+ self._assoc_len = assoc_len
+ if msg_len is not None:
+ self._msg_len = msg_len
+ if None in (self._assoc_len, self._msg_len):
+ return
+
+ # q is the length of Q, the encoding of the message length
+ q = 15 - len(self.nonce)
+
+ ## Compute B_0
+ flags = (
+ 64 * (self._assoc_len > 0) +
+ 8 * divmod(self._mac_len - 2, 2)[0] +
+ (q - 1)
+ )
+ b_0 = bchr(flags) + self.nonce + long_to_bytes(self._msg_len, q)
+ self._assoc_buffer.insert(0, b_0)
+ self._assoc_buffer_len += 16
+
+ # Start CBC MAC with zero IV
+ # Mind that self._assoc_buffer may already contain some data
+ self._cipherCBC = self._factory.new(self._key, MODE_CBC, bchr(0)*16)
+ assoc_len_encoded = b('')
+ if self._assoc_len > 0:
+ if self._assoc_len < (2 ** 16 - 2 ** 8):
+ enc_size = 2
+ elif self._assoc_len < (2L ** 32):
+ assoc_len_encoded = b('\xFF\xFE')
+ enc_size = 4
else:
- raise ValueError("Length of IV must be %d or %d bytes for MODE_OPENPGP"
- % (self.block_size, self.block_size+2))
+ assoc_len_encoded = b('\xFF\xFF')
+ enc_size = 8
+ assoc_len_encoded += long_to_bytes(self._assoc_len, enc_size)
+ self._assoc_buffer.insert(1, assoc_len_encoded)
+ self._assoc_buffer_len += len(assoc_len_encoded)
+
+ # Start CTR cipher
+ flags = q - 1
+ prefix = bchr(flags) + self.nonce
+ ctr = Counter.new(128 - len(prefix) * 8, prefix, initial_value=0)
+ self._cipher = self._factory.new(self._key, MODE_CTR, counter=ctr)
+ # Will XOR against CBC MAC
+ self._s_0 = self._cipher.encrypt(bchr(0) * 16)
+
+ def update(self, assoc_data):
+ """Protect associated data
+
+ When using an AEAD mode like CCM, and if there is any associated data,
+ the caller has to invoke this function one or more times, before
+ using ``decrypt`` or ``encrypt``.
+
+ By *associated data* it is meant any data (e.g. packet headers) that
+ will not be encrypted and will be transmitted in the clear.
+ However, the receiver is still able to detect any modification to it.
+ In CCM, the *associated data* is also called *additional authenticated
+ data*.
+
+ If there is no associated data, this method must not be called.
+
+ The caller may split associated data in segments of any size, and
+ invoke this method multiple times, each time with the next segment.
- # Instantiate the cipher for the real PGP data
- self._cipher = factory.new(key, MODE_CFB,
- self._encrypted_IV[-self.block_size:],
- segment_size=self.block_size*8)
+ :Parameters:
+ assoc_data : byte string
+ A piece of associated data. There are no restrictions on its size.
+ """
+
+ if self.mode == MODE_CCM:
+ if self.update not in self._next:
+ raise TypeError("update() can only be called immediately after initialization")
+ self._next = [ self.update, self.encrypt, self.decrypt,
+ self.digest, self.verify ]
+ return self._update(assoc_data)
+
+ def _update(self, assoc_data, do_zero_padding=False):
+ """Equivalent to update(), but without FSM checks."""
+
+ if self.mode == MODE_CCM:
+ self._assoc_buffer.append(assoc_data)
+ self._assoc_buffer_len += len(assoc_data)
+
+ if not self._cipherCBC:
+ return
+
+ if do_zero_padding and (self._assoc_buffer_len & 15):
+ npad = 16 - self._assoc_buffer_len & 15
+ self._assoc_buffer.append(bchr(0) * npad)
+ self._assoc_buffer_len += npad
+
+ # Feed data into CBC MAC
+ aligned_data = 16 * divmod(self._assoc_buffer_len, 16)[0]
+ if aligned_data > 0:
+ buf = b("").join(self._assoc_buffer)
+ self._t = self._cipherCBC.encrypt(buf[:aligned_data])[-16:]
+ self._assoc_buffer = [buf[aligned_data:]]
+ self._assoc_buffer_len -= aligned_data
+ return
+ raise ValueError("update() not supported by this mode of operation")
def encrypt(self, plaintext):
"""Encrypt data with the key and the parameters set at initialization.
-
+
The cipher object is stateful; encryption of a long block
of data can be broken up in two or more calls to `encrypt()`.
+
That is, the statement:
-
+
>>> c.encrypt(a) + c.encrypt(b)
is always equivalent to:
@@ -211,7 +425,8 @@ class BlockAlgo:
- For `MODE_CFB`, *plaintext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB` and `MODE_CTR`, *plaintext* can be of any length.
+ - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM` *plaintext* can be
+ of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
unless it is the last chunk of the message.
@@ -245,6 +460,23 @@ class BlockAlgo:
self._done_first_block = True
return res
+ if self.mode == MODE_CCM:
+
+ if self.encrypt not in self._next:
+ raise TypeError("encrypt() can only be called after initialization or an update()")
+ self._next = [self.encrypt, self.digest]
+
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._assoc_buffer_len)
+ if self._msg_len is None:
+ self._start_ccm(msg_len=len(plaintext))
+ self._next = [self.digest]
+ if not self._done_assoc_data:
+ self._update(b(""), do_zero_padding=True)
+ self._done_assoc_data = True
+
+ self._update(plaintext)
+
return self._cipher.encrypt(plaintext)
def decrypt(self, ciphertext):
@@ -272,7 +504,8 @@ class BlockAlgo:
- For `MODE_CFB`, *ciphertext* length (in bytes) must be a multiple
of *segment_size*/8.
- - For `MODE_OFB` and `MODE_CTR`, *ciphertext* can be of any length.
+ - For `MODE_OFB`, `MODE_CTR` and `MODE_CCM`, *ciphertext* can be
+ of any length.
- For `MODE_OPENPGP`, *plaintext* must be a multiple of *block_size*,
unless it is the last chunk of the message.
@@ -282,6 +515,7 @@ class BlockAlgo:
The piece of data to decrypt.
:Return: the decrypted data (byte string, as long as *ciphertext*).
"""
+
if self.mode == MODE_OPENPGP:
padding_length = (self.block_size - len(ciphertext) % self.block_size) % self.block_size
if padding_length > 0:
@@ -298,5 +532,116 @@ class BlockAlgo:
res = self._cipher.decrypt(ciphertext)
return res
- return self._cipher.decrypt(ciphertext)
+ if self.mode == MODE_CCM:
+
+ if self.decrypt not in self._next:
+ raise TypeError("decrypt() can only be called after initialization or an update()")
+ self._next = [self.decrypt, self.verify]
+
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._assoc_buffer_len)
+ if self._msg_len is None:
+ self._start_ccm(msg_len=len(ciphertext))
+ self._next = [self.verify]
+ if not self._done_assoc_data:
+ self._update(b(""), do_zero_padding=True)
+ self._done_assoc_data = True
+
+ pt = self._cipher.decrypt(ciphertext)
+
+ if self.mode == MODE_CCM:
+ self._update(pt)
+
+ return pt
+
+ def digest(self):
+ """Compute the *binary* MAC tag in an AEAD mode.
+
+ When using an AEAD mode like CCM, the caller invokes this function
+ at the very end.
+
+ This method returns the MAC that shall be sent to the receiver,
+ together with the ciphertext.
+
+ :Return: the MAC, as a byte string.
+ """
+
+ if self.mode == MODE_CCM:
+
+ if self.digest not in self._next:
+ raise TypeError("digest() cannot be called when decrypting or validating a message")
+ self._next = [self.digest]
+
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._assoc_buffer_len)
+ if self._msg_len is None:
+ self._start_ccm(msg_len=0)
+ self._update(b(""), do_zero_padding=True)
+
+ return strxor(self._t, self._s_0)[:self._mac_len]
+
+ raise TypeError("digest() not supported by this mode of operation")
+
+ def hexdigest(self):
+ """Compute the *printable* MAC tag in an AEAD mode.
+
+ This method is like `digest`.
+
+ :Return: the MAC, as a hexadecimal string.
+ """
+ return "".join(["%02x" % bord(x) for x in self.digest()])
+
+ def verify(self, mac_tag):
+ """Validate the *binary* MAC tag in an AEAD mode.
+
+ When using an AEAD mode like CCM, the caller invokes this function
+ at the very end.
+
+ This method checks if the decrypted message is indeed valid
+ (that is, if the key is correct) and it has not been
+ tampered with while in transit.
+
+ :Parameters:
+ mac_tag : byte string
+ This is the *binary* MAC, as received from the sender.
+ :Raises ValueError:
+ if the MAC does not match. The message has been tampered with
+ or the key is incorrect.
+ """
+
+ if self.mode == MODE_CCM:
+ if self.verify not in self._next:
+ raise TypeError("verify() cannot be called when encrypting a message")
+ self._next = [self.verify]
+
+ if self._assoc_len is None:
+ self._start_ccm(assoc_len=self._assoc_buffer_len)
+ if self._msg_len is None:
+ self._start_ccm(msg_len=0)
+ self._update(b(""), do_zero_padding=True)
+ u = strxor(self._t, self._s_0)[:self._mac_len]
+
+ res = 0
+ # Constant-time comparison
+ for x,y in zip(u, mac_tag):
+ res |= bord(x) ^ bord(y)
+ if res or len(mac_tag)!=self._mac_len:
+ raise ValueError("MAC check failed")
+ return
+
+ raise TypeError("verify() not supported by this mode of operation")
+
+ def hexverify(self, hex_mac_tag):
+ """Validate the *printable* MAC tag in an AEAD mode.
+
+ This method is like `verify`.
+
+ :Parameters:
+ hex_mac_tag : string
+ This is the *printable* MAC, as received from the sender.
+ :Raises ValueError:
+ if the MAC does not match. The message has been tampered with
+ or the key is incorrect.
+ """
+ self.verify(unhexlify(hex_mac_tag))
diff --git a/lib/Crypto/SelfTest/Cipher/common.py b/lib/Crypto/SelfTest/Cipher/common.py
index a20a3aa..e52a781 100644
--- a/lib/Crypto/SelfTest/Cipher/common.py
+++ b/lib/Crypto/SelfTest/Cipher/common.py
@@ -30,8 +30,9 @@ __revision__ = "$Id$"
import sys
import unittest
-from binascii import a2b_hex, b2a_hex
+from binascii import a2b_hex, b2a_hex, hexlify
from Crypto.Util.py3compat import *
+from Crypto.Util.strxor import strxor_c
# For compatibility with Python 2.1 and Python 2.2
if sys.hexversion < 0x02030000:
@@ -68,14 +69,24 @@ class CipherSelfTest(unittest.TestCase):
self.plaintext = b(_extract(params, 'plaintext'))
self.ciphertext = b(_extract(params, 'ciphertext'))
self.module_name = _extract(params, 'module_name', None)
+ self.assoc_data = _extract(params, 'assoc_data', None)
+ if self.assoc_data:
+ self.assoc_data = b(self.assoc_data)
+ self.mac = _extract(params, 'mac', None)
+ if self.assoc_data:
+ self.mac = b(self.mac)
mode = _extract(params, 'mode', None)
self.mode_name = str(mode)
if mode is not None:
# Block cipher
self.mode = getattr(self.module, "MODE_" + mode)
+
self.iv = _extract(params, 'iv', None)
- if self.iv is not None: self.iv = b(self.iv)
+ if self.iv is None:
+ self.iv = _extract(params, 'nonce', None)
+ if self.iv is not None:
+ self.iv = b(self.iv)
# Only relevant for OPENPGP mode
self.encrypted_iv = _extract(params, 'encrypted_iv', None)
@@ -122,26 +133,49 @@ class CipherSelfTest(unittest.TestCase):
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
-
- ct1 = b2a_hex(self._new().encrypt(plaintext))
- pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
- ct2 = b2a_hex(self._new().encrypt(plaintext))
- pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
+ assoc_data = None
+ if self.assoc_data:
+ assoc_data = a2b_hex(self.assoc_data)
+
+ ct = None
+ pt = None
+
+ #
+ # Repeat the same encryption or decryption twice and verify
+ # that the result is always the same
+ #
+ for i in xrange(2):
+ cipher = self._new()
+ decipher = self._new(1)
+
+ # Only AEAD modes
+ if self.assoc_data:
+ cipher.update(assoc_data)
+ decipher.update(assoc_data)
+
+ ctX = b2a_hex(cipher.encrypt(plaintext))
+ ptX = b2a_hex(decipher.decrypt(ciphertext))
+
+ if ct:
+ self.assertEqual(ct, ctX)
+ self.assertEqual(pt, ptX)
+ ct, pt = ctX, ptX
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
- self.assertEqual(self.encrypted_iv, ct1[:eilen])
- self.assertEqual(self.encrypted_iv, ct2[:eilen])
- ct1 = ct1[eilen:]
- ct2 = ct2[eilen:]
+ self.assertEqual(self.encrypted_iv, ct[:eilen])
+ ct = ct[eilen:]
- self.assertEqual(self.ciphertext, ct1) # encrypt
- self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
- self.assertEqual(self.plaintext, pt1) # decrypt
- self.assertEqual(self.plaintext, pt2) # decrypt (second time)
+ self.assertEqual(self.ciphertext, ct) # encrypt
+ self.assertEqual(self.plaintext, pt) # decrypt
+
+ if self.mac:
+ mac = b2a_hex(cipher.digest())
+ self.assertEqual(self.mac, mac)
+ decipher.verify(a2b_hex(self.mac))
class CipherStreamingSelfTest(CipherSelfTest):
@@ -252,6 +286,258 @@ class CFBSegmentSizeTest(unittest.TestCase):
self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), self.module.MODE_CFB, segment_size=i)
self.module.new(a2b_hex(self.key), self.module.MODE_CFB, "\0"*self.module.block_size, segment_size=8) # should succeed
+class CCMMACLengthTest(unittest.TestCase):
+ """CCM specific tests about MAC"""
+
+ def __init__(self, module):
+ unittest.TestCase.__init__(self)
+ self.module = module
+ self.key = b('\xFF')*16
+ self.iv = b('\x00')*10
+
+ def shortDescription(self):
+ return self.description
+
+ def runTest(self):
+ """Verify that MAC can only be 4,6,8,..,16 bytes long."""
+ for i in range(3,16,2):
+ self.description = "CCM MAC length check (%d bytes)" % i
+ self.assertRaises(ValueError, self.module.new, self.key,
+ self.module.MODE_CCM, self.iv, msg_len=10, mac_len=i)
+
+ """Verify that default MAC length is 16."""
+ self.description = "CCM default MAC length check"
+ cipher = self.module.new(self.key, self.module.MODE_CCM,
+ self.iv, msg_len=4)
+ cipher.encrypt(b('z')*4)
+ self.assertEqual(len(cipher.digest()), 16)
+
+class CCMSplitEncryptionTest(unittest.TestCase):
+ """CCM specific tests to validate how encrypt()
+ decrypt() can be called multiple times on the
+ same object."""
+
+ def __init__(self, module):
+ unittest.TestCase.__init__(self)
+ self.module = module
+ self.key = b('\xFF')*16
+ self.iv = b('\x00')*10
+ self.description = "CCM Split Encryption Test"
+
+ def shortDescription(self):
+ return self.description
+
+ def runTest(self):
+ """Verify that CCM update()/encrypt() can be called multiple times,
+ provided that lengths are declared beforehand"""
+
+ data = b("AUTH DATA")
+ pt1 = b("PLAINTEXT1") # Short
+ pt2 = b("PLAINTEXT2") # Long
+ pt_ref = pt1+pt2
+
+ # REFERENCE: Run with 1 update() and 1 encrypt()
+ cipher = self.module.new(self.key, self.module.MODE_CCM,
+ self.iv)
+ cipher.update(data)
+ ct_ref = cipher.encrypt(pt_ref)
+ mac_ref = cipher.digest()
+
+ # Verify that calling CCM encrypt()/decrypt() twice is not
+ # possible without the 'msg_len' parameter and regardless
+ # of the 'assoc_len' parameter
+ for ad_len in None, len(data):
+ cipher = self.module.new(self.key, self.module.MODE_CCM,
+ self.iv, assoc_len=ad_len)
+ cipher.update(data)
+ cipher.encrypt(pt1)
+ self.assertRaises(TypeError, cipher.encrypt, pt2)
+
+ cipher = self.module.new(self.key, self.module.MODE_CCM,
+ self.iv, assoc_len=ad_len)
+ cipher.update(data)
+ cipher.decrypt(ct_ref[:len(pt1)])
+ self.assertRaises(TypeError, cipher.decrypt, ct_ref[len(pt1):])
+
+ # Run with 2 encrypt()/decrypt(). Results must be the same
+ # regardless of the 'assoc_len' parameter
+ for ad_len in None, len(data):
+ cipher = self.module.new(self.key, self.module.MODE_CCM,
+ self.iv, assoc_len=ad_len, msg_len=len(pt_ref))
+ cipher.update(data)
+ ct = cipher.encrypt(pt1)
+ ct += cipher.encrypt(pt2)
+ mac = cipher.digest()
+ self.assertEqual(ct_ref, ct)
+ self.assertEqual(mac_ref, mac)
+
+ cipher = self.module.new(self.key, self.module.MODE_CCM,
+ self.iv, msg_len=len(pt1+pt2))
+ cipher.update(data)
+ pt = cipher.decrypt(ct[:len(pt1)])
+ pt += cipher.decrypt(ct[len(pt1):])
+ mac = cipher.verify(mac_ref)
+ self.assertEqual(pt_ref, pt)
+
+class AEADTests(unittest.TestCase):
+ """Tests generic to all AEAD modes"""
+
+ def __init__(self, module, mode_name):
+ unittest.TestCase.__init__(self)
+ self.module = module
+ self.mode_name = mode_name
+ self.mode = getattr(module, mode_name)
+ self.key = b('\xFF')*16
+ self.iv = b('\x00')*10
+ self.description = "AEAD Test"
+
+ def right_mac_test(self):
+ """Positive tests for MAC"""
+
+ self.description = "Test for right MAC in %s of %s" % \
+ (self.mode_name, self.module.__name__)
+
+ ad_ref = b("Reference AD")
+ pt_ref = b("Reference plaintext")
+
+ # Encrypt and create the reference MAC
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.update(ad_ref)
+ ct_ref = cipher.encrypt(pt_ref)
+ mac_ref = cipher.digest()
+
+ # Decrypt and verify that MAC is accepted
+ decipher = self.module.new(self.key, self.mode, self.iv)
+ decipher.update(ad_ref)
+ pt = decipher.decrypt(ct_ref)
+ decipher.verify(mac_ref)
+ self.assertEqual(pt, pt_ref)
+
+ # Verify that hexverify work
+ decipher.hexverify(hexlify(mac_ref))
+
+ def wrong_mac_test(self):
+ """Negative tests for MAC"""
+
+ self.description = "Test for wrong MAC in %s of %s" % \
+ (self.mode_name, self.module.__name__)
+
+ ad_ref = b("Reference AD")
+ pt_ref = b("Reference plaintext")
+
+ # Encrypt and create the reference MAC
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.update(ad_ref)
+ ct_ref = cipher.encrypt(pt_ref)
+ mac_ref = cipher.digest()
+
+ # Modify the MAC and verify it is NOT ACCEPTED
+ wrong_mac = strxor_c(mac_ref, 255)
+ decipher = self.module.new(self.key, self.mode, self.iv)
+ decipher.update(ad_ref)
+ pt = decipher.decrypt(ct_ref)
+ self.assertRaises(ValueError, decipher.verify, wrong_mac)
+
+ def zero_data(self):
+ """Verify transition from INITIALIZED to FINISHED"""
+
+ self.description = "Test for zero data in %s of %s" % \
+ (self.mode_name, self.module.__name__)
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.digest()
+
+ def multiple_updates(self):
+ """Verify that update() can be called multiple times"""
+
+ self.description = "Test for multiple updates in %s of %s" % \
+ (self.mode_name, self.module.__name__)
+
+ ad = b("").join([bchr(x) for x in xrange(0,128)])
+
+ mac1, mac2, mac3 = (None,)*3
+ for chunk_length in 1,10,40,80,128:
+ chunks = [ad[i:i+chunk_length] for i in range(0, len(ad), chunk_length)]
+
+ # No encryption/decryption
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ for c in chunks:
+ cipher.update(c)
+ if mac1:
+ cipher.verify(mac1)
+ else:
+ mac1 = cipher.digest()
+
+ # Encryption
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ for c in chunks:
+ cipher.update(c)
+ ct = cipher.encrypt(b("PT"))
+ mac2 = cipher.digest()
+
+ # Decryption
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ for c in chunks:
+ cipher.update(c)
+ cipher.decrypt(ct)
+ cipher.verify(mac2)
+
+ def no_mix_encrypt_decrypt(self):
+ """Verify that encrypt and decrypt cannot be mixed up"""
+
+ self.description = "Test for mix of encrypt and decrypt in %s of %s" % \
+ (self.mode_name, self.module.__name__)
+
+ # Calling decrypt after encrypt raises an exception
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.encrypt(b("PT"))
+ self.assertRaises(TypeError, cipher.decrypt, b("XYZ"))
+
+ # Calling encrypt after decrypt raises an exception
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.decrypt(b("CT"))
+ self.assertRaises(TypeError, cipher.encrypt, b("XYZ"))
+
+ # Calling verify after encrypt raises an exception
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.encrypt(b("PT"))
+ self.assertRaises(TypeError, cipher.verify, b("XYZ"))
+ self.assertRaises(TypeError, cipher.hexverify, "12")
+
+ # Calling digest after decrypt raises an exception
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.decrypt(b("CT"))
+ self.assertRaises(TypeError, cipher.digest)
+ self.assertRaises(TypeError, cipher.hexdigest)
+
+ def no_late_update(self):
+ """Verify that update cannot be called after encrypt or decrypt"""
+
+ self.description = "Test for late update in %s of %s" % \
+ (self.mode_name, self.module.__name__)
+
+ # Calling update after encrypt raises an exception
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.update(b("XX"))
+ cipher.encrypt(b("PT"))
+ self.assertRaises(TypeError, cipher.update, b("XYZ"))
+
+ # Calling update after decrypt raises an exception
+ cipher = self.module.new(self.key, self.mode, self.iv)
+ cipher.update(b("XX"))
+ cipher.decrypt(b("CT"))
+ self.assertRaises(TypeError, cipher.update, b("XYZ"))
+
+ def runTest(self):
+ self.right_mac_test()
+ self.wrong_mac_test()
+ self.zero_data()
+ self.multiple_updates()
+ self.no_mix_encrypt_decrypt()
+ self.no_late_update()
+
+ def shortDescription(self):
+ return self.description
+
class RoundtripTest(unittest.TestCase):
def __init__(self, module, params):
from Crypto import Random
@@ -310,6 +596,10 @@ class IVLengthTest(unittest.TestCase):
self.module.MODE_OFB, "")
self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
self.module.MODE_OPENPGP, "")
+ if hasattr(self.module, "MODE_CCM"):
+ for ivlen in (0,6,14):
+ self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
+ self.module.MODE_CCM, bchr(0)*ivlen, msg_len=10)
self.module.new(a2b_hex(self.key), self.module.MODE_ECB, "")
self.module.new(a2b_hex(self.key), self.module.MODE_CTR, "", counter=self._dummy_counter)
@@ -367,6 +657,13 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
]
extra_tests_added = 1
+ # Extract associated data and MAC for AEAD modes
+ if p_mode == 'CCM':
+ assoc_data, params['plaintext'] = params['plaintext'].split('|')
+ assoc_data2, params['ciphertext'], params['mac'] = params['ciphertext'].split('|')
+ params['assoc_data'] = assoc_data
+ params['mac_len'] = len(params['mac'])>>1
+
# Add the current test to the test suite
tests.append(CipherSelfTest(module, params))
@@ -383,6 +680,19 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
if not params2['ctr_params'].has_key('disable_shortcut'):
params2['ctr_params']['disable_shortcut'] = 1
tests.append(CipherSelfTest(module, params2))
+
+ # Add tests that don't use test vectors
+ if hasattr(module, "MODE_CCM"):
+ tests += [
+ CCMMACLengthTest(module),
+ CCMSplitEncryptionTest(module),
+ ]
+ for aead_mode in ("MODE_CCM",):
+ if hasattr(module, aead_mode):
+ tests += [
+ AEADTests(module, aead_mode),
+ ]
+
return tests
def make_stream_tests(module, module_name, test_data):
diff --git a/lib/Crypto/SelfTest/Cipher/test_AES.py b/lib/Crypto/SelfTest/Cipher/test_AES.py
index 8fd4a6f..878f56b 100644
--- a/lib/Crypto/SelfTest/Cipher/test_AES.py
+++ b/lib/Crypto/SelfTest/Cipher/test_AES.py
@@ -1445,6 +1445,240 @@ test_data = [
'5baa61e4c9b93f3f0682250b6cf8331b', # Key (hash of 'password')
'GPG Test Vector #1',
dict(mode='OPENPGP', iv='3d7d3e62282add7eb203eeba5c800733', encrypted_iv='fd934601ef49cb58b6d9aebca6056bdb96ef' ) ),
+
+ # NIST SP 800-38C test vectors for CCM
+ # This is a list of tuples with 5 items:
+ #
+ # 1. Associated data + '|' + plaintext
+ # 2. Associated data + '|' + ciphertext + '|' + MAC
+ # 3. AES-128 key
+ # 4. Description
+ # 5. Dictionary of parameters to be passed to AES.new().
+ # It must include the nonce.
+ #
+ ( '0001020304050607|20212223',
+ '0001020304050607|7162015b|4dac255d',
+ '404142434445464748494a4b4c4d4e4f',
+ 'NIST SP 800-38C Appex C.1',
+ dict(mode='CCM', nonce='10111213141516')
+ ),
+ ( '000102030405060708090a0b0c0d0e0f|202122232425262728292a2b2c2d2e2f',
+ '000102030405060708090a0b0c0d0e0f|d2a1f0e051ea5f62081a7792073d593d|1fc64fbfaccd',
+ '404142434445464748494a4b4c4d4e4f',
+ 'NIST SP 800-38C Appex C.2',
+ dict(mode='CCM', nonce='1011121314151617')
+ ),
+ ( '000102030405060708090a0b0c0d0e0f10111213|'+
+ '202122232425262728292a2b2c2d2e2f3031323334353637',
+ '000102030405060708090a0b0c0d0e0f10111213|'+
+ 'e3b201a9f5b71a7a9b1ceaeccd97e70b6176aad9a4428aa5|484392fbc1b09951',
+ '404142434445464748494a4b4c4d4e4f',
+ 'NIST SP 800-38C Appex C.3',
+ dict(mode='CCM', nonce='101112131415161718191a1b')
+ ),
+ (
+ (''.join(["%02X" % (x*16+y) for x in xrange(0,16) for y in xrange(0,16)]))*256+'|'+
+ '202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f',
+ (''.join(["%02X" % (x*16+y) for x in xrange(0,16) for y in xrange(0,16)]))*256+'|'+
+ '69915dad1e84c6376a68c2967e4dab615ae0fd1faec44cc484828529463ccf72|'+
+ 'b4ac6bec93e8598e7f0dadbcea5b',
+ '404142434445464748494a4b4c4d4e4f',
+ 'NIST SP 800-38C Appex C.4',
+ dict(mode='CCM', nonce='101112131415161718191a1b1c')
+ ),
+ # RFC3610 test vectors
+ (
+ '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e',
+ '0001020304050607|588c979a61c663d2f066d0c2c0f989806d5f6b61dac384|'+
+ '17e8d12cfdf926e0',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #1',
+ dict(mode='CCM', nonce='00000003020100a0a1a2a3a4a5')
+ ),
+ (
+ '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f',
+ '0001020304050607|72c91a36e135f8cf291ca894085c87e3cc15c439c9e43a3b|'+
+ 'a091d56e10400916',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #2',
+ dict(mode='CCM', nonce='00000004030201a0a1a2a3a4a5')
+ ),
+ (
+ '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20',
+ '0001020304050607|51b1e5f44a197d1da46b0f8e2d282ae871e838bb64da859657|'+
+ '4adaa76fbd9fb0c5',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #3',
+ dict(mode='CCM', nonce='00000005040302A0A1A2A3A4A5')
+ ),
+ (
+ '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e',
+ '000102030405060708090a0b|a28c6865939a9a79faaa5c4c2a9d4a91cdac8c|'+
+ '96c861b9c9e61ef1',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #4',
+ dict(mode='CCM', nonce='00000006050403a0a1a2a3a4a5')
+ ),
+ (
+ '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f',
+ '000102030405060708090a0b|dcf1fb7b5d9e23fb9d4e131253658ad86ebdca3e|'+
+ '51e83f077d9c2d93',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #5',
+ dict(mode='CCM', nonce='00000007060504a0a1a2a3a4a5')
+ ),
+ (
+ '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f20',
+ '000102030405060708090a0b|6fc1b011f006568b5171a42d953d469b2570a4bd87|'+
+ '405a0443ac91cb94',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #6',
+ dict(mode='CCM', nonce='00000008070605a0a1a2a3a4a5')
+ ),
+ (
+ '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e',
+ '0001020304050607|0135d1b2c95f41d5d1d4fec185d166b8094e999dfed96c|'+
+ '048c56602c97acbb7490',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #7',
+ dict(mode='CCM', nonce='00000009080706a0a1a2a3a4a5')
+ ),
+ (
+ '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f',
+ '0001020304050607|7b75399ac0831dd2f0bbd75879a2fd8f6cae6b6cd9b7db24|'+
+ 'c17b4433f434963f34b4',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #8',
+ dict(mode='CCM', nonce='0000000a090807a0a1a2a3a4a5')
+ ),
+ (
+ '0001020304050607|08090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20',
+ '0001020304050607|82531a60cc24945a4b8279181ab5c84df21ce7f9b73f42e197|'+
+ 'ea9c07e56b5eb17e5f4e',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #9',
+ dict(mode='CCM', nonce='0000000b0a0908a0a1a2a3a4a5')
+ ),
+ (
+ '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e',
+ '000102030405060708090a0b|07342594157785152b074098330abb141b947b|'+
+ '566aa9406b4d999988dd',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #10',
+ dict(mode='CCM', nonce='0000000c0b0a09a0a1a2a3a4a5')
+ ),
+ (
+ '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f',
+ '000102030405060708090a0b|676bb20380b0e301e8ab79590a396da78b834934|'+
+ 'f53aa2e9107a8b6c022c',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #11',
+ dict(mode='CCM', nonce='0000000d0c0b0aa0a1a2a3a4a5')
+ ),
+ (
+ '000102030405060708090a0b|0c0d0e0f101112131415161718191a1b1c1d1e1f20',
+ '000102030405060708090a0b|c0ffa0d6f05bdb67f24d43a4338d2aa4bed7b20e43|'+
+ 'cd1aa31662e7ad65d6db',
+ 'c0c1c2c3c4c5c6c7c8c9cacbcccdcecf',
+ 'RFC3610 Packet Vector #12',
+ dict(mode='CCM', nonce='0000000e0d0c0ba0a1a2a3a4a5')
+ ),
+ (
+ '0be1a88bace018b1|08e8cf97d820ea258460e96ad9cf5289054d895ceac47c',
+ '0be1a88bace018b1|4cb97f86a2a4689a877947ab8091ef5386a6ffbdd080f8|'+
+ 'e78cf7cb0cddd7b3',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #13',
+ dict(mode='CCM', nonce='00412b4ea9cdbe3c9696766cfa')
+ ),
+ (
+ '63018f76dc8a1bcb|9020ea6f91bdd85afa0039ba4baff9bfb79c7028949cd0ec',
+ '63018f76dc8a1bcb|4ccb1e7ca981befaa0726c55d378061298c85c92814abc33|'+
+ 'c52ee81d7d77c08a',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #14',
+ dict(mode='CCM', nonce='0033568ef7b2633c9696766cfa')
+ ),
+ (
+ 'aa6cfa36cae86b40|b916e0eacc1c00d7dcec68ec0b3bbb1a02de8a2d1aa346132e',
+ 'aa6cfa36cae86b40|b1d23a2220ddc0ac900d9aa03c61fcf4a559a4417767089708|'+
+ 'a776796edb723506',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #15',
+ dict(mode='CCM', nonce='00103fe41336713c9696766cfa')
+ ),
+ (
+ 'd0d0735c531e1becf049c244|12daac5630efa5396f770ce1a66b21f7b2101c',
+ 'd0d0735c531e1becf049c244|14d253c3967b70609b7cbb7c49916028324526|'+
+ '9a6f49975bcadeaf',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #16',
+ dict(mode='CCM', nonce='00764c63b8058e3c9696766cfa')
+ ),
+ (
+ '77b60f011c03e1525899bcae|e88b6a46c78d63e52eb8c546efb5de6f75e9cc0d',
+ '77b60f011c03e1525899bcae|5545ff1a085ee2efbf52b2e04bee1e2336c73e3f|'+
+ '762c0c7744fe7e3c',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #17',
+ dict(mode='CCM', nonce='00f8b678094e3b3c9696766cfa')
+ ),
+ (
+ 'cd9044d2b71fdb8120ea60c0|6435acbafb11a82e2f071d7ca4a5ebd93a803ba87f',
+ 'cd9044d2b71fdb8120ea60c0|009769ecabdf48625594c59251e6035722675e04c8|'+
+ '47099e5ae0704551',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #18',
+ dict(mode='CCM', nonce='00d560912d3f703c9696766cfa')
+ ),
+ (
+ 'd85bc7e69f944fb8|8a19b950bcf71a018e5e6701c91787659809d67dbedd18',
+ 'd85bc7e69f944fb8|bc218daa947427b6db386a99ac1aef23ade0b52939cb6a|'+
+ '637cf9bec2408897c6ba',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #19',
+ dict(mode='CCM', nonce='0042fff8f1951c3c9696766cfa')
+ ),
+ (
+ '74a0ebc9069f5b37|1761433c37c5a35fc1f39f406302eb907c6163be38c98437',
+ '74a0ebc9069f5b37|5810e6fd25874022e80361a478e3e9cf484ab04f447efff6|'+
+ 'f0a477cc2fc9bf548944',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #20',
+ dict(mode='CCM', nonce='00920f40e56cdc3c9696766cfa')
+ ),
+ (
+ '44a3aa3aae6475ca|a434a8e58500c6e41530538862d686ea9e81301b5ae4226bfa',
+ '44a3aa3aae6475ca|f2beed7bc5098e83feb5b31608f8e29c38819a89c8e776f154|'+
+ '4d4151a4ed3a8b87b9ce',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #21',
+ dict(mode='CCM', nonce='0027ca0c7120bc3c9696766cfa')
+ ),
+ (
+ 'ec46bb63b02520c33c49fd70|b96b49e21d621741632875db7f6c9243d2d7c2',
+ 'ec46bb63b02520c33c49fd70|31d750a09da3ed7fddd49a2032aabf17ec8ebf|'+
+ '7d22c8088c666be5c197',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #22',
+ dict(mode='CCM', nonce='005b8ccbcd9af83c9696766cfa')
+ ),
+ (
+ '47a65ac78b3d594227e85e71|e2fcfbb880442c731bf95167c8ffd7895e337076',
+ '47a65ac78b3d594227e85e71|e882f1dbd38ce3eda7c23f04dd65071eb41342ac|'+
+ 'df7e00dccec7ae52987d',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #23',
+ dict(mode='CCM', nonce='003ebe94044b9a3c9696766cfa')
+ ),
+ (
+ '6e37a6ef546d955d34ab6059|abf21c0b02feb88f856df4a37381bce3cc128517d4',
+ '6e37a6ef546d955d34ab6059|f32905b88a641b04b9c9ffb58cc390900f3da12ab1|'+
+ '6dce9e82efa16da62059',
+ 'd7828d13b2b0bdc325a76236df93cc6b',
+ 'RFC3610 Packet Vector #24',
+ dict(mode='CCM', nonce='008d493b30ae8b3c9696766cfa')
+ ),
]
def get_tests(config={}):
diff --git a/pct-speedtest.py b/pct-speedtest.py
index e84cf81..378604f 100644
--- a/pct-speedtest.py
+++ b/pct-speedtest.py
@@ -167,6 +167,8 @@ class Benchmark:
elif mode == "CTR-LE":
from Crypto.Util import Counter
cipher = module.new(key, module.MODE_CTR, counter=Counter.new(module.block_size*8, little_endian=True))
+ elif hasattr(module, 'MODE_CCM') and mode==module.MODE_CCM:
+ cipher = module.new(key, mode, iv[:8], msg_len=len(rand)*len(blocks))
elif mode==module.MODE_CTR:
ctr = Crypto.Util.Counter.new(module.block_size*8,
initial_value=bytes_to_long(iv),
@@ -359,6 +361,8 @@ class Benchmark:
self.test_encryption("%s-OPENPGP" % (cipher_name,), module, key_bytes, module.MODE_OPENPGP)
self.test_encryption("%s-CTR-BE" % (cipher_name,), module, key_bytes, "CTR-BE")
self.test_encryption("%s-CTR-LE" % (cipher_name,), module, key_bytes, "CTR-LE")
+ if hasattr(module, "MODE_CCM"):
+ self.test_encryption("%s-CCM" % (cipher_name,), module, key_bytes, module.MODE_CCM)
# Crypto.Cipher (stream ciphers)
for cipher_name, module, key_bytes in stream_specs: