[DO NOT MERGE] Log test results to file #6627

Draft: wants to merge 66 commits into master from tohtana/log_run_tests
Changes from 13 commits
Commits (66)
16a8cb6
log test run
tohtana Oct 15, 2024
3ff9cea
enable logging in workflow
tohtana Oct 15, 2024
101bab7
run grep regardless of pytest return code
tohtana Oct 15, 2024
c04d6c1
fix return code from grep
tohtana Oct 15, 2024
d58b427
exclude skipped tests from failure logging
tohtana Oct 15, 2024
5434f53
fix handling return code
tohtana Oct 15, 2024
3d27593
Merge branch 'master' into tohtana/log_run_tests
tohtana Oct 16, 2024
75fe4ad
add logging in tests
tohtana Oct 16, 2024
18d2da1
Merge branch 'tohtana/log_run_tests' of github.com:microsoft/DeepSpee…
tohtana Oct 16, 2024
1e6b3e5
Merge branch 'master' into tohtana/log_run_tests
tohtana Oct 16, 2024
a1c766b
disable NCCL_SOCKET_IFNAME
tohtana Oct 16, 2024
56febde
fix args for test func
tohtana Oct 16, 2024
7fab557
pin torch version
tohtana Oct 16, 2024
b0091a9
Merge branch 'master' into tohtana/log_run_tests
tohtana Oct 23, 2024
409ed6d
unpin torch version
tohtana Oct 23, 2024
969b7f7
Merge branch 'master' into tohtana/log_run_tests
tohtana Oct 24, 2024
8c4cd1d
set file path for filestore
tohtana Oct 24, 2024
6a7b640
use /dev/shm for filestore
tohtana Oct 24, 2024
9d0216a
Merge branch 'tohtana/log_run_tests' of github.com:microsoft/DeepSpee…
tohtana Oct 24, 2024
7508150
add info to tag
tohtana Oct 25, 2024
e52ca96
shorten process group timeout
tohtana Oct 25, 2024
58cb5a9
set device
tohtana Oct 25, 2024
9e64183
Run on specialized runner
loadams Oct 25, 2024
3fad973
set blank to NCCL_SOCKET_IFNAME
tohtana Oct 25, 2024
2096a1a
Merge branch 'tohtana/log_run_tests' of github.com:microsoft/DeepSpee…
tohtana Oct 25, 2024
6669f93
Merge branch 'master' into tohtana/log_run_tests
loadams Oct 28, 2024
6bef245
pass error in test to parent process
tohtana Oct 28, 2024
95a6426
Merge branch 'tohtana/log_run_tests' of github.com:microsoft/DeepSpee…
tohtana Oct 28, 2024
b143903
set timeout of closing pool
tohtana Oct 28, 2024
4357a6e
recreate pool when test fails
tohtana Oct 28, 2024
07c18c8
add log outputs
tohtana Oct 28, 2024
b221b5f
fix flag
tohtana Oct 28, 2024
fafb2d9
handle nccl error
tohtana Oct 29, 2024
dcb3bbd
init pg exclusively
tohtana Oct 29, 2024
48561fa
fix lock
tohtana Oct 29, 2024
616eb4d
fix removal of lock file
tohtana Oct 29, 2024
fa4bcec
use O_EXCL for lock
tohtana Oct 29, 2024
acc77d9
simplify lock
tohtana Oct 29, 2024
c8612d8
add random wait
tohtana Oct 29, 2024
65111c1
increase retry count
tohtana Oct 29, 2024
44fb6fe
stop using init_process_group_exclusively
tohtana Oct 29, 2024
a1e4eee
catch nccl init error
tohtana Oct 29, 2024
a1c0123
change timeout
tohtana Oct 29, 2024
0afe7d1
enable reuse_dist_env
tohtana Oct 29, 2024
3649914
set reuse_dist_env=True as default
tohtana Oct 29, 2024
ecc93f9
do not reuse dist env for non-daemonic process
tohtana Oct 29, 2024
96d520f
fix device selection for reuse dist env
tohtana Oct 29, 2024
f7573d1
record pool cache at every test
tohtana Oct 29, 2024
91fc68a
fix teadown
tohtana Oct 29, 2024
54bb4e6
fix condition to clean process pool
tohtana Oct 29, 2024
46a4ac8
fix teardown
tohtana Oct 29, 2024
85fa337
add condition of cleaning
tohtana Oct 29, 2024
4dbfb51
add test
tohtana Oct 30, 2024
3d6b7ea
move call to set device
tohtana Oct 30, 2024
65ffac9
fix world size
tohtana Oct 30, 2024
61409dd
Merge branch 'master' into tohtana/log_run_tests
loadams Oct 30, 2024
c420d42
add cleaning of global state
tohtana Oct 30, 2024
35ccf6c
Merge branch 'tohtana/log_run_tests' of github.com:microsoft/DeepSpee…
tohtana Oct 30, 2024
bebb59c
Switch version back to run on non-debug runners
loadams Oct 31, 2024
3a880da
Merge branch 'master' into tohtana/log_run_tests
tohtana Oct 31, 2024
b6da93d
Merge branch 'master' into tohtana/log_run_tests
loadams Nov 1, 2024
89f03af
Fix after merge
loadams Nov 1, 2024
5f3b63f
Fix function signature from merge conflicts
loadams Nov 1, 2024
e6a6705
Add mpi4py
loadams Nov 1, 2024
5e59b82
Merge branch 'master' into tohtana/log_run_tests
loadams Nov 4, 2024
bb4c5b6
Merge branch 'master' into tohtana/log_run_tests
loadams Nov 11, 2024
2 changes: 1 addition & 1 deletion .github/workflows/cpu-torch-latest.yml
@@ -33,7 +33,7 @@ jobs:

- name: Install pytorch
run: |
pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu
pip install torch==2.4.1 torchvision --index-url https://download.pytorch.org/whl/cpu
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"

26 changes: 23 additions & 3 deletions .github/workflows/nv-torch-latest-v100.yml
@@ -29,7 +29,7 @@ jobs:

- name: Install pytorch
run: |
pip install -U --cache-dir $TORCH_CACHE torch torchvision --index-url https://download.pytorch.org/whl/cu121
pip install -U --cache-dir $TORCH_CACHE torch==2.4.1 torchvision --index-url https://download.pytorch.org/whl/cu121
python -c "import torch; print('torch:', torch.__version__, torch)"
python -c "import torch; print('CUDA available:', torch.cuda.is_available())"

@@ -55,5 +55,25 @@ jobs:
run: |
unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
cd tests
pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1"
pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1"
TEST_LOG_FILE="/tmp/test_log_${GITHUB_RUN_ID}.log"
echo "Running tests and logging to ${TEST_LOG_FILE}"
# Disable exit-on-error so we can still inspect and grep the log file even if pytest fails
set +e
NCCL_SOCKET_IFNAME="" RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.4" --cuda_ver="12.1"
PYTEST_EXIT_CODE=$?
if [ $PYTEST_EXIT_CODE -ne 0 ]; then
# Keep the log file for debugging the failure
echo "pytest failed with exit code $PYTEST_EXIT_CODE"
exit $PYTEST_EXIT_CODE
fi
grep "Failed" ${TEST_LOG_FILE}
rm -f ${TEST_LOG_FILE}
# Repeat for the sequential tests
RUNNING_TEST_LOG_FILE=${TEST_LOG_FILE} pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.4" --cuda_ver="12.1"
PYTEST_EXIT_CODE=$?
grep "Failed" ${TEST_LOG_FILE}
if [ $PYTEST_EXIT_CODE -ne 0 ]; then
echo "pytest failed with exit code $PYTEST_EXIT_CODE"
exit $PYTEST_EXIT_CODE
fi
rm -f ${TEST_LOG_FILE}
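
For reference, the job above surfaces failures by grepping the log for the literal string "Failed". The same check can be done off-CI with a small Python helper; this is a minimal sketch, not part of this PR, and it assumes the log format written by the helpers added to tests/unit/common.py below (the default path is illustrative).

import sys

def find_failures(log_path):
    # Collect log lines that record a failed test, mirroring `grep "Failed"` above.
    try:
        with open(log_path) as f:
            return [line.rstrip("\n") for line in f if "Failed" in line]
    except FileNotFoundError:
        return []

if __name__ == "__main__":
    log_file = sys.argv[1] if len(sys.argv) > 1 else "/tmp/test_log.log"
    for line in find_failures(log_file):
        print(line)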
174 changes: 131 additions & 43 deletions tests/unit/common.py
@@ -11,6 +11,8 @@
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path
import fcntl
import traceback

import torch
import torch.multiprocessing as mp
@@ -24,6 +26,7 @@

# Worker timeout for tests that hang
DEEPSPEED_TEST_TIMEOUT = int(os.environ.get('DS_UNITTEST_TIMEOUT', '600'))
RUNNING_TEST_LOG_FILE = os.environ.get("RUNNING_TEST_LOG_FILE", None)

warn_reuse_dist_env = False

@@ -128,6 +131,80 @@ def set_accelerator_visible():
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list)


def write_to_log_with_lock(log_file_path: str, header: str, msg: str):
with open(log_file_path, 'a+') as f:
try:
fcntl.flock(f, fcntl.LOCK_EX)
f.write(f"{header} {msg}\n")
f.flush()
finally:
fcntl.flock(f, fcntl.LOCK_UN)


def make_test_tag(request):
if request is None:
return "[xdist_worker={get_xdist_worker_id()}][NO_REQUEST]"

class_name = request.cls.__name__ if request.cls else "NO_CLASS"
test_name = request.node.name
return f"[xdist_worker={get_xdist_worker_id()}][{class_name}][{test_name}]"


class LogTestRun(ABC):

def __init__(self, log_file, tag, num_procs):
self.log_file = log_file
self.num_procs = num_procs
self.header = tag

def write(self, msg):
write_to_log_with_lock(self.log_file, self.header, msg)

def __enter__(self):
if self.log_file is None:
return
self._enter()
self.start_time = time.time()

def __exit__(self, exc_type, exc_val, exc_tb):
if self.log_file is None:
return

self.elapsed_time = time.time() - self.start_time
self._exit(exc_type, exc_val, exc_tb)

@abstractmethod
def _enter(self):
...

@abstractmethod
def _exit(self, exc_type, exc_val, exc_tb):
...


class LogTestRunBaseProcess(LogTestRun):

def __init__(self, log_file, tag, num_procs):
super().__init__(log_file, tag, num_procs)

def _enter(self):
self.write(f"Running with {self.num_procs} processes")

def _exit(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
tb_str = ''.join(traceback.format_tb(exc_tb))
if exc_type == Skipped:
self.write(
f"Skipping with {self.num_procs} processes. elapsed_time={self.elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val}"
)
else:
self.write(
f"Failed with {self.num_procs} processes. elapsed_time={self.elapsed_time:.2f}s exc_type={exc_type} exc_val={exc_val} {tb_str}"
)
return False
self.write(f"Finished with {self.num_procs} processes. elapsed_time={self.elapsed_time:.2f}s")


class DistributedExec(ABC):
"""
Base class for distributed execution of functions/methods. Contains common
@@ -155,8 +232,10 @@ def __call__(self, request=None):

if isinstance(world_size, int):
world_size = [world_size]

tag = make_test_tag(request)
for procs in world_size:
self._launch_procs(procs)
self._launch_procs(procs, tag)

def _get_fixture_kwargs(self, request, func):
if not request:
@@ -172,7 +251,7 @@ def _get_fixture_kwargs(self, request, func):
pass # test methods can have kwargs that are not fixtures
return fixture_kwargs

def _launch_daemonic_procs(self, num_procs):
def _launch_daemonic_procs(self, num_procs, tag):
# Create process pool or use cached one
master_port = None

@@ -198,7 +277,7 @@ def _launch_daemonic_procs(self, num_procs):
master_port = get_master_port()

# Run the test
args = [(local_rank, num_procs, master_port) for local_rank in range(num_procs)]
args = [(local_rank, num_procs, master_port, tag) for local_rank in range(num_procs)]
skip_msgs_async = pool.starmap_async(self._dist_run, args)

try:
@@ -218,7 +297,7 @@ def _launch_daemonic_procs(self, num_procs):
assert len(set(skip_msgs)) == 1, "Multiple different skip messages received"
pytest.skip(skip_msgs[0])

def _launch_non_daemonic_procs(self, num_procs):
def _launch_non_daemonic_procs(self, num_procs, tag):
assert not self.reuse_dist_env, "Cannot reuse distributed environment with non-daemonic processes"

master_port = get_master_port()
@@ -227,7 +306,7 @@ def _launch_non_daemonic_procs(self, num_procs):
prev_start_method = mp.get_start_method()
mp.set_start_method('spawn', force=True)
for local_rank in range(num_procs):
p = mp.Process(target=self._dist_run, args=(local_rank, num_procs, master_port, skip_msg))
p = mp.Process(target=self._dist_run, args=(local_rank, num_procs, master_port, tag, skip_msg))
p.start()
processes.append(p)
mp.set_start_method(prev_start_method, force=True)
@@ -269,7 +348,7 @@ def _launch_non_daemonic_procs(self, num_procs):
# add a check here to assert all exit messages are equal
pytest.skip(skip_msg.get())

def _launch_procs(self, num_procs):
def _launch_procs(self, num_procs, tag):
# Verify we have enough accelerator devices to run this test
if get_accelerator().is_available() and get_accelerator().device_count() < num_procs:
pytest.skip(
@@ -284,47 +363,53 @@ def _launch_procs(self, num_procs):
mp.set_start_method('forkserver', force=True)

if self.non_daemonic_procs:
self._launch_non_daemonic_procs(num_procs)
self._launch_non_daemonic_procs(num_procs, tag)
else:
self._launch_daemonic_procs(num_procs)

def _dist_run(self, local_rank, num_procs, master_port, skip_msg=""):
if not dist.is_initialized():
""" Initialize deepspeed.comm and execute the user function. """
if self.set_dist_env:
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = str(master_port)
os.environ['LOCAL_RANK'] = str(local_rank)
# NOTE: unit tests don't support multi-node so local_rank == global rank
os.environ['RANK'] = str(local_rank)
# In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE
# DeepSpeed single node launcher would also set LOCAL_SIZE accordingly
os.environ['LOCAL_SIZE'] = str(num_procs)
os.environ['WORLD_SIZE'] = str(num_procs)

# turn off NCCL logging if set
os.environ.pop('NCCL_DEBUG', None)

if get_accelerator().is_available():
set_accelerator_visible()

if get_accelerator().is_available():
get_accelerator().set_device(local_rank)

if self.init_distributed:
deepspeed.init_distributed(dist_backend=self.backend)
dist.barrier()
self._launch_daemonic_procs(num_procs, tag)

def _dist_run(self, local_rank, num_procs, master_port, tag, skip_msg=""):

tag = f"{tag} [pid={os.getpid()},master_port={master_port},local_rank={local_rank},num_procs={num_procs}"
with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [setup _dist_run]", num_procs):
if not dist.is_initialized():
""" Initialize deepspeed.comm and execute the user function. """
if self.set_dist_env:
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = str(master_port)
os.environ['LOCAL_RANK'] = str(local_rank)
# NOTE: unit tests don't support multi-node so local_rank == global rank
os.environ['RANK'] = str(local_rank)
# In case of multiprocess launching LOCAL_SIZE should be same as WORLD_SIZE
# DeepSpeed single node launcher would also set LOCAL_SIZE accordingly
os.environ['LOCAL_SIZE'] = str(num_procs)
os.environ['WORLD_SIZE'] = str(num_procs)

# turn off NCCL logging if set
os.environ.pop('NCCL_DEBUG', None)

if get_accelerator().is_available():
set_accelerator_visible()

if get_accelerator().is_available():
get_accelerator().set_device(local_rank)

if self.init_distributed:
deepspeed.init_distributed(dist_backend=self.backend)
dist.barrier()

try:
self.run(**self._fixture_kwargs)
with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exec _dist_run]", num_procs):
self.run(**self._fixture_kwargs)
except BaseException as e:
if isinstance(e, Skipped):
if self.non_daemonic_procs:
skip_msg.put(e.msg)
with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, f"{tag} [exception _dist_run] {e.__class__} msg={getattr(e, 'msg', str(e))}",
num_procs):
if isinstance(e, Skipped):
if self.non_daemonic_procs:
skip_msg.put(e.msg)
else:
skip_msg = e.msg
else:
skip_msg = e.msg
else:
raise e
raise e

return skip_msg
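
# ----------------------------------------------------------------------
# Illustrative helper (not part of this diff): with the tag built in
# _dist_run above, every "Finished with ... elapsed_time=..." line can be
# post-processed, e.g. to report the slowest tests. The regex assumes the
# exact message format written by LogTestRunBaseProcess.
# ----------------------------------------------------------------------
def _example_slowest_tests(log_path, top_n=10):
    import re
    from collections import defaultdict

    finished_re = re.compile(r"^(\[[^\]]*\]\[[^\]]*\]\[[^\]]*\]).*Finished with .* elapsed_time=([0-9.]+)s")
    totals = defaultdict(float)
    with open(log_path) as f:
        for line in f:
            match = finished_re.match(line)
            if match:
                totals[match.group(1)] += float(match.group(2))
    # Sum elapsed time per test tag and return the top_n slowest entries.
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:top_n]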

@@ -475,8 +560,11 @@ def __call__(self, request):

if isinstance(world_size, int):
world_size = [world_size]

for procs in world_size:
self._launch_procs(procs)
tag = make_test_tag(request)
with LogTestRunBaseProcess(RUNNING_TEST_LOG_FILE, tag, procs):
self._launch_procs(procs, tag)
time.sleep(0.5)

def _get_current_test_func(self, request):
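
The logging added in this PR is opt-in: when RUNNING_TEST_LOG_FILE is unset, the context managers become no-ops. A hypothetical way to reproduce the CI logging locally is sketched below; the log path is arbitrary, and the snippet assumes it is launched from the repository's tests/ directory with pytest-xdist and pytest-forked installed.

import os
import sys

import pytest

if __name__ == "__main__":
    os.environ["RUNNING_TEST_LOG_FILE"] = "/tmp/local_test_log.log"  # arbitrary path
    # Run the unit suite roughly the way the workflow does, then inspect the log file.
    sys.exit(pytest.main(["--forked", "-n", "4", "unit/"]))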