Skip to content

adp

darts_segmentation.training.adp

Abstract Data Parallelism (ADP) module for DARTS Segmentation.

RunI module-attribute

RunI = typing.TypeVar('I')

RunO module-attribute

RunO = typing.TypeVar('O')

logger module-attribute

logger = logging.getLogger(
    __name__.replace("darts_", "darts.")
)

_adp

Source code in darts-segmentation/src/darts_segmentation/training/adp.py
def _adp(
    process_inputs: list[RunI],
    is_parallel: bool,
    devices: list[int],
    available_devices: Queue,
    _run: Callable[[RunI], RunO],
) -> Generator[tuple[RunI, RunO], None, None]:
    """Apply ``_run`` to every input, serially or in a process pool, yielding results lazily.

    Args:
        process_inputs: The inputs to process; each one is passed to ``_run`` on its own.
        is_parallel: If true, the inputs are dispatched to a ``ProcessPoolExecutor``;
            otherwise they are processed one after another in the calling process.
        devices: Device ids. Only used in parallel mode, where every id is put onto
            ``available_devices`` and the number of ids sets the pool's worker count.
        available_devices: Queue that receives the device ids in parallel mode.
            Presumably ``_run`` takes its device from this queue - confirm against the caller.
        _run: The callable executed for each input. In parallel mode it is submitted
            to the process pool.

    Yields:
        ``(input, output)`` pairs for every input whose run succeeded. Serial mode
        yields in input order; parallel mode yields in completion order.

    Note:
        An exception raised by a single run is logged together with its traceback and
        the corresponding input is skipped; it does not abort the remaining runs.

    """
    # Serial strategy: handle it first and leave, so the parallel path stays un-nested.
    if not is_parallel:
        logger.debug("Using serial strategy for ADP")
        for item in process_inputs:
            try:
                result = _run(item)
            except Exception as e:
                logger.error(f"Error in {item}: {e}", exc_info=True)
            else:
                yield item, result
        return

    # Parallel strategy: publish the devices before any work is submitted.
    logger.debug("Using parallel strategy for ADP")
    for device in devices:
        logger.debug(f"Adding device {device} to available devices queue")
        available_devices.put(device)

    with ProcessPoolExecutor(max_workers=len(devices)) as pool:
        # Remember which input belongs to which future, since completion order is arbitrary.
        pending = {}
        for item in process_inputs:
            pending[pool.submit(_run, item)] = item

        for finished in as_completed(pending):
            item = pending[finished]
            try:
                result = finished.result()
            except Exception as e:
                logger.error(f"Error in {item}: {e}", exc_info=True)
            else:
                yield item, result