diff options
Diffstat (limited to 'pip/_vendor/requests/packages/chardet/mbcharsetprober.py')
-rw-r--r-- | pip/_vendor/requests/packages/chardet/mbcharsetprober.py | 85 |
1 files changed, 45 insertions, 40 deletions
diff --git a/pip/_vendor/requests/packages/chardet/mbcharsetprober.py b/pip/_vendor/requests/packages/chardet/mbcharsetprober.py
index bb42f2fb5..6256ecfd1 100644
--- a/pip/_vendor/requests/packages/chardet/mbcharsetprober.py
+++ b/pip/_vendor/requests/packages/chardet/mbcharsetprober.py
@@ -27,60 +27,65 @@
 # 02110-1301 USA
 ######################### END LICENSE BLOCK #########################
 
-import sys
-from . import constants
 from .charsetprober import CharSetProber
+from .enums import ProbingState, MachineState
 
 
 class MultiByteCharSetProber(CharSetProber):
-    def __init__(self):
-        CharSetProber.__init__(self)
-        self._mDistributionAnalyzer = None
-        self._mCodingSM = None
-        self._mLastChar = [0, 0]
+    """
+    MultiByteCharSetProber
+    """
+
+    def __init__(self, lang_filter=None):
+        super(MultiByteCharSetProber, self).__init__(lang_filter=lang_filter)
+        self.distribution_analyzer = None
+        self.coding_sm = None
+        self._last_char = [0, 0]
 
     def reset(self):
-        CharSetProber.reset(self)
-        if self._mCodingSM:
-            self._mCodingSM.reset()
-        if self._mDistributionAnalyzer:
-            self._mDistributionAnalyzer.reset()
-        self._mLastChar = [0, 0]
+        super(MultiByteCharSetProber, self).reset()
+        if self.coding_sm:
+            self.coding_sm.reset()
+        if self.distribution_analyzer:
+            self.distribution_analyzer.reset()
+        self._last_char = [0, 0]
+
+    @property
+    def charset_name(self):
+        raise NotImplementedError
 
-    def get_charset_name(self):
-        pass
+    @property
+    def language(self):
+        raise NotImplementedError
 
-    def feed(self, aBuf):
-        aLen = len(aBuf)
-        for i in range(0, aLen):
-            codingState = self._mCodingSM.next_state(aBuf[i])
-            if codingState == constants.eError:
-                if constants._debug:
-                    sys.stderr.write(self.get_charset_name()
-                                     + ' prober hit error at byte ' + str(i)
-                                     + '\n')
-                self._mState = constants.eNotMe
+    def feed(self, byte_str):
+        for i in range(len(byte_str)):
+            coding_state = self.coding_sm.next_state(byte_str[i])
+            if coding_state == MachineState.ERROR:
+                self.logger.debug('%s %s prober hit error at byte %s',
+                                  self.charset_name, self.language, i)
+                self._state = ProbingState.NOT_ME
                 break
-            elif codingState == constants.eItsMe:
-                self._mState = constants.eFoundIt
+            elif coding_state == MachineState.ITS_ME:
+                self._state = ProbingState.FOUND_IT
                 break
-            elif codingState == constants.eStart:
-                charLen = self._mCodingSM.get_current_charlen()
+            elif coding_state == MachineState.START:
+                char_len = self.coding_sm.get_current_charlen()
                 if i == 0:
-                    self._mLastChar[1] = aBuf[0]
-                    self._mDistributionAnalyzer.feed(self._mLastChar, charLen)
+                    self._last_char[1] = byte_str[0]
+                    self.distribution_analyzer.feed(self._last_char, char_len)
                 else:
-                    self._mDistributionAnalyzer.feed(aBuf[i - 1:i + 1],
-                                                     charLen)
+                    self.distribution_analyzer.feed(byte_str[i - 1:i + 1],
+                                                    char_len)
 
-        self._mLastChar[0] = aBuf[aLen - 1]
+        self._last_char[0] = byte_str[-1]
 
-        if self.get_state() == constants.eDetecting:
-            if (self._mDistributionAnalyzer.got_enough_data() and
-                    (self.get_confidence() > constants.SHORTCUT_THRESHOLD)):
-                self._mState = constants.eFoundIt
+        if self.state == ProbingState.DETECTING:
+            if (self.distribution_analyzer.got_enough_data() and
+                    (self.get_confidence() > self.SHORTCUT_THRESHOLD)):
+                self._state = ProbingState.FOUND_IT
 
-        return self.get_state()
+        return self.state
 
     def get_confidence(self):
-        return self._mDistributionAnalyzer.get_confidence()
+        return self.distribution_analyzer.get_confidence()