Source code for idstools.utils.idshelper

"""
This module

"""

import difflib
import inspect
import logging
import re
import sys
import time
import types
from typing import Union

import numpy as np
import pandas as pd
import rich
import scipy

try:
    import imaspy as imas
except ImportError:
    import imas
from packaging import version
from rich.table import Table
from rich.text import Text

# Logger used by the helper functions of this module (fixed logger name "module").
logger = logging.getLogger("module")
# Non-empty string when the installed NumPy is newer than 1.19, empty string otherwise.
# NOTE(review): despite the name this is a plain string, not a keyword dict, so
# consumers can only rely on its truthiness -- confirm before changing its type.
ARRAY_EQUAL_KWARGS = "equal_nan=True" if version.parse(np.__version__) > version.parse("1.19") else ""


def parse_uri(uri: str):
    """
    Split an IMAS URI into its base part and the components of its fragment.

    The fragment is everything after the first ``#`` and is read as
    ``ids_name[:occurrence][/ids_path]``.

    Args:
        uri (str): URI such as ``imas:hdf5?path=/data#equilibrium:1/time_slice(0)/time``.

    Returns:
        dict: with the keys ``uri_part`` (text before the first ``#``),
        ``ids_name`` (``""`` when there is no fragment), ``occurrence``
        (``int`` or ``None``) and ``ids_path`` (``str`` or ``None``).

    Raises:
        ValueError: if an occurrence is given but is not an integer.
    """
    # Only the first "#" starts the fragment. Splitting on every "#" made the
    # whole fragment be ignored as soon as it contained a second "#".
    uri_part, separator, ids_fragment = uri.partition("#")

    ids_name = ""
    ids_path = None
    occurrence = None

    if separator:
        head, slash, tail = ids_fragment.partition("/")
        if ":" in head:
            # "name:occurrence[/path]"
            ids_name, _, occurrence_and_path = ids_fragment.partition(":")
            occurrence_text, slash, path_text = occurrence_and_path.partition("/")
            occurrence = int(occurrence_text)
            if slash:
                ids_path = path_text
        else:
            # "name[/path]"
            ids_name = head
            if slash:
                ids_path = tail

    return {
        "uri_part": uri_part,
        "occurrence": occurrence,
        "ids_name": ids_name,
        "ids_path": ids_path,
    }
def parse_slice_from_string(input_string):
    """
    Build a ``slice`` from the first ``start:stop[:step]`` group found in a string.

    The group may be enclosed in square brackets or parentheses, e.g.
    ``"time_slice[1:5]"`` or ``"time_slice(::2)"``. Parts that are left empty
    become ``None``.

    Args:
        input_string (str): text that may contain slice notation.

    Returns:
        slice: the parsed slice, or ``slice(None, None, None)`` when the string
        contains no slice notation.
    """
    found = re.search(r"[\[\(]([-\d]*):([-\d]*):?([-\d]*)[\]\)]", input_string)
    if found is None:
        return slice(None, None, None)
    # groups are (start, stop, step); an empty group means "not given"
    bounds = [int(text) if text else None for text in found.groups()]
    return slice(*bounds)
def get_length_of_partial_field(ids, ids_path):
    """
    Look up the first coordinate of the node that a sliced path iterates over.

    ``ids_path`` is a dotted Python path relative to ``ids`` in which the sliced
    index is written as the placeholder ``[t]``. The text in front of the first
    placeholder is evaluated on ``ids``; when the path contains no placeholder
    the whole path is evaluated.

    Args:
        ids: IDS object (or sub-structure) the path is relative to.
        ids_path (str): dotted path, e.g. ``"time_slice[t].global_quantities.ip"``.

    Returns:
        tuple: ``(coordinate, unit)``. ``coordinate`` is ``coordinates[0]`` of the
        evaluated node when that node is an ``IDSPrimitive`` or an
        ``IDSStructArray`` and ``None`` otherwise; ``unit`` is the coordinate's
        ``metadata.units`` when the coordinate is an ``IDSPrimitive`` and ``""``
        otherwise. When the path cannot be evaluated the error is logged and
        ``(None, "")`` is returned, so the result can always be unpacked.
    """
    # The placeholder may be followed by more path ("a[t].b") or end the path
    # ("a[t]"); in both cases only the part in front of it can be evaluated here,
    # because `t` is not defined in this function.
    placeholder = re.match(r"^(.*?)\[t\](?:\.|$)", ids_path)
    partial_field = placeholder.group(1) if placeholder else ids_path

    try:
        # NOTE(review): eval() executes the path as Python code; only pass trusted paths.
        _inner_data = eval("ids." + partial_field)
        coordinate_partial = None
        coordinate_unit = ""
        if isinstance(_inner_data, (imas.ids_primitive.IDSPrimitive, imas.ids_struct_array.IDSStructArray)):
            coordinate_partial = _inner_data.coordinates[0]
            if isinstance(coordinate_partial, imas.ids_primitive.IDSPrimitive):
                coordinate_unit = coordinate_partial.metadata.units
        return coordinate_partial, coordinate_unit
    except Exception as e:
        logger.error(
            f"{partial_field} path/value does not exist, hint: please check "
            f"length of an array, detailed error : {e}"
        )
        return None, ""
