Skip to content

pySWAP Model

Building, running and parsing the results of a SWAP model run.

When the Model class began to grow, it became clear that it needed to be refactored into a more modular structure. The functionality to build the environment, run the model and parse the results has been abstracted into three classes, leaving the main (user-facing) Model class focused on the model components and their interactions. The four classes in this module are:

Classes:

ModelBuilder: Class responsible for building the model components.
ModelRunner: Class responsible for running the model.
ResultReader: Class responsible for parsing the model results.
Model: Main class that runs the SWAP model.

Model

Bases: PySWAPBaseModel, FileMixin, SerializableMixin

Main class that runs the SWAP model.

Even though all sections are set to optional, the model will not run if any of the components are missing.

Attributes:

Name Type Description
metadata Subsection

Metadata of the model.

version str

The version of the model.

generalsettings Subsection

Simulation settings.

meteorology Subsection

Meteorological data.

crop Subsection

Crop data.

fixedirrigation Subsection

Fixed irrigation settings.

soilmoisture Subsection

Soil moisture data.

surfaceflow Subsection

Surface flow data.

evaporation Subsection

Evaporation data.

soilprofile Subsection

Soil profile data.

snowandfrost Subsection

Snow and frost data.

richards Subsection

Richards data.

lateraldrainage Subsection

Lateral drainage data.

bottomboundary Subsection

Bottom boundary data.

heatflow Subsection

Heat flow data.

solutetransport Subsection

Solute transport data.

Methods:

Name Description
write_swp

Write the .swp input file.

validate

Validate the model.

run

Run the model.

Source code in pyswap/model/model.py
class Model(PySWAPBaseModel, FileMixin, SerializableMixin):
    """Main class that runs the SWAP model.

    Even though all sections are set to optional, the model will not run if
    any of the components are missing.

    Attributes:
        metadata (Subsection): Metadata of the model.
        version (str): The version of the model.
        generalsettings (Subsection): Simulation settings.
        meteorology (Subsection): Meteorological data.
        crop (Subsection): Crop data.
        fixedirrigation (Subsection): Fixed irrigation settings.
        soilmoisture (Subsection): Soil moisture data.
        surfaceflow (Subsection): Surface flow data.
        evaporation (Subsection): Evaporation data.
        soilprofile (Subsection): Soil profile data.
        snowandfrost (Subsection): Snow and frost data.
        richards (Subsection): Richards data.
        lateraldrainage (Subsection): Lateral drainage data.
        bottomboundary (Subsection): Bottom boundary data.
        heatflow (Subsection): Heat flow data.
        solutetransport (Subsection): Solute transport data.

    Methods:
        write_swp: Write the .swp input file.
        validate: Validate the model.
        get_inputs: Get the input files in a dictionary.
        to_classic_swap: Write all model files to a user directory.
        run: Run the model.
    """

    # Toggled by `validate()`; the model validators below are no-ops unless
    # this flag is set, so ordinary construction/assignment is not affected.
    _validate_on_run: bool = PrivateAttr(default=False)
    _extension = "swp"

    metadata: Subsection[Metadata] | None = Field(default=None, repr=False)
    version: str = Field(exclude=True, default="base")
    generalsettings: Subsection[GeneralSettings] | None = Field(
        default=None, repr=False
    )
    meteorology: Subsection[Meteorology] | None = Field(default=None, repr=False)
    crop: Subsection[Crop] | None = Field(default=None, repr=False)
    fixedirrigation: Subsection[FixedIrrigation] | None = Field(
        default=FixedIrrigation(swirfix=0), repr=False
    )
    soilmoisture: Subsection[SoilMoisture] | None = Field(default=None, repr=False)
    surfaceflow: Subsection[SurfaceFlow] | None = Field(default=None, repr=False)
    evaporation: Subsection[Evaporation] | None = Field(default=None, repr=False)
    soilprofile: Subsection[SoilProfile] | None = Field(default=None, repr=False)
    snowandfrost: Subsection[SnowAndFrost] | None = Field(
        default=SnowAndFrost(swsnow=0, swfrost=0), repr=False
    )
    richards: Subsection[RichardsSettings] | None = Field(
        default=RichardsSettings(swkmean=1, swkimpl=0), repr=False
    )
    lateraldrainage: Subsection[Drainage] | None = Field(default=None, repr=False)
    bottomboundary: Subsection[BottomBoundary] | None = Field(default=None, repr=False)
    heatflow: Subsection[HeatFlow] | None = Field(default=HeatFlow(swhea=0), repr=False)
    solutetransport: Subsection[SoluteTransport] | None = Field(
        default=SoluteTransport(swsolu=0), repr=False
    )

    @property
    def swp(self):
        """The content of the swp file.

        Serialization of Subsection field type has been set in a way that it
        will generate SWAP formatted string when `model_string()` is called on
        the parent class.
        """
        return self.model_string()

    @model_validator(mode="after")
    def validate_missing_components(self):
        """Validate, on run, that all required components are present.

        Does nothing unless `_validate_on_run` is set (see `validate()`).

        Raises:
            ValueError: If any required component is None. The message lists
                the names of all missing components.
        """

        if not self._validate_on_run:
            return self

        required_components = [
            "metadata",
            "generalsettings",
            "meteorology",
            "crop",
            "fixedirrigation",
            "soilmoisture",
            "surfaceflow",
            "evaporation",
            "soilprofile",
            "snowandfrost",
            "richards",
            "lateraldrainage",
            "bottomboundary",
            "heatflow",
            "solutetransport",
        ]

        missing_components = [
            comp for comp in required_components if getattr(self, comp) is None
        ]

        if missing_components:
            msg = f"Missing required components: {', '.join(missing_components)}"
            raise ValueError(msg)

        # Per-component validation is done in `validate_each_component`.
        return self

    @model_validator(mode="after")
    def validate_each_component(self):
        """Run, on run, the YAML-based validation of every component.

        Does nothing unless `_validate_on_run` is set (see `validate()`).
        Every field value exposing `validate_with_yaml` gets its
        `_validation` flag switched on and is then validated.
        """

        if not self._validate_on_run:
            return self

        # Read `model_fields` from the class: instance access is deprecated
        # in recent Pydantic releases.
        for comp in type(self).model_fields:
            item = getattr(self, comp)
            if hasattr(item, "validate_with_yaml"):
                item._validation = True
                item.validate_with_yaml()

        return self

    def validate(self):
        """Execute the model validation when `run()` is called.

        This method should probably be refactored. It seems to shadow some
        validation method from Pydantic.

        The `_validate_on_run` flag is raised only for the duration of the
        validation and is always reset, whether validation passes or fails.
        Success is logged only when no exception was raised.
        """

        self._validate_on_run = True
        try:
            self.model_validate(self)
        finally:
            self._validate_on_run = False

        # Only reached when `model_validate` did not raise.
        logger.info("Validation successful.")

    def write_swp(self, path: str | Path):
        """Write the .swp input file.

        Parameters:
            path (str | Path): The path to write the file to.
        """
        self.save_file(string=self.swp, path=path, fname="swap")

    def get_inputs(self) -> dict:
        """Get the input files in a dictionary."""
        builder = ModelBuilder(model=self, tempdir=Path.cwd())
        return builder.get_inputs()

    def to_classic_swap(self, path: Path) -> None:
        """Prepare all the files for a model run in user's directory.

        The model is validated first; then the input files are written to
        `path` and the SWAP executable is copied next to them.

        Parameters:
            path (Path): Directory receiving the input files and executable.
        """
        self.validate()
        builder = ModelBuilder(model=self, tempdir=path)

        builder.write_inputs()
        builder.copy_executable()

        logger.info(f"Model files written to {path}")

    def run(
        self, path: str | Path | None = None, silence_warnings: bool = False
    ) -> Result:
        """Run the model using ModelRunner.

        Parameters:
            path (str | Path | None): Directory handed to the runner; the
                current working directory is used when None.
            silence_warnings (bool): Forwarded to `ModelRunner.run`.

        Returns:
            Result: The result object returned by `ModelRunner.run`.
        """
        self.validate()
        path = Path.cwd() if path is None else path
        return ModelRunner(self).run(path, silence_warnings)

