Source code for simultipac.cst.simulation_results

"""Define an object to store CST simulation results.

.. note::
    As for now, it can only load data stored in a single file. For Position
    Monitor exports (one file = one time step), see dedicated package
    ``PositionMonitor``.

.. todo::
    Evaluate expressions such as ``param2 = 2 * param1``

.. todo::
    Allow to have P_rms instead of E_acc; E_acc does not make a lot of sense in
    a lot of cases.

"""

import logging
from collections.abc import Sequence
from pathlib import Path
from pprint import pformat
from typing import Any

import numpy as np

from simultipac.cst.helper import (
    get_id,
    mmdd_xxxxxxx_folder_to_dict,
    no_extension,
)
from simultipac.particle_monitor.particle_monitor import (
    FILTER_FUNC,
    FILTER_KEY,
    ParticleMonitor,
)
from simultipac.plotter.default import DefaultPlotter
from simultipac.plotter.plotter import Plotter
from simultipac.simulation_results.simulation_results import (
    SimulationResults,
    SimulationResultsFactory,
)
from simultipac.types import PARTICLE_0D_t, PARTICLE_3D_t


[docs] class MissingFileError(Exception): """Error raised when a mandatory CST file was not found."""
[docs] class CSTResults(SimulationResults): """Store a single CST simulation results."""
[docs] def __init__( self, id: int, e_acc: float, time: np.ndarray, population: np.ndarray, p_rms: float | None = None, plotter: Plotter | None = None, trim_trailing: bool = False, period: float | None = None, parameters: dict[str, float | bool | str] | None = None, stl_alpha: float | None = None, particle_monitor: ParticleMonitor | None = None, **kwargs, ) -> None: """Instantiate, post-process. Parameters ---------- id : Unique simulation identifier. e_acc : Accelerating field in :unit:`V/m`. time : Time in :unit:`ns`. population : Evolution of population with time. Same shape as ``time``. p_rms : RMS power in :unit:`W`. plotter : An object allowing to plot data. trim_trailing : To remove the last simulation points, when the population is 0. Used with SPARK3D (``CSV`` import) for consistency with CST. period : RF period in :unit:`ns`. Mandatory for exponential growth fits. parameters : Additional information on the simulation. Typically, value of magnetic field, number of PIC cells, simulation flags... stl_path : Path to the ``STL`` file holding the 3D structure of the system. If given, we automatically load it. stl_alpha : Transparency for the 3D mesh. particle_monitor : Stores all particle monitor data. """ super().__init__( id=id, e_acc=e_acc, time=time, population=population, p_rms=p_rms, plotter=plotter, trim_trailing=trim_trailing, period=period, parameters=parameters, stl_alpha=stl_alpha, **kwargs, ) self._particle_monitor: ParticleMonitor if particle_monitor is not None: self._particle_monitor = particle_monitor
[docs] def hist( self, x: PARTICLE_0D_t, bins: int = 200, hist_range: tuple[float, float] | None = None, plotter: Plotter | None = None, filter: FILTER_KEY | FILTER_FUNC | None = None, **kwargs, ) -> Any: """Create a histogram. Parameters ---------- x : Name of the data to plot. bins : Number of histogram bins. hist_range : Lower and upper value for the histogram. plotter : Object creating the plots. filter : To plot only some of the particles. """ return self._particle_monitor.hist( x=x, bins=bins, hist_range=hist_range, plotter=plotter, filter=filter, **kwargs, )
[docs] def plot_3d(self, key: PARTICLE_3D_t, **kwargs) -> Any: if key == "trajectory": return self.plot_trajectories(**kwargs)
def plot_trajectories( self, emission_color: str | None = None, collision_color: str | None = None, lw: int = 7, r: int = 8, plotter: Plotter | None = None, filter: FILTER_KEY | FILTER_FUNC | None = None, **kwargs, ) -> Any: """Plot trajectories in 3D. Parameters ---------- emission_color : If provided, the first known position is colored with this color. collision_color : If provided, the last known position is colored with this color. collision_point : If provided and ``collision_color`` is not ``None``, we plot this point instead of the last of ``points``. This is useful when the extrapolated time is large, and actuel collision point may differ significantly from last position points. lw : Trajectory line width. r : Size of the emission/collision points. plotter : An object allowing to plot data. filter : To select the particles to be plotted. """ return self._particle_monitor.plot_trajectories( emission_color=emission_color, collision_color=collision_color, lw=lw, r=r, plotter=plotter, filter=filter, **kwargs, )
[docs] def plot_mesh(self, *args, **kwargs) -> Any: """Plot the stored mesh.""" return self._particle_monitor.plot_mesh(*args, **kwargs)
[docs] def plot_trajectories( self, emission_color: str | None = None, collision_color: str | None = None, lw: int = 7, r: int = 8, plotter: Plotter | None = None, filter: FILTER_KEY | FILTER_FUNC | None = None, **kwargs, ) -> Any: """Plot trajectories in 3D. Parameters ---------- emission_color : If provided, the first known position is colored with this color. collision_color : If provided, the last known position is colored with this color. collision_point : If provided and ``collision_color`` is not ``None``, we plot this point instead of the last of ``points``. This is useful when the extrapolated time is large, and actuel collision point may differ significantly from last position points. lw : Trajectory line width. r : Size of the emission/collision points. plotter : An object allowing to plot data. filter : To select the particles to be plotted. """ return self._particle_monitor.plot_trajectories( emission_color=emission_color, collision_color=collision_color, lw=lw, r=r, plotter=plotter, filter=filter, **kwargs, )
[docs] def fit_alpha( self, fitting_periods: int, running_mean: bool = False, log_fit: bool = True, minimum_final_number_of_electrons: int = 0, bounds: tuple[list[float], list[float]] = ( [1e-10, -10.0], [np.inf, 10.0], ), initial_values: list[float] = [0.0, 0.0], minimum_number_of_points: int = 20, min_points_per_period: int = 5, **kwargs, ) -> None: """Fit exp growth factor. Parameters ---------- fitting_periods : Number of periods over which the exp growth is searched. Longer is better, but you do not want to start the fit before the exp growth starts. running_mean : To tell if you want to average the number of particles over one period. It is recommended with CST, but does not bring anything for SPARK3D. The default is False. log_fit : To perform the fit on :func:`exp_growth_log` rather than :func:`exp_growth`. The default is True, as it generally shows better convergence. minimum_final_number_of_electrons : Under this final number of electrons, we do no bother finding the exp growth factor and set all fit parameters to ``NaN``. bounds : Upper bound and lower bound for the two variables: initial number of electrons, exp growth factor. initial_values: list[float], optional Initial values for the two variables: initial number of electrons, exp growth factor. minimum_number_of_points : Minimum number of fitting points; under this limit, a warning is issued. For CST, should be at least 10 or 20. With SPARK3D, there are two points per RF period so a value of 2 or 4 should be enough. min_points_per_period : Minimum number of points per period. In SPARK3D, we only have two points per RF period so this number should be lower to avoid unnecessary warnings. """ return super().fit_alpha( fitting_periods=fitting_periods, running_mean=running_mean, log_fit=log_fit, minimum_final_number_of_electrons=minimum_final_number_of_electrons, bounds=bounds, initial_values=initial_values, minimum_number_of_points=minimum_number_of_points, min_points_per_period=min_points_per_period, **kwargs, )
[docs] class CSTResultsFactory(SimulationResultsFactory): """Define an object to easily instantiate :class:`.CSTResults`.""" _parameters_file = "Parameters.txt" _time_population_file = "Particle vs. Time.txt"
[docs] def __init__( self, *args, plotter: Plotter | None = None, freq_ghz: float | None = None, e_acc_parameter: Sequence[str] = ( "E_acc", "e_acc", "accelerating_field", ), e_acc_file_mv_m: str = "E_acc in MV per m.txt", p_rms_file: str | None = None, stl_path: Path | str | None = None, stl_alpha: float | None = None, **kwargs, ) -> None: """Instantiate object. If necessary, override default ``e_acc`` filename. Parameters ---------- plotter : Object to plot data. freq_ghz : RF frequency in GHz. Used to compute RF period, which is mandatory for exp growth fitting. e_acc_parameter : The possible names of the accelerating field in :file:`Parameters.txt`; we try all of them sequentially, and resort to taking it from a file if it was not successful. You can pass in an empty tuple to force the use of the file. e_acc_file_mv_m : Name of the file where the value of the accelerating field in :unit:`MV/m` is written. This is a fallback, we prefer getting accelerating field from the :file:`Parameters.txt` file. e_acc_file : Name of the file where the value of the RMS power in W is written. If not provided, we do not load RMS power. stl_path : Path to the `STL` file describing the geometry. Used by :class:`.ParticleMonitor` to compute emission and collision angles, and realize 3D plots. stl_alpha : Transparency for the 3D mesh. """ if plotter is None: plotter = DefaultPlotter() self._e_acc_parameter = e_acc_parameter self._e_acc_file_mv_m = e_acc_file_mv_m self._p_rms_file = p_rms_file return super().__init__( *args, plotter=plotter, freq_ghz=freq_ghz, stl_path=stl_path, stl_alpha=stl_alpha, **kwargs, )
@property def mandatory_files(self) -> set[str]: """Give the name of the mandatory files.""" mandatory = {self._parameters_file, self._time_population_file} if len(self._e_acc_parameter) == 0: mandatory.add(self._e_acc_file_mv_m) return mandatory
[docs] def from_simulation_folder( self, folderpath: Path, delimiter: str = "\t", folder_particle_monitor: str | Path | None = None, load_first_n_particles: int | None = None, ) -> CSTResults: """Instantiate results from a :file:`mmdd-xxxxxxx` folder. The expected structure is the following:: mmdd-xxxxxxx ├── 'Adimensional e.txt' ├── 'Adimensional h.txt' ├── 'E_acc in MV per m.txt' # Mandatory if E_acc not in :file:`Parameters.txt` ├── Parameters.txt # Mandatory ├── 'ParticleInfo [PIC]' │ ├── 'Emitted Secondaries.txt' │ └── 'Particle vs. Time.txt' # Mandatory ├── 'TD Number of mesh cells.txt' └── 'TD Total solver time.txt' Non-mandatory files data will be loaded in the ``parameters`` attribute. If you want to load particle monitor data, you must provide ``folder_particle_monitor`` where all particle monitors are stored. Typical structure is:: folder_particle_monitor ├──'position monitor 1_0.117175810039043.txt' ├──'position monitor 1_0.156234413385391.txt' ├──'position monitor 1_0.19529302418232.txt' ├──'position monitor 1_0.232905015349388.txt' ├──'position monitor 1_0.271963626146317.txt' ├──... └──'position monitor 1_7.81172066926956E-02.txt' Parameters ---------- folderpath : Path to a :file:`mmdd-xxxxxxx` folder, holding the results of a single simulation among a parametric simulation export. delimiter : Delimiter between two columns. folder_particle_monitor : Holds all the particle monitor files. load_first_n_particles : If provided, we only load the ``load_first_n_particles`` first particles. Useful for debugging/speeding up. Returns ------- results : CSTResults Instantiated object. """ assert folderpath.is_dir(), f"{folderpath = } does not exist." id = get_id(folderpath) raw_results = mmdd_xxxxxxx_folder_to_dict(folderpath, delimiter) for filename in self.mandatory_files: if no_extension(filename) not in raw_results: raise MissingFileError( f"{filename = } was not found in {folderpath}. However, I " f"found {pformat(list(raw_results.keys()))}" ) e_acc = self._pop_e_acc(raw_results, folderpath) part_time = raw_results.pop(no_extension(self._time_population_file)) time, population = part_time[:, 0], part_time[:, 1] p_rms = ( raw_results.pop(no_extension(self._p_rms_file)) if self._p_rms_file else None ) particle_monitor = None if folder_particle_monitor is not None: particle_monitor = ParticleMonitor.from_folder( folder_particle_monitor, plotter=self._plotter, stl_path=self._stl_path, stl_alpha=self._stl_alpha, load_first_n_particles=load_first_n_particles, ) results = CSTResults( id=id, e_acc=e_acc, time=time, population=population, p_rms=p_rms, plotter=self._plotter, period=self._period, parameters=raw_results.pop(no_extension(self._parameters_file)), particle_monitor=particle_monitor, ) return results
[docs] def _pop_e_acc(self, raw_results: dict[str, Any], folder: Path) -> float: """Pop the value of the accelerating field from ``raw_results.`` First, we try to get it from the :file:`Parameters.txt` under the names listed in ``self._e_acc_parameter``. If was not found, we look into the ``self._e_acc_file_mv_m`` file. """ parameters = raw_results[no_extension(self._parameters_file)] for name in self._e_acc_parameter: e_acc = parameters.pop(name, None) if e_acc is not None: logging.debug( f"{folder}: took accelerating field from {name} in " f"{self._parameters_file}." ) return e_acc if self._e_acc_file_mv_m is not None: e_acc = raw_results.pop(no_extension(self._e_acc_file_mv_m), None) if e_acc is not None: logging.debug( f"{folder}: took accelerating field from " "{self._e_acc_file_mv_m} file. Multiplied it by 1e6." ) return e_acc * 1e-6 raise ValueError( f"Could not find accelerating field in {folder}. Tried to look for" f" {self._e_acc_parameter = } key in Parameters.txt, and then for " f"a file named {self._e_acc_file_mv_m = }" )
[docs] def from_simulation_folders( self, master_folder: Path, delimiter: str = "\t", ) -> list[CSTResults]: """Load all :file:`mmdd-xxxxxxx` folders in ``master_folder``. Note ---- Loading of :class:`.ParticleMonitor` for multiple simulations is currently not supported. """ folders = list(master_folder.iterdir()) return [ self.from_simulation_folder(folder, delimiter=delimiter) for folder in folders ]