import logging
import os
import re
import time
import pandas as pd
import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
from concurrent.futures import ThreadPoolExecutor
from pandas import json_normalize
from idstools.view.common import Terminal
logger = logging.getLogger("module")
yaml_mapping = {
"reference_name": "ref_name",
"responsible_name": "ro_name",
"characteristics.shot": "pulse",
"characteristics.run": "run",
"characteristics.type": "type",
"characteristics.workflow": "workflow",
"characteristics.machine": "database",
"scenario_key_parameters.confinement_regime": "confinement",
"scenario_key_parameters.plasma_current": "ip",
"scenario_key_parameters.magnetic_field": "b0",
"scenario_key_parameters.main_species": "fuelling",
"scenario_key_parameters.central_electron_density": "ne0",
"scenario_key_parameters.sepmid_electron_density": "nesep",
"scenario_key_parameters.central_zeff": "zeff",
"scenario_key_parameters.sepmid_zeff": "zeff_sep",
"scenario_key_parameters.density_peaking": "npeak",
"hcd.p_hcd": "p_hcd",
"hcd.p_ec": "p_ec",
"hcd.p_ic": "p_ic",
"hcd.p_nbi": "p_nbi",
"hcd.p_lh": "p_lh",
"hcd.p_sol": "p_sol",
"free_description": "extra",
"ids_list": "idslist",
"tsteps": "tsteps",
"location": "location",
"plasma_composition.species": "species",
"plasma_composition.n_over_e": "pc_n_over_ne",
"plasma_composition.a": "pc_a",
"plasma_composition.z": "pc_z",
"plasma_composition.n_over_ntot": "pc_n_over_ntot",
"plasma_composition.n_over_n_maj": "pc_n_over_n_maj",
"lastmodified": "date",
}
# Collects scenario description YAML files found under a list of directories into one pandas DataFrame.
class ScenarioDescriptionSummary:
    """
    Build a single pandas DataFrame summarising the scenario description YAML files found
    under a list of directories.
    """

    def __init__(self, directory_list=None) -> None:
        """
        Store the directories that will be searched for scenario files.

        Args:
            directory_list (list): A list of directory paths to search for scenario files.
                ``None`` (the default) means "no directories". A single path given as a
                string or ``os.PathLike`` is wrapped into a one-element list.
        """
        if directory_list is None:
            # A fresh list per instance: a literal ``[]`` default would be shared by all instances.
            directory_list = []
        elif isinstance(directory_list, (str, os.PathLike)):
            # Iterating a bare string would walk it character by character.
            directory_list = [directory_list]
        self.directory_list = directory_list

    @staticmethod
    def get_yaml_data(yaml_file_path):
        """
        Read a YAML file and return its contents as a Python object.

        Args:
            yaml_file_path: Path of the YAML file to load.

        Returns:
            The data loaded from the YAML file, or ``None`` when the file cannot be opened,
            decoded or parsed (a warning is logged in that case).
        """
        try:
            with open(yaml_file_path, "r", encoding="utf-8") as file_handle:
                # NOTE(security): ``yaml.load`` with the full (C)Loader can construct arbitrary
                # Python objects. Only use this on trusted files; consider ``yaml.safe_load``
                # (or CSafeLoader) if the scanned directories may contain untrusted content.
                return yaml.load(file_handle, Loader=Loader)
        except Exception as e:
            # Best effort: one unreadable file must not abort a scan over many files.
            logger.warning("Could not read yaml file: %s %s", yaml_file_path, e)
            return None

    @staticmethod
    def get_data_frame_from_yaml(yaml_file_path, add_obsolete=False):
        """
        Load one YAML file and return its flattened content as a pandas DataFrame.

        Args:
            yaml_file_path: The path to the YAML file from which to create a DataFrame.
            add_obsolete: When false, only files whose top-level ``status`` equals ``"active"``
                are returned; files with another status, or without a status, are skipped.

        Returns:
            A pandas DataFrame produced by ``json_normalize``, or ``None`` when the file could
            not be read or was filtered out.
        """
        yaml_data = ScenarioDescriptionSummary.get_yaml_data(yaml_file_path)
        if yaml_data is None:
            return None
        if not add_obsolete:
            # A missing "status" key (or a non-mapping document) cannot be "active".
            status = yaml_data.get("status") if isinstance(yaml_data, dict) else None
            if status != "active":
                return None
        return json_normalize(yaml_data)

    def get_dataframes_from_files(self, extension=".yaml", add_obsolete=False):
        """
        Collect all matching files under ``self.directory_list``, load each one into a DataFrame,
        add bookkeeping columns and return the concatenation with columns renamed through
        ``yaml_mapping``.

        Args:
            extension: File name suffix to search for. Only ``".yaml"`` is supported.
            add_obsolete: Whether to include files whose status is not ``"active"``.

        Returns:
            A pandas DataFrame. It is empty when the extension is unsupported or when no file
            could be loaded.
        """
        if extension != ".yaml":
            logger.warning("Unsupported scenario description extension: %s", extension)
            return pd.DataFrame()

        files = self._find_files(extension)
        if not files:
            return pd.DataFrame()

        # Threads are used because the per-file work starts with file reads.
        with ThreadPoolExecutor() as executor:
            results = executor.map(lambda path: self._load_scenario_frame(path, add_obsolete), files)
            data_frames = [frame for frame in results if frame is not None]

        if not data_frames:
            # ``pd.concat`` raises ValueError on an empty list.
            return pd.DataFrame()
        return pd.concat(data_frames, ignore_index=True).rename(columns=yaml_mapping)

    def _find_files(self, extension):
        """Return the paths of all files below ``self.directory_list`` whose name ends with ``extension``."""
        files = []
        for folder_path in self.directory_list:
            for root, _, filenames in os.walk(folder_path):
                files.extend(os.path.join(root, name) for name in filenames if name.endswith(extension))
        return files

    def _load_scenario_frame(self, yaml_file, add_obsolete):
        """Load one YAML file and add the ``dd_version``, ``location`` and ``lastmodified`` columns."""
        df = ScenarioDescriptionSummary.get_data_frame_from_yaml(yaml_file, add_obsolete=add_obsolete)
        if df is None:
            return None

        yaml_file = os.path.abspath(yaml_file)
        # The version is taken from the database path the file lives in.
        if "ITER/3/0" in yaml_file or "iterdb/3/0" in yaml_file:
            dd_version = "3"
        elif "ITER/4/" in yaml_file or "iterdb/4/" in yaml_file:
            dd_version = "4"
        else:
            dd_version = ""
        df["dd_version"] = dd_version
        df["location"] = yaml_file
        local_time = time.ctime(os.path.getmtime(yaml_file))
        df["lastmodified"] = pd.to_datetime(local_time)
        self._extract_information(df)
        return df

    def _extract_information(self, df):
        """
        Add the derived columns ``tsteps``, ``idslist`` and ``composition`` to ``df`` in place.

        Args:
            df: A pandas DataFrame holding one flattened scenario description.
        """
        if "idslist.summary.time_step_number" in df.columns:
            df["tsteps"] = df["idslist.summary.time_step_number"]

        # Second path component of every "idslist.<name>..." column; sorted so the
        # resulting string is deterministic.
        ids_names = set()
        for column in df.columns:
            parts = str(column).split(".")
            if len(parts) > 1 and parts[0] == "idslist":
                ids_names.add(parts[1])
        df["idslist"] = ",".join(sorted(ids_names))

        species = self._first_cell_as_str(df, "plasma_composition.species")
        n_over_ne = self._first_cell_as_str(df, "plasma_composition.n_over_ne")
        df["composition"] = self._format_composition(species, n_over_ne)

    @staticmethod
    def _first_cell_as_str(df, column):
        """Return ``str`` of the first value of ``column``, or ``None`` if the column is absent or has no rows."""
        if column not in df.columns or df.empty:
            return None
        return str(df[column].iloc[0])

    @staticmethod
    def _format_composition(species, n_over_ne):
        """
        Format whitespace-separated species names and fractions as ``"name(value),..."`` sorted
        by decreasing value. Returns the string ``"None"`` when either input is missing or a
        fraction is not numeric.
        """
        if species is None or n_over_ne is None:
            return "None"
        fractions = dict(zip(species.split(), n_over_ne.split()))
        try:
            ordered = sorted(fractions.items(), key=lambda item: float(item[1]), reverse=True)
        except ValueError:
            # Keep the scan going when one file carries a malformed composition.
            logger.warning("Could not parse plasma composition: %r / %r", species, n_over_ne)
            return "None"
        return ",".join(f"{name}({value})" for name, value in ordered)
