Labrys of Knossos f05b09f349 Updates vendored subliminal to 2.1.0
Updates rarfile to 3.1
Updates stevedore to 3.5.0
Updates appdirs to 1.4.4
Updates click to 8.1.3
Updates decorator to 5.1.1
Updates dogpile.cache to 1.1.8
Updates pbr to 5.11.0
Updates pysrt to 1.1.2
Updates pytz to 2022.6
Adds importlib-metadata version 3.1.1
Adds typing-extensions version 4.1.1
Adds zipp version 3.11.0
2022-11-29 00:44:49 -05:00

136 lines
4.3 KiB
Python

# -*- coding: utf-8 -*-
import io
import json
import logging
from zipfile import ZipFile
from babelfish import Language
from guessit import guessit
from requests import Session
from six.moves import urllib
from . import Provider
from ..cache import EPISODE_EXPIRATION_TIME, region
from ..exceptions import ProviderError
from ..matches import guess_matches
from ..subtitle import Subtitle, fix_line_ending
from ..video import Episode
logger = logging.getLogger(__name__)
class ArgenteamSubtitle(Subtitle):
    """Subtitle entry returned by the Argenteam provider.

    :param language: language of the subtitle.
    :param str download_link: URL of the subtitle archive; also used as the subtitle id.
    :param str series: series title the subtitle was searched with.
    :param int season: season number.
    :param int episode: episode number.
    :param release: release value supplied by the caller (stored, not used for matching here).
    :param version: version value supplied by the caller; inspected for resolution
        and other release hints in :meth:`get_matches`.

    """
    provider_name = 'argenteam'

    def __init__(self, language, download_link, series, season, episode, release, version):
        # Pass the link by keyword so it cannot land in another positional slot
        # of Subtitle.__init__. NOTE(review): in upstream subliminal the second
        # positional parameter is `hearing_impaired`, not `page_link` -- confirm
        # against the vendored ..subtitle module.
        super(ArgenteamSubtitle, self).__init__(language, page_link=download_link)
        self.download_link = download_link
        self.series = series
        self.season = season
        self.episode = episode
        self.release = release
        self.version = version

    @property
    def id(self):
        """Unique identifier of the subtitle: its download link."""
        return self.download_link

    @property
    def info(self):
        """Human readable name: the URL-decoded last path segment of the download link."""
        return urllib.parse.unquote(self.download_link.rsplit('/', 1)[-1])

    def get_matches(self, video):
        """Return the set of match names between this subtitle and `video`.

        :param video: the video to compare against.
        :return: names of the matched properties.
        :rtype: set

        """
        # NOTE(review): `release_group` is fed from `version` while `release` is
        # never used for matching -- confirm this is intended.
        matches = guess_matches(video, {
            'title': self.series,
            'season': self.season,
            'episode': self.episode,
            'release_group': self.version
        })

        # without a version string there is nothing left to inspect, and guessit
        # must not be handed an empty/None value
        if not self.version:
            return matches

        # resolution
        if video.resolution and video.resolution in self.version.lower():
            matches.add('resolution')

        # other release hints guessed from the version string
        matches |= guess_matches(video, guessit(self.version, {'type': 'episode'}), partial=True)
        return matches
class ArgenteamProvider(Provider):
    """Argenteam provider.

    Looks up episodes through the JSON API at :attr:`server_url` and exposes
    the subtitles attached to their releases. Only the language with alpha2
    code ``es`` and :class:`Episode` videos are declared as supported.

    """
    provider_name = 'argenteam'
    language = Language.fromalpha2('es')
    languages = {language}
    video_types = (Episode,)
    server_url = "http://argenteam.net/api/v1/"
    subtitle_class = ArgenteamSubtitle

    def __init__(self):
        # the HTTP session is created lazily in initialize()
        self.session = None

    def initialize(self):
        """Create the HTTP session and set its User-Agent header."""
        self.session = Session()
        self.session.headers['User-Agent'] = self.user_agent

    def terminate(self):
        """Close the HTTP session created by :meth:`initialize`."""
        self.session.close()

    # only truthy results are cached (should_cache_fn), so a failed lookup is retried
    @region.cache_on_arguments(expiration_time=EPISODE_EXPIRATION_TIME, should_cache_fn=lambda value: value)
    def search_episode_id(self, series, season, episode):
        """Search the episode id from the `series`, `season` and `episode`.

        :param str series: series of the episode.
        :param int season: season of the episode.
        :param int episode: episode number.
        :return: the episode id, if any.
        :rtype: int or None

        """
        # make the search
        query = '%s S%#02dE%#02d' % (series, season, episode)
        logger.info('Searching episode id for %r', query)
        r = self.session.get(self.server_url + 'search', params={'q': query}, timeout=10)
        r.raise_for_status()
        results = json.loads(r.text)

        # only an unambiguous (single) result is accepted
        if results['total'] == 1:
            return results['results'][0]['id']

        logger.error('No episode id found for %r', series)
        return None

    def query(self, series, season, episode):
        """List the subtitles available for one episode.

        :param str series: series of the episode.
        :param int season: season of the episode.
        :param int episode: episode number.
        :return: the subtitles found; empty when the episode id cannot be resolved.
        :rtype: list of :class:`ArgenteamSubtitle`

        """
        episode_id = self.search_episode_id(series, season, episode)
        if episode_id is None:
            return []

        response = self.session.get(self.server_url + 'episode', params={'id': episode_id}, timeout=10)
        response.raise_for_status()
        content = json.loads(response.text)

        # one subtitle object per subtitle entry of every release
        subtitles = []
        for r in content['releases']:
            for s in r['subtitles']:
                subtitle = self.subtitle_class(self.language, s['uri'], series, season, episode, r['team'], r['tags'])
                logger.debug('Found subtitle %r', subtitle)
                subtitles.append(subtitle)

        return subtitles

    def list_subtitles(self, video, languages):
        """Return the subtitles of the first title of `video` that yields any.

        The main series title is tried first, then each alternative title.

        :param video: the episode to search subtitles for.
        :param languages: requested languages (not used by this method).
        :return: the subtitles found, or an empty list.
        :rtype: list of :class:`ArgenteamSubtitle`

        """
        titles = [video.series] + video.alternative_series
        for title in titles:
            subs = self.query(title, video.season, video.episode)
            if subs:
                return subs

        return []

    def download_subtitle(self, subtitle):
        """Download the archive of `subtitle` and store its single file as content.

        :param subtitle: the subtitle to download; its `content` attribute is set.
        :raise: :class:`ProviderError` if the archive does not contain exactly one file.

        """
        # download as a zip
        logger.info('Downloading subtitle %r', subtitle)
        r = self.session.get(subtitle.download_link, timeout=10)
        r.raise_for_status()

        # open the zip
        with ZipFile(io.BytesIO(r.content)) as zf:
            # read the member list once instead of once per check
            names = zf.namelist()
            if not names:
                # report an empty archive as a provider failure rather than an IndexError
                raise ProviderError('No file to unzip')
            if len(names) > 1:
                raise ProviderError('More than one file to unzip')
            subtitle.content = fix_line_ending(zf.read(names[0]))