def partial_get(ids, ids_path, custom_coordinate=None):
    """
    Read a field for a range of indices given with slice notation in its path.

    The slice in ``ids_path`` (for example ``time_slice(0:10:2)/global_quantities/ip``)
    is replaced by a running index and the resulting path is evaluated on ``ids``
    once per index.

    Args:
        ids: IDS object the path is relative to.
        ids_path (str): path containing one ``start:stop[:step]`` slice in
            parentheses or square brackets; ``/`` separates path elements.
        custom_coordinate (str | int, optional): either the index of the
            coordinate of the first value to return (digits), or a Python path
            relative to ``ids`` that is evaluated to obtain the coordinate.
            When omitted, the first coordinate of the first value that has data
            is used.

    Returns:
        tuple: ``(data, coordinate, data_unit, coordinate_unit)``.
        ``data`` is a NumPy array built from the collected values (structures and
        strings are collected as an object list first), ``coordinate`` is the
        selected coordinate or the coordinate of the sliced node, and the two
        units are strings (``""`` when unknown, ``"Indices"`` for index-based
        coordinates). If a value cannot be read, the error is logged and
        whatever was collected up to that point is returned.
    """
    slice_object = parse_slice_from_string(ids_path)
    # Replace the slice by the placeholder "(t)" and turn the IDS path into Python syntax.
    ids_path_for_eval = re.sub(r"[\[\(][^()\[\]]*:[^()\[\]]*[\]\)]", "(t)", ids_path)
    ids_path_for_eval = ids_path_for_eval.replace("(", "[").replace(")", "]").replace("/", ".")

    # The helper may return None instead of a pair when the path cannot be resolved.
    partial_info = get_length_of_partial_field(ids, ids_path_for_eval)
    coordinate_partial, coordinate_unit = partial_info if partial_info is not None else (None, "")

    data = np.array([])
    array_data = []
    data_unit = ""
    coordinate = coordinate_partial

    start = slice_object.start if slice_object.start is not None else 0
    step = slice_object.step if slice_object.step is not None else 1
    stop = slice_object.stop
    if stop is None:
        if coordinate_partial is None:
            # Without an explicit stop the length of the sliced node is required.
            logger.error(
                f"{ids_path} path/value does not exist, hint: please check length of arrays, "
                f"detailed error : cannot determine the number of elements to read"
            )
            return data, coordinate, data_unit, coordinate_unit
        stop = len(coordinate_partial)

    first_value = True
    for t in range(start, stop, step):  # `t` is referenced by the evaluated path
        try:
            # NOTE(review): eval() executes the path as Python code; only pass trusted paths.
            _inner_data = eval("ids." + ids_path_for_eval)
            # Units and coordinate are taken from the first value only.
            if first_value:
                first_value = False
                if isinstance(_inner_data, imas.ids_primitive.IDSPrimitive):
                    data_unit = _inner_data.metadata.units
                    if custom_coordinate is not None and str(custom_coordinate).isdigit():
                        # coordinate selected by its index
                        _coordinate = _inner_data.coordinates[int(custom_coordinate)]
                        if isinstance(_coordinate, imas.ids_primitive.IDSPrimitive):
                            if _coordinate.has_value is True:
                                coordinate = _coordinate
                    elif custom_coordinate and isinstance(custom_coordinate, str):
                        # coordinate selected by a path relative to the IDS
                        _coordinate = eval("ids." + custom_coordinate)
                        if isinstance(_coordinate, imas.ids_primitive.IDSPrimitive):
                            if _coordinate.has_value is True:
                                coordinate = _coordinate
                    else:
                        # first coordinate that holds data; index-based coordinates are
                        # remembered until a filled one is found
                        for _coordinate in _inner_data.coordinates:
                            if isinstance(_coordinate, imas.ids_primitive.IDSPrimitive):
                                if _coordinate.has_value is True:
                                    coordinate_unit = _coordinate.metadata.units
                                    coordinate = _coordinate
                                    break
                            else:
                                coordinate = _coordinate
                                coordinate_unit = "Indices"
        except Exception as e:
            logger.error(
                f"{ids_path} path/value does not exist, hint: please check length of arrays, detailed error : {e}"
            )
            return data, coordinate, data_unit, coordinate_unit

        if isinstance(_inner_data, (imas.ids_structure.IDSStructure, imas.ids_struct_array.IDSStructArray)):
            array_data.append(_inner_data)
        elif isinstance(_inner_data, imas.ids_primitive.IDSString0D):
            array_data.append(_inner_data.value)
        else:
            if len(_inner_data.shape) == 0:
                # scalars are appended to a flat array
                data = np.append(data, _inner_data)
            elif len(_inner_data.shape) == 1:
                # 1D values are stacked row by row
                if data.size == 0:
                    data = _inner_data
                else:
                    data = np.vstack((data, _inner_data))

    data = np.array(array_data) if array_data else np.array(data)
    return data, coordinate, data_unit, coordinate_unit
def is_ids_field(idstype: type) -> bool:
    """
    Tell whether an attribute of the given type can be a data field of an IDS.

    Args:
        idstype (type): type of an attribute of an IDS or of one of its
            sub-structures.

    Returns:
        bool: ``False`` for methods, functions and types whose text
        representation contains ``"Logger"`` or ``"HLIUtils"``; ``True`` for
        every other type.
    """
    if idstype == types.MethodType or idstype == types.FunctionType:
        return False
    type_text = str(idstype)
    return not any(marker in type_text for marker in ("Logger", "HLIUtils"))
def get_ids_attributes(idsobj: object) -> list:
    """
    List the names of the public data attributes of an IDS object.

    Args:
        idsobj (object): IDS or IDS sub-structure to inspect.

    Returns:
        list: names of the members of ``idsobj`` that do not start with an
        underscore and whose type is accepted by ``is_ids_field``. An empty list
        is returned when ``"imas"`` does not occur in the text representation of
        the object's type.
    """
    if "imas" not in str(type(idsobj)):
        return []

    attribute_names = []
    for member_name, member_value in inspect.getmembers(idsobj):
        if member_name.startswith("_"):
            continue
        if is_ids_field(type(member_value)):
            attribute_names.append(member_name)
    return attribute_names