swp property

The content of the swp file.

Serialization of Subsection field type has been set in a way that it will generate SWAP formatted string when model_string() is called on the parent class.

get_inputs()

Get the input files in a dictionary.

Source code in pyswap/model/model.py
def get_inputs(self) -> dict:
    """Return the model's input files as a dictionary.

    A ModelBuilder anchored at the current working directory is created
    for this model and asked for its inputs.
    """
    return ModelBuilder(model=self, tempdir=Path.cwd()).get_inputs()

run(path=None, silence_warnings=False)

Run the model using ModelRunner.

Source code in pyswap/model/model.py
def run(
    self, path: str | Path | None = None, silence_warnings: bool = False
) -> Result:
    """Validate the model, then execute it through a ModelRunner.

    Parameters:
        path (str | Path | None): Directory handed to the runner; falls back
            to the current working directory when None.
        silence_warnings (bool): Forwarded unchanged to the runner.

    Returns:
        Result: Whatever `ModelRunner.run` returns.
    """
    self.validate()

    if path is None:
        path = Path.cwd()

    runner = ModelRunner(self)
    return runner.run(path, silence_warnings)

to_classic_swap(path)

Prepare all the files for a model run in user's directory.

Source code in pyswap/model/model.py
def to_classic_swap(self, path: Path) -> None:
    """Write everything needed for a model run into the user's directory.

    The model is validated first. Afterwards the input files are written to
    `path` and the SWAP executable is copied there.

    Parameters:
        path (Path): Target directory for the input files and executable.
    """
    self.validate()

    # Both builder steps return the builder itself, so they can be chained.
    ModelBuilder(model=self, tempdir=path).write_inputs().copy_executable()

    logger.info(f"Model files written to {path}")

validate()

Execute the model validation when run() is called.

This method should probably be refactored. It seems to shadow some validation method from Pydantic.

