darts.cli

Entrypoint for the darts-pipeline CLI.
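
For orientation, a minimal sketch of how the CLI can be invoked programmatically, assuming only the module attributes documented below (the actual entrypoint wiring may differ):

from darts.cli import app

# The meta app parses the config file via config_parser and dispatches to the
# registered pipeline commands (see the ConfigParser example further below).
if __name__ == "__main__":
    app.meta()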

LoggingManager module-attribute

LoggingManager = (
    darts.utils.logging.LoggingManagerSingleton()
)

__version__ module-attribute

__version__ = importlib.metadata.version('darts-nextgen')

app module-attribute

app = cyclopts.App(
    version=darts.__version__,
    console=rich.get_console(),
    config=darts.cli.config_parser,
    help_format="plaintext",
    version_format="plaintext",
)

config_parser module-attribute

config_parser = darts.utils.config.ConfigParser()

inference_app module-attribute

inference_app = cyclopts.App(
    name="inference",
    group=darts.cli.subcommands_group,
    help="Predefined inference pipelines",
)

inference_data_app module-attribute

inference_data_app = cyclopts.App(
    name="prep-data",
    group=darts.cli.utilities_group,
    help="Data preparation for offline use",
)

logger module-attribute

logger = logging.getLogger(__name__)

ray_group module-attribute

ray_group = cyclopts.Group.create_ordered('Ray Pipelines')

root_file module-attribute

root_file = pathlib.Path(__file__).resolve()

sequential_group module-attribute

sequential_group = cyclopts.Group.create_ordered(
    "Sequential Pipelines"
)

subcommands_group module-attribute

subcommands_group = cyclopts.Group.create_ordered(
    "Pipelines & Scripts"
)

training_app module-attribute

training_app = cyclopts.App(
    name="training",
    group=darts.cli.subcommands_group,
    help="Predefined training pipelines",
)

training_data_app module-attribute

training_data_app = cyclopts.App(
    name="create-dataset", help="Dataset creation"
)

utilities_group module-attribute

utilities_group = cyclopts.Group.create_ordered("Utilities")

ConfigParser

ConfigParser()

Parser for cyclopts config.

A custom implementation is needed to select our own TOML structure and source. It is implemented as a class so that the config file can be provided as a parameter of the CLI.

Initialize the ConfigParser (no-op).

Source code in darts/src/darts/utils/config.py
def __init__(self) -> None:
    """Initialize the ConfigParser (no-op)."""
    self._config = None

__call__

__call__(
    apps: list[cyclopts.App],
    commands: tuple[str, ...],
    arguments: cyclopts.ArgumentCollection,
)

Parser for the cyclopts config. A custom implementation is needed to select our own TOML structure.

First, the configuration file at "config.toml" is loaded. This config is then flattened and mapped to the input arguments of the called function; parent keys are therefore ignored.

Parameters:

  • apps (list[cyclopts.App]) –

    The cyclopts apps. Unused, but must be provided for the cyclopts hook.

  • commands (tuple[str, ...]) –

    The commands. Unused, but must be provided for the cyclopts hook.

  • arguments (cyclopts.ArgumentCollection) –

    The arguments to apply the config to.

Examples:

Setup the cyclopts App
import cyclopts
from darts.utils.config import ConfigParser

config_parser = ConfigParser()
app = cyclopts.App(config=config_parser)

# Intercept the logging behavior to add a file handler
@app.meta.default
def launcher(
    *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
    log_dir: Path = Path("logs"),
    config_file: Path = Path("config.toml"),
):
    command, bound, _ = app.parse_args(tokens)
    add_logging_handlers(command.__name__, console, log_dir)
    return command(*bound.args, **bound.kwargs)

if __name__ == "__main__":
    app.meta()
Usage

Config file ./config.toml:

[darts.hello] # The parent key is completely ignored
name = "Tobias"

Function signature which is called:

# ... setup code for cyclopts
@app.command()
def hello(name: str):
    print(f"Hello {name}")

Calling the function from CLI:

$ darts hello
Hello Tobias

$ darts hello --name=Max
Hello Max
Source code in darts/src/darts/utils/config.py
def __call__(self, apps: list[cyclopts.App], commands: tuple[str, ...], arguments: cyclopts.ArgumentCollection):
    """Parser for cyclopts config. An own implementation is needed to select our own toml structure.

    First, the configuration file at "config.toml" is loaded.
    Then, this config is flattened and then mapped to the input arguments of the called function.
    Hence parent keys are not considered.

    Args:
        apps (list[cyclopts.App]): The cyclopts apps. Unused, but must be provided for the cyclopts hook.
        commands (tuple[str, ...]): The commands. Unused, but must be provided for the cyclopts hook.
        arguments (cyclopts.ArgumentCollection): The arguments to apply the config to.

    Examples:
        ### Setup the cyclopts App

        ```python
        import cyclopts
        from darts.utils.config import ConfigParser

        config_parser = ConfigParser()
        app = cyclopts.App(config=config_parser)

        # Intercept the logging behavior to add a file handler
        @app.meta.default
        def launcher(
            *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
            log_dir: Path = Path("logs"),
            config_file: Path = Path("config.toml"),
        ):
            command, bound, _ = app.parse_args(tokens)
            add_logging_handlers(command.__name__, console, log_dir)
            return command(*bound.args, **bound.kwargs)

        if __name__ == "__main__":
            app.meta()
        ```


        ### Usage

        Config file `./config.toml`:

        ```toml
        [darts.hello] # The parent key is completely ignored
        name = "Tobias"
        ```

        Function signature which is called:

        ```python
        # ... setup code for cyclopts
        @app.command()
        def hello(name: str):
            print(f"Hello {name}")
        ```

        Calling the function from CLI:

        ```sh
        $ darts hello
        Hello Tobias

        $ darts hello --name=Max
        Hello Max
        ```

    """
    if self._config is None:
        config_arg, _, _ = arguments.match("--config-file")
        config_file = config_arg.convert_and_validate()
        # Use default config file if not specified
        if not config_file:
            config_file = config_arg.field_info.default
        # else never happens
        self.open_config(config_file)

    self.apply_config(arguments)

apply_config

apply_config(arguments: cyclopts.ArgumentCollection)

Apply the loaded config to the cyclopts mapping.

Parameters:

  • arguments (cyclopts.ArgumentCollection) –

    The arguments to apply the config to.
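
For illustration, a simplified sketch of what happens for a single config entry (the actual implementation below also handles list values and collects all tokens before appending): a config value is only applied to arguments that were not set on the command line, by appending a synthetic token with source="config-file".

import cyclopts

# 'arguments' is the cyclopts.ArgumentCollection passed to apply_config.
# Simplified sketch for one config entry, e.g. name = "Tobias" under [darts.hello]
argument, remaining_keys, _ = arguments.match("--name")
if not argument.tokens:  # only fill arguments the user did not pass explicitly
    argument.append(
        cyclopts.Token(keyword="name", value="Tobias", source="config-file", index=0, keys=remaining_keys)
    )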

Source code in darts/src/darts/utils/config.py
def apply_config(self, arguments: cyclopts.ArgumentCollection):
    """Apply the loaded config to the cyclopts mapping.

    Args:
        arguments (cyclopts.ArgumentCollection): The arguments to apply the config to.

    """
    to_add = []
    for k in self._config.keys():
        value = self._config[k]["value"]

        try:
            argument, remaining_keys, _ = arguments.match(f"--{k}")
        except ValueError:
            # Config key not found in arguments - ignore
            continue

        # Skip if the argument is not bound to a parameter
        if argument.tokens or argument.field_info.kind is argument.field_info.VAR_KEYWORD:
            continue

        # Skip if the argument is from the config file
        if any(x.source != "config-file" for x in argument.tokens):
            continue

        # Parse value to tuple of strings
        if not isinstance(value, list):
            value = (value,)
        value = tuple(str(x) for x in value)
        # Add the new tokens to the list
        for i, v in enumerate(value):
            to_add.append(
                (
                    argument,
                    cyclopts.Token(keyword=k, value=v, source="config-file", index=i, keys=remaining_keys),
                )
            )
    # Add here after all "arguments.match" calls, to avoid changing the list while iterating
    for argument, token in to_add:
        argument.append(token)

open_config

open_config(file_path: str | pathlib.Path) -> None

Open the config file, take the 'darts' key, flatten the resulting dict, and save it as the config.
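
A short sketch of the resulting structure, assuming flatten_dict drops the parent keys and stores each leaf under a "value" entry (this is how apply_config accesses it):

import tomllib

with open("config.toml", "rb") as f:
    config = tomllib.load(f)["darts"]
# e.g. config == {"hello": {"name": "Tobias"}}
# After flattening, parent keys are dropped and apply_config reads entries like
# {"name": {"value": "Tobias"}}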

Parameters:

  • file_path (str | pathlib.Path) –

    The path to the config file.
Source code in darts/src/darts/utils/config.py
def open_config(self, file_path: str | Path) -> None:
    """Open the config file, takes the 'darts' key, flattens the resulting dict and saves as config.

    Args:
        file_path (str | Path): The path to the config file.

    """
    file_path = file_path if isinstance(file_path, Path) else Path(file_path)

    if not file_path.exists():
        logger.warning(f"No config file found at {file_path.resolve()}")
        self._config = {}
        return

    with file_path.open("rb") as f:
        config = tomllib.load(f)["darts"]

    # Flatten the config data ()
    self._config = flatten_dict(config)
    logger.info(f"loaded config from '{file_path.resolve()}'")

PipelineV2Paths dataclass

PipelineV2Paths(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    orthotiles_dir: pathlib.Path | None = None,
    scenes_dir: pathlib.Path | None = None,
    sentinel2_grid_dir: pathlib.Path | None = None,
    raw_data_store: pathlib.Path | None = None,
    raw_data_source: typing.Literal["cdse", "gee"] = "cdse",
    no_raw_data_store: bool = False,
)

Default paths for v2 pipelines.
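
A minimal usage sketch, assuming the class is importable from darts.pipelines.sequential_v2 (the module shown in the source path below); directories left as None are resolved to the DARTS default paths in __post_init__:

from pathlib import Path

from darts.pipelines.sequential_v2 import PipelineV2Paths  # assumed import path

pipeline_paths = PipelineV2Paths(arcticdem_dir=Path("data/download/arcticdem"))
pipeline_paths.log()  # logs all resolved paths, at DEBUG level by default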

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

no_raw_data_store class-attribute instance-attribute

no_raw_data_store: bool = False

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path | None = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

raw_data_source class-attribute instance-attribute

raw_data_source: typing.Literal['cdse', 'gee'] = 'cdse'

raw_data_store class-attribute instance-attribute

raw_data_store: pathlib.Path | None = None

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path | None = None

sentinel2_grid_dir class-attribute instance-attribute

sentinel2_grid_dir: pathlib.Path | None = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):  # noqa: D105
    paths.set_defaults(self.default_dirs)
    # The defaults will be overwritten in the respective realizations
    self.output_data_dir = self.output_data_dir or paths.output_data("base_pipeline")
    self.model_files = self.model_files or paths.ensemble_models()
    self.arcticdem_dir = self.arcticdem_dir or paths.arcticdem(2)
    self.tcvis_dir = self.tcvis_dir or paths.tcvis()
    self.output_data_dir = self.output_data_dir or paths.output_data("planet")
    self.orthotiles_dir = self.orthotiles_dir or paths.planet_orthotiles()
    self.scenes_dir = self.scenes_dir or paths.planet_scenes()
    self.output_data_dir = self.output_data_dir or paths.output_data(f"sentinel2-{self.raw_data_source}")
    self.raw_data_store = self.raw_data_store or paths.sentinel2_raw_data(self.raw_data_source)
    if self.no_raw_data_store:
        self.raw_data_store = None

log

log(level: int = logging.DEBUG)

Log all paths managed.

Source code in darts/src/darts/pipelines/sequential_v2.py
def log(self, level: int = logging.DEBUG):
    """Log all paths managed."""
    label_width = 47
    logmsg = textwrap.dedent(f"""
        === Pipeline (Sequential V2) Paths ===
        {"Output Data Directory:":<{label_width}} {self.output_data_dir}
        {"ArcticDEM Directory:":<{label_width}} {self.arcticdem_dir}
        {"TCVis Directory:":<{label_width}} {self.tcvis_dir}
        {"Planet Orthotiles Directory:":<{label_width}} {self.orthotiles_dir}
        {"Planet Scenes Directory:":<{label_width}} {self.scenes_dir}
        {"Sentinel-2 Grid Directory:":<{label_width}} {self.sentinel2_grid_dir}
        {"Sentinel-2 Raw Data Directory:":<{label_width}} {self.raw_data_store}
    """).strip()
    logger.log(level, logmsg)

PlanetPipeline dataclass

PlanetPipeline(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    offline: bool = False,
    debug_data: bool = False,
    orthotiles_dir: pathlib.Path | None = None,
    scenes_dir: pathlib.Path | None = None,
    image_ids: list = None,
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for processing PlanetScope data.

Processes PlanetScope imagery (both orthotiles and scenes) for RTS segmentation. Supports both offline and online processing modes.

Data Structure

Expects PlanetScope data organized as:

  • Orthotiles: orthotiles_dir/tile_id/scene_id/
  • Scenes: scenes_dir/scene_id/
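
A hedged construction sketch (assumed import path and hypothetical file names; unset options fall back to the defaults listed below):

from pathlib import Path

from darts.pipelines.sequential_v2 import PlanetPipeline  # assumed import path

pipeline = PlanetPipeline(
    orthotiles_dir=Path("data/input/planet/PSOrthoTile"),
    scenes_dir=Path("data/input/planet/PSScene"),
    model_files=[Path("models/rts_model.pt")],  # hypothetical model checkpoint
    export_bands=["binarized", "polygonized"],
)
pipeline.run()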

Parameters:

  • orthotiles_dir (pathlib.Path | None, default: None ) –

    Directory containing PlanetScope orthotiles. If None, uses default path from DARTS paths. Defaults to None.

  • scenes_dir (pathlib.Path | None, default: None ) –

    Directory containing PlanetScope scenes. If None, uses default path from DARTS paths. Defaults to None.

  • image_ids (list | None, default: None ) –

    List of image/scene IDs to process. If None, processes all images found in orthotiles_dir and scenes_dir. Defaults to None.

  • model_files (pathlib.Path | list[pathlib.Path] | None, default: None ) –

    Path(s) to model file(s) for segmentation. Single Path implies write_model_outputs=False. If None, searches default model directory for all .pt files. Defaults to None.

  • output_data_dir (pathlib.Path | None, default: None ) –

    Output directory for results. If None, uses {default_out}/planet. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    Directory for ArcticDEM datacube. Will be created/downloaded if needed. If None, uses default path. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    Directory for TCVis data. If None, uses default path. Defaults to None.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    Computation device. "cuda" uses GPU 0, int specifies GPU index, "auto" selects free GPU. Defaults to None.

  • ee_project (str | None, default: None ) –

    Earth Engine project ID. May be omitted if defined in persistent credentials. Defaults to None.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use EE high-volume server. Defaults to True.

  • tpi_outer_radius (int, default: 100 ) –

    Outer radius (m) for TPI calculation. Defaults to 100.

  • tpi_inner_radius (int, default: 0 ) –

    Inner radius (m) for TPI calculation. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    Patch size for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    Overlap between patches. Defaults to 256.

  • batch_size (int, default: 8 ) –

    Batch size for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    Reflection padding for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    Threshold for binarizing probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    Disk size for mask erosion and inner edge cropping. Defaults to 10.

  • edge_erosion_size (int | None, default: None ) –

    Size for outer edge cropping. If None, uses mask_erosion_size. Defaults to None.

  • min_object_size (int, default: 32 ) –

    Minimum object size (pixels) to keep. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    Quality filtering level. 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    Bands to export. Can include "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata", or specific band names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Save individual model outputs (not just ensemble). Defaults to False.

  • overwrite (bool, default: False ) –

    Overwrite existing output files. Defaults to False.

  • offline (bool, default: False ) –

    Skip downloading missing data. Defaults to False.

  • debug_data (bool, default: False ) –

    Write intermediate debugging data. Defaults to False.

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

debug_data class-attribute instance-attribute

debug_data: bool = False

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

offline class-attribute instance-attribute

offline: bool = False

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path | None = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path | None = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):  # noqa: D105
    super().__post_init__()
    self.output_data_dir = self.output_data_dir or paths.output_data("planet")
    self.orthotiles_dir = self.orthotiles_dir or paths.planet_orthotiles()
    self.scenes_dir = self.scenes_dir or paths.planet_scenes()

cli staticmethod

cli(*, pipeline: darts.pipelines.sequential_v2.PlanetPipeline)

Run the sequential pipeline for PlanetScope data.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.PlanetPipeline) –

    Configured PlanetPipeline instance.
Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "PlanetPipeline"):
    """Run the sequential pipeline for PlanetScope data.

    Args:
        pipeline: Configured PlanetPipeline instance.

    """
    pipeline.__post_init__()
    pipeline.run()

cli_prepare_data staticmethod

cli_prepare_data(
    *,
    pipeline: darts.pipelines.sequential_v2.PlanetPipeline,
    aux: bool = False,
    force: bool = False,
)

Download all necessary data for offline processing.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.PlanetPipeline) –

    Configured PlanetPipeline instance.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

  • force (bool, default: False ) –

    If True, downloads all possible data, independent of the aux flag or model needs. Defaults to False.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli_prepare_data(*, pipeline: "PlanetPipeline", aux: bool = False, force: bool = False):
    """Download all necessary data for offline processing.

    Args:
        pipeline: Configured PlanetPipeline instance.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.
        force: If True, downloads all possible data, independent of the `aux` flag or model needs.
            Defaults to False.

    """
    assert not pipeline.offline, "Pipeline must be online to prepare data for offline usage."
    pipeline.__post_init__()
    pipeline.prepare_data(optical=False, aux=aux, force=force)

prepare_data

prepare_data(
    optical: bool = False,
    aux: bool = False,
    force: bool = False,
)

Download and prepare data for offline processing.

Validates configuration, determines data requirements from models, and downloads requested data (optical imagery and/or auxiliary data).
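
A hedged usage sketch mirroring what cli_prepare_data does (the pipeline must be online to download; assumes a direct instantiation of the pipeline):

pipeline = PlanetPipeline(offline=False)
pipeline.prepare_data(optical=True, aux=True)
# The downloaded data can then be reused in a later run with offline=True.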

Parameters:

  • optical (bool, default: False ) –

    If True, downloads optical imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

  • force (bool, default: False ) –

    If True, downloads all possible data, independent of optical and aux flags or model needs. Defaults to False.

Raises:

  • KeyboardInterrupt –

    If the user interrupts execution.

  • SystemExit –

    If the process is terminated.

  • SystemError –

    If a system error occurs.
Source code in darts/src/darts/pipelines/sequential_v2.py
def prepare_data(self, optical: bool = False, aux: bool = False, force: bool = False):
    """Download and prepare data for offline processing.

    Validates configuration, determines data requirements from models,
    and downloads requested data (optical imagery and/or auxiliary data).

    Args:
        optical: If True, downloads optical imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.
        force: If True, downloads all possible data, independent of `optical` and `aux` flags or model needs.
            Defaults to False.

    Raises:
        KeyboardInterrupt: If user interrupts execution.
        SystemExit: If the process is terminated.
        SystemError: If a system error occurs.

    """
    assert optical or aux, "Nothing to prepare. Please set optical and/or aux to True."

    # ? We only want to download stuff - no need for using the GPU here
    self.device = "cpu"
    self._dump_config()

    from darts_acquisition import download_arcticdem, download_tcvis
    from stopuhr import Chronometer

    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)

    if aux or force:
        # Get the ensemble to check which auxiliary data is necessary
        if force:
            needs_arcticdem, needs_tcvis = True, True
        else:
            ensemble = self._load_ensemble()
            needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

        if not needs_arcticdem and not needs_tcvis:
            logger.warning("No auxiliary data required by the models. Skipping download of auxiliary data...")
        else:
            logger.info(f"Models {needs_tcvis=} {needs_arcticdem=}.")
            self._create_auxiliary_datacubes(arcticdem=needs_arcticdem, tcvis=needs_tcvis)

            # Predownload auxiliary
            aoi = self._tile_aoi()
            if needs_arcticdem:
                logger.info("start download ArcticDEM")
                with timer("Downloading ArcticDEM"):
                    download_arcticdem(aoi, self.arcticdem_dir, resolution=self._arcticdem_resolution())
            if needs_tcvis:
                logger.info("start download TCVIS")
                init_ee(self.ee_project, self.ee_use_highvolume)
                with timer("Downloading TCVis"):
                    download_tcvis(aoi, self.tcvis_dir)

    # Predownload tiles if optical flag is set
    if not optical and not force:
        return

    # Iterate over all the data
    with timer("Loading Optical"):
        tileinfo = self._tileinfos()
        n_tiles = 0
        logger.info(f"Found {len(tileinfo)} tiles to download.")
        for i, (tilekey, _) in enumerate(tileinfo):
            tile_id = self._get_tile_id(tilekey)
            try:
                self._download_tile(tilekey)
                n_tiles += 1
                logger.info(f"Downloaded sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
            except (KeyboardInterrupt, SystemError, SystemExit) as e:
                logger.warning(f"{type(e).__name__} detected.\nExiting...")
                raise e
            except Exception as e:
                logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
                logger.exception(e)
        else:
            logger.info(f"Downloaded {n_tiles} tiles.")

run

run()

Run the complete segmentation pipeline.

Executes the full pipeline including:

  1. Configuration validation and dumping
  2. Loading ensemble models
  3. Creating/loading auxiliary datacubes
  4. Processing each tile:
     - Loading optical data
     - Loading auxiliary data (ArcticDEM, TCVis) as needed
     - Preprocessing
     - Segmentation
     - Postprocessing
     - Exporting results
  5. Saving results and timing information

Results are saved to the output directory with timestamped configuration, results parquet file, and timing information.

Raises:

  • KeyboardInterrupt –

    If the user interrupts execution.
Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    """Run the complete segmentation pipeline.

    Executes the full pipeline including:
    1. Configuration validation and dumping
    2. Loading ensemble models
    3. Creating/loading auxiliary datacubes
    4. Processing each tile:
       - Loading optical data
       - Loading auxiliary data (ArcticDEM, TCVis) as needed
       - Preprocessing
       - Segmentation
       - Postprocessing
       - Exporting results
    5. Saving results and timing information

    Results are saved to the output directory with timestamped configuration,
    results parquet file, and timing information.

    Raises:
        KeyboardInterrupt: If user interrupts execution.

    """
    self._validate()
    current_time = self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    import pandas as pd
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2
    from stopuhr import Chronometer, stopwatch

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if not self.offline:
        init_ee(self.ee_project, self.ee_use_highvolume)

    self._create_auxiliary_datacubes()

    # determine models to use
    ensemble = self._load_ensemble()
    ensemble_subsets = ensemble.model_names
    needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=ensemble_subsets)
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)

            if needs_arcticdem:
                with timer("Loading ArcticDEM", log=False):
                    arcticdem_resolution = self._arcticdem_resolution()
                    arcticdem = load_arcticdem(
                        tile.odc.geobox,
                        self.arcticdem_dir,
                        resolution=arcticdem_resolution,
                        buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                        offline=self.offline,
                    )
            else:
                arcticdem = None

            if needs_tcvis:
                with timer("Loading TCVis", log=False):
                    tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir, offline=self.offline)
            else:
                tcvis = None

            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )

            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )

            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    metadata=export_metadata,
                    debug=self.debug_data,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.timer.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)

PlanetRayPipeline dataclass

PlanetRayPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    num_cpus: int = 1,
    devices: list[int] | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    orthotiles_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSOrthoTile"
    ),
    scenes_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSScene"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.ray_v2._BaseRayPipeline

Pipeline for PlanetScope data.
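
A minimal construction sketch, assuming the class is importable from darts.pipelines.ray_v2 (the module shown in the source path below), with a hypothetical model checkpoint:

from pathlib import Path

from darts.pipelines.ray_v2 import PlanetRayPipeline  # assumed import path

pipeline = PlanetRayPipeline(
    model_files=[Path("models/rts_model.pt")],  # hypothetical model checkpoint
    num_cpus=8,
    devices=[0],  # GPU indices exposed to Ray via CUDA_VISIBLE_DEVICES
)
pipeline.run()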

Parameters:

  • orthotiles_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSOrthoTile') ) –

    The directory containing the PlanetScope orthotiles.

  • scenes_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSScene') ) –

    The directory containing the PlanetScope scenes.

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • num_cpus (int, default: 1 ) –

    The number of CPUs to allocate for Ray. Defaults to 1.

  • devices (list[int] | None, default: None ) –

    The GPU device indices to use. These are exposed to Ray via CUDA_VISIBLE_DEVICES. If None, Ray detects the available GPUs itself. Defaults to None.

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep in pixel. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int. In this case 0="none" 1="low_quality" 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

devices class-attribute instance-attribute

devices: list[int] | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

num_cpus class-attribute instance-attribute

num_cpus: int = 1

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSOrthoTile"
)

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSScene"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

cli(*, pipeline: darts.pipelines.ray_v2.PlanetRayPipeline)

Run the Ray pipeline for Planet data.

Source code in darts/src/darts/pipelines/ray_v2.py
@staticmethod
def cli(*, pipeline: "PlanetRayPipeline"):
    """Run the sequential pipeline for Planet data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/ray_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    if self.devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in self.devices)
    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import ray

    ray_context = ray.init(
        num_cpus=self.num_cpus,  # We use one CPU per Ray task
        num_gpus=len(self.devices) if self.devices is not None else None,
    )
    logger.debug(f"Ray initialized with context: {ray_context}")
    logger.info(f"Ray Dashboard URL: {ray_context.dashboard_url}")
    logger.debug(f"Ray cluster resources: {ray.cluster_resources()}")
    logger.debug(f"Ray available resources: {ray.available_resources()}")

    # Initialize ee in every worker
    @ray.remote
    def init_worker():
        init_ee(self.ee_project, self.ee_use_highvolume)

    num_workers = int(ray.cluster_resources().get("CPU", 1))
    logger.info(f"Initializing {num_workers} Ray workers with Earth Engine.")
    ray.get([init_worker.remote() for _ in range(num_workers)])

    import smart_geocubes
    from darts_export import missing_outputs

    from darts.pipelines._ray_wrapper import (
        _export_tile_ray,
        _load_aux,
        _prepare_export_ray,
        _preprocess_ray,
        _RayEnsembleV1,
    )
    from darts.utils.logging import LoggingManager

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    # ray_ensemble = _RayEnsembleV1.remote(models)

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    adem_buffer = ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2))

    # Get files to process
    tileinfo: list[RayInputDict] = []
    for i, (tilekey, outpath) in enumerate(self._tileinfos()):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue
        tileinfo.append({"tilekey": tilekey, "outpath": str(outpath.resolve()), "tile_id": tile_id})
    tileinfo = tileinfo[:10]
    logger.info(f"Found {len(tileinfo)} tiles to process.")

    # Ray data pipeline
    # TODO: setup device stuff correctly
    ds = ray.data.from_items(tileinfo)
    ds = ds.map(self._load_tile, num_cpus=1)
    ds = ds.map(
        _load_aux,
        fn_kwargs={
            "arcticdem_dir": self.arcticdem_dir,
            "arcticdem_resolution": arcticdem_resolution,
            "buffer": adem_buffer,
            "tcvis_dir": self.tcvis_dir,
        },
        num_cpus=1,
    )
    ds = ds.map(
        _preprocess_ray,
        fn_kwargs={
            "tpi_outer_radius": self.tpi_outer_radius,
            "tpi_inner_radius": self.tpi_inner_radius,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
        concurrency=4,
    )
    ds = ds.map(
        _RayEnsembleV1,
        fn_constructor_kwargs={"model_dict": models},
        fn_kwargs={
            "patch_size": self.patch_size,
            "overlap": self.overlap,
            "batch_size": self.batch_size,
            "reflection": self.reflection,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
        num_gpus=0.8,
        concurrency=1,
    )
    ds = ds.map(
        _prepare_export_ray,
        fn_kwargs={
            "binarization_threshold": self.binarization_threshold,
            "mask_erosion_size": self.mask_erosion_size,
            "min_object_size": self.min_object_size,
            "quality_level": self.quality_level,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
    )
    ds = ds.map(
        _export_tile_ray,
        fn_kwargs={
            "export_bands": self.export_bands,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
    )
    logger.debug(f"Ray dataset: {ds}")
    logger.info("Ray pipeline created. Starting execution...")
    # This should trigger the execution
    ds.write_parquet(f"local://{self.output_data_dir.resolve()!s}/ray_output.parquet")
    logger.info(f"Ray pipeline finished. Output written to {self.output_data_dir.resolve()!s}/ray_output.parquet")

Sentinel2Pipeline dataclass

Sentinel2Pipeline(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    offline: bool = False,
    debug_data: bool = False,
    scene_ids: list[str] | None = None,
    scene_id_file: pathlib.Path | None = None,
    tile_ids: list[str] | None = None,
    aoi_file: pathlib.Path | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    max_cloud_cover: int | None = 10,
    max_snow_cover: int | None = 10,
    months: list[int] | None = None,
    years: list[int] | None = None,
    prep_data_scene_id_file: pathlib.Path | None = None,
    sentinel2_grid_dir: pathlib.Path | None = None,
    raw_data_store: pathlib.Path | None = None,
    no_raw_data_store: bool = False,
    raw_data_source: typing.Literal["gee", "cdse"] = "cdse",
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for processing Sentinel-2 data.

Processes Sentinel-2 Surface Reflectance (SR) imagery from either CDSE or Google Earth Engine. Supports multiple scene selection methods and flexible filtering options.

Source Selection

The data source is specified via the raw_data_source parameter:

  • "cdse": Copernicus Data Space Ecosystem (CDSE)
  • "gee": Google Earth Engine (GEE)

Both sources require accounts and proper credential setup on the system.

Scene Selection

Scenes can be selected using one of four mutually exclusive methods (priority order):

  1. scene_ids: Direct list of Sentinel-2 scene IDs
  2. scene_id_file: JSON file containing scene IDs
  3. tile_ids: List of Sentinel-2 tile IDs (e.g., "33UVP") with optional filters
  4. aoi_file: Shapefile defining area of interest with optional filters
Filtering Options

When using tile_ids or aoi_file, scenes can be filtered by:

  • Cloud/snow cover: max_cloud_cover, max_snow_cover
  • Date range: start_date and end_date (YYYY-MM-DD format)
  • OR specific months/years: months (1-12) and years

Note: The date range takes priority over month/year filtering.
Warning: Omitting temporal filtering entirely may cause rate-limit errors.
Note: Month/year filtering is experimental and only implemented for CDSE.

Offline Processing

Use cli_prepare_data to download data for offline use. The prep_data_scene_id_file stores scene IDs from queries for offline reuse.
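
A hedged construction sketch using an area of interest with a date-range filter, as described above (assumed import path and hypothetical file names):

from pathlib import Path

from darts.pipelines.sequential_v2 import Sentinel2Pipeline  # assumed import path

pipeline = Sentinel2Pipeline(
    aoi_file=Path("aoi/study_area.shp"),  # hypothetical AOI shapefile
    start_date="2023-07-01",
    end_date="2023-08-31",
    max_cloud_cover=10,
    raw_data_source="cdse",
)
pipeline.run()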

Parameters:

  • scene_ids (list[str] | None, default: None ) –

    Direct list of Sentinel-2 scene IDs to process. Defaults to None.

  • scene_id_file (pathlib.Path | None, default: None ) –

    JSON file containing scene IDs to process. Defaults to None.

  • tile_ids (list[str] | None, default: None ) –

    List of Sentinel-2 tile IDs (requires filtering params). Defaults to None.

  • aoi_file (pathlib.Path | None, default: None ) –

    Shapefile with area of interest (requires filtering params). Defaults to None.

  • start_date (str | None, default: None ) –

    Start date for filtering (YYYY-MM-DD format). Defaults to None.

  • end_date (str | None, default: None ) –

    End date for filtering (YYYY-MM-DD format). Defaults to None.

  • max_cloud_cover (int | None, default: 10 ) –

    Maximum cloud cover percentage (0-100). Defaults to 10.

  • max_snow_cover (int | None, default: 10 ) –

    Maximum snow cover percentage (0-100). Defaults to 10.

  • months (list[int] | None, default: None ) –

    Filter by months (1-12). Defaults to None.

  • years (list[int] | None, default: None ) –

    Filter by years. Defaults to None.

  • prep_data_scene_id_file (pathlib.Path | None, default: None ) –

    File to store/load scene IDs for offline processing. Written during prepare_data, read during offline run. Defaults to None.

  • sentinel2_grid_dir (pathlib.Path | None, default: None ) –

    Directory for Sentinel-2 grid shapefiles. Used only in prepare_data with tile_ids. If None, uses default path. Defaults to None.

  • raw_data_store (pathlib.Path | None, default: None ) –

    Directory for storing raw Sentinel-2 data locally. If None, uses default path based on raw_data_source. Defaults to None.

  • no_raw_data_store (bool, default: False ) –

    If True, processes data in-memory without local storage. Overrides raw_data_store. Defaults to False.

  • raw_data_source (typing.Literal['gee', 'cdse'], default: 'cdse' ) –

    Data source to use. Defaults to "cdse".

  • model_files (pathlib.Path | list[pathlib.Path] | None, default: None ) –

    Path(s) to model file(s) for segmentation. Single Path implies write_model_outputs=False. If None, searches default model directory for all .pt files. Defaults to None.

  • output_data_dir (pathlib.Path | None, default: None ) –

    Output directory for results. If None, uses {default_out}/sentinel2-{raw_data_source}. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    Directory for ArcticDEM datacube. Will be created/downloaded if needed. If None, uses default path. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    Directory for TCVis data. If None, uses default path. Defaults to None.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    Computation device. "cuda" uses GPU 0, int specifies GPU index, "auto" selects free GPU. Defaults to None.

  • ee_project (str | None, default: None ) –

    Earth Engine project ID. May be omitted if defined in persistent credentials. Defaults to None.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use EE high-volume server. Defaults to True.

  • tpi_outer_radius (int, default: 100 ) –

    Outer radius (m) for TPI calculation. Defaults to 100.

  • tpi_inner_radius (int, default: 0 ) –

    Inner radius (m) for TPI calculation. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    Patch size for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    Overlap between patches. Defaults to 256.

  • batch_size (int, default: 8 ) –

    Batch size for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    Reflection padding for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    Threshold for binarizing probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    Disk size for mask erosion and inner edge cropping. Defaults to 10.

  • edge_erosion_size (int | None, default: None ) –

    Size for outer edge cropping. If None, uses mask_erosion_size. Defaults to None.

  • min_object_size (int, default: 32 ) –

    Minimum object size (pixels) to keep. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    Quality filtering level. 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    Bands to export. Can include "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata", or specific band names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Save individual model outputs (not just ensemble). Defaults to False.

  • overwrite (bool, default: False ) –

    Overwrite existing output files. Defaults to False.

  • offline (bool, default: False ) –

    Skip downloading missing data. Requires pre-downloaded data. Defaults to False.

  • debug_data (bool, default: False ) –

    Write intermediate debugging data to output directory. Defaults to False.

aoi_file class-attribute instance-attribute

aoi_file: pathlib.Path | None = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

debug_data class-attribute instance-attribute

debug_data: bool = False

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str | None = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int | None = 10

max_snow_cover class-attribute instance-attribute

max_snow_cover: int | None = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

months class-attribute instance-attribute

months: list[int] | None = None

no_raw_data_store class-attribute instance-attribute

no_raw_data_store: bool = False

offline class-attribute instance-attribute

offline: bool = False

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

prep_data_scene_id_file class-attribute instance-attribute

prep_data_scene_id_file: pathlib.Path | None = None

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

raw_data_source class-attribute instance-attribute

raw_data_source: typing.Literal['gee', 'cdse'] = 'cdse'

raw_data_store class-attribute instance-attribute

raw_data_store: pathlib.Path | None = None

reflection class-attribute instance-attribute

reflection: int = 0

scene_id_file class-attribute instance-attribute

scene_id_file: pathlib.Path | None = None

scene_ids class-attribute instance-attribute

scene_ids: list[str] | None = None

sentinel2_grid_dir class-attribute instance-attribute

sentinel2_grid_dir: pathlib.Path | None = None

start_date class-attribute instance-attribute

start_date: str | None = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

tile_ids class-attribute instance-attribute

tile_ids: list[str] | None = None

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

years class-attribute instance-attribute

years: list[int] | None = None

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):  # noqa: D105
    logger.debug("Before super")
    super().__post_init__()
    logger.debug("After super")
    self.output_data_dir = self.output_data_dir or paths.output_data(f"sentinel2-{self.raw_data_source}")
    self.raw_data_store = self.raw_data_store or paths.sentinel2_raw_data(self.raw_data_source)
    if self.no_raw_data_store:
        self.raw_data_store = None

cli staticmethod

cli(*, pipeline: darts.pipelines.sequential_v2.Sentinel2Pipeline)

Run the sequential pipeline for Sentinel-2 data.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.Sentinel2Pipeline) –

    Configured Sentinel2Pipeline instance.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "Sentinel2Pipeline"):
    """Run the sequential pipeline for Sentinel-2 data.

    Args:
        pipeline: Configured Sentinel2Pipeline instance.

    """
    pipeline.__post_init__()
    pipeline.run()

cli_prepare_data staticmethod

cli_prepare_data(
    *,
    pipeline: darts.pipelines.sequential_v2.Sentinel2Pipeline,
    optical: bool = False,
    aux: bool = False,
    force: bool = False,
)

Download all necessary data for offline processing.

Queries the data source (CDSE or GEE) for scene IDs and downloads optical and/or auxiliary data. Stores scene IDs in prep_data_scene_id_file if specified for later offline use.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.Sentinel2Pipeline) –

    Configured Sentinel2Pipeline instance.

  • optical (bool, default: False ) –

    If True, downloads optical (Sentinel-2) imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

  • force (bool, default: False ) –

    If True, downloads all possible data, independent of optical and aux flags or model needs. Defaults to False.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli_prepare_data(
    *, pipeline: "Sentinel2Pipeline", optical: bool = False, aux: bool = False, force: bool = False
):
    """Download all necessary data for offline processing.

    Queries the data source (CDSE or GEE) for scene IDs and downloads optical and/or auxiliary data.
    Stores scene IDs in `prep_data_scene_id_file` if specified for later offline use.

    Args:
        pipeline: Configured Sentinel2Pipeline instance.
        optical: If True, downloads optical (Sentinel-2) imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.
        force: If True, downloads all possible data, independent of `optical` and `aux` flags or model needs.
            Defaults to False.

    """
    assert not pipeline.offline, "Pipeline must be online to prepare data for offline usage."

    # !: Because of an unknown bug, __post_init__ is not initialized automatically
    pipeline.__post_init__()

    logger.debug(f"Preparing data with {optical=}, {aux=}.")

    if pipeline.prep_data_scene_id_file is not None:
        if pipeline.prep_data_scene_id_file.exists():
            logger.warning(
                f"Prep-data scene id file {pipeline.prep_data_scene_id_file=} already exists. "
                "It will be overwritten."
            )
            pipeline.prep_data_scene_id_file.unlink()
    pipeline.prepare_data(optical=optical, aux=aux, force=force)

prepare_data

prepare_data(
    optical: bool = False,
    aux: bool = False,
    force: bool = False,
)

Download and prepare data for offline processing.

Validates configuration, determines data requirements from models, and downloads requested data (optical imagery and/or auxiliary data).

Parameters:

  • optical (bool, default: False ) –

    If True, downloads optical imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

  • force (bool, default: False ) –

    If True, downloads all possible data, independent of optical and aux flags or model needs. Defaults to False.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

  • SystemExit –

    If the process is terminated.

  • SystemError –

    If a system error occurs.

Source code in darts/src/darts/pipelines/sequential_v2.py
def prepare_data(self, optical: bool = False, aux: bool = False, force: bool = False):
    """Download and prepare data for offline processing.

    Validates configuration, determines data requirements from models,
    and downloads requested data (optical imagery and/or auxiliary data).

    Args:
        optical: If True, downloads optical imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.
        force: If True, downloads all possible data, independent of `optical` and `aux` flags or model needs.
            Defaults to False.

    Raises:
        KeyboardInterrupt: If user interrupts execution.
        SystemExit: If the process is terminated.
        SystemError: If a system error occurs.

    """
    assert optical or aux, "Nothing to prepare. Please set optical and/or aux to True."

    # ? We only want to download stuff - no need for using the GPU here
    self.device = "cpu"
    self._dump_config()

    from darts_acquisition import download_arcticdem, download_tcvis
    from stopuhr import Chronometer

    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)

    if aux or force:
        # Get the ensemble to check which auxiliary data is necessary
        if force:
            needs_arcticdem, needs_tcvis = True, True
        else:
            ensemble = self._load_ensemble()
            needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

        if not needs_arcticdem and not needs_tcvis:
            logger.warning("No auxiliary data required by the models. Skipping download of auxiliary data...")
        else:
            logger.info(f"Models {needs_tcvis=} {needs_arcticdem=}.")
            self._create_auxiliary_datacubes(arcticdem=needs_arcticdem, tcvis=needs_tcvis)

            # Predownload auxiliary
            aoi = self._tile_aoi()
            if needs_arcticdem:
                logger.info("start download ArcticDEM")
                with timer("Downloading ArcticDEM"):
                    download_arcticdem(aoi, self.arcticdem_dir, resolution=self._arcticdem_resolution())
            if needs_tcvis:
                logger.info("start download TCVIS")
                init_ee(self.ee_project, self.ee_use_highvolume)
                with timer("Downloading TCVis"):
                    download_tcvis(aoi, self.tcvis_dir)

    # Predownload tiles if optical flag is set
    if not optical and not force:
        return

    # Iterate over all the data
    with timer("Loading Optical"):
        tileinfo = self._tileinfos()
        n_tiles = 0
        logger.info(f"Found {len(tileinfo)} tiles to download.")
        for i, (tilekey, _) in enumerate(tileinfo):
            tile_id = self._get_tile_id(tilekey)
            try:
                self._download_tile(tilekey)
                n_tiles += 1
                logger.info(f"Downloaded sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
            except (KeyboardInterrupt, SystemError, SystemExit) as e:
                logger.warning(f"{type(e).__name__} detected.\nExiting...")
                raise e
            except Exception as e:
                logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
                logger.exception(e)
        else:
            logger.info(f"Downloaded {n_tiles} tiles.")

run

run()

Run the complete segmentation pipeline.

Executes the full pipeline including:

  1. Configuration validation and dumping
  2. Loading ensemble models
  3. Creating/loading auxiliary datacubes
  4. Processing each tile:
     - Loading optical data
     - Loading auxiliary data (ArcticDEM, TCVis) as needed
     - Preprocessing
     - Segmentation
     - Postprocessing
     - Exporting results
  5. Saving results and timing information

Results are saved to the output directory with timestamped configuration, results parquet file, and timing information.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    """Run the complete segmentation pipeline.

    Executes the full pipeline including:
    1. Configuration validation and dumping
    2. Loading ensemble models
    3. Creating/loading auxiliary datacubes
    4. Processing each tile:
       - Loading optical data
       - Loading auxiliary data (ArcticDEM, TCVis) as needed
       - Preprocessing
       - Segmentation
       - Postprocessing
       - Exporting results
    5. Saving results and timing information

    Results are saved to the output directory with timestamped configuration,
    results parquet file, and timing information.

    Raises:
        KeyboardInterrupt: If user interrupts execution.

    """
    self._validate()
    current_time = self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    import pandas as pd
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2
    from stopuhr import Chronometer, stopwatch

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if not self.offline:
        init_ee(self.ee_project, self.ee_use_highvolume)

    self._create_auxiliary_datacubes()

    # determine models to use
    ensemble = self._load_ensemble()
    ensemble_subsets = ensemble.model_names
    needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=ensemble_subsets)
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)

            if needs_arcticdem:
                with timer("Loading ArcticDEM", log=False):
                    arcticdem_resolution = self._arcticdem_resolution()
                    arcticdem = load_arcticdem(
                        tile.odc.geobox,
                        self.arcticdem_dir,
                        resolution=arcticdem_resolution,
                        buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                        offline=self.offline,
                    )
            else:
                arcticdem = None

            if needs_tcvis:
                with timer("Loading TCVis", log=False):
                    tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir, offline=self.offline)
            else:
                tcvis = None

            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )

            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )

            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    metadata=export_metadata,
                    debug=self.debug_data,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.timer.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)
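
Example

The bookkeeping files written by run() can be inspected with pandas; the directory and timestamp below are placeholders:

import pandas as pd

results = pd.read_parquet("data/output/2024-01-01_12-00-00.results.parquet")
print(results[results["status"] == "failed"][["tile_id", "error"]])

timings = pd.read_parquet("data/output/2024-01-01_12-00-00.timer.parquet")
print(timings.describe())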

Sentinel2RayPipeline dataclass

Sentinel2RayPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    num_cpus: int = 1,
    devices: list[int] | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    aoi_shapefile: pathlib.Path = None,
    start_date: str = None,
    end_date: str = None,
    max_cloud_cover: int = 10,
    input_cache: pathlib.Path = pathlib.Path(
        "data/cache/input"
    ),
)

Bases: darts.pipelines.ray_v2._BaseRayPipeline

Pipeline for Sentinel 2 data based on an area of interest.

Parameters:

  • aoi_shapefile (pathlib.Path, default: None ) –

    The shapefile containing the area of interest.

  • start_date (str, default: None ) –

    The start date of the time series in YYYY-MM-DD format.

  • end_date (str, default: None ) –

    The end date of the time series in YYYY-MM-DD format.

  • max_cloud_cover (int, default: 10 ) –

    The maximum cloud cover percentage to use for filtering the Sentinel 2 scenes. Defaults to 10.

  • input_cache (pathlib.Path, default: pathlib.Path('data/cache/input') ) –

    The directory to use for caching the input data. Defaults to Path("data/cache/input").

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

The path to the models to use for segmentation. Can also be a single Path to only use one model, which implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • devices (list[int] | None, default: None ) –

    The GPU device indices to make visible to Ray (sets CUDA_VISIBLE_DEVICES and the number of GPUs passed to ray.init). If None, no restriction is applied and Ray decides the GPU allocation. Defaults to None.

  • num_cpus (int, default: 1 ) –

    The number of CPUs to register with ray.init (one CPU per Ray task). Defaults to 1.

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep in pixel. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

The quality level to use for the segmentation. Can also be an int; in this case, 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
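
Example

A minimal construction sketch; the shapefile, dates and model checkpoint are placeholders, and the import path is taken from the source links below:

from pathlib import Path

from darts.pipelines.ray_v2 import Sentinel2RayPipeline

pipeline = Sentinel2RayPipeline(
    aoi_shapefile=Path("aoi.shp"),           # placeholder area of interest
    start_date="2023-07-01",
    end_date="2023-08-31",
    model_files=[Path("models/rts_v1.pt")],  # placeholder checkpoint
    num_cpus=8,
    devices=[0],                             # GPUs made visible to Ray
)
pipeline.run()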

aoi_shapefile class-attribute instance-attribute

aoi_shapefile: pathlib.Path = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

devices class-attribute instance-attribute

devices: list[int] | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

input_cache class-attribute instance-attribute

input_cache: pathlib.Path = pathlib.Path("data/cache/input")

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

num_cpus class-attribute instance-attribute

num_cpus: int = 1

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

start_date class-attribute instance-attribute

start_date: str = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

cli(*, pipeline: darts.pipelines.ray_v2.Sentinel2RayPipeline)

Run the Ray pipeline for AOI Sentinel 2 data.

Source code in darts/src/darts/pipelines/ray_v2.py
@staticmethod
def cli(*, pipeline: "Sentinel2RayPipeline"):
    """Run the sequential pipeline for AOI Sentinel 2 data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/ray_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    if self.devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in self.devices)
    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import ray

    ray_context = ray.init(
        num_cpus=self.num_cpus,  # We use one CPU per Ray task
        num_gpus=len(self.devices) if self.devices is not None else None,
    )
    logger.debug(f"Ray initialized with context: {ray_context}")
    logger.info(f"Ray Dashboard URL: {ray_context.dashboard_url}")
    logger.debug(f"Ray cluster resources: {ray.cluster_resources()}")
    logger.debug(f"Ray available resources: {ray.available_resources()}")

    # Initlize ee in every worker
    @ray.remote
    def init_worker():
        init_ee(self.ee_project, self.ee_use_highvolume)

    num_workers = int(ray.cluster_resources().get("CPU", 1))
    logger.info(f"Initializing {num_workers} Ray workers with Earth Engine.")
    ray.get([init_worker.remote() for _ in range(num_workers)])

    import smart_geocubes
    from darts_export import missing_outputs

    from darts.pipelines._ray_wrapper import (
        _export_tile_ray,
        _load_aux,
        _prepare_export_ray,
        _preprocess_ray,
        _RayEnsembleV1,
    )
    from darts.utils.logging import LoggingManager

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    # ray_ensemble = _RayEnsembleV1.remote(models)

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    adem_buffer = ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2))

    # Get files to process
    tileinfo: list[RayInputDict] = []
    for i, (tilekey, outpath) in enumerate(self._tileinfos()):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue
        tileinfo.append({"tilekey": tilekey, "outpath": str(outpath.resolve()), "tile_id": tile_id})
    tileinfo = tileinfo[:10]
    logger.info(f"Found {len(tileinfo)} tiles to process.")

    # Ray data pipeline
    # TODO: setup device stuff correctly
    ds = ray.data.from_items(tileinfo)
    ds = ds.map(self._load_tile, num_cpus=1)
    ds = ds.map(
        _load_aux,
        fn_kwargs={
            "arcticdem_dir": self.arcticdem_dir,
            "arcticdem_resolution": arcticdem_resolution,
            "buffer": adem_buffer,
            "tcvis_dir": self.tcvis_dir,
        },
        num_cpus=1,
    )
    ds = ds.map(
        _preprocess_ray,
        fn_kwargs={
            "tpi_outer_radius": self.tpi_outer_radius,
            "tpi_inner_radius": self.tpi_inner_radius,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
        concurrency=4,
    )
    ds = ds.map(
        _RayEnsembleV1,
        fn_constructor_kwargs={"model_dict": models},
        fn_kwargs={
            "patch_size": self.patch_size,
            "overlap": self.overlap,
            "batch_size": self.batch_size,
            "reflection": self.reflection,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
        num_gpus=0.8,
        concurrency=1,
    )
    ds = ds.map(
        _prepare_export_ray,
        fn_kwargs={
            "binarization_threshold": self.binarization_threshold,
            "mask_erosion_size": self.mask_erosion_size,
            "min_object_size": self.min_object_size,
            "quality_level": self.quality_level,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
    )
    ds = ds.map(
        _export_tile_ray,
        fn_kwargs={
            "export_bands": self.export_bands,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
    )
    logger.debug(f"Ray dataset: {ds}")
    logger.info("Ray pipeline created. Starting execution...")
    # This should trigger the execution
    ds.write_parquet(f"local://{self.output_data_dir.resolve()!s}/ray_output.parquet")
    logger.info(f"Ray pipeline finished. Output written to {self.output_data_dir.resolve()!s}/ray_output.parquet")

VerbosityLevel

Bases: enum.IntEnum

Enum for verbosity levels.

DEBUG class-attribute instance-attribute

DEBUG = 3

NORMAL class-attribute instance-attribute

NORMAL = 0

VERBOSE class-attribute instance-attribute

VERBOSE = 1

VERY_VERBOSE class-attribute instance-attribute

VERY_VERBOSE = 2

from_cli classmethod

from_cli(
    verbose: bool, very_verbose: bool, debug: bool
) -> darts.utils.logging.VerbosityLevel

Get the verbosity level from CLI flags.

Parameters:

  • verbose (bool) –

    Whether the verbose flag is set.

  • very_verbose (bool) –

    Whether the very verbose flag is set.

  • debug (bool) –

    Whether the debug flag is set.

Returns:

  • darts.utils.logging.VerbosityLevel: The corresponding verbosity level.

Source code in darts/src/darts/utils/logging.py
@classmethod
def from_cli(cls, verbose: bool, very_verbose: bool, debug: bool) -> "VerbosityLevel":
    """Get the verbosity level from CLI flags.

    Args:
        verbose (bool): Whether the verbose flag is set.
        very_verbose (bool): Whether the very verbose flag is set.
        debug (bool): Whether the debug flag is set.

    Returns:
        VerbosityLevel: The corresponding verbosity level.

    """
    if debug:
        return cls.DEBUG
    if very_verbose:
        return cls.VERY_VERBOSE
    if verbose:
        return cls.VERBOSE
    return cls.NORMAL
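
Example

The precedence follows the source above: debug wins over very_verbose, which wins over verbose.

from darts.utils.logging import VerbosityLevel

assert VerbosityLevel.from_cli(verbose=False, very_verbose=False, debug=False) is VerbosityLevel.NORMAL
assert VerbosityLevel.from_cli(verbose=True, very_verbose=False, debug=False) is VerbosityLevel.VERBOSE
assert VerbosityLevel.from_cli(verbose=True, very_verbose=True, debug=False) is VerbosityLevel.VERY_VERBOSE
assert VerbosityLevel.from_cli(verbose=True, very_verbose=True, debug=True) is VerbosityLevel.DEBUG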

benchviz

benchviz(
    stopuhr_data: pathlib.Path,
    *,
    viz_dir: pathlib.Path | None = None,
)

Visualize a benchmark based on a Stopuhr data file produced by a pipeline run.

Note

This function changes the seaborn theme to "whitegrid" for better visualization.

Parameters:

  • stopuhr_data (pathlib.Path) –

    Path to the Stopuhr data file.

  • viz_dir (pathlib.Path | None, default: None ) –

Path to the directory where the visualization will be saved. If None, defaults to the parent directory of the Stopuhr data file. Defaults to None.

Returns:

  • plt.Figure: A matplotlib figure containing the benchmark visualization.

Source code in darts/src/darts/utils/bench.py
def benchviz(
    stopuhr_data: Path,
    *,
    viz_dir: Path | None = None,
):
    """Visulize benchmark based on a Stopuhr data file produced by a pipeline run.

    !!! note
        This function changes the seaborn theme to "whitegrid" for better visualization.

    Args:
        stopuhr_data (Path): Path to the Stopuhr data file.
        viz_dir (Path | None): Path to the directory where the visualization will be saved.
            If None, defaults to the parent directory of the Stopuhr data file.
            Defaults to None.

    Returns:
        plt.Figure: A matplotlib figure containing the benchmark visualization.

    """
    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd
    import seaborn as sns

    # Visualize the results
    sns.set_theme(style="whitegrid")

    assert stopuhr_data.suffix == ".parquet", "Stopuhr data file must be a parquet file."

    times = pd.read_parquet(stopuhr_data)
    times_long = times.melt(ignore_index=False, value_name="time", var_name="step").reset_index(drop=False)
    times_desc = times.describe()
    times_sum = times.sum()

    # Pretty print the results
    for col in times_desc.columns:
        mean = times_desc[col]["mean"]
        std = times_desc[col]["std"]
        total = times_sum[col]
        n = int(times_desc[col]["count"].item())
        logger.info(f"{col} took {mean:.2f} ± {std:.2f}s ({n=} -> {total=:.2f}s)")

    # axs: hist, histlog, bar, heat
    fig, axs = plt.subplot_mosaic(
        [
            ["histlog"] * 4,
            ["histlog"] * 4,
            ["hist", "hist", "heat", "heat"],
            ["hist", "hist", "heat", "heat"],
            ["bar", "bar", "bar", "bar"],
        ],
        layout="constrained",
        figsize=(20, 15),
    )

    sns.histplot(
        data=times_long,
        x="time",
        hue="step",
        bins=100,
        # log_scale=True,
        ax=axs["hist"],
    )
    axs["hist"].set_xlabel("Time in seconds")
    axs["hist"].set_title("Histogram of time taken for each step", fontdict={"fontweight": "bold"})

    sns.histplot(
        data=times_long,
        x="time",
        hue="step",
        bins=100,
        log_scale=True,
        kde=True,
        ax=axs["histlog"],
    )
    axs["histlog"].set_xlabel("Time in seconds")
    axs["histlog"].set_title("Histogram of time taken for each step (log scale)", fontdict={"fontweight": "bold"})

    sns.heatmap(
        times.T,
        robust=True,
        cbar_kws={"label": "Time in seconds"},
        ax=axs["heat"],
    )
    axs["heat"].set_xlabel("Sample")
    axs["heat"].set_title("Heatmap of time taken for each step and sample", fontdict={"fontweight": "bold"})

    bottom = np.array([0.0])
    for i, (step, time_taken) in enumerate(times.mean().items()):
        axs["bar"].barh(["Time taken"], [time_taken], label=step, color=sns.color_palette()[i], left=bottom)
        # Add a text label to the bar
        axs["bar"].text(
            bottom[-1] + time_taken / 2,
            0,
            f"{step}:\n{time_taken:.1f} s",
            va="center",
            ha="center",
            fontsize=10,
            color="white",
        )
        bottom += time_taken
    axs["bar"].legend(loc="upper center", bbox_to_anchor=(0.5, 1.05), ncol=3)
    # Make the y-axis labels vertical
    axs["bar"].set_yticks([0.15], labels=["Time taken"], rotation=90)
    axs["bar"].set_xlabel("Time in seconds")
    axs["bar"].set_title("Avg. time taken for each step", fontdict={"fontweight": "bold"})

    # Save the figure
    viz_dir = viz_dir or stopuhr_data.parent
    viz_dir.mkdir(parents=True, exist_ok=True)
    fpath = viz_dir / stopuhr_data.name.replace(".parquet", ".png")
    fig.savefig(fpath, dpi=300, bbox_inches="tight")
    logger.info(f"Benchmark visualization saved to {fpath.resolve()}")

    return fig
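
Example

A usage sketch; the parquet path is a placeholder following the "<timestamp>.stopwatch.parquet" pattern written by the pipelines, and the import path is taken from the source link above:

from pathlib import Path

from darts.utils.bench import benchviz

fig = benchviz(Path("data/output/2024-01-01_12-00-00.stopwatch.parquet"))
# The figure is also saved as a PNG next to the parquet file (or into viz_dir if given).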

debug_default_paths

Debug and print the current DARTS paths.

Parameters:

  • default_paths (DefaultPaths, default: DefaultPaths() ) –

    Default paths to set before logging. Defaults to DefaultPaths().

  • pipeline_paths (PipelineV2Paths, default: PipelineV2Paths() ) –

    Pipeline paths to log. Defaults to PipelineV2Paths().

Source code in darts/src/darts/cli.py
@app.command
def debug_default_paths(
    default_paths: DefaultPaths = DefaultPaths(), pipeline_paths: PipelineV2Paths = PipelineV2Paths()
):
    """Debug and print the current DARTS paths.

    Args:
        default_paths (DefaultPaths, optional): Default paths to set before logging.
            Defaults to DefaultPaths().
        pipeline_paths (PipelineV2Paths, optional): Pipeline paths to log.
            Defaults to PipelineV2Paths().

    """
    paths.set_defaults(default_paths)
    paths.log_all_paths(level=logging.INFO)
    # TODO: This is just temporary until we upgrade to cyclopts v4 and rework our pipeline structure
    pipeline_paths.log(level=logging.INFO)

env_info

env_info()

Print debug information about the environment.

Source code in darts/src/darts/cli.py
@app.command
def env_info():
    """Print debug information about the environment."""
    from darts.utils.cuda import debug_info

    logger.debug(f"PATH: {os.environ.get('PATH', 'UNSET')}")
    debug_info()

hello

hello(name: str, *, n: int = 1)

Say hello to someone.

Parameters:

  • name (str) –

    The name of the person to say hello to

  • n (int, default: 1 ) –

    The number of times to say hello. Defaults to 1.

Raises:

  • ValueError –

    If n is 3.

Source code in darts/src/darts/cli.py
@app.command
def hello(name: str, *, n: int = 1):
    """Say hello to someone.

    Args:
        name (str): The name of the person to say hello to
        n (int, optional): The number of times to say hello. Defaults to 1.

    Raises:
        ValueError: If n is 3.

    """
    for i in range(n):
        logger.debug(f"Currently at {i=}")
        if n == 3:
            raise ValueError("I don't like 3")
        logger.info(f"Hello {name}")

help

help()

Display the help screen.

Source code in darts/src/darts/cli.py
@app.command
def help():
    """Display the help screen."""
    app.help_print()

launcher

launcher(
    *tokens: str,
    log_dir: pathlib.Path = pathlib.Path("logs"),
    config_file: pathlib.Path = pathlib.Path("config.toml"),
    verbose: bool = False,
    very_verbose: bool = False,
    debug: bool = False,
    log_plain: bool = False,
)
Source code in darts/src/darts/cli.py
@app.meta.default
def launcher(  # noqa: D103
    *tokens: Annotated[str, cyclopts.Parameter(show=False, allow_leading_hyphen=True)],
    log_dir: Path = Path("logs"),
    config_file: Path = Path("config.toml"),
    verbose: Annotated[bool, cyclopts.Parameter(alias="-v")] = False,
    very_verbose: Annotated[bool, cyclopts.Parameter(alias="-vv")] = False,
    debug: Annotated[bool, cyclopts.Parameter(alias="-vvv")] = False,
    log_plain: bool = False,
):
    verbosity = VerbosityLevel.from_cli(verbose, very_verbose, debug)
    command, bound, ignored = app.parse_args(tokens, verbose=verbosity == VerbosityLevel.VERBOSE)
    # Set verbosity to 1 for debug stuff like env_info
    if command.__name__ == "env_info" and verbosity == VerbosityLevel.NORMAL:
        verbosity = VerbosityLevel.VERBOSE
    LoggingManager.add_logging_handlers(command.__name__, log_dir, verbosity, log_plain=log_plain)
    logger.debug(f"Running on Python version {sys.version} from {__name__} ({root_file})")
    additional_args = {}
    if "config_file" in ignored:
        additional_args["config_file"] = config_file
    if "log_dir" in ignored:
        additional_args["log_dir"] = log_dir
    if "verbosity" in ignored:
        additional_args["verbosity"] = verbosity
    return command(*bound.args, **bound.kwargs, **additional_args)

preprocess_planet_train_data

preprocess_planet_train_data(
    *,
    data_dir: pathlib.Path,
    labels_dir: pathlib.Path,
    default_dirs: darts_utils.paths.DefaultPaths = darts_utils.paths.DefaultPaths(),
    train_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    admin_dir: pathlib.Path | None = None,
    preprocess_cache: pathlib.Path | None = None,
    force_preprocess: bool = False,
    append: bool = True,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 16,
    exclude_nopositive: bool = False,
    exclude_nan: bool = True,
)

Preprocess Planet data for training.

This function preprocesses Planet scenes into a training-ready format by creating fixed-size patches and storing them in a zarr array for efficient random access during training. All data is stored in a single zarr group with associated metadata.

The preprocessing creates patches of the specified size from each Planet scene and stores them as:

  • A zarr group containing 'x' (input data) and 'y' (labels) arrays
  • A geopandas dataframe with metadata including region, position, and label statistics
  • A configuration file with preprocessing parameters

The x dataarray contains the input data with shape (n_patches, n_bands, patch_size, patch_size). The y dataarray contains the labels with shape (n_patches, patch_size, patch_size). Both dataarrays are chunked along the n_patches dimension with chunk size 1, resulting in each patch being stored in a separate file for super fast random access.

The metadata dataframe contains information about each patch including:

  • sample_id: Identifier for the source Planet scene
  • region: Administrative region name
  • geometry: Spatial extent of the patch
  • empty: Whether the patch contains positive labeled pixels
  • Additional metadata as specified

Through exclude_nopositive and exclude_nan, respective patches can be excluded from the final data.

A config.toml file is saved in the train_data_dir containing the configuration used for the preprocessing. Additionally, a timestamp-based CLI configuration file is saved for reproducibility.

The final directory structure of train_data_dir will look like this:

train_data_dir/
├── config.toml
├── data.zarr/
│   ├── x/          # Input patches [n_patches, n_bands, patch_size, patch_size]
│   └── y/          # Label patches [n_patches, patch_size, patch_size]
├── metadata.parquet
└── {timestamp}.cli.toml

Parameters:

  • data_dir (pathlib.Path) –

    The directory containing the Planet scenes and orthotiles.

  • labels_dir (pathlib.Path) –

    The directory containing the labels and footprints / extents.

  • default_dirs (darts_utils.paths.DefaultPaths, default: darts_utils.paths.DefaultPaths() ) –

    The default directories for DARTS. Defaults to a config filled with None.

  • train_data_dir (pathlib.Path | None, default: None ) –

    The "output" directory where the tensors are written to. If None, will use the default training data directory based on the DARTS paths. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. If None, will use the default auxiliary directory based on the DARTS paths. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    The directory containing the TCVis data. If None, will use the default TCVis directory based on the DARTS paths. Defaults to None.

  • admin_dir (pathlib.Path | None, default: None ) –

    The directory containing the admin files. If None, will use the default auxiliary directory based on the DARTS paths. Defaults to None.

  • preprocess_cache (pathlib.Path | None, default: None ) –

    The directory to store the preprocessed data. If None, will neither use nor store preprocessed data. Defaults to None.

  • force_preprocess (bool, default: False ) –

    Whether to force the preprocessing of the data. Defaults to False.

  • append (bool, default: True ) –

    Whether to append the data to the existing data. Defaults to True.

  • device (typing.Literal['cuda', 'cpu'] | int, default: None ) –

    The device to run the model on. If "cuda" take the first device (0), if int take the specified device. If "auto" try to automatically select a free GPU (<50% memory usage). Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 16 ) –

    The overlap to use for inference. Defaults to 16.

  • exclude_nopositive (bool, default: False ) –

    Whether to exclude patches where the labels do not contain positives. Defaults to False.

  • exclude_nan (bool, default: True ) –

    Whether to exclude patches where the input data has nan values. Defaults to True.
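
Example

A minimal invocation sketch; the directories are placeholders and the import path is taken from the source link below:

from pathlib import Path

from darts.training.preprocess_planet_v2 import preprocess_planet_train_data

preprocess_planet_train_data(
    data_dir=Path("data/planet"),                    # placeholder Planet scenes / orthotiles
    labels_dir=Path("data/labels"),                  # placeholder labels and footprints
    train_data_dir=Path("data/train/planet_1024"),   # placeholder output directory
    preprocess_cache=Path("data/cache/preprocess"),  # optional cache for preprocessed tiles
    patch_size=1024,
    overlap=16,
    exclude_nopositive=False,
    exclude_nan=True,
)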

Source code in darts/src/darts/training/preprocess_planet_v2.py
def preprocess_planet_train_data(  # noqa: C901
    *,
    data_dir: Path,
    labels_dir: Path,
    default_dirs: DefaultPaths = DefaultPaths(),
    train_data_dir: Path | None = None,
    arcticdem_dir: Path | None = None,
    tcvis_dir: Path | None = None,
    admin_dir: Path | None = None,
    preprocess_cache: Path | None = None,
    force_preprocess: bool = False,
    append: bool = True,
    device: Literal["cuda", "cpu", "auto"] | int | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 16,
    exclude_nopositive: bool = False,
    exclude_nan: bool = True,
):
    """Preprocess Planet data for training.

    This function preprocesses Planet scenes into a training-ready format by creating fixed-size patches
    and storing them in a zarr array for efficient random access during training. All data is stored in
    a single zarr group with associated metadata.

    The preprocessing creates patches of the specified size from each Planet scene and stores them as:
    - A zarr group containing 'x' (input data) and 'y' (labels) arrays
    - A geopandas dataframe with metadata including region, position, and label statistics
    - A configuration file with preprocessing parameters

    The x dataarray contains the input data with shape (n_patches, n_bands, patch_size, patch_size).
    The y dataarray contains the labels with shape (n_patches, patch_size, patch_size).
    Both dataarrays are chunked along the n_patches dimension with chunk size 1, resulting in
    each patch being stored in a separate file for super fast random access.

    The metadata dataframe contains information about each patch including:
    - sample_id: Identifier for the source Planet scene
    - region: Administrative region name
    - geometry: Spatial extent of the patch
    - empty: Whether the patch contains positive labeled pixels
    - Additional metadata as specified

    Through `exclude_nopositive` and `exclude_nan`, respective patches can be excluded from the final data.

    A `config.toml` file is saved in the `train_data_dir` containing the configuration used for the
    preprocessing. Additionally, a timestamp-based CLI configuration file is saved for reproducibility.

    The final directory structure of `train_data_dir` will look like this:

    ```sh
    train_data_dir/
    ├── config.toml
    ├── data.zarr/
    │   ├── x/          # Input patches [n_patches, n_bands, patch_size, patch_size]
    │   └── y/          # Label patches [n_patches, patch_size, patch_size]
    ├── metadata.parquet
    └── {timestamp}.cli.toml
    ```

    Args:
        data_dir (Path): The directory containing the Planet scenes and orthotiles.
        labels_dir (Path): The directory containing the labels and footprints / extents.
        default_dirs (DefaultPaths, optional): The default directories for DARTS. Defaults to a config filled with None.
        train_data_dir (Path | None, optional): The "output" directory where the tensors are written to.
            If None, will use the default training data directory based on the DARTS paths.
            Defaults to None.
        arcticdem_dir (Path | None, optional): The directory containing the ArcticDEM data
            (the datacube and the extent files).
            Will be created and downloaded if it does not exist.
            If None, will use the default auxiliary directory based on the DARTS paths.
            Defaults to None.
        tcvis_dir (Path | None, optional): The directory containing the TCVis data.
            If None, will use the default TCVis directory based on the DARTS paths.
            Defaults to None.
        admin_dir (Path | None, optional): The directory containing the admin files.
            If None, will use the default auxiliary directory based on the DARTS paths.
            Defaults to None.
        preprocess_cache (Path | None, optional): The directory to store the preprocessed data.
            If None, will neither use nor store preprocessed data.
            Defaults to None.
        force_preprocess (bool, optional): Whether to force the preprocessing of the data. Defaults to False.
        append (bool, optional): Whether to append the data to the existing data. Defaults to True.
        device (Literal["cuda", "cpu"] | int, optional): The device to run the model on.
            If "cuda" take the first device (0), if int take the specified device.
            If "auto" try to automatically select a free GPU (<50% memory usage).
            Defaults to "cuda" if available, else "cpu".
        ee_project (str, optional): The Earth Engine project ID or number to use. May be omitted if
            project is defined within persistent API credentials obtained via `earthengine authenticate`.
        ee_use_highvolume (bool, optional): Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).
        tpi_outer_radius (int, optional): The outer radius of the annulus kernel for the tpi calculation
            in m. Defaults to 100m.
        tpi_inner_radius (int, optional): The inner radius of the annulus kernel for the tpi calculation
            in m. Defaults to 0.
        patch_size (int, optional): The patch size to use for inference. Defaults to 1024.
        overlap (int, optional): The overlap to use for inference. Defaults to 16.
        exclude_nopositive (bool, optional): Whether to exclude patches where the labels do not contain positives.
            Defaults to False.
        exclude_nan (bool, optional): Whether to exclude patches where the input data has nan values.
            Defaults to True.

    """
    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting preprocessing at {current_time}.")

    paths.set_defaults(default_dirs)
    train_data_dir = train_data_dir or paths.train_data_dir("planet_v2_rts", patch_size)
    arcticdem_dir = arcticdem_dir or paths.arcticdem(2)
    tcvis_dir = tcvis_dir or paths.tcvis()
    admin_dir = admin_dir or paths.admin_boundaries()

    # Storing the configuration as JSON file
    train_data_dir.mkdir(parents=True, exist_ok=True)
    from darts_utils.functools import write_function_args_to_config_file

    write_function_args_to_config_file(
        fpath=train_data_dir / f"{current_time}.cli.toml",
        function=preprocess_planet_train_data,
        locals_=locals(),
    )

    from stopuhr import Chronometer

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    # Import here to avoid long loading times when running other commands
    import geopandas as gpd
    import pandas as pd
    import rich
    import smart_geocubes
    import xarray as xr
    from darts_acquisition import load_arcticdem, load_planet_masks, load_planet_scene, load_tcvis
    from darts_acquisition.admin import download_admin_files
    from darts_preprocessing import preprocess_v2
    from darts_segmentation.training.prepare_training import TrainDatasetBuilder
    from darts_utils.tilecache import XarrayCacheManager
    from odc.stac import configure_rio
    from rich.progress import track

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee
    from darts.utils.logging import LoggingManager

    device = decide_device(device)
    init_ee(ee_project, ee_use_highvolume)
    configure_rio(cloud_defaults=True, aws={"aws_unsigned": True})
    logger.info("Configured Rasterio")

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    accessor = smart_geocubes.ArcticDEM2m(arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    labels = (gpd.read_file(labels_file) for labels_file in labels_dir.glob("*/TrainingLabel*.gpkg"))
    labels = gpd.GeoDataFrame(pd.concat(labels, ignore_index=True))

    footprints = (gpd.read_file(footprints_file) for footprints_file in labels_dir.glob("*/ImageFootprints*.gpkg"))
    footprints = gpd.GeoDataFrame(pd.concat(footprints, ignore_index=True))
    fpaths = {fpath.stem: fpath for fpath in _legacy_path_gen(data_dir)}
    footprints["fpath"] = footprints.image_id.map(fpaths)

    # Download admin files if they do not exist
    admin2_fpath = admin_dir / "geoBoundariesCGAZ_ADM2.shp"
    if not admin2_fpath.exists():
        download_admin_files(admin_dir)
    admin2 = gpd.read_file(admin2_fpath)

    # We hardcode these since they depend on the preprocessing we use
    bands = [
        "red",
        "green",
        "blue",
        "nir",
        "ndvi",
        "relative_elevation",
        "slope",
        "aspect",
        "hillshade",
        "curvature",
        "tc_brightness",
        "tc_greenness",
        "tc_wetness",
    ]

    builder = TrainDatasetBuilder(
        train_data_dir=train_data_dir,
        patch_size=patch_size,
        overlap=overlap,
        bands=bands,
        exclude_nopositive=exclude_nopositive,
        exclude_nan=exclude_nan,
        device=device,
        append=append,
    )
    cache_manager = XarrayCacheManager(preprocess_cache)

    if append and (train_data_dir / "metadata.parquet").exists():
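        # When appending, skip Planet scenes whose planet_id already appears in the existing metadata.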
        metadata = gpd.read_parquet(train_data_dir / "metadata.parquet")
        already_processed_planet_ids = set(metadata["planet_id"].unique())
        logger.info(f"Already processed {len(already_processed_planet_ids)} samples.")
        footprints = footprints[~footprints.image_id.isin(already_processed_planet_ids)]

    for i, footprint in track(
        footprints.iterrows(), description="Processing samples", total=len(footprints), console=rich.get_console()
    ):
        planet_id = footprint.image_id
        info_id = f"{planet_id=} ({i + 1} of {len(footprints)})"
        try:
            logger.info(f"Processing sample {info_id}")

            if not footprint.fpath or (not footprint.fpath.exists() and not cache_manager.exists(planet_id)):
                logger.warning(
                    f"Footprint image '{planet_id}' at {footprint.fpath} does not exist. Skipping {info_id}..."
                )
                continue

            def _get_tile():
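                # Load the Planet scene, buffered ArcticDEM, TCVis and data masks, then run preprocess_v2.
                # Only executed on a cache miss or when force_preprocess is set (see get_or_create below).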
                tile = load_planet_scene(footprint.fpath)
                arctidem_res = 2
                arcticdem_buffer = ceil(tpi_outer_radius / arctidem_res * sqrt(2))
                arcticdem = load_arcticdem(
                    tile.odc.geobox, arcticdem_dir, resolution=arctidem_res, buffer=arcticdem_buffer
                )
                tcvis = load_tcvis(tile.odc.geobox, tcvis_dir)
                data_masks = load_planet_masks(footprint.fpath)
                tile = xr.merge([tile, data_masks])

                tile: xr.Dataset = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    tpi_outer_radius,
                    tpi_inner_radius,
                    device,
                )
                return tile

            with timer("Loading tile"):
                tile = cache_manager.get_or_create(
                    identifier=planet_id,
                    creation_func=_get_tile,
                    force=force_preprocess,
                )

            logger.debug(f"Found tile with size {tile.sizes}")

            footprint_labels = labels[labels.image_id == planet_id]
            region = _get_region_name(footprint, admin2)

            with timer("Save as patches"):
                builder.add_tile(
                    tile=tile,
                    labels=footprint_labels,
                    region=region,
                    sample_id=planet_id,
                    metadata={
                        "planet_id": planet_id,
                        "fpath": footprint.fpath,
                    },
                )

            logger.info(f"Processed sample {info_id}")

        except (KeyboardInterrupt, SystemExit, SystemError):
            logger.info("Interrupted by user.")
            break

        except Exception as e:
            logger.warning(f"Could not process sample {info_id}. Skipping...")
            logger.exception(e)

    timer.summary()

    if len(builder) == 0:
        logger.warning("No samples were processed. Exiting...")
        return

    builder.finalize(
        {
            "data_dir": data_dir,
            "labels_dir": labels_dir,
            "arcticdem_dir": arcticdem_dir,
            "tcvis_dir": tcvis_dir,
            "ee_project": ee_project,
            "ee_use_highvolume": ee_use_highvolume,
            "tpi_outer_radius": tpi_outer_radius,
            "tpi_inner_radius": tpi_inner_radius,
        }
    )

preprocess_planet_train_data_pingo

preprocess_planet_train_data_pingo(
    *,
    data_dir: pathlib.Path,
    labels_dir: pathlib.Path,
    default_dirs: darts_utils.paths.DefaultPaths = darts_utils.paths.DefaultPaths(),
    train_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    admin_dir: pathlib.Path | None = None,
    preprocess_cache: pathlib.Path | None = None,
    force_preprocess: bool = False,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 16,
    exclude_nopositive: bool = False,
    exclude_nan: bool = True,
)

Preprocess Planet data for training (Pingo version).

This function preprocesses Planet scenes into a training-ready format by creating fixed-size patches and storing them in a zarr array for efficient random access during training. All data is stored in a single zarr group with associated metadata.

The preprocessing creates patches of the specified size from each Planet scene and stores them as:

- A zarr group containing 'x' (input data) and 'y' (labels) arrays
- A geopandas dataframe with metadata including region, position, and label statistics
- A configuration file with preprocessing parameters

The x dataarray contains the input data with shape (n_patches, n_bands, patch_size, patch_size). The y dataarray contains the labels with shape (n_patches, patch_size, patch_size). Both dataarrays are chunked along the n_patches dimension with chunk size 1, resulting in each patch being stored in a separate file for super fast random access.

The metadata dataframe contains information about each patch including:

- sample_id: Identifier for the source Planet scene
- region: Administrative region name
- geometry: Spatial extent of the patch
- empty: Whether the patch contains positive labeled pixels
- Additional metadata as specified

Patches without positive labels or with NaN values in the input data can be excluded from the final dataset via exclude_nopositive and exclude_nan.

A config.toml file is saved in the train_data_dir containing the configuration used for the preprocessing. Additionally, a timestamp-based CLI configuration file is saved for reproducibility.

The final directory structure of train_data_dir will look like this:

train_data_dir/
├── config.toml
├── data.zarr/
│   ├── x/          # Input patches [n_patches, n_bands, patch_size, patch_size]
│   └── y/          # Label patches [n_patches, patch_size, patch_size]
├── metadata.parquet
└── {timestamp}.cli.toml
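The resulting dataset can be inspected directly, for example with zarr and geopandas. The following is a minimal sketch and not part of the pipeline; the output path is a placeholder and the column names follow the metadata description above.

from pathlib import Path

import geopandas as gpd
import zarr

train_data_dir = Path("train/planet_v2_pingo_1024")  # placeholder output directory

group = zarr.open_group(str(train_data_dir / "data.zarr"), mode="r")
x = group["x"]  # [n_patches, n_bands, patch_size, patch_size]
y = group["y"]  # [n_patches, patch_size, patch_size]
print(x.shape, y.shape)

metadata = gpd.read_parquet(train_data_dir / "metadata.parquet")
print(metadata[["sample_id", "region", "empty"]].head())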

Parameters:

  • data_dir (pathlib.Path) –

    The directory containing the Planet scenes and orthotiles.

  • labels_dir (pathlib.Path) –

    The directory containing the labels and footprints / extents.

  • default_dirs (darts_utils.paths.DefaultPaths, default: darts_utils.paths.DefaultPaths() ) –

    The default directories for DARTS. Defaults to a config filled with None.

  • train_data_dir (pathlib.Path | None, default: None ) –

    The "output" directory where the tensors are written to. If None, will use the default training data directory based on the DARTS paths. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. If None, will use the default auxiliary directory based on the DARTS paths. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    The directory containing the TCVis data. If None, will use the default TCVis directory based on the DARTS paths. Defaults to None.

  • admin_dir (pathlib.Path | None, default: None ) –

    The directory containing the admin files. If None, will use the default auxiliary directory based on the DARTS paths. Defaults to None.

  • preprocess_cache (pathlib.Path | None, default: None ) –

    The directory to store the preprocessed data. If None, will neither use nor store preprocessed data. Defaults to None.

  • force_preprocess (bool, default: False ) –

    Whether to force the preprocessing of the data. Defaults to False.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda" take the first device (0), if int take the specified device. If "auto" try to automatically select a free GPU (<50% memory usage). Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The size of the patches to create for training. Defaults to 1024.

  • overlap (int, default: 16 ) –

    The overlap between neighboring patches. Defaults to 16.

  • exclude_nopositive (bool, default: False ) –

    Whether to exclude patches where the labels do not contain positives. Defaults to False.

  • exclude_nan (bool, default: True ) –

    Whether to exclude patches where the input data has nan values. Defaults to True.

Source code in darts/src/darts/training/preprocess_planet_v2_pingo.py
def preprocess_planet_train_data_pingo(
    *,
    data_dir: Path,
    labels_dir: Path,
    default_dirs: DefaultPaths = DefaultPaths(),
    train_data_dir: Path | None = None,
    arcticdem_dir: Path | None = None,
    tcvis_dir: Path | None = None,
    admin_dir: Path | None = None,
    preprocess_cache: Path | None = None,
    force_preprocess: bool = False,
    device: Literal["cuda", "cpu", "auto"] | int | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 16,
    exclude_nopositive: bool = False,
    exclude_nan: bool = True,
):
    """Preprocess Planet data for training (Pingo version).

    This function preprocesses Planet scenes into a training-ready format by creating fixed-size patches
    and storing them in a zarr array for efficient random access during training. All data is stored in
    a single zarr group with associated metadata.

    The preprocessing creates patches of the specified size from each Planet scene and stores them as:
    - A zarr group containing 'x' (input data) and 'y' (labels) arrays
    - A geopandas dataframe with metadata including region, position, and label statistics
    - A configuration file with preprocessing parameters

    The x dataarray contains the input data with shape (n_patches, n_bands, patch_size, patch_size).
    The y dataarray contains the labels with shape (n_patches, patch_size, patch_size).
    Both dataarrays are chunked along the n_patches dimension with chunk size 1, resulting in
    each patch being stored in a separate file for super fast random access.

    The metadata dataframe contains information about each patch including:
    - sample_id: Identifier for the source Planet scene
    - region: Administrative region name
    - geometry: Spatial extent of the patch
    - empty: Whether the patch contains positive labeled pixels
    - Additional metadata as specified

    Through `exclude_nopositive` and `exclude_nan`, respective patches can be excluded from the final data.

    A `config.toml` file is saved in the `train_data_dir` containing the configuration used for the
    preprocessing. Additionally, a timestamp-based CLI configuration file is saved for reproducibility.

    The final directory structure of `train_data_dir` will look like this:

    ```sh
    train_data_dir/
    ├── config.toml
    ├── data.zarr/
    │   ├── x/          # Input patches [n_patches, n_bands, patch_size, patch_size]
    │   └── y/          # Label patches [n_patches, patch_size, patch_size]
    ├── metadata.parquet
    └── {timestamp}.cli.toml
    ```

    Args:
        data_dir (Path): The directory containing the Planet scenes and orthotiles.
        labels_dir (Path): The directory containing the labels and footprints / extents.
        default_dirs (DefaultPaths, optional): The default directories for DARTS. Defaults to a config filled with None.
        train_data_dir (Path | None, optional): The "output" directory where the tensors are written to.
            If None, will use the default training data directory based on the DARTS paths.
            Defaults to None.
        arcticdem_dir (Path | None, optional): The directory containing the ArcticDEM data
            (the datacube and the extent files).
            Will be created and downloaded if it does not exist.
            If None, will use the default auxiliary directory based on the DARTS paths.
            Defaults to None.
        tcvis_dir (Path | None, optional): The directory containing the TCVis data.
            If None, will use the default TCVis directory based on the DARTS paths.
            Defaults to None.
        admin_dir (Path | None, optional): The directory containing the admin files.
            If None, will use the default auxiliary directory based on the DARTS paths.
            Defaults to None.
        preprocess_cache (Path | None, optional): The directory to store the preprocessed data.
            If None, will neither use nor store preprocessed data.
            Defaults to None.
        force_preprocess (bool, optional): Whether to force the preprocessing of the data. Defaults to False.
        device (Literal["cuda", "cpu"] | int, optional): The device to run the model on.
            If "cuda" take the first device (0), if int take the specified device.
            If "auto" try to automatically select a free GPU (<50% memory usage).
            Defaults to "cuda" if available, else "cpu".
        ee_project (str, optional): The Earth Engine project ID or number to use. May be omitted if
            project is defined within persistent API credentials obtained via `earthengine authenticate`.
        ee_use_highvolume (bool, optional): Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).
        tpi_outer_radius (int, optional): The outer radius of the annulus kernel for the tpi calculation
            in m. Defaults to 100m.
        tpi_inner_radius (int, optional): The inner radius of the annulus kernel for the tpi calculation
            in m. Defaults to 0.
        patch_size (int, optional): The patch size to use for inference. Defaults to 1024.
        overlap (int, optional): The overlap to use for inference. Defaults to 16.
        exclude_nopositive (bool, optional): Whether to exclude patches where the labels do not contain positives.
            Defaults to False.
        exclude_nan (bool, optional): Whether to exclude patches where the input data has nan values.
            Defaults to True.

    """
    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting preprocessing at {current_time}.")

    paths.set_defaults(default_dirs)
    train_data_dir = train_data_dir or paths.train_data_dir("planet_v2_pingo", patch_size)
    arcticdem_dir = arcticdem_dir or paths.arcticdem(2)
    tcvis_dir = tcvis_dir or paths.tcvis()
    admin_dir = admin_dir or paths.admin_boundaries()

    # Storing the configuration as JSON file
    train_data_dir.mkdir(parents=True, exist_ok=True)
    from darts_utils.functools import write_function_args_to_config_file

    write_function_args_to_config_file(
        fpath=train_data_dir / f"{current_time}.cli.toml",
        function=preprocess_planet_train_data_pingo,
        locals_=locals(),
    )

    from stopuhr import Chronometer

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    # Import here to avoid long loading times when running other commands
    import geopandas as gpd
    import pandas as pd
    import rich
    import smart_geocubes
    import xarray as xr
    from darts_acquisition import load_arcticdem, load_planet_masks, load_planet_scene, load_tcvis
    from darts_acquisition.admin import download_admin_files
    from darts_preprocessing import preprocess_v2
    from darts_segmentation.training.prepare_training import TrainDatasetBuilder
    from darts_utils.tilecache import XarrayCacheManager
    from odc.stac import configure_rio
    from rich.progress import track

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee
    from darts.utils.logging import LoggingManager

    device = decide_device(device)
    init_ee(ee_project, ee_use_highvolume)
    configure_rio(cloud_defaults=True, aws={"aws_unsigned": True})
    logger.info("Configured Rasterio")

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    accessor = smart_geocubes.ArcticDEM2m(arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    labels = (gpd.read_file(labels_file) for labels_file in labels_dir.glob("*/TrainingLabel*.gpkg"))
    labels = gpd.GeoDataFrame(pd.concat(labels, ignore_index=True))

    footprints = (gpd.read_file(footprints_file) for footprints_file in labels_dir.glob("*/ImageFootprints*.gpkg"))
    footprints = gpd.GeoDataFrame(pd.concat(footprints, ignore_index=True))
    footprints["fpath"] = footprints.image_id.map(_path_gen(data_dir))

    # Download admin files if they do not exist
    admin2_fpath = admin_dir / "geoBoundariesCGAZ_ADM2.shp"
    if not admin2_fpath.exists():
        download_admin_files(admin_dir)
    admin2 = gpd.read_file(admin2_fpath)

    # We hardcode these since they depend on the preprocessing we use
    bands = [
        "red",
        "green",
        "blue",
        "nir",
        "ndvi",
        "relative_elevation",
        "slope",
        "aspect",
        "hillshade",
        "curvature",
        "tc_brightness",
        "tc_greenness",
        "tc_wetness",
    ]

    builder = TrainDatasetBuilder(
        train_data_dir=train_data_dir,
        patch_size=patch_size,
        overlap=overlap,
        bands=bands,
        exclude_nopositive=exclude_nopositive,
        exclude_nan=exclude_nan,
        device=device,
    )
    cache_manager = XarrayCacheManager(preprocess_cache)

    for i, footprint in track(
        footprints.iterrows(), description="Processing samples", total=len(footprints), console=rich.get_console()
    ):
        planet_id = footprint.image_id
        info_id = f"{planet_id=} ({i + 1} of {len(footprints)})"
        try:
            logger.debug(f"Processing sample {info_id}")

            if not footprint.fpath or (not footprint.fpath.exists() and not cache_manager.exists(planet_id)):
                logger.warning(
                    f"Footprint image '{planet_id}' at {footprint.fpath} does not exist. Skipping {info_id}..."
                )
                continue

            def _get_tile():
                tile = load_planet_scene(footprint.fpath)
                arctidem_res = 2
                arcticdem_buffer = ceil(tpi_outer_radius / arctidem_res * sqrt(2))
                arcticdem = load_arcticdem(
                    tile.odc.geobox, arcticdem_dir, resolution=arctidem_res, buffer=arcticdem_buffer
                )
                tcvis = load_tcvis(tile.odc.geobox, tcvis_dir)
                data_masks = load_planet_masks(footprint.fpath)
                tile = xr.merge([tile, data_masks])

                tile: xr.Dataset = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    tpi_outer_radius,
                    tpi_inner_radius,
                    device,
                )
                return tile

            with timer("Loading tile"):
                tile = cache_manager.get_or_create(
                    identifier=planet_id,
                    creation_func=_get_tile,
                    force=force_preprocess,
                )

            logger.debug(f"Found tile with size {tile.sizes}")

            footprint_labels = labels[labels.image_id == planet_id]
            region = _get_region_name(footprint, admin2)

            with timer("Save as patches"):
                builder.add_tile(
                    tile=tile,
                    labels=footprint_labels,
                    region=region,
                    sample_id=planet_id,
                    metadata={
                        "planet_id": planet_id,
                        "fpath": footprint.fpath,
                    },
                )

            logger.info(f"Processed sample {info_id}")

        except (KeyboardInterrupt, SystemExit, SystemError):
            logger.info("Interrupted by user.")
            break

        except Exception as e:
            logger.warning(f"Could not process sample {info_id} . Skipping...")
            logger.exception(e)

    timer.summary()

    if len(builder) == 0:
        logger.warning("No samples were processed. Exiting...")
        return

    builder.finalize(
        {
            "data_dir": data_dir,
            "labels_dir": labels_dir,
            "arcticdem_dir": arcticdem_dir,
            "tcvis_dir": tcvis_dir,
            "ee_project": ee_project,
            "ee_use_highvolume": ee_use_highvolume,
            "tpi_outer_radius": tpi_outer_radius,
            "tpi_inner_radius": tpi_inner_radius,
        }
    )

preprocess_s2_train_data

preprocess_s2_train_data(
    *,
    labels_dir: pathlib.Path,
    default_dirs: darts_utils.paths.DefaultPaths = darts_utils.paths.DefaultPaths(),
    train_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    admin_dir: pathlib.Path | None = None,
    planet_data_dir: pathlib.Path | None = None,
    raw_data_store: pathlib.Path | None = None,
    no_raw_data_store: bool = False,
    preprocess_cache: pathlib.Path | None = None,
    matching_cache: pathlib.Path | None = None,
    no_matching_cache: bool = False,
    force_preprocess: bool = False,
    append: bool = True,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    matching_day_range: int = 7,
    matching_max_cloud_cover: int = 10,
    matching_min_intersects: float = 0.7,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 16,
    exclude_nopositive: bool = False,
    exclude_nan: bool = True,
    save_matching_scores: bool = False,
)

Preprocess Sentinel-2 data for training.

This function preprocesses Sentinel-2 scenes matched to Planet footprints into a training-ready format by creating fixed-size patches and storing them in a zarr array for efficient random access during training. All data is stored in a single zarr group with associated metadata.

The preprocessing matches Sentinel-2 scenes to Planet footprints based on temporal and spatial criteria, optionally aligns them spatially to Planet data, and creates patches of the specified size. The data is stored as:

- A zarr group containing 'x' (input data) and 'y' (labels) arrays
- A geopandas dataframe with metadata including region, position, and label statistics
- A configuration file with preprocessing parameters

The x dataarray contains the input data with shape (n_patches, n_bands, patch_size, patch_size). The y dataarray contains the labels with shape (n_patches, patch_size, patch_size). Both dataarrays are chunked along the n_patches dimension with chunk size 1, resulting in each patch being stored in a separate file for super fast random access.

The metadata dataframe contains information about each patch including:

- sample_id: Combined identifier for the S2 scene and Planet footprint
- region: Administrative region name
- geometry: Spatial extent of the patch
- empty: Whether the patch contains positive labeled pixels
- planet_id: Original Planet scene identifier
- s2_id: Sentinel-2 scene identifier
- Additional alignment and matching metadata

Patches without positive labels or with NaN values in the input data can be excluded from the final dataset via exclude_nopositive and exclude_nan.

A config.toml file is saved in the train_data_dir containing the configuration used for the preprocessing. Additionally, a timestamp-based CLI configuration file is saved for reproducibility.

The final directory structure of train_data_dir will look like this:

train_data_dir/
├── config.toml
├── data.zarr/
│   ├── x/          # Input patches [n_patches, n_bands, patch_size, patch_size]
│   └── y/          # Label patches [n_patches, patch_size, patch_size]
├── metadata.parquet
├── matching-cache.json      # Optional matching cache
├── matching-scores.parquet  # Optional matching scores
└── {timestamp}.cli.toml
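The matching cache written by this function can be inspected outside the pipeline. The following minimal sketch mirrors the serialization used in the source below (footprint index mapped to a STAC item dictionary, or the string "None" when no match was found); the cache path is a placeholder.

import json
from pathlib import Path

from pystac import Item

matching_cache = Path("train/sentinel2_v2_rts_1024/matching-cache.json")  # placeholder path

with matching_cache.open("r") as f:
    matches_serializable = json.load(f)

# Rebuild pystac Items, keeping None for footprints without a match
matches = {int(k): Item.from_dict(v) if v != "None" else None for k, v in matches_serializable.items()}
n_matched = sum(item is not None for item in matches.values())
print(f"{n_matched} of {len(matches)} footprints have a matching Sentinel-2 scene")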

Parameters:

  • labels_dir (pathlib.Path) –

    The directory containing the labels and footprints / extents.

  • default_dirs (darts_utils.paths.DefaultPaths, default: darts_utils.paths.DefaultPaths() ) –

    The default directories for DARTS. Defaults to a config filled with None.

  • train_data_dir (pathlib.Path | None, default: None ) –

    The "output" directory where the tensors are written to. If None, will use the default training data directory based on the DARTS paths. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. If None, will use the default auxiliary directory based on the DARTS paths. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    The directory containing the TCVis data. If None, will use the default TCVis directory based on the DARTS paths. Defaults to None.

  • admin_dir (pathlib.Path | None, default: None ) –

    The directory containing the admin files. If None, will use the default auxiliary directory based on the DARTS paths. Defaults to None.

  • planet_data_dir (pathlib.Path, default: None ) –

    The directory containing the Planet scenes and orthotiles. The planet data is used to align the Sentinel-2 data to the Planet data, spatially. Can be set to None if no alignment is wished. Defaults to None.

  • raw_data_store (pathlib.Path | None, default: None ) –

    The directory to use for storing the raw Sentinel 2 data locally. If None, will use the default raw data directory based on the DARTS paths. Defaults to None.

  • no_raw_data_store (bool, default: False ) –

    If True, will not store any raw data locally. This overrides the raw_data_store parameter. Defaults to False.

  • preprocess_cache (pathlib.Path | None, default: None ) –

    The directory to store the preprocessed data. If None, will neither use nor store preprocessed data. Defaults to None.

  • matching_cache (pathlib.Path | None, default: None ) –

    The path to a file where the matchings are stored. Note: this is different from the matching scores. If None, will query the sentinel 2 STAC and calculate the best match based on the criteria. Defaults to None.

  • no_matching_cache (bool, default: False ) –

    If True, will not use or store any matching cache. This overrides the matching_cache parameter. Defaults to False.

  • force_preprocess (bool, default: False ) –

    Whether to force the preprocessing of the data. Defaults to False.

  • append (bool, default: True ) –

    Whether to append the data to the existing data. Defaults to True.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda" take the first device (0), if int take the specified device. If "auto" try to automatically select a free GPU (<50% memory usage). Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com). Defaults to True.

  • matching_day_range (int, default: 7 ) –

    The day range to use for matching S2 scenes to Planet footprints. Defaults to 7.

  • matching_max_cloud_cover (int, default: 10 ) –

    The maximum cloud cover percentage to use for matching S2 scenes to Planet footprints. Defaults to 10.

  • matching_min_intersects (float, default: 0.7 ) –

    The minimum intersection percentage to use for matching S2 scenes to Planet footprints. Defaults to 0.7.

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The size of the patches to create for training. Defaults to 1024.

  • overlap (int, default: 16 ) –

    The overlap between neighboring patches. Defaults to 16.

  • exclude_nopositive (bool, default: False ) –

    Whether to exclude patches where the labels do not contain positives. Defaults to False.

  • exclude_nan (bool, default: True ) –

    Whether to exclude patches where the input data has nan values. Defaults to True.

  • save_matching_scores (bool, default: False ) –

    Whether to save the matching scores. Defaults to False.
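A hedged usage sketch follows; the directory paths are placeholders and the import path is assumed from the source location shown below. Passing planet_data_dir enables the spatial alignment to Planet data.

from pathlib import Path

from darts.training.preprocess_sentinel2_v2 import preprocess_s2_train_data

preprocess_s2_train_data(
    labels_dir=Path("data/labels"),
    planet_data_dir=Path("data/planet"),  # omit (None) to skip alignment to Planet
    matching_day_range=7,
    matching_max_cloud_cover=10,
    matching_min_intersects=0.7,
    save_matching_scores=True,
)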

Source code in darts/src/darts/training/preprocess_sentinel2_v2.py
def preprocess_s2_train_data(  # noqa: C901
    *,
    labels_dir: Path,
    default_dirs: DefaultPaths = DefaultPaths(),
    train_data_dir: Path | None = None,
    arcticdem_dir: Path | None = None,
    tcvis_dir: Path | None = None,
    admin_dir: Path | None = None,
    planet_data_dir: Path | None = None,
    raw_data_store: Path | None = None,
    no_raw_data_store: bool = False,
    preprocess_cache: Path | None = None,
    matching_cache: Path | None = None,
    no_matching_cache: bool = False,
    force_preprocess: bool = False,
    append: bool = True,
    device: Literal["cuda", "cpu", "auto"] | int | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    matching_day_range: int = 7,
    matching_max_cloud_cover: int = 10,
    matching_min_intersects: float = 0.7,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 16,
    exclude_nopositive: bool = False,
    exclude_nan: bool = True,
    save_matching_scores: bool = False,
):
    """Preprocess Sentinel-2 data for training.

    This function preprocesses Sentinel-2 scenes matched to Planet footprints into a training-ready format
    by creating fixed-size patches and storing them in a zarr array for efficient random access during training.
    All data is stored in a single zarr group with associated metadata.

    The preprocessing matches Sentinel-2 scenes to Planet footprints based on temporal and spatial criteria,
    optionally aligns them spatially to Planet data, and creates patches of the specified size. The data is stored as:
    - A zarr group containing 'x' (input data) and 'y' (labels) arrays
    - A geopandas dataframe with metadata including region, position, and label statistics
    - A configuration file with preprocessing parameters

    The x dataarray contains the input data with shape (n_patches, n_bands, patch_size, patch_size).
    The y dataarray contains the labels with shape (n_patches, patch_size, patch_size).
    Both dataarrays are chunked along the n_patches dimension with chunk size 1, resulting in
    each patch being stored in a separate file for super fast random access.

    The metadata dataframe contains information about each patch including:
    - sample_id: Combined identifier for the S2 scene and Planet footprint
    - region: Administrative region name
    - geometry: Spatial extent of the patch
    - empty: Whether the patch contains positive labeled pixels
    - planet_id: Original Planet scene identifier
    - s2_id: Sentinel-2 scene identifier
    - Additional alignment and matching metadata

    Through `exclude_nopositive` and `exclude_nan`, respective patches can be excluded from the final data.

    A `config.toml` file is saved in the `train_data_dir` containing the configuration used for the
    preprocessing. Additionally, a timestamp-based CLI configuration file is saved for reproducibility.

    The final directory structure of `train_data_dir` will look like this:

    ```sh
    train_data_dir/
    ├── config.toml
    ├── data.zarr/
    │   ├── x/          # Input patches [n_patches, n_bands, patch_size, patch_size]
    │   └── y/          # Label patches [n_patches, patch_size, patch_size]
    ├── metadata.parquet
    ├── matching-cache.json      # Optional matching cache
    ├── matching-scores.parquet  # Optional matching scores
    └── {timestamp}.cli.toml
    ```

    Args:
        labels_dir (Path): The directory containing the labels and footprints / extents.
        default_dirs (DefaultPaths, optional): The default directories for DARTS. Defaults to a config filled with None.
        train_data_dir (Path | None, optional): The "output" directory where the tensors are written to.
            If None, will use the default training data directory based on the DARTS paths.
            Defaults to None.
        arcticdem_dir (Path | None, optional): The directory containing the ArcticDEM data
            (the datacube and the extent files).
            Will be created and downloaded if it does not exist.
            If None, will use the default auxiliary directory based on the DARTS paths.
            Defaults to None.
        tcvis_dir (Path | None, optional): The directory containing the TCVis data.
            If None, will use the default TCVis directory based on the DARTS paths.
            Defaults to None.
        admin_dir (Path | None, optional): The directory containing the admin files.
            If None, will use the default auxiliary directory based on the DARTS paths.
            Defaults to None.
        planet_data_dir (Path, optional): The directory containing the Planet scenes and orthotiles.
            The planet data is used to align the Sentinel-2 data to the Planet data, spatially.
            Can be set to None if no alignment is wished.
            Defaults to None.
        raw_data_store (Path | None): The directory to use for storing the raw Sentinel 2 data locally.
            If None, will use the default raw data directory based on the DARTS paths.
            Defaults to None.
        no_raw_data_store (bool, optional): If True, will not store any raw data locally.
            This overrides the `raw_data_store` parameter.
            Defaults to False.
        preprocess_cache (Path | None, optional): The directory to store the preprocessed data.
            If None, will neither use nor store preprocessed data.
            Defaults to None.
        matching_cache (Path | None, optional): The path to a file where the matchings are stored.
            Note: this is different from the matching scores.
            If None, will query the sentinel 2 STAC and calculate the best match based on the criteria.
            Defaults to None.
        no_matching_cache (bool, optional): If True, will not use or store any matching cache.
            This overrides the `matching_cache` parameter.
            Defaults to False.
        force_preprocess (bool, optional): Whether to force the preprocessing of the data. Defaults to False.
        append (bool, optional): Whether to append the data to the existing data. Defaults to True.
        device (Literal["cuda", "cpu"] | int, optional): The device to run the model on.
            If "cuda" take the first device (0), if int take the specified device.
            If "auto" try to automatically select a free GPU (<50% memory usage).
            Defaults to "cuda" if available, else "cpu".
        ee_project (str, optional): The Earth Engine project ID or number to use. May be omitted if
            project is defined within persistent API credentials obtained via `earthengine authenticate`.
        ee_use_highvolume (bool, optional): Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).
            Defaults to True.
        matching_day_range (int, optional): The day range to use for matching S2 scenes to Planet footprints.
            Defaults to 7.
        matching_max_cloud_cover (int, optional): The maximum cloud cover percentage to use for matching S2 scenes
            to Planet footprints. Defaults to 10.
        matching_min_intersects (float, optional): The minimum intersection percentage to use for matching S2 scenes
            to Planet footprints. Defaults to 0.7.
        tpi_outer_radius (int, optional): The outer radius of the annulus kernel for the tpi calculation
            in m. Defaults to 100m.
        tpi_inner_radius (int, optional): The inner radius of the annulus kernel for the tpi calculation
            in m. Defaults to 0.
        patch_size (int, optional): The patch size to use for inference. Defaults to 1024.
        overlap (int, optional): The overlap to use for inference. Defaults to 16.
        exclude_nopositive (bool, optional): Whether to exclude patches where the labels do not contain positives.
            Defaults to False.
        exclude_nan (bool, optional): Whether to exclude patches where the input data has nan values.
            Defaults to True.
        save_matching_scores (bool, optional): Whether to save the matching scores. Defaults to False.

    """
    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting preprocessing at {current_time}.")

    paths.set_defaults(default_dirs)
    train_data_dir = train_data_dir or paths.train_data_dir("sentinel2_v2_rts", patch_size)
    arcticdem_dir = arcticdem_dir or paths.arcticdem(10)
    tcvis_dir = tcvis_dir or paths.tcvis()
    admin_dir = admin_dir or paths.admin_boundaries()
    raw_data_store = raw_data_store or paths.sentinel2_raw_data("cdse")
    if no_raw_data_store:
        raw_data_store = None
    matching_cache = matching_cache or train_data_dir / "matching-cache.json"
    if no_matching_cache:
        matching_cache = None

    # Storing the configuration as JSON file
    train_data_dir.mkdir(parents=True, exist_ok=True)
    from darts_utils.functools import write_function_args_to_config_file

    write_function_args_to_config_file(
        fpath=train_data_dir / f"{current_time}.cli.toml",
        function=preprocess_s2_train_data,
        locals_=locals(),
    )

    from stopuhr import Chronometer

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    # Import here to avoid long loading times when running other commands
    import geopandas as gpd
    import pandas as pd
    import rich
    import smart_geocubes
    import xarray as xr
    from botocore.exceptions import ProfileNotFound
    from darts_acquisition import (
        load_arcticdem,
        load_cdse_s2_sr_scene,
        load_tcvis,
        match_cdse_s2_sr_scene_ids_from_geodataframe,
    )
    from darts_acquisition.admin import download_admin_files
    from darts_preprocessing import preprocess_v2
    from darts_segmentation.training.prepare_training import TrainDatasetBuilder
    from darts_utils.tilecache import XarrayCacheManager
    from odc.geo.geom import Geometry
    from pystac import Item
    from rich.progress import track

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee
    from darts.utils.logging import LoggingManager

    device = decide_device(device)
    init_ee(ee_project, ee_use_highvolume)
    logger.info("Configured Rasterio")

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    accessor = smart_geocubes.ArcticDEM10m(arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    labels = (gpd.read_file(labels_file) for labels_file in labels_dir.glob("*/TrainingLabel*.gpkg"))
    labels = gpd.GeoDataFrame(pd.concat(labels, ignore_index=True))

    footprints = (gpd.read_file(footprints_file) for footprints_file in labels_dir.glob("*/ImageFootprints*.gpkg"))
    footprints = gpd.GeoDataFrame(pd.concat(footprints, ignore_index=True))
    footprints["geometry"] = footprints["geometry"].simplify(0.001)  # Simplify to reduce compute
    footprints["date"] = footprints.apply(_parse_date, axis=1)
    if planet_data_dir is not None:
        fpaths = {fpath.stem: fpath for fpath in _planet_legacy_path_gen(planet_data_dir)}
        footprints["fpath"] = footprints.image_id.map(fpaths)

    logger.info(f"label directory contained {len(footprints)} footprints")

    # Find S2 scenes that intersect with the Planet footprints
    if matching_cache is None or not matching_cache.exists():
        logger.info("evaluating online CDSE catalogue for matching Sentinel-2 scenes")
        matches = match_cdse_s2_sr_scene_ids_from_geodataframe(
            aoi=footprints,
            day_range=matching_day_range,
            max_cloud_cover=matching_max_cloud_cover,
            min_intersects=matching_min_intersects,
            simplify_geometry=0.001,
            save_scores=train_data_dir / "matching-scores.parquet" if save_matching_scores else None,
        )
        if matching_cache is not None:
            matches_serializable = {k: v.to_dict() if isinstance(v, Item) else "None" for k, v in matches.items()}
            with matching_cache.open("w") as f:
                json.dump(matches_serializable, f)
            logger.info(f"Saved matching scores to {matching_cache}")
            del matches_serializable  # Free memory
    else:
        logger.info(f"Loading matching scores from {matching_cache}")
        with matching_cache.open("r") as f:
            matches_serializable = json.load(f)
        matches = {int(k): Item.from_dict(v) if v != "None" else None for k, v in matches_serializable.items()}
        del matches_serializable  # Free memory
    footprints["s2_item"] = footprints.index.map(matches)

    # Filter out footprints without a matching S2 item
    logger.info(f"Found {len(footprints)} footprints, {footprints.s2_item.notna().sum()} with matching S2 items.")
    footprints = footprints[footprints.s2_item.notna()]

    # Download admin files if they do not exist
    admin2_fpath = admin_dir / "geoBoundariesCGAZ_ADM2.shp"
    if not admin2_fpath.exists():
        download_admin_files(admin_dir)
    admin2 = gpd.read_file(admin2_fpath)

    # We hardcode these since they depend on the preprocessing we use
    bands = [
        "red",
        "green",
        "blue",
        "nir",
        "ndvi",
        "relative_elevation",
        "slope",
        "aspect",
        "hillshade",
        "curvature",
        "tc_brightness",
        "tc_greenness",
        "tc_wetness",
    ]

    builder = TrainDatasetBuilder(
        train_data_dir=train_data_dir,
        patch_size=patch_size,
        overlap=overlap,
        bands=bands,
        exclude_nopositive=exclude_nopositive,
        exclude_nan=exclude_nan,
        device=device,
        append=append,
    )
    cache_manager = XarrayCacheManager(preprocess_cache)

    if append and (train_data_dir / "metadata.parquet").exists():
        metadata = gpd.read_parquet(train_data_dir / "metadata.parquet")
        already_processed_planet_ids = set(metadata["planet_id"].unique())
        logger.info(f"Already processed {len(already_processed_planet_ids)} samples.")
        footprints = footprints[~footprints.image_id.isin(already_processed_planet_ids)]

    for i, footprint in track(
        footprints.iterrows(), description="Processing samples", total=len(footprints), console=rich.get_console()
    ):
        s2_item = footprint.s2_item
        # Convert to stac item if dictionary
        if isinstance(s2_item, dict):
            s2_item = Item.from_dict(s2_item)

        s2_id = s2_item.id
        planet_id = footprint.image_id
        info_id = f"{s2_id=} -> {planet_id=} ({i + 1} of {len(footprints)})"
        try:
            logger.info(f"Processing sample {info_id}")

            if planet_data_dir is not None and (
                not footprint.fpath or pd.isna(footprint.fpath) or (not footprint.fpath.exists())
            ):
                logger.warning(
                    f"Footprint image {planet_id} at {footprint.fpath} does not exist. Skipping sample {info_id}..."
                )
                continue

            def _get_tile():
                s2ds = load_cdse_s2_sr_scene(s2_item, store=raw_data_store)

                # Crop to footprint geometry
                geom = Geometry(footprint.geometry, crs=footprints.crs)
                s2ds = s2ds.odc.crop(geom, apply_mask=True)
                # Crop above will change all dtypes to float32 -> change them back for s2_scl and qa mask
                s2ds["s2_scl"] = s2ds["s2_scl"].fillna(0.0).astype("uint8")
                s2ds["quality_data_mask"] = s2ds["quality_data_mask"].fillna(0.0).astype("uint8")

                # Preprocess as usual
                arctidem_res = 10
                arcticdem_buffer = ceil(tpi_outer_radius / arctidem_res * sqrt(2))
                arcticdem = load_arcticdem(
                    s2ds.odc.geobox, arcticdem_dir, resolution=arctidem_res, buffer=arcticdem_buffer
                )
                tcvis = load_tcvis(s2ds.odc.geobox, tcvis_dir)

                s2ds: xr.Dataset = preprocess_v2(
                    s2ds,
                    arcticdem,
                    tcvis,
                    tpi_outer_radius,
                    tpi_inner_radius,
                    device,
                )
                return s2ds

            with timer("Loading tile"):
                tile = cache_manager.get_or_create(
                    identifier=f"preprocess-s2train-v2-{s2_id}_{planet_id}",
                    creation_func=_get_tile,
                    force=force_preprocess,
                )
            logger.debug(f"Found tile with size {tile.sizes}")

            # Skip if the size is too small
            if tile.sizes["x"] < patch_size or tile.sizes["y"] < patch_size:
                logger.info(f"Skipping sample {info_id} due to small size {tile.sizes}.")
                continue

            footprint_labels = labels[labels.image_id == planet_id].to_crs(tile.odc.crs)
            region = _get_region_name(footprint, admin2)

            offsets_info = {}  # Ensure this is defined when no Planet alignment is performed
            if planet_data_dir is not None:
                with timer("Align to PLANET"):
                    footprint_labels, offsets_info = _align_offsets(tile, footprint, footprint_labels)

            with timer("Save as patches"):
                builder.add_tile(
                    tile=tile,
                    labels=footprint_labels,
                    region=region,
                    sample_id=f"{s2_id}_{planet_id}",
                    metadata={
                        "planet_id": planet_id,
                        "s2_id": s2_id,
                        "fpath": footprint.fpath,
                        **offsets_info,
                    },
                )

            logger.info(f"Processed sample {info_id}")

        except (KeyboardInterrupt, SystemExit, SystemError):
            logger.info("Interrupted by user.")
            break
        except ProfileNotFound:
            logger.error("tried to download from CDSE@AWS but no CDSE credentials found. ")
            return
        except Exception as e:
            logger.warning(f"Could not process sample {info_id}. Skipping...")
            logger.exception(e)

    timer.summary()

    if len(builder) == 0:
        logger.warning("No samples were processed. Exiting...")
        return

    builder.finalize(
        {
            "planet_data_dir": planet_data_dir,
            "labels_dir": labels_dir,
            "arcticdem_dir": arcticdem_dir,
            "tcvis_dir": tcvis_dir,
            "ee_project": ee_project,
            "ee_use_highvolume": ee_use_highvolume,
            "tpi_outer_radius": tpi_outer_radius,
            "tpi_inner_radius": tpi_inner_radius,
        }
    )

shell

shell()

Open an interactive shell.

Source code in darts/src/darts/cli.py
@app.command
def shell():
    """Open an interactive shell."""
    app.interactive_shell()

start_app

start_app()

Wrapper to start the app.
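A minimal sketch of using this wrapper as an entrypoint; whether the package registers it as a console script is an assumption, but the import path follows the source location shown below.

# Hypothetical __main__ shim; start_app sets up logging and runs the meta app.
from darts.cli import start_app

if __name__ == "__main__":
    start_app()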

Source code in darts/src/darts/cli.py
def start_app():
    """Wrapp to start the app."""
    try:
        # First time initialization of the logging manager
        LoggingManager.setup_logging()
        app.meta()
    except KeyboardInterrupt:
        logger.info("Interrupted by user. Closing...")
    except SystemExit:
        logger.info("Closing...")
    except Exception as e:
        logger.exception(e)