def get_ids_size(db_entry_object, ids_names=None, dd_update=False, ignore_empty=False) -> dict:
    """
    Measure the data size of the IDSs stored in a data entry.

    Every existing occurrence of every requested IDS is read and its size is
    computed with ``get_object_size``. A summary line is printed for each
    occurrence that was measured.

    Args:
        db_entry_object: open data entry; it must provide ``list_all_occurrences``
            and ``get``.
        ids_names (list, optional): names of the IDSs to measure. Defaults to all
            IDS names known to ``imas.ids_factory.IDSFactory``.
        dd_update (bool): when True the IDS is converted with ``imas.convert_ids``
            to ``db_entry_object.factory.version`` before it is measured.
        ignore_empty (bool): forwarded to ``get_object_size``; fields without a
            value are then not counted.

    Returns:
        dict: ``{"<ids_name>/<occurrence>": {"bytes": <size>, "time": <seconds>}}``
        where ``time`` is the time spent computing the size of the already read
        IDS. Occurrences whose ``homogeneous_time`` is negative are left out, and
        occurrences that cannot be read are skipped with a warning.
    """
    if ids_names is None:
        ids_names = imas.ids_factory.IDSFactory().ids_names()

    ids_size_dict = {}
    for ids_name in ids_names:
        # Only visit occurrences that exist, instead of every number up to the
        # highest one (a gap such as [0, 3] would otherwise probe 1 and 2 as well).
        for o in db_entry_object.list_all_occurrences(ids_name):
            try:
                ids_object = db_entry_object.get(ids_name, occurrence=o, autoconvert=False)
                if dd_update:
                    ids_object = imas.convert_ids(ids_object, db_entry_object.factory.version)

                homogeneous_time = ids_object.ids_properties.homogeneous_time
                if homogeneous_time >= 0:
                    field = f"{ids_name}/{o}"
                    start_time = time.time()
                    size_in_bytes = get_object_size(ids_object, ignore_empty)
                    elapsed = time.time() - start_time
                    ids_size_dict[field] = {"bytes": size_in_bytes, "time": elapsed}
                    print(
                        "Reading %0.3f MB of data for %s took %0.3f seconds"
                        % (
                            ids_size_dict[field]["bytes"] / 1024**2,
                            field,
                            ids_size_dict[field]["time"],
                        )
                    )
                # release the (possibly large) IDS before reading the next one
                del ids_object
            except Exception as e:
                logger.warning(f"Failed to retrieve IDS '{ids_name}' with occurrence {o}: {str(e)}. Skipping this IDS.")
    return ids_size_dict
def get_all_ids_size(db_entry_object):
    """
    Sum the sizes of all IDSs found in a data entry.

    Args:
        db_entry_object: data entry that is passed on to ``get_ids_size``.

    Returns:
        The sum of the ``"bytes"`` values reported by ``get_ids_size`` (a NumPy
        scalar).
    """
    sizes_per_ids = get_ids_size(db_entry_object)
    byte_counts = [entry["bytes"] for entry in sizes_per_ids.values()]
    return np.array(byte_counts).sum()
def get_all_ids_get_time(db_entry_object):
    """
    Sum the times measured for all IDSs found in a data entry.

    Args:
        db_entry_object: data entry that is passed on to ``get_ids_size``.

    Returns:
        The sum of the ``"time"`` values reported by ``get_ids_size`` (a NumPy
        scalar).
    """
    sizes_per_ids = get_ids_size(db_entry_object)
    durations = [entry["time"] for entry in sizes_per_ids.values()]
    return np.array(durations).sum()
def get_object_size(obj: object, ignore_empty=False) -> int:
    """
    Compute the size in bytes of the data held by an IDS node.

    Structures and arrays of structures are traversed recursively. For data
    nodes the size depends on the type of the stored value: UTF-8 length for
    strings, ``nbytes`` for NumPy arrays, 4 bytes for integers, 8 bytes for
    floats, 16 bytes for complex numbers, and the sum of the item sizes for
    lists.

    Args:
        obj (object): IDS node (data node, structure, array of structures) or a
            plain string.
        ignore_empty (bool): when True, data nodes whose ``has_value`` is False
            contribute 0 bytes.

    Returns:
        int: size in bytes. Objects of an unsupported type are measured with
        ``sys.getsizeof`` and reported on standard output.
    """
    # Plain strings (the items of a list-valued string node) are counted like the
    # value of a string node; before, each of them ended up in the getsizeof
    # fallback and was reported as unknown.
    if isinstance(obj, str):
        return len(obj.encode("utf-8"))

    primitive_types = (
        imas.ids_primitive.IDSInt0D,
        imas.ids_primitive.IDSString0D,
        imas.ids_primitive.IDSComplex0D,
        imas.ids_primitive.IDSFloat0D,
        imas.ids_primitive.IDSNumericArray,
        imas.ids_primitive.IDSPrimitive,
        imas.ids_primitive.IDSString1D,
    )

    object_size = 0
    if isinstance(obj, primitive_types):
        if ignore_empty and obj.has_value is False:
            return object_size
        value = obj.value
        if isinstance(value, str):
            object_size += len(value.encode("utf-8"))
        elif isinstance(value, np.ndarray):
            object_size += value.nbytes
        elif isinstance(value, int):
            object_size += 4
        elif isinstance(value, float):
            object_size += 8
        elif isinstance(value, complex):
            object_size += 16
        elif isinstance(value, list):
            for item in value:
                object_size += get_object_size(item, ignore_empty)
        else:
            object_size += sys.getsizeof(obj)
            print(f"Unkonwn {type(obj.value)} getting size with getsizeof -> {obj}")
    elif isinstance(obj, (imas.ids_struct_array.IDSStructArray, imas.ids_structure.IDSStructure)):
        # both container kinds are measured as the sum of their children
        for child in obj:
            object_size += get_object_size(child, ignore_empty)
    else:
        object_size += sys.getsizeof(obj)
        print(f"Unkonwn {type(obj)} getting size with getsizeof -> {obj}")
    return object_size
def get_ids_types():
    """
    Return the names of all IDS types.

    Returns:
        The result of ``ids_names()`` called on a newly created
        ``imas.ids_factory.IDSFactory``.
    """
    return imas.ids_factory.IDSFactory().ids_names()
