darts.pipelines

Predefined pipelines for DARTS.

AOISentinel2BlockPipeline dataclass

AOISentinel2BlockPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    aoi_shapefile: pathlib.Path = None,
    start_date: str = None,
    end_date: str = None,
    max_cloud_cover: int = 10,
    input_cache: pathlib.Path = pathlib.Path(
        "data/cache/input"
    ),
)

Bases: darts.pipelines.block_v1._BaseBlockPipeline

Pipeline for Sentinel 2 data based on an area of interest.

Parameters:

  • aoi_shapefile (pathlib.Path, default: None ) –

    The shapefile containing the area of interest.

  • start_date (str, default: None ) –

    The start date of the time series in YYYY-MM-DD format.

  • end_date (str, default: None ) –

    The end date of the time series in YYYY-MM-DD format.

  • max_cloud_cover (int, default: 10 ) –

    The maximum cloud cover percentage to use for filtering the Sentinel 2 scenes. Defaults to 10.

  • input_cache (pathlib.Path, default: pathlib.Path('data/cache/input') ) –

    The directory to use for caching the input data. Defaults to Path("data/cache/input").

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the model(s) to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda", the first device (0) is used; if an int, that device is used. If "auto", a free GPU (<50% memory usage) is selected automatically. Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be given as an int, where 0="none", 1="low_quality" and 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
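
A minimal usage sketch (the checkpoint path, shapefile path, and date range below are illustrative assumptions, not values shipped with DARTS):

import pathlib

from darts.pipelines import AOISentinel2BlockPipeline

pipeline = AOISentinel2BlockPipeline(
    model_files=[pathlib.Path("models/s2_model.ckpt")],  # hypothetical checkpoint
    aoi_shapefile=pathlib.Path("aoi/study_area.shp"),  # hypothetical AOI shapefile
    start_date="2023-06-01",
    end_date="2023-09-30",
    max_cloud_cover=10,
    device="auto",
)
pipeline.run()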

aoi_shapefile class-attribute instance-attribute

aoi_shapefile: pathlib.Path = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

input_cache class-attribute instance-attribute

input_cache: pathlib.Path = pathlib.Path("data/cache/input")

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

start_date class-attribute instance-attribute

start_date: str = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the block pipeline for AOI Sentinel 2 data.

Source code in darts/src/darts/pipelines/block_v1.py
@staticmethod
def cli(*, pipeline: "AOISentinel2BlockPipeline"):
    """Run the sequential pipeline for AOI Sentinel 2 data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/block_v1.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    from stopuhr import Chronometer

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import pandas as pd
    import smart_geocubes
    import torch
    import xarray as xr
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_ensemble import EnsembleV1
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_legacy_fast

    from darts.utils.cuda import decide_device
    from darts.utils.logging import LoggingManager

    self.device = decide_device(self.device)

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    ensemble = EnsembleV1(models, device=torch.device(self.device))

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []

    tmp_dir = self.output_data_dir / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    res_dir = self.output_data_dir / "results"
    res_dir.mkdir(parents=True, exist_ok=True)

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue

        with timer("Loading & Preprocessing", log=False):
            tile = self._load_tile(tilekey)

            arcticdem = load_arcticdem(
                tile.odc.geobox,
                self.arcticdem_dir,
                resolution=arcticdem_resolution,
                buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
            )

            tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir)

            tile = preprocess_legacy_fast(
                tile,
                arcticdem,
                tcvis,
                self.tpi_outer_radius,
                self.tpi_inner_radius,
                "cpu",
            )

            tile.to_netcdf(tmp_dir / f"{tile_id}_preprocessed.nc", mode="w", engine="h5netcdf")

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        with timer("Segmenting", log=False):
            tile = xr.open_dataset(tmp_dir / f"{tile_id}_preprocessed.nc", engine="h5netcdf").set_coords(
                "spatial_ref"
            )
            tile = ensemble.segment_tile(
                tile,
                patch_size=self.patch_size,
                overlap=self.overlap,
                batch_size=self.batch_size,
                reflection=self.reflection,
                keep_inputs=self.write_model_outputs,
            )
            tile.to_netcdf(tmp_dir / f"{tile_id}_segmented.nc", mode="w", engine="h5netcdf")

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        with timer("Postprocessing & Exporting", log=False):
            tile = xr.open_dataset(tmp_dir / f"{tile_id}_segmented.nc", engine="h5netcdf").set_coords("spatial_ref")
            tile = prepare_export(
                tile,
                bin_threshold=self.binarization_threshold,
                mask_erosion_size=self.mask_erosion_size,
                min_object_size=self.min_object_size,
                quality_level=self.quality_level,
                ensemble_subsets=models.keys() if self.write_model_outputs else [],
                device=self.device,
            )

            export_tile(
                tile,
                outpath,
                bands=self.export_bands,
                ensemble_subsets=models.keys() if self.write_model_outputs else [],
            )

        n_tiles += 1
        results.append(
            {
                "tile_id": tile_id,
                "output_path": str(outpath.resolve()),
                "status": "success",
                "error": None,
            }
        )
        logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")

    if len(results) > 0:
        pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
    if len(timer.durations) > 0:
        timer.export().to_parquet(self.output_data_dir / f"{current_time}.stopuhr.parquet")
    logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
    timer.summary(printer=logger.info)
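
After a run, the tile-level results and the timing measurements can be inspected from the timestamped parquet files written to output_data_dir; a short sketch assuming a hypothetical timestamp:

import pandas as pd

# Columns written by run(): tile_id, output_path, status, error.
results = pd.read_parquet("data/output/2024-01-01_12-00-00.results.parquet")
print(results["status"].value_counts())

# Per-step durations recorded by the Chronometer.
timings = pd.read_parquet("data/output/2024-01-01_12-00-00.stopuhr.parquet")
print(timings.head())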

AOISentinel2Pipeline dataclass

AOISentinel2Pipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    aoi_shapefile: pathlib.Path = None,
    start_date: str = None,
    end_date: str = None,
    max_cloud_cover: int = 10,
    s2_source: typing.Literal["gee", "cdse"] = "cdse",
    s2_download_cache: pathlib.Path = pathlib.Path(
        "data/cache/s2gee"
    ),
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for Sentinel 2 data based on an area of interest.

Parameters:

  • aoi_shapefile (pathlib.Path, default: None ) –

    The shapefile containing the area of interest.

  • start_date (str, default: None ) –

    The start date of the time series in YYYY-MM-DD format.

  • end_date (str, default: None ) –

    The end date of the time series in YYYY-MM-DD format.

  • max_cloud_cover (int, default: 10 ) –

    The maximum cloud cover percentage to use for filtering the Sentinel 2 scenes. Defaults to 10.

  • s2_source (typing.Literal['gee', 'cdse'], default: 'cdse' ) –

    The source to download the Sentinel 2 data from, either Google Earth Engine ("gee") or the Copernicus Data Space Ecosystem ("cdse"). Defaults to "cdse".

  • s2_download_cache (pathlib.Path, default: pathlib.Path('data/cache/s2gee') ) –

    The directory to use for caching the Sentinel 2 download data. Defaults to Path("data/cache/s2gee").

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the model(s) to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda", the first device (0) is used; if an int, that device is used. If "auto", a free GPU (<50% memory usage) is selected automatically. Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • edge_erosion_size (int, default: None ) –

    If the edge-cropping should use a different width than the (inner) mask erosion, set it here. Defaults to mask_erosion_size.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be given as an int, where 0="none", 1="low_quality" and 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
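
A hedged instantiation sketch for the sequential pipeline, assuming CDSE as the Sentinel 2 source and hypothetical local paths:

import pathlib

from darts.pipelines import AOISentinel2Pipeline

pipeline = AOISentinel2Pipeline(
    model_files=pathlib.Path("models/s2_model.ckpt"),  # a single Path implies write_model_outputs=False
    aoi_shapefile=pathlib.Path("aoi/study_area.shp"),  # hypothetical
    start_date="2023-07-01",
    end_date="2023-08-31",
    s2_source="cdse",
    edge_erosion_size=20,  # crop edges wider than the inner mask erosion
)
pipeline.run()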

aoi_shapefile class-attribute instance-attribute

aoi_shapefile: pathlib.Path = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

s2_download_cache class-attribute instance-attribute

s2_download_cache: pathlib.Path = pathlib.Path(
    "data/cache/s2gee"
)

s2_source class-attribute instance-attribute

s2_source: typing.Literal['gee', 'cdse'] = 'cdse'

start_date class-attribute instance-attribute

start_date: str = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the sequential pipeline for AOI Sentinel 2 data.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "AOISentinel2Pipeline"):
    """Run the sequential pipeline for AOI Sentinel 2 data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    from stopuhr import Chronometer, stopwatch

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import pandas as pd
    import smart_geocubes
    import torch
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_ensemble import EnsembleV1
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2

    from darts.utils.cuda import decide_device
    from darts.utils.logging import LoggingManager

    self.device = decide_device(self.device)

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    ensemble = EnsembleV1(models, device=torch.device(self.device))

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)
            with timer("Loading ArcticDEM", log=False):
                arcticdem = load_arcticdem(
                    tile.odc.geobox,
                    self.arcticdem_dir,
                    resolution=arcticdem_resolution,
                    buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                )
            with timer("Loading TCVis", log=False):
                tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir)
            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )
            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )
            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=models.keys() if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=models.keys() if self.write_model_outputs else [],
                    metadata=export_metadata,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.stopuhr.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:  # for-else: runs once the loop completes without interruption
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)

AOISentinel2RayPipeline dataclass

AOISentinel2RayPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    num_cpus: int = 1,
    devices: list[int] | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    aoi_shapefile: pathlib.Path = None,
    start_date: str = None,
    end_date: str = None,
    max_cloud_cover: int = 10,
    input_cache: pathlib.Path = pathlib.Path(
        "data/cache/input"
    ),
)

Bases: darts.pipelines.ray_v2._BaseRayPipeline

Pipeline for Sentinel 2 data based on an area of interest.

Parameters:

  • aoi_shapefile (pathlib.Path, default: None ) –

    The shapefile containing the area of interest.

  • start_date (str, default: None ) –

    The start date of the time series in YYYY-MM-DD format.

  • end_date (str, default: None ) –

    The end date of the time series in YYYY-MM-DD format.

  • max_cloud_cover (int, default: 10 ) –

    The maximum cloud cover percentage to use for filtering the Sentinel 2 scenes. Defaults to 10.

  • input_cache (pathlib.Path, default: pathlib.Path('data/cache/input') ) –

    The directory to use for caching the input data. Defaults to Path("data/cache/input").

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the model(s) to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • num_cpus (int, default: 1 ) –

    The number of CPUs for Ray to use (one CPU per Ray task).

  • devices (list[int] | None, default: None ) –

    The GPU device indices to use. If given, CUDA_VISIBLE_DEVICES is restricted to these devices before Ray is initialized; if None, Ray detects the available GPUs itself.

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be given as an int, where 0="none", 1="low_quality" and 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
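
A sketch for the Ray variant, assuming a machine with two GPUs (indices 0 and 1) and eight CPUs available for Ray tasks; all paths are hypothetical:

import pathlib

from darts.pipelines import AOISentinel2RayPipeline

pipeline = AOISentinel2RayPipeline(
    model_files=[
        pathlib.Path("models/model_a.ckpt"),  # hypothetical ensemble members
        pathlib.Path("models/model_b.ckpt"),
    ],
    aoi_shapefile=pathlib.Path("aoi/study_area.shp"),  # hypothetical
    start_date="2023-07-01",
    end_date="2023-08-31",
    num_cpus=8,
    devices=[0, 1],  # restricts CUDA_VISIBLE_DEVICES before Ray is initialized
)
pipeline.run()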

aoi_shapefile class-attribute instance-attribute

aoi_shapefile: pathlib.Path = None

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

devices class-attribute instance-attribute

devices: list[int] | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

end_date class-attribute instance-attribute

end_date: str = None

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

input_cache class-attribute instance-attribute

input_cache: pathlib.Path = pathlib.Path("data/cache/input")

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

max_cloud_cover class-attribute instance-attribute

max_cloud_cover: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

num_cpus class-attribute instance-attribute

num_cpus: int = 1

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

start_date class-attribute instance-attribute

start_date: str = None

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the Ray pipeline for AOI Sentinel 2 data.

Source code in darts/src/darts/pipelines/ray_v2.py
@staticmethod
def cli(*, pipeline: "AOISentinel2RayPipeline"):
    """Run the sequential pipeline for AOI Sentinel 2 data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/ray_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    if self.devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in self.devices)
    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import ray

    ray_context = ray.init(
        num_cpus=self.num_cpus,  # We use one CPU per Ray task
        num_gpus=len(self.devices) if self.devices is not None else None,
    )
    logger.debug(f"Ray initialized with context: {ray_context}")
    logger.info(f"Ray Dashboard URL: {ray_context.dashboard_url}")
    logger.debug(f"Ray cluster resources: {ray.cluster_resources()}")
    logger.debug(f"Ray available resources: {ray.available_resources()}")

    # Initlize ee in every worker
    @ray.remote
    def init_worker():
        init_ee(self.ee_project, self.ee_use_highvolume)

    num_workers = int(ray.cluster_resources().get("CPU", 1))
    logger.info(f"Initializing {num_workers} Ray workers with Earth Engine.")
    ray.get([init_worker.remote() for _ in range(num_workers)])

    import smart_geocubes
    from darts_export import missing_outputs

    from darts.pipelines._ray_wrapper import (
        _export_tile_ray,
        _load_aux,
        _prepare_export_ray,
        _preprocess_ray,
        _RayEnsembleV1,
    )
    from darts.utils.logging import LoggingManager

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    # ray_ensemble = _RayEnsembleV1.remote(models)

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    adem_buffer = ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2))

    # Get files to process
    tileinfo: list[RayInputDict] = []
    for i, (tilekey, outpath) in enumerate(self._tileinfos()):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue
        tileinfo.append({"tilekey": tilekey, "outpath": str(outpath.resolve()), "tile_id": tile_id})
    tileinfo = tileinfo[:10]  # NOTE: limits the run to the first 10 tiles
    logger.info(f"Found {len(tileinfo)} tiles to process.")

    # Ray data pipeline
    # TODO: setup device stuff correctly
    ds = ray.data.from_items(tileinfo)
    ds = ds.map(self._load_tile, num_cpus=1)
    ds = ds.map(
        _load_aux,
        fn_kwargs={
            "arcticdem_dir": self.arcticdem_dir,
            "arcticdem_resolution": arcticdem_resolution,
            "buffer": adem_buffer,
            "tcvis_dir": self.tcvis_dir,
        },
        num_cpus=1,
    )
    ds = ds.map(
        _preprocess_ray,
        fn_kwargs={
            "tpi_outer_radius": self.tpi_outer_radius,
            "tpi_inner_radius": self.tpi_inner_radius,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
        concurrency=4,
    )
    ds = ds.map(
        _RayEnsembleV1,
        fn_constructor_kwargs={"model_dict": models},
        fn_kwargs={
            "patch_size": self.patch_size,
            "overlap": self.overlap,
            "batch_size": self.batch_size,
            "reflection": self.reflection,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
        num_gpus=0.8,
        concurrency=1,
    )
    ds = ds.map(
        _prepare_export_ray,
        fn_kwargs={
            "binarization_threshold": self.binarization_threshold,
            "mask_erosion_size": self.mask_erosion_size,
            "min_object_size": self.min_object_size,
            "quality_level": self.quality_level,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
    )
    ds = ds.map(
        _export_tile_ray,
        fn_kwargs={
            "export_bands": self.export_bands,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
    )
    logger.debug(f"Ray dataset: {ds}")
    logger.info("Ray pipeline created. Starting execution...")
    # This should trigger the execution
    ds.write_parquet(f"local://{self.output_data_dir.resolve()!s}/ray_output.parquet")
    logger.info(f"Ray pipeline finished. Output written to {self.output_data_dir.resolve()!s}/ray_output.parquet")

PlanetBlockPipeline dataclass

PlanetBlockPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    orthotiles_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSOrthoTile"
    ),
    scenes_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSScene"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.block_v1._BaseBlockPipeline

Pipeline for PlanetScope data.

Parameters:

  • orthotiles_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSOrthoTile') ) –

    The directory containing the PlanetScope orthotiles.

  • scenes_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSScene') ) –

    The directory containing the PlanetScope scenes.

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the model(s) to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda", the first device (0) is used; if an int, that device is used. If "auto", a free GPU (<50% memory usage) is selected automatically. Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be given as an int, where 0="none", 1="low_quality" and 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
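
A sketch for Planet data, assuming imagery is already downloaded into the default input layout; the checkpoint path is hypothetical:

import pathlib

from darts.pipelines import PlanetBlockPipeline

pipeline = PlanetBlockPipeline(
    model_files=[pathlib.Path("models/planet_model.ckpt")],  # hypothetical checkpoint
    orthotiles_dir=pathlib.Path("data/input/planet/PSOrthoTile"),
    scenes_dir=pathlib.Path("data/input/planet/PSScene"),
    image_ids=None,  # None processes every image found in the directories
    export_bands=["probabilities", "binarized", "polygonized"],
)
pipeline.run()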

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSOrthoTile"
)

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSScene"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the block pipeline for Planet data.

Source code in darts/src/darts/pipelines/block_v1.py
@staticmethod
def cli(*, pipeline: "PlanetBlockPipeline"):
    """Run the sequential pipeline for Planet data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/block_v1.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    from stopuhr import Chronometer

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import pandas as pd
    import smart_geocubes
    import torch
    import xarray as xr
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_ensemble import EnsembleV1
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_legacy_fast

    from darts.utils.cuda import decide_device
    from darts.utils.logging import LoggingManager

    self.device = decide_device(self.device)

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    ensemble = EnsembleV1(models, device=torch.device(self.device))

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []

    tmp_dir = self.output_data_dir / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    res_dir = self.output_data_dir / "results"
    res_dir.mkdir(parents=True, exist_ok=True)

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue

        with timer("Loading & Preprocessing", log=False):
            tile = self._load_tile(tilekey)

            arcticdem = load_arcticdem(
                tile.odc.geobox,
                self.arcticdem_dir,
                resolution=arcticdem_resolution,
                buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
            )

            tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir)

            tile = preprocess_legacy_fast(
                tile,
                arcticdem,
                tcvis,
                self.tpi_outer_radius,
                self.tpi_inner_radius,
                "cpu",
            )

            tile.to_netcdf(tmp_dir / f"{tile_id}_preprocessed.nc", mode="w", engine="h5netcdf")

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        with timer("Segmenting", log=False):
            tile = xr.open_dataset(tmp_dir / f"{tile_id}_preprocessed.nc", engine="h5netcdf").set_coords(
                "spatial_ref"
            )
            tile = ensemble.segment_tile(
                tile,
                patch_size=self.patch_size,
                overlap=self.overlap,
                batch_size=self.batch_size,
                reflection=self.reflection,
                keep_inputs=self.write_model_outputs,
            )
            tile.to_netcdf(tmp_dir / f"{tile_id}_segmented.nc", mode="w", engine="h5netcdf")

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        with timer("Postprocessing & Exporting", log=False):
            tile = xr.open_dataset(tmp_dir / f"{tile_id}_segmented.nc", engine="h5netcdf").set_coords("spatial_ref")
            tile = prepare_export(
                tile,
                bin_threshold=self.binarization_threshold,
                mask_erosion_size=self.mask_erosion_size,
                min_object_size=self.min_object_size,
                quality_level=self.quality_level,
                ensemble_subsets=models.keys() if self.write_model_outputs else [],
                device=self.device,
            )

            export_tile(
                tile,
                outpath,
                bands=self.export_bands,
                ensemble_subsets=models.keys() if self.write_model_outputs else [],
            )

        n_tiles += 1
        results.append(
            {
                "tile_id": tile_id,
                "output_path": str(outpath.resolve()),
                "status": "success",
                "error": None,
            }
        )
        logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")

    if len(results) > 0:
        pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
    if len(timer.durations) > 0:
        timer.export().to_parquet(self.output_data_dir / f"{current_time}.stopuhr.parquet")
    logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
    timer.summary(printer=logger.info)
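Because each stage of this block pipeline round-trips through NetCDF files under <output_data_dir>/tmp, the intermediates can be inspected between passes. A minimal sketch (the tile id and output directory are placeholders):

import xarray as xr

tile_id = "example-tile"  # hypothetical id; the pipeline derives real ids from the tile keys
tile = xr.open_dataset(f"data/output/tmp/{tile_id}_preprocessed.nc", engine="h5netcdf")
print(tile.data_vars)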

PlanetPipeline dataclass

PlanetPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    orthotiles_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSOrthoTile"
    ),
    scenes_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSScene"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for PlanetScope data.

Parameters:

  • orthotiles_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSOrthoTile') ) –

    The directory containing the PlanetScope orthotiles.

  • scenes_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSScene') ) –

    The directory containing the PlanetScope scenes.

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda", the first device (0) is used; if an int, the specified device is used. If "auto", a free GPU (<50% memory usage) is selected automatically. Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • edge_erosion_size (int, default: None ) –

    If the edge-cropping should have a different width than the (inner) mask erosion, set it here. Defaults to mask_erosion_size.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int; in this case 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
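A minimal usage sketch; the checkpoint paths are placeholders and the directories assume the default data layout:

from pathlib import Path

from darts.pipelines import PlanetPipeline

pipeline = PlanetPipeline(
    model_files=[Path("models/tcvis.pt"), Path("models/notcvis.pt")],  # hypothetical checkpoints
    orthotiles_dir=Path("data/input/planet/PSOrthoTile"),
    scenes_dir=Path("data/input/planet/PSScene"),
    output_data_dir=Path("data/output"),
    device="auto",
)
pipeline.run()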

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSOrthoTile"
)

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSScene"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the sequential pipeline for Planet data.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "PlanetPipeline"):
    """Run the sequential pipeline for Planet data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    from stopuhr import Chronometer, stopwatch

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import pandas as pd
    import smart_geocubes
    import torch
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_ensemble import EnsembleV1
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2

    from darts.utils.cuda import decide_device
    from darts.utils.logging import LoggingManager

    self.device = decide_device(self.device)

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    ensemble = EnsembleV1(models, device=torch.device(self.device))

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)
            with timer("Loading ArcticDEM", log=False):
                arcticdem = load_arcticdem(
                    tile.odc.geobox,
                    self.arcticdem_dir,
                    resolution=arcticdem_resolution,
                    buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                )
            with timer("Loading TCVis", log=False):
                tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir)
            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )
            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )
            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=models.keys() if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=models.keys() if self.write_model_outputs else [],
                    metadata=export_metadata,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.stopuhr.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
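    # for-else: this branch runs only when the loop completes without a break (a raised exception skips it).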
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)

PlanetRayPipeline dataclass

PlanetRayPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    num_cpus: int = 1,
    devices: list[int] | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    orthotiles_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSOrthoTile"
    ),
    scenes_dir: pathlib.Path = pathlib.Path(
        "data/input/planet/PSScene"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.ray_v2._BaseRayPipeline

Pipeline for PlanetScope data.

Parameters:

  • orthotiles_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSOrthoTile') ) –

    The directory containing the PlanetScope orthotiles.

  • scenes_dir (pathlib.Path, default: pathlib.Path('data/input/planet/PSScene') ) –

    The directory containing the PlanetScope scenes.

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • num_cpus (int, default: 1 ) –

    The number of CPUs to make available to Ray (one CPU per Ray task). Defaults to 1.

  • devices (list[int] | None, default: None ) –

    The GPU devices to use. If set, CUDA_VISIBLE_DEVICES is restricted to these devices and Ray is initialized with that many GPUs. If None, device visibility is left unchanged. Defaults to None.

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int; in this case 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
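A minimal usage sketch; the checkpoint path, CPU count, and GPU indices are placeholders:

from pathlib import Path

from darts.pipelines import PlanetRayPipeline

pipeline = PlanetRayPipeline(
    model_files=[Path("models/tcvis.pt")],  # hypothetical checkpoint
    num_cpus=8,  # CPUs handed to ray.init
    devices=[0, 1],  # GPUs exposed via CUDA_VISIBLE_DEVICES
)
pipeline.run()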

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

devices class-attribute instance-attribute

devices: list[int] | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

num_cpus class-attribute instance-attribute

num_cpus: int = 1

orthotiles_dir class-attribute instance-attribute

orthotiles_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSOrthoTile"
)

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

scenes_dir class-attribute instance-attribute

scenes_dir: pathlib.Path = pathlib.Path(
    "data/input/planet/PSScene"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the Ray pipeline for Planet data.

Source code in darts/src/darts/pipelines/ray_v2.py
@staticmethod
def cli(*, pipeline: "PlanetRayPipeline"):
    """Run the sequential pipeline for Planet data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/ray_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    if self.devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in self.devices)
    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import ray

    ray_context = ray.init(
        num_cpus=self.num_cpus,  # We use one CPU per Ray task
        num_gpus=len(self.devices) if self.devices is not None else None,
    )
    logger.debug(f"Ray initialized with context: {ray_context}")
    logger.info(f"Ray Dashboard URL: {ray_context.dashboard_url}")
    logger.debug(f"Ray cluster resources: {ray.cluster_resources()}")
    logger.debug(f"Ray available resources: {ray.available_resources()}")

    # Initialize Earth Engine in every worker
    @ray.remote
    def init_worker():
        init_ee(self.ee_project, self.ee_use_highvolume)

    num_workers = int(ray.cluster_resources().get("CPU", 1))
    logger.info(f"Initializing {num_workers} Ray workers with Earth Engine.")
    ray.get([init_worker.remote() for _ in range(num_workers)])

    import smart_geocubes
    from darts_export import missing_outputs

    from darts.pipelines._ray_wrapper import (
        _export_tile_ray,
        _load_aux,
        _prepare_export_ray,
        _preprocess_ray,
        _RayEnsembleV1,
    )
    from darts.utils.logging import LoggingManager

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    # ray_ensemble = _RayEnsembleV1.remote(models)

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    adem_buffer = ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2))

    # Get files to process
    tileinfo: list[RayInputDict] = []
    for i, (tilekey, outpath) in enumerate(self._tileinfos()):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue
        tileinfo.append({"tilekey": tilekey, "outpath": str(outpath.resolve()), "tile_id": tile_id})
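    # NOTE: only the first 10 tiles are kept for processing.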
    tileinfo = tileinfo[:10]
    logger.info(f"Found {len(tileinfo)} tiles to process.")

    # Ray data pipeline
    # TODO: setup device stuff correctly
    ds = ray.data.from_items(tileinfo)
    ds = ds.map(self._load_tile, num_cpus=1)
    ds = ds.map(
        _load_aux,
        fn_kwargs={
            "arcticdem_dir": self.arcticdem_dir,
            "arcticdem_resolution": arcticdem_resolution,
            "buffer": adem_buffer,
            "tcvis_dir": self.tcvis_dir,
        },
        num_cpus=1,
    )
    ds = ds.map(
        _preprocess_ray,
        fn_kwargs={
            "tpi_outer_radius": self.tpi_outer_radius,
            "tpi_inner_radius": self.tpi_inner_radius,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
        concurrency=4,
    )
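    # GPU budget: preprocessing and export preparation request 0.1 GPU each; the ensemble below requests 0.8.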
    ds = ds.map(
        _RayEnsembleV1,
        fn_constructor_kwargs={"model_dict": models},
        fn_kwargs={
            "patch_size": self.patch_size,
            "overlap": self.overlap,
            "batch_size": self.batch_size,
            "reflection": self.reflection,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
        num_gpus=0.8,
        concurrency=1,
    )
    ds = ds.map(
        _prepare_export_ray,
        fn_kwargs={
            "binarization_threshold": self.binarization_threshold,
            "mask_erosion_size": self.mask_erosion_size,
            "min_object_size": self.min_object_size,
            "quality_level": self.quality_level,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
    )
    ds = ds.map(
        _export_tile_ray,
        fn_kwargs={
            "export_bands": self.export_bands,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
    )
    logger.debug(f"Ray dataset: {ds}")
    logger.info("Ray pipeline created. Starting execution...")
    # This should trigger the execution
    ds.write_parquet(f"local://{self.output_data_dir.resolve()!s}/ray_output.parquet")
    logger.info(f"Ray pipeline finished. Output written to {self.output_data_dir.resolve()!s}/ray_output.parquet")

Sentinel2BlockPipeline dataclass

Sentinel2BlockPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    sentinel2_dir: pathlib.Path = pathlib.Path(
        "data/input/sentinel2"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.block_v1._BaseBlockPipeline

Pipeline for Sentinel 2 data.

Parameters:

  • sentinel2_dir (pathlib.Path, default: pathlib.Path('data/input/sentinel2') ) –

    The directory containing the Sentinel 2 scenes. Defaults to Path("data/input/sentinel2").

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed. Defaults to None.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda", the first device (0) is used; if an int, the specified device is used. If "auto", a free GPU (<50% memory usage) is selected automatically. Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int; in this case 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
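A minimal usage sketch, assuming the Sentinel 2 scenes already sit in sentinel2_dir; the checkpoint path is a placeholder. This pipeline stages intermediates as NetCDF files under <output_data_dir>/tmp between its three passes:

from pathlib import Path

from darts.pipelines import Sentinel2BlockPipeline

pipeline = Sentinel2BlockPipeline(
    model_files=[Path("models/tcvis.pt")],  # hypothetical checkpoint
    sentinel2_dir=Path("data/input/sentinel2"),
    image_ids=None,  # process every image found in sentinel2_dir
)
pipeline.run()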

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

sentinel2_dir class-attribute instance-attribute

sentinel2_dir: pathlib.Path = pathlib.Path(
    "data/input/sentinel2"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the block pipeline for Sentinel 2 data.

Source code in darts/src/darts/pipelines/block_v1.py
@staticmethod
def cli(*, pipeline: "Sentinel2BlockPipeline"):
    """Run the sequential pipeline for Sentinel 2 data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/block_v1.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    from stopuhr import Chronometer

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import pandas as pd
    import smart_geocubes
    import torch
    import xarray as xr
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_ensemble import EnsembleV1
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_legacy_fast

    from darts.utils.cuda import decide_device
    from darts.utils.logging import LoggingManager

    self.device = decide_device(self.device)

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    ensemble = EnsembleV1(models, device=torch.device(self.device))

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []

    tmp_dir = self.output_data_dir / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    res_dir = self.output_data_dir / "results"
    res_dir.mkdir(parents=True, exist_ok=True)

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue

        with timer("Loading & Preprocessing", log=False):
            tile = self._load_tile(tilekey)

            arcticdem = load_arcticdem(
                tile.odc.geobox,
                self.arcticdem_dir,
                resolution=arcticdem_resolution,
                buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
            )

            tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir)

            tile = preprocess_legacy_fast(
                tile,
                arcticdem,
                tcvis,
                self.tpi_outer_radius,
                self.tpi_inner_radius,
                "cpu",
            )

            tile.to_netcdf(tmp_dir / f"{tile_id}_preprocessed.nc", mode="w", engine="h5netcdf")

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        with timer("Segmenting", log=False):
            tile = xr.open_dataset(tmp_dir / f"{tile_id}_preprocessed.nc", engine="h5netcdf").set_coords(
                "spatial_ref"
            )
            tile = ensemble.segment_tile(
                tile,
                patch_size=self.patch_size,
                overlap=self.overlap,
                batch_size=self.batch_size,
                reflection=self.reflection,
                keep_inputs=self.write_model_outputs,
            )
            tile.to_netcdf(tmp_dir / f"{tile_id}_segmented.nc", mode="w", engine="h5netcdf")

    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        with timer("Postprocessing & Exporting", log=False):
            tile = xr.open_dataset(tmp_dir / f"{tile_id}_segmented.nc", engine="h5netcdf").set_coords("spatial_ref")
            tile = prepare_export(
                tile,
                bin_threshold=self.binarization_threshold,
                mask_erosion_size=self.mask_erosion_size,
                min_object_size=self.min_object_size,
                quality_level=self.quality_level,
                ensemble_subsets=models.keys() if self.write_model_outputs else [],
                device=self.device,
            )

            export_tile(
                tile,
                outpath,
                bands=self.export_bands,
                ensemble_subsets=models.keys() if self.write_model_outputs else [],
            )

        n_tiles += 1
        results.append(
            {
                "tile_id": tile_id,
                "output_path": str(outpath.resolve()),
                "status": "success",
                "error": None,
            }
        )
        logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")

    if len(results) > 0:
        pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
    if len(timer.durations) > 0:
        timer.export().to_parquet(self.output_data_dir / f"{current_time}.stopuhr.parquet")
    logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
    timer.summary(printer=logger.info)

Sentinel2Pipeline dataclass

Sentinel2Pipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    device: typing.Literal["cuda", "cpu", "auto"]
    | int
    | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    edge_erosion_size: int | None = None,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    sentinel2_dir: pathlib.Path = pathlib.Path(
        "data/input/sentinel2"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.sequential_v2._BasePipeline

Pipeline for Sentinel 2 data.

Parameters:

  • sentinel2_dir (pathlib.Path, default: pathlib.Path('data/input/sentinel2') ) –

    The directory containing the Sentinel 2 scenes. Defaults to Path("data/input/sentinel2").

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed. Defaults to None.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • device (typing.Literal['cuda', 'cpu', 'auto'] | int | None, default: None ) –

    The device to run the model on. If "cuda", the first device (0) is used; if an int, the specified device is used. If "auto", a free GPU (<50% memory usage) is selected automatically. Defaults to "cuda" if available, else "cpu".

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and the edge-cropping. Defaults to 10.

  • edge_erosion_size (int, default: None ) –

    If the edge-cropping should have a different width than the (inner) mask erosion, set it here. Defaults to mask_erosion_size.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int; in this case 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis", "metadata" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
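A minimal usage sketch; the checkpoint path is a placeholder. Passing a single Path instead of a list disables write_model_outputs:

from pathlib import Path

from darts.pipelines import Sentinel2Pipeline

pipeline = Sentinel2Pipeline(
    model_files=Path("models/tcvis.pt"),  # hypothetical checkpoint; a single Path implies write_model_outputs=False
    export_bands=["probabilities", "polygonized"],
    quality_level="high_quality",
)
pipeline.run()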

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

device class-attribute instance-attribute

device: (
    typing.Literal["cuda", "cpu", "auto"] | int | None
) = None

edge_erosion_size class-attribute instance-attribute

edge_erosion_size: int | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path('data/output')

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

sentinel2_dir class-attribute instance-attribute

sentinel2_dir: pathlib.Path = pathlib.Path(
    "data/input/sentinel2"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the sequential pipeline for Sentinel 2 data.

Source code in darts/src/darts/pipelines/sequential_v2.py
@staticmethod
def cli(*, pipeline: "Sentinel2Pipeline"):
    """Run the sequential pipeline for Sentinel 2 data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/sequential_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    from stopuhr import Chronometer, stopwatch

    timer = Chronometer(printer=logger.debug)

    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import pandas as pd
    import smart_geocubes
    import torch
    from darts_acquisition import load_arcticdem, load_tcvis
    from darts_ensemble import EnsembleV1
    from darts_export import export_tile, missing_outputs
    from darts_postprocessing import prepare_export
    from darts_preprocessing import preprocess_v2

    from darts.utils.cuda import decide_device
    from darts.utils.logging import LoggingManager

    self.device = decide_device(self.device)

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    ensemble = EnsembleV1(models, device=torch.device(self.device))

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)

    # Iterate over all the data
    tileinfo = self._tileinfos()
    n_tiles = 0
    logger.info(f"Found {len(tileinfo)} tiles to process.")
    results = []
    for i, (tilekey, outpath) in enumerate(tileinfo):
        tile_id = self._get_tile_id(tilekey)
        try:
            if not self.overwrite:
                mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
                if mo == "none":
                    logger.info(f"Tile {tile_id} already processed. Skipping...")
                    continue
                if mo == "some":
                    logger.warning(
                        f"Tile {tile_id} seems to be already processed, "
                        "but some of the requested outputs are missing. "
                        "Skipping because overwrite=False..."
                    )
                    continue

            with timer("Loading Optical", log=False):
                tile = self._load_tile(tilekey)
            with timer("Loading ArcticDEM", log=False):
                arcticdem = load_arcticdem(
                    tile.odc.geobox,
                    self.arcticdem_dir,
                    resolution=arcticdem_resolution,
                    buffer=ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2)),
                )
            with timer("Loading TCVis", log=False):
                tcvis = load_tcvis(tile.odc.geobox, self.tcvis_dir)
            with timer("Preprocessing", log=False):
                tile = preprocess_v2(
                    tile,
                    arcticdem,
                    tcvis,
                    self.tpi_outer_radius,
                    self.tpi_inner_radius,
                    self.device,
                )
            with timer("Segmenting", log=False):
                tile = ensemble.segment_tile(
                    tile,
                    patch_size=self.patch_size,
                    overlap=self.overlap,
                    batch_size=self.batch_size,
                    reflection=self.reflection,
                    keep_inputs=self.write_model_outputs,
                )
            with timer("Postprocessing", log=False):
                tile = prepare_export(
                    tile,
                    bin_threshold=self.binarization_threshold,
                    mask_erosion_size=self.mask_erosion_size,
                    min_object_size=self.min_object_size,
                    quality_level=self.quality_level,
                    ensemble_subsets=models.keys() if self.write_model_outputs else [],
                    device=self.device,
                    edge_erosion_size=self.edge_erosion_size,
                )

            export_metadata = self._result_metadata(tilekey)

            with timer("Exporting", log=False):
                export_tile(
                    tile,
                    outpath,
                    bands=self.export_bands,
                    ensemble_subsets=models.keys() if self.write_model_outputs else [],
                    metadata=export_metadata,
                )

            n_tiles += 1
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "success",
                    "error": None,
                }
            )
            logger.info(f"Processed sample {i + 1} of {len(tileinfo)} '{tilekey}' ({tile_id=}).")
        except KeyboardInterrupt:
            logger.warning("Keyboard interrupt detected.\nExiting...")
            raise KeyboardInterrupt
        except Exception as e:
            logger.warning(f"Could not process '{tilekey}' ({tile_id=}).\nSkipping...")
            logger.exception(e)
            results.append(
                {
                    "tile_id": tile_id,
                    "output_path": str(outpath.resolve()),
                    "status": "failed",
                    "error": str(e),
                }
            )
        finally:
            if len(results) > 0:
                pd.DataFrame(results).to_parquet(self.output_data_dir / f"{current_time}.results.parquet")
            if len(timer.durations) > 0:
                timer.export().to_parquet(self.output_data_dir / f"{current_time}.stopuhr.parquet")
            if len(stopwatch.durations) > 0:
                stopwatch.export().to_parquet(self.output_data_dir / f"{current_time}.stopwatch.parquet")
    else:
        logger.info(f"Processed {n_tiles} tiles to {self.output_data_dir.resolve()}.")
        timer.summary(printer=logger.info)
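
Each run also writes machine-readable artifacts into output_data_dir: a <timestamp>.config.json with the resolved configuration, a <timestamp>.results.parquet with the per-tile status, and timing tables from stopuhr. A minimal sketch for inspecting failed tiles afterwards; the timestamp is a placeholder:

import pandas as pd

# Columns written per tile: tile_id, output_path, status, error
results = pd.read_parquet("data/output/2025-01-01_12-00-00.results.parquet")
print(results[results["status"] == "failed"][["tile_id", "error"]])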

Sentinel2RayPipeline dataclass

Sentinel2RayPipeline(
    model_files: list[pathlib.Path] = None,
    output_data_dir: pathlib.Path = pathlib.Path(
        "data/output"
    ),
    arcticdem_dir: pathlib.Path = pathlib.Path(
        "data/download/arcticdem"
    ),
    tcvis_dir: pathlib.Path = pathlib.Path(
        "data/download/tcvis"
    ),
    num_cpus: int = 1,
    devices: list[int] | None = None,
    ee_project: str | None = None,
    ee_use_highvolume: bool = True,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    patch_size: int = 1024,
    overlap: int = 256,
    batch_size: int = 8,
    reflection: int = 0,
    binarization_threshold: float = 0.5,
    mask_erosion_size: int = 10,
    min_object_size: int = 32,
    quality_level: int
    | typing.Literal[
        "high_quality", "low_quality", "none"
    ] = 1,
    export_bands: list[str] = (
        lambda: [
            "probabilities",
            "binarized",
            "polygonized",
            "extent",
            "thumbnail",
        ]
    )(),
    write_model_outputs: bool = False,
    overwrite: bool = False,
    sentinel2_dir: pathlib.Path = pathlib.Path(
        "data/input/sentinel2"
    ),
    image_ids: list = None,
)

Bases: darts.pipelines.ray_v2._BaseRayPipeline

Pipeline for Sentinel 2 data.

Parameters:

  • sentinel2_dir (pathlib.Path, default: pathlib.Path('data/input/sentinel2') ) –

    The directory containing the Sentinel 2 scenes. Defaults to Path("data/input/sentinel2").

  • image_ids (list, default: None ) –

    The list of image ids to process. If None, all images in the directory will be processed. Defaults to None.

  • model_files (pathlib.Path | list[pathlib.Path], default: None ) –

    The path to the models to use for segmentation. Can also be a single Path to use only one model; this implies write_model_outputs=False. If a list is provided, an ensemble of the models will be used.

  • output_data_dir (pathlib.Path, default: pathlib.Path('data/output') ) –

    The "output" directory. Defaults to Path("data/output").

  • arcticdem_dir (pathlib.Path, default: pathlib.Path('data/download/arcticdem') ) –

    The directory containing the ArcticDEM data (the datacube and the extent files). Will be created and downloaded if it does not exist. Defaults to Path("data/download/arcticdem").

  • tcvis_dir (pathlib.Path, default: pathlib.Path('data/download/tcvis') ) –

    The directory containing the TCVis data. Defaults to Path("data/download/tcvis").

  • num_cpus (int, default: 1 ) –

    The number of CPUs to allocate to Ray. Defaults to 1.

  • devices (list[int] | None, default: None ) –

    The GPU device indices to expose to Ray; if set, they are written to CUDA_VISIBLE_DEVICES before Ray is initialized. If None, GPU visibility is left to Ray. Defaults to None.

  • ee_project (str, default: None ) –

    The Earth Engine project ID or number to use. May be omitted if project is defined within persistent API credentials obtained via earthengine authenticate.

  • ee_use_highvolume (bool, default: True ) –

    Whether to use the high volume server (https://earthengine-highvolume.googleapis.com).

  • tpi_outer_radius (int, default: 100 ) –

    The outer radius of the annulus kernel for the tpi calculation in m. Defaults to 100m.

  • tpi_inner_radius (int, default: 0 ) –

    The inner radius of the annulus kernel for the tpi calculation in m. Defaults to 0.

  • patch_size (int, default: 1024 ) –

    The patch size to use for inference. Defaults to 1024.

  • overlap (int, default: 256 ) –

    The overlap to use for inference. Defaults to 256.

  • batch_size (int, default: 8 ) –

    The batch size to use for inference. Defaults to 8.

  • reflection (int, default: 0 ) –

    The reflection padding to use for inference. Defaults to 0.

  • binarization_threshold (float, default: 0.5 ) –

    The threshold to binarize the probabilities. Defaults to 0.5.

  • mask_erosion_size (int, default: 10 ) –

    The size of the disk to use for mask erosion and edge cropping. Defaults to 10.

  • min_object_size (int, default: 32 ) –

    The minimum object size to keep, in pixels. Defaults to 32.

  • quality_level (int | typing.Literal['high_quality', 'low_quality', 'none'], default: 1 ) –

    The quality level to use for the segmentation. Can also be an int; in this case, 0="none", 1="low_quality", 2="high_quality". Defaults to 1.

  • export_bands (list[str], default: (lambda: ['probabilities', 'binarized', 'polygonized', 'extent', 'thumbnail'])() ) –

    The bands to export. Can be a list of "probabilities", "binarized", "polygonized", "extent", "thumbnail", "optical", "dem", "tcvis" or concrete band-names. Defaults to ["probabilities", "binarized", "polygonized", "extent", "thumbnail"].

  • write_model_outputs (bool, default: False ) –

    Also save the model outputs, not only the ensemble result. Defaults to False.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing files. Defaults to False.
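
A minimal construction sketch for the Ray variant; the checkpoint path and device indices are placeholders:

from pathlib import Path

from darts.pipelines import Sentinel2RayPipeline

pipeline = Sentinel2RayPipeline(
    model_files=[Path("models/my_model.ckpt")],  # placeholder checkpoint path
    num_cpus=8,
    devices=[0, 1],  # exposed via CUDA_VISIBLE_DEVICES before ray.init
)
pipeline.run()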

arcticdem_dir class-attribute instance-attribute

arcticdem_dir: pathlib.Path = pathlib.Path(
    "data/download/arcticdem"
)

batch_size class-attribute instance-attribute

batch_size: int = 8

binarization_threshold class-attribute instance-attribute

binarization_threshold: float = 0.5

devices class-attribute instance-attribute

devices: list[int] | None = None

ee_project class-attribute instance-attribute

ee_project: str | None = None

ee_use_highvolume class-attribute instance-attribute

ee_use_highvolume: bool = True

export_bands class-attribute instance-attribute

export_bands: list[str] = dataclasses.field(
    default_factory=lambda: [
        "probabilities",
        "binarized",
        "polygonized",
        "extent",
        "thumbnail",
    ]
)

image_ids class-attribute instance-attribute

image_ids: list = None

mask_erosion_size class-attribute instance-attribute

mask_erosion_size: int = 10

min_object_size class-attribute instance-attribute

min_object_size: int = 32

model_files class-attribute instance-attribute

model_files: list[pathlib.Path] = None

num_cpus class-attribute instance-attribute

num_cpus: int = 1

output_data_dir class-attribute instance-attribute

output_data_dir: pathlib.Path = pathlib.Path(
    "data/output"
)

overlap class-attribute instance-attribute

overlap: int = 256

overwrite class-attribute instance-attribute

overwrite: bool = False

patch_size class-attribute instance-attribute

patch_size: int = 1024

quality_level class-attribute instance-attribute

quality_level: (
    int
    | typing.Literal["high_quality", "low_quality", "none"]
) = 1

reflection class-attribute instance-attribute

reflection: int = 0

sentinel2_dir class-attribute instance-attribute

sentinel2_dir: pathlib.Path = pathlib.Path(
    "data/input/sentinel2"
)

tcvis_dir class-attribute instance-attribute

tcvis_dir: pathlib.Path = pathlib.Path(
    "data/download/tcvis"
)

tpi_inner_radius class-attribute instance-attribute

tpi_inner_radius: int = 0

tpi_outer_radius class-attribute instance-attribute

tpi_outer_radius: int = 100

write_model_outputs class-attribute instance-attribute

write_model_outputs: bool = False

cli staticmethod

Run the Ray pipeline for Sentinel 2 data.

Source code in darts/src/darts/pipelines/ray_v2.py
@staticmethod
def cli(*, pipeline: "Sentinel2RayPipeline"):
    """Run the sequential pipeline for Sentinel 2 data."""
    pipeline.run()

run

run()
Source code in darts/src/darts/pipelines/ray_v2.py
def run(self):  # noqa: C901
    if self.model_files is None or len(self.model_files) == 0:
        raise ValueError("No model files provided. Please provide a list of model files.")
    if len(self.export_bands) == 0:
        raise ValueError("No export bands provided. Please provide a list of export bands.")

    current_time = time.strftime("%Y-%m-%d_%H-%M-%S")
    logger.info(f"Starting pipeline at {current_time}.")

    # Storing the configuration as JSON file
    self.output_data_dir.mkdir(parents=True, exist_ok=True)
    with open(self.output_data_dir / f"{current_time}.config.json", "w") as f:
        config = asdict(self)
        # Convert everything to json serializable
        for key, value in config.items():
            if isinstance(value, Path):
                config[key] = str(value.resolve())
            elif isinstance(value, list):
                config[key] = [str(v.resolve()) if isinstance(v, Path) else v for v in value]
        json.dump(config, f)

    if self.devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(d) for d in self.devices)
    from darts.utils.cuda import debug_info

    debug_info()

    from darts.utils.earthengine import init_ee

    init_ee(self.ee_project, self.ee_use_highvolume)

    import ray

    ray_context = ray.init(
        num_cpus=self.num_cpus,  # We use one CPU per Ray task
        num_gpus=len(self.devices) if self.devices is not None else None,
    )
    logger.debug(f"Ray initialized with context: {ray_context}")
    logger.info(f"Ray Dashboard URL: {ray_context.dashboard_url}")
    logger.debug(f"Ray cluster resources: {ray.cluster_resources()}")
    logger.debug(f"Ray available resources: {ray.available_resources()}")

    # Initialize ee in every worker
    @ray.remote
    def init_worker():
        init_ee(self.ee_project, self.ee_use_highvolume)

    num_workers = int(ray.cluster_resources().get("CPU", 1))
    logger.info(f"Initializing {num_workers} Ray workers with Earth Engine.")
    ray.get([init_worker.remote() for _ in range(num_workers)])

    import smart_geocubes
    from darts_export import missing_outputs

    from darts.pipelines._ray_wrapper import (
        _export_tile_ray,
        _load_aux,
        _prepare_export_ray,
        _preprocess_ray,
        _RayEnsembleV1,
    )
    from darts.utils.logging import LoggingManager

    # determine models to use
    if isinstance(self.model_files, Path):
        self.model_files = [self.model_files]
        self.write_model_outputs = False
    models = {model_file.stem: model_file for model_file in self.model_files}
    # ray_ensemble = _RayEnsembleV1.remote(models)

    # Create the datacubes if they do not exist
    LoggingManager.apply_logging_handlers("smart_geocubes")
    arcticdem_resolution = self._arcticdem_resolution()
    if arcticdem_resolution == 2:
        accessor = smart_geocubes.ArcticDEM2m(self.arcticdem_dir)
    elif arcticdem_resolution == 10:
        accessor = smart_geocubes.ArcticDEM10m(self.arcticdem_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    accessor = smart_geocubes.TCTrend(self.tcvis_dir)
    if not accessor.created:
        accessor.create(overwrite=False)
    adem_buffer = ceil(self.tpi_outer_radius / arcticdem_resolution * sqrt(2))

    # Get files to process
    tileinfo: list[RayInputDict] = []
    for i, (tilekey, outpath) in enumerate(self._tileinfos()):
        tile_id = self._get_tile_id(tilekey)
        if not self.overwrite:
            mo = missing_outputs(outpath, bands=self.export_bands, ensemble_subsets=models.keys())
            if mo == "none":
                logger.info(f"Tile {tile_id} already processed. Skipping...")
                continue
            if mo == "some":
                logger.warning(
                    f"Tile {tile_id} already processed. Some outputs are missing."
                    " Skipping because overwrite=False..."
                )
                continue
        tileinfo.append({"tilekey": tilekey, "outpath": str(outpath.resolve()), "tile_id": tile_id})
    # NOTE: caps this run at the first 10 tiles
    tileinfo = tileinfo[:10]
    logger.info(f"Found {len(tileinfo)} tiles to process.")

    # Ray data pipeline
    # TODO: setup device stuff correctly
    ds = ray.data.from_items(tileinfo)
    ds = ds.map(self._load_tile, num_cpus=1)
    ds = ds.map(
        _load_aux,
        fn_kwargs={
            "arcticdem_dir": self.arcticdem_dir,
            "arcticdem_resolution": arcticdem_resolution,
            "buffer": adem_buffer,
            "tcvis_dir": self.tcvis_dir,
        },
        num_cpus=1,
    )
    ds = ds.map(
        _preprocess_ray,
        fn_kwargs={
            "tpi_outer_radius": self.tpi_outer_radius,
            "tpi_inner_radius": self.tpi_inner_radius,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
        concurrency=4,
    )
    ds = ds.map(
        _RayEnsembleV1,
        fn_constructor_kwargs={"model_dict": models},
        fn_kwargs={
            "patch_size": self.patch_size,
            "overlap": self.overlap,
            "batch_size": self.batch_size,
            "reflection": self.reflection,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
        num_gpus=0.8,
        concurrency=1,
    )
    ds = ds.map(
        _prepare_export_ray,
        fn_kwargs={
            "binarization_threshold": self.binarization_threshold,
            "mask_erosion_size": self.mask_erosion_size,
            "min_object_size": self.min_object_size,
            "quality_level": self.quality_level,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
            "device": "cuda",  # Ray will handle the device allocation
        },
        num_cpus=1,
        num_gpus=0.1,
    )
    ds = ds.map(
        _export_tile_ray,
        fn_kwargs={
            "export_bands": self.export_bands,
            "models": models,
            "write_model_outputs": self.write_model_outputs,
        },
        num_cpus=1,
    )
    logger.debug(f"Ray dataset: {ds}")
    logger.info("Ray pipeline created. Starting execution...")
    # This should trigger the execution
    ds.write_parquet(f"local://{self.output_data_dir.resolve()!s}/ray_output.parquet")
    logger.info(f"Ray pipeline finished. Output written to {self.output_data_dir.resolve()!s}/ray_output.parquet")