summaryrefslogtreecommitdiff
path: root/tests/oauth2/rfc6749/clients/test_service_application.py
blob: 6f48e232e3166dffa01d1054c55d84fd05a1dadb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# -*- coding: utf-8 -*-
import os
from time import time

import jwt
from unittest.mock import patch

from oauthlib.common import Request
from oauthlib.oauth2 import ServiceApplicationClient

from ....unittest import TestCase


class ServiceApplicationClientTest(TestCase):
    """Tests for ``ServiceApplicationClient``.

    Covers building the token request body (which carries a signed JWT
    assertion) and parsing the token response.  The class attributes below
    are shared, read-only fixtures; tests must not mutate them.
    """

    gt = ServiceApplicationClient.grant_type

    # Test-only RSA key handed to the client for signing the assertion.
    private_key = """
-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJopeQRRKKpZI4s5i+UPwVpupG
AlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98gj0x7zamu0Ck1LtQ4c7pFMVah
5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8mfvGGg3xNjTMO7IdrwIDAQAB
AoGBAOQ2KuH8S5+OrsL4K+wfjoCi6MfxCUyqVU9GxocdM1m30WyWRFMEz2nKJ8fR
p3vTD4w8yplTOhcoXdQZl0kRoaDzrcYkm2VvJtQRrX7dKFT8dR8D/Tr7dNQLOXfC
DY6xveQczE7qt7Vk7lp4FqmxBsaaEuokt78pOOjywZoInjZhAkEA9wz3zoZNT0/i
rf6qv2qTIeieUB035N3dyw6f1BGSWYaXSuerDCD/J1qZbAPKKhyHZbVawFt3UMhe
542UftBaxQJBAO0iJy1I8GQjGnS7B3yvyH3CcLYGy296+XO/2xKp/d/ty1OIeovx
C60pLNwuFNF3z9d2GVQAdoQ89hUkOtjZLeMCQQD0JO6oPHUeUjYT+T7ImAv7UKVT
Suy30sKjLzqoGw1kR+wv7C5PeDRvscs4wa4CW9s6mjSrMDkDrmCLuJDtmf55AkEA
kmaMg2PNrjUR51F0zOEFycaaqXbGcFwe1/xx9zLmHzMDXd4bsnwt9kk+fe0hQzVS
JzatanQit3+feev1PN3QewJAWv4RZeavEUhKv+kLe95Yd0su7lTLVduVgh4v5yLT
Ga6FHdjGPcfajt+nrpB1n8UQBEH9ZxniokR/IPvdMlxqXA==
-----END RSA PRIVATE KEY-----
"""

    # Key used by the tests to verify (decode) the assertion the client emits.
    public_key = """
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJ
opeQRRKKpZI4s5i+UPwVpupGAlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98g
j0x7zamu0Ck1LtQ4c7pFMVah5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8
mfvGGg3xNjTMO7IdrwIDAQAB
-----END PUBLIC KEY-----
"""

    # Values expected in the JWT ``sub``, ``iss`` and ``aud`` claims.
    subject = 'resource-owner@provider.com'

    issuer = 'the-client@provider.com'

    audience = 'https://provider.com/token'

    client_id = "someclientid"
    scope = ["/profile"]
    kwargs = {
        "some": "providers",
        "require": "extra arguments"
    }

    # Pre-existing request body the client must preserve in its output.
    body = "isnot=empty"

    body_up = "not=empty&grant_type=%s" % gt
    body_kwargs = body_up + "&some=providers&require=extra+arguments"

    # Raw token response and its expected parsed form (without ``expires_at``,
    # which depends on the current time and is added per test).
    token_json = ('{   "access_token":"2YotnFZFEjr1zCsicMWpAA",'
                  '    "token_type":"example",'
                  '    "expires_in":3600,'
                  '    "scope":"/profile",'
                  '    "example_parameter":"example_value"}')
    token = {
        "access_token": "2YotnFZFEjr1zCsicMWpAA",
        "token_type": "example",
        "expires_in": 3600,
        "scope": ["/profile"],
        "example_parameter": "example_value"
    }

    def _decode_assertion_claim(self, body):
        """Check the common request parameters in *body* and decode its JWT.

        Asserts that the pre-existing ``isnot=empty`` parameter survived and
        that ``grant_type`` is the service-application grant, then returns
        the claims of the ``assertion`` parameter.  ``jwt.decode`` verifies
        the signature against ``public_key`` and the audience against
        ``audience``, so no separate ``aud`` assertion is needed by callers.
        """
        r = Request('https://a.b', body=body)
        self.assertEqual(r.isnot, 'empty')
        self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
        return jwt.decode(r.assertion, self.public_key,
                          audience=self.audience, algorithms=['RS256'])

    @patch('time.time')
    def test_request_body(self, t):
        """Request body carries a valid assertion; required params are enforced."""
        # Freeze time.time() so the ``iat`` claim is predictable.
        t.return_value = time()

        client = ServiceApplicationClient(
                self.client_id, private_key=self.private_key)

        # Basic with min required params
        body = client.prepare_request_body(issuer=self.issuer,
                                           subject=self.subject,
                                           audience=self.audience,
                                           body=self.body)
        claim = self._decode_assertion_claim(body)

        self.assertEqual(claim['iss'], self.issuer)
        self.assertEqual(claim['sub'], self.subject)
        self.assertEqual(claim['iat'], int(t.return_value))
        # Optional claims must be absent when not requested.
        self.assertNotIn('nbf', claim)
        self.assertNotIn('jti', claim)

        # Missing issuer parameter
        with self.assertRaises(ValueError):
            client.prepare_request_body(issuer=None,
                                        subject=self.subject,
                                        audience=self.audience,
                                        body=self.body)

        # Missing subject parameter
        with self.assertRaises(ValueError):
            client.prepare_request_body(issuer=self.issuer,
                                        subject=None,
                                        audience=self.audience,
                                        body=self.body)

        # Missing audience parameter
        with self.assertRaises(ValueError):
            client.prepare_request_body(issuer=self.issuer,
                                        subject=self.subject,
                                        audience=None,
                                        body=self.body)

        # Optional kwargs
        not_before = time() - 3600
        jwt_id = '8zd15df4s35f43sd'
        body = client.prepare_request_body(issuer=self.issuer,
                                           subject=self.subject,
                                           audience=self.audience,
                                           body=self.body,
                                           not_before=not_before,
                                           jwt_id=jwt_id)
        claim = self._decode_assertion_claim(body)

        self.assertEqual(claim['iss'], self.issuer)
        self.assertEqual(claim['sub'], self.subject)
        self.assertEqual(claim['iat'], int(t.return_value))
        self.assertEqual(claim['nbf'], not_before)
        self.assertEqual(claim['jti'], jwt_id)

    @patch('time.time')
    def test_request_body_no_initial_private_key(self, t):
        """A key passed to ``prepare_request_body`` is used when the client has none."""
        # Freeze time.time() so the ``iat`` claim is predictable.
        t.return_value = time()

        client = ServiceApplicationClient(
                self.client_id, private_key=None)

        # Basic with private key provided
        body = client.prepare_request_body(issuer=self.issuer,
                                           subject=self.subject,
                                           audience=self.audience,
                                           body=self.body,
                                           private_key=self.private_key)
        claim = self._decode_assertion_claim(body)

        self.assertEqual(claim['iss'], self.issuer)
        self.assertEqual(claim['sub'], self.subject)
        self.assertEqual(claim['iat'], int(t.return_value))

        # No private key provided
        with self.assertRaises(ValueError):
            client.prepare_request_body(issuer=self.issuer,
                                        subject=self.subject,
                                        audience=self.audience,
                                        body=self.body)

    @patch('time.time')
    def test_parse_token_response(self, t):
        """Token response is parsed into the client; scope changes are flagged."""
        t.return_value = time()
        # Build the expectation on a copy so the shared class-level ``token``
        # fixture is never mutated by a test run.
        expected_token = dict(self.token)
        expected_token['expires_at'] = self.token['expires_in'] + t.return_value

        client = ServiceApplicationClient(self.client_id)

        # Parse the token response with the matching scope
        response = client.parse_request_body_response(self.token_json, scope=self.scope)
        self.assertEqual(response, expected_token)
        self.assertEqual(client.access_token, response.get("access_token"))
        self.assertEqual(client.refresh_token, response.get("refresh_token"))
        self.assertEqual(client.token_type, response.get("token_type"))

        # Mismatching scope
        self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")
        # With the relax flag set the mismatch is reported on the token
        # instead of raising.  patch.dict restores os.environ on exit, even
        # when an assertion inside the block fails.
        with patch.dict(os.environ, {'OAUTHLIB_RELAX_TOKEN_SCOPE': '2'}):
            token = client.parse_request_body_response(self.token_json, scope="invalid")
            self.assertTrue(token.scope_changed)