Skip to content

Commit

Permalink
Enable preemption checkpointing
Browse files Browse the repository at this point in the history
Summary:

Expose ability to add a preemption checkpointing hook running in a separate process group.

Reviewed By: wat3rBro, ynonaolga

Differential Revision: D51115437
  • Loading branch information
Francisc Bungiu authored and facebook-github-bot committed Dec 7, 2023
1 parent d0e1668 commit 26ec181
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions d2go/runner/default_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import logging
import os
from collections import OrderedDict
from datetime import timedelta
from functools import lru_cache
from typing import Any, List, Optional, Type, Union

import detectron2.utils.comm as comm
import torch
import torch.distributed as dist
from d2go.checkpoint.api import is_distributed_checkpoint
from d2go.checkpoint.fsdp_checkpoint import FSDPCheckpointer
from d2go.config import CfgNode, CONFIG_SCALING_METHOD_REGISTRY, temp_defrost
Expand Down Expand Up @@ -78,6 +80,7 @@


ALL_TB_WRITERS = []
CONTROL_PG_TIMEOUT = timedelta(minutes=30)


@lru_cache()
Expand Down Expand Up @@ -156,10 +159,28 @@ def get_monitoring_service() -> Any:
return contextlib.nullcontext()


@fb_overwritable()
def create_preemption_hook(
cfg: CfgNode,
periodic_checkpointer: PeriodicCheckpointer,
process_group: Optional[dist.ProcessGroup],
) -> Any:
return None


class BaseRunner(object):
def __init__(self):
identifier = f"D2Go.Runner.{self.__class__.__name__}"
torch._C._log_api_usage_once(identifier)
# initialize the control pg for stuff like checkpoint and preemption handling
logger.info("Initializing control pg")
self._control_pg: Optional[dist.ProcessGroup] = None
if dist.is_initialized():
logger.info("Create gloo CPU control pg")
self._control_pg = dist.new_group(
backend=dist.Backend.GLOO,
timeout=CONTROL_PG_TIMEOUT,
)

def _initialize(self, cfg):
"""Runner should be initialized in the sub-process in ddp setting"""
Expand Down Expand Up @@ -520,6 +541,7 @@ def _get_trainer_hooks(
self._create_after_step_hook(
cfg, model, optimizer, scheduler, periodic_checkpointer
),
create_preemption_hook(cfg, periodic_checkpointer, self._control_pg),
hooks.EvalHook(
cfg.TEST.EVAL_PERIOD,
lambda: self.do_test(cfg, model, train_iter=trainer.iter),
Expand Down

0 comments on commit 26ec181

Please sign in to comment.