def get_available_ids_and_occurrences(
    db_entry_object,
    time_mode=None,
    get_comment=False,
    get_version=False,
):
    """
    List the IDS occurrences that contain data in a data entry.

    Args:
        db_entry_object: open data entry in which IDSs are looked for.
        time_mode: when given, only occurrences whose
            ``ids_properties.homogeneous_time`` equals this value are listed.
        get_comment (bool): add ``ids_properties.comment`` to every item. When
            the IDS has an occurrence type, it is appended to the comment as
            ``" [occurrence type = <text>]"``.
        get_version (bool): add the data dictionary version stored in
            ``ids_properties.version_put`` to every item.

    Returns:
        list: one tuple per occurrence whose ``homogeneous_time`` is set:
        ``(idstype, occurrence)``, extended by the comment and/or the version in
        that order when requested. Occurrences that cannot be read are skipped
        with a warning.
    """
    occ_type_dict = {
        1: "reconstruction",
        2: "prediction_fixed",
        3: "prediction_free",
        4: "mapping",
    }
    availableidslist = []
    for idstype in get_ids_types():
        for occ in db_entry_object.list_all_occurrences(idstype):
            try:
                ids_object = db_entry_object.get(
                    idstype, occurrence=occ, lazy=True, autoconvert=False, ignore_unknown_dd_version=True
                )
                ids_properties = ids_object.ids_properties
                dd_version = ids_properties.version_put.data_dictionary.value
                homogeneous_time = ids_properties.homogeneous_time
                comment = ids_properties.comment

                if hasattr(ids_properties, "occurrence_type"):
                    occ_type = ids_properties.occurrence_type
                    if occ_type.index != imas.ids_defs.EMPTY_INT:
                        occ_type_text = occ_type_dict.get(occ_type.index.value, "")
                        # Build the annotated comment from the plain string value so this
                        # does not depend on the IDS node supporting "+=" with a str.
                        comment_text = getattr(comment, "value", comment)
                        comment = f"{comment_text} [occurrence type = {occ_type_text}]"

                if homogeneous_time != imas.ids_defs.EMPTY_INT and (time_mode is None or time_mode == homogeneous_time):
                    # The combined case must be tested first; it used to be unreachable.
                    if get_comment is True and get_version is True:
                        availableidslist.append((idstype, occ, comment, dd_version))
                    elif get_comment is True:
                        availableidslist.append((idstype, occ, comment))
                    elif get_version is True:
                        availableidslist.append((idstype, occ, dd_version))
                    else:
                        availableidslist.append((idstype, occ))
            except Exception as e:
                logger.warning(
                    f"Failed to retrieve IDS '{idstype}' with occurrence {occ}: {str(e)}. Skipping this IDS."
                )
    return availableidslist
def get_available_ids_and_times(db_entry_object) -> list:
    """
    List the IDS occurrences of a data entry together with their time arrays.

    Args:
        db_entry_object: open data entry; it must provide ``list_all_occurrences``
            and ``get``.

    Returns:
        list: tuples ``(name, time_array)``. ``name`` is the IDS name for
        occurrence 0 and ``"<ids_name>/<occurrence>"`` otherwise. ``time_array``
        depends on ``ids_properties.homogeneous_time``:

        * unknown time mode: ``[]``
        * heterogeneous: ``[nan]``
        * homogeneous: the value of the IDS ``time`` node (``None`` when that
          node is missing or empty)
        * time independent: ``[-inf]``

        ``[]`` is also used when the IDS cannot be read.
    """
    result = []
    for _ids_name in get_ids_types():
        for occurrence in db_entry_object.list_all_occurrences(_ids_name):
            time_array = None
            try:
                ids_object = db_entry_object.get(
                    _ids_name, occurrence=occurrence, lazy=True, autoconvert=False, ignore_unknown_dd_version=True
                )
                homogeneous_time = ids_object.ids_properties.homogeneous_time
                if homogeneous_time == imas.ids_defs.IDS_TIME_MODE_UNKNOWN:
                    time_array = []
                elif homogeneous_time == imas.ids_defs.IDS_TIME_MODE_HETEROGENEOUS:
                    # np.nan instead of the np.NaN alias, which NumPy 2.0 removed
                    time_array = [np.nan]
                elif homogeneous_time == imas.ids_defs.IDS_TIME_MODE_HOMOGENEOUS:
                    if getattr(ids_object, "time", None):
                        time_array = ids_object.time.value
                elif homogeneous_time == imas.ids_defs.IDS_TIME_MODE_INDEPENDENT:
                    time_array = [-np.inf]
            except Exception as e:
                logger.debug(f"{e}")
                time_array = []
                logger.info(f"ERROR! IDS {_ids_name} : Reading time array fails due to following problem : {e}")

            name = _ids_name if occurrence == 0 else f"{_ids_name}/{occurrence}"
            result.append((name, time_array))
    return result
def resample_indices(
    dbin: object,
    dbout: object,
    idsname: str,
    occurrence=0,
    start: int = 0,
    stop: int = None,
    step: int = 1,
    interpolation_method=imas.ids_defs.PREVIOUS_INTERP,
):
    """
    Copy an IDS to another data entry, keeping only selected time indices.

    The time array of the IDS is read from ``dbin``, the entries
    ``times[start:stop:step]`` are selected and the IDS is read again with
    ``dbin.get_sample`` on exactly those times before it is written to ``dbout``.

    Args:
        dbin: input data entry; it must provide ``get`` and ``get_sample``.
        dbout: output data entry; it must provide ``put``.
        idsname (str): name of the IDS to resample.
        occurrence (int): occurrence that is read from ``dbin`` and written to
            ``dbout``. Defaults to 0.
        start (int): index of the first time to keep. An index beyond the end of
            the time array is replaced by 0.
        stop (int): index at which the selection stops (exclusive). ``None`` or a
            value beyond the end of the time array selects up to the last time.
        step (int): distance between selected indices. Defaults to 1.
        interpolation_method: interpolation method passed to ``get_sample``.
            Defaults to ``imas.ids_defs.PREVIOUS_INTERP``.

    Returns:
        None. Failures to read the time array, and an empty selection, are logged
        and nothing is written.
    """
    try:
        # read the time base of the requested occurrence, not always occurrence 0
        idsobj = dbin.get(idsname, occurrence=occurrence, lazy=True, autoconvert=False)
        times = idsobj.time
        number_of_times = len(times)
    except Exception as e:
        logger.error(f"Error occurred while resampling data for {idsname} in the input database. {e}")
        return

    if stop is not None and stop >= number_of_times:
        stop = number_of_times
    if start is not None and start >= number_of_times:
        start = 0

    selected_times = times[start:stop:step]
    if len(selected_times) == 0:
        logger.error(f"Error occurred while resampling data for {idsname} in the input database. No time selected")
        return

    # NOTE(review): tmin/tmax are times, not indices, so the bounds of the selected
    # time base are passed -- confirm against the get_sample documentation how they
    # are used when dtime is an explicit time base.
    idsobj = dbin.get_sample(
        idsname,
        tmin=float(np.min(selected_times)),
        tmax=float(np.max(selected_times)),
        dtime=selected_times,
        interpolation_method=interpolation_method,
        occurrence=occurrence,
        autoconvert=False,
    )
    dbout.put(idsobj, occurrence=occurrence)
