
Combined Reference

All references on one page

darts

DARTS processing pipeline.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

darts_acquisition

Acquisition of data from various sources for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

download_admin_files(admin_dir)

Download the admin files for the regions.

Files will be stored under [admin_dir]/adm1.shp and [admin_dir]/adm2.shp.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `admin_dir` | `Path` | The path to the admin files. | *required* |

Source code in darts-acquisition/src/darts_acquisition/admin.py
def download_admin_files(admin_dir: Path):
    """Download the admin files for the regions.

    Files will be stored under [admin_dir]/adm1.shp and [admin_dir]/adm2.shp.

    Args:
        admin_dir (Path): The path to the admin files.

    """
    tick_fstart = time.perf_counter()

    # Download the admin files
    admin_1_url = "https://github.com/wmgeolab/geoBoundaries/raw/main/releaseData/CGAZ/geoBoundariesCGAZ_ADM1.zip"
    admin_2_url = "https://github.com/wmgeolab/geoBoundaries/raw/main/releaseData/CGAZ/geoBoundariesCGAZ_ADM2.zip"

    admin_dir.mkdir(exist_ok=True, parents=True)

    logger.debug(f"Downloading {admin_1_url} to {admin_dir.resolve()}")
    _download_zip(admin_1_url, admin_dir)

    logger.debug(f"Downloading {admin_2_url} to {admin_dir.resolve()}")
    _download_zip(admin_2_url, admin_dir)

    tick_fend = time.perf_counter()
    logger.info(f"Downloaded admin files in {tick_fend - tick_fstart:.2f} seconds")

load_arcticdem(geobox, data_dir, resolution, buffer=0, persist=True)

Load the ArcticDEM for the given geobox, fetch new data from the STAC server if necessary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `geobox` | `GeoBox` | The geobox for which the tile should be loaded. | *required* |
| `data_dir` | `Path \| str` | The directory where the ArcticDEM data is stored. | *required* |
| `resolution` | `Literal[2, 10, 32]` | The resolution of the ArcticDEM data in m. | *required* |
| `buffer` | `int` | The buffer around the projected (epsg:3413) geobox in pixels. Defaults to 0. | `0` |
| `persist` | `bool` | If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | The ArcticDEM tile, with a buffer applied. Note: The buffer is applied in the arcticdem dataset's CRS, hence the orientation might be different. Final dataset is NOT matched to the reference CRS and resolution. |

Warning

Geobox must be in a meter-based CRS.

Usage

Since the API of `load_arcticdem` is based on GeoBox, one can load a specific ROI based on an existing Xarray DataArray:

```python
from math import ceil, sqrt

import xarray as xr
import odc.geo.xr

from darts_acquisition import load_arcticdem

# Assume "optical" is an already loaded s2 based dataarray
# and "tpi_outer_radius" is the outer radius (in m) of the later TPI calculation

arcticdem = load_arcticdem(
    optical.odc.geobox,
    "/path/to/arcticdem-parent-directory",
    resolution=2,
    buffer=ceil(tpi_outer_radius / 2 * sqrt(2)),
)

# Now we can for example match the resolution and extent of the optical data:
arcticdem = arcticdem.odc.reproject(optical.odc.geobox, resampling="cubic")
```

The `buffer` parameter is used to extend the region of interest by a certain number of pixels. This comes in handy when calculating e.g. the Topographic Position Index (TPI), which requires a buffer around the region of interest to remove edge effects.
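As a sketch of that pattern (the neighborhood filter itself is left abstract; `optical` is assumed to be loaded as above): load with a buffer, apply the kernel, then crop back to the unbuffered extent so the edge-affected pixels are discarded:

```python
buffered = load_arcticdem(
    optical.odc.geobox,
    "/path/to/arcticdem-parent-directory",
    resolution=10,
    buffer=64,  # pixels, applied in the ArcticDEM CRS (epsg:3413)
)
# ... apply a neighborhood operation (e.g. a TPI annulus kernel) to `buffered` here ...
# Cropping to the unbuffered (projected) geobox removes the edge-affected pixels:
clean = buffered.odc.crop(optical.odc.geobox.to_crs("epsg:3413", resolution=10).extent, apply_mask=False)
```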

Source code in darts-acquisition/src/darts_acquisition/arcticdem/datacube.py
def load_arcticdem(
    geobox: GeoBox,
    data_dir: Path | str,
    resolution: RESOLUTIONS,
    buffer: int = 0,
    persist: bool = True,
) -> xr.Dataset:
    """Load the ArcticDEM for the given geobox, fetch new data from the STAC server if necessary.

    Args:
        geobox (GeoBox): The geobox for which the tile should be loaded.
        data_dir (Path | str): The directory where the ArcticDEM data is stored.
        resolution (Literal[2, 10, 32]): The resolution of the ArcticDEM data in m.
        buffer (int, optional): The buffer around the projected (epsg:3413) geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.

    Returns:
        xr.Dataset: The ArcticDEM tile, with a buffer applied.
            Note: The buffer is applied in the arcticdem dataset's CRS, hence the orientation might be different.
            Final dataset is NOT matched to the reference CRS and resolution.

    Warning:
        Geobox must be in a meter-based CRS.

    Usage:
        Since the API of the `load_arcticdem` is based on GeoBox, one can load a specific ROI based on an existing Xarray DataArray:

        ```python
        from math import ceil, sqrt

        import xarray as xr
        import odc.geo.xr

        from darts_acquisition import load_arcticdem

        # Assume "optical" is an already loaded s2 based dataarray
        # and "tpi_outer_radius" is the outer radius (in m) of the later TPI calculation

        arcticdem = load_arcticdem(
            optical.odc.geobox,
            "/path/to/arcticdem-parent-directory",
            resolution=2,
            buffer=ceil(tpi_outer_radius / 2 * sqrt(2)),
        )

        # Now we can for example match the resolution and extent of the optical data:
        arcticdem = arcticdem.odc.reproject(optical.odc.geobox, resampling="cubic")
        ```

        The `buffer` parameter is used to extend the region of interest by a certain number of pixels.
        This comes in handy when calculating e.g. the Topographic Position Index (TPI), which requires a buffer around the region of interest to remove edge effects.

    """  # noqa: E501
    tick_fstart = time.perf_counter()

    data_dir = Path(data_dir) if isinstance(data_dir, str) else data_dir

    datacube_fpath = data_dir / f"datacube_{resolution}m_v4.1.zarr"
    storage = zarr.storage.FSStore(datacube_fpath)
    logger.debug(f"Getting ArcticDEM tile from {datacube_fpath.resolve()}")

    # ! The geobox must be in a meter based CRS
    logger.debug(f"Found a reference resolution of {geobox.resolution.x}m")

    # Check if the zarr data already exists
    if not datacube_fpath.exists():
        logger.debug(f"Creating a new zarr datacube at {datacube_fpath.resolve()} with {storage=}")
        create_empty_datacube(
            "ArcticDEM Data Cube",
            storage,
            DATA_EXTENT[resolution],
            CHUNK_SIZE,
            DATA_VARS,
            DATA_VARS_META,
            DATA_VARS_ENCODING,
        )

    # Get the adjacent arcticdem tiles
    # Note: We could also use pystac here, but this would result in a slight performance decrease
    # because of the network overhead, hence we use the extent file
    # Download the extent, download if the file does not exist
    extent_fpath = data_dir / f"ArcticDEM_Mosaic_Index_v4_1_{resolution}m.parquet"
    with download_lock:
        if not extent_fpath.exists():
            download_arcticdem_extent(data_dir)
    extent = gpd.read_parquet(extent_fpath)

    # Add a buffer around the geobox to get the adjacent tiles
    reference_geobox = geobox.to_crs("epsg:3413", resolution=resolution).pad(buffer)
    adjacent_tiles = extent[extent.intersects(reference_geobox.extent.geom)]

    # Download the adjacent tiles (if necessary)
    with download_lock:
        procedural_download_datacube(storage, adjacent_tiles)

    # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
    chunks = None if persist else "auto"
    arcticdem_datacube = xr.open_zarr(storage, mask_and_scale=False, chunks=chunks).set_coords("spatial_ref")

    # Get an AOI slice of the datacube
    arcticdem_aoi = arcticdem_datacube.odc.crop(reference_geobox.extent, apply_mask=False)

    # The following code would load the lazy zarr data from disk into memory
    if persist:
        tick_sload = time.perf_counter()
        arcticdem_aoi = arcticdem_aoi.load()
        tick_eload = time.perf_counter()
        logger.debug(f"ArcticDEM AOI loaded from disk in {tick_eload - tick_sload:.2f} seconds")

    # Change dtype of the datamask to uint8 for later reproject_match
    arcticdem_aoi["datamask"] = arcticdem_aoi.datamask.astype("uint8")

    logger.info(
        f"ArcticDEM tile {'loaded' if persist else 'lazy-opened'} in {time.perf_counter() - tick_fstart:.2f} seconds"
    )
    return arcticdem_aoi
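With `persist=False` the function returns a Dask-backed Dataset, so the actual read is deferred. A minimal sketch (the `dem` variable name follows its use elsewhere in this pipeline):

```python
arcticdem = load_arcticdem(
    optical.odc.geobox,
    "/path/to/arcticdem-parent-directory",
    resolution=32,
    persist=False,  # lazy: nothing is read from the zarr store yet
)
mean_elevation = arcticdem.dem.mean().compute()  # data is only read here
```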

load_arcticdem_from_vrt(slope_vrt, elevation_vrt, reference_dataset)

Load ArcticDEM data and reproject it to match the reference dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `slope_vrt` | `Path` | Path to the ArcticDEM slope VRT file. | *required* |
| `elevation_vrt` | `Path` | Path to the ArcticDEM elevation VRT file. | *required* |
| `reference_dataset` | `Dataset` | The reference dataset to reproject, resample, and crop the ArcticDEM data to. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | The ArcticDEM data reprojected, resampled, and cropped to match the reference dataset. |

Source code in darts-acquisition/src/darts_acquisition/arcticdem/vrt.py
def load_arcticdem_from_vrt(slope_vrt: Path, elevation_vrt: Path, reference_dataset: xr.Dataset) -> xr.Dataset:
    """Load ArcticDEM data and reproject it to match the reference dataset.

    Args:
        slope_vrt (Path): Path to the ArcticDEM slope VRT file.
        elevation_vrt (Path): Path to the ArcticDEM elevation VRT file.
        reference_dataset (xr.Dataset): The reference dataset to reproject, resample, and crop the ArcticDEM data to.

    Returns:
        xr.Dataset: The ArcticDEM data reprojected, resampled and cropped to match the reference dataset.


    """
    start_time = time.time()
    logger.debug(f"Loading ArcticDEM slope from {slope_vrt.resolve()} and elevation from {elevation_vrt.resolve()}")

    slope = load_vrt(slope_vrt, reference_dataset)
    slope: xr.Dataset = (
        slope.assign_attrs({"data_source": "arcticdem", "long_name": "Slope"})
        .rio.write_nodata(float("nan"))
        .astype("float32")
        .to_dataset(name="slope")
    )

    relative_elevation = load_vrt(elevation_vrt, reference_dataset)
    relative_elevation: xr.Dataset = (
        relative_elevation.assign_attrs({"data_source": "arcticdem", "long_name": "Relative Elevation", "units": "m"})
        .fillna(0)
        .rio.write_nodata(0)
        .astype("int16")
        .to_dataset(name="relative_elevation")
    )

    arcticdem_ds = xr.merge([relative_elevation, slope])
    logger.debug(f"Loaded ArcticDEM data in {time.time() - start_time} seconds.")
    return arcticdem_ds
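A minimal usage sketch (import location assumed; `optical` stands for an already loaded reference dataset, and the VRT paths are placeholders):

```python
from pathlib import Path

from darts_acquisition import load_arcticdem_from_vrt

# Reprojects, resamples, and crops the ArcticDEM layers to the grid of `optical`:
arcticdem = load_arcticdem_from_vrt(
    Path("/path/to/arcticdem/slope.vrt"),
    Path("/path/to/arcticdem/elevation.vrt"),
    optical,
)
```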

load_planet_masks(fpath)

Load the valid and quality data masks from a Planet scene.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fpath` | `str \| Path` | The file path to the Planet scene from which to derive the masks. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If no matching UDM-2 TIFF file is found in the specified path. |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | A merged xarray Dataset containing two data masks:<br>- `valid_data_mask`: A mask indicating valid (1) and no data (0).<br>- `quality_data_mask`: A mask indicating high quality (1) and low quality (0). |

Source code in darts-acquisition/src/darts_acquisition/planet.py
def load_planet_masks(fpath: str | Path) -> xr.Dataset:
    """Load the valid and quality data masks from a Planet scene.

    Args:
        fpath (str | Path): The file path to the Planet scene from which to derive the masks.

    Raises:
        FileNotFoundError: If no matching UDM-2 TIFF file is found in the specified path.

    Returns:
        xr.Dataset: A merged xarray Dataset containing two data masks:
            - 'valid_data_mask': A mask indicating valid (1) and no data (0).
            - 'quality_data_mask': A mask indicating high quality (1) and low quality (0).

    """
    start_time = time.time()

    # Convert to Path object if a string is provided
    fpath = fpath if isinstance(fpath, Path) else Path(fpath)

    logger.debug(f"Loading data masks from {fpath.resolve()}")

    # Get imagepath
    udm_path = next(fpath.glob("*_udm2.tif"), None)
    if not udm_path:
        udm_path = next(fpath.glob("*_udm2_clip.tif"), None)
    if not udm_path:
        raise FileNotFoundError(f"No matching UDM-2 TIFF files found in {fpath.resolve()} (.glob('*_udm2.tif'))")

    # See udm classes here: https://developers.planet.com/docs/data/udm-2/
    da_udm = xr.open_dataarray(udm_path)

    invalids = da_udm.sel(band=8).fillna(0) != 0
    low_quality = da_udm.sel(band=[2, 3, 4, 5, 6]).max(axis=0) == 1
    high_quality = ~low_quality & ~invalids
    qa_ds = xr.Dataset(coords={c: da_udm.coords[c] for c in da_udm.coords})
    qa_ds["quality_data_mask"] = (
        xr.zeros_like(da_udm.sel(band=8)).where(invalids, 0).where(low_quality, 1).where(high_quality, 2)
    )
    qa_ds["quality_data_mask"].attrs = {
        "data_source": "planet",
        "long_name": "Quality data mask",
        "description": "0 = Invalid, 1 = Low Quality, 2 = High Quality",
    }
    logger.debug(f"Loaded data masks in {time.time() - start_time} seconds.")
    return qa_ds
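As the attributes in the source above state, the returned `quality_data_mask` encodes 0 = Invalid, 1 = Low Quality, 2 = High Quality, so a boolean high-quality mask can be derived like this (sketch, scene path hypothetical):

```python
from darts_acquisition import load_planet_masks

masks = load_planet_masks("/path/to/planet-scene")
high_quality = masks.quality_data_mask == 2  # True only for high-quality pixels
```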

load_planet_scene(fpath)

Load a PlanetScope satellite GeoTIFF file and return it as an xarray dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fpath` | `str \| Path` | The path to the directory containing the TIFF files or a specific path to the TIFF file. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | The loaded dataset. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If no matching TIFF file is found in the specified path. |

Source code in darts-acquisition/src/darts_acquisition/planet.py
def load_planet_scene(fpath: str | Path) -> xr.Dataset:
    """Load a PlanetScope satellite GeoTIFF file and return it as an xarray datset.

    Args:
        fpath (str | Path): The path to the directory containing the TIFF files or a specific path to the TIFF file.

    Returns:
        xr.Dataset: The loaded dataset

    Raises:
        FileNotFoundError: If no matching TIFF file is found in the specified path.

    """
    start_time = time.time()

    # Convert to Path object if a string is provided
    fpath = fpath if isinstance(fpath, Path) else Path(fpath)

    # Check if the directory contains a PSOrthoTile or PSScene
    planet_type = parse_planet_type(fpath)
    logger.debug(f"Loading Planet PS {planet_type.capitalize()} from {fpath.resolve()}")

    # Get imagepath
    ps_image = next(fpath.glob("*_SR.tif"), None)
    if not ps_image:
        ps_image = next(fpath.glob("*_SR_clip.tif"), None)
    if not ps_image:
        raise FileNotFoundError(f"No matching TIFF files found in {fpath.resolve()} (.glob('*_SR.tif'))")

    # Define band names and corresponding indices
    planet_da = xr.open_dataarray(ps_image)

    # Create a dataset with the bands
    bands = ["blue", "green", "red", "nir"]
    ds_planet = (
        planet_da.fillna(0).rio.write_nodata(0).astype("uint16").assign_coords({"band": bands}).to_dataset(dim="band")
    )
    for var in ds_planet.variables:
        ds_planet[var] = ds_planet[var].assign_attrs(
            {
                "long_name": f"PLANET {var.capitalize()}",
                "data_source": "planet",
                "planet_type": planet_type,
                "units": "Reflectance",
            }
        )
    ds_planet.attrs = {"tile_id": fpath.parent.stem if planet_type == "orthotile" else fpath.stem}
    logger.debug(f"Loaded Planet scene in {time.time() - start_time} seconds.")
    return ds_planet
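A common pattern (sketch, path hypothetical) is to load the scene together with its masks and merge both into a single tile:

```python
import xarray as xr

from darts_acquisition import load_planet_scene, load_planet_masks

fpath = "/path/to/planet-scene"
tile = xr.merge([load_planet_scene(fpath), load_planet_masks(fpath)])
```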

load_s2_masks(fpath, reference_geobox)

Load the valid and quality data masks from a Sentinel 2 scene.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fpath` | `str \| Path` | The path to the directory containing the TIFF files. | *required* |
| `reference_geobox` | `GeoBox` | The reference geobox to reproject, resample, and crop the masks data to. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | A merged xarray Dataset containing two data masks:<br>- `valid_data_mask`: A mask indicating valid (1) and no data (0).<br>- `quality_data_mask`: A mask indicating high quality (1) and low quality (0). |

Source code in darts-acquisition/src/darts_acquisition/s2.py
def load_s2_masks(fpath: str | Path, reference_geobox: GeoBox) -> xr.Dataset:
    """Load the valid and quality data masks from a Sentinel 2 scene.

    Args:
        fpath (str | Path): The path to the directory containing the TIFF files.
        reference_geobox (GeoBox): The reference geobox to reproject, resample and crop the masks data to.


    Returns:
        xr.Dataset: A merged xarray Dataset containing two data masks:
            - 'valid_data_mask': A mask indicating valid (1) and no data (0).
            - 'quality_data_mask': A mask indicating high quality (1) and low quality (0).

    """
    start_time = time.time()

    # Convert to Path object if a string is provided
    fpath = fpath if isinstance(fpath, Path) else Path(fpath)

    logger.debug(f"Loading data masks from {fpath.resolve()}")

    # TODO: SCL band in SR file
    try:
        scl_path = next(fpath.glob("*_SCL*.tif"))
    except StopIteration:
        logger.warning("Found no data quality mask (SCL). No masking will occur.")
        valid_data_mask = (odc.geo.xr.xr_zeros(reference_geobox, dtype="uint8") + 1).to_dataset(name="valid_data_mask")
        valid_data_mask.attrs = {"data_source": "s2", "long_name": "Valid Data Mask"}
        quality_data_mask = odc.geo.xr.xr_zeros(reference_geobox, dtype="uint8").to_dataset(name="quality_data_mask")
        quality_data_mask.attrs = {"data_source": "s2", "long_name": "Quality Data Mask"}
        qa_ds = xr.merge([valid_data_mask, quality_data_mask])
        return qa_ds

    # See scene classes here: https://custom-scripts.sentinel-hub.com/custom-scripts/sentinel-2/scene-classification/
    da_scl = xr.open_dataarray(scl_path)

    da_scl = da_scl.odc.reproject(reference_geobox, resampling="nearest")

    # Match crs
    da_scl = da_scl.rio.write_crs(reference_geobox.crs)

    # TODO: new masking method
    qa_ds = xr.Dataset(coords={c: da_scl.coords[c] for c in da_scl.coords})
    qa_ds = da_scl.sel(band=1).fillna(0)
    qa_ds = convert_masks(qa_ds)

    logger.debug(f"Loaded data masks in {time.time() - start_time} seconds.")
    return qa_ds

load_s2_scene(fpath)

Load a Sentinel 2 satellite GeoTIFF file and return it as an xarray dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fpath` | `str \| Path` | The path to the directory containing the TIFF files. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | The loaded dataset. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If no matching TIFF file is found in the specified path. |

Source code in darts-acquisition/src/darts_acquisition/s2.py
def load_s2_scene(fpath: str | Path) -> xr.Dataset:
    """Load a Sentinel 2 satellite GeoTIFF file and return it as an xarray datset.

    Args:
        fpath (str | Path): The path to the directory containing the TIFF files.

    Returns:
        xr.Dataset: The loaded dataset

    Raises:
        FileNotFoundError: If no matching TIFF file is found in the specified path.

    """
    start_time = time.time()

    # Convert to Path object if a string is provided
    fpath = fpath if isinstance(fpath, Path) else Path(fpath)

    logger.debug(f"Loading Sentinel 2 scene from {fpath.resolve()}")

    # Get imagepath
    try:
        s2_image = next(fpath.glob("*_SR*.tif"))
    except StopIteration:
        raise FileNotFoundError(f"No matching TIFF files found in {fpath.resolve()} (.glob('*_SR*.tif'))")

    # Define band names and corresponding indices
    s2_da = xr.open_dataarray(s2_image)

    # Create a dataset with the bands
    bands = ["blue", "green", "red", "nir"]
    ds_s2 = s2_da.fillna(0).rio.write_nodata(0).astype("uint16").assign_coords({"band": bands}).to_dataset(dim="band")

    for var in ds_s2.data_vars:
        ds_s2[var] = ds_s2[var].assign_attrs(
            {"data_source": "s2", "long_name": f"Sentinel 2 {var.capitalize()}", "units": "Reflectance"}
        )

    planet_crop_id, s2_tile_id, tile_id = parse_s2_tile_id(fpath)
    ds_s2.attrs["planet_crop_id"] = planet_crop_id
    ds_s2.attrs["s2_tile_id"] = s2_tile_id
    ds_s2.attrs["tile_id"] = tile_id
    logger.debug(f"Loaded Sentinel 2 scene in {time.time() - start_time} seconds.")
    return ds_s2
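The Sentinel 2 loaders pair up the same way; since `load_s2_masks` needs a reference geobox, the scene is loaded first (sketch, path hypothetical):

```python
import xarray as xr
import odc.geo.xr  # noqa: F401, registers the .odc accessor

from darts_acquisition import load_s2_scene, load_s2_masks

fpath = "/path/to/s2-scene"
optical = load_s2_scene(fpath)
masks = load_s2_masks(fpath, optical.odc.geobox)
tile = xr.merge([optical, masks])
```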

load_tcvis(geobox, data_dir, buffer=0, persist=True)

Load the TCVIS for the given geobox, fetch new data from GEE if necessary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `geobox` | `GeoBox` | The geobox to load the data for. | *required* |
| `data_dir` | `Path \| str` | The directory to store the downloaded data for faster access for consecutive calls. | *required* |
| `buffer` | `int` | The buffer around the geobox in pixels. Defaults to 0. | `0` |
| `persist` | `bool` | If the data should be persisted in memory. If not, this will return a Dask backed Dataset. Defaults to True. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | The TCVIS dataset. |

Usage

Since the API of `load_tcvis` is based on GeoBox, one can load a specific ROI based on an existing Xarray DataArray:

```python
import xarray as xr
import odc.geo.xr

from darts_acquisition import load_tcvis

# Assume "optical" is an already loaded s2 based dataarray

tcvis = load_tcvis(
    optical.odc.geobox,
    "/path/to/tcvis-parent-directory",
)

# Now we can for example match the resolution and extent of the optical data:
tcvis = tcvis.odc.reproject(optical.odc.geobox, resampling="cubic")
```

Source code in darts-acquisition/src/darts_acquisition/tcvis.py
def load_tcvis(
    geobox: GeoBox,
    data_dir: Path | str,
    buffer: int = 0,
    persist: bool = True,
) -> xr.Dataset:
    """Load the TCVIS for the given geobox, fetch new data from GEE if necessary.

    Args:
        geobox (GeoBox): The geobox to load the data for.
        data_dir (Path | str): The directory to store the downloaded data for faster access for consecutive calls.
        buffer (int, optional): The buffer around the geobox in pixels. Defaults to 0.
        persist (bool, optional): If the data should be persisted in memory.
            If not, this will return a Dask backed Dataset. Defaults to True.

    Returns:
        xr.Dataset: The TCVIS dataset.

    Usage:
        Since the API of the `load_tcvis` is based on GeoBox, one can load a specific ROI based on an existing Xarray DataArray:

        ```python
        import xarray as xr
        import odc.geo.xr

        from darts_acquisition import load_tcvis

        # Assume "optical" is an already loaded s2 based dataarray

        tcvis = load_tcvis(
            optical.odc.geobox,
            "/path/to/tcvis-parent-directory",
        )

        # Now we can for example match the resolution and extent of the optical data:
        tcvis = tcvis.odc.reproject(optical.odc.geobox, resampling="cubic")
        ```

    """  # noqa: E501
    tick_fstart = time.perf_counter()

    data_dir = Path(data_dir) if isinstance(data_dir, str) else data_dir

    datacube_fpath = data_dir / "tcvis_2000-2019.zarr"
    storage = zarr.storage.FSStore(datacube_fpath)
    logger.debug(f"Loading TCVis from {datacube_fpath.resolve()}")

    if not datacube_fpath.exists():
        logger.debug(f"Creating a new zarr datacube at {datacube_fpath.resolve()} with {storage=}")
        create_empty_datacube(
            title="Landsat Trends TCVIS 2000-2019",
            storage=storage,
            geobox=DATA_EXTENT,
            chunk_size=CHUNK_SIZE,
            data_vars=DATA_VARS,
            meta=DATA_VARS_META,
            var_encoding=DATA_VARS_ENCODING,
        )

    # Download the adjacent tiles (if necessary)
    reference_geobox = geobox.to_crs("epsg:4326", resolution=DATA_EXTENT.resolution.x).pad(buffer)
    with download_lock:
        procedural_download_datacube(storage, reference_geobox)

    # Load the datacube and set the spatial_ref since it is set as a coordinate within the zarr format
    chunks = None if persist else "auto"
    tcvis_datacube = xr.open_zarr(storage, mask_and_scale=False, chunks=chunks).set_coords("spatial_ref")

    # Get an AOI slice of the datacube
    tcvis_aoi = tcvis_datacube.odc.crop(reference_geobox.extent, apply_mask=False)

    # The following code would load the lazy zarr data from disk into memory
    if persist:
        tick_sload = time.perf_counter()
        tcvis_aoi = tcvis_aoi.load()
        tick_eload = time.perf_counter()
        logger.debug(f"TCVIS AOI loaded from disk in {tick_eload - tick_sload:.2f} seconds")

    logger.info(
        f"TCVIS tile {'loaded' if persist else 'lazy-opened'} in {time.perf_counter() - tick_fstart:.2f} seconds"
    )
    return tcvis_aoi

darts_ensemble

Inference and model ensembling for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

darts_export

Dataset export for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

export_arcticdem_datamask(tile, out_dir)

Export the arcticdem data mask as a GeoTIFF file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |

Source code in darts-export/src/darts_export/inference.py
def export_arcticdem_datamask(tile: xr.Dataset, out_dir: Path):
    """Export the arcticdem data mask as a GeoTIFF file.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.

    """
    fpath = out_dir / "arcticdem_data_mask.tif"
    with stopuhr(f"Exporting arcticdem data mask to {fpath}", logger.debug):
        tile["arcticdem_data_mask"].rio.to_raster(fpath, driver="GTiff", compress="LZW")

export_binarized(tile, out_dir, export_ensemble_inputs=False, tags={})

Export the binarized segmentation layer to a file.

If `export_ensemble_inputs` is set to True and the ensemble used at least two models for inference, the binarized segmentation of the models will be written as individual files as well.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |
| `export_ensemble_inputs` | `bool` | Also save the model outputs, not only the ensemble result. Only applies if the inference result is an ensemble result and has at least two inputs. Defaults to False. | `False` |
| `tags` | `dict` | Optional GeoTIFF metadata to be written. Defaults to no additional metadata. | `{}` |

Source code in darts-export/src/darts_export/inference.py
def export_binarized(tile: xr.Dataset, out_dir: Path, export_ensemble_inputs: bool = False, tags: dict = {}):
    """Export the binarized segmentation layer to a file.

    If `export_ensemble_inputs` is set to True and the ensemble used at least two models for inference,
    the binarized segmentation of the models will be written as individual files as well.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.
        export_ensemble_inputs (bool, optional): Also save the model outputs, not only the ensemble result.
            Only applies if the inference result is an ensemble result and has at least two inputs.
            Defaults to False.
        tags (dict, optional): optional GeoTIFF metadata to be written. Defaults to no additional metadata.

    """
    subset_names = _get_subset_names(tile)
    if export_ensemble_inputs and len(subset_names) > 1:
        for subset in _get_subset_names(tile):
            tick_estart = time.perf_counter()
            layer_name = f"binarized_segmentation-{subset}"
            fpath = out_dir / f"{layer_name}.tif"
            tile[layer_name].rio.to_raster(fpath, driver="GTiff", tags=tags, compress="LZW")
            tick_eend = time.perf_counter()
            logger.debug(f"Exported binarized segmentation for {subset} to {fpath} in {tick_eend - tick_estart:.2f}s")

    fpath = out_dir / "binarized.tif"
    with stopuhr(f"Exporting binarized segmentation to {fpath}", logger.debug):
        tile["binarized_segmentation"].rio.to_raster(fpath, driver="GTiff", tags=tags, compress="LZW")

export_datamask(tile, out_dir)

Export the data mask as a GeoTIFF file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |

Source code in darts-export/src/darts_export/inference.py
def export_datamask(tile: xr.Dataset, out_dir: Path):
    """Export the data mask as a GeoTIFF file.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.

    """
    fpath = out_dir / "data_mask.tif"
    with stopuhr(f"Exporting data mask to {fpath}", logger.debug):
        tile["quality_data_mask"].rio.to_raster(fpath, driver="GTiff", compress="LZW")

export_dem(tile, out_dir)

Export the DEM data as a GeoTIFF file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |

Source code in darts-export/src/darts_export/inference.py
def export_dem(tile: xr.Dataset, out_dir: Path):
    """Export the DEM data as a GeoTIFF file.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.

    """
    fpath = out_dir / "dem.tif"
    with stopuhr(f"Exporting DEM data to {fpath}", logger.debug):
        tile[["slope", "relative_elevation"]].rio.to_raster(fpath, driver="GTiff", compress="LZW")

export_extent(tile, out_dir)

Export the extent of the prediction as a vector dataset in GeoPackage and GeoParquet format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |

Source code in darts-export/src/darts_export/inference.py
def export_extent(tile: xr.Dataset, out_dir: Path):
    """Export the extent of the prediction as a vector dataset in GeoPackage and GeoParquet format.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.

    """
    fpath_gpkg = out_dir / "prediction_extent.gpkg"
    fpath_parquet = out_dir / "prediction_extent.parquet"
    with stopuhr(f"Exporting extent to {fpath_gpkg} and {fpath_parquet}", logger.debug):
        polygon_gdf = vectorization.vectorize(tile, "quality_data_mask", minimum_mapping_unit=0)
        polygon_gdf.to_file(fpath_gpkg, layer="prediction_extent")
        polygon_gdf.to_parquet(fpath_parquet)

export_optical(tile, out_dir)

Export the optical data as a GeoTIFF file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |

Source code in darts-export/src/darts_export/inference.py
def export_optical(tile: xr.Dataset, out_dir: Path):
    """Export the optical data as a GeoTIFF file.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.

    """
    fpath = out_dir / "optical.tif"
    with stopuhr(f"Exporting optical data to {fpath}", logger.debug):
        tile[["red", "green", "blue", "nir"]].rio.to_raster(fpath, driver="GTiff", compress="LZW")

export_polygonized(tile, out_dir, export_ensemble_inputs=False, minimum_mapping_unit=32)

Export the binarized probabilities as a vector dataset in GeoPackage and GeoParquet format.

If `export_ensemble_inputs` is set to True and the ensemble used at least two models for inference, the vectorized binarized segmentation of the models will be written as individual files as well.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |
| `export_ensemble_inputs` | `bool` | Also save the model outputs, not only the ensemble result. Only applies if the inference result is an ensemble result and has at least two inputs. Defaults to False. | `False` |
| `minimum_mapping_unit` | `int` | Segments covering fewer pixels are removed. Defaults to 32. | `32` |

Source code in darts-export/src/darts_export/inference.py
def export_polygonized(
    tile: xr.Dataset, out_dir: Path, export_ensemble_inputs: bool = False, minimum_mapping_unit: int = 32
):
    """Export the binarized probabilities as a vector dataset in GeoPackage and GeoParquet format.

    If `export_ensemble_inputs` is set to True and the ensemble used at least two models for inference,
    the vectorized binarized segmentation of the models will be written as individual files as well.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.
        export_ensemble_inputs (bool, optional): Also save the model outputs, not only the ensemble result.
            Only applies if the inference result is an ensemble result and has at least two inputs.
            Defaults to False.
        minimum_mapping_unit (int, optional): segments covering fewer pixels are removed. Defaults to 32.

    """
    subset_names = _get_subset_names(tile)
    if export_ensemble_inputs and len(subset_names) > 1:
        for subset in _get_subset_names(tile):
            tick_estart = time.perf_counter()
            layer_name = f"binarized_segmentation-{subset}"
            fpath_gpkg = out_dir / f"prediction_segments-{subset}.gpkg"
            fpath_parquet = out_dir / f"prediction_segments-{subset}.parquet"
            polygon_gdf = vectorization.vectorize(tile, layer_name, minimum_mapping_unit=minimum_mapping_unit)
            polygon_gdf.to_file(fpath_gpkg, layer=f"prediction_segments-{subset}")
            polygon_gdf.to_parquet(fpath_parquet)
            tick_eend = time.perf_counter()
            logger.debug(
                f"Exported binarized segmentation for {subset} to {fpath_gpkg} and {fpath_parquet}"
                f" in {tick_eend - tick_estart:.2f}s"
            )

    fpath_gpkg = out_dir / "prediction_segments.gpkg"
    fpath_parquet = out_dir / "prediction_segments.parquet"
    with stopuhr(f"Exporting binarized segmentation to {fpath_gpkg} and {fpath_parquet}", logger.debug):
        polygon_gdf = vectorization.vectorize(tile, "binarized_segmentation", minimum_mapping_unit=minimum_mapping_unit)
        polygon_gdf.to_file(fpath_gpkg, layer="prediction_segments")
        polygon_gdf.to_parquet(fpath_parquet)

export_probabilities(tile, out_dir, export_ensemble_inputs=False, tags={})

Export the probabilities layer to a file.

If `export_ensemble_inputs` is set to True and the ensemble used at least two models for inference, the probabilities of the models will be written as individual files as well.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |
| `export_ensemble_inputs` | `bool` | Also save the model outputs, not only the ensemble result. Only applies if the inference result is an ensemble result and has at least two inputs. Defaults to False. | `False` |
| `tags` | `dict` | Optional GeoTIFF metadata to be written. Defaults to no additional metadata. | `{}` |

Source code in darts-export/src/darts_export/inference.py
def export_probabilities(tile: xr.Dataset, out_dir: Path, export_ensemble_inputs: bool = False, tags: dict = {}):
    """Export the probabilities layer to a file.

    If `export_ensemble_inputs` is set to True and the ensemble used at least two models for inference,
    the probabilities of the models will be written as individual files as well.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.
        export_ensemble_inputs (bool, optional): Also save the model outputs, not only the ensemble result.
            Only applies if the inference result is an ensemble result and has at least two inputs.
            Defaults to False.
        tags (dict, optional): optional GeoTIFF metadata to be written. Defaults to no additional metadata.

    """
    subset_names = _get_subset_names(tile)
    if export_ensemble_inputs and len(subset_names) > 1:
        for subset in _get_subset_names(tile):
            tick_estart = time.perf_counter()
            layer_name = f"probabilities-{subset}"
            fpath = out_dir / f"{layer_name}.tif"
            tile[layer_name].rio.to_raster(fpath, driver="GTiff", tags=tags, compress="LZW")
            tick_eend = time.perf_counter()
            logger.debug(f"Exported probabilities for {subset} to {fpath} in {tick_eend - tick_estart:.2f}s")

    fpath = out_dir / "probabilities.tif"
    with stopuhr(f"Exporting probabilities to {fpath}", logger.debug):
        tile["probabilities"].rio.to_raster(fpath, driver="GTiff", tags=tags, compress="LZW")

export_tcvis(tile, out_dir)

Export the TCVIS data as a GeoTIFF file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |

Source code in darts-export/src/darts_export/inference.py
def export_tcvis(tile: xr.Dataset, out_dir: Path):
    """Export the TCVIS data as a GeoTIFF file.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.

    """
    fpath = out_dir / "tcvis.tif"
    with stopuhr(f"Exporting TCVIS data to {fpath}", logger.debug):
        tile[["tc_brightness", "tc_greenness", "tc_wetness"]].rio.to_raster(fpath, driver="GTiff", compress="LZW")

export_thumbnail(tile, out_dir)

Export a thumbnail of the optical data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tile` | `Dataset` | The inference result. | *required* |
| `out_dir` | `Path` | The path where to export to. | *required* |

Source code in darts-export/src/darts_export/inference.py
def export_thumbnail(tile: xr.Dataset, out_dir: Path):
    """Export a thumbnail of the optical data.

    Args:
        tile (xr.Dataset): The inference result.
        out_dir (Path): The path where to export to.

    """
    fpath = out_dir / "thumbnail.jpg"
    with stopuhr(f"Exporting thumbnail to {fpath}", logger.debug):
        fig = thumbnail(tile)
        fig.savefig(fpath)
        fig.clear()
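Taken together, a post-inference export step could look like this sketch (the `darts_export.inference` import path is inferred from the source locations above; `tile` is an inference result):

```python
from pathlib import Path

from darts_export.inference import (
    export_binarized,
    export_polygonized,
    export_probabilities,
    export_thumbnail,
)

out_dir = Path("./output/my-tile")  # hypothetical output directory
out_dir.mkdir(parents=True, exist_ok=True)

export_probabilities(tile, out_dir, tags={"pipeline": "darts-nextgen"})
export_binarized(tile, out_dir, tags={"pipeline": "darts-nextgen"})
export_polygonized(tile, out_dir, minimum_mapping_unit=32)
export_thumbnail(tile, out_dir)
```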

darts_postprocessing

Postprocessing steps for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

darts_preprocessing

Data preprocessing and feature engineering for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

preprocess_legacy(ds_optical, ds_arcticdem, ds_tcvis)

Preprocess optical data with legacy (DARTS v1) preprocessing steps.

The processing steps are:

- Calculate NDVI
- Merge everything into a single ds.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ds_optical` | `Dataset` | The Planet scene optical data or Sentinel 2 scene optical data. | *required* |
| `ds_arcticdem` | `Dataset` | The ArcticDEM data. | *required* |
| `ds_tcvis` | `Dataset` | The TCVIS data. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | The preprocessed dataset. |

Source code in darts-preprocessing/src/darts_preprocessing/preprocess.py
def preprocess_legacy(
    ds_optical: xr.Dataset,
    ds_arcticdem: xr.Dataset,
    ds_tcvis: xr.Dataset,
) -> xr.Dataset:
    """Preprocess optical data with legacy (DARTS v1) preprocessing steps.

    The processing steps are:
    - Calculate NDVI
    - Merge everything into a single ds.

    Args:
        ds_optical (xr.Dataset): The Planet scene optical data or Sentinel 2 scene optical data.
        ds_arcticdem (xr.Dataset): The ArcticDEM data.
        ds_tcvis (xr.Dataset): The TCVIS data.

    Returns:
        xr.Dataset: The preprocessed dataset.

    """
    # Calculate NDVI
    ds_ndvi = calculate_ndvi(ds_optical)

    # Reproject TCVIS to optical data
    ds_tcvis = ds_tcvis.odc.reproject(ds_optical.odc.geobox, resampling="cubic")

    # Since this function expects the arcticdem to be loaded from a VRT, which already contains slope and tpi,
    # we dont need to calculate them here

    # merge to final dataset
    ds_merged = xr.merge([ds_optical, ds_ndvi, ds_arcticdem, ds_tcvis])

    return ds_merged
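A sketch of how the legacy inputs are wired together (import locations assumed; all paths are placeholders):

```python
from pathlib import Path

import odc.geo.xr  # noqa: F401, registers the .odc accessor
from darts_acquisition import load_arcticdem_from_vrt, load_planet_scene, load_tcvis
from darts_preprocessing import preprocess_legacy

ds_optical = load_planet_scene("/path/to/planet-scene")
ds_arcticdem = load_arcticdem_from_vrt(
    Path("/path/to/arcticdem/slope.vrt"),
    Path("/path/to/arcticdem/elevation.vrt"),
    ds_optical,
)
ds_tcvis = load_tcvis(ds_optical.odc.geobox, "/path/to/tcvis-parent-directory")
tile = preprocess_legacy(ds_optical, ds_arcticdem, ds_tcvis)
```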

preprocess_legacy_fast(ds_merged, ds_arcticdem, ds_tcvis, tpi_outer_radius=100, tpi_inner_radius=0, device=DEFAULT_DEVICE)

Preprocess optical data with legacy (DARTS v1) preprocessing steps, but with new data concepts.

The processing steps are:

- Calculate NDVI
- Calculate slope and relative elevation from ArcticDEM
- Merge everything into a single ds.

The main difference to `preprocess_legacy` is the new data concept of the ArcticDEM: instead of using already preprocessed ArcticDEM data loaded from a VRT, this step expects the raw ArcticDEM data and calculates slope and relative elevation on the fly.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ds_merged` | `Dataset` | The Planet scene optical data or Sentinel 2 scene optical dataset including data_masks. | *required* |
| `ds_arcticdem` | `Dataset` | The ArcticDEM dataset. | *required* |
| `ds_tcvis` | `Dataset` | The TCVIS dataset. | *required* |
| `tpi_outer_radius` | `int` | The outer radius of the annulus kernel for the TPI calculation in m. Defaults to 100. | `100` |
| `tpi_inner_radius` | `int` | The inner radius of the annulus kernel for the TPI calculation in m. Defaults to 0. | `0` |
| `device` | `Literal['cuda', 'cpu'] \| int` | The device to run the TPI and slope calculations on. If "cuda", take the first device (0); if int, take the specified device. Defaults to "cuda" if CUDA is available, else "cpu". | `DEFAULT_DEVICE` |

Returns:

| Type | Description |
| --- | --- |
| `xr.Dataset` | The preprocessed dataset. |

Source code in darts-preprocessing/src/darts_preprocessing/preprocess.py
def preprocess_legacy_fast(
    ds_merged: xr.Dataset,
    ds_arcticdem: xr.Dataset,
    ds_tcvis: xr.Dataset,
    tpi_outer_radius: int = 100,
    tpi_inner_radius: int = 0,
    device: Literal["cuda", "cpu"] | int = DEFAULT_DEVICE,
) -> xr.Dataset:
    """Preprocess optical data with legacy (DARTS v1) preprocessing steps, but with new data concepts.

    The processing steps are:
    - Calculate NDVI
    - Calculate slope and relative elevation from ArcticDEM
    - Merge everything into a single ds.

    The main difference to preprocess_legacy is the new data concept of the arcticdem.
    Instead of using already preprocessed arcticdem data which are loaded from a VRT, this step expects the raw
    arcticdem data and calculates slope and relative elevation on the fly.

    Args:
        ds_merged (xr.Dataset): The Planet scene optical data or Sentinel 2 scene optical dataset including data_masks.
        ds_arcticdem (xr.Dataset): The ArcticDEM dataset.
        ds_tcvis (xr.Dataset): The TCVIS dataset.
        tpi_outer_radius (int, optional): The outer radius of the annulus kernel for the tpi calculation
            in m. Defaults to 100m.
        tpi_inner_radius (int, optional): The inner radius of the annulus kernel for the tpi calculation
            in m. Defaults to 0.
        device (Literal["cuda", "cpu"] | int, optional): The device to run the tpi and slope calculations on.
            If "cuda" take the first device (0), if int take the specified device.
            Defaults to "cuda" if cuda is available, else "cpu".

    Returns:
        xr.Dataset: The preprocessed dataset.

    """
    tick_fstart = time.perf_counter()
    logger.debug("Starting fast v1 preprocessing.")

    # Calculate NDVI
    ds_merged["ndvi"] = calculate_ndvi(ds_merged).ndvi

    # Reproject TCVIS to optical data
    tick_sproj = time.perf_counter()
    ds_tcvis = ds_tcvis.odc.reproject(ds_merged.odc.geobox, resampling="cubic")
    tick_eproj = time.perf_counter()
    logger.debug(f"Reprojection of TCVIS done in {tick_eproj - tick_sproj:.2f} seconds.")

    ds_merged["tc_brightness"] = ds_tcvis.tc_brightness
    ds_merged["tc_greenness"] = ds_tcvis.tc_greenness
    ds_merged["tc_wetness"] = ds_tcvis.tc_wetness

    # Calculate TPI and slope from ArcticDEM
    tick_sproj = time.perf_counter()
    ds_arcticdem = ds_arcticdem.odc.reproject(ds_merged.odc.geobox.buffered(tpi_outer_radius), resampling="cubic")
    tick_eproj = time.perf_counter()
    logger.debug(f"Reprojection of ArcticDEM done in {tick_eproj - tick_sproj:.2f} seconds.")

    ds_arcticdem = preprocess_legacy_arcticdem_fast(ds_arcticdem, tpi_outer_radius, tpi_inner_radius, device)
    ds_arcticdem = ds_arcticdem.odc.crop(ds_merged.odc.geobox.extent)
    # For some reason, we need to reindex, because the reproject + crop of the arcticdem sometimes results
    # in floating point errors. These errors are on the order of 1e-10, hence way below millimeter precision.
    ds_arcticdem = ds_arcticdem.reindex_like(ds_merged)

    ds_merged["dem"] = ds_arcticdem.dem
    ds_merged["relative_elevation"] = ds_arcticdem.tpi
    ds_merged["slope"] = ds_arcticdem.slope
    ds_merged["arcticdem_data_mask"] = ds_arcticdem.datamask

    # Update datamask with arcticdem mask
    # with xr.set_options(keep_attrs=True):
    #     ds_merged["quality_data_mask"] = ds_merged.quality_data_mask * ds_arcticdem.datamask
    # ds_merged.quality_data_mask.attrs["data_source"] += " + ArcticDEM"

    tick_fend = time.perf_counter()
    logger.info(f"Preprocessing done in {tick_fend - tick_fstart:.2f} seconds.")
    return ds_merged
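A sketch of the fast path, matching the ArcticDEM buffer to the TPI radius as in the `load_arcticdem` example above (import locations assumed; `ds_merged` is an already loaded optical dataset including data masks):

```python
from math import ceil, sqrt

import odc.geo.xr  # noqa: F401, registers the .odc accessor
from darts_acquisition import load_arcticdem, load_tcvis
from darts_preprocessing import preprocess_legacy_fast

tpi_outer_radius = 100  # m
ds_arcticdem = load_arcticdem(
    ds_merged.odc.geobox,
    "/path/to/arcticdem-parent-directory",
    resolution=2,
    buffer=ceil(tpi_outer_radius / 2 * sqrt(2)),  # m -> pixels at 2 m resolution, diagonal-safe
)
ds_tcvis = load_tcvis(ds_merged.odc.geobox, "/path/to/tcvis-parent-directory")
tile = preprocess_legacy_fast(ds_merged, ds_arcticdem, ds_tcvis, tpi_outer_radius=tpi_outer_radius)
```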

darts_segmentation

Image segmentation of thaw-slumps for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

SMPSegmenter

An actor that keeps a model as its state and segments tiles.

Source code in darts-segmentation/src/darts_segmentation/segment.py
class SMPSegmenter:
    """An actor that keeps a model as its state and segments tiles."""

    config: SMPSegmenterConfig
    model: nn.Module
    device: torch.device

    def __init__(self, model_checkpoint: Path | str, device: torch.device = DEFAULT_DEVICE):
        """Initialize the segmenter.

        Args:
            model_checkpoint (Path): The path to the model checkpoint.
            device (torch.device): The device to run the model on.
                Defaults to torch.device("cuda") if cuda is available, else torch.device("cpu").

        """
        model_checkpoint = model_checkpoint if isinstance(model_checkpoint, Path) else Path(model_checkpoint)
        self.device = device
        ckpt = torch.load(model_checkpoint, map_location=self.device)
        self.config = validate_config(ckpt["config"])
        # Overwrite the encoder weights with None, because we load our own
        self.config["model"] |= {"encoder_weights": None}
        self.model = smp.create_model(**self.config["model"])
        self.model.to(self.device)
        self.model.load_state_dict(ckpt["statedict"])
        self.model.eval()

        logger.debug(
            f"Successfully loaded model from {model_checkpoint.resolve()} with inputs: "
            f"{self.config['input_combination']}"
        )

    def tile2tensor(self, tile: xr.Dataset) -> torch.Tensor:
        """Take a tile and convert it to a pytorch tensor.

        Respects the input combination from the config.

        Returns:
            A torch tensor for the full tile consisting of the bands specified in `self.band_combination`.

        """
        bands = []
        # e.g. input_combination: ["red", "green", "blue", "relative_elevation", ...]
        # tile.data_vars: ["red", "green", "blue", "relative_elevation", ...]

        for feature_name in self.config["input_combination"]:
            norm = self.config["norm_factors"][feature_name]
            band_data = tile[feature_name]
            # Normalize the band data
            band_data = band_data * norm
            bands.append(torch.from_numpy(band_data.to_numpy().astype("float32")))

        return torch.stack(bands, dim=0)

    def tile2tensor_batched(self, tiles: list[xr.Dataset]) -> torch.Tensor:
        """Take a list of tiles and convert them to a pytorch tensor.

        Respects the input combination from the config.

        Returns:
            A torch tensor for the full tile consisting of the bands specified in `self.band_combination`.

        """
        bands = []
        for feature_name in self.config["input_combination"]:
            norm = self.config["norm_factors"][feature_name]
            for tile in tiles:
                band_data = tile[feature_name]
                # Normalize the band data
                band_data = band_data * norm
                bands.append(torch.from_numpy(band_data.to_numpy().astype("float32")))
        # TODO: Test this
        return torch.stack(bands, dim=0).reshape(len(tiles), len(self.config["input_combination"]), *bands[0].shape)

    def segment_tile(
        self, tile: xr.Dataset, patch_size: int = 1024, overlap: int = 16, batch_size: int = 8, reflection: int = 0
    ) -> xr.Dataset:
        """Run inference on a tile.

        Args:
            tile: The input tile, containing preprocessed, harmonized data.
            patch_size (int): The size of the patches. Defaults to 1024.
            overlap (int): The size of the overlap. Defaults to 16.
            batch_size (int): The batch size for the prediction, NOT the batch_size of input tiles.
                Tensor will be sliced into patches and these again will be inferred in batches. Defaults to 8.
            reflection (int): Reflection-Padding which will be applied to the edges of the tensor. Defaults to 0.

        Returns:
            Input tile augmented by a predicted `probabilities` layer with type float32 and range [0, 1].

        """
        # Convert the tile to a tensor
        tensor_tile = self.tile2tensor(tile)

        # Create a batch dimension, because predict expects it
        tensor_tile = tensor_tile.unsqueeze(0)

        probabilities = predict_in_patches(
            self.model, tensor_tile, patch_size, overlap, batch_size, reflection, self.device
        ).squeeze(0)

        # Highly sophisticated DL-based predictor
        # TODO: is there a better way to pass metadata?
        tile["probabilities"] = tile["red"].copy(data=probabilities.cpu().numpy())
        tile["probabilities"].attrs = {
            "long_name": "Probabilities",
        }
        tile["probabilities"] = tile["probabilities"].fillna(float("nan")).rio.write_nodata(float("nan"))

        # Cleanup cuda memory
        del tensor_tile, probabilities
        free_torch()

        return tile

    def segment_tile_batched(
        self,
        tiles: list[xr.Dataset],
        patch_size: int = 1024,
        overlap: int = 16,
        batch_size: int = 8,
        reflection: int = 0,
    ) -> list[xr.Dataset]:
        """Run inference on a list of tiles.

        Args:
            tiles: The input tiles, containing preprocessed, harmonized data.
            patch_size (int): The size of the patches. Defaults to 1024.
            overlap (int): The size of the overlap. Defaults to 16.
            batch_size (int): The batch size for the prediction, NOT the batch_size of input tiles.
                Tensor will be sliced into patches and these again will be inferred in batches. Defaults to 8.
            reflection (int): Reflection-Padding which will be applied to the edges of the tensor. Defaults to 0.

        Returns:
            A list of input tiles augmented by a predicted `probabilities` layer with type float32 and range [0, 1].

        """
        # Convert the tiles to tensors
        # TODO: maybe create a batched tile2tensor function?
        # tensor_tiles = [self.tile2tensor(tile).to(self.dev) for tile in tiles]
        tensor_tiles = self.tile2tensor_batched(tiles)

        # Create a batch dimension, because predict expects it
        tensor_tiles = torch.stack(tensor_tiles, dim=0)

        probabilities = predict_in_patches(
            self.model, tensor_tiles, patch_size, overlap, batch_size, reflection, self.device
        )

        # Highly sophisticated DL-based predictor
        for tile, probs in zip(tiles, probabilities):
            # TODO: is there a better way to pass metadata?
            tile["probabilities"] = tile["red"].copy(data=probs.cpu().numpy())
            tile["probabilities"].attrs = {
                "long_name": "Probabilities",
            }
            tile["probabilities"] = tile["probabilities"].fillna(float("nan")).rio.write_nodata(float("nan"))

        # Cleanup cuda memory
        del tensor_tiles, probabilities
        free_torch()

        return tiles

    def __call__(
        self,
        input: xr.Dataset | list[xr.Dataset],
        patch_size: int = 1024,
        overlap: int = 16,
        batch_size: int = 8,
        reflection: int = 0,
    ) -> xr.Dataset | list[xr.Dataset]:
        """Run inference on a single tile or a list of tiles.

        Args:
            input (xr.Dataset | list[xr.Dataset]): A single tile or a list of tiles.
            patch_size (int): The size of the patches. Defaults to 1024.
            overlap (int): The size of the overlap. Defaults to 16.
            batch_size (int): The batch size for the prediction, NOT the batch_size of input tiles.
                Tensor will be sliced into patches and these again will be inferred in batches. Defaults to 8.
            reflection (int): Reflection-Padding which will be applied to the edges of the tensor. Defaults to 0.

        Returns:
            A single tile or a list of tiles augmented by a predicted `probabilities` layer, depending on the input.
            Each `probability` has type float32 and range [0, 1].

        Raises:
            ValueError: in case the input is not an xr.Dataset or a list of xr.Dataset

        """
        if isinstance(input, xr.Dataset):
            return self.segment_tile(
                input, patch_size=patch_size, overlap=overlap, batch_size=batch_size, reflection=reflection
            )
        elif isinstance(input, list):
            return self.segment_tile_batched(
                input, patch_size=patch_size, overlap=overlap, batch_size=batch_size, reflection=reflection
            )
        else:
            raise ValueError(f"Expected xr.Dataset or list of xr.Dataset, got {type(input)}")

config = validate_config(ckpt['config']) instance-attribute

device = device instance-attribute

model = smp.create_model(**self.config['model']) instance-attribute
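A minimal end-to-end sketch (checkpoint path hypothetical; `tile` is a preprocessed dataset as produced by `preprocess_legacy_fast`; the package-root import is assumed):

```python
from darts_segmentation import SMPSegmenter

segmenter = SMPSegmenter("/path/to/checkpoint.pt")  # device defaults to cuda if available
tile = segmenter(tile, patch_size=1024, overlap=16, batch_size=8)
probabilities = tile["probabilities"]  # float32, range [0, 1]
```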

__call__(input, patch_size=1024, overlap=16, batch_size=8, reflection=0)

Run inference on a single tile or a list of tiles.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input` | `Dataset \| list[Dataset]` | A single tile or a list of tiles. | *required* |
| `patch_size` | `int` | The size of the patches. Defaults to 1024. | `1024` |
| `overlap` | `int` | The size of the overlap. Defaults to 16. | `16` |
| `batch_size` | `int` | The batch size for the prediction, NOT the batch_size of input tiles. The tensor will be sliced into patches, which are then inferred in batches. Defaults to 8. | `8` |
| `reflection` | `int` | Reflection padding which will be applied to the edges of the tensor. Defaults to 0. | `0` |

Returns:

| Type | Description |
| --- | --- |
| `Dataset \| list[Dataset]` | A single tile or a list of tiles augmented by a predicted `probabilities` layer, depending on the input. Each `probabilities` layer has type float32 and range [0, 1]. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | In case the input is not an xr.Dataset or a list of xr.Dataset. |

Source code in darts-segmentation/src/darts_segmentation/segment.py
def __call__(
    self,
    input: xr.Dataset | list[xr.Dataset],
    patch_size: int = 1024,
    overlap: int = 16,
    batch_size: int = 8,
    reflection: int = 0,
) -> xr.Dataset | list[xr.Dataset]:
    """Run inference on a single tile or a list of tiles.

    Args:
        input (xr.Dataset | list[xr.Dataset]): A single tile or a list of tiles.
        patch_size (int): The size of the patches. Defaults to 1024.
        overlap (int): The size of the overlap. Defaults to 16.
        batch_size (int): The batch size for the prediction, NOT the batch_size of input tiles.
            Tensor will be sliced into patches and these again will be inferred in batches. Defaults to 8.
        reflection (int): Reflection-Padding which will be applied to the edges of the tensor. Defaults to 0.

    Returns:
        A single tile or a list of tiles augmented by a predicted `probabilities` layer, depending on the input.
        Each `probability` has type float32 and range [0, 1].

    Raises:
        ValueError: in case the input is not an xr.Dataset or a list of xr.Dataset

    """
    if isinstance(input, xr.Dataset):
        return self.segment_tile(
            input, patch_size=patch_size, overlap=overlap, batch_size=batch_size, reflection=reflection
        )
    elif isinstance(input, list):
        return self.segment_tile_batched(
            input, patch_size=patch_size, overlap=overlap, batch_size=batch_size, reflection=reflection
        )
    else:
        raise ValueError(f"Expected xr.Dataset or list of xr.Dataset, got {type(input)}")

__init__(model_checkpoint, device=DEFAULT_DEVICE)

Initialize the segmenter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_checkpoint` | `Path` | The path to the model checkpoint. | *required* |
| `device` | `device` | The device to run the model on. Defaults to torch.device("cuda") if CUDA is available, else torch.device("cpu"). | `DEFAULT_DEVICE` |

Source code in darts-segmentation/src/darts_segmentation/segment.py
def __init__(self, model_checkpoint: Path | str, device: torch.device = DEFAULT_DEVICE):
    """Initialize the segmenter.

    Args:
        model_checkpoint (Path): The path to the model checkpoint.
        device (torch.device): The device to run the model on.
            Defaults to torch.device("cuda") if cuda is available, else torch.device("cpu").

    """
    model_checkpoint = model_checkpoint if isinstance(model_checkpoint, Path) else Path(model_checkpoint)
    self.device = device
    ckpt = torch.load(model_checkpoint, map_location=self.device)
    self.config = validate_config(ckpt["config"])
    # Overwrite the encoder weights with None, because we load our own
    self.config["model"] |= {"encoder_weights": None}
    self.model = smp.create_model(**self.config["model"])
    self.model.to(self.device)
    self.model.load_state_dict(ckpt["statedict"])
    self.model.eval()

    logger.debug(
        f"Successfully loaded model from {model_checkpoint.resolve()} with inputs: "
        f"{self.config['input_combination']}"
    )
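
A minimal construction sketch, assuming the class is exposed as SMPSegmenter in darts_segmentation.segment (the checkpoint path is a placeholder):

from pathlib import Path

import torch

from darts_segmentation.segment import SMPSegmenter

segmenter = SMPSegmenter(
    Path("/path/to/checkpoint.pt"),  # placeholder path
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
)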

segment_tile(tile, patch_size=1024, overlap=16, batch_size=8, reflection=0)

Run inference on a tile.

Parameters:

Name Type Description Default
tile Dataset

The input tile, containing preprocessed, harmonized data.

required
patch_size int

The size of the patches. Defaults to 1024.

1024
overlap int

The size of the overlap. Defaults to 16.

16
batch_size int

The batch size for the prediction, NOT the batch size of the input tiles. The tensor will be sliced into patches, which are then inferred in batches. Defaults to 8.

8
reflection int

Reflection padding applied to the edges of the tensor. Defaults to 0.

0

Returns:

Type Description
Dataset

Input tile augmented by a predicted probabilities layer with type float32 and range [0, 1].

Source code in darts-segmentation/src/darts_segmentation/segment.py
def segment_tile(
    self, tile: xr.Dataset, patch_size: int = 1024, overlap: int = 16, batch_size: int = 8, reflection: int = 0
) -> xr.Dataset:
    """Run inference on a tile.

    Args:
        tile: The input tile, containing preprocessed, harmonized data.
        patch_size (int): The size of the patches. Defaults to 1024.
        overlap (int): The size of the overlap. Defaults to 16.
        batch_size (int): The batch size for the prediction, NOT the batch size of the input tiles.
            The tensor will be sliced into patches, which are then inferred in batches. Defaults to 8.
        reflection (int): Reflection padding applied to the edges of the tensor. Defaults to 0.

    Returns:
        Input tile augmented by a predicted `probabilities` layer with type float32 and range [0, 1].

    """
    # Convert the tile to a tensor
    tensor_tile = self.tile2tensor(tile)

    # Create a batch dimension, because predict expects it
    tensor_tile = tensor_tile.unsqueeze(0)

    probabilities = predict_in_patches(
        self.model, tensor_tile, patch_size, overlap, batch_size, reflection, self.device
    ).squeeze(0)

    # Highly sophisticated DL-based predictor
    # TODO: is there a better way to pass metadata?
    tile["probabilities"] = tile["red"].copy(data=probabilities.cpu().numpy())
    tile["probabilities"].attrs = {
        "long_name": "Probabilities",
    }
    tile["probabilities"] = tile["probabilities"].fillna(float("nan")).rio.write_nodata(float("nan"))

    # Cleanup cuda memory
    del tensor_tile, probabilities
    free_torch()

    return tile
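
A short usage sketch: the added probabilities layer can afterwards be thresholded into a binary mask (the 0.5 threshold is an illustrative assumption, not a recommendation):

tile = segmenter.segment_tile(tile, patch_size=1024, overlap=16)

# Threshold the probabilities into a binary mask (illustrative value)
binarized = tile["probabilities"] > 0.5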

segment_tile_batched(tiles, patch_size=1024, overlap=16, batch_size=8, reflection=0)

Run inference on a list of tiles.

Parameters:

Name Type Description Default
tiles list[Dataset]

The input tiles, containing preprocessed, harmonized data.

required
patch_size int

The size of the patches. Defaults to 1024.

1024
overlap int

The size of the overlap. Defaults to 16.

16
batch_size int

The batch size for the prediction, NOT the batch size of the input tiles. The tensor will be sliced into patches, which are then inferred in batches. Defaults to 8.

8
reflection int

Reflection padding applied to the edges of the tensor. Defaults to 0.

0

Returns:

Type Description
list[Dataset]

A list of input tiles augmented by a predicted probabilities layer with type float32 and range [0, 1].

Source code in darts-segmentation/src/darts_segmentation/segment.py
def segment_tile_batched(
    self,
    tiles: list[xr.Dataset],
    patch_size: int = 1024,
    overlap: int = 16,
    batch_size: int = 8,
    reflection: int = 0,
) -> list[xr.Dataset]:
    """Run inference on a list of tiles.

    Args:
        tiles: The input tiles, containing preprocessed, harmonized data.
        patch_size (int): The size of the patches. Defaults to 1024.
        overlap (int): The size of the overlap. Defaults to 16.
        batch_size (int): The batch size for the prediction, NOT the batch size of the input tiles.
            The tensor will be sliced into patches, which are then inferred in batches. Defaults to 8.
        reflection (int): Reflection padding applied to the edges of the tensor. Defaults to 0.

    Returns:
        A list of input tiles augmented by a predicted `probabilities` layer with type float32 and range [0, 1].

    """
    # Convert the tiles to tensors
    # TODO: maybe create a batched tile2tensor function?
    # tensor_tiles = [self.tile2tensor(tile).to(self.dev) for tile in tiles]
    tensor_tiles = self.tile2tensor_batched(tiles)

    # Create a batch dimension, because predict expects it
    tensor_tiles = torch.stack(tensor_tiles, dim=0)

    probabilities = predict_in_patches(
        self.model, tensor_tiles, patch_size, overlap, batch_size, reflection, self.device
    )

    # Highly sophisticated DL-based predictor
    for tile, probs in zip(tiles, probabilities):
        # TODO: is there a better way to pass metadata?
        tile["probabilities"] = tile["red"].copy(data=probs.cpu().numpy())
        tile["probabilities"].attrs = {
            "long_name": "Probabilities",
        }
        tile["probabilities"] = tile["probabilities"].fillna(float("nan")).rio.write_nodata(float("nan"))

    # Cleanup cuda memory
    del tensor_tiles, probabilities
    free_torch()

    return tiles
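
Note that the tiles are stacked into a single tensor before prediction, so all tiles in the list must share the same spatial dimensions. Tiles of differing sizes should instead be segmented one by one via segment_tile.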

tile2tensor(tile)

Take a tile and convert it to a pytorch tensor.

Respects the input combination from the config.

Returns:

Type Description
Tensor

A torch tensor for the full tile consisting of the bands specified in self.config["input_combination"].

Source code in darts-segmentation/src/darts_segmentation/segment.py
def tile2tensor(self, tile: xr.Dataset) -> torch.Tensor:
    """Take a tile and convert it to a pytorch tensor.

    Respects the input combination from the config.

    Returns:
        A torch tensor for the full tile consisting of the bands specified in `self.config["input_combination"]`.

    """
    bands = []
    # e.g. input_combination: ["red", "green", "blue", "relative_elevation", ...]
    # tile.data_vars: ["red", "green", "blue", "relative_elevation", ...]

    for feature_name in self.config["input_combination"]:
        norm = self.config["norm_factors"][feature_name]
        band_data = tile[feature_name]
        # Normalize the band data
        band_data = band_data * norm
        bands.append(torch.from_numpy(band_data.to_numpy().astype("float32")))

    return torch.stack(bands, dim=0)
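
A minimal sketch of the resulting tensor shape, assuming a hypothetical input combination of four bands and an already initialized segmenter:

import numpy as np
import xarray as xr

# Fake 256x256 tile with the bands a (hypothetical) config expects
tile = xr.Dataset({name: (("y", "x"), np.random.rand(256, 256)) for name in ["red", "green", "blue", "ndvi"]})
tensor = segmenter.tile2tensor(tile)  # shape: (4, 256, 256), dtype: float32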

tile2tensor_batched(tiles)

Take a list of tiles and convert them to a pytorch tensor.

Respects the input combination from the config.

Returns:

Type Description
Tensor

A torch tensor of all tiles, stacked along the first dimension, each consisting of the bands specified in self.config["input_combination"].

Source code in darts-segmentation/src/darts_segmentation/segment.py
def tile2tensor_batched(self, tiles: list[xr.Dataset]) -> torch.Tensor:
    """Take a list of tiles and convert them to a pytorch tensor.

    Respects the input combination from the config.

    Returns:
        A torch tensor of all tiles, stacked along the first dimension, each consisting of the bands specified in `self.config["input_combination"]`.

    """
    bands = []
    # Iterate tile-major so that the reshape below yields (tile, band, h, w)
    for tile in tiles:
        for feature_name in self.config["input_combination"]:
            norm = self.config["norm_factors"][feature_name]
            band_data = tile[feature_name]
            # Normalize the band data
            band_data = band_data * norm
            bands.append(torch.from_numpy(band_data.to_numpy().astype("float32")))
    return torch.stack(bands, dim=0).reshape(len(tiles), len(self.config["input_combination"]), *bands[0].shape)

SMPSegmenterConfig

Bases: TypedDict

Configuration for the segmenter.

Source code in darts-segmentation/src/darts_segmentation/segment.py
class SMPSegmenterConfig(TypedDict):
    """Configuration for the segmentor."""

    input_combination: list[str]
    model: dict[str, Any]
    norm_factors: dict[str, float]

input_combination instance-attribute

model instance-attribute

norm_factors instance-attribute
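
A hypothetical example of the expected structure (all values are illustrative; the real config is stored inside the model checkpoint and checked by validate_config):

config: SMPSegmenterConfig = {
    "input_combination": ["red", "green", "blue", "ndvi"],
    "model": {"arch": "Unet", "encoder_name": "resnet34", "in_channels": 4, "classes": 1},
    "norm_factors": {"red": 1 / 3000, "green": 1 / 3000, "blue": 1 / 3000, "ndvi": 1.0},
}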

create_patches(tensor_tiles, patch_size, overlap, return_coords=False)

Create patches from a tensor.

Parameters:

Name Type Description Default
tensor_tiles Tensor

The input tensor. Shape: (BS, C, H, W).

required
patch_size int

The size of the patches.

required
overlap int

The size of the overlap.

required
return_coords bool

Whether to return the coordinates of the patches. Can be used for debugging. Defaults to False.

False

Returns:

Type Description
Tensor

torch.Tensor: The patches. Shape: (BS, N_h, N_w, C, patch_size, patch_size).

Source code in darts-segmentation/src/darts_segmentation/utils.py
@torch.no_grad()
def create_patches(
    tensor_tiles: torch.Tensor, patch_size: int, overlap: int, return_coords: bool = False
) -> torch.Tensor:
    """Create patches from a tensor.

    Args:
        tensor_tiles (torch.Tensor): The input tensor. Shape: (BS, C, H, W).
        patch_size (int): The size of the patches.
        overlap (int): The size of the overlap.
        return_coords (bool, optional): Whether to return the coordinates of the patches.
            Can be used for debugging. Defaults to False.

    Returns:
        torch.Tensor: The patches. Shape: (BS, N_h, N_w, C, patch_size, patch_size).

    """
    start_time = time.time()
    logger.debug(
        f"Creating patches from a tensor with shape {tensor_tiles.shape} "
        f"with patch_size {patch_size} and overlap {overlap}"
    )
    assert tensor_tiles.dim() == 4, f"Expects tensor_tiles to have shape (BS, C, H, W), got {tensor_tiles.shape}"
    bs, c, h, w = tensor_tiles.shape
    assert h > patch_size > overlap
    assert w > patch_size > overlap

    step_size = patch_size - overlap

    # The problem with unfold is that it cuts off the last patch if it doesn't fit exactly.
    # Padding could help, but then the next problem is that the view needs to get reshaped (copied in memory)
    # to fit the model input shape. Such a complex view can't be inserted into the model.
    # Doing it manually is therefore currently our best choice, since we can avoid the padding.
    # patches = (
    #     tensor_tiles.unfold(2, patch_size, step_size).unfold(3, patch_size, step_size).transpose(1, 2).transpose(2, 3)
    # )
    # return patches

    nh, nw = math.ceil((h - overlap) / step_size), math.ceil((w - overlap) / step_size)
    # Create Patches of size (BS, N_h, N_w, C, patch_size, patch_size)
    patches = torch.zeros((bs, nh, nw, c, patch_size, patch_size), device=tensor_tiles.device)
    coords = torch.zeros((nh, nw, 5))
    for i, (y, x, patch_idx_h, patch_idx_w) in enumerate(patch_coords(h, w, patch_size, overlap)):
        patches[:, patch_idx_h, patch_idx_w, :] = tensor_tiles[:, :, y : y + patch_size, x : x + patch_size]
        coords[patch_idx_h, patch_idx_w, :] = torch.tensor([i, y, x, patch_idx_h, patch_idx_w])

    logger.debug(f"Creating {nh * nw} patches took {time.time() - start_time:.2f}s")
    if return_coords:
        return patches, coords
    else:
        return patches
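
A quick shape check with illustrative numbers:

import torch

tensor = torch.rand(1, 3, 2048, 2048)
patches = create_patches(tensor, patch_size=1024, overlap=16)
# step_size = 1024 - 16 = 1008, hence nh = nw = ceil((2048 - 16) / 1008) = 3
print(patches.shape)  # torch.Size([1, 3, 3, 3, 1024, 1024])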

patch_coords(h, w, patch_size, overlap)

Yield patch coordinates based on height, width, patch size and overlap.

Parameters:

Name Type Description Default
h int

Height of the image.

required
w int

Width of the image.

required
patch_size int

Patch size.

required
overlap int

The size of the overlap between patches.

required

Yields:

Type Description
tuple[int, int, int, int]

tuple[int, int, int, int]: The patch coordinates y, x, patch_idx_y and patch_idx_x.

Source code in darts-segmentation/src/darts_segmentation/utils.py
def patch_coords(h: int, w: int, patch_size: int, overlap: int) -> Generator[tuple[int, int, int, int], None, None]:
    """Yield patch coordinates based on height, width, patch size and margin size.

    Args:
        h (int): Height of the image.
        w (int): Width of the image.
        patch_size (int): Patch size.
        overlap (int): The size of the overlap between patches.

    Yields:
        tuple[int, int, int, int]: The patch coordinates y, x, patch_idx_y and patch_idx_x.

    """
    step_size = patch_size - overlap
    # Subtract the overlap from h and w so that an exact match of the last patch won't create a duplicate
    for patch_idx_y, y in enumerate(range(0, h - overlap, step_size)):
        for patch_idx_x, x in enumerate(range(0, w - overlap, step_size)):
            if y + patch_size > h:
                y = h - patch_size
            if x + patch_size > w:
                x = w - patch_size
            yield y, x, patch_idx_y, patch_idx_x
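
For example, an 11x11 image with patch_size=6 and overlap=2 is stepped through with step_size=4, and the last row and column of patches are clamped to the image border:

list(patch_coords(11, 11, patch_size=6, overlap=2))
# Start offsets are 0, 4 and 8; offset 8 is clamped to 11 - 6 = 5:
# [(0, 0, 0, 0), (0, 4, 0, 1), (0, 5, 0, 2),
#  (4, 0, 1, 0), (4, 4, 1, 1), (4, 5, 1, 2),
#  (5, 0, 2, 0), (5, 4, 2, 1), (5, 5, 2, 2)]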

predict_in_patches(model, tensor_tiles, patch_size, overlap, batch_size, reflection, device=torch.device('cpu'), return_weights=False)

Predict on a tensor.

Parameters:

Name Type Description Default
model Module

The model to use for prediction.

required
tensor_tiles Tensor

The input tensor. Shape: (BS, C, H, W).

required
patch_size int

The size of the patches.

required
overlap int

The size of the overlap.

required
batch_size int

The batch size for the prediction, NOT the batch size of the input tiles. The tensor will be sliced into patches, which are then inferred in batches.

required
reflection int

Reflection padding applied to the edges of the tensor.

required
device device

The device to use for the prediction.

torch.device('cpu')
return_weights bool

Whether to return the weights. Can be used for debugging. Defaults to False.

False

Returns:

Type Description
Tensor

The predicted tensor.

Source code in darts-segmentation/src/darts_segmentation/utils.py
@torch.no_grad()
def predict_in_patches(
    model: nn.Module,
    tensor_tiles: torch.Tensor,
    patch_size: int,
    overlap: int,
    batch_size: int,
    reflection: int,
    device: torch.device = torch.device("cpu"),
    return_weights: bool = False,
) -> torch.Tensor:
    """Predict on a tensor.

    Args:
        model: The model to use for prediction.
        tensor_tiles: The input tensor. Shape: (BS, C, H, W).
        patch_size (int): The size of the patches.
        overlap (int): The size of the overlap.
        batch_size (int): The batch size for the prediction, NOT the batch size of the input tiles.
            The tensor will be sliced into patches, which are then inferred in batches.
        reflection (int): Reflection padding applied to the edges of the tensor.
        device (torch.device): The device to use for the prediction. Defaults to torch.device("cpu").
        return_weights (bool, optional): Whether to return the weights. Can be used for debugging. Defaults to False.

    Returns:
        The predicted tensor.

    """
    start_time = time.time()
    logger.debug(
        f"Predicting on a tensor with shape {tensor_tiles.shape} "
        f"with patch_size {patch_size}, overlap {overlap} and batch_size {batch_size} on device {device}"
    )
    assert tensor_tiles.dim() == 4, f"Expects tensor_tiles to have shape (BS, C, H, W), got {tensor_tiles.shape}"
    # Add a 1px + reflection border to avoid pixel loss when applying the soft margin and to reduce edge-artefacts
    p = 1 + reflection
    tensor_tiles = torch.nn.functional.pad(tensor_tiles, (p, p, p, p), mode="reflect")
    bs, c, h, w = tensor_tiles.shape
    step_size = patch_size - overlap
    nh, nw = math.ceil((h - overlap) / step_size), math.ceil((w - overlap) / step_size)

    # Create Patches of size (BS, N_h, N_w, C, patch_size, patch_size)
    patches = create_patches(tensor_tiles, patch_size=patch_size, overlap=overlap)

    # Flatten the patches so they fit to the model
    # (BS, N_h, N_w, C, patch_size, patch_size) -> (BS * N_h * N_w, C, patch_size, patch_size)
    patches = patches.view(bs * nh * nw, c, patch_size, patch_size)

    # Create a soft margin for the patches
    margin_ramp = torch.cat(
        [
            torch.linspace(0, 1, overlap),
            torch.ones(patch_size - 2 * overlap),
            torch.linspace(1, 0, overlap),
        ]
    )
    soft_margin = margin_ramp.reshape(1, 1, patch_size) * margin_ramp.reshape(1, patch_size, 1)
    soft_margin = soft_margin.to(patches.device)

    # Infer logits with model and turn into probabilities with sigmoid in a batched manner
    # TODO: check with ingmar and jonas if moving all patches to the device at the same time is a good idea
    patched_probabilities = torch.zeros_like(patches[:, 0, :, :])
    patches = patches.split(batch_size)
    n_skipped = 0
    for i, batch in enumerate(patches):
        # If the batch contains only nans, skip it entirely
        if torch.isnan(batch).all():
            patched_probabilities[i * batch_size : (i + 1) * batch_size] = 0
            n_skipped += 1
            continue
        # If batch contains some nans, replace them with zeros
        batch[torch.isnan(batch)] = 0

        batch = batch.to(device)
        # logger.debug(f"Predicting on batch {i + 1}/{len(patches)}")
        patched_probabilities[i * batch_size : (i + 1) * batch_size] = (
            torch.sigmoid(model(batch)).squeeze(1).to(patched_probabilities.device)
        )
        batch = batch.to(patched_probabilities.device)  # Transfer back to the original device to avoid memory leaks

    if n_skipped > 0:
        logger.debug(f"Skipped {n_skipped} batches because they only contained NaNs")

    patched_probabilities = patched_probabilities.view(bs, nh, nw, patch_size, patch_size)

    # Reconstruct the image from the patches
    prediction = torch.zeros(bs, h, w, device=tensor_tiles.device)
    weights = torch.zeros(bs, h, w, device=tensor_tiles.device)

    for y, x, patch_idx_h, patch_idx_w in patch_coords(h, w, patch_size, overlap):
        patch = patched_probabilities[:, patch_idx_h, patch_idx_w]
        prediction[:, y : y + patch_size, x : x + patch_size] += patch * soft_margin
        weights[:, y : y + patch_size, x : x + patch_size] += soft_margin

    # Avoid division by zero
    weights = torch.where(weights == 0, torch.ones_like(weights), weights)
    prediction = prediction / weights

    # Remove the 1px border and the padding
    prediction = prediction[:, p:-p, p:-p]
    logger.info(f"Predicting {nh * nw} patches took {time.time() - start_time:.2f}s")

    if return_weights:
        return prediction, weights
    else:
        return prediction
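
A self-contained smoke-test sketch: any nn.Module mapping (BS, C, H, W) to (BS, 1, H, W) logits works, so a 1x1 convolution stands in for a real segmentation model here:

import torch
from torch import nn

model = nn.Conv2d(3, 1, kernel_size=1)  # stand-in for a real segmentation model
tensor_tiles = torch.rand(2, 3, 256, 256)
probabilities = predict_in_patches(
    model, tensor_tiles, patch_size=64, overlap=16, batch_size=4, reflection=0, device=torch.device("cpu")
)
print(probabilities.shape)  # torch.Size([2, 256, 256])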

darts_superresolution

Image superresolution of Sentinel-2 imagery for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute

darts_utils

Utility functions for the DARTS dataset.

__version__ = importlib.metadata.version('darts-nextgen') module-attribute