1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# -*- coding: utf-8 -*-
"""
kombu.utils.encoding
~~~~~~~~~~~~~~~~~~~~~
Utilities to encode text, and to safely emit text from running
applications without crashing with the infamous :exc:`UnicodeDecodeError`
exception.
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import sys
import traceback
#: True when the running interpreter is Python 3 or newer.
is_py3k = sys.version_info >= (3, 0)

if sys.platform.startswith('java'):     # pragma: no cover

    def default_encoding():
        """Return the encoding name used by the helpers in this module.

        On platforms whose ``sys.platform`` starts with ``'java'`` the
        interpreter is not consulted; ``'utf-8'`` is always returned.
        """
        return 'utf-8'
else:

    def default_encoding():             # noqa
        """Return the encoding name used by the helpers in this module.

        The file system encoding is preferred.  Because
        :func:`sys.getfilesystemencoding` may return ``None`` (documented
        for Python 2 on Unix when the locale codeset cannot be
        determined), the interpreter's default encoding is used as a
        fallback so callers always get a usable codec name.
        """
        return sys.getfilesystemencoding() or sys.getdefaultencoding()
if is_py3k:     # pragma: no cover

    def str_to_bytes(s):
        """Encode text to bytes; return any other object unchanged."""
        if isinstance(s, str):
            return s.encode()
        return s

    def bytes_to_str(s):
        """Decode bytes to text; return any other object unchanged."""
        if isinstance(s, bytes):
            return s.decode()
        return s

    def from_utf8(s, *args, **kwargs):
        """Return *s* unchanged; the extra arguments are ignored."""
        return s

    def default_encode(obj):
        """Return *obj* unchanged."""
        return obj

    str_t = str
    # ``str_to_bytes`` already leaves bytes (and every non-text object)
    # untouched, so a separate wrapper re-checking for bytes is redundant.
    # Aliasing also mirrors the Python 2 branch below.
    ensure_bytes = str_to_bytes

else:

    def str_to_bytes(s):                # noqa
        """Encode ``unicode`` using the default codec of ``encode()``.

        Any other object is returned unchanged.
        """
        if isinstance(s, unicode):
            return s.encode()
        return s

    def bytes_to_str(s):                # noqa
        """Return *s* unchanged."""
        return s

    def from_utf8(s, *args, **kwargs):  # noqa
        """Return ``s.encode('utf-8')``.

        Extra positional and keyword arguments are forwarded to
        ``encode``.
        """
        return s.encode('utf-8', *args, **kwargs)

    def default_encode(obj):            # noqa
        """Return *obj* as ``unicode``, decoded with default_encoding()."""
        return unicode(obj, default_encoding())

    str_t = unicode
    ensure_bytes = str_to_bytes
# ``bytes_t`` names the byte string type of the running interpreter.
try:
    bytes_t = bytes
except NameError:
    # No ``bytes`` builtin available: fall back to ``str``.
    bytes_t = str  # noqa
def safe_str(s, errors='replace'):
    """Return a string representation of *s* for safe output.

    *s* is first passed through :func:`bytes_to_str`.  If the result is
    not a string it is rendered with :func:`safe_repr`; otherwise it is
    handed to :func:`_safe_str`.

    :param s: object to convert.
    :param errors: codec error handler forwarded to the helpers
        (defaults to ``'replace'``).
    """
    s = bytes_to_str(s)
    # ``basestring`` only exists on Python 2.  The module-level aliases
    # ``str_t``/``bytes_t`` cover the same string types there and also
    # resolve on Python 3, so this check no longer raises NameError.
    if not isinstance(s, (str_t, bytes_t)):
        return safe_repr(s, errors)
    return _safe_str(s, errors)
def _safe_str(s, errors='replace'):
    """Convert *s* to a string, never propagating conversion errors.

    On Python 3 a ``str`` is returned as-is and anything else goes
    through ``str()``.  On Python 2 a ``unicode`` object is encoded with
    :func:`default_encoding`, and any other value is passed to
    ``unicode(s, encoding, errors)``.

    If the conversion raises, a ``'<Unrepresentable ...>'`` placeholder
    is returned instead; it contains the type of *s*, the exception and
    the formatted current call stack.

    :param s: object to convert.
    :param errors: codec error handler used on the Python 2 path.
    """
    if is_py3k:     # pragma: no cover
        if isinstance(s, str):
            return s
        try:
            return str(s)
        except Exception:
            # sys.exc_info() instead of ``except E, exc`` / ``except E as
            # exc``: this form is valid syntax on every Python 2 and 3.
            exc = sys.exc_info()[1]
            return '<Unrepresentable %r: %r %r>' % (
                type(s), exc, '\n'.join(traceback.format_stack()))
    encoding = default_encoding()
    try:
        if isinstance(s, unicode):
            return s.encode(encoding, errors)
        return unicode(s, encoding, errors)
    except Exception:
        exc = sys.exc_info()[1]
        return '<Unrepresentable %r: %r %r>' % (
            type(s), exc, '\n'.join(traceback.format_stack()))
def safe_repr(o, errors='replace'):
    """Return ``repr(o)``, falling back to :func:`_safe_str` on failure.

    :param o: object to represent.
    :param errors: codec error handler passed to :func:`_safe_str` when
        ``repr(o)`` raises.
    """
    try:
        return repr(o)
    except Exception:
        return _safe_str(o, errors)
|