def resample_times(
    dbin: object,
    dbout: object,
    idsname: str,
    occurrence=0,
    start: float = None,
    stop: float = None,
    step: float = None,
    interpolation_method=imas.ids_defs.PREVIOUS_INTERP,
):
    """
    Copy an IDS to another data entry, resampled on a time range.

    The IDS is read with ``dbin.get_sample`` and the result is written with
    ``dbout.put``.

    Args:
        dbin (object): input data entry; it must provide ``get_sample``.
        dbout (object): output data entry; it must provide ``put``.
        idsname (str): name of the IDS to resample.
        occurrence (int, optional): occurrence that is read and written.
            Defaults to 0.
        start (float, optional): passed to ``get_sample`` as ``tmin``.
        stop (float, optional): passed to ``get_sample`` as ``tmax``.
        step (float, optional): passed to ``get_sample`` as ``dtime``.
        interpolation_method (optional): interpolation method passed to
            ``get_sample``. Defaults to ``imas.ids_defs.PREVIOUS_INTERP``.

    Returns:
        None. Exceptions raised by ``get_sample`` or ``put`` are not handled
        here and propagate to the caller.
    """
    resampled_ids = dbin.get_sample(
        idsname,
        tmin=start,
        tmax=stop,
        dtime=step,
        interpolation_method=interpolation_method,
        occurrence=occurrence,
        autoconvert=False,
    )
    dbout.put(resampled_ids, occurrence=occurrence)
def _record_ids_difference(output, key, entry):
    """Store ``entry`` under ``key`` unless a difference was already recorded for that key."""
    if key not in output:
        output[key] = entry
    else:
        logger.error("Duplicate key found")


def compare_ids(
    x,
    y,
    field=None,
    ignore_version=True,
    verb=True,
    name_x="first",
    name_y="second",
    output=None,
):
    """
    Compare two IDS objects attribute by attribute.

    The objects are compared through their ``__dict__``; attributes starting
    with an underscore and the attribute ``hli_utils`` are ignored. Nested
    objects coming from an ``imas`` module and list items whose type name
    contains ``structArrayElement`` are compared recursively.

    Args:
        x: first object to compare.
        y: second object to compare.
        field (str, optional): path prefix used in the reported keys. Defaults to
            the ``__name__`` (or ``_base_path``) of ``x``.
        ignore_version (bool): skip the ``version_put`` attribute. Defaults to True.
        verb (bool): log a message for every difference. Defaults to True.
        name_x (str): label of the first object in messages.
        name_y (str): label of the second object in messages.
        output (dict, optional): dictionary to which differences are added. A new
            dictionary is created for every call when omitted.

    Returns:
        tuple: ``(identical, output)``. ``identical`` is True when no difference
        was found. ``output`` maps the path of every differing field to a tuple
        that ends with a description: ``(path, path, description)`` for fields
        present on one side only and ``(value_x, value_y, data_type, description)``
        otherwise. ``(False, output)`` is also returned when the two objects are
        not comparable (different names, or neither ``__name__`` nor
        ``_base_path`` on both).
    """
    # A fresh dictionary per call; a shared default would carry differences over
    # from one comparison to the next.
    if output is None:
        output = {}
    identical = True

    if hasattr(x, "__name__") and hasattr(y, "__name__"):
        if x.__name__ != y.__name__:
            if verb:
                logger.error(f"Different IDSs: {x.__name__} and {y.__name__}")
            return False, output
        if field is None:
            field = x.__name__
        logger.debug("Has __name__ in IDSes :" + x.__name__)
    elif hasattr(x, "_base_path") and hasattr(y, "_base_path"):
        if x._base_path != y._base_path:
            if verb:
                logger.error(f"Different structure: {x._base_path} and {y._base_path}")
            return False, output
        if field is None:
            field = x._base_path
        logger.debug("Has _base_path in IDSes :" + x._base_path)
    else:
        # un-expected different objects
        logger.error(f"Unexpected objects: {type(x)} and {type(y)}")
        return False, output

    xd = x.__dict__
    yd = y.__dict__
    # sorted so that differences are reported in a reproducible order
    for key in sorted(set(xd) | set(yd)):
        if key.startswith("_") or key == "hli_utils":
            continue
        if ignore_version and key == "version_put":
            continue
        full_key = f"{field}.{key}"

        if key not in xd:
            _record_ids_difference(output, full_key, (full_key, full_key, f"not present in {name_x} ids"))
            if verb:
                logger.info(f"{key} not present in X")
            identical = False
            continue
        if key not in yd:
            _record_ids_difference(output, full_key, (full_key, full_key, f"not present in {name_y} ids"))
            if verb:
                logger.info(f"{key} not present in Y")
            identical = False
            continue

        xo = xd[key]
        yo = yd[key]

        if not isinstance(xo, type(yo)):
            _record_ids_difference(
                output,
                full_key,
                (xo, yo, None, f"different type {name_x} {type(xo).__name__}, {name_y} {type(yo).__name__}"),
            )
            if verb:
                logger.warning(f"Different type for {field}.{key}")
            # Values of different types are a difference and cannot be compared
            # further (the checks below assume matching types).
            identical = False
            continue

        if hasattr(xo, "__module__") and "imas" in xo.__module__:
            # TO DO: To be removed, when private _base_path will be replaced by __name__
            attrname = xo.__name__ if hasattr(xo, "__name__") else xo._base_path
            identical_result, output = compare_ids(
                xo,
                yo,
                field=f"{field}.{attrname}",
                ignore_version=ignore_version,
                verb=verb,
                name_x=name_x,
                name_y=name_y,
                output=output,
            )
            identical &= identical_result
            continue

        # treatment of struct_array and list of strings
        if type(xo).__name__ == "list":
            if len(xo) != len(yo):
                # avoids printing "array" as this is internal attribute for AoS
                f = field if key == "array" else full_key
                _record_ids_difference(output, f, (xo, yo, list, "different length"))
                if verb:
                    logger.info(f"{f} is of different length")
                identical = False
            else:
                # NOTE(review): items that are not struct array elements (e.g. strings)
                # are not compared -- confirm whether that is intended.
                for i, (x_item, y_item) in enumerate(zip(xo, yo)):
                    if "structArrayElement" in type(x_item).__name__:
                        identical_result, output = compare_ids(
                            x_item,
                            y_item,
                            field=f"{field}[{i}]",
                            ignore_version=ignore_version,
                            verb=verb,
                            name_x=name_x,
                            name_y=name_y,
                            output=output,
                        )
                        identical &= identical_result
            continue

        if isinstance(xo, np.ndarray) and isinstance(yo, np.ndarray):
            try:
                # NaN entries at the same position count as equal
                result = np.array_equal(xo, yo, equal_nan=True)
            except TypeError:
                # NumPy without equal_nan support, or a dtype that cannot hold NaN
                result = np.array_equal(xo, yo)
        else:
            result = xo == yo
        if result:
            continue

        data_type = None
        missing_in = None
        if isinstance(xo, np.ndarray):
            data_type = np.ndarray
            if xo.size == 0:
                missing_in = name_x
            elif yo.size == 0:
                missing_in = name_y
        else:
            # values that mark an unset integer / float field
            for candidate_type, empty_value in ((int, -999999999), (float, -9e40)):
                if isinstance(xo, candidate_type):
                    data_type = candidate_type
                    if xo == empty_value:
                        missing_in = name_x
                    elif yo == empty_value:
                        missing_in = name_y

        if missing_in is not None:
            _record_ids_difference(output, full_key, (xo, yo, data_type, f"missing in {missing_in}"))
            if verb:
                logger.info(f"{field}.{key} is missing in {missing_in}")
        else:
            _record_ids_difference(output, full_key, (xo, yo, data_type, "different values"))
            if verb:
                logger.info(f"{field}.{key} has different values")
        identical = False

    return identical, output
