# -*- coding: utf-8 -*- import os import warnings from mock import patch from oauthlib import common, signals from oauthlib.oauth2 import (BackendApplicationClient, Client, LegacyApplicationClient, MobileApplicationClient, WebApplicationClient) from oauthlib.oauth2.rfc6749 import errors, utils from oauthlib.oauth2.rfc6749.clients import AUTH_HEADER, BODY, URI_QUERY import urllib.parse as urlparse from ....unittest import TestCase @patch('time.time', new=lambda: 1000) class WebApplicationClientTest(TestCase): client_id = "someclientid" client_secret = 'someclientsecret' uri = "https://example.com/path?query=world" uri_id = uri + "&response_type=code&client_id=" + client_id uri_redirect = uri_id + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" redirect_uri = "http://my.page.com/callback" scope = ["/profile"] state = "xyz" uri_scope = uri_id + "&scope=%2Fprofile" uri_state = uri_id + "&state=" + state kwargs = { "some": "providers", "require": "extra arguments" } uri_kwargs = uri_id + "&some=providers&require=extra+arguments" uri_authorize_code = uri_redirect + "&scope=%2Fprofile&state=" + state code = "zzzzaaaa" body = "not=empty" body_code = "not=empty&grant_type=authorization_code&code={}&client_id={}".format(code, client_id) body_redirect = body_code + "&redirect_uri=http%3A%2F%2Fmy.page.com%2Fcallback" body_kwargs = body_code + "&some=providers&require=extra+arguments" response_uri = "https://client.example.com/cb?code=zzzzaaaa&state=xyz" response = {"code": "zzzzaaaa", "state": "xyz"} token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",' ' "token_type":"example",' ' "expires_in":3600,' ' "scope":"/profile",' ' "refresh_token":"tGzv3JOkF0XG5Qx2TlKWIA",' ' "example_parameter":"example_value"}') token = { "access_token": "2YotnFZFEjr1zCsicMWpAA", "token_type": "example", "expires_in": 3600, "expires_at": 4600, "scope": scope, "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA", "example_parameter": "example_value" } def test_auth_grant_uri(self): client = WebApplicationClient(self.client_id) # Basic, no extra arguments uri = client.prepare_request_uri(self.uri) self.assertURLEqual(uri, self.uri_id) # With redirection uri uri = client.prepare_request_uri(self.uri, redirect_uri=self.redirect_uri) self.assertURLEqual(uri, self.uri_redirect) # With scope uri = client.prepare_request_uri(self.uri, scope=self.scope) self.assertURLEqual(uri, self.uri_scope) # With state uri = client.prepare_request_uri(self.uri, state=self.state) self.assertURLEqual(uri, self.uri_state) # With extra parameters through kwargs uri = client.prepare_request_uri(self.uri, **self.kwargs) self.assertURLEqual(uri, self.uri_kwargs) def test_request_body(self): client = WebApplicationClient(self.client_id, code=self.code) # Basic, no extra arguments body = client.prepare_request_body(body=self.body) self.assertFormBodyEqual(body, self.body_code) rclient = WebApplicationClient(self.client_id) body = rclient.prepare_request_body(code=self.code, body=self.body) self.assertFormBodyEqual(body, self.body_code) # With redirection uri body = client.prepare_request_body(body=self.body, redirect_uri=self.redirect_uri) self.assertFormBodyEqual(body, self.body_redirect) # With extra parameters body = client.prepare_request_body(body=self.body, **self.kwargs) self.assertFormBodyEqual(body, self.body_kwargs) def test_parse_grant_uri_response(self): client = WebApplicationClient(self.client_id) # Parse code and state response = client.parse_request_uri_response(self.response_uri, state=self.state) self.assertEqual(response, self.response) self.assertEqual(client.code, self.code) # Mismatching state self.assertRaises(errors.MismatchingStateError, client.parse_request_uri_response, self.response_uri, state="invalid") def test_populate_attributes(self): client = WebApplicationClient(self.client_id) response_uri = (self.response_uri + "&access_token=EVIL-TOKEN" "&refresh_token=EVIL-TOKEN" "&mac_key=EVIL-KEY") client.parse_request_uri_response(response_uri, self.state) self.assertEqual(client.code, self.code) # We must not accidentally pick up any further security # credentials at this point. self.assertIsNone(client.access_token) self.assertIsNone(client.refresh_token) self.assertIsNone(client.mac_key) def test_parse_token_response(self): client = WebApplicationClient(self.client_id) # Parse code and state response = client.parse_request_body_response(self.token_json, scope=self.scope) self.assertEqual(response, self.token) self.assertEqual(client.access_token, response.get("access_token")) self.assertEqual(client.refresh_token, response.get("refresh_token")) self.assertEqual(client.token_type, response.get("token_type")) # Mismatching state self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid") os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1' token = client.parse_request_body_response(self.token_json, scope="invalid") self.assertTrue(token.scope_changed) scope_changes_recorded = [] def record_scope_change(sender, message, old, new): scope_changes_recorded.append((message, old, new)) signals.scope_changed.connect(record_scope_change) try: client.parse_request_body_response(self.token_json, scope="invalid") self.assertEqual(len(scope_changes_recorded), 1) message, old, new = scope_changes_recorded[0] self.assertEqual(message, 'Scope has changed from "invalid" to "/profile".') self.assertEqual(old, ['invalid']) self.assertEqual(new, ['/profile']) finally: signals.scope_changed.disconnect(record_scope_change) del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] def test_prepare_authorization_requeset(self): client = WebApplicationClient(self.client_id) url, header, body = client.prepare_authorization_request( self.uri, redirect_url=self.redirect_uri, state=self.state, scope=self.scope) self.assertURLEqual(url, self.uri_authorize_code) # verify default header and body only self.assertEqual(header, {'Content-Type': 'application/x-www-form-urlencoded'}) self.assertEqual(body, '') def test_prepare_request_body(self): """ see issue #585 https://github.com/oauthlib/oauthlib/issues/585 `prepare_request_body` should support the following scenarios: 1. Include client_id alone in the body (default) 2. Include client_id and client_secret in auth and not include them in the body (RFC preferred solution) 3. Include client_id and client_secret in the body (RFC alternative solution) 4. Include client_id in the body and an empty string for client_secret. """ client = WebApplicationClient(self.client_id) # scenario 1, default behavior to include `client_id` r1 = client.prepare_request_body() self.assertEqual(r1, 'grant_type=authorization_code&client_id=%s' % self.client_id) r1b = client.prepare_request_body(include_client_id=True) self.assertEqual(r1b, 'grant_type=authorization_code&client_id=%s' % self.client_id) # scenario 2, do not include `client_id` in the body, so it can be sent in auth. r2 = client.prepare_request_body(include_client_id=False) self.assertEqual(r2, 'grant_type=authorization_code') # scenario 3, Include client_id and client_secret in the body (RFC alternative solution) # the order of kwargs being appended is not guaranteed. for brevity, check the 2 permutations instead of sorting r3 = client.prepare_request_body(client_secret=self.client_secret) r3_params = dict(urlparse.parse_qsl(r3, keep_blank_values=True)) self.assertEqual(len(r3_params.keys()), 3) self.assertEqual(r3_params['grant_type'], 'authorization_code') self.assertEqual(r3_params['client_id'], self.client_id) self.assertEqual(r3_params['client_secret'], self.client_secret) r3b = client.prepare_request_body(include_client_id=True, client_secret=self.client_secret) r3b_params = dict(urlparse.parse_qsl(r3b, keep_blank_values=True)) self.assertEqual(len(r3b_params.keys()), 3) self.assertEqual(r3b_params['grant_type'], 'authorization_code') self.assertEqual(r3b_params['client_id'], self.client_id) self.assertEqual(r3b_params['client_secret'], self.client_secret) # scenario 4, `client_secret` is an empty string r4 = client.prepare_request_body(include_client_id=True, client_secret='') r4_params = dict(urlparse.parse_qsl(r4, keep_blank_values=True)) self.assertEqual(len(r4_params.keys()), 3) self.assertEqual(r4_params['grant_type'], 'authorization_code') self.assertEqual(r4_params['client_id'], self.client_id) self.assertEqual(r4_params['client_secret'], '') # scenario 4b, `client_secret` is `None` r4b = client.prepare_request_body(include_client_id=True, client_secret=None) r4b_params = dict(urlparse.parse_qsl(r4b, keep_blank_values=True)) self.assertEqual(len(r4b_params.keys()), 2) self.assertEqual(r4b_params['grant_type'], 'authorization_code') self.assertEqual(r4b_params['client_id'], self.client_id) # scenario Warnings with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") # catch all # warning1 - raise a DeprecationWarning if a `client_id` is submitted rWarnings1 = client.prepare_request_body(client_id=self.client_id) self.assertEqual(len(w), 1) self.assertIsInstance(w[0].message, DeprecationWarning) # testing the exact warning message in Python2&Python3 is a pain # scenario Exceptions # exception1 - raise a ValueError if the a different `client_id` is submitted with self.assertRaises(ValueError) as cm: client.prepare_request_body(client_id='different_client_id') # testing the exact exception message in Python2&Python3 is a pain