# Wraps a single scenario description YAML file and resolves its "replaces" / "replaced_by" relations.
class ScenarioDescription:
    """
    One scenario description YAML file, with helpers to follow the ``replaces`` and
    ``replaced_by`` entries of its ``database_relations`` section.
    """

    # Keys of the dictionaries returned by get_children / get_parents.
    _RELATION_KEYS = ("pulse", "run", "status", "comment")

    def __init__(self, pulse: int, run: int, yaml_path: str) -> None:
        """
        Store the pulse, run and YAML path and try to load the YAML file.

        On any failure the error is logged and ``yaml_data`` stays ``None``.

        Args:
            pulse (int): Pulse number of the scenario.
            run (int): Run number of the scenario.
            yaml_path (str): Path to the scenario description YAML file.
        """
        self.pulse = pulse
        self.run = run
        self.yaml_path = yaml_path
        self.yaml_data = None
        try:
            with open(self.yaml_path, "r", encoding="utf-8") as f:
                self.yaml_data = yaml.safe_load(f)
        except Exception as e:
            # Traceback at debug level, short message at critical level.
            logger.debug("%s", e, exc_info=True)
            logger.critical("%s", e)

    def _related_yaml_path(self, pulse, run):
        """Return the expected YAML path of scenario ``pulse``/``run``, derived from ``self.yaml_path``."""
        padded_run = str(run).rjust(4, "0")
        file_name = f"ids_{pulse}{padded_run}.yaml"
        parent_dir = os.path.dirname(self.yaml_path)
        if os.path.basename(parent_dir) == "0":
            # All files live side by side in the same "0" directory.
            return os.path.join(parent_dir, file_name)
        # Otherwise files are laid out as <root>/<pulse>/<run>/<file>.
        database_root = os.path.dirname(os.path.dirname(parent_dir))
        return os.path.join(database_root, str(pulse), str(run), file_name)

    def _collect_relatives(self, yaml_data, dict_to_fill, relation_key, prepend):
        """
        Follow ``database_relations[relation_key]`` from ``yaml_data`` through the chain of related
        scenario files and record each relative found in ``dict_to_fill``.

        Entries are inserted at the front when ``prepend`` is true, otherwise appended.
        """
        if dict_to_fill is None:
            dict_to_fill = {}
        if not isinstance(yaml_data, dict):
            return dict_to_fill
        for key in self._RELATION_KEYS:
            dict_to_fill.setdefault(key, [])

        seen = set()
        current = yaml_data
        while isinstance(current, dict):
            relations = current.get("database_relations")
            reference = relations.get(relation_key) if isinstance(relations, dict) else None
            if not reference:
                # No (or empty) relation: end of the chain.
                break
            numbers = re.findall(r"\d+", str(reference))
            if len(numbers) < 2:
                logger.warning("Could not parse pulse/run from '%s': %r", relation_key, reference)
                break
            pulse, run = numbers[0], numbers[1]
            if (pulse, run) in seen:
                logger.warning("Circular '%s' relation detected at %s/%s", relation_key, pulse, run)
                break
            seen.add((pulse, run))

            relative = ScenarioDescription(pulse, run, self._related_yaml_path(pulse, run))
            if not isinstance(relative.yaml_data, dict):
                break
            relative_relations = relative.yaml_data.get("database_relations")
            comment = relative_relations.get("replaces") if isinstance(relative_relations, dict) else None
            row = {
                "pulse": pulse,
                "run": run,
                "status": relative.yaml_data.get("status"),
                "comment": comment,
            }
            for key, value in row.items():
                if prepend:
                    dict_to_fill[key].insert(0, value)
                else:
                    dict_to_fill[key].append(value)
            current = relative.yaml_data
        return dict_to_fill

    def get_children(self, yaml_data, dict_to_fill=None):
        """
        Follow the ``replaced_by`` relation starting at ``yaml_data`` and collect the scenarios found.

        Args:
            yaml_data: Loaded scenario description (a dictionary), or ``None``.
            dict_to_fill: Optional dictionary to extend; a new one is created when omitted.

        Returns:
            The dictionary with the keys ``pulse``, ``run``, ``status`` and ``comment``, each a list
            with one entry per child in chain order. It is returned unchanged when ``yaml_data``
            is ``None``.
        """
        return self._collect_relatives(yaml_data, dict_to_fill, "replaced_by", prepend=False)

    def get_parents(self, yaml_data, dict_to_fill=None):
        """
        Follow the ``replaces`` relation starting at ``yaml_data`` and collect the scenarios found.

        Args:
            yaml_data: Loaded scenario description (a dictionary), or ``None``.
            dict_to_fill: Optional dictionary to extend; a new one is created when omitted.

        Returns:
            The dictionary with the keys ``pulse``, ``run``, ``status`` and ``comment``, each a list
            with the most distant parent first. It is returned unchanged when ``yaml_data`` is
            ``None``.
        """
        # Parents are inserted at the front so the oldest ancestor comes first.
        return self._collect_relatives(yaml_data, dict_to_fill, "replaces", prepend=True)

    def get_family(self):
        """
        Return the parents and children of this scenario.

        Returns:
            A dictionary with the keys ``parents`` and ``children`` holding the results of
            ``get_parents`` and ``get_children`` for ``self.yaml_data``.
        """
        return {
            "parents": self.get_parents(self.yaml_data, {}),
            "children": self.get_children(self.yaml_data, {}),
        }

    def print_yaml(self):
        """
        Print the ``yaml_data`` attribute of the object on the Terminal.
        """
        terminal = Terminal()
        terminal.print(self.yaml_data)
if __name__ == "__main__":
    # Manual smoke run: summarise the active scenarios of the default database folder.
    default_folder_path = r"/work/imas/shared/imasdb/ITER/3/0"
    # The constructor takes a list of directories through ``directory_list``;
    # it has no ``folder_path`` parameter.
    scenario_description_obj = ScenarioDescriptionSummary(directory_list=[default_folder_path])
    df = scenario_description_obj.get_dataframes_from_files(extension=".yaml", add_obsolete=False)