def get_ids_values(uri: str, idspaths: Union[str, list], dd_update=False, verbose=False):
    """
    Read the values of several IDS paths from one data entry.

    Args:
        uri (str): URI of the data entry; it is opened read-only.
        idspaths (str | list): a single path, or a list whose items are either
            paths of the form ``"<ids_name>/<path>"`` or triples
            ``(full_path, ids_name, python_path)`` where ``python_path`` is the
            path inside the IDS in Python syntax (``.`` separators, ``[]``
            indices). A ``python_path`` containing ``:`` is read with
            ``partial_get``.
        dd_update (bool): when True each IDS is read completely and converted with
            ``imas.convert_ids`` to ``connection.factory.version``; otherwise it
            is read lazily.
        verbose (bool): log the exception when a path cannot be read.

    Returns:
        dict: maps every ``full_path`` to the node (or the array returned by
        ``partial_get``) found for it, or to ``None`` when the path could not be
        read, has no value or is empty.
    """
    if isinstance(idspaths, str):
        idspaths = [idspaths]

    # Plain path strings are split the same way get_quantities_from_pulses does;
    # before, a string was unpacked character by character.
    path_triples = []
    for item in idspaths:
        if isinstance(item, str):
            idsname = item.split("/")[0]
            valpath = item[1 + len(idsname) :]
            path_triples.append((item, idsname, valpath.replace("(", "[").replace(")", "]").replace("/", ".")))
        else:
            path_triples.append(item)

    output = {}
    loaded_ids = {}  # every IDS is read once, even when several paths refer to it
    connection = imas.DBEntry(uri, "r")
    try:
        # Process each IDS path for this pulse
        for full_path, idsname, valpath in path_triples:
            try:
                output[full_path] = None
                ids = loaded_ids.get(idsname)
                if ids is None:
                    if dd_update:
                        ids = imas.convert_ids(connection.get(idsname, autoconvert=False), connection.factory.version)
                    else:
                        ids = connection.get(idsname, autoconvert=False, lazy=True)
                    loaded_ids[idsname] = ids

                if ":" in valpath:
                    node, _, _, _ = partial_get(ids, valpath)
                    if node.size == 0:
                        node = None
                else:
                    # NOTE(review): eval() executes the path as Python code; only pass trusted paths.
                    node = eval("ids." + valpath)
                    if isinstance(node, imas.ids_primitive.IDSPrimitive) and not node.has_value:
                        node = None
                    elif node.size == 0:
                        node = None
                if node is not None:
                    output[full_path] = node
            except Exception as e:
                if verbose:
                    logger.error(f"Exception for {full_path}: {e}", exc_info=True)
    finally:
        # close the data entry even when something unexpected is raised
        connection.close()
    return output
def execute_query(
    query: str,
    ids_values: dict,
):
    """
    Evaluate a Python expression on the values read for a pulse.

    The values of ``ids_values`` are made available to the expression as
    ``x1``, ``x2``, ... in the order of the dictionary, next to ``np`` and
    ``scipy``.

    Args:
        query (str): Python expression, e.g. ``"x1 > 0.5 * x2"``.
        ids_values (dict): values to evaluate the expression on; the keys are not
            used. ``IDSNumericArray`` values are replaced by their ``value``.

    Returns:
        ``None`` when any value is ``None`` or when the expression evaluates to
        ``None`` or to a false boolean, ``True`` when it evaluates to a true
        boolean, and the result itself in all other cases.
    """
    # Nothing can be evaluated when a requested value is missing.
    if any(value is None for value in ids_values.values()):
        return None

    namespace = {"np": np, "scipy": scipy}
    for position, value in enumerate(ids_values.values(), start=1):
        if isinstance(value, str):
            # NOTE(review): the string is stored including the added quotes -- confirm
            # that queries are written against that form.
            value = f"'{value}'"
        elif isinstance(value, imas.ids_primitive.IDSNumericArray):
            value = value.value
        namespace[f"x{position}"] = value

    # The names are passed as globals: names given as eval() locals are not visible
    # inside generator expressions or lambdas of the query.
    # NOTE(review): eval() runs arbitrary code; only evaluate trusted queries.
    result = eval(query, namespace)

    if result is None:
        return None
    if isinstance(result, (np.bool_, bool)):
        return True if result else None
    return result
