Source code for assembl.nlp.translation_service

"""Abstract and concrete classes for a machine translation service."""
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import object
from abc import abstractmethod
import urllib.request, urllib.error, urllib.parse
from traceback import print_exc
import re
from collections import defaultdict
from math import log, floor

import simplejson as json
from langdetect.detector_factory import init_factory
from langdetect.detector import LangDetectException
from sqlalchemy import inspect
from pyramid.i18n import TranslationStringFactory
import requests

from assembl.lib.abc import (abstractclassmethod, classproperty)
from assembl.lib import config
from assembl.lib.enum import OrderedEnum
from assembl.lib.clean_input import unescape
from assembl.models.langstrings import (
    LangString, LangStringEntry, LocaleLabel)
from assembl.lib.locale import (
    strip_country, create_mt_code, locale_compatible, any_locale_compatible)


_ = TranslationStringFactory('assembl')

# Minimum length (chars) before we trust the language identifications outside
# discussion languages
SECURE_IDENTIFICATION_LIMIT = 250


[docs]class LangStringStatus(OrderedEnum): SERVICE_DOWN = 1 # Transient, eg connection error TRANSLATION_FAILURE = 2 # possibly transient, like service down UNKNOWN_ERROR = 3 # unknown... assume transient QUOTA_ERROR = 4 # quota exceeded PERMANENT_TRANSLATION_FAILURE = 10 # eg wrong arguments CANNOT_IDENTIFY = 11 # the identify failed permanently. CANNOT_TRANSLATE = 12 # as given by canTranslate, eg wrong target lang IDENTICAL_TRANSLATION = 13 # the translation is identical to the original? IDENTIFIED_TO_UNKNOWN = 14 # The identified language is not a discussion language TOO_MANY_TRANSIENTS = 15
class LanguageIdentificationService(object): canTranslate = None _url_regexp = re.compile( r"\b(https?|ftp)://(-\.)?([^\s/?\.#-]+\.?)+(/[^\s]*)?\b", re.I) def __init__(self, discussion): self.discussion_id = discussion.id self._discussion = discussion @property def discussion(self): if inspect(self._discussion).detached: self._discussion = self._discussion.__class__.get( self.discussion_id) return self._discussion @property def known_locales(cls): return cls.detector_factory().langlist def asKnownLocale(self, locale_code): parts = locale_code.split("_") base = parts[0] if base == "zh": if len(parts) > 1 and parts[1] in ("Hant", "TW", "HK", "SG", "MO"): return "zh-tw" return "zh-cn" # mainland as default known_locales = self.detector_factory().langlist if base in self.known_locales: return base @classmethod def target_locale_labels_for_locales(cls, locales, target_locale): return LocaleLabel.names_of_locales_in_locale( [strip_country(cls.asPosixLocale(loc)) for loc in locales] + LocaleLabel.SPECIAL_LOCALES, target_locale) idiosyncrasies = {"zh-tw": "zh_Hant_TW", "zh-cn": "zh_Hans_CN"} @classmethod def asPosixLocale(cls, locale_code): return cls.idiosyncrasies.get(locale_code, locale_code) @classmethod def can_guess_locale(cls, text): # empirical return text and len(text) >= 15 @classmethod def strlen_nourl(cls, data): # a fancy strlen that removes urls. return len(cls._url_regexp.sub(' ', data)) def identify( self, text, expected_locales=None, constrain_locale_threshold=SECURE_IDENTIFICATION_LIMIT): "Try to identify locale of text. Boost if one of the expected locales." # Note that it is unreliable for very short text; especially it does not # give multiple probabilities when appropriate. if not text: return LocaleLabel.UNDEFINED, {LocaleLabel.UNDEFINED: 1} len_nourl = self.strlen_nourl(text) if len_nourl < 5: return LocaleLabel.NON_LINGUISTIC, {LocaleLabel.NON_LINGUISTIC: 1} detector = self.detector_factory().create() if constrain_locale_threshold and ( len_nourl < constrain_locale_threshold): excluded_probability = 0 else: # Give less probability to excluded languages for shorter texts excluded_probability = min(1, log(len_nourl) / 10) expected_locales = expected_locales or self.discussion.discussion_locales priors = self.convert_to_priors(expected_locales, excluded_probability) detector.set_prior_map(priors) detector.append(text) language_data = detector.get_probabilities() data = [(x.prob, x.lang) for x in language_data] data.sort(reverse=True) top = data[0][1] if (data and (data[0][0] > 0.5) ) else LocaleLabel.UNDEFINED return top, {lang: prob for (prob, lang) in data} @staticmethod def detector_factory(): init_factory() from langdetect.detector_factory import _factory as detector_factory return detector_factory def convert_to_priors(self, priors, base_rate=0.1): if isinstance(priors, list): priors = {self.asKnownLocale(l): 1 for l in priors} if base_rate > 0: factory = LanguageIdentificationService.detector_factory() if len(priors) < len(factory.langlist): priors0 = {l: base_rate for l in factory.langlist} priors0.update(priors) priors = priors0 return priors def confirm_locale( self, langstring_entry, priors=None, constrain_locale_threshold=SECURE_IDENTIFICATION_LIMIT): try: expected_locales = priors or self.discussion.discussion_locales lang, data = self.identify(langstring_entry.value, expected_locales) data["service"] = self.__class__.__name__ changed = langstring_entry.identify_locale(lang, data) if lang == LocaleLabel.UNDEFINED: pass # say you can't identify except Exception as e: print_exc() self.set_error(langstring_entry, *self.decode_exception(e, True)) @staticmethod def set_error(lse, error_code, error_description): lid = lse.locale_identification_data_json lse.error_code = error_code.value lse.error_count = 1 + (lse.error_count or 0) if (lse.error_count > 10 and lse.error_code < LangStringStatus.PERMANENT_TRANSLATION_FAILURE.value): lse.error_code = LangStringStatus.TOO_MANY_TRANSIENTS.value if error_description: lid = lse.locale_identification_data_json lid['error_desc'] = error_description lse.locale_identification_data_json = lid def has_fatal_error(self, lse): return lse.error_code >= LangStringStatus.PERMANENT_TRANSLATION_FAILURE.value @staticmethod def decode_exception(e, identify_phase=False): if isinstance(e, LangDetectException): return LangStringStatus.CANNOT_IDENTIFY, str(e) return LangStringStatus.UNKNOWN_ERROR, str(e) class AbstractTranslationService(LanguageIdentificationService): # Should we identify before translating? distinct_identify_step = True def serviceData(self): return {"translation_notice": "Machine-translated", "idiosyncrasies": {}} def canTranslate(self, source, target): return False def target_locales(self): return () def target_locale_labels(self, target_locale): return self.target_locale_labels_for_locales( list(self.target_locales()), target_locale) @abstractmethod def translate(self, text, target, is_html=False, source=None, db=None): if not text: return text, LocaleLabel.NON_LINGUISTIC if not source or source == LocaleLabel.UNDEFINED: lang, data = self.identify(text) return text, lang def get_mt_name(self, source_name, target_name): return create_mt_code(source_name, target_name) def translate_lse( self, source_lse, target, retranslate=False, is_html=False, constrain_locale_threshold=SECURE_IDENTIFICATION_LIMIT): if not source_lse.value: # don't translate empty strings return source_lse source_locale = source_lse.locale if source_locale == LocaleLabel.NON_LINGUISTIC: return source_lse # TODO: Handle MULTILINGUAL if (source_locale == LocaleLabel.UNDEFINED and self.strlen_nourl(source_lse.value) < 5): source_lse.identify_locale(LocaleLabel.NON_LINGUISTIC, None, True) return source_lse if (source_locale == LocaleLabel.UNDEFINED and self.distinct_identify_step): self.confirm_locale( source_lse, constrain_locale_threshold=constrain_locale_threshold) # TODO: bail if identification failed source_locale = source_lse.locale # TODO: Handle script differences if (locale_compatible(source_locale, target)): return source_lse target_lse = None is_new_lse = False if (source_locale != LocaleLabel.UNDEFINED or not self.distinct_identify_step or self.has_fatal_error(source_lse)): # We try to avoid ???-mt-from-und locales in the DB. # This is only stored if both identification and translation # failed to identify a language. target_lse = source_lse.langstring.entries_as_dict.get(target, None) if target_lse and not retranslate: if self.has_fatal_error(target_lse): return target_lse if target_lse is None: target_lse = LangStringEntry( langstring_id=source_lse.langstring_id, locale = LocaleLabel.UNDEFINED, mt_trans_of_id=source_lse.id, value='') is_new_lse = True if self.canTranslate(source_locale, target): try: trans, lang = self.translate( source_lse.value, target, is_html, source=(source_locale if source_locale != LocaleLabel.UNDEFINED else None), db=source_lse.db) lang = self.asPosixLocale(lang) # What if detected language is not a discussion language? if source_locale == LocaleLabel.UNDEFINED: if constrain_locale_threshold and ( self.strlen_nourl(source_lse.value) < constrain_locale_threshold): if (not lang) or not any_locale_compatible( lang, self.discussion.discussion_locales): self.set_error( source_lse, LangStringStatus.IDENTIFIED_TO_UNKNOWN, "Identified to "+lang) return source_lse source_lse.identify_locale(lang, dict( service=self.__class__.__name__)) # This should never actually happen, because # it would mean that the language id. was forgotten. # Still, to be sure that all cases are covered. other_target_lse = source_lse.langstring.entries_as_dict.get(target, None) if other_target_lse: target_lse = other_target_lse is_new_lse = False source_locale = source_lse.locale if locale_compatible(source_locale, target): return source_lse target_lse.value = trans target_lse.error_count = 0 target_lse.error_code = None target_lse.locale_identification_data_json = dict( service=self.__class__.__name__) if trans.strip() == source_lse.value.strip(): # TODO: Check modulo spaces in the middle target_lse.error_count = 1 target_lse.error_code = \ LangStringStatus.IDENTICAL_TRANSLATION.value except Exception as e: print_exc() self.set_error(target_lse, *self.decode_exception(e)) target_lse.value = None else: # Note: when retranslating, we may lose a valid translation. if source_locale == LocaleLabel.UNDEFINED: if not self.distinct_identify_step: # At least do this much. self.confirm_locale(source_lse) source_locale = source_lse.locale self.set_error( target_lse, LangStringStatus.CANNOT_TRANSLATE, "cannot translate") target_lse.value = None if (not target_lse.locale or (source_locale != LocaleLabel.UNDEFINED and target_lse.locale == LocaleLabel.UNDEFINED)): target_lse.locale = target if is_new_lse: source_lse.db.add(target_lse) return target_lse class DummyTranslationServiceTwoSteps(AbstractTranslationService): def canTranslate(cls, source, target): return True def translate(self, text, target, is_html=False, source=None, db=None): if not text: return text, LocaleLabel.NON_LINGUISTIC if not source: source, _ = self.identify(text) return u"Pseudo-translation from %s to %s of: %s" % ( source or LocaleLabel.UNDEFINED, target, text), source def target_locale_labels(self, target_locale): return LocaleLabel.names_in_locale(target_locale) class DummyTranslationServiceOneStep(DummyTranslationServiceTwoSteps): distinct_identify_step = False class DummyTranslationServiceTwoStepsWithErrors( DummyTranslationServiceTwoSteps): def identify( self, text, expected_locales=None, constrain_locale_threshold=SECURE_IDENTIFICATION_LIMIT): from random import random if random() > 0.9: raise RuntimeError() return super(DummyTranslationServiceTwoStepsWithErrors, self).identify( text, expected_locales, constrain_locale_threshold) def translate(self, text, target, is_html=False, source=None, db=None): if not text: return text, LocaleLabel.NON_LINGUISTIC from random import random if random() > 0.8: raise RuntimeError() return super(DummyTranslationServiceTwoStepsWithErrors, self ).translate(text, target, is_html, source=source, db=db) class DummyTranslationServiceOneStepWithErrors(DummyTranslationServiceOneStep): def translate(self, text, target, is_html=False, source=None, db=None): if not text: return text, LocaleLabel.NON_LINGUISTIC from random import random if random() > 0.8: raise RuntimeError() if source is None or source == LocaleLabel.UNDEFINED: source, _ = self.identify(text) return super(DummyTranslationServiceOneStepWithErrors, self).translate( text, target, is_html, source=source, db=db) class DummyGoogleTranslationService(AbstractTranslationService): # Uses public Google API. For testing purposes. Do NOT use in production. _known_locales = { 'af', 'am', 'ar', 'az', 'be', 'bg', 'bn', 'bs', 'ca', 'ceb', 'co', 'cs', 'cy', 'da', 'de', 'el', 'en', 'eo', 'es', 'et', 'eu', 'fa', 'fi', 'fr', 'fy', 'ga', 'gd', 'gl', 'gu', 'ha', 'haw', 'iw', 'hi', 'hmn', 'hr', 'ht', 'hu', 'hy', 'id', 'ig', 'is', 'it', 'ja', 'jw', 'ka', 'kk', 'km', 'kn', 'ko', 'ku', 'ky', 'la', 'lb', 'lo', 'lt', 'lv', 'mg', 'mi', 'mk', 'ml', 'mn', 'mr', 'ms', 'mt', 'my', 'ne', 'nl', 'no', 'ny', 'pa', 'pl', 'ps', 'pt', 'ro', 'ru', 'sd', 'si', 'sk', 'sl', 'sm', 'sn', 'so', 'sq', 'sr', 'st', 'su', 'sv', 'sw', 'ta', 'te', 'tg', 'th', 'tl', 'tr', 'uk', 'ur', 'uz', 'vi', 'xh', 'yi', 'yo', 'zh', 'zh-TW', 'zu'} known_locales_cls = _known_locales known_locales = known_locales_cls idiosyncrasies = { "zh-TW": "zh_Hant_TW", "zh": "zh_Hans_CN", "jw": "jv", "iw": "he" } idiosyncrasies_reverse = {v: k for (k, v) in idiosyncrasies.items()} agents = {'User-Agent':"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30)"} @classmethod def target_localesC(cls): return (cls.asPosixLocale(loc) for loc in cls.known_locales) def target_locales(self): return self.target_localesC() @classmethod def target_locale_labels_cls(cls, target_locale): return cls.target_locale_labels_for_locales( cls.target_localesC(), target_locale) def serviceData(self): return {"translation_notice": _("Translated by Google Translate"), "translation_notice_url": "http://translate.google.com", "idiosyncrasies": self.idiosyncrasies_reverse} def asKnownLocale(self, locale_code): parts = locale_code.split("_") base = parts[0] if base == "zh" and len(parts) > 1: p1 = parts[1] if p1 in ("Hans", "CN"): return "zh" # zh_Hans_CN elif p1 in ("Hant", "TW", "HK", "SG", "MO"): return "zh-TW" else: return base if base in self.known_locales: return base if base in self.idiosyncrasies_reverse: return self.idiosyncrasies_reverse[base] def get_mt_name(self, source_name, target_name): return super(DummyGoogleTranslationService, self).get_mt_name( self.asPosixLocale(source_name), self.asPosixLocale(target_name)) def canTranslate(self, source, target): return ((source == LocaleLabel.UNDEFINED or self.asKnownLocale(source)) and self.asKnownLocale(target)) def translate(self, text, target, is_html=False, source=None, db=None): if not text: return text, LocaleLabel.NON_LINGUISTIC # Initial implementation from # https://github.com/mouuff/Google-Translate-API link = "http://translate.google.com/m?hl=%s&sl=%s&q=%s" % ( self.asKnownLocale(target), self.asKnownLocale(source) if source else "", text.replace(" ", "+")) request = urllib.request.Request(link, headers=self.agents) page = urllib.request.urlopen(request).read() before_trans = 'class="t0">' result = page[page.find(before_trans)+len(before_trans):] result = result.split("<")[0] return result, source class GoogleTranslationService(DummyGoogleTranslationService): distinct_identify_step = False def __init__(self, discussion, apikey=None): super(GoogleTranslationService, self).__init__(discussion) import apiclient.discovery apikey = (discussion.preferences['translation_service_api_key'] or config.get("google.server_api_key")) self._known_locales = None self.client = apiclient.discovery.build( 'translate', 'v2', developerKey=apikey) if apikey else None @staticmethod def unescape_text(text): return unescape(text) @staticmethod def unescape_html(text): # TODO: copy HTMLEntities.unescape but leaving # &, <, > alone. Just leave it unchanged for now. return text @staticmethod def unescape_string(text, is_html): if is_html: return GoogleTranslationService.unescape_html(text) else: return GoogleTranslationService.unescape_text(text) @property def known_locales(self): if not self.client: return self.known_locales_cls if self._known_locales is None: try: r = self.client.languages().list().execute() if r[u'languages']: self._known_locales = [ x[u'language'] for x in r[u'languages']] if set(self._known_locales) != set( DummyGoogleTranslationService._known_locales): from ..lib.raven_client import capture_message capture_message("google changed its language set again") except: return self.known_locales_cls return self._known_locales def identify( self, text, expected_locales=None, constrain_locale_threshold=SECURE_IDENTIFICATION_LIMIT): if not text: return LocaleLabel.UNDEFINED, {LocaleLabel.UNDEFINED: 1} if not self.client or self.strlen_nourl(text) >= SECURE_IDENTIFICATION_LIMIT: # Save money by avoiding the identification step when the text is long enough. return super(GoogleTranslationService, self).identify( text, expected_locales, constrain_locale_threshold) r = self.client.detections().list(q=text).execute() r = r[u"detections"][0] # small correction for expected languages, as this service is deemed reliable. priors = self.convert_to_priors(expected_locales, 0.8) r.sort(lambda x: x[u"confidence"] * priors.get(x[u'language'], 0.8), reverse=True) # Not sure about how to interpret isReliable, # it seems to always be false. return self.asPosixLocale(r[0][u"language"]), { self.asPosixLocale(x[u'language']): x[u'confidence'] for x in r} def translate(self, text, target, is_html=False, source=None, db=None): if not text: return text, LocaleLabel.NON_LINGUISTIC if not self.client: from googleapiclient.http import HttpError raise HttpError(401, '{"error":"Please define server_api_key"}') r = self.client.translations().list( q=text, format="html" if is_html else "text", target=self.asKnownLocale(target), source=self.asKnownLocale(source) if source else None).execute() if source is None: source = self.asPosixLocale( r[u"translations"][0][u'detectedSourceLanguage']) translated = r[u"translations"][0][u'translatedText'] # Google uses unnecessary entities in translation translated = self.unescape_string(translated, is_html) return translated, source def decode_exception(self, exception, identify_phase=False): from googleapiclient.http import HttpError import socket if isinstance(exception, socket.error): return LangStringStatus.SERVICE_DOWN, str(exception) elif isinstance(exception, HttpError): status = exception.resp try: status = getattr(status, "status") except: pass content = json.loads(exception.content) if status == 403: return (LangStringStatus.QUOTA_ERROR, content) if 400 <= status < 500: return (LangStringStatus.CANNOT_IDENTIFY if identify_phase else LangStringStatus.PERMANENT_TRANSLATION_FAILURE, content) elif 500 <= status < 600: return LangStringStatus.TRANSLATION_FAILURE, content # make it permanent after awhile? return LangStringStatus.UNKNOWN_ERROR, content return LangStringStatus.UNKNOWN_ERROR, str(exception) class DeeplTranslationService(AbstractTranslationService): distinct_identify_step = False known_locales_cls = { "de", "en", "fr", "it", "ja", "es", "nl", "pl", "pt", "ru", "zh"} _known_locales = None server = 'https://api.deepl.com/v2/' translate_url = server + 'translate' language_url = server + 'languages' def serviceData(self): return {"translation_notice": _("Translated by Deepl"), "translation_notice_url": "http://deepl.com", "idiosyncrasies": {}} def __init__(self, discussion, apikey=None): super(DeeplTranslationService, self).__init__(discussion) self.apikey = ( discussion.preferences['translation_service_api_key'] or config.get("deepl.server_api_key"), None) self._known_locales = None def canTranslate(self, source, target): return ((source == LocaleLabel.UNDEFINED or source in self.known_locales) and target in self.known_locales) @property def known_locales(self): if self._known_locales is None and self.apikey: try: r = requests.get( self.language_url, params=dict(auth_key=self.apikey), timeout=(2, 5)) if r.ok: self._known_locales = { x['language'].lower() for x in r.json()} if self._known_locales != self.known_locales_cls: from ..lib.raven_client import capture_message capture_message("Deepl changed its language set again") except Exception: # assume we have correct info self._known_locales = self.known_locales_cls return self._known_locales or self.known_locales_cls @classmethod def target_locale_labels_cls(cls, target_locale): return cls.target_locale_labels_for_locales( cls.known_locales_cls, target_locale) def translate(self, text, target, is_html=False, source=None, db=None): if not text: return text, LocaleLabel.NON_LINGUISTIC if not self.apikey: raise RuntimeError("Please define server_api_key") args = dict( auth_key=self.apikey, text=text, target_lang=target.upper(), split_sentences="nonewlines" if is_html else "1", tag_handling="xml" if is_html else "" ) if is_html: args['tag_handling'] = "xml" if source: args['source_lang'] = source.upper() r = requests.get( self.translate_url, params=args, timeout=(2, 3+floor(len(text)/100))) if not r.ok: return RuntimeError("status", r.status) r = r.json()['translations'][0] return r['text'], r['detected_source_language'].lower() def decode_exception(self, exception, identify_phase=False): if isinstance(exception, requests.Timeout): return LangStringStatus.SERVICE_DOWN, str(exception) if isinstance(exception, RuntimeError): if exception.args[0] == "status": status = exception.args[1] if status in (456,): return LangStringStatus.QUOTA_ERROR, "" if status in (400, 403, 404): return LangStringStatus.PERMANENT_TRANSLATION_FAILURE, "" elif status in (503, 429): return LangStringStatus.SERVICE_DOWN, "" return LangStringStatus.UNKNOWN_ERROR, ""