214 lines
6.8 KiB
Python
214 lines
6.8 KiB
Python
# Copyright (C) 2022 Mohamed El Mouctar HAIDARA
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
import hashlib
|
|
import os
|
|
import uuid
|
|
from collections import defaultdict
|
|
from itertools import chain
|
|
from operator import methodcaller
|
|
from typing import Tuple, List, Dict, Any, Set
|
|
|
|
from ansible.errors import AnsibleError
|
|
from ansible.module_utils._text import to_text
|
|
from ansible.parsing.dataloader import DataLoader
|
|
from ansible.playbook.role_include import IncludeRole
|
|
from ansible.playbook.task import Task
|
|
from ansible.playbook.task_include import TaskInclude
|
|
from ansible.template import Templar
|
|
from ansible.utils.display import Display
|
|
from colour import Color
|
|
|
|
# Module-level Ansible display object; used below to emit warnings to the user
display = Display()
|
|
|
|
|
|
def convert_when_to_str(when: List) -> str:
    """
    Render the conditions of an ansible ``when`` clause as a single line label

    Each condition is converted with ``str`` and the conditions are joined with " and ".
    Line breaks are removed from the final label.

    :param when: list of the conditions attached to a task, block, role...
    :return: an empty string when there is no condition, otherwise "[when: cond1 and cond2 ...]"
    """
    if len(when) == 0:
        return ""

    joined_conditions = " and ".join(str(condition) for condition in when)
    label = f"[when: {joined_conditions}]"
    # Conditions may span multiple lines in the playbook: keep the label on one line
    return label.strip().replace("\n", "")
|
|
|
|
|
|
def hash_value(value: str) -> str:
    """
    Build a short hexadecimal identifier from a string.

    The value is hashed with md5 to avoid issues with special chars. The IDs are
    not visible to the end user in the web/rendered graph, so they do not need to
    look pretty. Only the first 8 characters of the digest are kept: collisions
    are possible but are not a concern here.

    :param value: string which represents id
    :return: the first 8 hexadecimal characters of the md5 digest of the value
    """
    digest = hashlib.md5(value.encode("utf-8")).hexdigest()
    return digest[:8]
|
|
|
|
|
|
def generate_id(prefix: str = "") -> str:
    """
    Generate a short random identifier based on an uuid4

    :param prefix: Prefix to add to the generated ID
    :return: the prefix followed by the first 8 characters of a random uuid
    """
    random_part = str(uuid.uuid4())[:8]
    return prefix + random_part
|
|
|
|
|
|
def clean_name(name: str) -> str:
    """
    Clean a name for the node, edge...

    Because every name we use is double quoted, we just have to convert the
    double quotes to the corresponding html special char (``&#34;``).
    See https://www.graphviz.org/doc/info/lang.html on the bottom.

    :param name: pretty name of the object
    :return: the name without leading/trailing whitespace and with double quotes
        converted to html special char
    """
    # A raw double quote would terminate the quoted string early, so it is
    # replaced by its numeric html entity.
    return name.strip().replace('"', "&#34;")
|
|
|
|
|
|
def get_play_colors(play_id: str) -> Tuple[str, str]:
    """
    Generate two colors (in hex) for a given play: the main color and the color to use as a font color

    The main color is picked from the play id with a fixed luminance of 0.4, the
    font color is always white.

    :param play_id: identifier of the play, used to pick the main color
    :return: a tuple (main color, font color), both in hexadecimal notation
    """
    main_color = Color(pick_for=play_id, luminance=0.4).get_hex_l()
    font_color = "#ffffff"
    return main_color, font_color
|
|
|
|
|
|
def has_role_parent(task_block: Task) -> bool:
    """
    Check if one of the parents of the task or block is a role

    Only the ancestors are inspected: the ``_role`` attribute of ``task_block``
    itself is not looked at.

    :param task_block: the task or block whose chain of ``_parent`` is walked
    :return: True if at least one ancestor has a role, False otherwise
    """
    ancestor = task_block._parent
    # Climb until the chain is exhausted or an ancestor attached to a role is found
    while ancestor and not ancestor._role:
        ancestor = ancestor._parent

    # A remaining ancestor means the walk stopped on one that has a role
    return bool(ancestor)