Source code in pyswap/model/model.py
def validate(self):
    """Execute the model validation when `run()` is called.

    This method should probably be refactored. It seems to shadow some
    validation method from Pydantic.

    The `_validate_on_run` flag is raised only for the duration of the
    validation and is always reset, whether validation passes or fails.
    Success is logged only when no exception was raised; a failing
    validation propagates its error without a misleading success message.
    """

    self._validate_on_run = True
    try:
        self.model_validate(self)
    finally:
        self._validate_on_run = False

    # Only reached when `model_validate` did not raise.
    logger.info("Validation successful.")

validate_each_component()

Run, on run, the YAML-based validation (validate_with_yaml) of every component that provides it.

Source code in pyswap/model/model.py
@model_validator(mode="after")
def validate_each_component(self):
    """Run, on run, the YAML-based validation of every component.

    Does nothing unless `_validate_on_run` is set. Every field value
    exposing `validate_with_yaml` gets its `_validation` flag switched on
    and is then validated.
    """

    if not self._validate_on_run:
        return self

    # Read `model_fields` from the class: instance access is deprecated in
    # recent Pydantic releases.
    for comp in type(self).model_fields:
        item = getattr(self, comp)
        if hasattr(item, "validate_with_yaml"):
            item._validation = True
            item.validate_with_yaml()

    return self

validate_missing_components()

Validate, on run, that all required components are present.

Source code in pyswap/model/model.py
@model_validator(mode="after")
def validate_missing_components(self):
    """Validate, on run, that all required components are present.

    Does nothing unless `_validate_on_run` is set.

    Raises:
        ValueError: If any required component is None. The message lists
            the names of all missing components.
    """

    if not self._validate_on_run:
        return self

    required_components = [
        "metadata",
        "generalsettings",
        "meteorology",
        "crop",
        "fixedirrigation",
        "soilmoisture",
        "surfaceflow",
        "evaporation",
        "soilprofile",
        "snowandfrost",
        "richards",
        "lateraldrainage",
        "bottomboundary",
        "heatflow",
        "solutetransport",
    ]

    missing_components = [
        comp for comp in required_components if getattr(self, comp) is None
    ]

    if missing_components:
        msg = f"Missing required components: {', '.join(missing_components)}"
        raise ValueError(msg)

    # The former trailing loop only read each attribute and discarded it,
    # so it has been dropped; this validator checks presence only.
    return self

write_swp(path)

Write the .swp input file.

Parameters:

Name Type Description Default
path str | Path

The path to write the file to.

required
Source code in pyswap/model/model.py
def write_swp(self, path: str | Path):
    """Write the .swp input file.

    Parameters:
        path (str | Path): The path to write the file to.
    """
    self.save_file(string=self.swp, path=path, fname="swap")

ModelBuilder

Building model components.

Attributes:

Name Type Description
model Model

The model to build.

tempdir str

The temporary directory to store the input files.

Methods:

Name Description
copy_executable

Copy the appropriate SWAP executable to the temporary directory.

write_inputs

Write the input files to the temporary directory.

Source code in pyswap/model/model.py
class ModelBuilder:
    """Building model components.

    Attributes:
        model (Model): The model to build.
        tempdir (str): The temporary directory to store the input files.

    Methods:
        copy_executable: Copy the appropriate SWAP executable to the
            temporary directory.
        write_inputs: Write the input files to the temporary directory.
    """

    def __init__(self, model: Model, tempdir: str):
        self.model = model
        self.tempdir = tempdir

    def copy_executable(self) -> None:
        """Copy the appropriate SWAP executable to the temporary directory."""
        if IS_WINDOWS:
            shutil.copy(swap_windows, self.tempdir)
            logger.info(
                "Copying the windows version of SWAP into temporary directory..."
            )
        else:
            shutil.copy(swap_linux, self.tempdir)
            logger.info("Copying linux executable into temporary directory...")

        return self

    def get_inputs(self) -> dict:
        """Get the inpup files in a dictionary."""
        inputs = {}

        inputs["swp"] = self.model.swp
        if self.model.lateraldrainage.swdra in [1, 2]:
            inputs["dra"] = self.model.lateraldrainage.drafile.dra
        if self.model.crop.cropfiles:
            inputs["crop"] = self.model.crop.cropfiles
        if self.model.meteorology.metfile:
            inputs["met"] = self.model.meteorology.met
        if self.model.fixedirrigation.swirgfil == 1:
            inputs["irg"] = self.model.fixedirrigation.irg
        if self.model.bottomboundary.swbbcfile == 1:
            inputs["bbc"] = self.model.bottomboundary.bbc

        return inputs

    def write_inputs(self) -> None:
        """Write the input files to the temporary directory."""
        logger.info("Preparing files...")

        self.model.write_swp(self.tempdir)

        if self.model.lateraldrainage.swdra in [1, 2]:
            self.model.lateraldrainage.write_dra(self.tempdir)
        if self.model.crop.cropfiles:
            self.model.crop.write_crop(self.tempdir)
        if self.model.meteorology.metfile:
            self.model.meteorology.write_met(self.tempdir)
        if self.model.fixedirrigation.swirgfil == 1:
            self.model.fixedirrigation.write_irg(self.tempdir)
        if self.model.bottomboundary.swbbcfile == 1:
            self.model.bottomboundary.write_bbc(self.tempdir)

        return self

