
Advanced elements

Base models inherited by all pySWAP models.

Much of the shared functionality can be abstracted into the base models, which keeps the code DRY and easier to maintain. The base models enforce the correct data types and structure of the input data, and they provide methods to convert the data to the format required by the SWAP model.

Classes defined here are based on the Pydantic BaseModel and the Pandera DataFrameModel. Both ensure the correct data types and structure of the input data, as successful validation means smooth execution of the SWAP model. This is particularly important when SWAP is run as a submitted job on an HPC.

Classes:

- BaseModel: Base class for pySWAP models. Inherits from Pydantic BaseModel.
- BaseTableModel: Base class for pySWAP models that validate pandas DataFrames. Inherits from Pandera DataFrameModel.

BaseTableModel

Bases: DataFrameModel

Base model for pandas DataFrames.

Methods:

- create: Create a validated DataFrame from a dictionary.

Source code in pyswap/core/basemodel.py
class BaseTableModel(pa.DataFrameModel):
    """Base model for pandas DataFrames.

    Methods:
        create: Create a validated DataFrame from a dictionary.
    """

    class Config:
        coerce = True

    @classmethod
    def create(cls, data: dict, columns: list | None = None) -> DataFrame:
        df = pd.DataFrame(data=data)
        if columns:
            df.columns = columns
        else:
            df.columns = df.columns.str.upper()
        validated_df = cls.validate(df)
        return validated_df

    @classmethod
    def update(cls, table, new: dict):
        # Update the table with new values
        table_upd = table.to_dict("list")
        table_upd.update(new)
        return cls.create(table_upd)
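
As a usage sketch, assuming BaseTableModel is importable from pyswap.core.basemodel, a concrete (hypothetical) schema validates and coerces plain Python data on creation:

from pandera.typing import Series

from pyswap.core.basemodel import BaseTableModel  # assumed import path

class SoilLayers(BaseTableModel):
    """Hypothetical two-column schema; values are coerced to the annotated dtypes."""

    ZTOP: Series[float]
    ZBOT: Series[float]

# Column names are upper-cased and the frame validated against the schema.
layers = SoilLayers.create({"ztop": [0.0, -10.0], "zbot": [-10.0, -30.0]})
# update() rebuilds the dictionary, merges the new column and re-validates.
layers = SoilLayers.update(layers, {"ZBOT": [-15.0, -40.0]})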

PySWAPBaseModel

Bases: BaseModel

Base class for pySWAP models.

Methods:

- __setattr__: Overridden method to silently ignore assignment of frozen fields.
- update: Update the model with new values from a dictionary.

Source code in pyswap/core/basemodel.py
class PySWAPBaseModel(BaseModel):
    """Base class for pySWAP models.

    Methods:
        __setattr__: Overridden method to silently ignore assignment of frozen
            fields.
        update: Update the model with new values from a dictionary.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        validate_assignment=True,
        extra="ignore",
        populate_by_name=True,
    )

    def __setattr__(self, name, value):
        """Silently ignore assignment of frozen fields.

        This method is overridden to silently ignore assignment of frozen fields
        to avoid errors when an old .swp file is read.
        """
        _class = type(self)

        if name in _class.model_fields and _class.model_fields[name].frozen:
            return
        super().__setattr__(name, value)

    def update(self, new: dict, inplace: bool = False, no_validate: bool = False):
        """Update the model with new values.

        Given dictionary of values is first filtered to include only the fields
        that exist in the model. The model is then updated with the new values.
        The updated model is returned (either new or updated self).

        Parameters:
            new (dict): Dictionary with new values.
            inplace (bool): If True, update the model in place.
        """

        updated_model = self.model_validate(dict(self) | new)

        if not inplace:
            # added this for the case when the user loads a model from the
            # classic ASCII files. Then the .update() method is used, but not
            # all the attributes will be available immediately. Full validation
            # will still be performed upon model run.
            if no_validate:
                updated_model._validation = False
            else:
                updated_model._validation = True
            updated_model.validate_with_yaml() if hasattr(
                updated_model, "validate_with_yaml"
            ) else None
            return updated_model.model_copy(deep=True)

        else:
            for field, value in updated_model:
                setattr(self, field, value)
            if no_validate:
                updated_model._validation = False
            else:
                updated_model._validation = True
            self.validate_with_yaml() if hasattr(
                updated_model, "validate_with_yaml"
            ) else None

            return self

    @field_validator("*", mode="before")
    @classmethod
    def convert_switches(cls, value: Any, info: Any) -> Any:
        """Convert switch values to integers.

        This method was necessary to ensure that loading models from ASCII files
        would work. It could be improved to include literals that do not start
        with "sw" as well.

        !!! note:
            This should be eventually replaced by a custom Switch field type handling
            serialization and deserialization.
        """
        if (
            (info.field_name.startswith("sw") or info.field_name in ADDITIONAL_SWITCHES)
            and info.field_name != "swap_ver"
            and value
        ):
            try:
                return int(value)
            except ValueError:
                return value
        return value
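
A minimal sketch of the switch coercion and the update() behaviour, assuming PySWAPBaseModel is importable from pyswap.core.basemodel (the Irrigation class and its fields are made up for illustration):

from pyswap.core.basemodel import PySWAPBaseModel  # assumed import path

class Irrigation(PySWAPBaseModel):
    """Hypothetical component with a switch and a dependent parameter."""

    swirfix: int | None = None
    irgfil: str | None = None

base = Irrigation(swirfix="1")  # convert_switches coerces "1" -> 1
updated = base.update({"irgfil": "'fixed.irg'"})  # returns a validated deep copy
base.update({"swirfix": 2}, inplace=True)         # modifies and returns base itself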

__setattr__(name, value)

Silently ignore assignment of frozen fields.

This method is overridden to silently ignore assignment of frozen fields to avoid errors when an old .swp file is read.

Source code in pyswap/core/basemodel.py
def __setattr__(self, name, value):
    """Silently ignore assignment of frozen fields.

    This method is overridden to silently ignore assignment of frozen fields
    to avoid errors when an old .swp file is read.
    """
    _class = type(self)

    if name in _class.model_fields and _class.model_fields[name].frozen:
        return
    super().__setattr__(name, value)

convert_switches(value, info) classmethod

Convert switch values to integers.

This method was necessary to ensure that loading models from ASCII files would work. It could be improved to include literals that do not start with "sw" as well.

!!! note: This should be eventually replaced by a custom Switch field type handling serialization and deserialization.

Source code in pyswap/core/basemodel.py
@field_validator("*", mode="before")
@classmethod
def convert_switches(cls, value: Any, info: Any) -> Any:
    """Convert switch values to integers.

    This method was necessary to ensure that loading models from ASCII files
    would work. It could be improved to include literals that do not start
    with "sw" as well.

    !!! note:
        This should be eventually replaced by a custom Switch field type handling
        serialization and deserialization.
    """
    if (
        (info.field_name.startswith("sw") or info.field_name in ADDITIONAL_SWITCHES)
        and info.field_name != "swap_ver"
        and value
    ):
        try:
            return int(value)
        except ValueError:
            return value
    return value

update(new, inplace=False, no_validate=False)

Update the model with new values.

The given dictionary of values is first filtered to include only the fields that exist in the model. The model is then updated with the new values, and the updated model is returned (either a new instance or the updated self).

Parameters:

- new (dict): Dictionary with new values. Required.
- inplace (bool): If True, update the model in place. Default: False.
Source code in pyswap/core/basemodel.py
def update(self, new: dict, inplace: bool = False, no_validate: bool = False):
    """Update the model with new values.

    Given dictionary of values is first filtered to include only the fields
    that exist in the model. The model is then updated with the new values.
    The updated model is returned (either new or updated self).

    Parameters:
        new (dict): Dictionary with new values.
        inplace (bool): If True, update the model in place.
    """

    updated_model = self.model_validate(dict(self) | new)

    if not inplace:
        # added this for the case when the user loads a model from the
        # classic ASCII files. Then the .update() method is used, but not
        # all the attributes will be available immediately. Full validation
        # will still be performed upon model run.
        if no_validate:
            updated_model._validation = False
        else:
            updated_model._validation = True
        updated_model.validate_with_yaml() if hasattr(
            updated_model, "validate_with_yaml"
        ) else None
        return updated_model.model_copy(deep=True)

    else:
        for field, value in updated_model:
            setattr(self, field, value)
        if no_validate:
            updated_model._validation = False
        else:
            updated_model._validation = True
        self.validate_with_yaml() if hasattr(
            updated_model, "validate_with_yaml"
        ) else None

        return self

Mixins

Reusable mixins enhancing the functionality of specific PySWAPBaseModel subclasses.

To keep the main PySWAPBaseModel class and the components library clean and focused, mixins are used to add functionality to the classes that need it. The concept of mixins was inspired by the Django framework and helps to keep the code clean and organized.

Should more functionality be needed in the future for one or more classes, it should be implemented as a mixin and then inherited by the classes that need it.

Classes:

FileMixin: Custom saving functionality for models that need file I/O.
SerializableMixin: Converting a model to a SWAP-formatted string.
YAMLValidatorMixin: Validating parameters using external YAML rules.
WOFOSTUpdateMixin: Interface for the WOFOST crop parameters database for
    pySWAP.

FileMixin

Custom saving functionality for models that need file I/O.

!!! note:

The _extension attribute should be set in the class that inherits
this mixin. It is recommended that pydantic's PrivateAttr is used to
hide this attribute from the user.

Methods:

- save_file: Saves a string to a file.

Source code in pyswap/utils/mixins.py
class FileMixin:
    """Custom saving functionality for models that need file I/O.

    !!! note:

        The _extension attribute should be set in the class that inherits
        this mixin. It is recommended that pydantic's PrivateAttr is used to
        hide this attribute from the user.

    Methods:
        save_file: Saves a string to a file.
    """

    def save_file(
        self,
        string: str,
        fname: str,
        path: Path,
    ) -> None:
        """Saves a string to a file.

        The extension should now be provided in each class inheriting this
        mixin as a private attribute.

        Parameters:
            string: The string to be saved to a file.
            fname: The name of the file.
            path: The path where the file should be saved.
        """

        if not hasattr(self, "_extension"):
            msg = "The _extension attribute should be set."
            raise AttributeError(msg)

        ext = self._extension
        fname = f"{fname}.{ext}" if ext else fname

        with open(f"{path}/{fname}", "w", encoding="ascii") as f:
            f.write(string)

        logger.info(f"{fname} saved successfully.")

        return None

save_file(string, fname, path)

Saves a string to a file.

The extension should now be provided in each class inheriting this mixin as a private attribute.

Parameters:

- string (str): The string to be saved to a file. Required.
- fname (str): The name of the file. Required.
- path (Path): The path where the file should be saved. Required.
Source code in pyswap/utils/mixins.py
def save_file(
    self,
    string: str,
    fname: str,
    path: Path,
) -> None:
    """Saves a string to a file.

    The extension should now be provided in each class inheriting this
    mixin as a private attribute.

    Parameters:
        string: The string to be saved to a file.
        fname: The name of the file.
        path: The path where the file should be saved.
    """

    if not hasattr(self, "_extension"):
        msg = "The _extension attribute should be set."
        raise AttributeError(msg)

    ext = self._extension
    fname = f"{fname}.{ext}" if ext else fname

    with open(f"{path}/{fname}", "w", encoding="ascii") as f:
        f.write(string)

    logger.info(f"{fname} saved successfully.")

    return None
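
A sketch of a class opting into the mixin; the DraFile class, its extension, and the import paths are illustrative assumptions:

from pathlib import Path

from pydantic import PrivateAttr

from pyswap.core.basemodel import PySWAPBaseModel  # assumed import path
from pyswap.utils.mixins import FileMixin  # assumed import path

class DraFile(PySWAPBaseModel, FileMixin):
    """Hypothetical drainage-file writer; the extension is hidden from the user."""

    _extension: str = PrivateAttr(default="dra")

# Writes ./output/swap.dra (the directory must already exist) in ASCII encoding.
DraFile().save_file(string="* drainage settings", fname="swap", path=Path("./output"))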

SerializableMixin

Bases: BaseModel

Converting a model to a SWAP-formatted string.

This mixin is only inherited by classes that directly serialize to a SWAP-formatted string. The assumptions are that the inheriting classes:

  • do not contain nested classes.
  • if the class contains nested classes it should either use Subsection field types or override the model_string() method.

Methods:

- if_is_union_type: Check if the field type is a Union type.
- is_annotated_exception_type: Check if the attribute type is Table, Arrays, or ObjectList.
- serialize_model: Override the default serialization method.
- model_string: Concatenate the formatted strings from dictionary to one string.

Source code in pyswap/utils/mixins.py
class SerializableMixin(BaseModel):
    """Converting a model to a SWAP-formatted string.

    This mixin is only inherited by classes that directly serialize to a
    SWAP-formatted string. The assumptions are that the inheriting classes:

    - do not contain nested classes.
    - if the class contains nested classes it should either use Subsection field
        types or override the `model_string()` method.

    Methods:
        if_is_union_type: Check if the field type is a Union type.
        is_annotated_exception_type: Check if the attribute type is Table,
            Arrays, or ObjectList.
        serialize_model: Override the default serialization method.
        model_string: Concatenate the formatted strings from dictionary to
            one string.
    """

    def if_is_union_type(self, field_info: FieldInfo) -> dict | None:
        """Check if the field type is a Union type.

        If it is, look for the json_schema_extra attribute in the field_info
        of the first argument of the Union type. If it is not found, return
        None. It was necessary in cases of, for example, optional classes like
        Union[Table, None].

        Parameters:
            field_info (FieldInfo): The FieldInfo object of the field.
        """

        field_type = field_info.annotation

        if get_origin(field_type) is Union:
            union_args = get_args(field_type)
            args = get_args(union_args[0])

            field_info = [item for item in args if isinstance(item, FieldInfo)]

            if not field_info:
                return None

            # Only return the json_schema_extra attribute. This is used in some
            # cases to pass additional information from the serializer in
            # pyswap.core.fields module to the model_dump.
            return field_info[0].json_schema_extra
        return None

    def is_annotated_exception_type(self, field_name: str) -> bool:
        """Check if the attribute type is Table, Arrays, or ObjectList.

        For Table, Arrays, and ObjectList types True is returned, ensuring a
        separate serialization path.

        First try to assign the json_schema_extra from a Union type. If that
        fails, assign the json_schema_extra from the field_info. If the
        json_schema_extra is None, return False.
        """
        # Every special field will have a FieldInfo object
        _class = type(self)
        field_info = _class.model_fields.get(field_name, None)

        if field_info is None:
            return False

        json_schema_extra = (
            self.if_is_union_type(field_info) or field_info.json_schema_extra
        )

        if json_schema_extra is None:
            return False

        return json_schema_extra.get("is_annotated_exception_type", False)

    @model_serializer(when_used="json", mode="wrap")
    def serialize_model(self, handler: Any):
        """Override the default serialization method.

        In the intermediate step, a dictionary is created with SWAP formatted
        strings.
        """
        result = {}
        validated_self = handler(self)
        for field_name, field_value in validated_self.items():
            if self.is_annotated_exception_type(field_name):
                result[field_name] = field_value
            else:
                result[field_name] = f"{field_name.upper()} = {field_value}"
        return result

    def model_string(
        self, mode: Literal["str", "list"] = "string", **kwargs
    ) -> str | list[str]:
        """Concatenate the formatted strings from dictionary to one string.


        !!! note:
            By alias is True, because in some cases, particularly in the case
            of CropSettings, the WOFOST names of parameters in the database were
            different from those used in SWAP. This allows those parameters to
            be properly matched, yet serialized properly in SWAP input files.

        Parameters:
            mode (Literal["str", "list"]): The output format.
            kwargs (dict): Additional keyword arguments passed to `model_dump()`.
        """
        dump = self.model_dump(
            mode="json", exclude_none=True, by_alias=True, **kwargs
        ).values()

        if mode == "list":
            return list(dump)
        else:
            return "\n".join(dump)

if_is_union_type(field_info)

Check if the field type is a Union type.

If it is, look for the json_schema_extra attribute in the field_info of the first argument of the Union type. If it is not found, return None. It was necessary in cases of, for example, optional classes like Union[Table, None].

Parameters:

- field_info (FieldInfo): The FieldInfo object of the field. Required.
Source code in pyswap/utils/mixins.py
def if_is_union_type(self, field_info: FieldInfo) -> dict | None:
    """Check if the field type is a Union type.

    If it is, look for the json_schema_extra attribute in the field_info
    of the first argument of the Union type. If it is not found, return
    None. It was necessary in cases of, for example, optional classes like
    Union[Table, None].

    Parameters:
        field_info (FieldInfo): The FieldInfo object of the field.
    """

    field_type = field_info.annotation

    if get_origin(field_type) is Union:
        union_args = get_args(field_type)
        args = get_args(union_args[0])

        field_info = [item for item in args if isinstance(item, FieldInfo)]

        if not field_info:
            return None

        # Only return the json_schema_extra attribute. This is used in some
        # cases to pass additional information from the serializer in
        # pyswap.core.fields module to the model_dump.
        return field_info[0].json_schema_extra
    return None

is_annotated_exception_type(field_name)

Check if the attribute type is Table, Arrays, or ObjectList.

For Table, Arrays, and ObjectList types True is returned, ensuring a separate serialization path.

First try to assign the json_schema_extra from a Union type. If that fails, assign the json_schema_extra from the field_info. If the json_schema_extra is None, return False.

Source code in pyswap/utils/mixins.py
def is_annotated_exception_type(self, field_name: str) -> bool:
    """Check if the attribute type is Table, Arrays, or ObjectList.

    For Table, Arrays, and ObjectList types True is returned, ensuring a
    separate serialization path.

    First try to assign the json_schema_extra from a Union type. If that
    fails, assign the json_schema_extra from the field_info. If the
    json_schema_extra is None, return False.
    """
    # Every special field will have a FieldInfo object
    _class = type(self)
    field_info = _class.model_fields.get(field_name, None)

    if field_info is None:
        return False

    json_schema_extra = (
        self.if_is_union_type(field_info) or field_info.json_schema_extra
    )

    if json_schema_extra is None:
        return False

    return json_schema_extra.get("is_annotated_exception_type", False)

model_string(mode='string', **kwargs)

Concatenate the formatted strings from dictionary to one string.

!!! note: By alias is True, because in some cases, particularly in the case of CropSettings, the WOFOST names of parameters in the database were different from those used in SWAP. This allows those parameters to be properly matched, yet serialized properly in SWAP input files.

Parameters:

- mode (Literal["str", "list"]): The output format. Default: 'string'.
- kwargs (dict): Additional keyword arguments passed to model_dump(). Default: {}.
Source code in pyswap/utils/mixins.py
def model_string(
    self, mode: Literal["str", "list"] = "string", **kwargs
) -> str | list[str]:
    """Concatenate the formatted strings from dictionary to one string.


    !!! note:
        By alias is True, because in some cases, particularly in the case
        of CropSettings, the WOFOST names of parameters in the database were
        different from those used in SWAP. This allows those parameters to
        be properly matched, yet serialized properly in SWAP input files.

    Parameters:
        mode (Literal["str", "list"]): The output format.
        kwargs (dict): Additional keyword arguments passed to `model_dump()`.
    """
    dump = self.model_dump(
        mode="json", exclude_none=True, by_alias=True, **kwargs
    ).values()

    if mode == "list":
        return list(dump)
    else:
        return "\n".join(dump)

serialize_model(handler)

Override the default serialization method.

In the intermediate step, a dictionary is created with SWAP formatted strings.

Source code in pyswap/utils/mixins.py
@model_serializer(when_used="json", mode="wrap")
def serialize_model(self, handler: Any):
    """Override the default serialization method.

    In the intermediate step, a dictionary is created with SWAP formatted
    strings.
    """
    result = {}
    validated_self = handler(self)
    for field_name, field_value in validated_self.items():
        if self.is_annotated_exception_type(field_name):
            result[field_name] = field_value
        else:
            result[field_name] = f"{field_name.upper()} = {field_value}"
    return result
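
A sketch of the resulting behaviour for a flat section; GeneralSettings and its fields are hypothetical, and the import paths are assumptions:

from pyswap.core.basemodel import PySWAPBaseModel  # assumed import path
from pyswap.utils.mixins import SerializableMixin  # assumed import path

class GeneralSettings(PySWAPBaseModel, SerializableMixin):
    """Hypothetical flat section with two scalar parameters."""

    swscre: int = 0
    project: str = "'my-project'"

print(GeneralSettings().model_string())
# SWSCRE = 0
# PROJECT = 'my-project'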

YAMLUpdateMixin

Interface for the WOFOST crop parameters database for pySWAP.

This mixin should be inherited by classes that share parameters with the WOFOST crop database.

Source code in pyswap/utils/mixins.py
class YAMLUpdateMixin:
    """Interface for the WOFOST crop parameters database for pySWAP.

    This mixin should be inherited by classes that share parameters with the
    WOFOST crop database.
    """

    def _process_parameters(
        self, params: dict, source_name: str, grass: bool = False
    ) -> dict:
        """Process parameters by converting tables/arrays using TableProcessor.

        Args:
            params: Dictionary of parameters to process
            source_name: Name of the parameter source (for logging)

        Returns:
            Processed parameters dictionary with tables converted to dataframes
        """
        params_copy = params.copy()
        tp = TableProcessor()

        for name, value in params.items():
            processed = None

            # Handle YAML dict format (two-column tables)
            if isinstance(value, dict) and len(value) == 2:
                keys = list(value.keys())
                logger.debug(
                    f"Processing YAML dict format for parameter: {name} with keys: {keys}"
                )
                if len(keys) == 2:
                    array_data = [
                        [x, y]
                        for x, y in zip(value[keys[0]], value[keys[1]], strict=True)
                    ]
                    processed = tp.process(
                        "array", data=array_data, columns=name, grass=grass
                    )

            # Handle list of lists format (array parameters)
            elif isinstance(value, list) and value and isinstance(value[0], list):
                processed = tp.process("array", data=value, columns=name, grass=grass)

            # Update or remove parameter based on processing result
            if processed is not None:
                params_copy.update(processed)
                logger.debug(f"Processed parameter from {source_name}: {name}")
            elif isinstance(value, list | dict) and not isinstance(value, str):
                # Remove unmatched complex parameters (but keep scalars)
                params_copy.pop(name, None)
                logger.warning(
                    f"Failed to process parameter from {source_name}: {name}, removing from update"
                )

        return params_copy

    def update_from_wofost(self) -> None:
        """Update the model with the WOFOST variety settings."""

        # parameters attribute returns a dictionary with the key-value pairs and
        # tables as list of lists. Before updating, the tables should be
        # created.
        if not hasattr(self, "wofost_variety"):
            msg = "The model does not have the WOFOST variety settings."
            raise AttributeError(msg)

        variety_params = self.wofost_variety.parameters
        logger.debug(f"Updating from WOFOST variety parameters: {variety_params}")

        processed_params = self._process_parameters(variety_params, "WOFOST")
        self.update(processed_params, inplace=True)

    def update_from_yaml(self, yaml_path: str | Path, grass: bool = False) -> None:
        """Update the model with parameters from a YAML file.

        Parameters:
            yaml_path (str | Path): Path to the YAML file containing parameters.
        """

        yaml_path = Path(yaml_path)
        if not yaml_path.exists():
            msg = f"YAML file not found: {yaml_path}"
            raise FileNotFoundError(msg)

        yaml_content = load_yaml(yaml_path)
        logger.debug(f"Loaded YAML content from: {yaml_path}")

        # Extract parameters from the YAML structure
        # Assuming the YAML has a structure like: CropParameters -> SWAPInput -> parameters
        params = None
        if (
            isinstance(yaml_content, dict)
            and "CropParameters" in yaml_content
            and isinstance(yaml_content["CropParameters"], dict)
            and "SWAPInput" in yaml_content["CropParameters"]
        ):
            params = yaml_content["CropParameters"]["SWAPInput"]
        elif isinstance(yaml_content, dict):
            # If it's a flat dictionary, use it directly
            params = yaml_content
        else:
            structure_info = str(type(yaml_content).__name__)
            if hasattr(yaml_content, "keys"):
                structure_info = (
                    f"{structure_info} with keys: {list(yaml_content.keys())}"
                )
            msg = f"Could not find parameters in YAML structure: {structure_info}"
            raise ValueError(msg)

        logger.debug(f"Extracted parameters from YAML: {list(params.keys())}")

        processed_params = self._process_parameters(params, "YAML", grass)
        self.update(processed_params, inplace=True)

update_from_wofost()

Update the model with the WOFOST variety settings.

Source code in pyswap/utils/mixins.py
def update_from_wofost(self) -> None:
    """Update the model with the WOFOST variety settings."""

    # parameters attribute returns a dictionary with the key-value pairs and
    # tables as list of lists. Before updating, the tables should be
    # created.
    if not hasattr(self, "wofost_variety"):
        msg = "The model does not have the WOFOST variety settings."
        raise AttributeError(msg)

    variety_params = self.wofost_variety.parameters
    logger.debug(f"Updating from WOFOST variety parameters: {variety_params}")

    processed_params = self._process_parameters(variety_params, "WOFOST")
    self.update(processed_params, inplace=True)

update_from_yaml(yaml_path, grass=False)

Update the model with parameters from a YAML file.

Parameters:

- yaml_path (str | Path): Path to the YAML file containing parameters. Required.
Source code in pyswap/utils/mixins.py
def update_from_yaml(self, yaml_path: str | Path, grass: bool = False) -> None:
    """Update the model with parameters from a YAML file.

    Parameters:
        yaml_path (str | Path): Path to the YAML file containing parameters.
    """

    yaml_path = Path(yaml_path)
    if not yaml_path.exists():
        msg = f"YAML file not found: {yaml_path}"
        raise FileNotFoundError(msg)

    yaml_content = load_yaml(yaml_path)
    logger.debug(f"Loaded YAML content from: {yaml_path}")

    # Extract parameters from the YAML structure
    # Assuming the YAML has a structure like: CropParameters -> SWAPInput -> parameters
    params = None
    if (
        isinstance(yaml_content, dict)
        and "CropParameters" in yaml_content
        and isinstance(yaml_content["CropParameters"], dict)
        and "SWAPInput" in yaml_content["CropParameters"]
    ):
        params = yaml_content["CropParameters"]["SWAPInput"]
    elif isinstance(yaml_content, dict):
        # If it's a flat dictionary, use it directly
        params = yaml_content
    else:
        structure_info = str(type(yaml_content).__name__)
        if hasattr(yaml_content, "keys"):
            structure_info = (
                f"{structure_info} with keys: {list(yaml_content.keys())}"
            )
        msg = f"Could not find parameters in YAML structure: {structure_info}"
        raise ValueError(msg)

    logger.debug(f"Extracted parameters from YAML: {list(params.keys())}")

    processed_params = self._process_parameters(params, "YAML", grass)
    self.update(processed_params, inplace=True)

YAMLValidatorMixin

Bases: BaseModel

A mixin class that provides YAML-based validation for models.

Initially, pySWAP had model serializers on each model component class with a number of assertions to validate the parameters (e.g., require parameters rlwtb and wrtmax if swrd = 3). This created chaos in the code, and since none of it was used by inspection tools anyway, it was decided to keep the validation logic in the code but move the rules to a separate YAML file.

Methods:

- validate_parameters: Validate parameters against required rules.
- validate_with_yaml: Pydantic validator executing validation logic.

Source code in pyswap/utils/mixins.py
class YAMLValidatorMixin(BaseModel):
    """A mixin class that provides YAML-based validation for models.

    Initially, pySWAP had model serializers on each model component class which
    had a number of assertions to validate the parameters (e.g., require
    parameters rlwtb and wrtmax if swrd = 3). This created chaos
    in the code, and since none of it was used by inspection tools anyway, it
    was decided to keep the validation logic in the code but move the rules to
    a separate YAML file.

    Methods:
        validate_parameters: Validate parameters against required rules.
        validate_with_yaml: Pydantic validator executing validation logic.
    """

    _validation: bool = PrivateAttr(default=False)

    @staticmethod
    def validate_parameters(
        switch_name: str, switch_value: str, params: dict, rules: dict
    ):
        """Validate parameters against required rules.

        This method reads the rules for the model from the YAML file and checks
        if the required parameters are present. If not, it raises a ValueError.

        ```yaml
        SaltStress: # <--- Model name
            swsalinity:  # <--- Switch name (switch_name)
                1:  # <--- Switch value (switch_value)
                - saltmax  # <---| Required parameters
                - saltslope  # <--|
                2:
                - salthead
        ```

        Parameters:
            switch_name (str): The name of the switch (e.g., 'swcf').
            switch_value (Any): The value of the switch (e.g., 1 or 2).
            params (dict): Dictionary of parameters to check.
            rules (dict): Dictionary with validation rules.

        Raises:
            ValueError: If required parameters are missing.
        """

        required_params = rules.get(switch_name, {}).get(switch_value, [])

        if not required_params:
            return  # No rules for this switch value

        missing_params = [
            param for param in required_params if params.get(param) is None
        ]

        if missing_params:
            msg = f"The following parameters are required for {switch_name}={switch_value}: {', '.join(missing_params)}"
            raise ValueError(msg)

    @model_validator(mode="after")
    def validate_with_yaml(self) -> Self:
        """Pydantic validator executing validation logic.

        All validators defined on a model run on model instantiation. This
        method makes sure that YAML validation is postponed until the
        _validation parameter (required on all classes inheriting this mixin) is
        set to True. That happens when all the required parameters are
        presumed to be set, e.g., when the user tries to run the model.
        """

        if not self._validation:
            return self

        rules = VALIDATIONRULES.get(self.__class__.__name__, {})

        logger.debug(f"Validating {self.__class__.__name__} with rules: {rules}")

        for switch_name in rules:
            switch_value = getattr(self, switch_name, None)
            if switch_value is not None:  # Only validate if the switch is set
                self.validate_parameters(
                    switch_name, switch_value, self.__dict__, rules
                )

        self._validation = False
        return self

validate_parameters(switch_name, switch_value, params, rules) staticmethod

Validate parameters against required rules.

This method reads the rules for the model from the YAML file and checks if the required parameters are present. If not, it raises a ValueError.

SaltStress: # <--- Model name
    swsalinity:  # <--- Switch name (switch_name)
        1:  # <--- Switch value (switch_value)
        - saltmax  # <---| Required parameters
        - saltslope  # <--|
        2:
        - salthead

Parameters:

- switch_name (str): The name of the switch (e.g., 'swcf'). Required.
- switch_value (Any): The value of the switch (e.g., 1 or 2). Required.
- params (dict): Dictionary of parameters to check. Required.
- rules (dict): Dictionary with validation rules. Required.

Raises:

- ValueError: If required parameters are missing.

Source code in pyswap/utils/mixins.py
@staticmethod
def validate_parameters(
    switch_name: str, switch_value: str, params: dict, rules: dict
):
    """Validate parameters against required rules.

    This method reads the rules for the model from the YAML file and checks
    if the required parameters are present. If not, it raises a ValueError.

    ```yaml
    SaltStress: # <--- Model name
        swsalinity:  # <--- Switch name (switch_name)
            1:  # <--- Switch value (switch_value)
            - saltmax  # <---| Required parameters
            - saltslope  # <--|
            2:
            - salthead
    ```

    Parameters:
        switch_name (str): The name of the switch (e.g., 'swcf').
        switch_value (Any): The value of the switch (e.g., 1 or 2).
        params (dict): Dictionary of parameters to check.
        rules (dict): Dictionary with validation rules.

    Raises:
        ValueError: If required parameters are missing.
    """

    required_params = rules.get(switch_name, {}).get(switch_value, [])

    if not required_params:
        return  # No rules for this switch value

    missing_params = [
        param for param in required_params if params.get(param) is None
    ]

    if missing_params:
        msg = f"The following parameters are required for {switch_name}={switch_value}: {', '.join(missing_params)}"
        raise ValueError(msg)

validate_with_yaml()

Pydantic validator executing validation logic.

All validators defined on a model run on model instantiation. This method makes sure that YAML validation is postponed until the _validation parameter (required on all classes inheriting this mixin) is set to True. That happens when all the required parameters are presumed to be set, e.g., when the user tries to run the model.

Source code in pyswap/utils/mixins.py
@model_validator(mode="after")
def validate_with_yaml(self) -> Self:
    """Pydantic validator executing validation logic.

    All validators defined on a model run on model instantiation. This
    method makes sure that YAML validation is postponed until the
    _validation parameter (required on all classes inheriting this mixin) is
    set to True. That happens when all the required parameters are
    presumed to be set, e.g., when the user tries to run the model.
    """

    if not self._validation:
        return self

    rules = VALIDATIONRULES.get(self.__class__.__name__, {})

    logger.debug(f"Validating {self.__class__.__name__} with rules: {rules}")

    for switch_name in rules:
        switch_value = getattr(self, switch_name, None)
        if switch_value is not None:  # Only validate if the switch is set
            self.validate_parameters(
                switch_name, switch_value, self.__dict__, rules
            )

    self._validation = False
    return self
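
A sketch of the deferred validation, assuming the rules YAML contains the SaltStress entry shown in the docstring above (the class, its fields, and the import paths are illustrative):

from pyswap.core.basemodel import PySWAPBaseModel  # assumed import path
from pyswap.utils.mixins import YAMLValidatorMixin  # assumed import path

class SaltStress(PySWAPBaseModel, YAMLValidatorMixin):
    """Hypothetical component matching the YAML rules shown above."""

    swsalinity: int | None = None
    saltmax: float | None = None
    saltslope: float | None = None

stress = SaltStress(swsalinity=1)  # no error yet: _validation defaults to False
stress._validation = True          # normally set just before the model is run
stress.validate_with_yaml()        # ValueError: saltmax and saltslope are required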

Validation and serialization

Functions to parse SWAP-formatted ASCII files into pySWAP objects.

pySWAP has the ability to interact directly with the classic SWAP input files. Parsers defined in this module are used by the custom field validators defined in the pyswap.core.fields module. These functions convert (or deserialize) the SWAP-formatted ASCII files into pySWAP objects.

Parsers in this module

- parse_string_list: Convert a SWAP string list to a list of strings.
- parse_quoted_string: Make sure to remove unnecessary quotes from source.
- parse_day_month: Convert a string to a date object with just the day and month.

parse_day_month(value)

Convert a string to a date object with just the day and month.

Source code in pyswap/core/parsers.py
def parse_day_month(value: str | date) -> date:
    """Convert a string to a date object with just the day and month."""
    msg = "Invalid day-month format. Expected 'DD MM'"
    if isinstance(value, date):
        return value
    if isinstance(value, str):
        try:
            day, month = map(int, value.split())
            return date(date.today().year, month, day)
        except (ValueError, TypeError):
            raise ValueError(msg) from None
    raise ValueError(msg)
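
For illustration (the import path follows the source reference above):

from pyswap.core.parsers import parse_day_month  # assumed import path

parse_day_month("01 06")  # -> datetime.date(<current year>, 6, 1)
parse_day_month("01-06")  # raises ValueError: Invalid day-month format. Expected 'DD MM'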

parse_decimal(value)

Remove the Fortran-style 'd' exponent notation and return a float.

Source code in pyswap/core/parsers.py
def parse_decimal(value: str) -> float:
    """Remove the Fortran-style 'd' exponent notation and return a float."""
    if isinstance(value, str):
        value = value.lower().replace("d", "e")
    return float(value)
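
For illustration (the import path follows the source reference above):

from pyswap.core.parsers import parse_decimal  # assumed import path

parse_decimal("0.15d-1")  # -> 0.015; the Fortran 'd' exponent becomes 'e'
parse_decimal("2.5")      # -> 2.5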

parse_float_list(value)

Convert a SWAP string list to a list of strings.

Source code in pyswap/core/parsers.py
def parse_float_list(value: str) -> str:
    """Convert a SWAP string list to a list of strings."""
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        return value.strip("'").split(" ")

parse_int_list(value)

Convert a SWAP string list to a list of strings.

Source code in pyswap/core/parsers.py
def parse_int_list(value: str) -> str:
    """Convert a SWAP string list to a list of strings."""
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        return value.strip("'").split(" ")

parse_quoted_string(value)

Make sure to remove unnecessary quotes from source.

Source code in pyswap/core/parsers.py
def parse_quoted_string(value: str) -> str:
    """Make sure to remove unnecessary quotes from source."""
    if isinstance(value, str):
        return value.strip("'")
    msg = "Invalid type. Expected string"
    raise ValueError(msg)

parse_string_list(value)

Convert a SWAP string list to a list of strings.

Source code in pyswap/core/parsers.py
def parse_string_list(value: str) -> str:
    """Convert a SWAP string list to a list of strings."""
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        return value.strip("'").split(",")
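
For illustration (the import path follows the source reference above):

from pyswap.core.parsers import parse_string_list  # assumed import path

parse_string_list("'WC30,WC50'")     # -> ['WC30', 'WC50']
parse_string_list(["WC30", "WC50"])  # lists pass through unchanged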

Functions to fine-tune the serialization of pySWAP objects to SWAP-formatted ASCII.

More complex serialization logic, which would be unwieldy to implement directly as lambda functions in the Annotated field definitions (pyswap.core.fields module), is defined in the serializers module (pyswap.core.serializers). These functions convert objects to strings in the valid SWAP format.

Serializers in this module

- serialize_table: Convert a DataFrame to a string.
- serialize_arrays: Convert a DataFrame to a string without headers and with a newline in front.
- serialize_csv_table: Convert a DataFrame to a string in CSV format.
- serialize_object_list: Convert a list of objects to a string.
- serialize_day_month: Convert a date object to a string with just the day and month.

serialize_arrays(table)

Convert the DataFrame to a string without headers and newline in front.

Arguments:
    table: The DataFrame to be serialized.

Result:
    >>> 'ARRAYS = \n1 4\n2 5\n3 6\n\n'

Source code in pyswap/core/serializers.py
def serialize_arrays(table: DataFrame) -> str:
    """Convert the DataFrame to a string without headers and newline in front.

    Arguments:
        table: The DataFrame to be serialized.

    Result:
        >>> 'ARRAYS = \n1 4\n2 5\n3 6\n\n'
    """
    return f"\n{table.to_string(index=False, header=False)}\n"

serialize_day_month(value)

Serialize a date object to a string with just the day and month.

Parameters:

- value (date): The date object to be serialized. Required.

Result:
    >>> '01 01'

Source code in pyswap/core/serializers.py
def serialize_day_month(value: date) -> str:
    """Serialize a date object to a string with just the day and month.

    Arguments:
        value: The date object to be serialized.

    Result:
        >>> '01 01'
    """
    return value.strftime("%d %m")

serialize_table(table)

Convert the DataFrame to a string.

Arguments:
    table: The DataFrame to be serialized.

Result:
    >>> ' A  B\n 1  4\n 2  5\n 3  6\n'

Source code in pyswap/core/serializers.py
def serialize_table(table: DataFrame) -> str:
    """Convert the DataFrame to a string.

    Arguments:
        table: The DataFrame to be serialized.

    Result:
        >>> ' A  B\n 1  4\n 2  5\n 3  6\n'
    """
    return f"{table.to_string(index=False)}\n"
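
A small round trip showing both table serializers on the same frame; the import path is assumed from the source references above, and the exact column spacing follows pandas' to_string:

import pandas as pd

from pyswap.core.serializers import serialize_arrays, serialize_table  # assumed import path

df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
serialize_table(df)   # header kept, roughly ' A  B\n 1  4\n 2  5\n 3  6\n'
serialize_arrays(df)  # header dropped, leading newline: '\n1 4\n2 5\n3 6\n'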

I/O

Interact with the filesystem

All functions that interact with the filesystem are located in this subpackage.

Modules:

- io_ascii: Functions to interact with ASCII files.
- io_yaml: Functions to interact with YAML files.
- classic_swap: Functions to load classic SWAP input files.

io_ascii

Interact with ASCII files.

Functions:

- open_ascii: Open an ASCII file and detect its encoding.
- save_ascii: Save a string to an ASCII file.

open_ascii(file_path)

Open file and detect encoding.

Parameters:

- file_path (str): Path to the file to be opened. Required.
Source code in pyswap/core/io/io_ascii.py
def open_ascii(file_path: Path) -> str:
    """Open file and detect encoding.

    Arguments:
        file_path (str): Path to the file to be opened.
    """
    with open(file_path, "rb") as f:
        raw_data = f.read()
    encoding = chardet.detect(raw_data)["encoding"]

    return raw_data.decode(encoding)

save_ascii(string, fname, path, mode='w', extension=None, encoding='ascii')

Saves a string to a file with a given extension.

Parameters:

- string (str): The string to be saved to a file. Required.
- extension (str): The extension that the file should have (e.g. 'txt', 'csv', etc.). Default: None.
- fname (str): The name of the file. Required.
- path (str): The path where the file should be saved. Required.
- mode (str): The mode in which the file should be opened (e.g. 'w' for write, 'a' for append, etc.). Default: 'w'.
- encoding (str): The encoding to use for the file (default is 'ascii').

Returns:

- None

Source code in pyswap/core/io/io_ascii.py
def save_ascii(
    string: str,
    fname: str,
    path: str,
    mode: str = "w",
    extension: str | None = None,
    encoding: str = "ascii",
) -> None:
    """
    Saves a string to a file with a given extension.

    Parameters:
        string (str): The string to be saved to a file.
        extension (str): The extension that the file should have (e.g. 'txt', 'csv', etc.).
        fname (str): The name of the file.
        path (str): The path where the file should be saved.
        mode (str): The mode in which the file should be opened (e.g. 'w' for write, 'a' for append, etc.).
        encoding (str): The encoding to use for the file (default is 'ascii').

    Returns:
        None
    """

    if extension is not None:
        fname = f"{fname}.{extension}"

    with open(f"{path}/{fname}", f"{mode}", encoding=f"{encoding}") as f:
        f.write(string)
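
A usage sketch; the target directory is assumed to exist and the import path follows the source reference above:

from pyswap.core.io.io_ascii import save_ascii  # assumed import path

# Writes ./run/swap.swp in ASCII encoding, overwriting any existing file.
save_ascii(string="PROJECT = 'test'", fname="swap", path="./run", extension="swp")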

io_csv

Interact with .csv files.

Functions:

- load_csv: Load a .csv file.

load_csv(file, delimiter=',', skiprows=None, index_col=None)

Load a .csv file.

Parameters:

- file (Path): Path to the .csv file. Required.
Source code in pyswap/core/io/io_csv.py
def load_csv(file: Path, delimiter=",", skiprows=None, index_col=None) -> DataFrame:
    """Load a .csv file.

    Arguments:
        file: Path to the .csv file.
    """
    if skiprows:
        return read_csv(
            file,
            delimiter=delimiter,
            skiprows=skiprows,
            index_col=index_col,
        )
    else:
        return read_csv(
            file,
            delimiter=delimiter,
            index_col=index_col,
        )

io_yaml

Interact with YAML files.

Functions:

- load_yaml: Load a YAML file.

load_yaml(file)

Load a YAML file.

Parameters:

- file (Path): Path to the YAML file. Required.
Source code in pyswap/core/io/io_yaml.py
def load_yaml(file: Path) -> dict:
    """Load a YAML file.

    Arguments:
        file: Path to the YAML file.
    """
    with open(file) as file:
        content: dict = yaml.safe_load(file)

    return content

process_ascii

Module processing ASCII files in SWAP format.

Steps to process SWAP format ASCII files:

1. Identify and remove comments.
2. Parse the remaining content into key-value pairs, tables and arrays.

parse_ascii_file(file_content, grass=False)

Parse an ASCII file in SWAP format.

Assumptions

  • key-value pairs are lines with a single = character
  • tables are lines in which columns are split by spaces
  • empty tags are lines that end with an = character, followed by table-like data in the following lines.
  • tables are followed by an empty line or a line that is not a part of another table.

Parameters:

- file_content (str): The content of the ASCII file. Required.

Returns:

- dict[str, dict]: A dictionary with key-value pairs, arrays and tables (in the exact order).

Source code in pyswap/core/io/process_ascii.py
def parse_ascii_file(file_content, grass=False) -> dict[str, dict]:
    """Parse an ASCII file in SWAP format.

    !!! note "Assumptions"
        - key-value pairs are lines with a single `=` character
        - tables are lines in which columns are split by spaces
        - empty tags are lines that end with an `=` character, followed by
            table-like data in the following lines.
        - tables are followed by an empty line or a line that is not
          a part of another table.

    Parameters:
        file_content (str): The content of the ASCII file.

    Returns:
        dict: A dictionary with key-value pairs, arrays and tables
            (in the exact order).
    """
    cleaned_file = remove_comments(file_content)
    lines = cleaned_file.splitlines()
    pairs = {}
    arrays = {}
    tables = {}

    def is_key_value(line):
        return (
            "=" in line
            and not line.strip().startswith("=")
            and not line.strip().endswith("=")
        )

    def format_key_value(line):
        key, value = line.split("=", 1)
        return {key.strip().lower(): value.strip()}

    def is_table(line):
        """Check if the line is a part of a table.

        A table is essentially everything else than a key-value pair or
        an empty tag except empty lines.
        """
        return line.strip() and "=" not in line and not line.strip().endswith("=")

    def is_empty_tag(line):
        """Check if the line is an empty tag.

        An empty tag is a line where there is only the tag followed by an = sign (e.g., DZNEW =)
        and the data for that tag is in the next line(s). This is most common for tables,
        which in pySWAP are called ARRAYS - tables with no header, but values grouped in
        columns separated by spaces."""

        return line.strip().endswith("=")

    def parse_table(lines, start_index, key, param_type):
        """Parse a table from the list of lines.

        This function is triggered if a line is detected as an empty tag or a table. It will
        assume all lines after the empty tag or the table header are part of the table until
        an empty line or a line that is not part of the table is found. Those lines are then
        stored in a list, later used to skip the table rows before parsing the next item.
        """
        data = []
        tp = TableProcessor()
        for line in lines[start_index:]:
            if line.strip() and not is_key_value(line) and not is_empty_tag(line):
                data.append(line.strip().split())
            else:
                break
        processed = tp.process(
            data_type=param_type,
            data=data,
            columns=tuple(key.strip().split()),
            grass=grass,
        )
        logger.debug(f"Processed {len(data)} rows for {key.strip()}")
        return processed, len(data)

    i = 0
    # loop over the list of lines, stripping each

    while i < len(lines):
        line = lines[i].strip()

        if is_key_value(line):
            pairs.update(format_key_value(line))

        elif is_empty_tag(line):
            key = line[:-1].strip()
            array = parse_table(
                lines=lines, start_index=i + 1, key=key, param_type="array"
            )
            arrays.update(array[0])
            i += array[1] + 1  # Skip the tag data

        elif is_table(line):
            # The table header is the line itself (de facto dictionary key)
            table = parse_table(
                lines=lines, start_index=i + 1, key=line, param_type="table"
            )
            tables.update(table[0])
            i += table[1] + 1  # Skip the table rows
        i += 1  # Move to the next line

    return pairs | arrays | tables
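
A sketch of the parser on a trivial snippet with only key-value pairs, so TableProcessor is never invoked (import path follows the source reference above):

from pyswap.core.io.process_ascii import parse_ascii_file  # assumed import path

content = """\
* General settings
PROJECT = 'hupsel'   ! inline comment
TSTART = 2002-01-01
"""
parse_ascii_file(content)
# -> {'project': "'hupsel'", 'tstart': '2002-01-01'}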

remove_comments(text)

Remove comments from a SWAP input file.

In SWAP input files, some lines are comments. Full-line comments start with a * character. Partial comments start with a ! character and sometimes follow the actual data.

Note

Empty lines are not removed at this stage. They are important for parsing the tables.

Parameters:

- text (str): The text to remove comments from. Required.

Returns:

- str: Stripped text with comments removed.

Source code in pyswap/core/io/process_ascii.py
def remove_comments(text: str) -> str:
    """Remove comments from a SWAP input file.

    In SWAP input files, some lines are comments. Full-line comments
    start with a * character. Partial comments start with a ! character and
    sometimes follow the actual data.

    !!! note
        Empty lines are not removed at this stage. They are important for
        parsing the tables.

    Parameters:
        text (str): The text to remove comments from.

    Returns:
        str: Stripped text with comments removed.
    """

    text = re.sub(r"(^\*.*$|!.*)", "", text, flags=re.MULTILINE)

    return text.strip()
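
For illustration (import path follows the source reference above):

from pyswap.core.io.process_ascii import remove_comments  # assumed import path

remove_comments("* full-line comment\nSWSCRE = 0  ! trailing comment\n")
# -> 'SWSCRE = 0'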

processors

Module processing various elements encountered in files.

Processors in this module:

- TableProcessor

TableProcessor

Source code in pyswap/core/io/processors.py
class TableProcessor:
    def __init__(self):
        self.schemas = self.get_schemas_with_columns()
        logger.debug("TableProcessor initialized.")

    @staticmethod
    def is_dataframe_schema(member) -> bool:
        """Check if a member is a class, not a subclass of pd.Series, and not BaseTableModel itself.

        Parameters:
            member (Any): A member to check.
        """
        cond = (
            inspect.isclass(member)
            and not issubclass(member, pd.Series)
            and member is not BaseTableModel
        )
        return cond

    @staticmethod
    def match_schema_by_columns(data_columns: tuple, schema_columns: tuple) -> bool:
        """Check if data columns are a subset of schema columns.

        Parameters:
            data_columns (tuple): A tuple of column names from the data parsed from
                ascii files.
            schema_columns (tuple): A tuple of column names from the schema.
        """
        return frozenset(data_columns).issubset(frozenset(schema_columns["cols"]))

    @staticmethod
    @cache
    def get_schemas_with_columns() -> list[dict]:
        """Create a list of dictionaries with table names, classes and columns names."""
        members = inspect.getmembers(tables, TableProcessor.is_dataframe_schema)
        return [
            {
                "name": v[0],
                "class": v[1],
                "cols": tuple(v[1].to_schema().columns.keys()),
            }
            for v in members
        ]

    @staticmethod
    def create_schema_object(
        schema: BaseTableModel, columns: list, data: list
    ) -> pd.DataFrame | None:
        """Create a schema object from a list of data.

        Parameters:
            schema (BaseTableModel): A schema class to validate the data.
            columns (list): A list of column names.
            data (list): A list of data to validate.
        """
        df = pd.DataFrame(data, columns=columns)
        try:
            schema_object = schema.validate(df)
        except pa.errors.SchemaError:
            logger.exception(f"Validation error for {schema.__name__}")
            return None
        else:
            logger.debug(f"Successfully validated {schema.__name__}")
            return schema_object

    def process(
        self,
        data_type: Literal["table", "array"],
        data: dict | list[dict],
        columns: list[str] | tuple[str],
        grass=False,
    ) -> dict[str, pd.DataFrame] | None:
        """Process the data and return a DataFrame.

        Parameters:
            data (dict | list[dict]): The data to process.
            columns (list[str] | tuple[str]): The columns to include in the DataFrame.

        Returns:
            pd.DataFrame | None: The processed DataFrame or None if processing failed.
        """
        if not data:
            logger.warning("No data provided to process")
            return None

        if data_type == "table":
            for schema in self.schemas:
                if self.match_schema_by_columns(columns, schema):
                    logger.debug(f"Matched table schema: {schema['name']}")
                    schema_obj = self.create_schema_object(
                        schema["class"], columns, data
                    )
                    if schema_obj is not None:
                        return {schema["name"].lower(): schema_obj}

            logger.warning(f"No matching table schema found for columns: {columns}")
            return None

        else:
            array_name = columns[0] if isinstance(columns, list | tuple) else columns
            for schema in self.schemas:
                # if the array is a grass crop, remove DVS column from the set.
                # Otherwise remove DNR column. This is done to still provide
                # data validation and sustain the idea of matching the schema
                # with the parameter by schema name.

                if schema["name"].lower() == array_name.lower():
                    logger.debug(f"Matched array schema: {schema['name']}")

                    # Create a copy of columns to avoid mutating cached schema
                    filtered_cols = tuple(
                        col
                        for col in schema["cols"]
                        if col.upper() != ("DVS" if grass else "DNR")
                    )

                    schema_obj = self.create_schema_object(
                        schema["class"], filtered_cols, data
                    )
                    if schema_obj is not None:
                        return {schema["name"].lower(): schema_obj}

            logger.warning(f"No matching array schema found for: {array_name}")
            return None

create_schema_object(schema, columns, data) staticmethod

Create a schema object from a list of data.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `schema` | `BaseTableModel` | A schema class to validate the data. | required |
| `columns` | `list` | A list of column names. | required |
| `data` | `list` | A list of data to validate. | required |
Source code in pyswap/core/io/processors.py
@staticmethod
def create_schema_object(
    schema: BaseTableModel, columns: list, data: list
) -> pd.DataFrame | None:
    """Create a schema object from a list of data.

    Parameters:
        schema (BaseTableModel): A schema class to validate the data.
        columns (list): A list of column names.
        data (list): A list of data to validate.
    """
    df = pd.DataFrame(data, columns=columns)
    try:
        schema_object = schema.validate(df)
    except pa.errors.SchemaError:
        logger.exception(f"Validation error for {schema.__name__}")
        return None
    else:
        logger.debug(f"Successfully validated {schema.__name__}")
        return schema_object

get_schemas_with_columns() cached staticmethod

Create a list of dictionaries with table names, classes, and column names.

Source code in pyswap/core/io/processors.py
@staticmethod
@cache
def get_schemas_with_columns() -> list[dict]:
    """Create a list of dictionaries with table names, classes and columns names."""
    members = inspect.getmembers(tables, TableProcessor.is_dataframe_schema)
    return [
        {
            "name": v[0],
            "class": v[1],
            "cols": tuple(v[1].to_schema().columns.keys()),
        }
        for v in members
    ]

is_dataframe_schema(member) staticmethod

Check if a member is a class, not a pd.Series subclass, and not BaseTableModel itself.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `member` | `Any` | A member to check. | required |
Source code in pyswap/core/io/processors.py
@staticmethod
def is_dataframe_schema(member) -> bool:
    """Check if a member is a class and not a subclass of pd.Series or BaseTableModel.

    Parameters:
        member (Any): A member to check.
    """
    cond = (
        inspect.isclass(member)
        and not issubclass(member, pd.Series)
        and member is not BaseTableModel
    )
    return cond

match_schema_by_columns(data_columns, schema_columns) staticmethod

Check if data columns are a subset of schema columns.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `data_columns` | `tuple` | A tuple of column names from the data parsed from ascii files. | required |
| `schema_columns` | `dict` | A schema entry (as returned by `get_schemas_with_columns`) whose `cols` key holds the schema's column names. | required |
Source code in pyswap/core/io/processors.py
@staticmethod
def match_schema_by_columns(data_columns: tuple, schema_columns: dict) -> bool:
    """Check if data columns are a subset of schema columns.

    Parameters:
        data_columns (tuple): A tuple of column names from the data parsed
            from ascii files.
        schema_columns (dict): A schema entry (as returned by
            get_schemas_with_columns) whose "cols" key holds the schema's
            column names.
    """
    return frozenset(data_columns).issubset(frozenset(schema_columns["cols"]))

process(data_type, data, columns, grass=False)

Process parsed data and return a validated DataFrame keyed by schema name.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `data_type` | `Literal["table", "array"]` | Whether the data comes from a table or an array section. | required |
| `data` | `dict \| list[dict]` | The data to process. | required |
| `columns` | `list[str] \| tuple[str]` | The columns to include in the DataFrame. | required |
| `grass` | `bool` | If True, the array belongs to a grass crop and the DVS column is dropped from the schema instead of DNR. | `False` |

Returns:

| Type | Description |
| ---- | ----------- |
| `dict[str, DataFrame] \| None` | A single-entry dictionary mapping the matched schema name (lowercased) to the validated DataFrame, or None if processing failed. |

Source code in pyswap/core/io/processors.py
def process(
    self,
    data_type: Literal["table", "array"],
    data: dict | list[dict],
    columns: list[str] | tuple[str],
    grass=False,
) -> dict[str, pd.DataFrame] | None:
    """Process the data and return a DataFrame.

    Parameters:
        data (dict | list[dict]): The data to process.
        columns (list[str] | tuple[str]): The columns to include in the DataFrame.

    Returns:
        pd.DataFrame | None: The processed DataFrame or None if processing failed.
    """
    if not data:
        logger.warning("No data provided to process")
        return None

    if data_type == "table":
        for schema in self.schemas:
            if self.match_schema_by_columns(columns, schema):
                logger.debug(f"Matched table schema: {schema['name']}")
                schema_obj = self.create_schema_object(
                    schema["class"], columns, data
                )
                if schema_obj is not None:
                    return {schema["name"].lower(): schema_obj}

        logger.warning(f"No matching table schema found for columns: {columns}")
        return None

    else:
        array_name = columns[0] if isinstance(columns, list | tuple) else columns
        for schema in self.schemas:
            # If the array belongs to a grass crop, remove the DVS column
            # from the set; otherwise remove the DNR column. This keeps the
            # data validation while still matching the schema to the
            # parameter by schema name.

            if schema["name"].lower() == array_name.lower():
                logger.debug(f"Matched array schema: {schema['name']}")

                # Create a copy of columns to avoid mutating cached schema
                filtered_cols = tuple(
                    col
                    for col in schema["cols"]
                    if col.upper() != ("DVS" if grass else "DNR")
                )

                schema_obj = self.create_schema_object(
                    schema["class"], filtered_cols, data
                )
                if schema_obj is not None:
                    return {schema["name"].lower(): schema_obj}

        logger.warning(f"No matching array schema found for: {array_name}")
        return None
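
Taken together, these static helpers implement a simple match-then-validate flow: discover the available schemas once, find one whose columns cover the parsed columns, and validate the parsed rows against it. A minimal sketch is shown below; the parsed column names and row values are hypothetical, so in practice they would only validate against a schema that actually defines those columns.

```python
from pyswap.core.io.processors import TableProcessor

# Discover all DataFrameModel schemas defined in pyswap (the result is cached).
schemas = TableProcessor.get_schemas_with_columns()

# Columns and rows as they might come out of an ascii table parser
# (hypothetical names and values, for illustration only).
parsed_columns = ("DEPTH", "THETA")
parsed_rows = [[10.0, 0.30], [20.0, 0.25]]

for schema in schemas:
    if TableProcessor.match_schema_by_columns(parsed_columns, schema):
        validated = TableProcessor.create_schema_object(
            schema["class"], list(parsed_columns), parsed_rows
        )
        if validated is not None:
            print(f"Matched and validated: {schema['name']}")
            break
```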

CLI

Command Line Interface for pySWAP.

This is a prototype subpackage exploring a potential enhancement of pyswap's functionality. CLI tools can be very helpful for automating tasks such as loading databases or classic SWAP models.

Note

At the moment, only creating the project structure has been prototyped. More functionality will be added in the future if users express a need for it.

Example:

```cmd
pyswap init --notebook  # creates the project structure with a template .ipynb file.
pyswap init --script  # creates the project structure with a .py file.
```

After running one of these commands, you will see the following folder structure created:

test project
├── README
├── __init__.py
├── data
├── models
│   ├── __init__.py
│   └── main.ipynb
└── scripts
    └── __init__.py

The __init__.py files are added to create a module structure. When you create a Python file with helper functions in scripts, you can import those functions into the main model script or notebook and use them there:

from ..scripts.helper_module import helper_function

var = helper_function(**kwargs)

By default, a git repository is also created along with the project structure.

cli

The cli module helps structure the directories of created models and enforces best practices in documentation. It creates a modular structure (with __init__.py files), which can be helpful when writing scripts. This way, modules from the scripts directory can be imported directly into main.py or main.ipynb.

check_swap(verbose=typer.Option(True, '--verbose/--quiet', help='Enable verbose output'))

Check if SWAP executable is available and working.

Source code in pyswap/core/cli/cli.py
@app.command()
def check_swap(
    verbose: bool = typer.Option(
        True, "--verbose/--quiet", help="Enable verbose output"
    ),
):
    """Check if SWAP executable is available and working."""
    from pyswap.utils.executables import check_swap as _check_swap

    if _check_swap(verbose=verbose):
        typer.echo("✓ SWAP is ready to use!")
    else:
        typer.echo(
            "✗ SWAP is not available. Run 'pyswap get-swap' to install.", err=True
        )
        raise typer.Exit(1)
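
Typical usage (assuming typer's default underscore-to-dash command naming, which the error message in the source above also uses):

```cmd
pyswap check-swap           # run the check with verbose output (default)
pyswap check-swap --quiet   # run the check with less diagnostic output
```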

copy_readme(templates_path, project_root, use_pixi=True, attrs=None)

Copy and customize README template based on whether pixi is used.

Source code in pyswap/core/cli/cli.py
def copy_readme(templates_path, project_root, use_pixi=True, attrs=None):
    """Copy and customize README template based on whether pixi is used."""
    if use_pixi:
        template_file = templates_path / "README_template"
        pixi_instructions_file = templates_path / "pixi_instructions.md"

        readme_content = template_file.read_text()
        pixi_instructions = pixi_instructions_file.read_text()

        pixi_structure = "\n├── pixi.toml               # Pixi dependency management"

        readme_content = readme_content.format(
            project=attrs.get("project", "Project"),
            pixi_structure=pixi_structure,
            pixi_instructions=pixi_instructions
        )

        readme_path = project_root / "README.md"
        readme_path.write_text(readme_content)
    else:
        template_file = templates_path / "README"
        readme_content = template_file.read_text()

        readme_content = readme_content.format(
            project=attrs.get("project", "Project"),
            pixi_structure="",
            pixi_instructions=""
        )

        readme_path = project_root / "README"
        readme_path.write_text(readme_content)

    return "Successfully created README in the root directory."

create_pixi_toml(templates_path, project_root, attrs)

Create pixi.toml file from template.

Source code in pyswap/core/cli/cli.py
def create_pixi_toml(templates_path, project_root, attrs):
    """Create pixi.toml file from template."""
    template_file = templates_path / "pixi.toml"
    pixi_content = template_file.read_text()

    # Format the template with user attributes
    pixi_content = pixi_content.format(
        author=attrs.get("author", "Unknown"),
        email=attrs.get("email", "unknown@example.com"),
        project=attrs.get("project", "pyswap-project")
    )

    pixi_path = project_root / "pixi.toml"
    pixi_path.write_text(pixi_content)

    return "Successfully created pixi.toml file."

get_swap(version=typer.Option('4.2.0', '--version', '-v', help='SWAP version to download'), force=typer.Option(False, '--force', '-f', help='Force re-download even if executable exists'), verbose=typer.Option(True, '--verbose/--quiet', help='Enable verbose output'))

Download and install SWAP executable.

Source code in pyswap/core/cli/cli.py
@app.command()
def get_swap(
    version: str = typer.Option(
        "4.2.0", "--version", "-v", help="SWAP version to download"
    ),
    force: bool = typer.Option(
        False, "--force", "-f", help="Force re-download even if executable exists"
    ),
    verbose: bool = typer.Option(
        True, "--verbose/--quiet", help="Enable verbose output"
    ),
):
    """Download and install SWAP executable."""
    from pyswap.utils.executables import get_swap as _get_swap

    try:
        exe_path = _get_swap(version=version, force=force, verbose=verbose)
        if verbose:
            typer.echo(f"Success! SWAP executable ready at: {exe_path}")
    except Exception as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
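
For example:

```cmd
pyswap get-swap                          # download the default version (4.2.0)
pyswap get-swap --version 4.2.0 --force  # force a re-download of a specific version
```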

info()

Display information about pySWAP and SWAP setup.

Source code in pyswap/core/cli/cli.py
@app.command()
def info():
    """Display information about pySWAP and SWAP setup."""
    from pyswap.utils.executables import show_info

    show_info()

init(script=False, notebook=True, pixi=typer.Option(True, '--pixi/--no-pixi', help='Include Pixi configuration for dependency management (default: True)'))

Prompt the user for project information and create the project structure.

Source code in pyswap/core/cli/cli.py
@app.command()
def init(
    script: bool = False,
    notebook: bool = True,
    pixi: bool = typer.Option(True, "--pixi/--no-pixi", help="Include Pixi configuration for dependency management (default: True)")
):
    """Prompt the user to enter their information and create a User class."""
    attrs = {
        "project": typer.prompt("Project name"),
        "swap_ver": typer.prompt("SWAP version used"),
        "author": typer.prompt("Author first/last name"),
        "institution": typer.prompt("Your last institution"),
        "email": typer.prompt("Your email address"),
        "comment": typer.prompt("Any comments?", default=""),
    }

    folder_name = typer.prompt("Choose a folder name", default=attrs.get("project"))

    # Defining paths and creating folders.
    templates_path = Path(__file__).resolve().parent / "templates"
    project_root = Path.cwd() / folder_name

    basic_code_to_write_path = templates_path / "script.txt"
    basic_code_to_write = dict_to_custom_string(attrs)

    folders_to_create = ["models", "scripts", "data", "tests"]
    folders_to_create_paths = [project_root / folder for folder in folders_to_create]

    [folder.mkdir(parents=True, exist_ok=True) for folder in folders_to_create_paths]

    # Dealing with files.
    copy_readme(templates_path, project_root, use_pixi=pixi, attrs=attrs)
    create_inits(
        project_root=project_root,
        models_dir=folders_to_create_paths[0],
        scripts_dir=folders_to_create_paths[1],
    )

    # Create pixi.toml if requested
    if pixi:
        create_pixi_toml(templates_path, project_root, attrs)
        print("Created pixi.toml for dependency management.")

    if script:
        make_script(
            folders_to_create_paths[0], basic_code_to_write_path, basic_code_to_write
        )

    if notebook:
        make_notebook(
            folders_to_create_paths[0], basic_code_to_write, templates_path, attrs
        )

    init_git_repo(project_root, use_pixi=pixi)
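
The command is interactive (it prompts for the project metadata); the flags only control which artifacts are generated, for example:

```cmd
pyswap init                      # notebook template and pixi.toml (defaults)
pyswap init --script --no-pixi   # also add a .py entry point, skip pixi.toml
```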

remove_swap(verbose=typer.Option(True, '--verbose/--quiet', help='Enable verbose output'))

Remove SWAP executable from package directory.

Source code in pyswap/core/cli/cli.py
@app.command()
def remove_swap(
    verbose: bool = typer.Option(
        True, "--verbose/--quiet", help="Enable verbose output"
    ),
):
    """Remove SWAP executable from package directory."""
    from pyswap.utils.executables import remove_swap as _remove_swap

    if _remove_swap(verbose=verbose):
        typer.echo("SWAP executable removed successfully")
    else:
        typer.echo("Failed to remove SWAP executable", err=True)
        raise typer.Exit(1)
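
For example:

```cmd
pyswap remove-swap           # remove the installed executable, with output
pyswap remove-swap --quiet   # same, with less output
```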

upload_swap(file_path=typer.Argument(help='Path to the SWAP executable file to install'), version=typer.Argument(help='Version identifier for the uploaded executable'), force=typer.Option(False, '--force', '-f', help='Force replace existing executable'), verbose=typer.Option(True, '--verbose/--quiet', help='Enable verbose output'))

Install SWAP executable from a local file.

Source code in pyswap/core/cli/cli.py
@app.command()
def upload_swap(
    file_path: str = typer.Argument(help="Path to the SWAP executable file to install"),
    version: str = typer.Argument(
        help="Version identifier for the uploaded executable"
    ),
    force: bool = typer.Option(
        False, "--force", "-f", help="Force replace existing executable"
    ),
    verbose: bool = typer.Option(
        True, "--verbose/--quiet", help="Enable verbose output"
    ),
):
    """Install SWAP executable from a local file."""
    from pyswap.utils.executables import upload_swap as _upload_swap

    try:
        exe_path = _upload_swap(
            file_path=file_path, version=version, force=force, verbose=verbose
        )
        if verbose:
            typer.echo(f"Success! SWAP executable installed at: {exe_path}")
    except Exception as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from e
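
Here file_path and version are positional arguments; the executable path below is a placeholder for wherever your local SWAP binary lives:

```cmd
pyswap upload-swap ./swap_local 4.2.0          # install a local executable as version 4.2.0
pyswap upload-swap ./swap_local 4.2.0 --force  # replace an already installed executable
```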