# SPDX-License-Identifier: MIT
# Copyright (C) 2004-2008 Tristan Seligmann and Jonathan Jacobs
# Copyright (C) 2012-2014 Bastian Kleineidam
# Copyright (C) 2015-2021 Tobias Gruetzmacher
import json
import multiprocessing
import os
import re
import warnings
from urllib.parse import urlsplit


# Dictionary with per-host locks.
_locks = {}
# Allowed number of connections per host
MaxConnections = 2
# Maximum number of strips to get to test a comic
MaxStrips = 5
# Match (already-escaped) archive.org URL
ARCHIVE_ORG_MATCH = re.compile(r'(?<=web\\.archive\\.org/web)/\d+/')
# Matches some (maybe-escaped - because Python 2) printf-style format specifiers
PRINTF_MATCH = re.compile(r'\\?%[0-9]*[sd]')
# Classes where the modules are very similar, so that testing the history of
# each module doesn't make much sense
standarized_modules = {
    'ComicSherpa',
    'ComicsKingdom',
    'GoComics',
    'MangaDex',
    'WebToons',
}
# Already seen classes
seen_modules = set()


def get_lock(host):
    """Get bounded semaphore for given host."""
    if host not in _locks:
        _locks[host] = multiprocessing.BoundedSemaphore(MaxConnections)
    return _locks[host]


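# Modules of a host listed in standarized_modules share the same site layout,
# so re-testing navigation for every comic on that host yields no new
# information: only the first such module walks back through MaxStrips pages,
# later modules on the same host fetch a single strip.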
def test_comicmodule(tmpdir, scraperobj, worker_id):
    '''Test a scraper. It must be able to traverse backward for at least 5
    strips from the start, and find strip images on at least 4 pages.'''
    # Limit number of connections to one host.
    host = urlsplit(scraperobj.url).hostname
    with get_lock(host):
        maxstrips = MaxStrips
        parts = scraperobj.name.split('/', maxsplit=1)
        if len(parts) > 1 and parts[0] in standarized_modules:
            if parts[0] in seen_modules:
                maxstrips = 1
            else:
                seen_modules.add(parts[0])

        _test_comic(str(tmpdir), scraperobj, maxstrips)


def _test_comic(outdir, scraperobj, maxstrips):
    num_strips = 0
    strip = None
    files = []
    PROXYMAP.apply(scraperobj.name)
    for strip in scraperobj.getStrips(maxstrips):
        files.append(_check_strip(outdir, strip,
                                  scraperobj.multipleImagesPerStrip))

        if num_strips > 0:
            _check_stripurl(strip, scraperobj)
        num_strips += 1

    if scraperobj.prevSearch and not scraperobj.hitFirstStripUrl:
        # subtract the number of skipped URLs with no image from the expected
        # image number
        num_strips_expected = maxstrips - len(scraperobj.skippedUrls)
        msg = 'Traversed %d strips instead of %d.' % (num_strips,
                                                      num_strips_expected)
        if strip:
            msg += " Check the prevSearch pattern at %s" % strip.strip_url
        assert num_strips == num_strips_expected, msg
        if strip:
            _check_scraperesult(files, num_strips_expected, strip, scraperobj)


def _check_strip(outdir, strip, multipleImagesPerStrip):
    '''Check that a specific page yields images and the comic module correctly
    declares if there are multiple images per page.'''
    images = []
    files = []
    for image in strip.getImages():
        images.append(image.url)

        # write a fake image (to download less)
        fakeimg = image._fnbase(outdir) + '.fake'
        with open(fakeimg, 'w') as f:
            f.write("fake image for testing")

        fn, _ = image.save(outdir)
        files.append(fn)
    assert images, 'failed to find images at %s' % strip.strip_url
    if not multipleImagesPerStrip:
        assert len(images) == 1, 'found more than 1 image at {}: {}'.format(
            strip.strip_url, images)
    return files


def _check_scraperesult(saved_images, num_images_expected, strip, scraperobj):
    '''Check that exactly num_images_expected images are saved (or at least
    that many for modules with multiple images per page). This checks saved
    files, i.e. it detects duplicate filenames.'''
    num_images = len(saved_images)

    attrs = (num_images, saved_images, num_images_expected)
    if scraperobj.multipleImagesPerStrip:
        err = 'saved %d %s instead of at least %d images' % attrs
        assert num_images >= num_images_expected, err
    else:
        err = 'saved %d %s instead of %d images' % attrs
        assert num_images == num_images_expected, err


def _check_stripurl(strip, scraperobj):
    if not scraperobj.stripUrl:
        # no indexing support
        return
    # test that the stripUrl regex matches the retrieved strip URL
    urlmatch = re.escape(scraperobj.stripUrl)
    urlmatch = PRINTF_MATCH.sub('.+', urlmatch)
    urlmatch = ARCHIVE_ORG_MATCH.sub(r'/\\d+/', urlmatch)
    ro = re.compile(urlmatch)
    mo = ro.match(strip.strip_url)
    if not mo:
        warnings.warn('strip URL {!r} does not match stripUrl pattern {}'.format(
            strip.strip_url, urlmatch))


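# Illustration of _check_stripurl above (the stripUrl is hypothetical, not
# taken from any real module): for stripUrl = 'https://www.example.com/comic/%s',
# re.escape() yields 'https://www\.example\.com/comic/%s' (on Python 3.7+),
# PRINTF_MATCH.sub() turns the '%s' into '.+', and the resulting pattern
# matches retrieved strip URLs such as 'https://www.example.com/comic/episode-123'.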
class ProxyConfig:
    """Loads proxy config from an environment variable and applies it for each test."""
    def __init__(self):
        self.config = {}
        if 'PROXYMAP' in os.environ:
            for regex, server in json.loads(os.environ['PROXYMAP']).items():
                self.config[re.compile(regex)] = server

    def apply(self, name):
        useserver = ''
        for regex, server in self.config.items():
            if regex.match(name):
                useserver = server
                break
        os.environ['http_proxy'] = useserver
        os.environ['https_proxy'] = useserver


# External proxy config to fetch some modules via proxies
PROXYMAP = ProxyConfig()
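# Example PROXYMAP value (illustrative only; the module-name regex and proxy
# address below are hypothetical):
#
#   PROXYMAP='{"SomeHoster/.*": "http://127.0.0.1:3128"}'
#
# Modules whose name matches a key are fetched through the mapped proxy; for
# all other modules http_proxy/https_proxy are set to an empty string.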