|
|
|
|
|
|
def merge_dicts(dict_1: Dict[Any, List], dict_2: Dict[Any, List]) -> Dict[Any, List]:
    """
    Merge two dicts of lists: the keys are grouped and the lists are concatenated

    The values of ``dict_1`` come before the values of ``dict_2`` for a shared key.
    The input dicts and their lists are left untouched.

    :param dict_1: first dict, its values must be lists
    :param dict_2: second dict, its values must be lists
    :return: a new ``defaultdict(list)`` containing the keys of both dicts
    """
    merged = defaultdict(list)
    for source in (dict_1, dict_2):
        for key, values in source.items():
            merged[key].extend(values)

    return merged
|
|
|
|
|
|
def handle_include_path(
    original_task: TaskInclude, loader: DataLoader, templar: Templar
) -> str:
    """
    Resolve the path of the file targeted by an include task.

    Handle relative includes by walking up the list of parent include tasks: the
    directories of the enclosing includes are accumulated into a path and, at each
    level, a candidate file is computed with the loader. The walk stops as soon as
    a candidate exists on disk.

    This function is widely inspired by the static method ansible uses when executing the playbook.
    See :func:`~ansible.playbook.included_file.IncludedFile.process_include_results`

    :param original_task: the include task to resolve. Its ``_raw_params`` argument
        (or ``name`` when ``_raw_params`` is absent) is used as the include target
    :param loader: data loader used to compute the candidate paths
    :param templar: templar used to template the include target and the paths of the parent includes
    :return: the path computed for the included file. The last computed candidate is
        returned even if it was not found on disk
    """
    parent_include = original_task._parent
    include_file = None
    # task path or role name
    include_param = original_task.args.get(
        "_raw_params", original_task.args.get("name", None)
    )

    # Path built from the directories of the parent includes visited so far
    cumulative_path = None
    while parent_include is not None:
        # Only include tasks contribute to the path: skip the other kinds of parents
        if not isinstance(parent_include, TaskInclude):
            parent_include = parent_include._parent
            continue
        if isinstance(parent_include, IncludeRole):
            parent_include_dir = parent_include._role_path
        else:
            try:
                parent_include_dir = os.path.dirname(
                    templar.template(parent_include.args.get("_raw_params"))
                )
            except AnsibleError as e:
                # Best effort: keep going with an empty directory and warn the user
                parent_include_dir = ""
                display.warning(
                    "Templating the path of the parent %s failed. The path to the "
                    "included file may not be found. "
                    "The error was: %s." % (original_task.action, to_text(e))
                )

        # Prepend the directory of this parent unless the accumulated path is already absolute
        if cumulative_path is not None and not os.path.isabs(cumulative_path):
            cumulative_path = os.path.join(parent_include_dir, cumulative_path)
        else:
            cumulative_path = parent_include_dir
        include_target = templar.template(include_param)
        if original_task._role:
            new_basedir = os.path.join(
                original_task._role._role_path, "tasks", cumulative_path
            )
            # First look in the tasks of the role, then relatively to the accumulated path
            candidates = [
                loader.path_dwim_relative(
                    original_task._role._role_path, "tasks", include_target
                ),
                loader.path_dwim_relative(new_basedir, "tasks", include_target),
            ]
            # When no candidate exists, include_file keeps the value of the last candidate
            for include_file in candidates:
                try:
                    # may throw OSError
                    os.stat(include_file)
                    # or select the task file if it exists
                    break
                except OSError:
                    pass
        else:
            include_file = loader.path_dwim_relative(
                loader.get_basedir(), cumulative_path, include_target
            )

        # Stop at the first level where the file exists, otherwise try with the next parent
        if os.path.exists(include_file):
            break
        else:
            parent_include = parent_include._parent

    # No parent include was visited: resolve the target without any accumulated path
    if include_file is None:
        if original_task._role:
            include_target = templar.template(include_param)
            include_file = loader.path_dwim_relative(
                original_task._role._role_path, "tasks", include_target
            )
        else:
            include_file = loader.path_dwim(templar.template(include_param))

    return include_file
|