Source code for idstools.compute.core_profiles

"""
This module provides compute functions and classes for core_profiles ids data

`refer data dictionary <https://imas-data-dictionary.readthedocs.io/en/latest/>`_.

"""

import contextlib
import functools
import itertools
import logging
from typing import Union

try:
    import imaspy as imas
except ImportError:
    import imas
import numpy as np

import idstools.init_mendeleiev as mend

logger = logging.getLogger("module")


[docs]class CoreProfilesCompute: """This class provides compute functions for core profiles ids. Attributes: ids (object): The core profiles IDS (Integrated Data Structure) object containing plasma profile data including temperature, density, pressure, and rotation profiles. volume (object, optional): Optional volume data object for spatial integration and analysis. """ def __init__(self, ids, volume=None): self.ids = ids self.volume = volume
[docs] @staticmethod def get_plasma_composition_with_species_concentration(ids, time_slice, volume=None) -> Union[dict, int]: """ Function retrives composition and species concentration in below format """ try: _ = ids.profiles_1d[time_slice] except Exception as e: logger.debug(f"{e}") return 0 core_profile_compute = CoreProfilesCompute(ids, volume=volume) if core_profile_compute.volume is None: volume = core_profile_compute.get_volume(time_slice) if volume is None: return -1 else: core_profile_compute.volume = volume data = {} nspec_over_ntot = core_profile_compute.get_nspec_over_ntot(time_slice) nspec_over_ne = core_profile_compute.get_nspec_over_ne(time_slice) nspec_over_nmaj = core_profile_compute.get_nspec_over_nmaj(time_slice) species = core_profile_compute.get_species(time_slice) labels = core_profile_compute.get_labels(time_slice) core_profile_compute.combine_species_when_appear_twice( species, nspec_over_ntot, nspec_over_ne, nspec_over_nmaj, time_slice ) a = core_profile_compute.get_a(time_slice) z = core_profile_compute.get_z(time_slice) states_data = core_profile_compute.get_states_data(time_slice) if species: for species_index in range(len(species)): species_data = { "nspec_over_ntot": nspec_over_ntot[species_index], "nspec_over_ne": nspec_over_ne[species_index], "nspec_over_nmaj": nspec_over_nmaj[species_index], "a": a[species_index], "z": z[species_index], "species": species[species_index], "states": states_data[str(species_index)], "label": labels[species_index], } data[str(species_index)] = species_data return data
[docs] @functools.lru_cache(maxsize=128) def get_electron_density_ne0(self): """ This function `get_electron_density_ne0` returns a list of electron densities at the first position for each time step in a given object. Returns: The function `get_ne0` returns a list of electron densities at the first spatial point (index 0) for all time steps in the simulation. The electron density is in units of 1e-19 m^-3. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=105033;run=1;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_electron_density_ne0(time_slice=0) [5.106128949975287] """ ntime = len(self.ids.time) return [self.ids.profiles_1d[itime].electrons.density[0] * 1.0e-19 for itime in range(ntime)]
[docs] @functools.lru_cache(maxsize=128) def get_a(self, time_slice, element_index=0) -> list: """ This function returns a list of atomic masses for a given slice and element index. Args: time_slice (int, optional): The index of the slice in the `ggd` list that contains the ion information.Defaults to 0 element_index (int, optional): Element index, It is used to access the 'a' attribute of the element object. Defaults to 0 Returns: a list of atomic masses for each species in the given slice index and element index. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=105033;run=1;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_a(time_slice=0) [2.0, 3.0, 4.0, 9.0, 183.84, 40.0, 20.0] """ nspecies = len(self.ids.profiles_1d[time_slice].ion) a = [0] * nspecies for ispecies in range(nspecies): a[ispecies] = self.ids.profiles_1d[time_slice].ion[ispecies].element[element_index].a logger.debug(f"Mass of atom : {a}") return a
[docs] @functools.lru_cache(maxsize=128) def get_z(self, time_slice: int, element_index: int = 0) -> list: """ This function `get_z` returns a list of nuclear charges for each species in a given slice and element index. Args: time_slice (int, optional): time slice on which functions should operate on. Defaults to 0. element_index (int, optional): element of the atom or molecule on which functions should operate on. Defaults to 0. Returns: a list of nuclear charges for each species in the given time_slice and elementIndex. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=105033;run=1;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_z(time_slice=0) [1, 1, 2, 4, 74, 18, 10] """ # TODO why always element_index = 0 we are picking up nspecies = len(self.ids.profiles_1d[time_slice].ion) z = [0] * nspecies for ispecies in range(nspecies): z[ispecies] = int(self.ids.profiles_1d[time_slice].ion[ispecies].element[element_index].z_n) logger.debug(f"Nuclear charge each species : {z}") return z
[docs] def get_states(self, time_slice: int) -> list: """ This function `get_states` returns quantities related to the different states of the species (ionisation, energy, excitation, ...) for each species Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: a list of states (ionisation, energy, excitation, etc.) in the input data of each species . Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=105033;run=1;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_states(time_slice=0) print(result[0]) # state object from species """ nspecies = len(self.ids.profiles_1d[time_slice].ion) return [self.ids.profiles_1d[time_slice].ion[species_index].state for species_index in range(nspecies)]
[docs] def get_state_density( self, time_slice: int, species_index: int = 0, state_index: int = 0 ) -> Union[np.ndarray, None]: """ This function `get_state_density` returns the density of a specified state of a specified species at a specified time slice, or the thermal density if the former is not available. Args: time_slice (int): an integer representing the index of the time slice for which the density is being requested. Defaults to 0 species_index (int): The index of the ion species for which the density is being retrieved. Defaults to 0 state_index (int): The index of the state for which the density is being retrieved. Defaults to 0 Returns: a numpy array containing the density of a specified state of a specified species at a specified time slice. If the density is not available, it returns None. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_state_density(time_slice, speciesIndex=0, stateIndex=0) array([4.16759116e+19, 4.17266130e+19, 4.17275806e+19, 4.17086410e+19, 4.16751781e+19, 4.16983762e+19, 4.17344996e+19, 4.17944658e+19, """ with contextlib.suppress(Exception): density = self.ids.profiles_1d[time_slice].ion[species_index].state[state_index].density if len(density) != 0: return density with contextlib.suppress(Exception): density = self.ids.profiles_1d[time_slice].ion[species_index].state[state_index].density_thermal if len(density) != 0: return density return None
[docs] def get_states_data(self, time_slice: int) -> dict: """ This function `get_states_data` returns a dictionary containing data on the states and densities of different species in a plasma simulation. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: a dictionary containing information about the states of different species in a plasma, including their labels, z-averages, densities, and relative densities. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_states_data(time_slice=0) {'0': {'0': {'density_available': True, 'label': '', 'n_ni': 100.0, 'states_density': [6.50016400579169e+23], 'z_average': -9e+40}}, '1': {'0': {'density_available': True, 'label': '', 'n_ni': 0.023001604469815865, 'states_density': [1.906627956029117e+19, 8.287201892847867e+22], 'z_average': -9e+40}, """ volume = self.get_volume(time_slice) nspecies = len(self.ids.profiles_1d[time_slice].ion) species_density, _, _ = self.get_species_density(time_slice) states_data = {} for species_index in range(nspecies): logger.debug(f"Species index :{species_index}") logger.debug(f"Species density :{species_density[species_index]}") species_data = {} nstates = len(self.ids.profiles_1d[time_slice].ion[species_index].state) logger.debug(f"Species states count :{nstates}") states_density = [0] * nstates for state_index in range(nstates): if hasattr(self.ids.profiles_1d[time_slice].ion[species_index], "label"): ion_name = self.ids.profiles_1d[time_slice].ion[species_index].label elif hasattr(self.ids.profiles_1d[time_slice].ion[species_index], "name"): ion_name = self.ids.profiles_1d[time_slice].ion[species_index].name if hasattr(self.ids.profiles_1d[time_slice].ion[species_index].state[state_index], "label"): state_name = self.ids.profiles_1d[time_slice].ion[species_index].state[state_index].label elif hasattr(self.ids.profiles_1d[time_slice].ion[species_index].state[state_index], "name"): state_name = self.ids.profiles_1d[time_slice].ion[species_index].state[state_index].name state_data = { "label": state_name, "z_average": np.mean( self.ids.profiles_1d[time_slice].ion[species_index].state[state_index].z_average ), } density = self.get_state_density(time_slice, species_index, state_index) state_data["density_available"] = False if density is None: logger.critical( f"core_profile IDS: Density data for species" f"{ion_name} and state" f"{str(state_index)} is empty" ) elif len(density) != 0: # if all density values in the array are 1.0 or 0.0 then do not calculate # because it can be false values if np.all(density == 1.0) or np.all(density == 0.0): logger.critical( f"core_profile IDS: Density data for species" f"{ion_name}" f"and state {str(state_index)} all are ones or zeros" ) else: logger.debug(f"Density array :{density}") states_density[state_index] = sum(density * volume) state_data["density_available"] = True else: logger.critical( f"core_profile IDS: Density data for species" f"{ion_name}" f" and state {state_index} is empty" ) # TODO Couldn't retrive state desnity should we calculate n/ni? # In that case density is always 0 and no meaning of n/ni # We can also get weired errors # idstools/src/compute/core_profiles/functions.py:230: RuntimeWarning: # invalid value encountered in double_scalars # 100 * states_density[state_index] / species_density[species_index] # idstools/src/compute/core_profiles/functions.py:230: RuntimeWarning: # divide by zero encountered in double_scalars # 100 * states_density[state_index] / species_density[species_index] state_data["states_density"] = states_density logger.debug( f"State density at index {state_index} : State density : {states_density[state_index]}" + "\t Species density :" + str(species_density[species_index]) ) # if species density is 0.0 then do not calculate n/ni if species_density[species_index] != 0.0: state_data["n_ni"] = 100 * states_density[state_index] / species_density[species_index] else: state_data["n_ni"] = 0.0 species_data[str(state_index)] = state_data # label = self.ids_object.profiles_1d[time_slice].ion[species_index].label states_data[str(species_index)] = species_data return states_data
[docs] def get_ne(self, time_slice: int) -> float: """ This function `get_ne` calculates the total number of electrons (ne) based on the volume and electron density of a given slice. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: the total number of electrons (ne) in the given slice of the object, calculated by multiplying the volume of the slice with its electron density and summing the results. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_ne(time_slice=0) 8.778296205101714e+23 """ volume = self.get_volume(time_slice) electron_density = self.ids.profiles_1d[time_slice].electrons.density logger.info(f"Total no. electrons (ne): {str(sum(volume * electron_density))}") return sum(volume * electron_density)
[docs] @functools.lru_cache(maxsize=128) def get_volume(self, time_slice: int) -> np.ndarray: """ This function `get_volume` returns the volume of a grid at a given time slice. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: the volume of the grid for a given time slice. If the volume is empty, it returns None Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_volume(time_slice=0) array([4.39932160e-02, 2.19952424e-01, 5.71837023e-01, 1.09958863e+00, 1.80311391e+00, 2.68234060e+00, 3.73724537e+00, 4.96778828e+00, """ volume = self.ids.profiles_1d[time_slice].grid.volume if len(volume) == 0: volume = None logger.critical("core_profile IDS: Grid volume is empty") logger.info(f"Total volume:{np.sum(volume)}") return volume
[docs] @functools.lru_cache(maxsize=128) def get_species_density(self, time_slice: int) -> tuple: """ This function calculates the density of different species in a given slice and returns a tuple containing the species density list, the total density, and the index of the species with the maximum density. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: a tuple containing three values: a list of species density, the total density of all species, and the index of the species with the maximum density. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') connection.close() computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_species_density(time_slice=0) ([6.50016400579169e+23, 8.289108520803897e+22, 6.202712465391594e+21],7.391101982525995e+23, 0) """ nspecies = len(self.ids.profiles_1d[time_slice].ion) sum_density = 0 species_density_list = [0] * nspecies max_density = -999.0 max_density_index = 0 for ispecies in range(nspecies): volume = self.get_volume(time_slice) density = self.ids.profiles_1d[time_slice].ion[ispecies].density species_density_list[ispecies] = sum(volume * density) sum_density = sum_density + species_density_list[ispecies] if species_density_list[ispecies] > max_density: max_density = species_density_list[ispecies] max_density_index = ispecies logger.debug(f"Species density:{str(species_density_list)}") return species_density_list, sum_density, max_density_index
[docs] def get_nspec_over_ntot(self, time_slice: int): """ This function calculates the ratio of the number of species to the total number of particles in a plasma. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: The function `get_nspec_over_ntot` is returning the ratio of the list of species densities to the total density (`ntot`). Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_nspec_over_ntot(time_slice=0) array([0.87945803, 0.11214983, 0.00839213]) """ species_density_list, sum_density, _ = self.get_species_density(time_slice) return species_density_list / sum_density
[docs] def get_nspec_over_ne(self, time_slice: int): """ This function calculates the ratio of species density to electron density. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: the ratio of the species density list to the electron density (ne). Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_nspec_over_ne(time_slice=0) array([0.74048128, 0.0944273 , 0.00706596]) """ species_density_list, _, _ = self.get_species_density(time_slice) ne = self.get_ne(time_slice) return species_density_list / ne
[docs] def get_nspec_over_nmaj(self, time_slice: int) -> list: """ This function returns a list of the ratio of each species density to the maximum species density. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: a list of values obtained by dividing each element of the list `species_density_list` by the maximum value in that list. This list represents the ratio of the density of each species to the density of the most abundant species. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_nspec_over_nmaj(time_slice=0) array([1. , 0.12752153, 0.00954239]) """ ( species_density_list, _, max_density_index, ) = self.get_species_density(time_slice) return species_density_list / species_density_list[max_density_index]
[docs] def get_species(self, time_slice: int) -> list: """ This function `get_species` creates a Mendeleiev table and returns a list of species based on the values of a, z, and the table. Args: time_slice (int, optional): time slice on which function should operate on. Defaults to 0. Returns: a list of species based on the values of a, z, and the Mendeleev table. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_species(time_slice=0) ['H', 'He4', 'Ne'] """ table_mendeleiev = mend.create_table_mendeleiev() nspecies = len(self.ids.profiles_1d[time_slice].ion) a = list(map(int, self.get_a(time_slice))) z = list(map(int, self.get_z(time_slice))) if any(value == imas.ids_defs.EMPTY_INT for value in z): logger.error( f"core_profiles.profiles_1d[{time_slice}].ion[].element[].z_n" f" values are not available {z}" ) return None return [table_mendeleiev[z[ispecies]][a[ispecies]].element for ispecies in range(nspecies)]
[docs] def get_labels(self, time_slice: int) -> list: """ This function `get_labels` returns a list of labels for all species in a given time slice. Args: time_slice: an optional integer parameter that specifies the time slice on which the function should operate. The default value is 0 Returns: a list of labels for all species in a given time slice. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_labels(time_slice=0) ['H', 'He', 'Ne'] """ nspecies = len(self.ids.profiles_1d[time_slice].ion) if hasattr(self.ids.profiles_1d[time_slice].ion[0], "label"): labels = [self.ids.profiles_1d[time_slice].ion[ispecies].label for ispecies in range(nspecies)] elif hasattr(self.ids.profiles_1d[time_slice].ion[0], "name"): labels = [self.ids.profiles_1d[time_slice].ion[ispecies].name for ispecies in range(nspecies)] logger.debug(f"Species identification :{labels}") return labels
[docs] def combine_species_when_appear_twice(self, species, nspec_over_ntot, nspec_over_ne, nspec_over_nmaj, time_slice): """ This is helper function which checks if there are duplicate entries of species and combine the species. This is in place change of arrays Args: species (list): result from get_species() nspec_over_ntot (list): result from get_nspec_over_ntot() nspec_over_ne (list): result from get_nspec_over_ne() nspec_over_nmaj (list): result from get_nspec_over_nmaj() time_slice (int, optional): time_slice on which function should operate on. Defaults to 0. """ if species is not None: nspecies = len(self.ids.profiles_1d[time_slice].ion) for ispecies, jspecies in itertools.product(range(nspecies), range(nspecies)): if (species[jspecies] == species[ispecies]) & (jspecies != ispecies): nspec_over_ntot[ispecies] = nspec_over_ntot[ispecies] + nspec_over_ntot[jspecies] nspec_over_ntot[jspecies] = 0 nspec_over_ne[ispecies] = nspec_over_ne[ispecies] + nspec_over_ne[jspecies] nspec_over_ne[jspecies] = 0 nspec_over_nmaj[ispecies] = nspec_over_nmaj[ispecies] + nspec_over_nmaj[jspecies] nspec_over_nmaj[jspecies] = 0
[docs] def get_rho_tor_norm(self, time_slice: int) -> Union[np.ndarray, None]: """ This function `get_rho_tor_norm` returns a list of normalized toroidal rho values from a given time slice of a profiles_1d object. Args: time_slice (int): time index. Defaults to 0 Returns: a list of normalized toroidal flux coordinates (rho_tor_norm) for a given time slice of the IDS object. If rho_tor_norm is not available, it tries to return a list of toroidal flux coordinates (rho_tor) instead. If neither is available, it returns None. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_rho_tor_norm(time_slice=0) [0.005025125628140704, 0.015075376884422112, 0.035175879396984924, 0.045226130653266326, 0.05527638190954774] """ try: if len(self.ids.profiles_1d[time_slice].grid.rho_tor_norm) > 0: return self.ids.profiles_1d[time_slice].grid.rho_tor_norm elif len(self.ids.profiles_1d[time_slice].grid.rho_tor) > 0: return self.ids.profiles_1d[time_slice].grid.rho_tor / self.ids.profiles_1d[time_slice].grid.rho_tor[-1] except IndexError: logger.error(f"core_profiles.profiles_1d[{time_slice}].grid.rho_tor_norm or rho_tor is not available") return None
[docs] def get_psi(self, time_slice: int) -> Union[list, None]: """ This function `get_psi` returns the poloidal magnetic flux (psi) at a given time slice. Args: time_slice (int): time index Returns: the poloidal magnetic flux (psi) as a list of floats for a given time slice. If the length of the poloidal magnetic flux is greater than 0, then the function returns the negative of the poloidal magnetic flux. If the length of the poloidal magnetic flux is 0, then the function returns None. Example: .. code-block:: python import imas connection = imas.DBEntry("imas:mdsplus?user=public;pulse=104010;run=2;database=ITER;version=3", "r") idsObj = connection.get('core_profiles') computeObj = CoreProfilesCompute(idsObj) result = computeObj.get_psi(time_slice=0) array([-4.95660880e+01, -4.95537345e+01, -4.95275298e+01, -4.94833135e+01, -4.94209348e+01, -4.93461904e+01, -4.92595767e+01, -4.91573223e+01, """ psi = None if len(self.ids.profiles_1d[time_slice].grid.psi) > 0: psi = -self.ids.profiles_1d[time_slice].grid.psi return psi
[docs] def get_ion_pressure_properties(self, time_slice): """ The `get_ion_pressure_properties` function calculates and returns the total thermal pressure, fast parallel pressure, and fast perpendicular pressure of ions in a given set of profiles. Returns: The function `get_ion_pressure_properties` is returning a dictionary containing the following keys and values: maxima_ion, pressure_ion_thermal, pressure_ion_fast_parallel, pressure_ion_fast_perpendicular """ nrho = len(self.get_rho_tor_norm(time_slice)) pressure_ion_thermal = 0.0 pressure_ion_fast_parallel = 0.0 pressure_ion_fast_perpendicular = 0.0 for ion in self.ids.profiles_1d[time_slice].ion: if len(ion.pressure_thermal) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].ion.pressure_thermal") if len(ion.pressure_fast_parallel) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].ion.pressure_fast_parallel") if len(ion.pressure_fast_perpendicular) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].ion.pressure_fast_perpendicular") pressure_ion_thermal = pressure_ion_thermal + ion.pressure_thermal pressure_ion_fast_parallel = ( pressure_ion_fast_parallel + np.asarray([np.nan] * nrho) if len(ion.pressure_fast_parallel) == 0 else ion.pressure_fast_parallel ) pressure_ion_fast_perpendicular = ( pressure_ion_fast_perpendicular + np.asarray([np.nan] * nrho) if len(ion.pressure_fast_perpendicular) == 0 else ion.pressure_fast_perpendicular ) pressure_ion_thermal = np.asarray([np.nan] * nrho) if len(pressure_ion_thermal) == 0 else pressure_ion_thermal pressure_ion_fast_parallel = ( np.asarray([np.nan] * nrho) if len(pressure_ion_fast_parallel) == 0 else pressure_ion_fast_parallel ) pressure_ion_fast_perpendicular = ( np.asarray([np.nan] * nrho) if len(pressure_ion_fast_perpendicular) == 0 else pressure_ion_fast_perpendicular ) maxima_ion = max( np.nan_to_num(max(pressure_ion_thermal[: nrho - 1])), np.nan_to_num(max(pressure_ion_fast_parallel[: nrho - 1])), np.nan_to_num(max(pressure_ion_fast_perpendicular[: nrho - 1])), ) maxima_ion = maxima_ion * 1.1 return { "maxima_ion": maxima_ion, "pressure_ion_thermal": pressure_ion_thermal, "pressure_ion_fast_parallel": pressure_ion_fast_parallel, "pressure_ion_fast_perpendicular": pressure_ion_fast_perpendicular, }
[docs] def get_electrons_pressure_properties(self, time_slice): """ The function `get_electrons_pressure_properties` calculates and returns various pressure properties of electrons, including maximum pressure and individual pressure components. Returns: The `get_electrons_pressure_properties` function returns a dictionary with the following key-value pairs: "maxima_electrons": Maximum pressure of electrons "pressure_electron_total": Total pressure of electrons "pressure_electron_thermal": Thermal pressure of electrons "pressure_electron_fast_parallel": Pressure of fast parallel electrons "pressure_electron_fast_perpendicular """ nrho = len(self.get_rho_tor_norm(time_slice)) pressure_electron_total = self.ids.profiles_1d[time_slice].electrons.pressure pressure_electron_thermal = self.ids.profiles_1d[time_slice].electrons.pressure_thermal pressure_electron_fast_parallel = self.ids.profiles_1d[time_slice].electrons.pressure_fast_parallel pressure_electron_fast_perpendicular = self.ids.profiles_1d[time_slice].electrons.pressure_fast_perpendicular if len(pressure_electron_total) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].electrons.pressure") if len(pressure_electron_thermal) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].electrons.pressure_thermal") if len(pressure_electron_fast_parallel) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].electrons.pressure_fast_parallel") if len(pressure_electron_fast_perpendicular) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].electrons.pressure_fast_perpendicular") pressure_electron_total = ( np.asarray([np.nan] * nrho) if len(pressure_electron_total) == 0 else pressure_electron_total ) pressure_electron_thermal = ( np.asarray([np.nan] * nrho) if len(pressure_electron_thermal) == 0 else pressure_electron_thermal ) pressure_electron_fast_parallel = ( np.asarray([np.nan] * nrho) if len(pressure_electron_fast_parallel) == 0 else pressure_electron_fast_parallel ) pressure_electron_fast_perpendicular = ( np.asarray([np.nan] * nrho) if len(pressure_electron_fast_perpendicular) == 0 else pressure_electron_fast_perpendicular ) maxima_electrons = max( np.nan_to_num(max(pressure_electron_total[: nrho - 1])), np.nan_to_num(max(pressure_electron_thermal[: nrho - 1])), np.nan_to_num(max(pressure_electron_fast_parallel[: nrho - 1])), np.nan_to_num(max(pressure_electron_fast_perpendicular[: nrho - 1])), ) maxima_electrons = maxima_electrons * 1.1 return { "maxima_electrons": maxima_electrons, "pressure_electron_total": pressure_electron_total, "pressure_electron_thermal": pressure_electron_thermal, "pressure_electron_fast_parallel": pressure_electron_fast_parallel, "pressure_electron_fast_perpendicular": pressure_electron_fast_perpendicular, }
[docs] def get_pressure(self, time_slice): """ The function `get_pressure` returns a dictionary containing the thermal pressure, parallel pressure, and perpendicular pressure. Returns: The `get_pressure` function returns a dictionary with the following key-value pairs: "maxima_total": maximum value calculated based on pressure values "pressure_total": total pressure value calculated as the sum of electron pressure and ion pressure "pressure_thermal": thermal pressure values "pressure_parallel": parallel pressure values "pressure_perpendicular": perpendicular pressure values """ nrho = len(self.get_rho_tor_norm(time_slice)) pressure_thermal = self.ids.profiles_1d[time_slice].pressure_thermal pressure_parallel = self.ids.profiles_1d[time_slice].pressure_parallel pressure_perpendicular = self.ids.profiles_1d[time_slice].pressure_perpendicular if len(pressure_thermal) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].pressure_thermal") if len(pressure_parallel) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].pressure_fast_parallel") if len(pressure_perpendicular) == 0: logger.warning(f"Empty profiles_1d[{time_slice}].pressure_fast_perpendicular") pressure_thermal = np.asarray([np.nan] * nrho) if len(pressure_thermal) == 0 else pressure_thermal pressure_parallel = np.asarray([np.nan] * nrho) if len(pressure_parallel) == 0 else pressure_parallel pressure_perpendicular = ( np.asarray([np.nan] * nrho) if len(pressure_perpendicular) == 0 else pressure_perpendicular ) dict_electrons_pressure_properties = self.get_electrons_pressure_properties(time_slice) pressure_electron_total = dict_electrons_pressure_properties["pressure_electron_total"] pressure_ion_total = self.get_pressure_ion_total(time_slice) pressure_total = np.copy(pressure_electron_total) if pressure_ion_total is not None: pressure_total += pressure_ion_total # Minima and maxima calculations for plots maxima_total = max( np.nan_to_num(max(pressure_total[: nrho - 1])), np.nan_to_num(max(pressure_thermal[: nrho - 1])), np.nan_to_num(max(pressure_parallel[: nrho - 1])), np.nan_to_num(max(pressure_perpendicular[: nrho - 1])), ) maxima_total = maxima_total * 1.1 return { "maxima_total": maxima_total, "pressure_total": pressure_total, "pressure_thermal": pressure_thermal, "pressure_parallel": pressure_parallel, "pressure_perpendicular": pressure_perpendicular, }
[docs] def get_pressure_ion_total(self, time_slice) -> Union[float, None]: """ The function `get_pressure_ion_total` returns the total ion pressure from a given set of profiles or None if the pressure values cannot be read. Returns: The function `get_pressure_ion_total` returns the total ion pressure from a given set of profiles, or `None` if the pressure values cannot be read. """ pressure_ion_total = None if len(self.ids.profiles_1d[time_slice].pressure_ion_total) > 1: pressure_ion_total = self.ids.profiles_1d[time_slice].pressure_ion_total else: logger.critical( f"core_profiles.profiles_1d[{time_slice}].pressure_ion_total could not be read", ) if len(self.ids.profiles_1d[time_slice].ion[0].pressure) > 1: pressure_ion_total = 0.0 for ion in self.ids.profiles_1d[time_slice].ion: pressure_ion_total = pressure_ion_total + ion.pressure else: logger.critical( f"core_profiles.profiles_1d[{time_slice}].ion[0].pressure could not be read", ) return pressure_ion_total
[docs] def get_profiles(self, time_slice): """ The function `get_profiles` retrieves and organizes various profiles from a data source for further analysis. Args: time_slice: The `time_slice` parameter in the `get_profiles` method is used to specify which slice of profiles to retrieve. Returns: A dictionary named `profiles` is being returned, which contains the following keys and corresponding values """ rho_tor_norm = self.get_rho_tor_norm(time_slice) if rho_tor_norm is None: logger.critical("core_profiles.profiles_1d[:].grid.rho_tor_norm and rho_tor are empty") logger.critical("----> Aborted.") return None nrho = len(rho_tor_norm) # J_bootstrap profile j_bootstrap = self.ids.profiles_1d[time_slice].j_bootstrap if len(self.ids.profiles_1d[time_slice].j_bootstrap) < 1: logger.critical("core_profiles.profiles_1d[" + str(time_slice) + "].j_bootstrap could not be read") j_bootstrap = np.asarray([np.nan] * nrho) # J_non_inductive profile j_non_inductive = self.ids.profiles_1d[time_slice].j_non_inductive if len(self.ids.profiles_1d[time_slice].j_non_inductive) < 1: logger.critical("core_profiles.profiles_1d[" + str(time_slice) + "].j_non_inductive could not be read") j_non_inductive = np.asarray([np.nan] * nrho) # J_ohmic profile j_ohmic = self.ids.profiles_1d[time_slice].j_ohmic if len(self.ids.profiles_1d[time_slice].j_ohmic) < 1: logger.critical("core_profiles.profiles_1d[" + str(time_slice) + "].j_ohmic could not be read") j_ohmic = np.asarray([np.nan] * nrho) # J_total profile j_total = self.ids.profiles_1d[time_slice].j_total if len(self.ids.profiles_1d[time_slice].j_total) < 1: logger.critical("core_profiles.profiles_1d[" + str(time_slice) + "].j_total could not be read") j_total = np.asarray([np.nan] * nrho) # q-profile q = self.ids.profiles_1d[time_slice].q if len(self.ids.profiles_1d[time_slice].q) < 1: logger.critical("core_profiles.profiles_1d[" + str(time_slice) + "].q could not be read") q = np.asarray([np.nan] * nrho) # Magnetic shear profile magnetic_shear = self.ids.profiles_1d[time_slice].magnetic_shear if len(self.ids.profiles_1d[time_slice].magnetic_shear) < 1: logger.critical("core_profiles.profiles_1d[" + str(time_slice) + "].magnetic_shear could not be read") magnetic_shear = np.asarray([np.nan] * nrho) if len(self.ids.profiles_1d[time_slice].q) != nrho: logger.critical("--------------------------------------------------------------") logger.critical("Dimensions of input core profiles are not consistent:") logger.critical(f" core_profiles.profiles_1d[{time_slice}].grid.rho_tor_norm)") logger.critical(f" and core_profiles.profiles_1d[{time_slice}].q") logger.critical(" have different dimensions:") logger.critical(f"- len(core_profiles.profiles_1d[{time_slice}].grid.rho_tor_norm))= {nrho}") logger.critical( f"- len(core_profiles.profiles_1d[{time_slice}].q = {len(self.ids.profiles_1d[time_slice].q)}" ) logger.critical("----> Aborted.") logger.critical("--------------------------------------------------------------") return None # Create the dictionary defining the list of profiles that can be displayed profiles = {} profiles["rhonorm"] = rho_tor_norm profiles["j_bootstrap"] = j_bootstrap profiles["j_non_inductive"] = j_non_inductive profiles["j_ohmic"] = j_ohmic profiles["j_total"] = j_total profiles["q"] = q profiles["magnetic_shear"] = magnetic_shear return profiles
[docs] def getnrho(self, time_slice): """ This function `getnrho` returns the number of elements in the `rho_tor_norm` or `rho_tor` grid based on the provided slice index. Args: time_slice: The `time_slice` parameter in the `getnrho` method is used to specify which slice of data to retrieve the number of rho values from. Returns: The `getnrho` method is returning the number of elements in the `rho_tor_norm` or `rho_tor` attribute of the `grid` object within the `profiles_1d` object at the specified `time_slice`. If either of these attributes has elements, the length of that attribute is returned as the number of `nrho`. """ nrho = None try: if len(self.ids.profiles_1d[time_slice].grid.rho_tor_norm) > 0: nrho = len(self.ids.profiles_1d[time_slice].grid.rho_tor_norm) elif len(self.ids.profiles_1d[time_slice].grid.rho_tor) > 0: nrho = len(self.ids.profiles_1d[time_slice].grid.rho_tor) except Exception as e: logger.debug(f"{e}") logger.warning(f"core_profiles.profiles_1d[:].grid.rho_tor_norm and rho_tor could not be read. {e}") return nrho