summaryrefslogtreecommitdiff
path: root/oauth_test.py
blob: 7e336c3054696256e24d13b1c27153680ae17deb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
'''Simplified interface to OAuth as a client.

Supports OAuth 1.0 and 1.0a (note: the oauth_verifier parameter is ignored).

'''
import urllib
import urlparse
import httplib
import oauth

class OAuthError(Exception):
    '''Error raised when an OAuth request does not succeed.

    Constructed either as ``OAuthError(message)`` or as
    ``OAuthError(message, response)``, where ``response`` exposes
    ``status``, ``reason`` and ``read()``.
    '''

    def __str__(self):
        '''Return the message, plus the response details when one was given.'''
        if len(self.args) != 2:
            # No response attached (or an unexpected number of arguments):
            # defer to Exception, which also copes with an empty args tuple
            # and with non-string arguments.
            return Exception.__str__(self)

        message, response = self.args
        # read() may hand back the body only once (as file-like objects do),
        # so remember it; otherwise rendering the same error a second time
        # could show an empty body.
        if not hasattr(self, '_body'):
            self._body = response.read()
        return '%s (%s: %s)\n%s' % (message, response.status,
            response.reason, self._body)

class SimpleOAuthClient(oauth.OAuthClient):
    '''Encapsulate a connection to an OAuth server.

    Builds an ``oauth.OAuthConsumer`` from a key and secret, signs requests
    with HMAC-SHA1 and sends them with the headers produced by
    ``OAuthRequest.to_header()``.
    '''

    def __init__(self, key, secret, token=None):
        '''Create a client for the consumer identified by key and secret.

        token is the request or access token to use, if one is already
        known.
        '''
        consumer = oauth.OAuthConsumer(key, secret)
        super(SimpleOAuthClient, self).__init__(consumer, token)

    def _attempt_access(self, url, oauth_request, data=None):
        '''Send oauth_request to url and return the HTTP response object.

        data, when given and non-empty, is passed through
        ``urllib.urlencode``.  For a GET request the encoded data is
        appended to the URL's query string; for a POST request it is sent as
        a form-encoded body.  For any other method it is not transmitted.

        Raises OAuthError if the response status is not 200.
        '''
        encoded = None
        if data:
            encoded = urllib.urlencode(data)

        # connect to the remote end
        scheme, host = urlparse.urlparse(url)[:2]
        if scheme == 'https':
            connection = httplib.HTTPSConnection(host)
        else:
            connection = httplib.HTTPConnection(host)

        method = oauth_request.http_method
        kw = dict(headers=oauth_request.to_header())
        if encoded:
            if method == 'GET':
                # Extend an existing query string instead of adding a
                # second '?' to the URL.
                if '?' in url:
                    url += '&' + encoded
                else:
                    url += '?' + encoded
            elif method == 'POST':
                kw['headers']['Content-Type'] = 'application/x-www-form-urlencoded'
                kw['body'] = encoded
        connection.request(method, url, **kw)

        # anything other than 200 is reported as a failure; the response is
        # attached so the error can show its body
        response = connection.getresponse()
        if response.status != 200:
            raise OAuthError('%s failed: %s %s'%(url, response.status,
                response.reason), response)

        return response

    def fetch_request_token(self, url):
        '''Request a token from url, signing with the consumer only.

        Returns the ``oauth.OAuthToken`` parsed from the response body.
        Raises OAuthError if the server does not answer with status 200.
        '''
        # construct the request for a request token
        signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
        oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer,
            http_url=url)
        oauth_request.sign_request(signature_method, self.consumer, None)
        response = self._attempt_access(url, oauth_request)
        return oauth.OAuthToken.from_string(response.read())

    def authorize_url(self, url, callback):
        '''Return the authorization URL for the client's current token.

        The URL is built from url, self.token and callback; no network
        access is performed here.
        '''
        # ask for the authorization URL
        oauth_request = oauth.OAuthRequest.from_token_and_callback(
            token=self.token, callback=callback, http_url=url)
        return oauth_request.to_url()

    def fetch_access_token(self, url):
        '''Request a token from url, signing with the consumer and self.token.

        Returns the ``oauth.OAuthToken`` parsed from the response body.
        Raises OAuthError if the server does not answer with status 200.
        '''
        oauth_request = oauth.OAuthRequest.from_consumer_and_token(
            self.consumer, token=self.token, http_url=url)
        signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
        oauth_request.sign_request(signature_method, self.consumer, self.token)
        response = self._attempt_access(url, oauth_request)
        return oauth.OAuthToken.from_string(response.read())

    def access_resource(self, url, parameters, http_method='GET'):
        '''Make a signed request to url and return the response body.

        parameters are included in the signed OAuth request and are also
        sent as the request data (see _attempt_access for how they are
        transmitted for each HTTP method).

        Raises OAuthError if the server does not answer with status 200.
        '''
        oauth_request = oauth.OAuthRequest.from_consumer_and_token(
            self.consumer, token=self.token, http_method=http_method,
            http_url=url, parameters=parameters)
        signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
        oauth_request.sign_request(signature_method, self.consumer, self.token)
        response = self._attempt_access(url, oauth_request, data=parameters)
        return response.read()

