Init: mediaserver

2023-02-08 12:13:28 +01:00
parent 848bc9739c
commit f7c23d4ba9
31914 changed files with 6175775 additions and 0 deletions

View File

@@ -0,0 +1,325 @@
"""Common logic for the coverage subcommand."""
from __future__ import annotations
import errno
import os
import re
import typing as t
from ...constants import (
COVERAGE_REQUIRED_VERSION,
)
from ...encoding import (
to_bytes,
)
from ...io import (
open_binary_file,
read_json_file,
)
from ...util import (
ApplicationError,
common_environment,
display,
ANSIBLE_TEST_DATA_ROOT,
)
from ...util_common import (
intercept_python,
ResultType,
)
from ...config import (
EnvironmentConfig,
)
from ...python_requirements import (
install_requirements,
)
from ...target import (
walk_module_targets,
)
from ...data import (
data_context,
)
from ...pypi_proxy import (
configure_pypi_proxy,
)
from ...provisioning import (
HostState,
)
if t.TYPE_CHECKING:
import coverage as coverage_module
COVERAGE_GROUPS = ('command', 'target', 'environment', 'version')
COVERAGE_CONFIG_PATH = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'coveragerc')
COVERAGE_OUTPUT_FILE_NAME = 'coverage'
class CoverageConfig(EnvironmentConfig):
"""Configuration for the coverage command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args, 'coverage')
def initialize_coverage(args, host_state): # type: (CoverageConfig, HostState) -> coverage_module
"""Delegate execution if requested, install requirements, then import and return the coverage module. Raises an exception if coverage is not available."""
configure_pypi_proxy(args, host_state.controller_profile) # coverage
install_requirements(args, host_state.controller_profile.python, coverage=True) # coverage
try:
import coverage
except ImportError:
coverage = None
if not coverage:
raise ApplicationError(f'Version {COVERAGE_REQUIRED_VERSION} of the Python "coverage" module must be installed to use this command.')
if coverage.__version__ != COVERAGE_REQUIRED_VERSION:
raise ApplicationError(f'Version {COVERAGE_REQUIRED_VERSION} of the Python "coverage" module is required. Version {coverage.__version__} was found.')
return coverage
def run_coverage(args, host_state, output_file, command, cmd): # type: (CoverageConfig, HostState, str, str, t.List[str]) -> None
"""Run the coverage cli tool with the specified options."""
env = common_environment()
env.update(dict(COVERAGE_FILE=output_file))
cmd = ['python', '-m', 'coverage.__main__', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd
intercept_python(args, host_state.controller_profile.python, cmd, env)
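# Illustrative sketch (not part of the original code): with command='html' and
# cmd=['-i', '-d', '/tmp/reports'] (hypothetical values), the invocation built
# above is roughly equivalent to running:
#
#   COVERAGE_FILE=<output_file> python -m coverage.__main__ html \
#       --rcfile .../coveragerc -i -d /tmp/reports
#
# executed through intercept_python() so the configured interpreter is used.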
def get_all_coverage_files(): # type: () -> t.List[str]
"""Return a list of all coverage file paths."""
return get_python_coverage_files() + get_powershell_coverage_files()
def get_python_coverage_files(path=None): # type: (t.Optional[str]) -> t.List[str]
"""Return the list of Python coverage file paths."""
return get_coverage_files('python', path)
def get_powershell_coverage_files(path=None): # type: (t.Optional[str]) -> t.List[str]
"""Return the list of PowerShell coverage file paths."""
return get_coverage_files('powershell', path)
def get_coverage_files(language, path=None): # type: (str, t.Optional[str]) -> t.List[str]
"""Return the list of coverage file paths for the given language."""
coverage_dir = path or ResultType.COVERAGE.path
try:
coverage_files = [os.path.join(coverage_dir, f) for f in os.listdir(coverage_dir)
if '=coverage.' in f and '=%s' % language in f]
except IOError as ex:
if ex.errno == errno.ENOENT:
return []
raise
return coverage_files
def get_collection_path_regexes(): # type: () -> t.Tuple[t.Optional[t.Pattern], t.Optional[t.Pattern]]
"""Return a pair of regexes used for identifying and manipulating collection paths."""
if data_context().content.collection:
collection_search_re = re.compile(r'/%s/' % data_context().content.collection.directory)
collection_sub_re = re.compile(r'^.*?/%s/' % data_context().content.collection.directory)
else:
collection_search_re = None
collection_sub_re = None
return collection_search_re, collection_sub_re
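# Illustrative sketch (assuming a collection content directory of
# 'ansible_collections/ns/col'): the pair returned above would be roughly:
#
#   collection_search_re = re.compile(r'/ansible_collections/ns/col/')
#   collection_sub_re = re.compile(r'^.*?/ansible_collections/ns/col/')
#
# sanitize_filename() uses the first to detect collection paths and the second
# to strip everything up to and including the collection directory.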
def get_python_modules(): # type: () -> t.Dict[str, str]
"""Return a dictionary of Ansible module names and their paths."""
return dict((target.module, target.path) for target in list(walk_module_targets()) if target.path.endswith('.py'))
def enumerate_python_arcs(
path, # type: str
coverage, # type: coverage_module
modules, # type: t.Dict[str, str]
collection_search_re, # type: t.Optional[t.Pattern]
collection_sub_re, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Generator[t.Tuple[str, t.Set[t.Tuple[int, int]]], None, None]
"""Enumerate Python code coverage arcs in the given file."""
if os.path.getsize(path) == 0:
display.warning('Empty coverage file: %s' % path, verbosity=2)
return
original = coverage.CoverageData()
try:
original.read_file(path)
except Exception as ex: # pylint: disable=locally-disabled, broad-except
with open_binary_file(path) as file_obj:
header = file_obj.read(6)
if header == b'SQLite':
display.error('File created by "coverage" 5.0+: %s' % os.path.relpath(path))
else:
display.error(u'%s' % ex)
return
for filename in original.measured_files():
arcs = original.arcs(filename)
if not arcs:
# This is most likely due to using an unsupported version of coverage.
display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path))
continue
filename = sanitize_filename(filename, modules=modules, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re)
if not filename:
continue
yield filename, set(arcs)
def enumerate_powershell_lines(
path, # type: str
collection_search_re, # type: t.Optional[t.Pattern]
collection_sub_re, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Generator[t.Tuple[str, t.Dict[int, int]], None, None]
"""Enumerate PowerShell code coverage lines in the given file."""
if os.path.getsize(path) == 0:
display.warning('Empty coverage file: %s' % path, verbosity=2)
return
try:
coverage_run = read_json_file(path)
except Exception as ex: # pylint: disable=locally-disabled, broad-except
display.error(u'%s' % ex)
return
for filename, hits in coverage_run.items():
filename = sanitize_filename(filename, collection_search_re=collection_search_re, collection_sub_re=collection_sub_re)
if not filename:
continue
if isinstance(hits, dict) and not hits.get('Line'):
# Input data was previously aggregated and thus uses the standard ansible-test output format for PowerShell coverage.
# This format differs from the more verbose format of raw coverage data from the remote Windows hosts.
hits = dict((int(key), value) for key, value in hits.items())
yield filename, hits
continue
# PowerShell unpacks arrays if there's only a single entry so this is a defensive check on that
if not isinstance(hits, list):
hits = [hits]
hits = dict((hit['Line'], hit['HitCount']) for hit in hits if hit)
yield filename, hits
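# Illustrative sketch of the two input shapes handled above (values invented):
#
#   aggregated ansible-test format:
#       {"/work/module.ps1": {"5": 1, "6": 0}}  ->  ('/work/module.ps1', {5: 1, 6: 0})
#
#   raw format from a remote Windows host:
#       {"/work/module.ps1": [{"Line": 5, "HitCount": 1}]}  ->  ('/work/module.ps1', {5: 1})
#
# A single raw hit may also arrive as a bare dict instead of a one-element
# list, which the isinstance() check above normalizes.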
def sanitize_filename(
filename, # type: str
modules=None, # type: t.Optional[t.Dict[str, str]]
collection_search_re=None, # type: t.Optional[t.Pattern]
collection_sub_re=None, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Optional[str]
"""Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid."""
ansible_path = os.path.abspath('lib/ansible/') + '/'
root_path = data_context().content.root + '/'
integration_temp_path = os.path.sep + os.path.join(ResultType.TMP.relative_path, 'integration') + os.path.sep
if modules is None:
modules = {}
if '/ansible_modlib.zip/ansible/' in filename:
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.6 and earlier.
new_name = re.sub('^.*/ansible_modlib.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif collection_search_re and collection_search_re.search(filename):
new_name = os.path.abspath(collection_sub_re.sub('', filename))
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload\.zip/ansible/', filename):
# Rewrite the module_utils path from the remote host to match the controller. Ansible 2.7 and later.
new_name = re.sub(r'^.*/ansible_[^/]+_payload\.zip/ansible/', ansible_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif '/ansible_module_' in filename:
# Rewrite the module path from the remote host to match the controller. Ansible 2.6 and earlier.
module_name = re.sub('^.*/ansible_module_(?P<module>.*).py$', '\\g<module>', filename)
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search(r'/ansible_[^/]+_payload(_[^/]+|\.zip)/__main__\.py$', filename):
# Rewrite the module path from the remote host to match the controller. Ansible 2.7 and later.
# AnsiballZ versions using zipimporter will match the `.zip` portion of the regex.
# AnsiballZ versions not using zipimporter will match the `_[^/]+` portion of the regex.
module_name = re.sub(r'^.*/ansible_(?P<module>[^/]+)_payload(_[^/]+|\.zip)/__main__\.py$',
'\\g<module>', filename).rstrip('_')
if module_name not in modules:
display.warning('Skipping coverage of unknown module: %s' % module_name)
return None
new_name = os.path.abspath(modules[module_name])
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif re.search('^(/.*?)?/root/ansible/', filename):
# Rewrite the path of code running on a remote host or in a docker container as root.
new_name = re.sub('^(/.*?)?/root/ansible/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
elif integration_temp_path in filename:
# Rewrite the path of code running from an integration test temporary directory.
new_name = re.sub(r'^.*' + re.escape(integration_temp_path) + '[^/]+/', root_path, filename)
display.info('%s -> %s' % (filename, new_name), verbosity=3)
filename = new_name
filename = os.path.abspath(filename) # make sure path is absolute (will be relative if previously exported)
return filename
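# Illustrative examples of the rewrites above (all paths invented):
#
#   /tmp/ansible_abc_payload.zip/ansible/module_utils/basic.py
#       -> ansible_path + 'module_utils/basic.py'   (payload zip, Ansible 2.7+)
#   /home/user/.ansible/tmp/ansible_ping_payload_xyz/__main__.py
#       -> os.path.abspath(modules['ping'])         (AnsiballZ module, Ansible 2.7+)
#   /root/ansible/lib/ansible/cli/adhoc.py
#       -> root_path + 'lib/ansible/cli/adhoc.py'   (remote host running as root)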
class PathChecker:
"""Checks code coverage paths to verify they are valid and reports on the findings."""
def __init__(self, args, collection_search_re=None): # type: (CoverageConfig, t.Optional[t.Pattern]) -> None
self.args = args
self.collection_search_re = collection_search_re
self.invalid_paths = []
self.invalid_path_chars = 0
def check_path(self, path): # type: (str) -> bool
"""Return True if the given coverage path is valid, otherwise display a warning and return False."""
if os.path.isfile(to_bytes(path)):
return True
if self.collection_search_re and self.collection_search_re.search(path) and os.path.basename(path) == '__init__.py':
# the collection loader uses implicit namespace packages, so __init__.py does not need to exist on disk
# coverage is still reported for these non-existent files, but warnings are not needed
return False
self.invalid_paths.append(path)
self.invalid_path_chars += len(path)
if self.args.verbosity > 1:
display.warning('Invalid coverage path: %s' % path)
return False
def report(self): # type: () -> None
"""Display a warning regarding invalid paths if any were found."""
if self.invalid_paths:
display.warning('Ignored %d characters from %d invalid coverage path(s).' % (self.invalid_path_chars, len(self.invalid_paths)))

View File

@@ -0,0 +1,17 @@
"""Common logic for the `coverage analyze` subcommand."""
from __future__ import annotations
import typing as t
from .. import (
CoverageConfig,
)
class CoverageAnalyzeConfig(CoverageConfig):
"""Configuration for the `coverage analyze` command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
# avoid mixing log messages with file output when using `/dev/stdout` for the output file on commands
# this may be worth considering as the default behavior in the future, instead of being dependent on the command or options used
self.info_stderr = True

View File

@@ -0,0 +1,152 @@
"""Analyze integration test target code coverage."""
from __future__ import annotations
import os
import typing as t
from .....io import (
read_json_file,
write_json_file,
)
from .....util import (
ApplicationError,
display,
)
from .. import (
CoverageAnalyzeConfig,
)
if t.TYPE_CHECKING:
TargetKey = t.TypeVar('TargetKey', int, t.Tuple[int, int])
NamedPoints = t.Dict[str, t.Dict[TargetKey, t.Set[str]]]
IndexedPoints = t.Dict[str, t.Dict[TargetKey, t.Set[int]]]
Arcs = t.Dict[str, t.Dict[t.Tuple[int, int], t.Set[int]]]
Lines = t.Dict[str, t.Dict[int, t.Set[int]]]
TargetIndexes = t.Dict[str, int]
TargetSetIndexes = t.Dict[t.FrozenSet[int], int]
class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
"""Configuration for the `coverage analyze targets` command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.info_stderr = True
def make_report(target_indexes, arcs, lines): # type: (TargetIndexes, Arcs, Lines) -> t.Dict[str, t.Any]
"""Condense target indexes, arcs and lines into a compact report."""
set_indexes = {}
arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items())
line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items())
report = dict(
targets=[name for name, index in sorted(target_indexes.items(), key=lambda kvp: kvp[1])],
target_sets=[sorted(data) for data, index in sorted(set_indexes.items(), key=lambda kvp: kvp[1])],
arcs=arc_refs,
lines=line_refs,
)
return report
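# Illustrative sketch (values invented): with two targets that both cover the
# arc (1, 2) in a single file, the report built above would look roughly like:
#
#   {
#       'targets': ['target_a', 'target_b'],            # indexes 0 and 1
#       'target_sets': [[0, 1]],                        # set index 0
#       'arcs': {'plugins/modules/foo.py': {'1:2': 0}}, # arc -> target set index
#       'lines': {},
#   }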
def load_report(report): # type: (t.Dict[str, t.Any]) -> t.Tuple[t.List[str], Arcs, Lines]
"""Extract target indexes, arcs and lines from an existing report."""
try:
target_indexes = report['targets'] # type: t.List[str]
target_sets = report['target_sets'] # type: t.List[t.List[int]]
arc_data = report['arcs'] # type: t.Dict[str, t.Dict[str, int]]
line_data = report['lines'] # type: t.Dict[str, t.Dict[int, int]]
except KeyError as ex:
raise ApplicationError('Document is missing key "%s".' % ex.args)
except TypeError:
raise ApplicationError('Document is type "%s" instead of "dict".' % type(report).__name__)
arcs = dict((path, dict((parse_arc(arc), set(target_sets[index])) for arc, index in data.items())) for path, data in arc_data.items())
lines = dict((path, dict((int(line), set(target_sets[index])) for line, index in data.items())) for path, data in line_data.items())
return target_indexes, arcs, lines
def read_report(path): # type: (str) -> t.Tuple[t.List[str], Arcs, Lines]
"""Read a JSON report from disk."""
try:
report = read_json_file(path)
except Exception as ex:
raise ApplicationError('File "%s" is not valid JSON: %s' % (path, ex))
try:
return load_report(report)
except ApplicationError as ex:
raise ApplicationError('File "%s" is not an aggregated coverage data file. %s' % (path, ex))
def write_report(args, report, path): # type: (CoverageAnalyzeTargetsConfig, t.Dict[str, t.Any], str) -> None
"""Write a JSON report to disk."""
if args.explain:
return
write_json_file(path, report, formatted=False)
display.info('Generated %d byte report with %d targets covering %d files.' % (
os.path.getsize(path), len(report['targets']), len(set(report['arcs'].keys()) | set(report['lines'].keys())),
), verbosity=1)
def format_arc(value): # type: (t.Tuple[int, int]) -> str
"""Format an arc tuple as a string."""
return '%d:%d' % value
def parse_arc(value): # type: (str) -> t.Tuple[int, int]
"""Parse an arc string into a tuple."""
first, last = tuple(map(int, value.split(':')))
return first, last
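# Round-trip sketch: format_arc((12, 15)) returns '12:15' and
# parse_arc('12:15') returns (12, 15). Negative exit arcs survive as well,
# e.g. format_arc((7, -1)) returns '7:-1'.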
def get_target_set_index(data, target_set_indexes): # type: (t.Set[int], TargetSetIndexes) -> int
"""Find or add the target set in the result set and return the target set index."""
return target_set_indexes.setdefault(frozenset(data), len(target_set_indexes))
def get_target_index(name, target_indexes): # type: (str, TargetIndexes) -> int
"""Find or add the target in the result set and return the target index."""
return target_indexes.setdefault(name, len(target_indexes))
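# Both helpers above use dict.setdefault() to hand out dense, stable indexes
# in insertion order; a sketch with invented values:
#
#   indexes = {}
#   get_target_index('target_a', indexes)  # -> 0 (new entry)
#   get_target_index('target_b', indexes)  # -> 1 (new entry)
#   get_target_index('target_a', indexes)  # -> 0 (existing index reused)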
def expand_indexes(
source_data, # type: IndexedPoints
source_index, # type: t.List[str]
format_func, # type: t.Callable[[TargetKey], str]
): # type: (...) -> NamedPoints
"""Expand indexes from the source into target names for easier processing of the data (arcs or lines)."""
combined_data = {} # type: t.Dict[str, t.Dict[t.Any, t.Set[str]]]
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})
for covered_point, covered_target_indexes in covered_points.items():
combined_point = combined_points.setdefault(format_func(covered_point), set())
for covered_target_index in covered_target_indexes:
combined_point.add(source_index[covered_target_index])
return combined_data
def generate_indexes(target_indexes, data): # type: (TargetIndexes, NamedPoints) -> IndexedPoints
"""Return an indexed version of the given data (arcs or points)."""
results = {} # type: IndexedPoints
for path, points in data.items():
result_points = results[path] = {}
for point, target_names in points.items():
result_point = result_points[point] = set()
for target_name in target_names:
result_point.add(get_target_index(target_name, target_indexes))
return results

View File

@@ -0,0 +1,75 @@
"""Combine integration test target code coverage reports."""
from __future__ import annotations
import typing as t
from .....executor import (
Delegate,
)
from .....provisioning import (
prepare_profiles,
)
from . import (
CoverageAnalyzeTargetsConfig,
get_target_index,
make_report,
read_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
Arcs,
IndexedPoints,
Lines,
TargetIndexes,
)
class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets combine` command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.input_files = args.input_file # type: t.List[str]
self.output_file = args.output_file # type: str
def command_coverage_analyze_targets_combine(args): # type: (CoverageAnalyzeTargetsCombineConfig) -> None
"""Combine integration test target code coverage reports."""
host_state = prepare_profiles(args) # coverage analyze targets combine
if args.delegate:
raise Delegate(host_state=host_state)
combined_target_indexes = {} # type: TargetIndexes
combined_path_arcs = {} # type: Arcs
combined_path_lines = {} # type: Lines
for report_path in args.input_files:
covered_targets, covered_path_arcs, covered_path_lines = read_report(report_path)
merge_indexes(covered_path_arcs, covered_targets, combined_path_arcs, combined_target_indexes)
merge_indexes(covered_path_lines, covered_targets, combined_path_lines, combined_target_indexes)
report = make_report(combined_target_indexes, combined_path_arcs, combined_path_lines)
write_report(args, report, args.output_file)
def merge_indexes(
source_data, # type: IndexedPoints
source_index, # type: t.List[str]
combined_data, # type: IndexedPoints
combined_index, # type: TargetIndexes
): # type: (...) -> None
"""Merge indexes from the source into the combined data set (arcs or lines)."""
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})
for covered_point, covered_target_indexes in covered_points.items():
combined_point = combined_points.setdefault(covered_point, set())
for covered_target_index in covered_target_indexes:
combined_point.add(get_target_index(source_index[covered_target_index], combined_index))

View File

@@ -0,0 +1,50 @@
"""Expand target names in an aggregated coverage file."""
from __future__ import annotations
import typing as t
from .....io import (
SortedSetEncoder,
write_json_file,
)
from .....executor import (
Delegate,
)
from .....provisioning import (
prepare_profiles,
)
from . import (
CoverageAnalyzeTargetsConfig,
expand_indexes,
format_arc,
read_report,
)
class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets expand` command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
def command_coverage_analyze_targets_expand(args): # type: (CoverageAnalyzeTargetsExpandConfig) -> None
"""Expand target names in an aggregated coverage file."""
host_state = prepare_profiles(args) # coverage analyze targets expand
if args.delegate:
raise Delegate(host_state=host_state)
covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
report = dict(
arcs=expand_indexes(covered_path_arcs, covered_targets, format_arc),
lines=expand_indexes(covered_path_lines, covered_targets, str),
)
if not args.explain:
write_json_file(args.output_file, report, encoder=SortedSetEncoder)

View File

@@ -0,0 +1,117 @@
"""Filter an aggregated coverage file, keeping only the specified targets."""
from __future__ import annotations
import re
import typing as t
from .....executor import (
Delegate,
)
from .....provisioning import (
prepare_profiles,
)
from . import (
CoverageAnalyzeTargetsConfig,
expand_indexes,
generate_indexes,
make_report,
read_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
NamedPoints,
TargetIndexes,
)
class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets filter` command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
self.include_targets = args.include_targets # type: t.List[str]
self.exclude_targets = args.exclude_targets # type: t.List[str]
self.include_path = args.include_path # type: t.Optional[str]
self.exclude_path = args.exclude_path # type: t.Optional[str]
def command_coverage_analyze_targets_filter(args): # type: (CoverageAnalyzeTargetsFilterConfig) -> None
"""Filter target names in an aggregated coverage file."""
host_state = prepare_profiles(args) # coverage analyze targets filter
if args.delegate:
raise Delegate(host_state=host_state)
covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, lambda v: v)
filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, lambda v: v)
include_targets = set(args.include_targets) if args.include_targets else None
exclude_targets = set(args.exclude_targets) if args.exclude_targets else None
include_path = re.compile(args.include_path) if args.include_path else None
exclude_path = re.compile(args.exclude_path) if args.exclude_path else None
def path_filter_func(path):
"""Return True if the given path should be included, otherwise return False."""
if include_path and not re.search(include_path, path):
return False
if exclude_path and re.search(exclude_path, path):
return False
return True
def target_filter_func(targets):
"""Filter the given targets and return the result based on the defined includes and excludes."""
if include_targets:
targets &= include_targets
if exclude_targets:
targets -= exclude_targets
return targets
filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func)
filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func)
target_indexes = {} # type: TargetIndexes
indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs)
indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines)
report = make_report(target_indexes, indexed_path_arcs, indexed_path_lines)
write_report(args, report, args.output_file)
def filter_data(
data, # type: NamedPoints
path_filter_func, # type: t.Callable[[str], bool]
target_filter_func, # type: t.Callable[[t.Set[str]], t.Set[str]]
): # type: (...) -> NamedPoints
"""Filter the data set using the specified filter function."""
result = {} # type: NamedPoints
for src_path, src_points in data.items():
if not path_filter_func(src_path):
continue
dst_points = {}
for src_point, src_targets in src_points.items():
dst_targets = target_filter_func(src_targets)
if dst_targets:
dst_points[src_point] = dst_targets
if dst_points:
result[src_path] = dst_points
return result

View File

@@ -0,0 +1,159 @@
"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line."""
from __future__ import annotations
import os
import typing as t
from .....encoding import (
to_text,
)
from .....data import (
data_context,
)
from .....util_common import (
ResultType,
)
from .....executor import (
Delegate,
)
from .....provisioning import (
prepare_profiles,
HostState,
)
from ... import (
enumerate_powershell_lines,
enumerate_python_arcs,
get_collection_path_regexes,
get_powershell_coverage_files,
get_python_coverage_files,
get_python_modules,
initialize_coverage,
PathChecker,
)
from . import (
CoverageAnalyzeTargetsConfig,
get_target_index,
make_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
Arcs,
Lines,
TargetIndexes,
)
class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets generate` command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.input_dir = args.input_dir or ResultType.COVERAGE.path # type: str
self.output_file = args.output_file # type: str
def command_coverage_analyze_targets_generate(args): # type: (CoverageAnalyzeTargetsGenerateConfig) -> None
"""Analyze code coverage data to determine which integration test targets provide coverage for each arc or line."""
host_state = prepare_profiles(args) # coverage analyze targets generate
if args.delegate:
raise Delegate(host_state=host_state)
root = data_context().content.root
target_indexes = {}
arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, host_state, args.input_dir, target_indexes).items())
lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items())
report = make_report(target_indexes, arcs, lines)
write_report(args, report, args.output_file)
def analyze_python_coverage(
args, # type: CoverageAnalyzeTargetsGenerateConfig
host_state, # type: HostState
path, # type: str
target_indexes, # type: TargetIndexes
): # type: (...) -> Arcs
"""Analyze Python code coverage."""
results = {} # type: Arcs
collection_search_re, collection_sub_re = get_collection_path_regexes()
modules = get_python_modules()
python_files = get_python_coverage_files(path)
coverage = initialize_coverage(args, host_state)
for python_file in python_files:
if not is_integration_coverage_file(python_file):
continue
target_name = get_target_name(python_file)
target_index = get_target_index(target_name, target_indexes)
for filename, covered_arcs in enumerate_python_arcs(python_file, coverage, modules, collection_search_re, collection_sub_re):
arcs = results.setdefault(filename, {})
for covered_arc in covered_arcs:
arc = arcs.setdefault(covered_arc, set())
arc.add(target_index)
prune_invalid_filenames(args, results, collection_search_re=collection_search_re)
return results
def analyze_powershell_coverage(
args, # type: CoverageAnalyzeTargetsGenerateConfig
path, # type: str
target_indexes, # type: TargetIndexes
): # type: (...) -> Lines
"""Analyze PowerShell code coverage"""
results = {} # type: Lines
collection_search_re, collection_sub_re = get_collection_path_regexes()
powershell_files = get_powershell_coverage_files(path)
for powershell_file in powershell_files:
if not is_integration_coverage_file(powershell_file):
continue
target_name = get_target_name(powershell_file)
target_index = get_target_index(target_name, target_indexes)
for filename, hits in enumerate_powershell_lines(powershell_file, collection_search_re, collection_sub_re):
lines = results.setdefault(filename, {})
for covered_line in hits:
line = lines.setdefault(covered_line, set())
line.add(target_index)
prune_invalid_filenames(args, results)
return results
def prune_invalid_filenames(
args, # type: CoverageAnalyzeTargetsGenerateConfig
results, # type: t.Dict[str, t.Any]
collection_search_re=None, # type: t.Optional[t.Pattern]
): # type: (...) -> None
"""Remove invalid filenames from the given result set."""
path_checker = PathChecker(args, collection_search_re)
for path in list(results.keys()):
if not path_checker.check_path(path):
del results[path]
def get_target_name(path): # type: (str) -> str
"""Extract the test target name from the given coverage path."""
return to_text(os.path.basename(path).split('=')[1])
def is_integration_coverage_file(path): # type: (str) -> bool
"""Returns True if the coverage file came from integration tests, otherwise False."""
return os.path.basename(path).split('=')[0] in ('integration', 'windows-integration', 'network-integration')
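# Illustrative sketch of the coverage file name layout the two helpers above
# rely on (example name invented):
#
#   integration=ping=docker-default=3.10=coverage.host.1234
#
#   split('=')[0] -> 'integration'  (test command; accepted by the check above)
#   split('=')[1] -> 'ping'         (the integration test target name)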

View File

@@ -0,0 +1,120 @@
"""Identify aggregated coverage in one file missing from another."""
from __future__ import annotations
import os
import typing as t
from .....encoding import (
to_bytes,
)
from .....executor import (
Delegate,
)
from .....provisioning import (
prepare_profiles,
)
from . import (
CoverageAnalyzeTargetsConfig,
get_target_index,
make_report,
read_report,
write_report,
)
if t.TYPE_CHECKING:
from . import (
TargetIndexes,
IndexedPoints,
)
class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets missing` command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.from_file = args.from_file # type: str
self.to_file = args.to_file # type: str
self.output_file = args.output_file # type: str
self.only_gaps = args.only_gaps # type: bool
self.only_exists = args.only_exists # type: bool
def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTargetsMissingConfig) -> None
"""Identify aggregated coverage in one file missing from another."""
host_state = prepare_profiles(args) # coverage analyze targets missing
if args.delegate:
raise Delegate(host_state=host_state)
from_targets, from_path_arcs, from_path_lines = read_report(args.from_file)
to_targets, to_path_arcs, to_path_lines = read_report(args.to_file)
target_indexes = {}
if args.only_gaps:
arcs = find_gaps(from_path_arcs, from_targets, to_path_arcs, target_indexes, args.only_exists)
lines = find_gaps(from_path_lines, from_targets, to_path_lines, target_indexes, args.only_exists)
else:
arcs = find_missing(from_path_arcs, from_targets, to_path_arcs, to_targets, target_indexes, args.only_exists)
lines = find_missing(from_path_lines, from_targets, to_path_lines, to_targets, target_indexes, args.only_exists)
report = make_report(target_indexes, arcs, lines)
write_report(args, report, args.output_file)
def find_gaps(
from_data, # type: IndexedPoints
from_index, # type: t.List[str]
to_data, # type: IndexedPoints
target_indexes, # type: TargetIndexes
only_exists, # type: bool
): # type: (...) -> IndexedPoints
"""Find gaps in coverage between the from and to data sets."""
target_data = {}
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):
continue
to_points = to_data.get(from_path, {})
gaps = set(from_points.keys()) - set(to_points.keys())
if gaps:
gap_points = dict((key, value) for key, value in from_points.items() if key in gaps)
target_data[from_path] = dict((gap, set(get_target_index(from_index[i], target_indexes) for i in indexes)) for gap, indexes in gap_points.items())
return target_data
def find_missing(
from_data, # type: IndexedPoints
from_index, # type: t.List[str]
to_data, # type: IndexedPoints
to_index, # type: t.List[str]
target_indexes, # type: TargetIndexes
only_exists, # type: bool
): # type: (...) -> IndexedPoints
"""Find coverage in from_data not present in to_data (arcs or lines)."""
target_data = {}
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):
continue
to_points = to_data.get(from_path, {})
for from_point, from_target_indexes in from_points.items():
to_target_indexes = to_points.get(from_point, set())
remaining_targets = set(from_index[i] for i in from_target_indexes) - set(to_index[i] for i in to_target_indexes)
if remaining_targets:
target_index = target_data.setdefault(from_path, {}).setdefault(from_point, set())
target_index.update(get_target_index(name, target_indexes) for name in remaining_targets)
return target_data
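# Illustrative sketch (invented data) contrasting the two helpers above for a
# single point: suppose 'from' covers arc (1, 2) with targets {a, b} while
# 'to' covers the same arc with {a} only.
#
#   find_gaps    -> reports nothing: the point itself exists in 'to'
#   find_missing -> reports (1, 2) with target {b}: b's coverage is missing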

View File

@@ -0,0 +1,357 @@
"""Combine code coverage files."""
from __future__ import annotations
import os
import json
import typing as t
from ...target import (
walk_compile_targets,
walk_powershell_targets,
)
from ...io import (
read_text_file,
)
from ...util import (
ANSIBLE_TEST_TOOLS_ROOT,
display,
ApplicationError,
)
from ...util_common import (
ResultType,
run_command,
write_json_file,
write_json_test_results,
)
from ...executor import (
Delegate,
)
from ...data import (
data_context,
)
from ...host_configs import (
DockerConfig,
RemoteConfig,
)
from ...provisioning import (
HostState,
prepare_profiles,
)
from . import (
enumerate_python_arcs,
enumerate_powershell_lines,
get_collection_path_regexes,
get_all_coverage_files,
get_python_coverage_files,
get_python_modules,
get_powershell_coverage_files,
initialize_coverage,
COVERAGE_OUTPUT_FILE_NAME,
COVERAGE_GROUPS,
CoverageConfig,
PathChecker,
)
def command_coverage_combine(args): # type: (CoverageCombineConfig) -> None
"""Patch paths in coverage files and merge into a single file."""
host_state = prepare_profiles(args) # coverage combine
combine_coverage_files(args, host_state)
def combine_coverage_files(args, host_state): # type: (CoverageCombineConfig, HostState) -> t.List[str]
"""Combine coverage and return a list of the resulting files."""
if args.delegate:
if isinstance(args.controller, (DockerConfig, RemoteConfig)):
paths = get_all_coverage_files()
exported_paths = [path for path in paths if os.path.basename(path).split('=')[-1].split('.')[:2] == ['coverage', 'combined']]
if not exported_paths:
raise ExportedCoverageDataNotFound()
pairs = [(path, os.path.relpath(path, data_context().content.root)) for path in exported_paths]
def coverage_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
"""Add the coverage files to the payload file list."""
display.info('Including %d exported coverage file(s) in payload.' % len(pairs), verbosity=1)
files.extend(pairs)
data_context().register_payload_callback(coverage_callback)
raise Delegate(host_state=host_state)
paths = _command_coverage_combine_powershell(args) + _command_coverage_combine_python(args, host_state)
for path in paths:
display.info('Generated combined output: %s' % path, verbosity=1)
return paths
class ExportedCoverageDataNotFound(ApplicationError):
"""Exception when no combined coverage data is present yet is required."""
def __init__(self):
super().__init__(
'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n'
'Export coverage with `ansible-test coverage combine` using the `--export` option.\n'
'The exported files must be in the directory: %s/' % ResultType.COVERAGE.relative_path)
def _command_coverage_combine_python(args, host_state): # type: (CoverageCombineConfig, HostState) -> t.List[str]
"""Combine Python coverage files and return a list of the output files."""
coverage = initialize_coverage(args, host_state)
modules = get_python_modules()
coverage_files = get_python_coverage_files()
counter = 0
sources = _get_coverage_targets(args, walk_compile_targets)
groups = _build_stub_groups(args, sources, lambda s: dict((name, set()) for name in s))
collection_search_re, collection_sub_re = get_collection_path_regexes()
for coverage_file in coverage_files:
counter += 1
display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)
group = get_coverage_group(args, coverage_file)
if group is None:
display.warning('Unexpected name for coverage file: %s' % coverage_file)
continue
for filename, arcs in enumerate_python_arcs(coverage_file, coverage, modules, collection_search_re, collection_sub_re):
if args.export:
filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems
if group not in groups:
groups[group] = {}
arc_data = groups[group]
if filename not in arc_data:
arc_data[filename] = set()
arc_data[filename].update(arcs)
output_files = []
if args.export:
coverage_file = os.path.join(args.export, '')
suffix = '=coverage.combined'
else:
coverage_file = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME)
suffix = ''
path_checker = PathChecker(args, collection_search_re)
for group in sorted(groups):
arc_data = groups[group]
updated = coverage.CoverageData()
for filename in arc_data:
if not path_checker.check_path(filename):
continue
updated.add_arcs({filename: list(arc_data[filename])})
if args.all:
updated.add_arcs(dict((source[0], []) for source in sources))
if not args.explain:
output_file = coverage_file + group + suffix
updated.write_file(output_file) # always write files to make sure stale files do not exist
if updated:
# only report files which are non-empty to prevent coverage from reporting errors
output_files.append(output_file)
path_checker.report()
return sorted(output_files)
def _command_coverage_combine_powershell(args): # type: (CoverageCombineConfig) -> t.List[str]
"""Combine PowerShell coverage files and return a list of the output files."""
coverage_files = get_powershell_coverage_files()
def _default_stub_value(source_paths):
cmd = ['pwsh', os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'coverage_stub.ps1')]
cmd.extend(source_paths)
stubs = json.loads(run_command(args, cmd, capture=True, always=True)[0])
return dict((d['Path'], dict((line, 0) for line in d['Lines'])) for d in stubs)
counter = 0
sources = _get_coverage_targets(args, walk_powershell_targets)
groups = _build_stub_groups(args, sources, _default_stub_value)
collection_search_re, collection_sub_re = get_collection_path_regexes()
for coverage_file in coverage_files:
counter += 1
display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)
group = get_coverage_group(args, coverage_file)
if group is None:
display.warning('Unexpected name for coverage file: %s' % coverage_file)
continue
for filename, hits in enumerate_powershell_lines(coverage_file, collection_search_re, collection_sub_re):
if args.export:
filename = os.path.relpath(filename) # exported paths must be relative since absolute paths may differ between systems
if group not in groups:
groups[group] = {}
coverage_data = groups[group]
if filename not in coverage_data:
coverage_data[filename] = {}
file_coverage = coverage_data[filename]
for line_no, hit_count in hits.items():
file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count
output_files = []
path_checker = PathChecker(args)
for group in sorted(groups):
coverage_data = dict((filename, data) for filename, data in groups[group].items() if path_checker.check_path(filename))
if args.all:
# Add 0 line entries for files not in coverage_data
for source, source_line_count in sources:
if source in coverage_data:
continue
coverage_data.update(_default_stub_value([source])) # pass the path as a list; the stub helper expects source paths, not a line count
if not args.explain:
if args.export:
output_file = os.path.join(args.export, group + '=coverage.combined')
write_json_file(output_file, coverage_data, formatted=False)
output_files.append(output_file)
continue
output_file = COVERAGE_OUTPUT_FILE_NAME + group + '-powershell'
write_json_test_results(ResultType.COVERAGE, output_file, coverage_data, formatted=False)
output_files.append(os.path.join(ResultType.COVERAGE.path, output_file))
path_checker.report()
return sorted(output_files)
def _get_coverage_targets(args, walk_func): # type: (CoverageCombineConfig, t.Callable) -> t.List[t.Tuple[str, int]]
"""Return a list of files to cover and the number of lines in each file, using the given function as the source of the files."""
sources = []
if args.all or args.stub:
# excludes symlinks of regular files to avoid reporting on the same file multiple times
# in the future it would be nice to merge any coverage for symlinks into the real files
for target in walk_func(include_symlinks=False):
target_path = os.path.abspath(target.path)
target_lines = len(read_text_file(target_path).splitlines())
sources.append((target_path, target_lines))
sources.sort()
return sources
def _build_stub_groups(args, sources, default_stub_value):
"""
:type args: CoverageCombineConfig
:type sources: List[tuple[str, int]]
:type default_stub_value: Func[List[str]]
:rtype: dict
"""
groups = {}
if args.stub:
stub_group = []
stub_groups = [stub_group]
stub_line_limit = 500000
stub_line_count = 0
for source, source_line_count in sources:
stub_group.append(source)
stub_line_count += source_line_count
if stub_line_count > stub_line_limit:
stub_line_count = 0
stub_group = []
stub_groups.append(stub_group)
for stub_index, stub_group in enumerate(stub_groups):
if not stub_group:
continue
groups['=stub-%02d' % (stub_index + 1)] = default_stub_value(stub_group)
return groups
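# Illustrative sketch (line counts invented): with the 500000 line limit above,
# sources of 400000, 200000 and 100000 lines split into two stub groups:
#
#   '=stub-01': the first two sources (600000 lines; limit crossed after the second)
#   '=stub-02': the third source (the counter resets and a new group starts)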
def get_coverage_group(args, coverage_file): # type: (CoverageCombineConfig, str) -> t.Optional[str]
"""Return the name of the coverage group for the specified coverage file, or None if no group was found."""
parts = os.path.basename(coverage_file).split('=', 4)
# noinspection PyTypeChecker
if len(parts) != 5 or not parts[4].startswith('coverage.'):
return None
names = dict(
command=parts[0],
target=parts[1],
environment=parts[2],
version=parts[3],
)
export_names = dict(
version=parts[3],
)
group = ''
for part in COVERAGE_GROUPS:
if part in args.group_by:
group += '=%s' % names[part]
elif args.export:
group += '=%s' % export_names.get(part, 'various')
if args.export:
group = group.lstrip('=')
return group
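# Illustrative sketch (file name invented): for the coverage file
# 'integration=ping=docker-default=3.10=coverage.host.1234':
#
#   group_by={'command', 'version'}  ->  group '=integration=3.10'
#   --export with no group_by        ->  group 'various=various=various=3.10'
#
# since with --export only the version is preserved and the remaining parts
# collapse to 'various'.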
class CoverageCombineConfig(CoverageConfig):
"""Configuration for the coverage combine command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.group_by = frozenset(args.group_by) if args.group_by else frozenset() # type: t.FrozenSet[str]
self.all = args.all # type: bool
self.stub = args.stub # type: bool
# only available to coverage combine
self.export = args.export if 'export' in args else False # type: str

View File

@@ -0,0 +1,43 @@
"""Erase code coverage files."""
from __future__ import annotations
import os
from ...util_common import (
ResultType,
)
from ...executor import (
Delegate,
)
from ...provisioning import (
prepare_profiles,
)
from . import (
CoverageConfig,
)
def command_coverage_erase(args): # type: (CoverageEraseConfig) -> None
"""Erase code coverage data files collected during test runs."""
host_state = prepare_profiles(args) # coverage erase
if args.delegate:
raise Delegate(host_state=host_state)
coverage_dir = ResultType.COVERAGE.path
for name in os.listdir(coverage_dir):
if not name.startswith('coverage') and '=coverage.' not in name:
continue
path = os.path.join(coverage_dir, name)
if not args.explain:
os.remove(path)
class CoverageEraseConfig(CoverageConfig):
"""Configuration for the coverage erase command."""

View File

@@ -0,0 +1,51 @@
"""Generate HTML code coverage reports."""
from __future__ import annotations
import os
from ...io import (
make_dirs,
)
from ...util import (
display,
)
from ...util_common import (
ResultType,
)
from ...provisioning import (
prepare_profiles,
)
from .combine import (
combine_coverage_files,
CoverageCombineConfig,
)
from . import (
run_coverage,
)
def command_coverage_html(args): # type: (CoverageHtmlConfig) -> None
"""Generate an HTML coverage report."""
host_state = prepare_profiles(args) # coverage html
output_files = combine_coverage_files(args, host_state)
for output_file in output_files:
if output_file.endswith('-powershell'):
# coverage.py does not support non-Python files, so we just skip the local HTML report.
display.info("Skipping output file %s in html generation" % output_file, verbosity=3)
continue
dir_name = os.path.join(ResultType.REPORTS.path, os.path.basename(output_file))
make_dirs(dir_name)
run_coverage(args, host_state, output_file, 'html', ['-i', '-d', dir_name])
display.info('HTML report generated: file:///%s' % os.path.join(dir_name, 'index.html'))
class CoverageHtmlConfig(CoverageCombineConfig):
"""Configuration for the coverage html command."""

View File

@@ -0,0 +1,152 @@
"""Generate console code coverage reports."""
from __future__ import annotations
import os
import typing as t
from ...io import (
read_json_file,
)
from ...util import (
display,
)
from ...data import (
data_context,
)
from ...provisioning import (
prepare_profiles,
)
from .combine import (
combine_coverage_files,
CoverageCombineConfig,
)
from . import (
run_coverage,
)
def command_coverage_report(args): # type: (CoverageReportConfig) -> None
"""Generate a console coverage report."""
host_state = prepare_profiles(args) # coverage report
output_files = combine_coverage_files(args, host_state)
for output_file in output_files:
if args.group_by or args.stub:
display.info('>>> Coverage Group: %s' % ' '.join(os.path.basename(output_file).split('=')[1:]))
if output_file.endswith('-powershell'):
display.info(_generate_powershell_output_report(args, output_file))
else:
options = []
if args.show_missing:
options.append('--show-missing')
if args.include:
options.extend(['--include', args.include])
if args.omit:
options.extend(['--omit', args.omit])
run_coverage(args, host_state, output_file, 'report', options)
def _generate_powershell_output_report(args, coverage_file): # type: (CoverageReportConfig, str) -> str
"""Generate and return a PowerShell coverage report for the given coverage file."""
coverage_info = read_json_file(coverage_file)
root_path = data_context().content.root + '/'
name_padding = 7
cover_padding = 8
file_report = []
total_stmts = 0
total_miss = 0
for filename in sorted(coverage_info.keys()):
hit_info = coverage_info[filename]
if filename.startswith(root_path):
filename = filename[len(root_path):]
if args.omit and filename in args.omit:
continue
if args.include and filename not in args.include:
continue
stmts = len(hit_info)
miss = len([c for c in hit_info.values() if c == 0])
name_padding = max(name_padding, len(filename) + 3)
total_stmts += stmts
total_miss += miss
cover = "{0}%".format(int((stmts - miss) / stmts * 100))
missing = []
current_missing = None
sorted_lines = sorted([int(x) for x in hit_info.keys()])
for idx, line in enumerate(sorted_lines):
hit = hit_info[str(line)]
if hit == 0 and current_missing is None:
current_missing = line
elif hit != 0 and current_missing is not None:
end_line = sorted_lines[idx - 1]
if current_missing == end_line:
missing.append(str(current_missing))
else:
missing.append('%s-%s' % (current_missing, end_line))
current_missing = None
if current_missing is not None:
end_line = sorted_lines[-1]
if current_missing == end_line:
missing.append(str(current_missing))
else:
missing.append('%s-%s' % (current_missing, end_line))
file_report.append({'name': filename, 'stmts': stmts, 'miss': miss, 'cover': cover, 'missing': missing})
if total_stmts == 0:
return ''
total_percent = '{0}%'.format(int((total_stmts - total_miss) / total_stmts * 100))
stmts_padding = max(8, len(str(total_stmts)))
miss_padding = max(7, len(str(total_miss)))
line_length = name_padding + stmts_padding + miss_padding + cover_padding
header = 'Name'.ljust(name_padding) + 'Stmts'.rjust(stmts_padding) + 'Miss'.rjust(miss_padding) + \
'Cover'.rjust(cover_padding)
if args.show_missing:
header += 'Lines Missing'.rjust(16)
line_length += 16
line_break = '-' * line_length
lines = ['%s%s%s%s%s' % (f['name'].ljust(name_padding), str(f['stmts']).rjust(stmts_padding),
str(f['miss']).rjust(miss_padding), f['cover'].rjust(cover_padding),
' ' + ', '.join(f['missing']) if args.show_missing else '')
for f in file_report]
totals = 'TOTAL'.ljust(name_padding) + str(total_stmts).rjust(stmts_padding) + \
str(total_miss).rjust(miss_padding) + total_percent.rjust(cover_padding)
report = '{0}\n{1}\n{2}\n{1}\n{3}'.format(header, line_break, "\n".join(lines), totals)
return report
class CoverageReportConfig(CoverageCombineConfig):
"""Configuration for the coverage report command."""
def __init__(self, args): # type: (t.Any) -> None
super().__init__(args)
self.show_missing = args.show_missing # type: bool
self.include = args.include # type: str
self.omit = args.omit # type: str

View File

@@ -0,0 +1,190 @@
"""Generate XML code coverage reports."""
from __future__ import annotations
import os
import time
import typing as t
from xml.etree.ElementTree import (
Comment,
Element,
SubElement,
tostring,
)
from xml.dom import (
minidom,
)
from ...io import (
make_dirs,
read_json_file,
)
from ...util_common import (
ResultType,
write_text_test_results,
)
from ...util import (
get_ansible_version,
)
from ...data import (
data_context,
)
from ...provisioning import (
prepare_profiles,
)
from .combine import (
combine_coverage_files,
CoverageCombineConfig,
)
from . import (
run_coverage,
)
def command_coverage_xml(args): # type: (CoverageXmlConfig) -> None
"""Generate an XML coverage report."""
host_state = prepare_profiles(args) # coverage xml
output_files = combine_coverage_files(args, host_state)
for output_file in output_files:
xml_name = '%s.xml' % os.path.basename(output_file)
if output_file.endswith('-powershell'):
report = _generate_powershell_xml(output_file)
rough_string = tostring(report, 'utf-8')
reparsed = minidom.parseString(rough_string)
pretty = reparsed.toprettyxml(indent=' ')
write_text_test_results(ResultType.REPORTS, xml_name, pretty)
else:
xml_path = os.path.join(ResultType.REPORTS.path, xml_name)
make_dirs(ResultType.REPORTS.path)
run_coverage(args, host_state, output_file, 'xml', ['-i', '-o', xml_path])
def _generate_powershell_xml(coverage_file): # type: (str) -> Element
"""Generate a PowerShell coverage report XML element from the specified coverage file and return it."""
coverage_info = read_json_file(coverage_file)
content_root = data_context().content.root
is_ansible = data_context().content.is_ansible
packages = {}
for path, results in coverage_info.items():
filename = os.path.splitext(os.path.basename(path))[0]
if filename.startswith('Ansible.ModuleUtils'):
package = 'ansible.module_utils'
elif is_ansible:
package = 'ansible.modules'
else:
rel_path = path[len(content_root) + 1:]
plugin_type = "modules" if rel_path.startswith("plugins/modules") else "module_utils"
package = 'ansible_collections.%splugins.%s' % (data_context().content.collection.prefix, plugin_type)
if package not in packages:
packages[package] = {}
packages[package][path] = results
elem_coverage = Element('coverage')
elem_coverage.append(
Comment(' Generated by ansible-test from the Ansible project: https://www.ansible.com/ '))
elem_coverage.append(
Comment(' Based on https://raw.githubusercontent.com/cobertura/web/master/htdocs/xml/coverage-04.dtd '))
elem_sources = SubElement(elem_coverage, 'sources')
elem_source = SubElement(elem_sources, 'source')
elem_source.text = data_context().content.root
elem_packages = SubElement(elem_coverage, 'packages')
total_lines_hit = 0
total_line_count = 0
for package_name, package_data in packages.items():
lines_hit, line_count = _add_cobertura_package(elem_packages, package_name, package_data)
total_lines_hit += lines_hit
total_line_count += line_count
elem_coverage.attrib.update({
'branch-rate': '0',
'branches-covered': '0',
'branches-valid': '0',
'complexity': '0',
'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0",
'lines-covered': str(total_lines_hit), # per the Cobertura DTD, lines-covered counts hit lines
'lines-valid': str(total_line_count), # and lines-valid counts all measurable lines
'timestamp': str(int(time.time())),
'version': get_ansible_version(),
})
return elem_coverage
def _add_cobertura_package(packages, package_name, package_data): # type: (SubElement, str, t.Dict[str, t.Dict[str, int]]) -> t.Tuple[int, int]
"""Add a package element to the given packages element."""
elem_package = SubElement(packages, 'package')
elem_classes = SubElement(elem_package, 'classes')
total_lines_hit = 0
total_line_count = 0
for path, results in package_data.items():
lines_hit = len([True for hits in results.values() if hits])
line_count = len(results)
total_lines_hit += lines_hit
total_line_count += line_count
elem_class = SubElement(elem_classes, 'class')
class_name = os.path.splitext(os.path.basename(path))[0]
if class_name.startswith("Ansible.ModuleUtils"):
class_name = class_name[20:]
content_root = data_context().content.root
filename = path
if filename.startswith(content_root):
filename = filename[len(content_root) + 1:]
elem_class.attrib.update({
'branch-rate': '0',
'complexity': '0',
'filename': filename,
'line-rate': str(round(lines_hit / line_count, 4)) if line_count else "0",
'name': class_name,
})
SubElement(elem_class, 'methods')
elem_lines = SubElement(elem_class, 'lines')
for number, hits in results.items():
elem_line = SubElement(elem_lines, 'line')
elem_line.attrib.update(
hits=str(hits),
number=str(number),
)
elem_package.attrib.update({
'branch-rate': '0',
'complexity': '0',
'line-rate': str(round(total_lines_hit / total_line_count, 4)) if total_line_count else "0",
'name': package_name,
})
return total_lines_hit, total_line_count
class CoverageXmlConfig(CoverageCombineConfig):
"""Configuration for the coverage xml command."""