2020-03-23 18:45:35 -07:00

540 lines
20 KiB
Python

from __future__ import print_function, absolute_import
import os
import tempfile
import unittest
import sys
import re
import warnings
import io
from textwrap import dedent
from future.utils import bind_method, PY26, PY3, PY2, PY27
from future.moves.subprocess import check_output, STDOUT, CalledProcessError
if PY26:
import unittest2 as unittest
def reformat_code(code):
"""
Removes any leading \n and dedents.
"""
if code.startswith('\n'):
code = code[1:]
return dedent(code)
def order_future_lines(code):
"""
Returns the code block with any ``__future__`` import lines sorted, and
then any ``future`` import lines sorted, then any ``builtins`` import lines
sorted.
This only sorts the lines within the expected blocks.
See test_order_future_lines() for an example.
"""
# We need .splitlines(keepends=True), which doesn't exist on Py2,
# so we use this instead:
lines = code.split('\n')
uufuture_line_numbers = [i for i, line in enumerate(lines)
if line.startswith('from __future__ import ')]
future_line_numbers = [i for i, line in enumerate(lines)
if line.startswith('from future')
or line.startswith('from past')]
builtins_line_numbers = [i for i, line in enumerate(lines)
if line.startswith('from builtins')]
assert code.lstrip() == code, ('internal usage error: '
'dedent the code before calling order_future_lines()')
def mymax(numbers):
return max(numbers) if len(numbers) > 0 else 0
def mymin(numbers):
return min(numbers) if len(numbers) > 0 else float('inf')
assert mymax(uufuture_line_numbers) <= mymin(future_line_numbers), \
'the __future__ and future imports are out of order'
# assert mymax(future_line_numbers) <= mymin(builtins_line_numbers), \
# 'the future and builtins imports are out of order'
uul = sorted([lines[i] for i in uufuture_line_numbers])
sorted_uufuture_lines = dict(zip(uufuture_line_numbers, uul))
fl = sorted([lines[i] for i in future_line_numbers])
sorted_future_lines = dict(zip(future_line_numbers, fl))
bl = sorted([lines[i] for i in builtins_line_numbers])
sorted_builtins_lines = dict(zip(builtins_line_numbers, bl))
# Replace the old unsorted "from __future__ import ..." lines with the
# new sorted ones:
new_lines = []
for i in range(len(lines)):
if i in uufuture_line_numbers:
new_lines.append(sorted_uufuture_lines[i])
elif i in future_line_numbers:
new_lines.append(sorted_future_lines[i])
elif i in builtins_line_numbers:
new_lines.append(sorted_builtins_lines[i])
else:
new_lines.append(lines[i])
return '\n'.join(new_lines)
class VerboseCalledProcessError(CalledProcessError):
"""
Like CalledProcessError, but it displays more information (message and
script output) for diagnosing test failures etc.
"""
def __init__(self, msg, returncode, cmd, output=None):
self.msg = msg
self.returncode = returncode
self.cmd = cmd
self.output = output
def __str__(self):
return ("Command '%s' failed with exit status %d\nMessage: %s\nOutput: %s"
% (self.cmd, self.returncode, self.msg, self.output))
class FuturizeError(VerboseCalledProcessError):
pass
class PasteurizeError(VerboseCalledProcessError):
pass
class CodeHandler(unittest.TestCase):
"""
Handy mixin for test classes for writing / reading / futurizing /
running .py files in the test suite.
"""
def setUp(self):
"""
The outputs from the various futurize stages should have the
following headers:
"""
# After stage1:
# TODO: use this form after implementing a fixer to consolidate
# __future__ imports into a single line:
# self.headers1 = """
# from __future__ import absolute_import, division, print_function
# """
self.headers1 = reformat_code("""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
""")
# After stage2 --all-imports:
# TODO: use this form after implementing a fixer to consolidate
# __future__ imports into a single line:
# self.headers2 = """
# from __future__ import (absolute_import, division,
# print_function, unicode_literals)
# from future import standard_library
# from future.builtins import *
# """
self.headers2 = reformat_code("""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future import standard_library
standard_library.install_aliases()
from builtins import *
""")
self.interpreters = [sys.executable]
self.tempdir = tempfile.mkdtemp() + os.path.sep
pypath = os.getenv('PYTHONPATH')
if pypath:
self.env = {'PYTHONPATH': os.getcwd() + os.pathsep + pypath}
else:
self.env = {'PYTHONPATH': os.getcwd()}
def convert(self, code, stages=(1, 2), all_imports=False, from3=False,
reformat=True, run=True, conservative=False):
"""
Converts the code block using ``futurize`` and returns the
resulting code.
Passing stages=[1] or stages=[2] passes the flag ``--stage1`` or
``stage2`` to ``futurize``. Passing both stages runs ``futurize``
with both stages by default.
If from3 is False, runs ``futurize``, converting from Python 2 to
both 2 and 3. If from3 is True, runs ``pasteurize`` to convert
from Python 3 to both 2 and 3.
Optionally reformats the code block first using the reformat() function.
If run is True, runs the resulting code under all Python
interpreters in self.interpreters.
"""
if reformat:
code = reformat_code(code)
self._write_test_script(code)
self._futurize_test_script(stages=stages, all_imports=all_imports,
from3=from3, conservative=conservative)
output = self._read_test_script()
if run:
for interpreter in self.interpreters:
_ = self._run_test_script(interpreter=interpreter)
return output
def compare(self, output, expected, ignore_imports=True):
"""
Compares whether the code blocks are equal. If not, raises an
exception so the test fails. Ignores any trailing whitespace like
blank lines.
If ignore_imports is True, passes the code blocks into the
strip_future_imports method.
If one code block is a unicode string and the other a
byte-string, it assumes the byte-string is encoded as utf-8.
"""
if ignore_imports:
output = self.strip_future_imports(output)
expected = self.strip_future_imports(expected)
if isinstance(output, bytes) and not isinstance(expected, bytes):
output = output.decode('utf-8')
if isinstance(expected, bytes) and not isinstance(output, bytes):
expected = expected.decode('utf-8')
self.assertEqual(order_future_lines(output.rstrip()),
expected.rstrip())
def strip_future_imports(self, code):
"""
Strips any of these import lines:
from __future__ import <anything>
from future <anything>
from future.<anything>
from builtins <anything>
or any line containing:
install_hooks()
or:
install_aliases()
Limitation: doesn't handle imports split across multiple lines like
this:
from __future__ import (absolute_import, division, print_function,
unicode_literals)
"""
output = []
# We need .splitlines(keepends=True), which doesn't exist on Py2,
# so we use this instead:
for line in code.split('\n'):
if not (line.startswith('from __future__ import ')
or line.startswith('from future ')
or line.startswith('from builtins ')
or 'install_hooks()' in line
or 'install_aliases()' in line
# but don't match "from future_builtins" :)
or line.startswith('from future.')):
output.append(line)
return '\n'.join(output)
def convert_check(self, before, expected, stages=(1, 2), all_imports=False,
ignore_imports=True, from3=False, run=True,
conservative=False):
"""
Convenience method that calls convert() and compare().
Reformats the code blocks automatically using the reformat_code()
function.
If all_imports is passed, we add the appropriate import headers
for the stage(s) selected to the ``expected`` code-block, so they
needn't appear repeatedly in the test code.
If ignore_imports is True, ignores the presence of any lines
beginning:
from __future__ import ...
from future import ...
for the purpose of the comparison.
"""
output = self.convert(before, stages=stages, all_imports=all_imports,
from3=from3, run=run, conservative=conservative)
if all_imports:
headers = self.headers2 if 2 in stages else self.headers1
else:
headers = ''
reformatted = reformat_code(expected)
if headers in reformatted:
headers = ''
self.compare(output, headers + reformatted,
ignore_imports=ignore_imports)
def unchanged(self, code, **kwargs):
"""
Convenience method to ensure the code is unchanged by the
futurize process.
"""
self.convert_check(code, code, **kwargs)
def _write_test_script(self, code, filename='mytestscript.py'):
"""
Dedents the given code (a multiline string) and writes it out to
a file in a temporary folder like /tmp/tmpUDCn7x/mytestscript.py.
"""
if isinstance(code, bytes):
code = code.decode('utf-8')
# Be explicit about encoding the temp file as UTF-8 (issue #63):
with io.open(self.tempdir + filename, 'wt', encoding='utf-8') as f:
f.write(dedent(code))
def _read_test_script(self, filename='mytestscript.py'):
with io.open(self.tempdir + filename, 'rt', encoding='utf-8') as f:
newsource = f.read()
return newsource
def _futurize_test_script(self, filename='mytestscript.py', stages=(1, 2),
all_imports=False, from3=False,
conservative=False):
params = []
stages = list(stages)
if all_imports:
params.append('--all-imports')
if from3:
script = 'pasteurize.py'
else:
script = 'futurize.py'
if stages == [1]:
params.append('--stage1')
elif stages == [2]:
params.append('--stage2')
else:
assert stages == [1, 2]
if conservative:
params.append('--conservative')
# No extra params needed
# Absolute file path:
fn = self.tempdir + filename
call_args = [sys.executable, script] + params + ['-w', fn]
try:
output = check_output(call_args, stderr=STDOUT, env=self.env)
except CalledProcessError as e:
with open(fn) as f:
msg = (
'Error running the command %s\n'
'%s\n'
'Contents of file %s:\n'
'\n'
'%s') % (
' '.join(call_args),
'env=%s' % self.env,
fn,
'----\n%s\n----' % f.read(),
)
ErrorClass = (FuturizeError if 'futurize' in script else PasteurizeError)
if not hasattr(e, 'output'):
# The attribute CalledProcessError.output doesn't exist on Py2.6
e.output = None
raise ErrorClass(msg, e.returncode, e.cmd, output=e.output)
return output
def _run_test_script(self, filename='mytestscript.py',
interpreter=sys.executable):
# Absolute file path:
fn = self.tempdir + filename
try:
output = check_output([interpreter, fn],
env=self.env, stderr=STDOUT)
except CalledProcessError as e:
with open(fn) as f:
msg = (
'Error running the command %s\n'
'%s\n'
'Contents of file %s:\n'
'\n'
'%s') % (
' '.join([interpreter, fn]),
'env=%s' % self.env,
fn,
'----\n%s\n----' % f.read(),
)
if not hasattr(e, 'output'):
# The attribute CalledProcessError.output doesn't exist on Py2.6
e.output = None
raise VerboseCalledProcessError(msg, e.returncode, e.cmd, output=e.output)
return output
# Decorator to skip some tests on Python 2.6 ...
skip26 = unittest.skipIf(PY26, "this test is known to fail on Py2.6")
def expectedFailurePY3(func):
if not PY3:
return func
return unittest.expectedFailure(func)
def expectedFailurePY26(func):
if not PY26:
return func
return unittest.expectedFailure(func)
def expectedFailurePY27(func):
if not PY27:
return func
return unittest.expectedFailure(func)
def expectedFailurePY2(func):
if not PY2:
return func
return unittest.expectedFailure(func)
# Renamed in Py3.3:
if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
# From Py3.3:
def assertRegex(self, text, expected_regex, msg=None):
"""Fail the test unless the text matches the regular expression."""
if isinstance(expected_regex, (str, unicode)):
assert expected_regex, "expected_regex must not be empty."
expected_regex = re.compile(expected_regex)
if not expected_regex.search(text):
msg = msg or "Regex didn't match"
msg = '%s: %r not found in %r' % (msg, expected_regex.pattern, text)
raise self.failureException(msg)
if not hasattr(unittest.TestCase, 'assertRegex'):
bind_method(unittest.TestCase, 'assertRegex', assertRegex)
class _AssertRaisesBaseContext(object):
def __init__(self, expected, test_case, callable_obj=None,
expected_regex=None):
self.expected = expected
self.test_case = test_case
if callable_obj is not None:
try:
self.obj_name = callable_obj.__name__
except AttributeError:
self.obj_name = str(callable_obj)
else:
self.obj_name = None
if isinstance(expected_regex, (bytes, str)):
expected_regex = re.compile(expected_regex)
self.expected_regex = expected_regex
self.msg = None
def _raiseFailure(self, standardMsg):
msg = self.test_case._formatMessage(self.msg, standardMsg)
raise self.test_case.failureException(msg)
def handle(self, name, callable_obj, args, kwargs):
"""
If callable_obj is None, assertRaises/Warns is being used as a
context manager, so check for a 'msg' kwarg and return self.
If callable_obj is not None, call it passing args and kwargs.
"""
if callable_obj is None:
self.msg = kwargs.pop('msg', None)
return self
with self:
callable_obj(*args, **kwargs)
class _AssertWarnsContext(_AssertRaisesBaseContext):
"""A context manager used to implement TestCase.assertWarns* methods."""
def __enter__(self):
# The __warningregistry__'s need to be in a pristine state for tests
# to work properly.
for v in sys.modules.values():
if getattr(v, '__warningregistry__', None):
v.__warningregistry__ = {}
self.warnings_manager = warnings.catch_warnings(record=True)
self.warnings = self.warnings_manager.__enter__()
warnings.simplefilter("always", self.expected)
return self
def __exit__(self, exc_type, exc_value, tb):
self.warnings_manager.__exit__(exc_type, exc_value, tb)
if exc_type is not None:
# let unexpected exceptions pass through
return
try:
exc_name = self.expected.__name__
except AttributeError:
exc_name = str(self.expected)
first_matching = None
for m in self.warnings:
w = m.message
if not isinstance(w, self.expected):
continue
if first_matching is None:
first_matching = w
if (self.expected_regex is not None and
not self.expected_regex.search(str(w))):
continue
# store warning for later retrieval
self.warning = w
self.filename = m.filename
self.lineno = m.lineno
return
# Now we simply try to choose a helpful failure message
if first_matching is not None:
self._raiseFailure('"{}" does not match "{}"'.format(
self.expected_regex.pattern, str(first_matching)))
if self.obj_name:
self._raiseFailure("{} not triggered by {}".format(exc_name,
self.obj_name))
else:
self._raiseFailure("{} not triggered".format(exc_name))
def assertWarns(self, expected_warning, callable_obj=None, *args, **kwargs):
"""Fail unless a warning of class warnClass is triggered
by callable_obj when invoked with arguments args and keyword
arguments kwargs. If a different type of warning is
triggered, it will not be handled: depending on the other
warning filtering rules in effect, it might be silenced, printed
out, or raised as an exception.
If called with callable_obj omitted or None, will return a
context object used like this::
with self.assertWarns(SomeWarning):
do_something()
An optional keyword argument 'msg' can be provided when assertWarns
is used as a context object.
The context manager keeps a reference to the first matching
warning as the 'warning' attribute; similarly, the 'filename'
and 'lineno' attributes give you information about the line
of Python code from which the warning was triggered.
This allows you to inspect the warning after the assertion::
with self.assertWarns(SomeWarning) as cm:
do_something()
the_warning = cm.warning
self.assertEqual(the_warning.some_attribute, 147)
"""
context = _AssertWarnsContext(expected_warning, self, callable_obj)
return context.handle('assertWarns', callable_obj, args, kwargs)
if not hasattr(unittest.TestCase, 'assertWarns'):
bind_method(unittest.TestCase, 'assertWarns', assertWarns)