Abstract Data Parallelism (ADP) module for DARTS Segmentation.
_adp
_adp(
    process_inputs: list[darts_segmentation.training.adp.RunI],
    is_parallel: bool,
    devices: list[int],
    available_devices: multiprocessing.Queue,
    _run: collections.abc.Callable[
        [darts_segmentation.training.adp.RunI],
        darts_segmentation.training.adp.RunO,
    ],
) -> collections.abc.Generator[
    tuple[
        darts_segmentation.training.adp.RunI,
        darts_segmentation.training.adp.RunO,
    ],
    None,
    None,
]
Source code in darts-segmentation/src/darts_segmentation/training/adp.py
def _adp(
    process_inputs: list[RunI],
    is_parallel: bool,
    devices: list[int],
    available_devices: Queue,
    _run: Callable[[RunI], RunO],
) -> Generator[tuple[RunI, RunO], None, None]:
    # Handling different parallelization strategies
    if is_parallel:
        logger.debug("Using parallel strategy for ADP")
        for device in devices:
            logger.debug(f"Adding device {device} to available devices queue")
            available_devices.put(device)
        with ProcessPoolExecutor(max_workers=len(devices)) as executor:
            futures = {executor.submit(_run, inp): inp for inp in process_inputs}
            for future in as_completed(futures):
                inp = futures[future]
                try:
                    output = future.result()
                except Exception as e:
                    logger.error(f"Error in {inp}: {e}", exc_info=True)
                    continue
                yield inp, output
    else:
        logger.debug("Using serial strategy for ADP")
        for inp in process_inputs:
            try:
                output = _run(inp)
            except Exception as e:
                logger.error(f"Error in {inp}: {e}", exc_info=True)
                continue
            yield inp, output
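In short, `_adp` dispatches `_run` over all `process_inputs`: in parallel it starts one worker process per entry in `devices` (seeding each device id into `available_devices` first), in serial it simply loops, and in both cases it yields `(input, output)` pairs while logging and skipping inputs whose run raised an exception. The following is a minimal usage sketch of the serial path only; `run_once` and the dictionary inputs are hypothetical stand-ins for a caller-supplied `_run` and concrete `RunI`/`RunO` values, and the `devices`/`available_devices` arguments are only consulted on the parallel branch.

from multiprocessing import Queue

from darts_segmentation.training.adp import _adp


def run_once(inp: dict) -> float:
    """Hypothetical work function standing in for `_run`: pretend to train on one configuration."""
    return inp["seed"] * 0.1


# Hypothetical inputs standing in for concrete RunI values.
process_inputs = [{"seed": s} for s in range(3)]

for inp, output in _adp(
    process_inputs=process_inputs,
    is_parallel=False,          # serial strategy: inputs are processed one after another
    devices=[0],                # only used by the parallel branch
    available_devices=Queue(),  # likewise only consumed when running in parallel
    _run=run_once,
):
    print(inp, output)

Note that `_adp` is a private helper, so it is normally driven by the surrounding training code rather than called directly; the sketch only illustrates the yielded `(input, output)` pairs and the skip-on-error behavior. On the parallel branch, `_run` must additionally be picklable for `ProcessPoolExecutor`, and the device ids seeded into `available_devices` are presumably checked out and returned by the workers, a hand-off that happens outside the code shown here.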