darts.pipelines

Predefined pipelines for DARTS.

PlanetPipeline dataclass

PlanetPipeline(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    offline: bool = False,
    debug_data: bool = False,
    orthotiles_dir: pathlib.Path | None = None,
    scenes_dir: pathlib.Path | None = None,
    image_ids: list = None,
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for processing PlanetScope data.

Processes PlanetScope imagery (both orthotiles and scenes) for retrogressive thaw slump (RTS) segmentation. Supports both offline and online processing modes.

Data Structure

Expects PlanetScope data organized as:

  • Orthotiles: orthotiles_dir/tile_id/scene_id/
  • Scenes: scenes_dir/scene_id/
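
A minimal construction sketch (paths and the model file name are illustrative, not shipped defaults):

from pathlib import Path

from darts.pipelines import PlanetPipeline

pipeline = PlanetPipeline(
    model_files=[Path("models/rts_v6_tcvis.pt")],  # hypothetical model checkpoint
    orthotiles_dir=Path("data/input/planet/PSOrthoTile"),
    scenes_dir=Path("data/input/planet/PSScene"),
    output_data_dir=Path("data/output/planet"),
    device="auto",  # pick a free GPU if available
)
pipeline.run()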

Parameters:

  • orthotiles_dir (pathlib.Path | None, default: None ) –

    Directory containing PlanetScope orthotiles. If None, uses default path from DARTS paths. Defaults to None.

  • scenes_dir (pathlib.Path | None, default: None ) –

    Directory containing PlanetScope scenes. If None, uses default path from DARTS paths. Defaults to None.

  • image_ids (list | None, default: None ) –

    List of image/scene IDs to process. If None, processes all images found in orthotiles_dir and scenes_dir. Defaults to None.

  • model_files (pathlib.Path | list[pathlib.Path] | None, default: None ) –

    Path(s) to model file(s) for segmentation. Single Path implies write_model_outputs=False. If None, searches default model directory for all .pt files. Defaults to None.

  • output_data_dir (pathlib.Path | None, default: None ) –

    Output directory for results. If None, uses {default_out}/planet. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    Directory for ArcticDEM datacube. Will be created/downloaded if needed. If None, uses default path. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    Directory for TCVis data. If None, uses default path. Defaults to None.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    Computation device. "cuda" uses GPU 0, int specifies GPU index, "auto" selects free GPU. Defaults to None.

  • ee_project (str | None, default: None ) –

    Earth Engine project ID. May be omitted if defined in persistent credentials. Defaults to None.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use EE high-volume server. Defaults to True.

  • tpi_outer_radius (int, default: 100 ) –

    Outer radius (m) for TPI calculation. Defaults to 100.

  • tpi_inner_radius (int, default: 0 ) –

    Inner radius (m) for TPI calculation. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    Patch size for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    Overlap between patches. Defaults to 256.

  • batch_size (int, default: 8 ) –

    Batch size for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    Reflection padding for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    Threshold for binarizing probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    Disk size for mask erosion and inner edge cropping. Defaults to 10.

  • edge_erosion_size (int | None, default: None ) –

    Size for outer edge cropping. If None, uses mask_erosion_size. Defaults to None.

  • min_object_size (int, default: 32 ) –

    Minimum object size (pixels) to keep. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    Quality filtering level. 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    Bands to export. Can include "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata", or specific band names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Save individual model outputs (not just ensemble). Defaults to False.

  • overwrite (bool, default: False ) –

    Overwrite existing output files. Defaults to False.

  • offline (bool, default: False ) –

    Skip downloading missing data. Defaults to False.

  • debug_data (bool, default: False ) –

    Write intermediate debugging data. Defaults to False.

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

debug_data class-attribute instance-attribute

debug_data: bool = False

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

offline class-attribute instance-attribute

offline: bool = False

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path | None = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path | None = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):  # noqa: D105
    super().__post_init__()
    self.output_data_dir = self.output_data_dir or (paths.out / "planet")
    self.orthotiles_dir = self.orthotiles_dir or paths.planet_orthotiles()
    self.scenes_dir = self.scenes_dir or paths.planet_scenes()

cli staticmethod

cli(*, pipeline: darts.pipelines.sequential_v2.PlanetPipeline)

Run the sequential pipeline for PlanetScope data.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.PlanetPipeline) –

    Configured PlanetPipeline instance.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "PlanetPipeline"):
    """Run the sequential pipeline for PlanetScope data.

    Args:
        pipeline: Configured PlanetPipeline instance.

    """
    pipeline.__post_init__()
    pipeline.run()

cli_prepare_data staticmethod

cli_prepare_data(
    *,
    pipeline: darts.pipelines.sequential_v2.PlanetPipeline,
    aux: bool = False,
)

Download all necessary data for offline processing.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.PlanetPipeline) –

    Configured PlanetPipeline instance.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli_prepare_data(*, pipeline: "PlanetPipeline", aux: bool = False):
    """Download all necessary data for offline processing.

    Args:
        pipeline: Configured PlanetPipeline instance.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

    """
    assert not pipeline.offline, "Pipeline must be online to prepare data for offline usage."
    pipeline.__post_init__()
    pipeline.prepare_data(optical=False, aux=aux)
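
For example, to pre-download the auxiliary data (ArcticDEM, TCVis) required by the configured models before going offline (a minimal sketch; the model path is illustrative):

from pathlib import Path

from darts.pipelines import PlanetPipeline

pipeline = PlanetPipeline(model_files=[Path("models/rts_v6_tcvis.pt")])  # hypothetical checkpoint
# Must be run online: cli_prepare_data asserts that pipeline.offline is False.
PlanetPipeline.cli_prepare_data(pipeline=pipeline, aux=True)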

prepare_data

prepare_data(optical: bool = False, aux: bool = False)

Download and prepare data for offline processing.

Validates configuration, determines data requirements from models, and downloads requested data (optical imagery and/or auxiliary data).

Parameters:

  • optical (bool, default: False ) –

    If True, downloads optical imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

Raises:

  • KeyboardInterrupt – If user interrupts execution.

  • SystemExit – If the process is terminated.

  • SystemError – If a system error occurs.

Source code in darts/src/darts/pipelines/sequential_v2.py
def prepare_data(self, optical: bool = False, aux: bool = False):
    """Download and prepare data for offline processing.

    Validates configuration, determines data requirements from models,
    and downloads requested data (optical imagery and/or auxiliary data).

    Args:
        optical: If True, downloads optical imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

    Raises:
        KeyboardInterrupt: If user interrupts execution.
        SystemExit: If the process is terminated.
        SystemError: If a system error occurs.

    """
    assert optical or aux, "Nothing to prepare. Please set optical and/or aux to True."

    self._validate()
    self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    from darts_acquisition import download_arcticdem, download_tcvis
    from stopuhr import Chronometer

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if aux:
        # Get the ensemble to check which auxiliary data is necessary
        ensemble = self._load_ensemble()
        needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

        if not needs_arcticdem and not needs_tcvis:
            logger.warning("No auxiliary data required by the models. Skipping download of auxiliary data...")
        else:
            logger.info(f"Models {needs_tcvis=} {needs_arcticdem=}.")
            self._create_auxiliary_datacubes(arcticdem=needs_arcticdem, tcvis=needs_tcvis)

            # Predownload auxiliary
            aoi = self._tile_aoi()
            if needs_arcticdem:
                logger.info("start download ArcticDEM")
                with timer("Downloading ArcticDEM"):
                    download_arcticdem(aoi, self.arcticdem_dir, resolution=self._arcticdem_resolution())
            if needs_tcvis:
                logger.info("start download TCVIS")
                init_ee(self.ee_project, self.ee_use_highvolume)
                with timer("Downloading TCVis"):
                    download_tcvis(aoi, self.tcvis_dir)

    # Predownload tiles if optical flag is set
    if not optical:
        return

    # Iterate over all the data
    with timer("Loading Optical"):
        tileinfo = self._tileinfos()
        n_tiles = 0
        logger.info(f"Found {len(tileinfo)} tiles to download.")
        for i, (tilekey, _) in enumerate(tileinfo):
            tile_id = self._get_tile_id(tilekey)
            try:
                self._download_tile(tilekey)
                n_tiles += 1
                logger.info(f"Downloaded sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
            except (KeyboardInterrupt, SystemError, SystemExit) as e:
                logger.warning(f"{type(e).__name__} detected.\nExiting...")
                raise e
            except Exception as e:
                logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
                logger.exception(e)
        else:
            logger.info(f"Downloaded {n_tiles} tiles.")

run

run()

Run the complete segmentation pipeline.

Executes the full pipeline including:

  1. Configuration validation and dumping
  2. Loading ensemble models
  3. Creating/loading auxiliary datacubes
  4. Processing each tile:
     - Loading optical data
     - Loading auxiliary data (ArcticDEM, TCVis) as needed
     - Preprocessing
     - Segmentation
     - Postprocessing
     - Exporting results
  5. Saving results and timing information

Results are saved to the output directory with timestamped configuration, results parquet file, and timing information.

Raises:

  • KeyboardInterrupt – If user interrupts execution.

Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    """Run the complete segmentation pipeline.

    Executes the full pipeline including:
    1. Configuration validation and dumping
    2. Loading ensemble models
    3. Creating/loading auxiliary datacubes
    4. Processing each tile:
       - Loading optical data
       - Loading auxiliary data (ArcticDEM, TCVis) as needed
       - Preprocessing
       - Segmentation
       - Postprocessing
       - Exporting results
    5. Saving results and timing information

    Results are saved to the output directory with timestamped configuration,
    results parquet file, and timing information.

    Raises:
        KeyboardInterrupt: If user interrupts execution.

    """
    self._validate()
    current_time = self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    import pandas as pd
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2
    from stopuhr import Chronometer, stopwatch

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if not self.offline:
        init_ee(self.ee_project, self.ee_use_highvolume)

    self._create_auxiliary_datacubes()

    # determine models to use
    ensemble = self._load_ensemble()
    ensemble_subsets = ensemble.model_names
    needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=ensemble_subsets)
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)

            if needs_arcticdem:
                with timer("Loading ArcticDEM", log=False):
                    arcticdem_resolution = self._arcticdem_resolution()
                    arcticdem = load_arcticdem(
                        tile.odc.geobox,
                        self.arcticdem_dir,
                        resolution=arcticdem_resolution,
                        buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                        offline=self.offline,
                    )
            else:
                arcticdem = None

            if needs_tcvis:
                with timer("Loading TCVis", log=False):
                    tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir, offline=self.offline)
            else:
                tcvis = None

            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )

            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )

            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    metadata=export_metadata,
                    debug=self.debug_data,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.timer.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)
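
The per-tile results can be inspected afterwards from the timestamped parquet file written to the output directory (a minimal sketch; the timestamp in the file name is illustrative):

import pandas as pd

# Columns written by run(): tile_id, output_path, status ("success"/"failed"), error
results = pd.read_parquet("data/output/planet/2024-01-01_12-00-00.results.parquet")
failed = results[results["status"] == "failed"]
print(failed[["tile_id", "error"]])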

PlanetRayPipeline dataclass

PlanetRayPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    num_cpus: int = 1,
    devices: list[int] | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    orthotiles_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSOrthoTile"
    ),
    scenes_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSScene"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.ray_v2._BaseRayPipeline

Ray-based pipeline for parallel processing of PlanetScope data.

Parameters:

  • orthotiles_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSOrthoTile') ) –

    The directory containing the PlanetScope orthotiles.

  • scenes_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSScene') ) –

    The directory containing the PlanetScope scenes.

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model, which implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • num_cpus (int, default: 1 ) –

    The number of CPUs to make available to Ray. Defaults to 1.

  • devices (list[int] | None, default: None ) –

    The GPU devices (indices) to use. If set, CUDA_VISIBLE_DEVICES is restricted to these devices and one Ray GPU is made available per listed device. If None, Ray auto-detects the available GPUs. Defaults to None.

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep in pixel. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int; in this case 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
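
A minimal construction sketch for the Ray variant (the model path and device indices are illustrative):

from pathlib import Path

from darts.pipelines import PlanetRayPipeline

pipeline = PlanetRayPipeline(
    model_files=[Path("models/rts_v6_tcvis.pt")],  # hypothetical model checkpoint
    devices=[0, 1],  # GPUs exposed to Ray via CUDA_VISIBLE_DEVICES
    num_cpus=8,  # total CPUs made available to Ray
)
pipeline.run()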

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

devices class-attribute instance-attribute

devices: list[int] | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

num_cpus class-attribute instance-attribute

num_cpus: int = 1

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSOrthoTile"
)

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path("data/output")

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSScene"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

cli(*, pipeline: darts.pipelines.ray_v2.PlanetRayPipeline)

Run the Ray pipeline for Planet data.

Source code in darts/src/darts/pipelines/ray_v2.py
@staticmethod
def cli(*, pipeline: "PlanetRayPipeline"):
    """Run the sequential pipeline for Planet data."""
    pipeline.run()

run

run()

Run the complete segmentation pipeline using Ray for parallel processing.

Source code in darts/src/darts/pipelines/ray_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    if self.devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in self.devices)
    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import ray

    ray_context = ray.init(
        num_cpus=self.num_cpus,  # We use one CPU per Ray task
        num_gpus=len(self.devices) if self.devices is not None else None,
    )
    logger.debug(f"Ray initialized with context: {ray_context}")
    logger.info(f"Ray Dashboard URL: {ray_context.dashboard_url}")
    logger.debug(f"Ray cluster resources: {ray.cluster_resources()}")
    logger.debug(f"Ray available resources: {ray.available_resources()}")

    # Initialize ee in every worker
    @ray.remote
    def init_worker():
        init_ee(self.ee_project, self.ee_use_highvolume)

    num_workers = int(ray.cluster_resources().get("CPU", 1))
    logger.info(f"Initializing {num_workers} Ray workers with Earth Engine.")
    ray.get([init_worker.remote() for _ in range(num_workers)])

    import smart_geocubes
    from darts_export import missing_outputs

    from darts.pipelines._ray_wrapper import (
        _export_tile_ray,
        _load_aux,
        _prepare_export_ray,
        _preprocess_ray,
        _RayEnsembleV1,
    )
    from darts.utils.logging import LoggingManager

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    # ray_ensemble = _RayEnsembleV1.remote(models)

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    else:
        raise ValueError(f"Unsupported ArcticDEM resolution: {arcticdem_resolution}")
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    adem_buffer = ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2))

    # Get files to process
    tileinfo: list[RayInputDict] = []
    for i, (tilekey, outpath) in enumerate(self._tileinfos()):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue
        tileinfo.append({"tilekey": tilekey, "outpath": str(outpath.resolve()), "tile_id": tile_id})
    logger.info(f"Found {len(tileinfo)} tiles to process.")

    # Ray data pipeline
    # TODO: setup device stuff correctly
    ds = ray.data.from_items(tileinfo)
    ds = ds.map(self._load_tile, num_cpus=1)
    ds = ds.map(
        _load_aux,
        fn_kwargs={
            "arcticdem_dir": self.arcticdem_dir,
            "arcticdem_resolution": arcticdem_resolution,
            "buffer": adem_buffer,
            "tcvis_dir": self.tcvis_dir,
        },
        num_cpus=1,
    )
    ds = ds.map(
        _preprocess_ray,
        fn_kwargs={
            "tpi_outer_radius": self.tpi_outer_radius,
            "tpi_inner_radius": self.tpi_inner_radius,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
        concurrency=4,
    )
    ds = ds.map(
        _RayEnsembleV1,
        fn_constructor_kwargs={"model_dict": models},
        fn_kwargs={
            "patch_size": self.patch_size,
            "overlap": self.overlap,
            "batch_size": self.batch_size,
            "reflection": self.reflection,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
        num_gpus=0.8,
        concurrency=1,
    )
    ds = ds.map(
        _prepare_export_ray,
        fn_kwargs={
            "binarization_threshold": self.binarization_threshold,
            "mask_erosion_size": self.mask_erosion_size,
            "min_object_size": self.min_object_size,
            "quality_level": self.quality_level,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
    )
    ds = ds.map(
        _export_tile_ray,
        fn_kwargs={
            "export_bands": self.export_bands,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
    )
    logger.debug(f"Ray dataset: {ds}")
    logger.info("Ray pipeline created. Starting execution...")
    # This should trigger the execution
    ds.write_parquet(f"local://{self.output_data_dir.resolve()!s}/ray_output.parquet")
    logger.info(f"Ray pipeline finished. Output written to {self.output_data_dir.resolve()!s}/ray_output.parquet")

Sentinel2Pipeline dataclass

Sentinel2Pipeline(
    model_files: list[pathlib.Path] = None,
    default_dirs: darts_utils.paths.DefaultPaths = (
        lambda: darts_utils.paths.DefaultPaths()
    )(),
    output_data_dir: pathlib.Path | None = None,
    arcticdem_dir: pathlib.Path | None = None,
    tcvis_dir: pathlib.Path | None = None,
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    offline: bool = False,
    debug_data: bool = False,
    scene_ids: list[str] | None = None,
    scene_id_file: pathlib.Path | None = None,
    tile_ids: list[str] | None = None,
    aoi_file: pathlib.Path | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    max_cloud_cover: int | None = 10,
    max_snow_cover: int | None = 10,
    months: list[int] | None = None,
    years: list[int] | None = None,
    prep_data_scene_id_file: pathlib.Path | None = None,
    sentinel2_grid_dir: pathlib.Path | None = None,
    raw_data_store: pathlib.Path | None = None,
    no_raw_data_store: bool = False,
    raw_data_source: typing.Literal["gee", "cdse"] = "cdse",
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for processing Sentinel-2 data.

Processes Sentinel-2 Surface Reflectance (SR) imagery from either CDSE or Google Earth Engine. Supports multiple scene selection methods and flexible filtering options.

Source Selection

The data source is specified via the raw_data_source parameter:

  • "cdse": Copernicus Data Space Ecosystem (CDSE)
  • "gee": Google Earth Engine (GEE)

Both sources require accounts and proper credential setup on the system.

Scene Selection

Scenes can be selected using one of four mutually exclusive methods (priority order):

  1. scene_ids: Direct list of Sentinel-2 scene IDs
  2. scene_id_file: JSON file containing scene IDs
  3. tile_ids: List of Sentinel-2 tile IDs (e.g., "33UVP") with optional filters
  4. aoi_file: Shapefile defining area of interest with optional filters

Filtering Options

When using tile_ids or aoi_file, scenes can be filtered by:

  • Cloud/snow cover: max_cloud_cover, max_snow_cover
  • Date range: start_date and end_date (YYYY-MM-DD format)
  • OR specific months/years: months (1-12) and years

Note: Date range takes priority over month/year filtering.
Warning: Omitting temporal filters entirely may cause rate-limit errors.
Note: Month/year filtering is experimental and only implemented for CDSE.
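
For example, selecting scenes by tile id with a date-range and cloud/snow filter (a minimal sketch; tile ids, dates, and the model path are illustrative):

from pathlib import Path

from darts.pipelines import Sentinel2Pipeline

pipeline = Sentinel2Pipeline(
    model_files=[Path("models/rts_v6_tcvis_s2.pt")],  # hypothetical model checkpoint
    tile_ids=["33UVP"],
    start_date="2023-07-01",
    end_date="2023-08-31",
    max_cloud_cover=10,
    max_snow_cover=10,
    raw_data_source="cdse",
)
pipeline.run()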

Offline Processing

Use cli_prepare_data to download data for offline use. The prep_data_scene_id_file stores scene IDs from queries for offline reuse.
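
A sketch of the two-step offline workflow (all paths and ids are illustrative):

from pathlib import Path

from darts.pipelines import Sentinel2Pipeline

pipeline = Sentinel2Pipeline(
    model_files=[Path("models/rts_v6_tcvis_s2.pt")],  # hypothetical model checkpoint
    tile_ids=["33UVP"],
    start_date="2023-07-01",
    end_date="2023-08-31",
    prep_data_scene_id_file=Path("data/scene_ids.json"),
)
# Online step: query the source, store the scene ids, and download the data.
Sentinel2Pipeline.cli_prepare_data(pipeline=pipeline, optical=True, aux=True)

# Later, offline step: reuse the stored scene ids without network access.
pipeline.offline = True
pipeline.run()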

Parameters:

  • scene_ids (list[str] | None, default: None ) –

    Direct list of Sentinel-2 scene IDs to process. Defaults to None.

  • scene_id_file (pathlib.Path | None, default: None ) –

    JSON file containing scene IDs to process. Defaults to None.

  • tile_ids (list[str] | None, default: None ) –

    List of Sentinel-2 tile IDs (requires filtering params). Defaults to None.

  • aoi_file (pathlib.Path | None, default: None ) –

    Shapefile with area of interest (requires filtering params). Defaults to None.

  • start_date (str | None, default: None ) –

    Start date for filtering (YYYY-MM-DD format). Defaults to None.

  • end_date (str | None, default: None ) –

    End date for filtering (YYYY-MM-DD format). Defaults to None.

  • max_cloud_cover (int | None, default: 10 ) –

    Maximum cloud cover percentage (0-100). Defaults to 10.

  • max_snow_cover (int | None, default: 10 ) –

    Maximum snow cover percentage (0-100). Defaults to 10.

  • months (list[int] | None, default: None ) –

    Filter by months (1-12). Defaults to None.

  • years (list[int] | None, default: None ) –

    Filter by years. Defaults to None.

  • prep_data_scene_id_file (pathlib.Path | None, default: None ) –

    File to store/load scene IDs for offline processing. Written during prepare_data, read during offline run. Defaults to None.

  • sentinel2_grid_dir (pathlib.Path | None, default: None ) –

    Directory for Sentinel-2 grid shapefiles. Used only in prepare_data with tile_ids. If None, uses default path. Defaults to None.

  • raw_data_store (pathlib.Path | None, default: None ) –

    Directory for storing raw Sentinel-2 data locally. If None, uses default path based on raw_data_source. Defaults to None.

  • no_raw_data_store (bool, default: False ) –

    If True, processes data in-memory without local storage. Overrides raw_data_store. Defaults to False.

  • raw_data_source (typing.Literal['gee', 'cdse'], default: 'cdse' ) –

    Data source to use. Defaults to "cdse".

  • model_files (pathlib.Path | list[pathlib.Path] | None, default: None ) –

    Path(s) to model file(s) for segmentation. Single Path implies write_model_outputs=False. If None, searches default model directory for all .pt files. Defaults to None.

  • output_data_dir (pathlib.Path | None, default: None ) –

    Output directory for results. If None, uses {default_out}/sentinel2-{raw_data_source}. Defaults to None.

  • arcticdem_dir (pathlib.Path | None, default: None ) –

    Directory for ArcticDEM datacube. Will be created/downloaded if needed. If None, uses default path. Defaults to None.

  • tcvis_dir (pathlib.Path | None, default: None ) –

    Directory for TCVis data. If None, uses default path. Defaults to None.

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    Computation device. "cuda" uses GPU 0, int specifies GPU index, "auto" selects free GPU. Defaults to None.

  • ee_project (str | None, default: None ) –

    Earth Engine project ID. May be omitted if defined in persistent credentials. Defaults to None.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use EE high-volume server. Defaults to True.

  • tpi_outer_radius (int, default: 100 ) –

    Outer radius (m) for TPI calculation. Defaults to 100.

  • tpi_inner_radius (int, default: 0 ) –

    Inner radius (m) for TPI calculation. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    Patch size for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    Overlap between patches. Defaults to 256.

  • batch_size (int, default: 8 ) –

    Batch size for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    Reflection padding for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    Threshold for binarizing probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    Disk size for mask erosion and inner edge cropping. Defaults to 10.

  • edge_erosion_size (int | None, default: None ) –

    Size for outer edge cropping. If None, uses mask_erosion_size. Defaults to None.

  • min_object_size (int, default: 32 ) –

    Minimum object size (pixels) to keep. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    Quality filtering level. 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    Bands to export. Can include "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata", or specific band names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Save individual model outputs (not just ensemble). Defaults to False.

  • overwrite (bool, default: False ) –

    Overwrite existing output files. Defaults to False.

  • offline (bool, default: False ) –

    Skip downloading missing data. Requires pre-downloaded data. Defaults to False.

  • debug_data (bool, default: False ) –

    Write intermediate debugging data to output directory. Defaults to False.

aoi_file class-attribute instance-attribute

aoi_file: pathlib.Path | None = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path | None = None

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

debug_data class-attribute instance-attribute

debug_data: bool = False

default_dirs class-attribute instance-attribute

default_dirs: darts_utils.paths.DefaultPaths = dataclasses.field(
    default_factory=lambda: darts_utils.paths.DefaultPaths()
)

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str | None = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int | None = 10

max_snow_cover class-attribute instance-attribute

max_snow_cover: int | None = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

months class-attribute instance-attribute

months: list[int] | None = None

no_raw_data_store class-attribute instance-attribute

no_raw_data_store: bool = False

offline class-attribute instance-attribute

offline: bool = False

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path | None = None

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

prep_data_scene_id_file class-attribute instance-attribute

prep_data_scene_id_file: pathlib.Path | None = None

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

raw_data_source class-attribute instance-attribute

raw_data_source: typing.Literal["gee", "cdse"] = "cdse"

raw_data_store class-attribute instance-attribute

raw_data_store: pathlib.Path | None = None

reflection class-attribute instance-attribute

reflection: int = 0

scene_id_file class-attribute instance-attribute

scene_id_file: pathlib.Path | None = None

scene_ids class-attribute instance-attribute

scene_ids: list[str] | None = None

sentinel2_grid_dir class-attribute instance-attribute

sentinel2_grid_dir: pathlib.Path | None = None

start_date class-attribute instance-attribute

start_date: str | None = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path | None = None

tile_ids class-attribute instance-attribute

tile_ids: list[str] | None = None

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

years class-attribute instance-attribute

years: list[int] | None = None

__post_init__

__post_init__()
Source code in darts/src/darts/pipelines/sequential_v2.py
def __post_init__(self):  # noqa: D105
    logger.debug("Before super")
    super().__post_init__()
    logger.debug("After super")
    self.output_data_dir = self.output_data_dir or (paths.out / f"sentinel2-{self.raw_data_source}")
    self.raw_data_store = self.raw_data_store or paths.sentinel2_raw_data(self.raw_data_source)
    if self.no_raw_data_store:
        self.raw_data_store = None

cli staticmethod

cli(*, pipeline: darts.pipelines.sequential_v2.Sentinel2Pipeline)

Run the sequential pipeline for Sentinel-2 data.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.Sentinel2Pipeline) –

    Configured Sentinel2Pipeline instance.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "Sentinel2Pipeline"):
    """Run the sequential pipeline for Sentinel-2 data.

    Args:
        pipeline: Configured Sentinel2Pipeline instance.

    """
    pipeline.__post_init__()
    pipeline.run()

cli_prepare_data staticmethod

cli_prepare_data(
    *,
    pipeline: darts.pipelines.sequential_v2.Sentinel2Pipeline,
    optical: bool = False,
    aux: bool = False,
)

Download all necessary data for offline processing.

Queries the data source (CDSE or GEE) for scene IDs and downloads optical and/or auxiliary data. Stores scene IDs in prep_data_scene_id_file if specified for later offline use.

Parameters:

  • pipeline (darts.pipelines.sequential_v2.Sentinel2Pipeline) –

    Configured Sentinel2Pipeline instance.

  • optical (bool, default: False ) –

    If True, downloads optical (Sentinel-2) imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli_prepare_data(*, pipeline: "Sentinel2Pipeline", optical: bool = False, aux: bool = False):
    """Download all necessary data for offline processing.

    Queries the data source (CDSE or GEE) for scene IDs and downloads optical and/or auxiliary data.
    Stores scene IDs in `prep_data_scene_id_file` if specified for later offline use.

    Args:
        pipeline: Configured Sentinel2Pipeline instance.
        optical: If True, downloads optical (Sentinel-2) imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis). Defaults to False.

    """
    assert not pipeline.offline, "Pipeline must be online to prepare data for offline usage."

    # !: Because of an unknown bug, __post_init__ is not called automatically
    pipeline.__post_init__()

    logger.debug(f"Preparing data with {optical=}, {aux=}.")

    if pipeline.prep_data_scene_id_file is not None:
        if pipeline.prep_data_scene_id_file.exists():
            logger.warning(
                f"Prep-data scene id file {pipeline.prep_data_scene_id_file=} already exists. "
                "It will be overwritten."
            )
            pipeline.prep_data_scene_id_file.unlink()
    pipeline.prepare_data(optical=optical, aux=aux)

prepare_data

prepare_data(optical: bool = False, aux: bool = False)

Download and prepare data for offline processing.

Validates configuration, determines data requirements from models, and downloads requested data (optical imagery and/or auxiliary data).

Parameters:

  • optical (bool, default: False ) –

    If True, downloads optical imagery. Defaults to False.

  • aux (bool, default: False ) –

    If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

Raises:

  • KeyboardInterrupt – If user interrupts execution.

  • SystemExit – If the process is terminated.

  • SystemError – If a system error occurs.

Source code in darts/src/darts/pipelines/sequential_v2.py
def prepare_data(self, optical: bool = False, aux: bool = False):
    """Download and prepare data for offline processing.

    Validates configuration, determines data requirements from models,
    and downloads requested data (optical imagery and/or auxiliary data).

    Args:
        optical: If True, downloads optical imagery. Defaults to False.
        aux: If True, downloads auxiliary data (ArcticDEM, TCVis) as needed. Defaults to False.

    Raises:
        KeyboardInterrupt: If user interrupts execution.
        SystemExit: If the process is terminated.
        SystemError: If a system error occurs.

    """
    assert optical or aux, "Nothing to prepare. Please set optical and/or aux to True."

    self._validate()
    self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    from darts_acquisition import download_arcticdem, download_tcvis
    from stopuhr import Chronometer

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if aux:
        # Get the ensemble to check which auxiliary data is necessary
        ensemble = self._load_ensemble()
        needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

        if not needs_arcticdem and not needs_tcvis:
            logger.warning("No auxiliary data required by the models. Skipping download of auxiliary data...")
        else:
            logger.info(f"Models {needs_tcvis=} {needs_arcticdem=}.")
            self._create_auxiliary_datacubes(arcticdem=needs_arcticdem, tcvis=needs_tcvis)

            # Predownload auxiliary
            aoi = self._tile_aoi()
            if needs_arcticdem:
                logger.info("start download ArcticDEM")
                with timer("Downloading ArcticDEM"):
                    download_arcticdem(aoi, self.arcticdem_dir, resolution=self._arcticdem_resolution())
            if needs_tcvis:
                logger.info("start download TCVIS")
                init_ee(self.ee_project, self.ee_use_highvolume)
                with timer("Downloading TCVis"):
                    download_tcvis(aoi, self.tcvis_dir)

    # Predownload tiles if optical flag is set
    if not optical:
        return

    # Iterate over all the data
    with timer("Loading Optical"):
        tileinfo = self._tileinfos()
        n_tiles = 0
        logger.info(f"Found {len(tileinfo)} tiles to download.")
        for i, (tilekey, _) in enumerate(tileinfo):
            tile_id = self._get_tile_id(tilekey)
            try:
                self._download_tile(tilekey)
                n_tiles += 1
                logger.info(f"Downloaded sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
            except (KeyboardInterrupt, SystemError, SystemExit) as e:
                logger.warning(f"{type(e).__name__} detected.\nExiting...")
                raise e
            except Exception as e:
                logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
                logger.exception(e)
        else:
            logger.info(f"Downloaded {n_tiles} tiles.")

run

run()

Run the complete segmentation pipeline.

Executes the full pipeline including:

  1. Configuration validation and dumping
  2. Loading ensemble models
  3. Creating/loading auxiliary datacubes
  4. Processing each tile:
     - Loading optical data
     - Loading auxiliary data (ArcticDEM, TCVis) as needed
     - Preprocessing
     - Segmentation
     - Postprocessing
     - Exporting results
  5. Saving results and timing information

Results are saved to the output directory with timestamped configuration, results parquet file, and timing information.

Raises:

  • KeyboardInterrupt – If user interrupts execution.

Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    """Run the complete segmentation pipeline.

    Executes the full pipeline including:
    1. Configuration validation and dumping
    2. Loading ensemble models
    3. Creating/loading auxiliary datacubes
    4. Processing each tile:
       - Loading optical data
       - Loading auxiliary data (ArcticDEM, TCVis) as needed
       - Preprocessing
       - Segmentation
       - Postprocessing
       - Exporting results
    5. Saving results and timing information

    Results are saved to the output directory with timestamped configuration,
    results parquet file, and timing information.

    Raises:
        KeyboardInterrupt: If user interrupts execution.

    """
    self._validate()
    current_time = self._dump_config()

    from darts.utils.cuda import debug_info

    debug_info()

    import pandas as pd
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2
    from stopuhr import Chronometer, stopwatch

    from darts.utils.cuda import decide_device
    from darts.utils.earthengine import init_ee

    timer = Chronometer(printer=logger.debug)
    self.device = decide_device(self.device)

    if not self.offline:
        init_ee(self.ee_project, self.ee_use_highvolume)

    self._create_auxiliary_datacubes()

    # determine models to use
    ensemble = self._load_ensemble()
    ensemble_subsets = ensemble.model_names
    needs_arcticdem, needs_tcvis = self._check_aux_needs(ensemble)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=ensemble_subsets)
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)

            if needs_arcticdem:
                with timer("Loading ArcticDEM", log=False):
                    arcticdem_resolution = self._arcticdem_resolution()
                    arcticdem = load_arcticdem(
                        tile.odc.geobox,
                        self.arcticdem_dir,
                        resolution=arcticdem_resolution,
                        buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                        offline=self.offline,
                    )
            else:
                arcticdem = None

            if needs_tcvis:
                with timer("Loading TCVis", log=False):
                    tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir, offline=self.offline)
            else:
                tcvis = None

            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )

            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )

            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=ensemble_subsets if self.write_model_outputs else [],
                    metadata=export_metadata,
                    debug=self.debug_data,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
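            # Persist results and timings after every tile so an interrupt or
            # crash does not lose progress already made.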
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.timer.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)
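
The per-run artifacts share a timestamp prefix in output_data_dir. A quick way to inspect a finished run, with a placeholder timestamp and output path:

import pandas as pd

results = pd.read_parquet("data/output/2024-01-01_12-00-00.results.parquet")
print(results.groupby("status").size())  # count of successful vs. failed tiles

timings = pd.read_parquet("data/output/2024-01-01_12-00-00.timer.parquet")
print(timings.head())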

Sentinel2RayPipeline dataclass

Sentinel2RayPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    num_cpus: int = 1,
    devices: list[int] | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    aoi_shapefile: pathlib.Path = None,
    start_date: str = None,
    end_date: str = None,
    max_cloud_cover: int = 10,
    input_cache: pathlib.Path = pathlib.Path(
        "data/cache/input"
    ),
)

Bases: darts.pipelines.ray_v2._BaseRayPipeline

Pipeline for Sentinel 2 data based on an area of interest. A usage sketch follows the parameter list below.

Parameters:

  • aoi_shapefile (pathlib.Path, default: None ) –

    The shapefile containing the area of interest.

  • start_date (str, default: None ) –

    The start date of the time series in YYYY-MM-DD format.

  • end_date (str, default: None ) –

    The end date of the time series in YYYY-MM-DD format.

  • max_cloud_cover (int, default: 10 ) –

    The maximum cloud cover percentage to use for filtering the Sentinel 2 scenes. Defaults to 10.

  • input_cache (pathlib.Path, default: pathlib.Path('data/cache/input') ) –

    The directory to use for caching the input data. Defaults to Path("data/cache/input").

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • num_cpus (int, default: 1 ) –

    The number of CPUs to make available to Ray. Each Ray task in this pipeline uses one CPU. Defaults to 1.

  • devices (list[int] | None, default: None ) –

    The GPU device indices to use. If set, CUDA_VISIBLE_DEVICES is restricted to these devices and Ray is initialized with that many GPUs. If None, Ray auto-detects the available GPUs. Defaults to None.

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep in pixel. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int; in this case 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
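
A minimal sketch of configuring an AOI run, assuming the import path shown on this page; the checkpoint, shapefile, and dates are hypothetical placeholders:

from pathlib import Path

from darts.pipelines import Sentinel2RayPipeline

pipeline = Sentinel2RayPipeline(
    model_files=[Path("models/rts_model.pt")],      # hypothetical checkpoint
    aoi_shapefile=Path("data/aoi/study_area.shp"),  # hypothetical AOI
    start_date="2023-07-01",
    end_date="2023-08-31",
    max_cloud_cover=10,
    devices=[0],  # run on GPU 0
)
pipeline.run()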

aoi_shapefile class-attribute instance-attribute

aoi_shapefile: pathlib.Path = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

devices class-attribute instance-attribute

devices: list[int] | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

input_cache class-attribute instance-attribute

input_cache: pathlib.Path = pathlib.Path("data/cache/input")

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

num_cpus class-attribute instance-attribute

num_cpus: int = 1

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path("data/output")

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

start_date class-attribute instance-attribute

start_date: str = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the Ray pipeline for AOI Sentinel 2 data.

Source code in darts/src/darts/pipelines/ray_v2.py
@staticmethod
def cli(*, pipeline: "Sentinel2RayPipeline"):
    """Run the sequential pipeline for AOI Sentinel 2 data."""
    pipeline.run()

run

run()

Run the complete segmentation pipeline distributed with Ray.

Source code in darts/src/darts/pipelines/ray_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    if self.devices is not None:
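        # Restricting CUDA_VISIBLE_DEVICES keeps Ray's GPU accounting in sync
        # with the requested devices before ray.init is called below.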
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in self.devices)
    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import ray

    ray_context = ray.init(
        num_cpus=self.num_cpus,  # We use one CPU per Ray task
        num_gpus=len(self.devices) if self.devices is not None else None,
    )
    logger.debug(f"Ray initialized with context: {ray_context}")
    logger.info(f"Ray Dashboard URL: {ray_context.dashboard_url}")
    logger.debug(f"Ray cluster resources: {ray.cluster_resources()}")
    logger.debug(f"Ray available resources: {ray.available_resources()}")

    # Initialize ee in every worker
    @ray.remote
    def init_worker():
        init_ee(self.ee_project, self.ee_use_highvolume)

    num_workers = int(ray.cluster_resources().get("CPU", 1))
    logger.info(f"Initializing {num_workers} Ray workers with Earth Engine.")
    ray.get([init_worker.remote() for _ in range(num_workers)])

    import smart_geocubes
    from darts_export import missing_outputs

    from darts.pipelines._ray_wrapper import (
        _export_tile_ray,
        _load_aux,
        _prepare_export_ray,
        _preprocess_ray,
        _RayEnsembleV1,
    )
    from darts.utils.logging import LoggingManager

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    # ray_ensemble = _RayEnsembleV1.remote(models)

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    else:
        raise ValueError(f"Unsupported ArcticDEM resolution: {arcticdem_resolution}")
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    adem_buffer = ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2))

    # Get files to process
    tileinfo: list[RayInputDict] = []
    for i, (tilekey, outpath) in enumerate(self._tileinfos()):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue
        tileinfo.append({"tilekey": tilekey, "outpath": str(outpath.resolve()), "tile_id": tile_id})
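    # NOTE: caps the run at the first 10 tiles; this looks like a debugging
    # leftover rather than intended behavior.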
    tileinfo = tileinfo[:10]
    logger.info(f"Found {len(tileinfo)} tiles to process.")

    # Ray data pipeline
    # TODO: setup device stuff correctly
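    # The fractional num_gpus values below let Ray co-locate the lighter GPU
    # stages (preprocessing, postprocessing) with the heavier inference stage.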
    ds = ray.data.from_items(tileinfo)
    ds = ds.map(self._load_tile, num_cpus=1)
    ds = ds.map(
        _load_aux,
        fn_kwargs={
            "arcticdem_dir": self.arcticdem_dir,
            "arcticdem_resolution": arcticdem_resolution,
            "buffer": adem_buffer,
            "tcvis_dir": self.tcvis_dir,
        },
        num_cpus=1,
    )
    ds = ds.map(
        _preprocess_ray,
        fn_kwargs={
            "tpi_outer_radius": self.tpi_outer_radius,
            "tpi_inner_radius": self.tpi_inner_radius,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
        concurrency=4,
    )
    ds = ds.map(
        _RayEnsembleV1,
        fn_constructor_kwargs={"model_dict": models},
        fn_kwargs={
            "patch_size": self.patch_size,
            "overlap": self.overlap,
            "batch_size": self.batch_size,
            "reflection": self.reflection,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
        num_gpus=0.8,
        concurrency=1,
    )
    ds = ds.map(
        _prepare_export_ray,
        fn_kwargs={
            "binarization_threshold": self.binarization_threshold,
            "mask_erosion_size": self.mask_erosion_size,
            "min_object_size": self.min_object_size,
            "quality_level": self.quality_level,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
    )
    ds = ds.map(
        _export_tile_ray,
        fn_kwargs={
            "export_bands": self.export_bands,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
    )
    logger.debug(f"Ray dataset: {ds}")
    logger.info("Ray pipeline created. Starting execution...")
    # Writing the parquet materializes the lazy Ray dataset and triggers execution
    ds.write_parquet(f"local://{self.output_data_dir.resolve()!s}/ray_output.parquet")
    logger.info(f"Ray pipeline finished. Output written to {self.output_data_dir.resolve()!s}/ray_output.parquet")