dosage/dosagelib/util.py

341 lines
10 KiB
Python
Raw Normal View History

# -*- coding: iso-8859-1 -*-
# Copyright (C) 2004-2005 Tristan Seligmann and Jonathan Jacobs
# Copyright (C) 2012 Bastian Kleineidam
from __future__ import division, print_function
2012-06-20 19:58:13 +00:00
import urllib, urllib2, urlparse
import requests
2012-06-20 19:58:13 +00:00
import sys
import os
import cgi
import re
import traceback
import time
from htmlentitydefs import name2codepoint
from .output import out
from .configuration import UserAgent, AppName, App, SupportUrl
2012-06-20 20:33:26 +00:00
from .fileutil import has_module, is_tty
if os.name == 'nt':
from . import colorama
2012-06-20 20:33:26 +00:00
has_curses = has_module("curses")
2012-06-20 19:58:13 +00:00
2012-10-11 10:03:12 +00:00
MAX_FILESIZE = 1024*1024*1 # 1MB
2012-06-20 19:58:13 +00:00
2012-10-12 19:11:44 +00:00
def tagre(tag, attribute, value, quote='"', before="", after=""):
2012-10-11 10:03:12 +00:00
"""Return a regular expression matching the given HTML tag, attribute
and value. It matches the tag and attribute names case insensitive,
and skips arbitrary whitespace and leading HTML attributes. The "<>" at
the start and end of the HTML tag is also matched.
@param tag: the tag name
@ptype tag: string
@param attribute: the attribute name
@ptype attribute: string
@param value: the attribute value
@ptype value: string
2012-10-11 13:43:29 +00:00
@param quote: the attribute quote (default ")
@ptype quote: string
2012-10-12 19:11:44 +00:00
@param after: match after attribute value but before end
@ptype after: string
2012-10-11 10:03:12 +00:00
@return: the generated regular expression suitable for re.compile()
@rtype: string
"""
if before:
prefix = r"[^>]*%s[^>]*\s+" % before
else:
prefix = r"(?:[^>]*\s+)?"
2012-10-11 10:03:12 +00:00
attrs = dict(
tag=case_insensitive_re(tag),
attribute=case_insensitive_re(attribute),
value=value,
2012-10-11 13:43:29 +00:00
quote=quote,
prefix=prefix,
2012-10-12 19:11:44 +00:00
after=after,
2012-10-11 10:03:12 +00:00
)
return r'<\s*%(tag)s\s+%(prefix)s%(attribute)s\s*=\s*%(quote)s%(value)s%(quote)s[^>]*%(after)s[^>]*>' % attrs
2012-06-20 19:58:13 +00:00
2012-10-11 10:03:12 +00:00
def case_insensitive_re(name):
"""Reformat the given name to a case insensitive regular expression string
without using re.IGNORECASE. This way selective strings can be made case
insensitive.
@param name: the name to make case insensitive
@ptype name: string
@return: the case insenstive regex
@rtype: string
"""
return "".join("[%s%s]" % (c.lower(), c.upper()) for c in name)
2012-06-20 19:58:13 +00:00
2012-10-11 10:03:12 +00:00
baseSearch = re.compile(tagre("base", "href", '([^"]*)'))
2012-06-20 19:58:13 +00:00
2012-10-11 10:03:12 +00:00
def getPageContent(url):
# read page data
page = urlopen(url)
data = page.text
2012-10-11 10:03:12 +00:00
# determine base URL
baseUrl = None
match = baseSearch.search(data)
if match:
baseUrl = match.group(1)
2012-06-20 19:58:13 +00:00
else:
2012-10-11 10:03:12 +00:00
baseUrl = url
return data, baseUrl
2012-06-20 19:58:13 +00:00
2012-10-11 16:16:29 +00:00
def fetchUrl(url, urlSearch):
2012-10-11 10:03:12 +00:00
data, baseUrl = getPageContent(url)
2012-10-11 16:16:29 +00:00
match = urlSearch.search(data)
2012-06-20 19:58:13 +00:00
if match:
2012-10-11 10:03:12 +00:00
searchUrl = match.group(1)
2012-10-11 16:16:29 +00:00
if not searchUrl:
raise ValueError("Match empty URL at %s with pattern %s" % (url, urlSearch.pattern))
2012-10-11 10:03:12 +00:00
out.write('matched URL %r' % searchUrl, 2)
2012-11-21 20:57:26 +00:00
return normaliseURL(urlparse.urljoin(baseUrl, searchUrl))
2012-06-20 19:58:13 +00:00
return None
2012-10-11 10:03:12 +00:00
def fetchUrls(url, imageSearch, prevSearch=None):
data, baseUrl = getPageContent(url)
# match images
imageUrls = set()
for match in imageSearch.finditer(data):
imageUrl = match.group(1)
2012-10-11 16:16:29 +00:00
if not imageUrl:
raise ValueError("Match empty image URL at %s with pattern %s" % (url, imageSearch.pattern))
out.write('matched image URL %r with pattern %s' % (imageUrl, imageSearch.pattern), 2)
2012-11-21 20:57:26 +00:00
imageUrls.add(normaliseURL(urlparse.urljoin(baseUrl, imageUrl)))
2012-10-11 10:03:12 +00:00
if not imageUrls:
2012-10-11 13:17:08 +00:00
out.write("warning: no images found at %s with pattern %s" % (url, imageSearch.pattern))
2012-10-11 10:03:12 +00:00
if prevSearch is not None:
# match previous URL
match = prevSearch.search(data)
if match:
prevUrl = match.group(1)
2012-10-11 16:16:29 +00:00
if not prevUrl:
raise ValueError("Match empty previous URL at %s with pattern %s" % (url, prevSearch.pattern))
2012-10-11 10:03:12 +00:00
out.write('matched previous URL %r' % prevUrl, 2)
2012-11-21 20:57:26 +00:00
prevUrl = normaliseURL(urlparse.urljoin(baseUrl, prevUrl))
2012-10-11 10:03:12 +00:00
else:
2012-10-11 13:43:29 +00:00
out.write('no previous URL %s at %s' % (prevSearch.pattern, url), 2)
2012-10-11 10:03:12 +00:00
prevUrl = None
return imageUrls, prevUrl
2012-11-21 20:57:26 +00:00
return imageUrls, None
2012-06-20 19:58:13 +00:00
def unescape(text):
2012-06-20 19:58:13 +00:00
"""
Replace HTML entities and character references.
"""
def _fixup(m):
text = m.group(0)
if text[:2] == "&#":
# character reference
try:
if text[:3] == "&#x":
text = unichr(int(text[3:-1], 16))
else:
text = unichr(int(text[2:-1]))
except ValueError:
pass
else:
# named entity
try:
text = unichr(name2codepoint[text[1:-1]])
except KeyError:
pass
if isinstance(text, unicode):
text = text.encode('utf-8')
text = urllib2.quote(text, safe=';/?:@&=+$,')
return text
2012-11-21 20:57:26 +00:00
return re.sub(r"&#?\w+;", _fixup, text)
2012-06-20 19:58:13 +00:00
def normaliseURL(url):
"""
Removes any leading empty segments to avoid breaking urllib2; also replaces
HTML entities and character references.
"""
# XXX: brutal hack
url = unescape(url)
2012-06-20 19:58:13 +00:00
pu = list(urlparse.urlparse(url))
2012-11-21 20:57:26 +00:00
segments = pu[2].split('/')
2012-06-20 19:58:13 +00:00
while segments and segments[0] == '':
del segments[0]
2012-11-21 20:57:26 +00:00
pu[2] = '/' + '/'.join(segments).replace(' ', '%20')
2012-11-14 19:23:30 +00:00
# remove leading '&' from query
2012-11-21 20:57:26 +00:00
if pu[4].startswith('&'):
pu[4] = pu[4][1:]
# remove anchor
pu[5] = ""
2012-06-20 19:58:13 +00:00
return urlparse.urlunparse(pu)
2012-11-21 20:57:26 +00:00
2012-10-11 17:53:10 +00:00
def urlopen(url, referrer=None, retries=3, retry_wait_seconds=5):
out.write('Open URL %s' % url, 2)
2012-09-27 19:24:28 +00:00
assert retries >= 0, 'invalid retry value %r' % retries
assert retry_wait_seconds > 0, 'invalid retry seconds value %r' % retry_wait_seconds
headers = {'User-Agent': UserAgent}
config = {"max_retries": retries}
2012-06-20 19:58:13 +00:00
if referrer:
headers['Referer'] = referrer
try:
req = requests.get(url, headers=headers, config=config)
req.raise_for_status()
return req
except requests.exceptions.RequestException as err:
msg = 'URL retrieval of %s failed: %s' % (url, err)
out.write(msg)
raise IOError(msg)
2012-06-20 19:58:13 +00:00
2012-06-20 20:33:26 +00:00
def get_columns (fp):
"""Return number of columns for given file."""
if not is_tty(fp):
return 80
2012-09-27 19:59:11 +00:00
if os.name == 'nt':
return colorama.get_console_size().X
2012-06-20 20:33:26 +00:00
if has_curses:
import curses
try:
2012-09-27 19:59:11 +00:00
curses.setupterm(os.environ.get("TERM"), fp.fileno())
2012-06-20 20:33:26 +00:00
return curses.tigetnum("cols")
except curses.error:
pass
return 80
2012-06-20 19:58:13 +00:00
def splitpath(path):
c = []
head, tail = os.path.split(path)
while tail:
c.insert(0, tail)
head, tail = os.path.split(head)
return c
2012-11-21 20:57:26 +00:00
2012-06-20 19:58:13 +00:00
def getRelativePath(basepath, path):
basepath = splitpath(os.path.abspath(basepath))
path = splitpath(os.path.abspath(path))
afterCommon = False
for c in basepath:
if afterCommon or path[0] != c:
path.insert(0, os.path.pardir)
afterCommon = True
else:
del path[0]
return os.path.join(*path)
2012-11-21 20:57:26 +00:00
2012-06-20 19:58:13 +00:00
def getQueryParams(url):
query = urlparse.urlsplit(url)[3]
out.write('Extracting query parameters from %r (%r)...' % (url, query), 3)
return cgi.parse_qs(query)
def internal_error(out=sys.stderr, etype=None, evalue=None, tb=None):
"""Print internal error message (output defaults to stderr)."""
print(os.linesep, file=out)
print("""********** Oops, I did it again. *************
2012-06-20 19:58:13 +00:00
You have found an internal error in %(app)s. Please write a bug report
at %(url)s and include at least the information below:
2012-06-20 19:58:13 +00:00
Not disclosing some of the information below due to privacy reasons is ok.
2012-06-20 19:58:13 +00:00
I will try to help you nonetheless, but you have to give me something
I can work with ;) .
""" % dict(app=AppName, url=SupportUrl), file=out)
2012-06-20 19:58:13 +00:00
if etype is None:
etype = sys.exc_info()[0]
if evalue is None:
evalue = sys.exc_info()[1]
2012-11-21 20:57:26 +00:00
print(etype, evalue, file=out)
2012-06-20 19:58:13 +00:00
if tb is None:
tb = sys.exc_info()[2]
traceback.print_exception(etype, evalue, tb, None, out)
print_app_info(out=out)
print_proxy_info(out=out)
print_locale_info(out=out)
print(os.linesep,
"******** %s internal error, over and out ********" % AppName, file=out)
2012-06-20 19:58:13 +00:00
def print_env_info(key, out=sys.stderr):
"""If given environment key is defined, print it out."""
value = os.getenv(key)
if value is not None:
print(key, "=", repr(value), file=out)
2012-06-20 19:58:13 +00:00
def print_proxy_info(out=sys.stderr):
"""Print proxy info."""
print_env_info("http_proxy", out=out)
def print_locale_info(out=sys.stderr):
"""Print locale info."""
for key in ("LANGUAGE", "LC_ALL", "LC_CTYPE", "LANG"):
print_env_info(key, out=out)
def print_app_info(out=sys.stderr):
"""Print system and application info (output defaults to stderr)."""
print("System info:", file=out)
print(App, file=out)
print("Python %(version)s on %(platform)s" %
{"version": sys.version, "platform": sys.platform}, file=out)
2012-06-20 19:58:13 +00:00
stime = strtime(time.time())
print("Local time:", stime, file=out)
print("sys.argv", sys.argv, file=out)
2012-06-20 19:58:13 +00:00
def strtime(t):
"""Return ISO 8601 formatted time."""
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t)) + \
strtimezone()
def strtimezone():
"""Return timezone info, %z on some platforms, but not supported on all.
"""
if time.daylight:
zone = time.altzone
else:
zone = time.timezone
return "%+04d" % (-zone//3600)
2012-11-26 06:14:02 +00:00
def asciify(name):
"""Remove non-ascii characters from string."""
return re.sub("[^0-9a-zA-Z_]", "", name)
def unquote(text):
while '%' in text:
text = urllib.unquote(text)
return text
2012-12-02 17:35:06 +00:00
def strsize (b):
"""Return human representation of bytes b. A negative number of bytes
raises a value error."""
if b < 0:
raise ValueError("Invalid negative byte number")
if b < 1024:
return "%dB" % b
if b < 1024 * 10:
return "%dKB" % (b // 1024)
if b < 1024 * 1024:
return "%.2fKB" % (float(b) / 1024)
if b < 1024 * 1024 * 10:
return "%.2fMB" % (float(b) / (1024*1024))
if b < 1024 * 1024 * 1024:
return "%.1fMB" % (float(b) / (1024*1024))
if b < 1024 * 1024 * 1024 * 10:
return "%.2fGB" % (float(b) / (1024*1024*1024))
return "%.1fGB" % (float(b) / (1024*1024*1024))