summary | refs | log | tree | commit | diff
path: root/pip/_vendor/requests/packages/chardet/mbcharsetprober.py
diff options
context:
space:
mode:
Diffstat (limited to 'pip/_vendor/requests/packages/chardet/mbcharsetprober.py')
-rw-r--r-- pip/_vendor/requests/packages/chardet/mbcharsetprober.py 85
1 files changed, 45 insertions, 40 deletions
diff --git a/pip/_vendor/requests/packages/chardet/mbcharsetprober.py b/pip/_vendor/requests/packages/chardet/mbcharsetprober.py
index bb42f2fb5..6256ecfd1 100644
--- a/pip/_vendor/requests/packages/chardet/mbcharsetprober.py
+++ b/pip/_vendor/requests/packages/chardet/mbcharsetprober.py
@@ -27,60 +27,65 @@
# 02110-1301 USA
######################### END LICENSE BLOCK #########################
-import sys
-from . import constants
from .charsetprober import CharSetProber
+from .enums import ProbingState, MachineState
class MultiByteCharSetProber(CharSetProber):
- def __init__(self):
- CharSetProber.__init__(self)
- self._mDistributionAnalyzer = None
- self._mCodingSM = None
- self._mLastChar = [0, 0]
+ """
+ MultiByteCharSetProber
+ """
+
+ def __init__(self, lang_filter=None):
+ super(MultiByteCharSetProber, self).__init__(lang_filter=lang_filter)
+ self.distribution_analyzer = None
+ self.coding_sm = None
+ self._last_char = [0, 0]
def reset(self):
- CharSetProber.reset(self)
- if self._mCodingSM:
- self._mCodingSM.reset()
- if self._mDistributionAnalyzer:
- self._mDistributionAnalyzer.reset()
- self._mLastChar = [0, 0]
+ super(MultiByteCharSetProber, self).reset()
+ if self.coding_sm:
+ self.coding_sm.reset()
+ if self.distribution_analyzer:
+ self.distribution_analyzer.reset()
+ self._last_char = [0, 0]
+
+ @property
+ def charset_name(self):
+ raise NotImplementedError
- def get_charset_name(self):
- pass
+ @property
+ def language(self):
+ raise NotImplementedError
- def feed(self, aBuf):
- aLen = len(aBuf)
- for i in range(0, aLen):
- codingState = self._mCodingSM.next_state(aBuf[i])
- if codingState == constants.eError:
- if constants._debug:
- sys.stderr.write(self.get_charset_name()
- + ' prober hit error at byte ' + str(i)
- + '\n')
- self._mState = constants.eNotMe
+ def feed(self, byte_str):
+ for i in range(len(byte_str)):
+ coding_state = self.coding_sm.next_state(byte_str[i])
+ if coding_state == MachineState.ERROR:
+ self.logger.debug('%s %s prober hit error at byte %s',
+ self.charset_name, self.language, i)
+ self._state = ProbingState.NOT_ME
break
- elif codingState == constants.eItsMe:
- self._mState = constants.eFoundIt
+ elif coding_state == MachineState.ITS_ME:
+ self._state = ProbingState.FOUND_IT
break
- elif codingState == constants.eStart:
- charLen = self._mCodingSM.get_current_charlen()
+ elif coding_state == MachineState.START:
+ char_len = self.coding_sm.get_current_charlen()
if i == 0:
- self._mLastChar[1] = aBuf[0]
- self._mDistributionAnalyzer.feed(self._mLastChar, charLen)
+ self._last_char[1] = byte_str[0]
+ self.distribution_analyzer.feed(self._last_char, char_len)
else:
- self._mDistributionAnalyzer.feed(aBuf[i - 1:i + 1],
- charLen)
+ self.distribution_analyzer.feed(byte_str[i - 1:i + 1],
+ char_len)
- self._mLastChar[0] = aBuf[aLen - 1]
+ self._last_char[0] = byte_str[-1]
- if self.get_state() == constants.eDetecting:
- if (self._mDistributionAnalyzer.got_enough_data() and
- (self.get_confidence() > constants.SHORTCUT_THRESHOLD)):
- self._mState = constants.eFoundIt
+ if self.state == ProbingState.DETECTING:
+ if (self.distribution_analyzer.got_enough_data() and
+ (self.get_confidence() > self.SHORTCUT_THRESHOLD)):
+ self._state = ProbingState.FOUND_IT
- return self.get_state()
+ return self.state
def get_confidence(self):
- return self._mDistributionAnalyzer.get_confidence()
+ return self.distribution_analyzer.get_confidence()