copy_executable()

Copy the appropriate SWAP executable to the temporary directory.

Source code in pyswap/model/model.py
def copy_executable(self) -> "ModelBuilder":
    """Copy the appropriate SWAP executable to the temporary directory.

    Returns:
        ModelBuilder: The builder itself, so that calls can be chained.
    """
    if IS_WINDOWS:
        shutil.copy(swap_windows, self.tempdir)
        logger.info(
            "Copying the windows version of SWAP into temporary directory..."
        )
    else:
        shutil.copy(swap_linux, self.tempdir)
        logger.info("Copying linux executable into temporary directory...")

    return self

get_inputs()

Get the input files in a dictionary.

Source code in pyswap/model/model.py
def get_inputs(self) -> dict:
    """Collect the model's input files in a dictionary.

    The "swp" entry is always present. The remaining entries ("dra",
    "crop", "met", "irg", "bbc") are added only when the corresponding
    model switch/attribute enables them.

    Returns:
        dict: The collected inputs keyed by file kind.
    """
    mdl = self.model
    collected = {"swp": mdl.swp}

    if mdl.lateraldrainage.swdra in [1, 2]:
        collected["dra"] = mdl.lateraldrainage.drafile.dra

    if mdl.crop.cropfiles:
        collected["crop"] = mdl.crop.cropfiles

    if mdl.meteorology.metfile:
        collected["met"] = mdl.meteorology.met

    if mdl.fixedirrigation.swirgfil == 1:
        collected["irg"] = mdl.fixedirrigation.irg

    if mdl.bottomboundary.swbbcfile == 1:
        collected["bbc"] = mdl.bottomboundary.bbc

    return collected

write_inputs()

Write the input files to the temporary directory.

Source code in pyswap/model/model.py
def write_inputs(self) -> "ModelBuilder":
    """Write the input files to the temporary directory.

    The .swp file is always written. The drainage, crop, meteorological,
    irrigation and bottom boundary files are written only when the
    corresponding model switch/attribute enables them.

    Returns:
        ModelBuilder: The builder itself, so that calls can be chained.
    """
    logger.info("Preparing files...")

    self.model.write_swp(self.tempdir)

    if self.model.lateraldrainage.swdra in [1, 2]:
        self.model.lateraldrainage.write_dra(self.tempdir)
    if self.model.crop.cropfiles:
        self.model.crop.write_crop(self.tempdir)
    if self.model.meteorology.metfile:
        self.model.meteorology.write_met(self.tempdir)
    if self.model.fixedirrigation.swirgfil == 1:
        self.model.fixedirrigation.write_irg(self.tempdir)
    if self.model.bottomboundary.swbbcfile == 1:
        self.model.bottomboundary.write_bbc(self.tempdir)

    return self

ModelRunner

Class responsible for running the model.

In the run method, the ResultReader is utilized to abstract the parsing of the model results.

Attributes:

Name Type Description
model Model

The model to run.

Methods:

Name Description
run_swap

Run the SWAP executable.

raise_swap_warning

Raise a warning.

run

Main function that runs the model

Source code in pyswap/model/model.py
class ModelRunner:
    """Class responsible for running the model.

    The `run` method relies on ResultReader to parse what the model run
    leaves behind.

    Attributes:
        model (Model): The model to run.

    Methods:
        run_swap: Run the SWAP executable.
        raise_swap_warning: Log the warnings from the model run.
        run: Main function that runs the model
    """

    def __init__(self, model: Model):
        self.model = model

    @staticmethod
    def run_swap(tempdir: Path) -> str:
        """Run the SWAP executable.

        The executable is started with `tempdir` as working directory. A
        newline is fed to its stdin (the executable asks for input upon
        termination) and stderr is merged into stdout.

        Parameters:
            tempdir (Path): The temporary directory where the executable
                is stored.

        Returns:
            str: The decoded console output of the executable.
        """
        if IS_WINDOWS:
            executable = Path(tempdir, "swap.exe")
        else:
            executable = "./swap420"

        # Passing `input` makes subprocess.run open a stdin pipe itself.
        finished = subprocess.run(
            executable,
            input=b"\n",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=tempdir,
        )
        return finished.stdout.decode()

    def raise_swap_warning(self, warnings: list):
        """Log the warnings from the model run.

        Parameters:
            warnings (list): The warnings from the model run parsed with the
                ResultReader.
        """
        for entry in warnings:
            logger.warning(entry)

    def run(self, path: str | Path, silence_warnings: bool = False) -> Result:
        """Main function that runs the model.

        A temporary directory is created inside `path`. ModelBuilder copies
        the executable and writes the inputs there, the executable is run,
        and ResultReader is used to fill a Result object with the log, the
        warnings and the outputs.

        Parameters:
            path (str | Path): Directory in which the temporary run
                directory is created.
            silence_warnings (bool): If True, warnings are not logged.

        Returns:
            Result: The parsed model results.

        Raises:
            RuntimeError: If the console output does not contain
                "normal completion".
        """

        with tempfile.TemporaryDirectory(dir=path) as workdir:
            ModelBuilder(self.model, workdir).copy_executable().write_inputs()

            console = self.run_swap(workdir)
            if "normal completion" not in console:
                raise RuntimeError(f"Model run failed. \n {console}")
            logger.info(console)

            # --- Collect the results ---
            outcome: Result = Result()
            reader = ResultReader(self.model, workdir)

            swap_log = reader.read_swap_log()
            outcome.log = swap_log

            found = reader.identify_warnings(swap_log)
            outcome.warning = found
            if found and not silence_warnings:
                self.raise_swap_warning(warnings=found)

            # Both csv flavours are read the same way, "csv" first.
            for flavour in ("csv", "csv_tz"):
                if flavour in self.model.generalsettings.extensions:
                    table = reader.read_csv_output(which=flavour)
                    outcome.output.update({flavour: table})

            outcome.output.update(reader.read_ascii_output())
            return outcome

