summaryrefslogtreecommitdiff
path: root/wheel/util.py
blob: 5268813725e84395ca267563ef3019c785c12740 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Utility functions."""

import sys
import os
import base64
import json
import hashlib
try:
    from collections import OrderedDict
except ImportError:
    OrderedDict = dict

# Names exported by a star-import of this module.  Other helpers defined
# further down (e.g. native, binary, open_for_csv, HashingFile) are not
# listed here and therefore have to be imported by name.
__all__ = ['urlsafe_b64encode', 'urlsafe_b64decode', 'utf8',
           'to_json', 'from_json', 'matches_requirement']

def urlsafe_b64encode(data):
    """Return the URL-safe base64 encoding of *data* without '=' padding.

    :param data: Bytes to encode.
    :returns: The encoded bytestring with any trailing '=' removed.
    """
    # A bytes literal is used for the padding character, matching
    # urlsafe_b64decode, so this function has no dependency on the
    # version-specific binary() helper.
    return base64.urlsafe_b64encode(data).rstrip(b'=')


def urlsafe_b64decode(data):
    """Decode URL-safe base64 *data* whose '=' padding has been stripped.

    :param data: Encoded bytestring, with or without trailing padding.
    :returns: The decoded bytes.
    """
    # Pad up to the next multiple of four characters.  ``-len % 4`` is 0
    # for input that is already aligned, so no redundant '=' is appended.
    pad = b'=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(data + pad)


def to_json(o):
    """Serialize *o* to a JSON string with object keys in sorted order."""
    serialized = json.dumps(o, sort_keys=True)
    return serialized


def from_json(j):
    """Parse the JSON document *j* and return the resulting Python object."""
    decoded = json.loads(j)
    return decoded

def open_for_csv(name, mode):
    """Open *name* in the way the csv module expects on this interpreter.

    On Python 3 and later the file is opened with ``newline=''``; on older
    interpreters ``'b'`` is appended to *mode* instead.

    :param name: Path of the file to open.
    :param mode: Base mode string, e.g. ``'r'`` or ``'w'``.
    :returns: The opened file object.
    """
    if sys.version_info[0] >= 3:
        return open(name, mode, newline='')
    return open(name, mode + 'b')

try:
    unicode
except NameError:
    # No ``unicode`` builtin on this interpreter: ``str`` is the text type.
    def utf8(data):
        '''Return *data* UTF-8 encoded if it is text, otherwise unchanged.'''
        return data.encode('utf-8') if isinstance(data, str) else data
else:
    def utf8(data):
        '''Return *data* UTF-8 encoded if it is text, otherwise unchanged.'''
        return data.encode('utf-8') if isinstance(data, unicode) else data


# For encoding ascii back and forth between bytestrings, as is repeatedly
# necessary in JSON-based crypto under Python 3
try:
    unicode
except NameError:
    # No ``unicode`` builtin: ``str`` is text and ``bytes`` is binary.
    def native(s):
        """Return *s* as the native ``str`` type.

        Bytes are decoded as ASCII; any other value is returned unchanged.
        """
        if isinstance(s, bytes):
            return s.decode('ascii')
        return s

    def binary(s):
        """Return *s* as a bytestring.

        Text is encoded as ASCII; any other value (including bytes) is
        returned unchanged.
        """
        if isinstance(s, str):
            return s.encode('ascii')
        # Pass everything else through, as the ``unicode`` variant does,
        # instead of falling off the end and returning None.
        return s
else:
    def native(s):
        """Return *s* unchanged; no conversion is applied on this branch."""
        return s

    def binary(s):
        """Return *s* as a bytestring.

        ``unicode`` text is encoded as ASCII; any other value is returned
        unchanged.
        """
        if isinstance(s, unicode):
            return s.encode('ascii')
        return s

class HashingFile(object):
    """Write-through wrapper that hashes and counts everything written.

    :param fd: Object with ``write`` and ``close`` methods to forward to.
    :param hashtype: Algorithm name passed to ``hashlib.new``.
    """

    def __init__(self, fd, hashtype='sha256'):
        self.fd = fd
        self.hashtype = hashtype
        self.hash = hashlib.new(hashtype)
        # Running total of len(data) over all write() calls.
        self.length = 0

    def write(self, data):
        """Feed *data* to the hash, add its length, then write it to ``fd``."""
        self.hash.update(data)
        self.length += len(data)
        self.fd.write(data)

    def close(self):
        """Close the wrapped file object."""
        self.fd.close()

    def digest(self):
        """Return the digest of the data written so far.

        For ``md5`` this is the plain hex digest; for any other hash type it
        is ``'<hashtype>=<unpadded urlsafe base64 digest>'``.
        """
        if self.hashtype != 'md5':
            raw = self.hash.digest()
            encoded = native(urlsafe_b64encode(raw))
            return self.hashtype + '=' + encoded
        return self.hash.hexdigest()

class OrderedDefaultDict(OrderedDict):
    """OrderedDict that creates missing values with a default factory.

    The optional first positional argument is the factory (a callable or
    None); all remaining arguments are passed on to ``OrderedDict``.
    """

    def __init__(self, *args, **kwargs):
        factory = None
        if args:
            factory, args = args[0], args[1:]
            if factory is not None and not callable(factory):
                raise TypeError('first argument must be callable or None')
        self.default_factory = factory
        super(OrderedDefaultDict, self).__init__(*args, **kwargs)

    def __missing__(self, key):
        """Create, store and return the default value for an absent *key*.

        :raises KeyError: if no default factory is set.
        """
        factory = self.default_factory
        if factory is None:
            raise KeyError(key)
        value = factory()
        self[key] = value
        return value

if sys.platform == 'win32':
    import ctypes.wintypes

    # CSIDL_APPDATA is listed for reference only.  It is not used below, for
    # compatibility with dirspec, which uses LOCAL_APPDATA and then
    # COMMON_APPDATA, in that order.
    csidl = {
        'CSIDL_APPDATA': 26,
        'CSIDL_LOCAL_APPDATA': 28,
        'CSIDL_COMMON_APPDATA': 35,
    }

    def get_path(name):
        """Return the folder path for *name*, a key of ``csidl``.

        The path is whatever shell32's SHGetFolderPathW leaves in the
        buffer; the call's own return value is not inspected.
        """
        SHGFP_TYPE_CURRENT = 0
        out = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
        ctypes.windll.shell32.SHGetFolderPathW(
            0, csidl[name], 0, SHGFP_TYPE_CURRENT, out)
        return out.value

    def save_config_path(*resource):
        """Return the *resource* directory under local app data.

        The directory is created if it does not exist yet.
        """
        target = os.path.join(get_path("CSIDL_LOCAL_APPDATA"), *resource)
        if not os.path.isdir(target):
            os.makedirs(target)
        return target

    def load_config_paths(*resource):
        """Yield each existing *resource* path, local app data first."""
        for folder in ("CSIDL_LOCAL_APPDATA", "CSIDL_COMMON_APPDATA"):
            candidate = os.path.join(get_path(folder), *resource)
            if os.path.exists(candidate):
                yield candidate
else:
    # xdg is imported inside each function, so it is only required when one
    # of them is actually called.
    def save_config_path(*resource):
        """Delegate to ``xdg.BaseDirectory.save_config_path``."""
        import xdg.BaseDirectory
        return xdg.BaseDirectory.save_config_path(*resource)

    def load_config_paths(*resource):
        """Delegate to ``xdg.BaseDirectory.load_config_paths``."""
        import xdg.BaseDirectory
        return xdg.BaseDirectory.load_config_paths(*resource)

def matches_requirement(req, wheels):
    """Return the wheels from *wheels* that satisfy *req*.

    :param req: The requirement to satisfy, as a string parseable by
        ``pkg_resources.Requirement.parse``.
    :param wheels: Iterable of wheel objects to search.  Each must expose
        ``parsed_filename`` with ``group("name")`` and ``group("ver")``.
    :returns: List of the matching wheels, in their original order.
    :raises RuntimeError: if pkg_resources cannot be imported.
    """
    try:
        from pkg_resources import Distribution, Requirement
    except ImportError:
        raise RuntimeError("Cannot use requirements without pkg_resources")

    requirement = Requirement.parse(req)

    def _satisfies(wf):
        # Build a bare Distribution from the name/version in the filename
        # and test it against the parsed requirement.
        parsed = wf.parsed_filename
        candidate = Distribution(project_name=parsed.group("name"),
                                 version=parsed.group("ver"))
        return candidate in requirement

    return [wf for wf in wheels if _satisfies(wf)]