mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-01-07 11:40:01 -08:00
a528f052b9
* Bump cherrypy from 18.9.0 to 18.10.0 Bumps [cherrypy](https://github.com/cherrypy/cherrypy) from 18.9.0 to 18.10.0. - [Changelog](https://github.com/cherrypy/cherrypy/blob/main/CHANGES.rst) - [Commits](https://github.com/cherrypy/cherrypy/compare/v18.9.0...v18.10.0) --- updated-dependencies: - dependency-name: cherrypy dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * Update cherrypy==18.10.0 --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> [skip ci]
241 lines
8.2 KiB
Python
241 lines
8.2 KiB
Python
"""Logtest, a unittest.TestCase helper for testing log output."""
|
|
|
|
import sys
|
|
import time
|
|
from uuid import UUID
|
|
|
|
import pytest
|
|
|
|
from cherrypy._cpcompat import text_or_bytes
|
|
|
|
|
|
try:
|
|
# On Windows, msvcrt.getch reads a single char without output.
|
|
import msvcrt
|
|
|
|
def getchar():
|
|
return msvcrt.getch()
|
|
except ImportError:
|
|
# Unix getchr
|
|
import tty
|
|
import termios
|
|
|
|
def getchar():
|
|
fd = sys.stdin.fileno()
|
|
old_settings = termios.tcgetattr(fd)
|
|
try:
|
|
tty.setraw(sys.stdin.fileno())
|
|
ch = sys.stdin.read(1)
|
|
finally:
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
|
return ch
|
|
|
|
|
|
class LogCase(object):
|
|
"""unittest.TestCase mixin for testing log messages.
|
|
|
|
logfile: a filename for the desired log. Yes, I know modes are evil,
|
|
but it makes the test functions so much cleaner to set this once.
|
|
|
|
lastmarker: the last marker in the log. This can be used to search for
|
|
messages since the last marker.
|
|
|
|
markerPrefix: a string with which to prefix log markers. This should be
|
|
unique enough from normal log output to use for marker identification.
|
|
"""
|
|
|
|
interactive = False
|
|
logfile = None
|
|
lastmarker = None
|
|
markerPrefix = b'test suite marker: '
|
|
|
|
def _handleLogError(self, msg, data, marker, pattern):
|
|
print('')
|
|
print(' ERROR: %s' % msg)
|
|
|
|
if not self.interactive:
|
|
raise pytest.fail(msg)
|
|
|
|
p = (' Show: '
|
|
'[L]og [M]arker [P]attern; '
|
|
'[I]gnore, [R]aise, or sys.e[X]it >> ')
|
|
sys.stdout.write(p + ' ')
|
|
# ARGH
|
|
sys.stdout.flush()
|
|
while True:
|
|
i = getchar().upper()
|
|
if i not in 'MPLIRX':
|
|
continue
|
|
print(i.upper()) # Also prints new line
|
|
if i == 'L':
|
|
for x, line in enumerate(data):
|
|
if (x + 1) % self.console_height == 0:
|
|
# The \r and comma should make the next line overwrite
|
|
sys.stdout.write('<-- More -->\r ')
|
|
m = getchar().lower()
|
|
# Erase our "More" prompt
|
|
sys.stdout.write(' \r ')
|
|
if m == 'q':
|
|
break
|
|
print(line.rstrip())
|
|
elif i == 'M':
|
|
print(repr(marker or self.lastmarker))
|
|
elif i == 'P':
|
|
print(repr(pattern))
|
|
elif i == 'I':
|
|
# return without raising the normal exception
|
|
return
|
|
elif i == 'R':
|
|
raise pytest.fail(msg)
|
|
elif i == 'X':
|
|
self.exit()
|
|
sys.stdout.write(p + ' ')
|
|
|
|
def exit(self):
|
|
sys.exit()
|
|
|
|
def emptyLog(self):
|
|
"""Overwrite self.logfile with 0 bytes."""
|
|
with open(self.logfile, 'wb') as f:
|
|
f.write('')
|
|
|
|
def markLog(self, key=None):
|
|
"""Insert a marker line into the log and set self.lastmarker."""
|
|
if key is None:
|
|
key = str(time.time())
|
|
self.lastmarker = key
|
|
|
|
with open(self.logfile, 'ab+') as f:
|
|
f.write(
|
|
b'%s%s\n'
|
|
% (self.markerPrefix, key.encode('utf-8'))
|
|
)
|
|
|
|
def _read_marked_region(self, marker=None):
|
|
"""Return lines from self.logfile in the marked region.
|
|
|
|
If marker is None, self.lastmarker is used. If the log hasn't
|
|
been marked (using self.markLog), the entire log will be
|
|
returned.
|
|
"""
|
|
# Give the logger time to finish writing?
|
|
# time.sleep(0.5)
|
|
|
|
logfile = self.logfile
|
|
marker = marker or self.lastmarker
|
|
if marker is None:
|
|
with open(logfile, 'rb') as f:
|
|
return f.readlines()
|
|
|
|
if isinstance(marker, str):
|
|
marker = marker.encode('utf-8')
|
|
data = []
|
|
in_region = False
|
|
with open(logfile, 'rb') as f:
|
|
for line in f:
|
|
if in_region:
|
|
if (line.startswith(self.markerPrefix)
|
|
and marker not in line):
|
|
break
|
|
else:
|
|
data.append(line)
|
|
elif marker in line:
|
|
in_region = True
|
|
return data
|
|
|
|
def assertInLog(self, line, marker=None):
|
|
"""Fail if the given (partial) line is not in the log.
|
|
|
|
The log will be searched from the given marker to the next
|
|
marker. If marker is None, self.lastmarker is used. If the log
|
|
hasn't been marked (using self.markLog), the entire log will be
|
|
searched.
|
|
"""
|
|
data = self._read_marked_region(marker)
|
|
for logline in data:
|
|
if line in logline:
|
|
return
|
|
msg = '%r not found in log' % line
|
|
self._handleLogError(msg, data, marker, line)
|
|
|
|
def assertNotInLog(self, line, marker=None):
|
|
"""Fail if the given (partial) line is in the log.
|
|
|
|
The log will be searched from the given marker to the next
|
|
marker. If marker is None, self.lastmarker is used. If the log
|
|
hasn't been marked (using self.markLog), the entire log will be
|
|
searched.
|
|
"""
|
|
data = self._read_marked_region(marker)
|
|
for logline in data:
|
|
if line in logline:
|
|
msg = '%r found in log' % line
|
|
self._handleLogError(msg, data, marker, line)
|
|
|
|
def assertValidUUIDv4(self, marker=None):
|
|
"""Fail if the given UUIDv4 is not valid.
|
|
|
|
The log will be searched from the given marker to the next
|
|
marker. If marker is None, self.lastmarker is used. If the log
|
|
hasn't been marked (using self.markLog), the entire log will be
|
|
searched.
|
|
"""
|
|
data = self._read_marked_region(marker)
|
|
data = [
|
|
chunk.decode('utf-8').rstrip('\n').rstrip('\r')
|
|
for chunk in data
|
|
]
|
|
for log_chunk in data:
|
|
try:
|
|
uuid_log = data[-1]
|
|
uuid_obj = UUID(uuid_log, version=4)
|
|
except (TypeError, ValueError):
|
|
pass # it might be in other chunk
|
|
else:
|
|
if str(uuid_obj) == uuid_log:
|
|
return
|
|
msg = '%r is not a valid UUIDv4' % uuid_log
|
|
self._handleLogError(msg, data, marker, log_chunk)
|
|
|
|
msg = 'UUIDv4 not found in log'
|
|
self._handleLogError(msg, data, marker, log_chunk)
|
|
|
|
def assertLog(self, sliceargs, lines, marker=None):
|
|
"""Fail if log.readlines()[sliceargs] is not contained in 'lines'.
|
|
|
|
The log will be searched from the given marker to the next
|
|
marker. If marker is None, self.lastmarker is used. If the log
|
|
hasn't been marked (using self.markLog), the entire log will be
|
|
searched.
|
|
"""
|
|
data = self._read_marked_region(marker)
|
|
if isinstance(sliceargs, int):
|
|
# Single arg. Use __getitem__ and allow lines to be str or list.
|
|
if isinstance(lines, (tuple, list)):
|
|
lines = lines[0]
|
|
if isinstance(lines, str):
|
|
lines = lines.encode('utf-8')
|
|
if lines not in data[sliceargs]:
|
|
msg = '%r not found on log line %r' % (lines, sliceargs)
|
|
self._handleLogError(
|
|
msg,
|
|
[data[sliceargs], '--EXTRA CONTEXT--'] + data[
|
|
sliceargs + 1:sliceargs + 6],
|
|
marker,
|
|
lines)
|
|
else:
|
|
# Multiple args. Use __getslice__ and require lines to be list.
|
|
if isinstance(lines, tuple):
|
|
lines = list(lines)
|
|
elif isinstance(lines, text_or_bytes):
|
|
raise TypeError("The 'lines' arg must be a list when "
|
|
"'sliceargs' is a tuple.")
|
|
|
|
start, stop = sliceargs
|
|
for line, logline in zip(lines, data[start:stop]):
|
|
if isinstance(line, str):
|
|
line = line.encode('utf-8')
|
|
if line not in logline:
|
|
msg = '%r not found in log' % line
|
|
self._handleLogError(msg, data[start:stop], marker, line)
|