darts.pipelines.sequential_v2

Sequential implementation of the v2 pipelines.

logger module-attribute

logger = logging.getLogger(__name__)

PlanetPipeline dataclass

PlanetPipeline(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    offline: bool = False,
    debug_data: bool = False,
    orthotiles_dir: pathlib.Path | None = None,
    scenes_dir: pathlib.Path | None = None,
    image_ids: list = None,
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for processing PlanetScope data.

Processes PlanetScope imagery (both orthotiles and scenes) for RTS segmentation. Supports both offline and online processing modes.

Data Structure

Expects PlanetScope data organized as follows (a minimal usage sketch follows below):

  • Orthotiles: orthotiles_dir/tile_id/scene_id/
  • Scenes: scenes_dir/scene_id/
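A minimal usage sketch, assuming the directory layout above and trained model files in the default model directory; all paths here are hypothetical placeholders:

from pathlib import Path

from darts.pipelines.sequential_v2 import PlanetPipeline

# Hypothetical directories; adjust to your local data layout.
pipeline = PlanetPipeline(
    orthotiles_dir=Path("data/planet/orthotiles"),
    scenes_dir=Path("data/planet/scenes"),
    output_data_dir=Path("out/planet"),
    device="auto",
)
pipeline.run()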

Parameters:

  • orthotiles_dir (pathlib.Path | None, default: None ) –

    Directory containing PlanetScope orthotiles. If None, uses default path from DARTS paths. Defaults to None.

  • scenes_dir (pathlib.Path | None, default: None ) –

    Directory containing PlanetScope scenes. If None, uses default path from DARTS paths. Defaults to None.

  • image_ids (list | None, default: None ) –

    List of image/scene IDs to process. If None, processes all images found in orthotiles_dir and scenes_dir. Defaults to None.

  • model_files (pathlib.Path | list[pathlib.Path] | None, default: None ) –

    Path(s) to model file(s) for segmentation. Single Path implies write_model_outputs=False. If None, searches default model directory for all .pt files. Defaults to None.

  • output_data_dir (pathlib.Path | None, default: None ) –

    Output directory for results. If None, uses {default_out}/planet. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    Directory for ArcticDEM datacube. Will be created/downloaded if needed. If None, uses default path. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    Directory for TCVis data. If None, uses default path. Defaults to None.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    Computation device. "cuda" uses GPU 0, "cpu" uses CPU, an int specifies the GPU index, and "auto" selects a free GPU. Defaults to None.

  • ee_project (str | None, default: None ) –

    Earth Engine project ID. May be omitted if defined in persistent credentials. Defaults to None.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use EE high-volume server. Defaults to True.

  • tpi_outer_radius (int, default: 100 ) –

    Outer radius (m) for TPI calculation. Defaults to 100.

  • tpi_inner_radius (int, default: 0 ) –

    Inner radius (m) for TPI calculation. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    Patch size for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    Overlap between patches. Defaults to 256.

  • batch_size (int, default: 8 ) –

    Batch size for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    Reflection padding for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    Threshold for binarizing probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    Disk size for mask erosion and inner edge cropping. Defaults to 10.

  • edge_erosion_size (int | None, default: None ) –

    Size for outer edge cropping. If None, uses mask_erosion_size. Defaults to None.

  • min_object_size (int, default: 32 ) –

    Minimum object size (pixels) to keep. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    Quality filtering level. 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    Bands to export. Can include "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata", or specific band names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Save individual model outputs (not just ensemble). Defaults to False.

  • overwrite (bool, default: False ) –

    Overwrite existing output files. Defaults to False.

  • offline (bool, default: False ) –

    Skip downloading missing data. Defaults to False.

  • debug_data (bool, default: False ) –

    Write intermediate debugging data. Defaults to False.

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

debug_data class-attribute instance-attribute

debug_data: bool = False

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

offline class-attribute instance-attribute

offline: bool = False

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path | None = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path | None = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):  # noqa: D105
    super().__post_init__()
    self.output_data_dir = self.output_data_dir or (paths.out / "planet")
    self.orthotiles_dir = self.orthotiles_dir or paths.planet_orthotiles()
    self.scenes_dir = self.scenes_dir or paths.planet_scenes()

cli staticmethod

cli(*, pipeline: darts.pipelines.sequential_v2.PlanetPipeline)

Run the sequential pipeline for PlanetScope data.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.PlanetPipeline) –

    Configured PlanetPipeline instance.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "PlanetPipeline"):
    """Run the sequential pipeline for PlanetScope data.

    Args:
        pipeline: Configured PlanetPipeline instance.

    """
    pipeline.__post_init__()
    pipeline.run()

cli_prepare_data staticmethod

cli_prepare_data(
    *,
    pipeline: darts.pipelines.sequential_v2.PlanetPipeline,
    aux: bool = False,
)

Download all necessary data for offline processing.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.PlanetPipeline) –

    Configured PlanetPipeline instance.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli_prepare_data(*, pipeline: "PlanetPipeline", aux: bool = False):
    """Download all necessary data for offline processing.

    Args:
        pipeline: Configured PlanetPipeline instance.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

    """
    assert not pipeline.offline, "Pipeline must be online to prepare data for offline usage."
    pipeline.__post_init__()
    pipeline.prepare_data(optical=False, aux=aux)
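A hedged sketch of the intended workflow, preparing auxiliary data online and then running the same configuration offline; it assumes trained model files exist in the default model directory:

from darts.pipelines.sequential_v2 import PlanetPipeline

pipeline = PlanetPipeline(device="cpu")
PlanetPipeline.cli_prepare_data(pipeline=pipeline, aux=True)

# Later, without network access: reuse the downloaded ArcticDEM/TCVis data.
pipeline.offline = True
pipeline.run()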

prepare_data

prepare_data(optical: bool = False, aux: bool = False)

Download and prepare data for offline processing.

Validates configuration, determines data requirements from models, and downloads requested data (optical imagery and/or auxiliary data).

Parameters:

  • optical (bool, default: False ) –

    If True, downloads optical imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

  • SystemExit –

    If the process is terminated.

  • SystemError –

    If a system error occurs.

Source code in darts/src/darts/pipelines/sequential_v2.py
def prepare_data(self, optical: bool = False, aux: bool = False):
    """Download and prepare data for offline processing.

    Validates configuration, determines data requirements from models,
    and downloads requested data (optical imagery and/or auxiliary data).

    Args:
        optical: If True, downloads optical imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

    Raises:
        KeyboardInterrupt: If user interrupts execution.
        SystemExit: If the process is terminated.
        SystemError: If a system error occurs.

    """
    assert optical or aux, "Nothing to prepare. Please set optical and/or aux to True."

    self._validate()
    self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    from darts_acquisition import download_arcticdem, download_tcvis
    from stopuhr import Chronometer

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if aux:
        # Get the ensemble to check which auxiliary data is necessary
        ensemble = self._load_ensemble()
        needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

        if not needs_arcticdem and not needs_tcvis:
            logger.warning("No auxiliary data required by the models. Skipping download of auxiliary data...")
        else:
            logger.info(f"Models {needs_tcvis=} {needs_arcticdem=}.")
            self._create_auxiliary_datacubes(arcticdem=needs_arcticdem, tcvis=needs_tcvis)

            # Predownload auxiliary
            aoi = self._tile_aoi()
            if needs_arcticdem:
                logger.info("start download ArcticDEM")
                with timer("Downloading ArcticDEM"):
                    download_arcticdem(aoi, self.arcticdem_dir, resolution=self._arcticdem_resolution())
            if needs_tcvis:
                logger.info("start download TCVIS")
                init_ee(self.ee_project, self.ee_use_highvolume)
                with timer("Downloading TCVis"):
                    download_tcvis(aoi, self.tcvis_dir)

    # Predownload tiles if optical flag is set
    if not optical:
        return

    # Iterate over all the data
    with timer("Loading Optical"):
        tileinfo = self._tileinfos()
        n_tiles = 0
        logger.info(f"Found {len(tileinfo)} tiles to download.")
        for i, (tilekey, _) in enumerate(tileinfo):
            tile_id = self._get_tile_id(tilekey)
            try:
                self._download_tile(tilekey)
                n_tiles += 1
                logger.info(f"Downloaded sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
            except (KeyboardInterrupt, SystemError, SystemExit) as e:
                logger.warning(f"{type(e).__name__} detected.\nExiting...")
                raise e
            except Exception as e:
                logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
                logger.exception(e)
        else:
            logger.info(f"Downloaded {n_tiles} tiles.")

run

run()

Run the complete segmentation pipeline.

Executes the full pipeline including:

  1. Configuration validation and dumping
  2. Loading ensemble models
  3. Creating/loading auxiliary datacubes
  4. Processing each tile:
     • Loading optical data
     • Loading auxiliary data (ArcticDEM, TCVis) as needed
     • Preprocessing
     • Segmentation
     • Postprocessing
     • Exporting results
  5. Saving results and timing information

Results are saved to the output directory with timestamped configuration, results parquet file, and timing information.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    """Run the complete segmentation pipeline.

    Executes the full pipeline including:
    1. Configuration validation and dumping
    2. Loading ensemble models
    3. Creating/loading auxiliary datacubes
    4. Processing each tile:
       - Loading optical data
       - Loading auxiliary data (ArcticDEM, TCVis) as needed
       - Preprocessing
       - Segmentation
       - Postprocessing
       - Exporting results
    5. Saving results and timing information

    Results are saved to the output directory with timestamped configuration,
    results parquet file, and timing information.

    Raises:
        KeyboardInterrupt: If user interrupts execution.

    """
    self._validate()
    current_time = self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    import pandas as pd
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2
    from stopuhr import Chronometer, stopwatch

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if not self.offline:
        init_ee(self.ee_project, self.ee_use_highvolume)

    self._create_auxiliary_datacubes()

    # determine models to use
    ensemble = self._load_ensemble()
    ensemble_subsets = ensemble.model_names
    needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=ensemble_subsets)
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)

            if needs_arcticdem:
                with timer("Loading ArcticDEM", log=False):
                    arcticdem_resolution = self._arcticdem_resolution()
                    arcticdem = load_arcticdem(
                        tile.odc.geobox,
                        self.arcticdem_dir,
                        resolution=arcticdem_resolution,
                        buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                        offline=self.offline,
                    )
            else:
                arcticdem = None

            if needs_tcvis:
                with timer("Loading TCVis", log=False):
                    tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir, offline=self.offline)
            else:
                tcvis = None

            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )

            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )

            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    metadata=export_metadata,
                    debug=self.debug_data,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.timer.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)
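The per-tile results written in the finally block above can be inspected after a run; a short sketch, where the timestamped filename is illustrative (the actual name is derived from the dumped configuration time):

import pandas as pd

# Each row holds tile_id, output_path, status ("success"/"failed"), and error.
results = pd.read_parquet("out/planet/2024-01-01T12-00-00.results.parquet")
print(results[results["status"] == "failed"][["tile_id", "error"]])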

Sentinel2Pipeline dataclass

Sentinel2Pipeline(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    offline: bool = False,
    debug_data: bool = False,
    scene_ids: list[str] | None = None,
    scene_id_file: pathlib.Path | None = None,
    tile_ids: list[str] | None = None,
    aoi_file: pathlib.Path | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    max_cloud_cover: int | None = 10,
    max_snow_cover: int | None = 10,
    months: list[int] | None = None,
    years: list[int] | None = None,
    prep_data_scene_id_file: pathlib.Path | None = None,
    sentinel2_grid_dir: pathlib.Path | None = None,
    raw_data_store: pathlib.Path | None = None,
    no_raw_data_store: bool = False,
    raw_data_source: typing.Literal["gee", "cdse"] = "cdse",
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for processing Sentinel-2 data.

Processes Sentinel-2 Surface Reflectance (SR) imagery from either CDSE or Google Earth Engine. Supports multiple scene selection methods and flexible filtering options.

Source Selection

The data source is specified via the raw_data_source parameter:

  • "cdse": Copernicus Data Space Ecosystem (CDSE)
  • "gee": Google Earth Engine (GEE)

Both sources require accounts and proper credential setup on the system.

Scene Selection

Scenes can be selected using one of four mutually exclusive methods (priority order):

  1. scene_ids: Direct list of Sentinel-2 scene IDs
  2. scene_id_file: JSON file containing scene IDs
  3. tile_ids: List of Sentinel-2 tile IDs (e.g., "33UVP") with optional filters
  4. aoi_file: Shapefile defining area of interest with optional filters

Filtering Options

When using tile_ids or aoi_file, scenes can be filtered by (see the configuration sketch below):

  • Cloud/snow cover: max_cloud_cover, max_snow_cover
  • Date range: start_date and end_date (YYYY-MM-DD format)
  • OR specific months/years: months (1-12) and years

Note: Date range takes priority over month/year filtering.
Warning: Querying without any temporal filter may trigger rate-limit errors.
Note: Month/year filtering is experimental and only implemented for CDSE.
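A hedged configuration sketch selecting scenes by tile ID with temporal and cover filters; all values are illustrative, and running it requires valid CDSE credentials:

from darts.pipelines.sequential_v2 import Sentinel2Pipeline

pipeline = Sentinel2Pipeline(
    tile_ids=["33UVP"],
    start_date="2023-06-01",
    end_date="2023-09-30",
    max_cloud_cover=10,
    max_snow_cover=10,
    raw_data_source="cdse",
)
pipeline.run()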

Offline Processing

Use cli_prepare_data to download data for offline use. The prep_data_scene_id_file stores scene IDs from queries for offline reuse.
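A sketch of this two-step workflow; the file name and query values are illustrative:

from pathlib import Path

from darts.pipelines.sequential_v2 import Sentinel2Pipeline

# Online: query the source, store the resolved scene IDs, download the data.
pipeline = Sentinel2Pipeline(
    tile_ids=["33UVP"],
    start_date="2023-06-01",
    end_date="2023-09-30",
    prep_data_scene_id_file=Path("scene_ids.json"),
)
pipeline.prepare_data(optical=True, aux=True)

# Offline: reuse the stored scene IDs instead of querying CDSE/GEE again.
pipeline.offline = True
pipeline.run()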

Parameters:

  • scene_ids (list[str] | None, default: None ) –

    Direct list of Sentinel-2 scene IDs to process. Defaults to None.

  • scene_id_file (pathlib.Path | None, default: None ) –

    JSON file containing scene IDs to process. Defaults to None.

  • tile_ids (list[str] | None, default: None ) –

    List of Sentinel-2 tile IDs (requires filtering params). Defaults to None.

  • aoi_file (pathlib.Path | None, default: None ) –

    Shapefile with area of interest (requires filtering params). Defaults to None.

  • start_date (str | None, default: None ) –

    Start date for filtering (YYYY-MM-DD format). Defaults to None.

  • end_date (str | None, default: None ) –

    End date for filtering (YYYY-MM-DD format). Defaults to None.

  • max_cloud_cover (int | None, default: 10 ) –

    Maximum cloud cover percentage (0-100). Defaults to 10.

  • max_snow_cover (int | None, default: 10 ) –

    Maximum snow cover percentage (0-100). Defaults to 10.

  • months (list[int] | None, default: None ) –

    Filter by months (1-12). Defaults to None.

  • years (list[int] | None, default: None ) –

    Filter by years. Defaults to None.

  • prep_data_scene_id_file (pathlib.Path | None, default: None ) –

    File to store/load scene IDs for offline processing. Written during prepare_data, read during offline run. Defaults to None.

  • sentinel2_grid_dir (pathlib.Path | None, default: None ) –

    Directory for Sentinel-2 grid shapefiles. Used only in prepare_data with tile_ids. If None, uses default path. Defaults to None.

  • raw_data_store (pathlib.Path | None, default: None ) –

    Directory for storing raw Sentinel-2 data locally. If None, uses default path based on raw_data_source. Defaults to None.

  • no_raw_data_store (bool, default: False ) –

    If True, processes data in-memory without local storage. Overrides raw_data_store. Defaults to False.

  • raw_data_source (typing.Literal['gee', 'cdse'], default: 'cdse' ) –

    Data source to use. Defaults to "cdse".

  • model_files (pathlib.Path | list[pathlib.Path] | None, default: None ) –

    Path(s) to model file(s) for segmentation. Single Path implies write_model_outputs=False. If None, searches default model directory for all .pt files. Defaults to None.

  • output_data_dir (pathlib.Path | None, default: None ) –

    Output directory for results. If None, uses {default_out}/sentinel2-{raw_data_source}. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    Directory for ArcticDEM datacube. Will be created/downloaded if needed. If None, uses default path. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    Directory for TCVis data. If None, uses default path. Defaults to None.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    Computation device. "cuda" uses GPU 0, "cpu" uses CPU, an int specifies the GPU index, and "auto" selects a free GPU. Defaults to None.

  • ee_project (str | None, default: None ) –

    Earth Engine project ID. May be omitted if defined in persistent credentials. Defaults to None.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use EE high-volume server. Defaults to True.

  • tpi_outer_radius (int, default: 100 ) –

    Outer radius (m) for TPI calculation. Defaults to 100.

  • tpi_inner_radius (int, default: 0 ) –

    Inner radius (m) for TPI calculation. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    Patch size for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    Overlap between patches. Defaults to 256.

  • batch_size (int, default: 8 ) –

    Batch size for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    Reflection padding for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    Threshold for binarizing probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    Disk size for mask erosion and inner edge cropping. Defaults to 10.

  • edge_erosion_size (int | None, default: None ) –

    Size for outer edge cropping. If None, uses mask_erosion_size. Defaults to None.

  • min_object_size (int, default: 32 ) –

    Minimum object size (pixels) to keep. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    Quality filtering level. 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    Bands to export. Can include "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata", or specific band names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Save individual model outputs (not just ensemble). Defaults to False.

  • overwrite (bool, default: False ) –

    Overwrite existing output files. Defaults to False.

  • offline (bool, default: False ) –

    Skip downloading missing data. Requires pre-downloaded data. Defaults to False.

  • debug_data (bool, default: False ) –

    Write intermediate debugging data to output directory. Defaults to False.

aoi_file class-attribute instance-attribute

aoi_file: pathlib.Path | None = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

debug_data class-attribute instance-attribute

debug_data: bool = False

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str | None = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int | None = 10

max_snow_cover class-attribute instance-attribute

max_snow_cover: int | None = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

months class-attribute instance-attribute

months: list[int] | None = None

no_raw_data_store class-attribute instance-attribute

no_raw_data_store: bool = False

offline class-attribute instance-attribute

offline: bool = False

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

prep_data_scene_id_file class-attribute instance-attribute

prep_data_scene_id_file: pathlib.Path | None = None

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

raw_data_source class-attribute instance-attribute

raw_data_source: typing.Literal["gee", "cdse"] = "cdse"

raw_data_store class-attribute instance-attribute

raw_data_store: pathlib.Path | None = None

reflection class-attribute instance-attribute

reflection: int = 0

scene_id_file class-attribute instance-attribute

scene_id_file: pathlib.Path | None = None

scene_ids class-attribute instance-attribute

scene_ids: list[str] | None = None

sentinel2_grid_dir class-attribute instance-attribute

sentinel2_grid_dir: pathlib.Path | None = None

start_date class-attribute instance-attribute

start_date: str | None = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

tile_ids class-attribute instance-attribute

tile_ids: list[str] | None = None

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

years class-attribute instance-attribute

years: list[int] | None = None

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):  # noqa: D105
    logger.debug("Before super")
    super().__post_init__()
    logger.debug("After super")
    self.output_data_dir = self.output_data_dir or (paths.out / f"sentinel2-{self.raw_data_source}")
    self.raw_data_store = self.raw_data_store or paths.sentinel2_raw_data(self.raw_data_source)
    if self.no_raw_data_store:
        self.raw_data_store = None

cli staticmethod

cli(*, pipeline: darts.pipelines.sequential_v2.Sentinel2Pipeline)

Run the sequential pipeline for Sentinel-2 data.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.Sentinel2Pipeline) –

    Configured Sentinel2Pipeline instance.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "Sentinel2Pipeline"):
    """Run the sequential pipeline for Sentinel-2 data.

    Args:
        pipeline: Configured Sentinel2Pipeline instance.

    """
    pipeline.__post_init__()
    pipeline.run()

cli_prepare_data staticmethod

cli_prepare_data(
    *,
    pipeline: darts.pipelines.sequential_v2.Sentinel2Pipeline,
    optical: bool = False,
    aux: bool = False,
)

Download all necessary data for offline processing.

Queries the data source (CDSE or GEE) for scene IDs and downloads optical and/or auxiliary data. Stores scene IDs in prep_data_scene_id_file if specified for later offline use.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.Sentinel2Pipeline) –

    Configured Sentinel2Pipeline instance.

  • optical (bool, default: False ) –

    If True, downloads optical (Sentinel-2) imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli_prepare_data(*, pipeline: "Sentinel2Pipeline", optical: bool = False, aux: bool = False):
    """Download all necessary data for offline processing.

    Queries the data source (CDSE or GEE) for scene IDs and downloads optical and/or auxiliary data.
    Stores scene IDs in `prep_data_scene_id_file` if specified for later offline use.

    Args:
        pipeline: Configured Sentinel2Pipeline instance.
        optical: If True, downloads optical (Sentinel-2) imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

    """
    assert not pipeline.offline, "Pipeline must be online to prepare data for offline usage."

    # !: Because of an unknown bug, __post_init__ is not initialized automatically
    pipeline.__post_init__()

    logger.debug(f"Preparing data with {optical=}, {aux=}.")

    if pipeline.prep_data_scene_id_file is not None:
        if pipeline.prep_data_scene_id_file.exists():
            logger.warning(
                f"Prep-data scene id file {pipeline.prep_data_scene_id_file=} already exists. "
                "It will be overwritten."
            )
            pipeline.prep_data_scene_id_file.unlink()
    pipeline.prepare_data(optical=optical, aux=aux)

prepare_data

prepare_data(optical: bool = False, aux: bool = False)

Download and prepare data for offline processing.

Validates configuration, determines data requirements from models, and downloads requested data (optical imagery and/or auxiliary data).

Parameters:

  • optical (bool, default: False ) –

    If True, downloads optical imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

  • SystemExit –

    If the process is terminated.

  • SystemError –

    If a system error occurs.

Source code in darts/src/darts/pipelines/sequential_v2.py
def prepare_data(self, optical: bool = False, aux: bool = False):
    """Download and prepare data for offline processing.

    Validates configuration, determines data requirements from models,
    and downloads requested data (optical imagery and/or auxiliary data).

    Args:
        optical: If True, downloads optical imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

    Raises:
        KeyboardInterrupt: If user interrupts execution.
        SystemExit: If the process is terminated.
        SystemError: If a system error occurs.

    """
    assert optical or aux, "Nothing to prepare. Please set optical and/or aux to True."

    self._validate()
    self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    from darts_acquisition import download_arcticdem, download_tcvis
    from stopuhr import Chronometer

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if aux:
        # Get the ensemble to check which auxiliary data is necessary
        ensemble = self._load_ensemble()
        needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

        if not needs_arcticdem and not needs_tcvis:
            logger.warning("No auxiliary data required by the models. Skipping download of auxiliary data...")
        else:
            logger.info(f"Models {needs_tcvis=} {needs_arcticdem=}.")
            self._create_auxiliary_datacubes(arcticdem=needs_arcticdem, tcvis=needs_tcvis)

            # Predownload auxiliary
            aoi = self._tile_aoi()
            if needs_arcticdem:
                logger.info("start download ArcticDEM")
                with timer("Downloading ArcticDEM"):
                    download_arcticdem(aoi, self.arcticdem_dir, resolution=self._arcticdem_resolution())
            if needs_tcvis:
                logger.info("start download TCVIS")
                init_ee(self.ee_project, self.ee_use_highvolume)
                with timer("Downloading TCVis"):
                    download_tcvis(aoi, self.tcvis_dir)

    # Predownload tiles if optical flag is set
    if not optical:
        return

    # Iterate over all the data
    with timer("Loading Optical"):
        tileinfo = self._tileinfos()
        n_tiles = 0
        logger.info(f"Found {len(tileinfo)} tiles to download.")
        for i, (tilekey, _) in enumerate(tileinfo):
            tile_id = self._get_tile_id(tilekey)
            try:
                self._download_tile(tilekey)
                n_tiles += 1
                logger.info(f"Downloaded sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
            except (KeyboardInterrupt, SystemError, SystemExit) as e:
                logger.warning(f"{type(e).__name__} detected.\nExiting...")
                raise e
            except Exception as e:
                logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
                logger.exception(e)
        else:
            logger.info(f"Downloaded {n_tiles} tiles.")

run

run()

Run the complete segmentation pipeline.

Executes the full pipeline including:

  1. Configuration validation and dumping
  2. Loading ensemble models
  3. Creating/loading auxiliary datacubes
  4. Processing each tile:
     • Loading optical data
     • Loading auxiliary data (ArcticDEM, TCVis) as needed
     • Preprocessing
     • Segmentation
     • Postprocessing
     • Exporting results
  5. Saving results and timing information

Results are saved to the output directory with timestamped configuration, results parquet file, and timing information.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    """Run the complete segmentation pipeline.

    Executes the full pipeline including:
    1. Configuration validation and dumping
    2. Loading ensemble models
    3. Creating/loading auxiliary datacubes
    4. Processing each tile:
       - Loading optical data
       - Loading auxiliary data (ArcticDEM, TCVis) as needed
       - Preprocessing
       - Segmentation
       - Postprocessing
       - Exporting results
    5. Saving results and timing information

    Results are saved to the output directory with timestamped configuration,
    results parquet file, and timing information.

    Raises:
        KeyboardInterrupt: If user interrupts execution.

    """
    self._validate()
    current_time = self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    import pandas as pd
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2
    from stopuhr import Chronometer, stopwatch

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if not self.offline:
        init_ee(self.ee_project, self.ee_use_highvolume)

    self._create_auxiliary_datacubes()

    # determine models to use
    ensemble = self._load_ensemble()
    ensemble_subsets = ensemble.model_names
    needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=ensemble_subsets)
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)

            if needs_arcticdem:
                with timer("Loading ArcticDEM", log=False):
                    arcticdem_resolution = self._arcticdem_resolution()
                    arcticdem = load_arcticdem(
                        tile.odc.geobox,
                        self.arcticdem_dir,
                        resolution=arcticdem_resolution,
                        buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                        offline=self.offline,
                    )
            else:
                arcticdem = None

            if needs_tcvis:
                with timer("Loading TCVis", log=False):
                    tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir, offline=self.offline)
            else:
                tcvis = None

            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )

            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )

            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    metadata=export_metadata,
                    debug=self.debug_data,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.timer.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)

_BasePipeline dataclass

_BasePipeline(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    offline: bool = False,
    debug_data: bool = False,
)

Bases: abc.ABC

Base class for all v2 pipelines.

This class provides the run and prepare_data methods which are the main entry points for all pipelines.

This class is meant to be subclassed by the specific pipelines (e.g., PlanetPipeline, Sentinel2Pipeline). Subclasses must implement the following abstract methods (see the sketch below):

  • _arcticdem_resolution: Return the ArcticDEM resolution to use (2, 10, or 32 meters).
  • _get_tile_id: Extract a tile identifier from a tilekey.
  • _tileinfos: Return a list of (tilekey, output_path) tuples for all tiles to process.
  • _load_tile: Load optical data for a given tilekey.
  • _tile_aoi: Return a GeoDataFrame representing the area of interest for all tiles.

Optionally, subclasses can override _download_tile to implement data download functionality.

The subclass must also be a dataclass to fully inherit all parameters of this class.
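A minimal subclass sketch illustrating the required interface; the tiles_dir field and all method bodies are hypothetical placeholders, not real implementations:

import dataclasses
import pathlib


@dataclasses.dataclass
class MyPipeline(_BasePipeline):
    tiles_dir: pathlib.Path | None = None

    def _arcticdem_resolution(self) -> int:
        return 10  # one of 2, 10, or 32 meters

    def _get_tile_id(self, tilekey) -> str:
        return str(tilekey)

    def _tileinfos(self) -> list:
        # (tilekey, output_path) tuples for every tile to process
        return [
            (path.stem, self.output_data_dir / path.stem)
            for path in sorted(self.tiles_dir.glob("*.tif"))
        ]

    def _load_tile(self, tilekey):
        ...  # load the optical data for this tilekey

    def _tile_aoi(self):
        ...  # return a GeoDataFrame covering the area of interest of all tiles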

Parameters:

  • model_files (list[pathlib.Path] | None, default: None ) –

    List of model file paths to use for segmentation. If None, will search the default model directory for all .pt files. Defaults to None.

  • default_dirs (darts_utils.paths.DefaultPaths, default: (lambda: darts_utils.paths.DefaultPaths())() ) –

    Default directory paths configuration. Defaults to DefaultPaths().

  • output_data_dir (pathlib.Path | None, default: None ) –

    The output directory for results. If None, will use the default output directory based on DARTS paths. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    Directory containing ArcticDEM datacube and extent files. If None, will use the default directory based on DARTS paths and resolution. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    Directory containing TCVis data. If None, will use the default TCVis directory. Defaults to None.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    Device for computation. "cuda" uses GPU 0, int specifies GPU index, "auto" selects free GPU, "cpu" uses CPU. Defaults to None (auto-selected).

  • ee_project (str | None, default: None ) –

    Earth Engine project ID. May be omitted if defined in persistent credentials. Defaults to None.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use Earth Engine high-volume server. Defaults to True.

  • tpi_outer_radius (int, default: 100 ) –

    Outer radius in meters for TPI (Topographic Position Index) calculation. Defaults to 100.

  • tpi_inner_radius (int, default: 0 ) –

    Inner radius in meters for TPI calculation. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    Patch size for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    Overlap between patches during inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    Batch size for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    Reflection padding for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    Threshold for binarizing probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    Size of disk for mask erosion and inner edge cropping. Defaults to 10.

  • edge_erosion_size (int | None, default: None ) –

    Size for outer edge cropping. If None, defaults to mask_erosion_size. Defaults to None.

  • min_object_size (int, default: 32 ) –

    Minimum object size in pixels to keep. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    Quality filtering level. Can be 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    Bands to export, e.g., "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata", or specific band names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Whether to save individual model outputs (not just ensemble). Automatically set to False if only one model is used. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing output files. Defaults to False.

  • offline (bool, default: False ) –

    If True, will not attempt to download any missing data. Defaults to False.

  • debug_data (bool, default: False ) –

    If True, writes intermediate data for debugging purposes. Defaults to False.

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

debug_data class-attribute instance-attribute

debug_data: bool = False

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

offline class-attribute instance-attribute

offline: bool = False

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):
    paths.set_defaults(self.default_dirs)
    # The defaults will be overwritten in the respective realizations
    self.output_data_dir = self.output_data_dir or paths.out
    self.model_files = self.model_files or list(paths.models.glob("*.pt"))
    if self.arcticdem_dir is None:
        arcticdem_resolution = self._arcticdem_resolution()
        self.arcticdem_dir = paths.arcticdem(arcticdem_resolution)
    self.tcvis_dir = self.tcvis_dir or paths.tcvis()
    if self.edge_erosion_size is None:
        self.edge_erosion_size = self.mask_erosion_size

prepare_data

prepare_data(optical: bool = False, aux: bool = False)

Download and prepare data for offline processing.

Validates configuration, determines data requirements from models, and downloads requested data (optical imagery and/or auxiliary data).

Parameters:

  • optical (bool, default: False ) –

    If True, downloads optical imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

  • SystemExit –

    If the process is terminated.

  • SystemError –

    If a system error occurs.

Source code in darts/src/darts/pipelines/sequential_v2.py
def prepare_data(self, optical: bool = False, aux: bool = False):
    """Download and prepare data for offline processing.

    Validates configuration, determines data requirements from models,
    and downloads requested data (optical imagery and/or auxiliary data).

    Args:
        optical: If True, downloads optical imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

    Raises:
        KeyboardInterrupt: If user interrupts execution.
        SystemExit: If the process is terminated.
        SystemError: If a system error occurs.

    """
    assert optical or aux, "Nothing to prepare. Please set optical and/or aux to True."

    self._validate()
    self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    from darts_acquisition import download_arcticdem, download_tcvis
    from stopuhr import Chronometer

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if aux:
        # Get the ensemble to check which auxiliary data is necessary
        ensemble = self._load_ensemble()
        needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

        if not needs_arcticdem and not needs_tcvis:
            logger.warning("No auxiliary data required by the models. Skipping download of auxiliary data...")
        else:
            logger.info(f"Models {needs_tcvis=} {needs_arcticdem=}.")
            self._create_auxiliary_datacubes(arcticdem=needs_arcticdem, tcvis=needs_tcvis)

            # Predownload auxiliary
            aoi = self._tile_aoi()
            if needs_arcticdem:
                logger.info("start download ArcticDEM")
                with timer("Downloading ArcticDEM"):
                    download_arcticdem(aoi, self.arcticdem_dir, resolution=self._arcticdem_resolution())
            if needs_tcvis:
                logger.info("start download TCVIS")
                init_ee(self.ee_project, self.ee_use_highvolume)
                with timer("Downloading TCVis"):
                    download_tcvis(aoi, self.tcvis_dir)

    # Predownload tiles if optical flag is set
    if not optical:
        return

    # Iterate over all the data
    with timer("Loading Optical"):
        tileinfo = self._tileinfos()
        n_tiles = 0
        logger.info(f"Found {len(tileinfo)} tiles to download.")
        for i, (tilekey, _) in enumerate(tileinfo):
            tile_id = self._get_tile_id(tilekey)
            try:
                self._download_tile(tilekey)
                n_tiles += 1
                logger.info(f"Downloaded sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
            except (KeyboardInterrupt, SystemError, SystemExit) as e:
                logger.warning(f"{type(e).__name__} detected.\nExiting...")
                raise e
            except Exception as e:
                logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
                logger.exception(e)
        else:
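            # for-else: the loop contains no break, so this runs once every tile was attempted.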
            logger.info(f"Downloaded {n_tiles} tiles.")

run

run()

Run the complete segmentation pipeline.

Executes the full pipeline including:

1. Configuration validation and dumping
2. Loading ensemble models
3. Creating/loading auxiliary datacubes
4. Processing each tile:
   - Loading optical data
   - Loading auxiliary data (ArcticDEM, TCVis) as needed
   - Preprocessing
   - Segmentation
   - Postprocessing
   - Exporting results
5. Saving results and timing information

Results are saved to the output directory together with a timestamped configuration dump, a results parquet file, and timing information.

Raises:

  • KeyboardInterrupt –

    If user interrupts execution.

Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    """Run the complete segmentation pipeline.

    Executes the full pipeline including:
    1. Configuration validation and dumping
    2. Loading ensemble models
    3. Creating/loading auxiliary datacubes
    4. Processing each tile:
       - Loading optical data
       - Loading auxiliary data (ArcticDEM, TCVis) as needed
       - Preprocessing
       - Segmentation
       - Postprocessing
       - Exporting results
    5. Saving results and timing information

    Results are saved to the output directory together with a timestamped configuration
    dump, a results parquet file, and timing information.

    Raises:
        KeyboardInterrupt: If user interrupts execution.

    """
    self._validate()
    current_time = self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    import pandas as pd
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2
    from stopuhr import Chronometer, stopwatch

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if not self.offline:
        init_ee(self.ee_project, self.ee_use_highvolume)

    self._create_auxiliary_datacubes()

    # determine models to use
    ensemble = self._load_ensemble()
    ensemble_subsets = ensemble.model_names
    needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
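            # Skip tiles whose requested outputs already exist, unless overwrite is set.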
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=ensemble_subsets)
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)

            if needs_arcticdem:
                with timer("Loading ArcticDEM", log=False):
                    arcticdem_resolution = self._arcticdem_resolution()
                    arcticdem = load_arcticdem(
                        tile.odc.geobox,
                        self.arcticdem_dir,
                        resolution=arcticdem_resolution,
                        buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                        offline=self.offline,
                    )
            else:
                arcticdem = None

            if needs_tcvis:
                with timer("Loading TCVis", log=False):
                    tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir, offline=self.offline)
            else:
                tcvis = None

            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )

            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )

            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    metadata=export_metadata,
                    debug=self.debug_data,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
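            # Checkpoint after every tile so partial results and timings survive a crash or interrupt.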
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.timer.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
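        # for-else: reached when the tile loop completes; a raised KeyboardInterrupt skips this.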
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)
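
Because run() checkpoints per-tile results to a timestamped parquet file after every iteration (see the finally block above), failed tiles can be inspected after the fact. A short sketch, assuming the timestamped filenames sort lexicographically so the last match is the most recent run:

import pandas as pd

from darts.pipelines.sequential_v2 import PlanetPipeline

pipeline = PlanetPipeline()  # all fields at their defaults
pipeline.run()

# Pick the results file from the most recent run and list failures.
results_file = sorted(pipeline.output_data_dir.glob("*.results.parquet"))[-1]
results = pd.read_parquet(results_file)
print(results.loc[results["status"] == "failed", ["tile_id", "error"]])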