Source code for idstools.eqdsk2ids

#!/usr/bin/env python
import datetime
import glob
import logging
import os
import re
from copy import deepcopy
from pprint import pformat
from statistics import median
from typing import List, Optional, Union

try:
    import imaspy as imas
except ImportError:
    import imas
import numpy as np
from fortranformat import FortranRecordReader

from idstools import GIT_REV, __version__
from idstools.cocos import COCOS, IDS_COCOS, compute_COCOS

logger = logging.getLogger(__name__)


# ----------------------------------------------------------------------


[docs]class GEQDSK: """ GEQDSK module for IMAS [1] L.L. Lao, "G EQDSK FORMAT", https://w3.pppl.gov/ntcc/TORAY/G_EQDSK.pdf [2] O. Sauter and S.Yu. Medvedev, "Tokamak Coordinate Conventions : COCOS", Comput. Physics Commun. 184 (2013) 293 """ def __init__(self, fpath, ipsign_out, b0sign_out, cocos_in): """ Read GEQDSK file and set COCOS transformation coefficients Parameters ---------- fpath: str Path to GEQDSK file ipsign_out: int Desired sign(Ip) in output b0sign_out: int Desired sign(B0) in output cocos_in: int Coerce input COCOS """ # 1. Register name of GEQDSK file self.fpath = os.path.expanduser(os.path.expandvars(fpath)) logger.debug("GEQDSK fpath: \n%s", pformat(self.fpath, indent=2)) # 2. Read GEQDSK file self.data = self._load(self.fpath) logger.debug("GEQDSK data: \n%s", pformat(self.data, indent=2, sort_dicts=False)) # 3. Confer COCOS if cocos_in: self.cocos = COCOS( index={ "COCOS": cocos_in, "ipsign": np.sign(self.data["CURRENT"]), "b0sign": np.sign(self.data["BCENTR"]), } ) else: self.cocos = self._set_cocos(self.data) logger.info( "GEQDSK COCOS: \n%s", pformat(self.cocos.__dict__, indent=2, sort_dicts=False), ) # 4. Compute transformation coeff. self.coef = self.cocos.values_coefficients( self.cocos.COCOS, IDS_COCOS, self.data["CURRENT"], self.data["BCENTR"], ipsign_out, b0sign_out, ) logger.info( "GEQDSK Transformation Coeff.: \n%s", pformat(self.coef, indent=2, sort_dicts=False), ) def _load(self, fpath): """ Read GEQDSK Parameters ---------- fpath: str Path to GEQDSK file Returns ------- dict Information in GEQDSK file """ if not os.path.exists(fpath): raise FileNotFoundError(fpath) if os.stat(fpath).st_size == 0: raise IOError(f"file size is zero: {fpath}") try: fp = open(fpath, "r") except OSError: raise IOError(f"cannot open/read file: {fpath}") fmt00 = FortranRecordReader("6a8,3i4") fmt20 = FortranRecordReader("5e16.9") fmt22 = FortranRecordReader("2i5") data = {} # header = fp.readline().rstrip() data["TIME"] = self._extract_time(header) rec = fmt00.read(header) data["CASE"] = rec[0:6] if len(header) != 60: logger.warning(f"irregular length of header: {len(header)}") header = header.split() data["IDUM"] = int(header[-3]) data["NW"] = nw = int(header[-2]) data["NH"] = nh = int(header[-1]) else: data["IDUM"] = int(header[48:52]) data["NW"] = nw = int(header[52:56]) data["NH"] = nh = int(header[56:60]) # rec = np.float64(fmt20.read(fp.readline())) data["RDIM"] = rec[0] data["ZDIM"] = rec[1] data["RCENTR"] = rec[2] data["RLEFT"] = rec[3] data["ZMID"] = rec[4] # rec = np.float64(fmt20.read(fp.readline())) data["RMAXIS"] = rec[0] data["ZMAXIS"] = rec[1] data["SIMAG"] = rec[2] data["SIBRY"] = rec[3] data["BCENTR"] = rec[4] # rec = np.float64(fmt20.read(fp.readline())) data["CURRENT"] = rec[0] # data["SIMAG"] = rec[1] data["XDUM"] = rec[2] # data["RMAXIS"] = rec[3] data["XDUM"] = rec[4] # rec = np.float64(fmt20.read(fp.readline())) # data["ZMAXIS"] = rec[0] data["XDUM"] = rec[1] # data["SIBRY"] = rec[2] data["XDUM"] = rec[3] data["XDUM"] = rec[4] # data["FPOL"] = self._read1d(fp, nw, fmt20) data["PRES"] = self._read1d(fp, nw, fmt20) data["FFPRIM"] = self._read1d(fp, nw, fmt20) data["PPRIME"] = self._read1d(fp, nw, fmt20) data["PSIRZ"] = np.reshape(self._read1d(fp, nw * nh, fmt20), (nh, nw)) data["QPSI"] = self._read1d(fp, nw, fmt20) # rec = [0, 0] try: rec = np.int32(fmt22.read(fp.readline())) except Exception as e: logger.debug(f"{e}") data["NBBBS"] = nbbbs = rec[0] data["LIMITR"] = limitr = rec[1] # if nbbbs > 0: bbbs = np.reshape(self._read1d(fp, 2 * nbbbs, fmt20), (nbbbs, 2)) data["RBBBS"] = bbbs[:, 0] data["ZBBBS"] = bbbs[:, 1] else: data["RBBBS"] = [] data["ZBBBS"] = [] # if limitr > 0: lim = np.reshape(self._read1d(fp, 2 * limitr, fmt20), (limitr, 2)) data["RLIM"] = lim[:, 0] data["ZLIM"] = lim[:, 1] else: data["RLIM"] = [] data["ZLIM"] = [] return data def _read1d(self, fp, nlen, fmt): """ Read array with specified format in Fortran Parameters ---------- fp: _io.TextIOWrapper file pointer nlen: int length of record fmt: FortranRecordReader Returns ------- numpy.ndarray, dtype=numpy.float64 """ ret = np.zeros(nlen, dtype=np.float64) i = 0 while i < nlen - 1: fdata = fmt.read(fp.readline()) i2 = min(i + len(fdata), nlen) ret[i:i2] = fdata[: i2 - i] i = i2 if i < nlen: fdata = fmt.read(fp.readline()) ret[i:] = fdata[: nlen - i] return ret def _set_cocos(self, g): """ Compute COCOS for GEQDSK file and Return class COCOS Parameters ---------- g: dict Information of GEQDSK file Returns ------- COCOS COCOS index and values """ # Sign(Ip) and Sign(B0) from input sigma_ip = np.sign(g["CURRENT"]) sigma_b0 = np.sign(g["BCENTR"]) # PSIRZ divided by 2*pi [1], Table 1(a) [2] exp_bp = 0 # Eq.(22) [2] sign_psi_edge_axis = np.sign(g["SIBRY"] - g["SIMAG"]) sigma_bp = int(sign_psi_edge_axis * sigma_ip) # Right-handed cylindrical coordinate system [1], Table 1(a) [2] sigma_rphi_z = int(+1) # Eq.(22), Table 1(b) [2] x = np.sign(median(g["QPSI"])) if x > 0.0: sign_q = int(+1) elif x < 0.0: sign_q = int(-1) else: sign_q = int(0) # raise ValueError in Class COCOS sign_q_pos = int(sign_q * sigma_ip * sigma_b0) # Eq.(22) [2] sigma_rhothetaphi = int(sign_q * sigma_ip * sigma_b0) # Eq.(22), Table 1(b) [2] x = np.sign(median(g["PPRIME"])) if x > 0.0: sign_pprime = int(+1) elif x < 0.0: sign_pprime = int(-1) else: sign_pprime = int(0) # raise ValueError in Class COCOS sign_pprime_pos = int(sign_pprime * sigma_ip) values = { "exp_Bp": exp_bp, "sigma_Bp": sigma_bp, "sigma_RphiZ": sigma_rphi_z, "sigma_rhothetaphi": sigma_rhothetaphi, "sign_q_pos": sign_q_pos, "sign_pprime_pos": sign_pprime_pos, "ipsign": sigma_ip, "b0sign": sigma_b0, } return COCOS(values=values) def _extract_time( self, header_line: str, skip_if_unit_missing: bool = False, ) -> Optional[float]: """ Extracts a time value from header string of GEQDSK file. Supported formats: - "260ms", "2.6e2(ms)", "150 [s]", "time=1.0", "t = 2 (s)" Parameters: header_line (str): The header line containing text. skip_if_unit_missing (bool): If True, skip values with no unit; if False, assume seconds. Returns: float or None: Time in seconds, or None if not found or skipped. """ # Supported unit conversion map unit_map = { "s": 1e0, "ms": 1e-3, "us": 1e-6, "μs": 1e-6, "min": 6e1, } # Remove trailing integers at the end (e.g., "3 129 129") header_line = re.sub(r"(\d+\s+){2,}\d+$", "", header_line.strip()) # 1. Pattern: numeric value + optional unit # (in any brackets, with optional spaces) time_regex = re.compile( ( r"(?P<value>[+-]?\d+(?:\.\d*)?(?:[eE][+-]?\d+)?)" r"[\s]*[\(\[\{]?\s*" r"(?P<unit>s|ms|us|μs|min)" r"\s*[\)\]\}]?" ), re.IGNORECASE, ) # 2. Pattern: key=value with optional brackets for unit # (e.g., t = 2 (s), time=1.[ms]) key_value_regex = re.compile( ( r"(?:\btime\b|\bt\b)\s*=\s*" r"(?P<value>[+-]?\d+(?:\.\d*)?(?:[eE][+-]?\d+)?)" r"(?:\s*[\(\[\{]?\s*" r"(?P<unit>s|ms|us|μs|min)\s*[\)\]\}]?)?" ), re.IGNORECASE, ) # Step 1: Try general value+unit match for match in time_regex.finditer(header_line): value = float(match.group("value")) unit = match.group("unit").lower() factor = unit_map.get(unit) if factor is not None: return value * factor # Step 2: Try key=value[unit] match for match in key_value_regex.finditer(header_line): value = float(match.group("value")) unit = match.group("unit") if unit: factor = unit_map.get(unit.lower()) if factor is not None: return value * factor else: logger.warning(f"Unknown unit: {unit}, skipping.") return None else: if skip_if_unit_missing: logger.warning(f"No unit found, t={value}; skipping.") return None else: logger.warning(f"No unit found, t={value}; assuming sec.") return value return None
# ----------------------------------------------------------------------
[docs]def map__GEQDSK_to_ids(geqdsk, eq): """ Convert GEQDSK file into IDS/equilibrium Parameters ---------- geqdsk: GEQDSK Class GEQDSK eq: object Returns ---------- None """ def common_properties(ids): """ """ ids.ids_properties.homogeneous_time = 1 ids.ids_properties.creation_date = datetime.datetime.now().strftime("%d/%m/%Y %H:%M") ids.ids_properties.provider = os.getenv("USER") ids.code.name = "IDStools/eqdsk2ids" ids.code.repository = "https://github.com/iterorganization/IDStools" ids.code.commit = GIT_REV ids.code.version = __version__ ids.code.output_flag.resize(1) ids.code.output_flag[0] = 0 # Abbrev. gdsk = geqdsk.data coef = geqdsk.coef # IDS_COCOS cocos = COCOS(index={"COCOS": IDS_COCOS, "ipsign": +1, "b0sign": +1}) # IDS info. common_properties(eq) # Set time eq.time.resize(1) if gdsk["TIME"]: eq.time[0] = gdsk["TIME"] else: eq.time[0] = imas.ids_defs.EMPTY_FLOAT # 0D eq.time_slice.resize(1) eq.time_slice[0].global_quantities.ip = gdsk["CURRENT"] * coef["sigma_Ip_eff"] eq.time_slice[0].global_quantities.magnetic_axis.r = gdsk["RMAXIS"] eq.time_slice[0].global_quantities.magnetic_axis.z = gdsk["ZMAXIS"] eq.time_slice[0].global_quantities.psi_axis = gdsk["SIMAG"] * coef["fact_psi"] eq.time_slice[0].global_quantities.psi_boundary = gdsk["SIBRY"] * coef["fact_psi"] # vacuume_toroidal_field eq.vacuum_toroidal_field.r0 = gdsk["RCENTR"] eq.vacuum_toroidal_field.b0.resize(1) eq.vacuum_toroidal_field.b0[0] = gdsk["BCENTR"] * coef["sigma_B0_eff"] # 1D nw = gdsk["NW"] nh = gdsk["NH"] eq.time_slice[0].profiles_1d.dpressure_dpsi.resize(nw) eq.time_slice[0].profiles_1d.f_df_dpsi.resize(nw) eq.time_slice[0].profiles_1d.f.resize(nw) eq.time_slice[0].profiles_1d.pressure.resize(nw) eq.time_slice[0].profiles_1d.q.resize(nw) eq.time_slice[0].profiles_1d.psi.resize(nw) eq.time_slice[0].profiles_1d.dpressure_dpsi = gdsk["PPRIME"] / coef["fact_psi"] eq.time_slice[0].profiles_1d.f_df_dpsi = gdsk["FFPRIM"] / coef["fact_psi"] eq.time_slice[0].profiles_1d.f = gdsk["FPOL"] * coef["sigma_B0_eff"] eq.time_slice[0].profiles_1d.pressure = gdsk["PRES"] eq.time_slice[0].profiles_1d.q = gdsk["QPSI"] * coef["fact_q"] simag = gdsk["SIMAG"] sibry = gdsk["SIBRY"] for i in range(nw): psi_val = (1.0 - float(i) / float(nw - 1)) * (simag - sibry) + sibry eq.time_slice[0].profiles_1d.psi[i] = psi_val * coef["fact_psi"] # Boundary if gdsk["NBBBS"] > 0: eq.time_slice[0].boundary.outline.r.resize(gdsk["NBBBS"]) eq.time_slice[0].boundary.outline.z.resize(gdsk["NBBBS"]) eq.time_slice[0].boundary.outline.r = gdsk["RBBBS"] eq.time_slice[0].boundary.outline.z = gdsk["ZBBBS"] # 2D eq.time_slice[0].profiles_2d.resize(1) eq.time_slice[0].profiles_2d[0].grid_type.index = 1 eq.time_slice[0].profiles_2d[0].grid.dim1.resize(nw) eq.time_slice[0].profiles_2d[0].grid.dim2.resize(nh) eq.time_slice[0].profiles_2d[0].psi.resize(nw, nh) eq.time_slice[0].profiles_2d[0].r.resize(nw, nh) eq.time_slice[0].profiles_2d[0].z.resize(nw, nh) eq.time_slice[0].profiles_2d[0].b_field_r.resize(nw, nh) eq.time_slice[0].profiles_2d[0].b_field_z.resize(nw, nh) for i in range(nw): eq.time_slice[0].profiles_2d[0].grid.dim1[i] = float(i) / float(nw - 1) * gdsk["RDIM"] + gdsk["RLEFT"] for j in range(nh): eq.time_slice[0].profiles_2d[0].grid.dim2[j] = ( float(j) / float(nh - 1) * gdsk["ZDIM"] - 0.5 * gdsk["ZDIM"] + gdsk["ZMID"] ) for j in range(nh): for i in range(nw): eq.time_slice[0].profiles_2d[0].psi[i, j] = gdsk["PSIRZ"][j, i] * coef["fact_psi"] for j in range(nh): for i in range(nw): eq.time_slice[0].profiles_2d[0].r[i, j] = eq.time_slice[0].profiles_2d[0].grid.dim1[i] eq.time_slice[0].profiles_2d[0].z[i, j] = eq.time_slice[0].profiles_2d[0].grid.dim2[j] # Eq. (19) fact = cocos.sigma_rphi_z * cocos.sigma_bp / (2.0 * np.pi) ** cocos.exp_bp dim1 = eq.time_slice[0].profiles_2d[0].grid.dim1 dim2 = eq.time_slice[0].profiles_2d[0].grid.dim2 for i in range(nw): psi = eq.time_slice[0].profiles_2d[0].psi[i, :] br = np.gradient(psi, dim2, edge_order=2) / dim1[i] eq.time_slice[0].profiles_2d[0].b_field_r[i, :] = br[:] * fact for j in range(nh): psi = eq.time_slice[0].profiles_2d[0].psi[:, j] bz = np.gradient(psi, dim1, edge_order=2) / dim1[:] eq.time_slice[0].profiles_2d[0].b_field_z[:, j] = bz[:] * fact * -1.0 logger.debug("IDS/equilibrium: \n%s", pformat(eq, indent=2, sort_dicts=False))
# ----------------------------------------------------------------------
[docs]def merge_equilibrium(eq1, eq2, sort_by_time=True): """ Concatenate two IMAS IDS/equilibrium objects (eq1 appended after eq2), with an option to sort the resulting time slices by time. Parameters ---------- eq1 : object The equilibrium IDS to append. eq2 : object The base equilibrium IDS. sort_by_time : bool, optional If True, the resulting IDS will be sorted by time (default: True). Returns ------- eq : object New IDS/equilibrium instance with combined (and optionally sorted) content. """ if eq1 is None or eq2 is None: raise ValueError("Both eq1 and eq2 must be valid IDS/equilibrium objects.") # Prepare new equilibrium IDS eq = imas.IDSFactory().equilibrium() n1 = len(eq1.time) n2 = len(eq2.time) n_total = n1 + n2 eq.time.resize(n_total) eq.time_slice.resize(n_total) eq.vacuum_toroidal_field.b0.resize(n_total) eq.code.output_flag.resize(n_total) eq.ids_properties = deepcopy(eq1.ids_properties) eq.code.name = eq1.code.name eq.code.repository = eq1.code.repository eq.code.commit = eq1.code.commit eq.code.version = eq1.code.version # Copy time slices from both sources for i in range(n2): eq.time[i] = eq2.time[i] eq.time_slice[i] = deepcopy(eq2.time_slice[i]) eq.vacuum_toroidal_field.b0[i] = eq2.vacuum_toroidal_field.b0[i] eq.code.output_flag[i] = eq2.code.output_flag[i] for i in range(n1): idx = i + n2 eq.time[idx] = eq1.time[i] eq.time_slice[idx] = deepcopy(eq1.time_slice[i]) eq.vacuum_toroidal_field.b0[idx] = eq1.vacuum_toroidal_field.b0[i] eq.code.output_flag[idx] = eq1.code.output_flag[i] if sort_by_time: eqw = deepcopy(eq) # Sort all fields by ascending time sorted_indices = np.argsort(eq.time[:]) for i, j in enumerate(sorted_indices): eq.time[i] = eqw.time[j] eq.time_slice[i] = eqw.time_slice[j] eq.vacuum_toroidal_field.b0[i] = eqw.vacuum_toroidal_field.b0[j] eq.code.output_flag[i] = eqw.code.output_flag[j] return eq
# ----------------------------------------------------------------------
[docs]def geqdsk2ids(fpath, ipsign=0, b0sign=0, cocos_in=None): """ Functional Interface of GEQDSK Converter (geqdsk2ids) Parameters ---------- fpath: str Path to GEQDSK file ipsign: int=0, optional Desired sign(Ip) in output b0sign: int=0, optional Desired sign(B0) in output cocos_in: int=None, optional Coerce input COCOS Returns ------- eq: ``imas_*_ual_*``.equilibrium.equilibrium ('*' corresponds to IMAS/UAL ver.) IDS/equilibrium """ # Read GEQDSK file logger.info("loading GEQDSK file ...") geqdsk = GEQDSK(fpath, ipsign, b0sign, cocos_in) # Map GEQDSK to IDS/equilibrium logger.info("mapping GEQDSK to IDS/equilibrium ...") eq = imas.ids_factory.IDSFactory().equilibrium() map__GEQDSK_to_ids(geqdsk, eq) # Compute COCOS in output IDS/equilibrium cocos = compute_COCOS(eq) logger.info("COCOS values in output IDS/equilibrium: \n%s", pformat(cocos, indent=2)) # Check if COCOS is equal to IDS_COCOS if cocos["COCOS"] != IDS_COCOS: logger.warning("COCOS Target= {}, Output= {}, Input= {}".format(IDS_COCOS, cocos["COCOS"], geqdsk.cocos.COCOS)) raise SystemExit("Input COCOS is inconsistent between GEQDSK file and COCOS with the option '--cocos_in'.") return eq
# ---------------------------------------------------------------------- def _expand_file_patterns(pattern: str) -> List[str]: """ Expand a file pattern to a list of matching files. Parameters ---------- pattern : str File pattern that can be: - A single file path - A directory path - A glob pattern with wildcards Returns ------- List[str] List of matching file paths """ # Expand environment variables and user home directory expanded = os.path.expanduser(os.path.expandvars(pattern)) # Check if it's an existing file if os.path.isfile(expanded): return [os.path.abspath(expanded)] # Check if it's an existing directory if os.path.isdir(expanded): abs_dir = os.path.abspath(expanded) return [ os.path.join(abs_dir, fname) for fname in sorted(os.listdir(abs_dir)) if os.path.isfile(os.path.join(abs_dir, fname)) ] # Try glob expansion for patterns with wildcards matches = glob.glob(expanded) if matches: # Filter to only include files (not directories) return [os.path.abspath(match) for match in sorted(matches) if os.path.isfile(match)] # If no matches found, raise an error raise FileNotFoundError(f"No files found matching pattern: {pattern}")
[docs]def eqdsk2ids( gfile: Union[str, List[str], None] = None, afile: Optional[str] = None, ipsign: int = 0, b0sign: int = 0, cocos_in: Optional[int] = None, ) -> "imas.ids.equilibrium.equilibrium": """ Convert one or more GEQDSK files into a merged IMAS equilibrium IDS. Parameters ---------- gfile : str, list of str, optional Path(s) to GEQDSK file(s). Can be: - Single file path - Directory path (all files processed) - Space-separated string of multiple files/patterns - List of file paths - Glob pattern(s) with wildcards (``*``, ``?``, ``[]``) afile : str, optional Path to AEQDSK file (currently not used). ipsign : int, default=0 Desired sign of plasma current (Ip) in the output. b0sign : int, default=0 Desired sign of toroidal field (B0) in the output. cocos_in : int or None, optional Input COCOS convention to coerce. None means autodetect. Returns ------- eq : imas.ids.equilibrium.equilibrium Combined equilibrium IDS from one or more GEQDSK files. """ if not gfile: raise ValueError("No GEQDSK file(s) provided.") file_list: List[str] = [] # Handle different input types if isinstance(gfile, list): # If it's already a list, process each element for item in gfile: file_list.extend(_expand_file_patterns(item)) elif isinstance(gfile, str): # If it's a string, it could be space-separated or a single path if " " in gfile: # Split by whitespace and expand each part parts = gfile.split() for part in parts: file_list.extend(_expand_file_patterns(part)) else: # Single string - could be file, dir, or pattern file_list.extend(_expand_file_patterns(gfile)) else: raise TypeError("gfile must be a string or list of strings") if not file_list: raise FileNotFoundError(f"No GEQDSK files found matching: {gfile}") # Sort the file list for consistent processing order file_list = sorted(set(file_list)) # Remove duplicates and sort logger.info(f"Processing {len(file_list)} GEQDSK files: {file_list}") # Initialize empty equilibrium IDS eq = imas.IDSFactory().equilibrium() # Convert and merge each GEQDSK file for fpath in file_list: geq = geqdsk2ids(fpath, ipsign=ipsign, b0sign=b0sign, cocos_in=cocos_in) eq = merge_equilibrium(geq, eq, sort_by_time=True) return eq