def get_quantities_from_pulses(
    idspath: list, pulses: tuple, list_count: int = 0, verbose: bool = False, query=None, dd_update: bool = False
) -> pd.DataFrame:
    """
    Collect the values of IDS paths for a set of pulses in a DataFrame.

    Args:
        idspath (list or str): one path or a list of paths of the form
            ``"<ids_name>/<path inside the IDS>"``.
        pulses (tuple): one tuple per pulse with, in this order: pulse, run,
            backend, database, user, version, file path and file time.
        list_count (int): stop after this many pulses have been added to the
            result; 0 (default) processes all pulses.
        verbose (bool): print progress and skipped paths.
        query (str, optional): expression evaluated with ``execute_query`` on the
            values of a pulse. Its result is stored in a column named after the
            query, and the pulse is only kept when the result is a boolean, a
            non-empty array or another value that is not ``None``.
        dd_update (bool, optional): forwarded to ``get_ids_values``.

    Returns:
        pandas.DataFrame: one row per kept pulse with the columns ``URI``,
        ``FILEPATH``, ``FILETIME`` and one column per IDS path (plus the query
        column when a query is given). Without a query, a pulse is kept only when
        none of its values is ``None``. An empty frame with the fixed columns and
        the path columns is returned when no pulse is kept.
    """
    # Convert single string to list for consistent handling
    if isinstance(idspath, str):
        idspath = [idspath]

    # (full path, IDS name, path inside the IDS in Python syntax)
    paths_info = []
    for path in idspath:
        idsname = path.split("/")[0]
        inner_path = path[1 + len(idsname) :]
        python_path = inner_path.replace("(", "[").replace(")", "]").replace("/", ".")
        paths_info.append((path, idsname, python_path))

    kept_pulses = 0
    rows = []
    for pulse_tuple in pulses:
        pulse, run, backend, database, user, dd_version, file_path, file_time = [
            pulse_tuple[index] for index in range(8)
        ]

        if backend == imas.ids_defs.HDF5_BACKEND:
            backend_string = "hdf5"
        elif backend == imas.ids_defs.MDSPLUS_BACKEND:
            backend_string = "mdsplus"
        else:
            backend_string = ""
        uri = f"imas:{backend_string}?user={user};shot={pulse};run={run};database={database};version={dd_version}"
        if verbose:
            print(f"fetching data from {pulse}, {run}")

        keep_pulse = False
        pulse_data = {"URI": uri, "FILEPATH": file_path, "FILETIME": file_time}
        ids_values = get_ids_values(uri, paths_info, dd_update=dd_update, verbose=verbose)
        if ids_values:
            for _path, _value in ids_values.items():
                if _value is None:
                    if verbose:
                        print(uri, _path, "is None, skipping")
                    keep_pulse = False
                    break
                pulse_data[_path] = _value
                if query is None:
                    keep_pulse = True

            if query is not None:
                query_result = execute_query(query, ids_values)
                pulse_data[query] = query_result
                if isinstance(query_result, (bool, np.bool_)):
                    keep_pulse = True
                elif isinstance(query_result, np.ndarray):
                    if query_result.size > 0:
                        keep_pulse = True
                elif query_result is not None:
                    keep_pulse = True

        if keep_pulse:
            rows.append(pulse_data)
            kept_pulses += 1
        if list_count != 0 and kept_pulses >= list_count:
            break

    df = pd.DataFrame(rows)
    # If no results were found, create empty dataframe with appropriate columns
    if df.empty:
        df = pd.DataFrame(columns=["URI", "FILEPATH", "FILETIME"] + idspath)
    return df
def idsdiff_full(
    struct1: imas.ids_structure.IDSStructure,
    struct2: imas.ids_structure.IDSStructure,
    name1="",
    name2="",
    print_result=False,
    ignore_version=False,
):
    """
    Compare two IDS structures and render every difference in a two-column table.

    The differences are produced by ``imas.util.idsdiffgen``. Each table row
    shows the textual form of the first and of the second value; the parts that
    differ are highlighted.

    Args:
        struct1: first structure.
        struct2: second structure.
        name1 (str): label of the first structure in the table title.
        name2 (str): label of the second structure in the table title.
        print_result (bool): print the table (or the "identical" message) with
            ``rich.print``.
        ignore_version (bool): skip differences whose path contains
            ``version_put``.

    Returns:
        tuple: ``(compare_result, diff_result, text_output)``. ``compare_result``
        is True when the table has no rows, ``diff_result`` is the list of
        ``(description, child1, child2)`` differences and ``text_output`` is the
        ``rich`` table, or a message string when the structures are identical.
    """
    diff_result = []
    table_title = Text()
    # Both arguments are checked; top-level IDSs are labelled with their name,
    # other structures with their path.
    if isinstance(struct1, imas.ids_toplevel.IDSToplevel) and isinstance(struct2, imas.ids_toplevel.IDSToplevel):
        table_title.append("First: ", style="bold blue")
        table_title.append(f"{name1} ({struct1.metadata.name}) -\n", style="blue")
        table_title.append("Second: ", style="bold magenta")
        table_title.append(f"{name2} ({struct2.metadata.name})", style="magenta")
    elif isinstance(struct1, imas.ids_structure.IDSStructure) and isinstance(struct2, imas.ids_structure.IDSStructure):
        table_title.append("First: ", style="bold blue")
        table_title.append(f"{name1} ({struct1._path}) -\n", style="blue")
        table_title.append("Second: ", style="bold magenta")
        table_title.append(f"{name2} ({struct2._path})", style="magenta")
    else:
        table_title.append("first - second")

    diff_table = Table(title=table_title)
    diff_table.add_column("first", style="blue")
    diff_table.add_column("second", style="magenta")

    for description, child1, child2 in imas.util.idsdiffgen(struct1, struct2):
        if ignore_version is True and hasattr(child1, "_path") and "version_put" in child1._path:
            continue

        if not isinstance(child1, imas.ids_base.IDSBase) and not isinstance(child2, imas.ids_base.IDSBase):
            txt1 = f"{description}: {child1}"
            txt2 = f"{description}: {child2}"
        else:
            txt1 = "-" if child1 is None else repr(child1)
            txt2 = "-" if child2 is None else repr(child2)

        # Copy matching text as is and highlight the text between matching blocks.
        seqmat = difflib.SequenceMatcher()
        seqmat.set_seqs(txt1, txt2)
        out1 = Text()
        out2 = Text()
        prevmatch = difflib.Match(0, 0, 0)
        for match in seqmat.get_matching_blocks():
            if match.a > prevmatch.a + prevmatch.size:
                out1.append(txt1[prevmatch.a + prevmatch.size : match.a], "bold red")
            if match.b > prevmatch.b + prevmatch.size:
                out2.append(txt2[prevmatch.b + prevmatch.size : match.b], "bold green")
            out1.append(txt1[match.a : match.a + match.size])
            out2.append(txt2[match.b : match.b + match.size])
            prevmatch = match
        out1.append(txt1[prevmatch.a + prevmatch.size :], style="bold red")
        out2.append(txt2[prevmatch.b + prevmatch.size :], style="bold green")

        diff_result.append((description, child1, child2))
        diff_table.add_row(out1, out2)
        diff_table.add_section()

    if diff_table.row_count:
        compare_result = False
        text_output = diff_table
    else:
        compare_result = True
        # a single message string, as in idsdiff (this used to be a tuple of parts)
        text_output = f"Structures {struct1} and {struct2} are identical"

    if print_result:
        rich.print(text_output)
    return compare_result, diff_result, text_output