def example_init():
    # test config
    CONSUMER_KEY = 'sekrit'
    CONSUMER_SECRET = '123'
    REQUEST_TOKEN_URL = 'https://testpypi.python.org/oauth/request_token'
    ACCESS_TOKEN_URL = 'https://testpypi.python.org/oauth/access_token'
    AUTHORIZATION_URL = 'https://testpypi.python.org/oauth/authorise'
    RESOURCE_URL = 'https://testpypi.python.org/oauth/test'
    CALLBACK_URL = 'http://spam.example/back'

    def pause():
        raw_input('[press enter when ready]')

    # setup
    print '** OAuth Python Library Example **'
    client = SimpleOAuthClient(CONSUMER_KEY, CONSUMER_SECRET)

    # get request token
    print '* Obtain a request token ...'
    token = client.fetch_request_token(REQUEST_TOKEN_URL)
    print 'key: %s' % str(token.key)
    print 'secret: %s' % str(token.secret)
    pause()

    print '* Authorize the request token ...'
    print '  You need to open the URL printed below in a browser'
    client = SimpleOAuthClient(CONSUMER_KEY, CONSUMER_SECRET, token)
    # this will actually occur only on some callback
    response = client.authorize_url(AUTHORIZATION_URL, CALLBACK_URL)
    print response
    print '... once you have opened the URL and authorised acces you may continue'
    pause()

    # get access token
    print '* Obtain an access token ...'
    pause()
    client.token = token
    token = client.fetch_access_token(ACCESS_TOKEN_URL)
    print 'key: %s' % str(token.key)
    print 'secret: %s' % str(token.secret)
    pause()

def example_test(access_token):
    # test config
    CONSUMER_KEY = 'sekrit'
    CONSUMER_SECRET = '123'
    RESOURCE_URL = 'https://testpypi.python.org/oauth/test'
    # access some protected resources
    print '* Access protected resources ...'
    client = SimpleOAuthClient(CONSUMER_KEY, CONSUMER_SECRET, access_token)
    params = client.access_resource(RESOURCE_URL, {'param_one': 'test'})
    print 'non-oauth parameters: %s' % params

def example_release(access_token, **params):
    # test config
    CONSUMER_KEY = 'sekrit'
    CONSUMER_SECRET = '123'
    RESOURCE_URL = 'https://testpypi.python.org/oauth/add_release'
    # access some protected resources
    print '* Access protected resources ...'
    client = SimpleOAuthClient(CONSUMER_KEY, CONSUMER_SECRET, access_token)
    params = client.access_resource(RESOURCE_URL, params)
    print 'non-oauth parameters: %s' % params

def example_upload(access_token, **params):
    # test config
    CONSUMER_KEY = 'sekrit'
    CONSUMER_SECRET = '123'
    RESOURCE_URL = 'https://testpypi.python.org/oauth/add_release'
    # access some protected resources
    print '* Access protected resources ...'
    client = SimpleOAuthClient(CONSUMER_KEY, CONSUMER_SECRET, access_token)
    params = client.access_resource(RESOURCE_URL, params, 'POST')
    print 'non-oauth parameters: %s' % params

if __name__ == '__main__':
    # First-time setup: uncomment example_init() below and follow its
    # prompts until an access token is printed, then put that token's key
    # and secret into ACCESS_KEY and ACCESS_SECRET.
    #example_init()
    # NOTE(review): these look like real credentials committed to source --
    # confirm they are throwaway test values.
    ACCESS_KEY = 'AJLks8p4zezH2ud9R7OQy98eRLXf8zut'
    ACCESS_SECRET = 'vSXBEWk5kn6wvVGivILPyBmtRitDvJq0cVcmBDk57eX2XCENvp2da3ou7v09TxrL'
    access_token = oauth.OAuthToken(ACCESS_KEY, ACCESS_SECRET)

    # exercise the test resource, then register a release
    example_test(access_token)
    release_fields = dict(
        name='spam',
        version='3.0',
        summary='Spam via OAuth',
        description='This package was registered via OAuth.',
    )
    example_release(access_token, **release_fields)

    # Sketch of a file upload (not enabled):
    #upload = FileUpload(content, filename...)
    #m = hashlib.md5()
    #m.update(content)
    #digest = m.hexdigest()
    #example_upload(access_token, name='spam', version='3.0', summary='Spam via OAuth',
        #description='This package was registered via OAuth.', content=upload,
        #filetype='sdist', md5_digest=digest, comment='via oauth')