summaryrefslogtreecommitdiff
path: root/barbicanclient/secrets.py
blob: a1e9b122f6c8bbf24da489d8e3c2590152b04d64 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from barbicanclient import base
from barbicanclient.openstack.common.timeutils import parse_isotime


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


class Secret(object):
    """
    Secrets are used to keep track of the data stored in Barbican.

    An instance exposes the metadata fields read from a secret dictionary:
    ``secret_ref``, ``name``, ``status``, ``content_types``, ``algorithm``,
    ``bit_length``, ``mode`` and the ``created``, ``updated`` and
    ``expiration`` timestamps.
    """

    def __init__(self, secret_dict):
        """
        Builds a secret object from a dictionary.

        :param secret_dict: Dictionary holding the secret metadata. Keys
                            that are absent (or map to None) leave the
                            matching attribute set to None.
        """
        self.secret_ref = secret_dict.get('secret_ref')
        self.name = secret_dict.get('name')
        self.status = secret_dict.get('status')
        self.content_types = secret_dict.get('content_types')

        # All three timestamps are optional: a missing 'created' must not
        # make construction fail any more than a missing 'expiration' or
        # 'updated' does.
        self.created = self._parse_timestamp(secret_dict.get('created'))
        self.expiration = self._parse_timestamp(
            secret_dict.get('expiration'))
        self.updated = self._parse_timestamp(secret_dict.get('updated'))

        self.algorithm = secret_dict.get('algorithm')
        self.bit_length = secret_dict.get('bit_length')
        self.mode = secret_dict.get('mode')

    @staticmethod
    def _parse_timestamp(value):
        """
        Parses a timestamp string with parse_isotime, passing None through.

        :param value: Timestamp string, or None
        :returns: The result of parse_isotime(value), or None when value
                  is None
        """
        if value is None:
            return None
        return parse_isotime(value)

    def __str__(self):
        """
        Returns a multi-line, human-readable summary of the secret.
        """
        return ("Secret - href: {0}\n"
                "         name: {1}\n"
                "         created: {2}\n"
                "         status: {3}\n"
                "         content types: {4}\n"
                "         algorithm: {5}\n"
                "         bit length: {6}\n"
                "         mode: {7}\n"
                "         expiration: {8}\n"
                .format(self.secret_ref, self.name, self.created,
                        self.status, self.content_types, self.algorithm,
                        self.bit_length, self.mode, self.expiration)
                )

    def __repr__(self):
        """
        Returns a short representation that shows only the secret name.
        """
        return 'Secret(name="{0}")'.format(self.name)


class SecretManager(base.BaseEntityManager):
    """
    Entity manager for secrets: store, get, decrypt, delete and list.

    Requests are issued through ``self.api`` and the collection name is
    read from ``self.entity``; both are presumably set up by
    ``base.BaseEntityManager.__init__`` (not visible here).
    """

    def __init__(self, api):
        """
        Registers this manager for the 'secrets' entity.

        :param api: The API client object handed on to the base manager
        """
        super(SecretManager, self).__init__(api, 'secrets')

    def store(self,
              name=None,
              payload=None,
              payload_content_type=None,
              payload_content_encoding=None,
              algorithm=None,
              bit_length=None,
              mode=None,
              expiration=None):
        """
        Stores a new Secret in Barbican

        :param name: A friendly name for the secret
        :param payload: The unencrypted secret data
        :param payload_content_type: The format/type of the secret data
        :param payload_content_encoding: The encoding of the secret data
        :param algorithm: The algorithm associated with this secret key
        :param bit_length: The bit length of this secret key
        :param mode: The algorithm mode used with this secret key
        :param expiration: The expiration time of the secret in ISO 8601
                           format
        :returns: Secret href for the stored secret
        """
        # Pass arguments to the logger instead of pre-formatting, so the
        # message is only built when debug logging is enabled.
        LOG.debug("Creating secret of payload content type %s",
                  payload_content_type)

        secret_dict = {
            'name': name,
            'payload': payload,
            'payload_content_type': payload_content_type,
            'payload_content_encoding': payload_content_encoding,
            'algorithm': algorithm,
            'mode': mode,
            'bit_length': bit_length,
            'expiration': expiration,
        }
        self._remove_empty_keys(secret_dict)

        # The payload is the unencrypted secret: log a copy of the request
        # body with the payload masked so it never reaches the log output.
        # The dictionary actually sent to the API is left untouched.
        loggable = dict(secret_dict)
        if loggable.get('payload') is not None:
            loggable['payload'] = '<redacted>'
        LOG.debug("Request body: %s", loggable)

        resp = self.api.post(self.entity, secret_dict)
        return resp['secret_ref']

    def get(self, secret_ref):
        """
        Returns a Secret object with metadata about the secret.

        :param secret_ref: The href for the secret
        :returns: Secret built from the API response
        :raises ValueError: if secret_ref is empty or None
        """
        if not secret_ref:
            raise ValueError('secret_ref is required.')
        resp = self.api.get(secret_ref)
        return Secret(resp)

    def decrypt(self, secret_ref, content_type=None):
        """
        Returns the actual secret data stored in Barbican.

        :param secret_ref: The href for the secret
        :param content_type: The content_type of the secret, if not
            provided, the client will fetch the secret meta and use the
            default content_type to decrypt the secret
        :returns: secret data
        :raises ValueError: if secret_ref is empty, or if no content_type
            was given and the secret metadata has no content types or no
            'default' content type
        """
        if not secret_ref:
            raise ValueError('secret_ref is required.')
        if not content_type:
            # No explicit type: look up the secret metadata and fall back
            # to its 'default' content type.
            secret = self.get(secret_ref)
            if secret.content_types is None:
                raise ValueError('Secret has no encrypted data to decrypt.')
            if 'default' not in secret.content_types:
                raise ValueError("Must specify decrypt content-type as "
                                 "secret does not specify a 'default' "
                                 "content-type.")
            content_type = secret.content_types['default']
        headers = {'Accept': content_type}
        return self.api.get_raw(secret_ref, headers)

    def delete(self, secret_ref):
        """
        Deletes a secret

        :param secret_ref: The href for the secret
        :raises ValueError: if secret_ref is empty or None
        """
        if not secret_ref:
            raise ValueError('secret_ref is required.')
        self.api.delete(secret_ref)

    def list(self, limit=10, offset=0):
        """
        List all secrets for the tenant

        :param limit: Max number of secrets returned
        :param offset: Offset secrets to begin list
        :returns: list of Secret metadata objects
        """
        LOG.debug('Listing secrets - offset %s limit %s', offset, limit)
        href = '{0}/{1}'.format(self.api.base_url, self.entity)
        params = {'limit': limit, 'offset': offset}
        resp = self.api.get(href, params)

        return [Secret(s) for s in resp['secrets']]