raise_swap_warning(warnings)

Log the warnings from the model run.

Parameters:

Name Type Description Default
warnings list

The warnings from the model run parsed with the ResultReader.

required
Source code in pyswap/model/model.py
def raise_swap_warning(self, warnings: list):
    """Log the warnings from the model run.

    Each item is emitted as a separate logger warning, in order.

    Parameters:
        warnings (list): The warnings from the model run parsed with the
            ResultReader.
    """
    for entry in warnings:
        logger.warning(entry)

run(path, silence_warnings=False)

Main function that runs the model.

First ModelBuilder is used to prepare the environment for the model run. Second, the SWAP executable is run and the decoded result passed from the executable is parsed using the ResultReader and used to update the Result object.

Parameters:

Name Type Description Default
path str | Path

The path to the temporary directory.

required
silence_warnings bool

If True, warnings are not raised.

False

Returns:

Name Type Description
Result Result

The parsed model results.

Source code in pyswap/model/model.py
def run(self, path: str | Path, silence_warnings: bool = False) -> Result:
    """Main function that runs the model.

    A temporary directory is created inside `path`. ModelBuilder copies the
    executable and writes the inputs there, the executable is run, and
    ResultReader is used to fill a Result object with the log, the warnings
    and the outputs.

    Parameters:
        path (str | Path): Directory in which the temporary run directory
            is created.
        silence_warnings (bool): If True, warnings are not logged.

    Returns:
        Result: The parsed model results.

    Raises:
        RuntimeError: If the console output does not contain
            "normal completion".
    """

    with tempfile.TemporaryDirectory(dir=path) as workdir:
        ModelBuilder(self.model, workdir).copy_executable().write_inputs()

        console = self.run_swap(workdir)
        if "normal completion" not in console:
            raise RuntimeError(f"Model run failed. \n {console}")
        logger.info(console)

        # --- Collect the results ---
        outcome: Result = Result()
        reader = ResultReader(self.model, workdir)

        swap_log = reader.read_swap_log()
        outcome.log = swap_log

        found = reader.identify_warnings(swap_log)
        outcome.warning = found
        if found and not silence_warnings:
            self.raise_swap_warning(warnings=found)

        # Both csv flavours are read the same way, "csv" first.
        for flavour in ("csv", "csv_tz"):
            if flavour in self.model.generalsettings.extensions:
                table = reader.read_csv_output(which=flavour)
                outcome.output.update({flavour: table})

        outcome.output.update(reader.read_ascii_output())
        return outcome

run_swap(tempdir) staticmethod

Run the SWAP executable.

Run the executable in the temporary directory and pass a newline to stdin when the executable asks for input (upon termination).

Parameters:

Name Type Description Default
tempdir Path

The temporary directory where the executable is stored.

required
Source code in pyswap/model/model.py
@staticmethod
def run_swap(tempdir: Path) -> str:
    """Run the SWAP executable.

    The executable is started with `tempdir` as working directory. A newline
    is fed to its stdin (the executable asks for input upon termination) and
    stderr is merged into stdout.

    Parameters:
        tempdir (Path): The temporary directory where the executable
            is stored.

    Returns:
        str: The decoded console output of the executable.
    """
    if IS_WINDOWS:
        executable = Path(tempdir, "swap.exe")
    else:
        executable = "./swap420"

    # Passing `input` makes subprocess.run open a stdin pipe itself.
    finished = subprocess.run(
        executable,
        input=b"\n",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        cwd=tempdir,
    )
    return finished.stdout.decode()

ResultReader

Class responsible for reading the model results.

Attributes:

Name Type Description
model Model

The model to read the results from.

tempdir str

The temporary directory where the results are stored.

Methods:

Name Description
read_csv_output

Read the csv output.

read_swap_log

Read the log files.

identify_warnings

Catch warnings from the log file.

read_ascii_output

Read all output files that are not in csv format as strings.