def _idsdiff_data_type(child):
    """Return the type label shown for a compared node ("-" when the node is missing)."""
    if isinstance(child, imas.ids_struct_array.IDSStructArray):
        return "STRUCT_ARRAY"
    if child is None:
        return "-"
    if hasattr(child, "data_type"):
        return child.data_type
    return type(child).__name__.upper()


def _idsdiff_display_value(child):
    """Return a short printable form of the value of a compared node."""
    value = child.value if child is not None and hasattr(child, "value") else child
    if isinstance(value, np.ndarray):
        # empty and 0-d arrays have no first element to show
        if value.ndim == 0 or value.size == 0:
            return str(value)
        return str(value[0]) + ",..."
    if isinstance(value, list):
        return str(len(value)) + " items"
    return value


def idsdiff(
    struct1: imas.ids_structure.IDSStructure,
    struct2: imas.ids_structure.IDSStructure,
    name1="",
    name2="",
    print_result=False,
    verbose=True,
    ignore_version=False,
):
    """
    Compare two IDS structures and summarise the differences per IDS path.

    The differences are produced by ``imas.util.idsdiffgen``. Each table row
    holds the path of the node, a short description of the difference and, when
    ``verbose`` is set, an abbreviated form of both values.

    Args:
        struct1: first structure.
        struct2: second structure.
        name1 (str): label of the first structure in the table title.
        name2 (str): label of the second structure in the table title.
        print_result (bool): print the table (or the "identical" message) with
            ``rich.print``.
        verbose (bool): add the columns with the values of both structures.
        ignore_version (bool): skip differences whose path contains
            ``version_put``.

    Returns:
        tuple: ``(compare_result, diff_result, text_output)``. ``compare_result``
        is True when the table has no rows, ``diff_result`` is the list of
        ``(description, child1, child2)`` differences and ``text_output`` is the
        ``rich`` table, or a message string when no row was added.
    """
    diff_result = []
    table_title = Text()
    # Top-level IDSs are labelled with their name, other structures with their path.
    if isinstance(struct1, imas.ids_toplevel.IDSToplevel) and isinstance(struct2, imas.ids_toplevel.IDSToplevel):
        table_title.append("First: ", style="bold blue")
        table_title.append(f"{name1} ({struct1.metadata.name}) -\n", style="blue")
        table_title.append("Second: ", style="bold magenta")
        table_title.append(f"{name2} ({struct2.metadata.name})", style="magenta")
    elif isinstance(struct1, imas.ids_structure.IDSStructure) and isinstance(struct2, imas.ids_structure.IDSStructure):
        table_title.append("First: ", style="bold blue")
        table_title.append(f"{name1} ({struct1._path}) -\n", style="blue")
        table_title.append("Second: ", style="bold magenta")
        table_title.append(f"{name2} ({struct2._path})", style="magenta")
    else:
        table_title.append("first - second")

    diff_table = Table(title=table_title)
    diff_table.add_column("IDS Path")
    diff_table.add_column("Description")
    if verbose:
        diff_table.add_column("Value first", style="blue")
        diff_table.add_column("Value second", style="magenta")

    for description, child1, child2 in imas.util.idsdiffgen(struct1, struct2):
        if ignore_version is True and hasattr(child1, "_path") and "version_put" in child1._path:
            continue
        diff_result.append((description, child1, child2))

        information = Text("different values", style="cyan")
        if child1 is None:
            information = Text("missing in first", style="red")
        if child2 is None:
            information = Text("missing in second", style="yellow")
        if isinstance(child1, imas.ids_struct_array.IDSStructArray) or isinstance(
            child2, imas.ids_struct_array.IDSStructArray
        ):
            information = Text("different length", style="magenta")

        data_type1 = _idsdiff_data_type(child1)
        data_type2 = _idsdiff_data_type(child2)

        if child1 is not None and hasattr(child1, "_path"):
            path = child1._path
        elif child2 is not None and hasattr(child2, "_path"):
            path = child2._path
        else:
            path = None

        # NOTE(review): differences without a path are kept in diff_result but get no
        # table row, so they do not influence compare_result -- confirm this is intended.
        if not path:
            continue
        if not verbose:
            diff_table.add_row(path, information)
            continue

        if not isinstance(child1, imas.ids_base.IDSBase) and not isinstance(child2, imas.ids_base.IDSBase):
            txt1 = f"{description}: {child1}"
            txt2 = f"{description}: {child2}"
        else:
            txt1 = "" if data_type1 == "-" else f"({data_type1}) {_idsdiff_display_value(child1)}"
            txt2 = "" if data_type2 == "-" else f"({data_type2}) {_idsdiff_display_value(child2)}"

        # Copy matching text as is and highlight the text between matching blocks.
        seqmat = difflib.SequenceMatcher()
        seqmat.set_seqs(txt1, txt2)
        out1 = Text()
        out2 = Text()
        prevmatch = difflib.Match(0, 0, 0)
        for match in seqmat.get_matching_blocks():
            if match.a > prevmatch.a + prevmatch.size:
                out1.append(txt1[prevmatch.a + prevmatch.size : match.a], "bold red")
            if match.b > prevmatch.b + prevmatch.size:
                out2.append(txt2[prevmatch.b + prevmatch.size : match.b], "bold green")
            out1.append(txt1[match.a : match.a + match.size])
            out2.append(txt2[match.b : match.b + match.size])
            prevmatch = match
        out1.append(txt1[prevmatch.a + prevmatch.size :], style="bold red")
        out2.append(txt2[prevmatch.b + prevmatch.size :], style="bold green")
        diff_table.add_row(path, information, out1, out2)

    if diff_table.row_count:
        compare_result = False
        text_output = diff_table
    else:
        compare_result = True
        text_output = f"Structures {struct1} and {struct2} are identical"

    if print_result:
        rich.print(text_output)
    return compare_result, diff_result, text_output