Source code in pyswap/model/model.py
class ResultReader:
    """Class responsible for reading the model results.

    Attributes:
        model (Model): The model to read the results from.
        tempdir (str): The temporary directory where the results are stored.

    Methods:
        read_csv_output: Read the csv output.
        read_swap_log: Read the log files.
        identify_warnings: Catch warnings from the log file.
        read_ascii_output: Read all output files that are not in csv format
            as strings.
    """

    def __init__(self, model: Model, tempdir: str):
        self.model: Model = model
        self.tempdir = tempdir

    def read_csv_output(self, which: Literal["csv", "csv_tz"]) -> DataFrame:
        """Read the csv output.

        There are two types of csv output files: csv and csv_tz. They are both
        handle in the same method with mode change.

        Parameters:
            which (str): The type of output file to read.

        Returns:
            DataFrame: The output file as a DataFrame.
        """

        outfil = self.model.generalsettings.outfil
        output_suffix = "_output.csv" if which == "csv" else "_output_tz.csv"
        index_col = "DATETIME" if which == "csv" else "DATE"

        path = Path(self.tempdir, outfil + output_suffix)

        if not path.exists():
            logger.warning(f"Expected output file {path} not found.")
            return DataFrame()

        df = read_csv(path, comment="*", index_col=index_col)
        df.index = to_datetime(df.index)

        return df

    def read_swap_log(self) -> str:
        """Read the log files.

        Returns:
            str: The content of the log file.

        Raises:
            FileNotFoundError: If no log file is found. There should always be
                a log file. If not, something went wrong.
            FileExistsError: If multiple log files are found. Not sure if this
                is possible or not. If so, it should be handled.
        """

        log_files = [
            f for f in Path(self.tempdir).glob("*.log") if f.name != "reruns.log"
        ]

        if len(log_files) == 0:
            msg = "No .log file found in the directory."
            raise FileNotFoundError(msg)

        elif len(log_files) > 1:
            msg = "Multiple .log files found in the directory."
            raise FileExistsError(msg)

        log_file = log_files[0]

        with open(log_file) as file:
            log_content = file.read()

        return log_content

    @staticmethod
    def identify_warnings(log: str) -> list:
        """Catch warnings from the log file.

        This is used by the ModelRunner to raise warnings after the model run.

        Parameters:
            log (str): The log file content.

        Returns:
            list: A list of warnings.
        """
        lines = log.split("\n")
        warnings = [
            line for line in lines if line.strip().lower().startswith("warning")
        ]
        return warnings

    def read_ascii_output(self):
        """Read all output files that are not csv format as strings.

        This method is perhaps a bit oversimplified. In the future, we might
        think about introducing parsers for the different output files. For now,
        we just read them as strings.

        Returns:
            dict (dict): A dictionary of the output strings with extension as key.
        """

        ascii_extensions = [
            ext
            for ext in self.model.generalsettings.extensions
            if ext not in ["csv", "csv_tz"]
        ]

        list_dir = os.listdir(self.tempdir)
        list_dir = [f for f in list_dir if f.endswith(tuple(ascii_extensions))]

        if list_dir:
            dict_files = {
                f.split(".")[1]: open_ascii(Path(self.tempdir, f)) for f in list_dir
            }
            return dict_files
        return {}

identify_warnings(log) staticmethod

Catch warnings from the log file.

This is used by the ModelRunner to raise warnings after the model run.

Parameters:

Name Type Description Default
log str

The log file content.

required

Returns:

Name Type Description
list list

A list of warnings.

Source code in pyswap/model/model.py
@staticmethod
def identify_warnings(log: str) -> list:
    """Catch warnings from the log file.

    This is used by the ModelRunner to raise warnings after the model run.

    Parameters:
        log (str): The log file content.

    Returns:
        list: A list of warnings.
    """
    lines = log.split("\n")
    warnings = [
        line for line in lines if line.strip().lower().startswith("warning")
    ]
    return warnings

read_ascii_output()

Read all output files that are not csv format as strings.

This method is perhaps a bit oversimplified. In the future, we might think about introducing parsers for the different output files. For now, we just read them as strings.

Returns:

Name Type Description
dict dict

A dictionary of the output strings with extension as key.

Source code in pyswap/model/model.py
def read_ascii_output(self) -> dict:
    """Read all output files that are not csv format as strings.

    This method is perhaps a bit oversimplified. In the future, we might
    think about introducing parsers for the different output files. For now,
    we just read them as strings.

    A file is picked up only when its name ends with "." followed by one of
    the requested non-csv extensions. The key is the part after the last
    dot, so file names containing several dots still map to their real
    extension, and dot-less names are never matched.

    Returns:
        dict (dict): A dictionary of the output strings with extension as key.
    """

    # Match on ".<ext>" rather than on the bare extension text.
    suffixes = tuple(
        "." + ext.lstrip(".")
        for ext in self.model.generalsettings.extensions
        if ext not in ["csv", "csv_tz"]
    )

    dict_files = {}
    for fname in os.listdir(self.tempdir):
        if fname.endswith(suffixes):
            key = fname.rsplit(".", 1)[1]
            dict_files[key] = open_ascii(Path(self.tempdir, fname))

    return dict_files

read_csv_output(which)

Read the csv output.

There are two types of csv output files: csv and csv_tz. They are both handled in the same method with a mode change.

Parameters:

Name Type Description Default
which str

The type of output file to read.

required

Returns:

Name Type Description
DataFrame DataFrame

The output file as a DataFrame.

Source code in pyswap/model/model.py
def read_csv_output(self, which: Literal["csv", "csv_tz"]) -> DataFrame:
    """Load one of the csv output files into a DataFrame.

    Two flavours exist, csv and csv_tz; they differ only in file suffix and
    in the name of the index column, so one method serves both.

    Parameters:
        which (str): The type of output file to read.

    Returns:
        DataFrame: The output file as a DataFrame with a datetime index. An
            empty DataFrame is returned (and a warning logged) when the
            expected file does not exist.
    """

    if which == "csv":
        suffix, index_name = "_output.csv", "DATETIME"
    else:
        suffix, index_name = "_output_tz.csv", "DATE"

    path = Path(self.tempdir, self.model.generalsettings.outfil + suffix)

    if path.exists():
        table = read_csv(path, comment="*", index_col=index_name)
        table.index = to_datetime(table.index)
        return table

    logger.warning(f"Expected output file {path} not found.")
    return DataFrame()

read_swap_log()

Read the log files.

Returns:

Name Type Description
str str

The content of the log file.

Raises:

Type Description
FileNotFoundError

If no log file is found. There should always be a log file. If not, something went wrong.

FileExistsError

If multiple log files are found. Not sure if this is possible or not. If so, it should be handled.

Source code in pyswap/model/model.py
def read_swap_log(self) -> str:
    """Return the text of the single log file in the temporary directory.

    Every ``*.log`` file in the temporary directory is considered, except
    ``reruns.log``, which is ignored. Exactly one candidate must remain.

    Returns:
        str: The content of the log file.

    Raises:
        FileNotFoundError: If no log file is found. There should always be
            a log file. If not, something went wrong.
        FileExistsError: If multiple log files are found.
    """
    candidates = [
        entry
        for entry in Path(self.tempdir).glob("*.log")
        if entry.name != "reruns.log"
    ]

    # Guard clauses: anything other than exactly one candidate is an error.
    if not candidates:
        raise FileNotFoundError("No .log file found in the directory.")
    if len(candidates) > 1:
        raise FileExistsError("Multiple .log files found in the directory.")

    (only_log,) = candidates
    return only_log.read_text()

run_parallel(mls, path=None, silence_warnings=False, **kwargs)

Run multiple models in parallel.

Parameters:

Name Type Description Default
mls list[Model]

List of models to run.

required
path Path | str

The path to the temporary directory.

None
silence_warnings bool

If True, warnings are not raised.

False
**kwargs dict

Keyword arguments for Pool().

{}

Returns:

Type Description
list[Result]

list[Result]: List of results from the model runs.

Source code in pyswap/model/model.py
def run_parallel(
    mls: list[Model],
    path: Path | str | None = None,
    silence_warnings: bool = False,
    **kwargs,
) -> list[Result]:
    """Run several models concurrently in a pool of workers.

    Parameters:
        mls (list[Model]): List of models to run.
        path (Path | str): The path to the temporary directory.
        silence_warnings (bool): If True, warnings are not raised.
        **kwargs (dict): Keyword arguments for Pool().

    Returns:
        list[Result]: List of results from the model runs, in the order
            of `mls`.
    """
    # Each worker call receives one (model, path, silence_warnings) tuple;
    # path and silence_warnings are the same for every model.
    jobs = [(model, path, silence_warnings) for model in mls]

    with Pool(**kwargs) as workers:
        # map() blocks until every job is done, so returning from inside
        # the `with` block is safe.
        return workers.map(_run_model_with_params, jobs)

Capturing model results.

After a model is run, the results are stored in a Result object. The object stores the log file, output file, and warnings. Output is a dictionary with the keys being the file extensions and the values being the file contents. There are also computed properties making the most common output formats easily accessible.

Classes:

Name Description
Result

Result of a model run.

Result

Bases: BaseModel

Result of a model run.

Attributes:

Name Type Description
log str

The log file of the model run.

output dict

The output file of the model run.

warning List[str]

The warnings of the model run.

Properties

ascii (dict): The output in ASCII format. csv (DataFrame): The output in CSV format. csv_tz (DataFrame): The output in CSV format with depth. iteration_stats (str): The iteration statistics part of the log. blc_summary (str): The .blc file if it exists. yearly_summary (DataFrame): Yearly sums of all output variables. Raises an error if csv was not included in the output file formats.

Source code in pyswap/model/result.py
class Result(BaseModel):
    """Result of a model run.

    Attributes:
        log (str): The log file of the model run.
        output (dict): The outputs of the model run, keyed by output type
            (for example "csv", "csv_tz" or "blc").
        warning (list[str]): The warnings of the model run.

    Properties:
        ascii (dict): All outputs except the "csv" and "csv_tz" tables.
        csv (DataFrame): The output in CSV format.
        csv_tz (DataFrame): The output in CSV format with depth.
        iteration_stats (str): The iteration statistics part of the log.
        blc_summary (str): The .blc file if it exists.
        yearly_summary (DataFrame): Yearly sums of all output variables.
            Raises an error if csv was not included in the output file
            formats.
    """

    log: str | None = Field(default=None, repr=False)
    output: dict | None = Field(default_factory=dict, repr=False)
    warning: list[str] | None = Field(default=None, repr=False)

    model_config = ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True, extra="forbid"
    )

    @computed_field(return_type=dict, repr=False)
    def ascii(self) -> dict:
        """Return all outputs in ASCII format.

        Both csv tables are left out. An `endswith("csv")` test is not
        enough here because the key "csv_tz" does not end with "csv".
        """
        return {
            k: v
            for k, v in self.output.items()
            if not k.endswith("csv") and k != "csv_tz"
        }

    @computed_field(return_type=DataFrame, repr=False)
    def csv(self) -> DataFrame:
        """Return the output in CSV format, or None if it is not present."""
        return self.output.get("csv", None)

    @computed_field(return_type=DataFrame, repr=False)
    def csv_tz(self) -> DataFrame:
        """Return the CSV output with depth, or None if it is not present."""
        return self.output.get("csv_tz", None)

    @computed_field(return_type=str, repr=False)
    def iteration_stats(self) -> str:
        """Return the iteration statistics part of the log.

        Returns:
            str: The log text from the last "Iteration statistics" heading
                to the end, or "" when there is no log or no such heading.
        """
        if self.log is None:
            # `log` defaults to None, and re.search raises TypeError on None.
            return ""
        match = re.search(r".*(Iteration statistics\s*.*)$", self.log, re.DOTALL)
        if match:
            return match.group(1)
        return ""

    @computed_field(return_type=str, repr=False)
    def blc_summary(self) -> str:
        """Return the content of the .blc file, or "" if it is absent.

        The value is returned rather than printed so that it matches the
        declared `str` type; use `print(result.blc_summary)` to display it.
        """
        summary = self.output.get("blc", None)
        return "" if summary is None else summary

    @computed_field(return_type=DataFrame, repr=False)
    def yearly_summary(self) -> DataFrame:
        """Return yearly sums of all output variables.

        Raises:
            TypeError: If the csv output is not a DataFrame, i.e. csv was
                not included in the output file formats.
        """
        if not isinstance(self.csv, DataFrame):
            msg = "CSV file not included in output file formats."
            raise TypeError(msg)
        return self.csv.resample("YE").sum()

ascii()

Return all outputs in ASCII format.

Source code in pyswap/model/result.py
@computed_field(return_type=dict, repr=False)
def ascii(self) -> dict:
    """Return all outputs in ASCII format.

    Both csv tables are left out. An `endswith("csv")` test is not enough
    here because the key "csv_tz" does not end with "csv".
    """
    return {
        k: v
        for k, v in self.output.items()
        if not k.endswith("csv") and k != "csv_tz"
    }

blc_summary()

Print the .blc file if it exists.

Source code in pyswap/model/result.py
@computed_field(return_type=str, repr=False)
def blc_summary(self) -> str:
    """Return the content of the .blc file, or "" if it is absent.

    The value is returned rather than printed so that it matches the
    declared `str` type; use `print(result.blc_summary)` to display it.
    """
    summary = self.output.get("blc", None)
    return "" if summary is None else summary

csv()

Return the output in CSV format.

Source code in pyswap/model/result.py
@computed_field(return_type=DataFrame, repr=False)
def csv(self) -> DataFrame:
    """Look up the "csv" entry of the output; None when it is missing."""
    csv_table = self.output.get("csv")
    return csv_table

csv_tz()

Return the output in CSV format with depth.

Source code in pyswap/model/result.py
@computed_field(return_type=DataFrame, repr=False)
def csv_tz(self) -> DataFrame:
    """Look up the "csv_tz" entry of the output; None when it is missing."""
    depth_table = self.output.get("csv_tz")
    return depth_table

iteration_stats()

Return the iteration statistics part of the log.

Source code in pyswap/model/result.py
@computed_field(return_type=str, repr=False)
def iteration_stats(self) -> str:
    """Return the iteration statistics part of the log.

    Returns:
        str: The log text from the last "Iteration statistics" heading to
            the end, or "" when there is no log or no such heading.
    """
    if self.log is None:
        # `log` may be None, and re.search raises TypeError on None.
        return ""
    match = re.search(r".*(Iteration statistics\s*.*)$", self.log, re.DOTALL)
    if match:
        return match.group(1)
    return ""

yearly_summary()

Return yearly sums of all output variables.

Source code in pyswap/model/result.py
@computed_field(return_type=DataFrame, repr=False)
def yearly_summary(self) -> DataFrame:
    """Sum every output variable per year.

    Raises:
        TypeError: If the csv output is not a DataFrame, i.e. csv was not
            included in the output file formats.
    """
    table = self.csv
    if isinstance(table, DataFrame):
        # "YE" is the pandas year-end resampling frequency.
        return table.resample("YE").sum()
    raise TypeError("CSV file not included in output file formats.")