diff --git a/tests/kfto/kfto_training_test.go b/tests/kfto/kfto_training_test.go index 4903c1d7..cef64932 100644 --- a/tests/kfto/kfto_training_test.go +++ b/tests/kfto/kfto_training_test.go @@ -19,6 +19,7 @@ package kfto import ( "fmt" "testing" + "time" kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" . "github.com/onsi/gomega" @@ -29,15 +30,39 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestPyTorchJobWithCuda(t *testing.T) { - runKFTOPyTorchJob(t, GetCudaTrainingImage(), "nvidia.com/gpu", 1) +func TestPyTorchJobSingleNodeSingleGpuWithCuda(t *testing.T) { + runKFTOPyTorchJob(t, GetCudaTrainingImage(), Nvidia, 1, 0) } -func TestPyTorchJobWithROCm(t *testing.T) { - runKFTOPyTorchJob(t, GetROCmTrainingImage(), "amd.com/gpu", 1) +func TestPyTorchJobSingleNodeMultiGpuWithCuda(t *testing.T) { + runKFTOPyTorchJob(t, GetCudaTrainingImage(), Nvidia, 2, 0) } -func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int) { +func TestPyTorchJobMultiNodeSingleGpuWithCuda(t *testing.T) { + runKFTOPyTorchJob(t, GetCudaTrainingImage(), Nvidia, 1, 1) +} + +func TestPyTorchJobMultiNodeMultiGpuWithCuda(t *testing.T) { + runKFTOPyTorchJob(t, GetCudaTrainingImage(), Nvidia, 2, 1) +} + +func TestPyTorchJobSingleNodeSingleGpuWithROCm(t *testing.T) { + runKFTOPyTorchJob(t, GetROCmTrainingImage(), Amd, 1, 0) +} + +func TestPyTorchJobSingleNodeMultiGpuWithROCm(t *testing.T) { + runKFTOPyTorchJob(t, GetROCmTrainingImage(), Amd, 2, 0) +} + +func TestPyTorchJobMultiNodeSingleGpuWithROCm(t *testing.T) { + runKFTOPyTorchJob(t, GetROCmTrainingImage(), Amd, 1, 1) +} + +func TestPyTorchJobMultiNodeMultiGpuWithROCm(t *testing.T) { + runKFTOPyTorchJob(t, GetROCmTrainingImage(), Amd, 2, 1) +} + +func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWorkerNodes int) { test := With(t) // Create a namespace @@ -54,20 +79,36 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int) defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{}) // Create training PyTorch job - tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpuLabel, numGpus, outputPvc.Name, image) + tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpu, numGpus, numberOfWorkerNodes, outputPvc.Name, image) defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0)) // Make sure the PyTorch job is running test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble). Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue))) + if IsOpenShift(test) && gpu == Nvidia { + // Check that GPUs were utilized recently + // That itself doesn't guarantee that PyTorchJob generated the load in GPU, but is the best we can achieve for now + totalNumOfGpus := (numberOfWorkerNodes + 1) * numGpus + test.Eventually(OpenShiftPrometheusNvidiaGpuUtil(test, namespace), 15*time.Minute). + Should( + And( + HaveLen(totalNumOfGpus), + ContainElement( + // Check that at lest some GPU was utilized on more than 50% + HaveField("Value", BeNumerically(">", 50)), + ), + ), + ) + test.T().Logf("%d GPUs were successfully utilized", totalNumOfGpus) + } + // Make sure the PyTorch job succeeded test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue))) test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name) - } -func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, outputPvcName string, baseImage string) *kftov1.PyTorchJob { +func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob { tuningJob := &kftov1.PyTorchJob{ TypeMeta: metav1.TypeMeta{ APIVersion: corev1.SchemeGroupVersion.String(), @@ -78,14 +119,33 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, }, Spec: kftov1.PyTorchJobSpec{ PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{ - "Master": { + kftov1.PyTorchJobReplicaTypeMaster: { Replicas: Ptr(int32(1)), RestartPolicy: "OnFailure", Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "kfto-llm", + }, + }, Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kfto-llm", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, Tolerations: []corev1.Toleration{ { - Key: gpuLabel, + Key: gpu.ResourceLabel, Operator: corev1.TolerationOpExists, }, }, @@ -124,12 +184,12 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{ "/bin/bash", "-c", - `python /etc/config/hf_llm_training.py \ + `torchrun /etc/config/hf_llm_training.py \ --model_uri /tmp/model/bloom-560m \ --model_dir /tmp/model/bloom-560m \ - --dataset_file /tmp/all_datasets/alpaca_data_hundredth.json \ + --dataset_file /tmp/all_datasets/alpaca_data_tenth.json \ --transformer_type AutoModelForCausalLM \ - --training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch"}' \ + --training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \ --lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`, }, Env: []corev1.EnvVar{ @@ -145,6 +205,10 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, Name: "TOKENIZERS_PARALLELISM", Value: "false", }, + { + Name: "NCCL_DEBUG", + Value: "INFO", + }, }, VolumeMounts: []corev1.VolumeMount{ { @@ -162,14 +226,14 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, }, Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("8Gi"), - corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)), + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)), }, Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("8Gi"), - corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)), + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)), }, }, SecurityContext: &corev1.SecurityContext{ @@ -207,6 +271,146 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, }, }, }, + kftov1.PyTorchJobReplicaTypeWorker: { + Replicas: Ptr(int32(numberOfWorkerNodes)), + RestartPolicy: "OnFailure", + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "kfto-llm", + }, + }, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "kfto-llm", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + Tolerations: []corev1.Toleration{ + { + Key: gpu.ResourceLabel, + Operator: corev1.TolerationOpExists, + }, + }, + InitContainers: []corev1.Container{ + { + Name: "copy-model", + Image: GetBloomModelImage(), + ImagePullPolicy: corev1.PullIfNotPresent, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "tmp-volume", + MountPath: "/tmp", + }, + }, + Command: []string{"/bin/sh", "-c"}, + Args: []string{"mkdir /tmp/model; cp -r /models/bloom-560m /tmp/model"}, + }, + { + Name: "copy-dataset", + Image: GetAlpacaDatasetImage(), + ImagePullPolicy: corev1.PullIfNotPresent, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "tmp-volume", + MountPath: "/tmp", + }, + }, + Command: []string{"/bin/sh", "-c"}, + Args: []string{"mkdir /tmp/all_datasets; cp -r /dataset/* /tmp/all_datasets;ls /tmp/all_datasets"}, + }, + }, + Containers: []corev1.Container{ + { + Name: "pytorch", + Image: baseImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{ + "/bin/bash", "-c", + `torchrun /etc/config/hf_llm_training.py \ + --model_uri /tmp/model/bloom-560m \ + --model_dir /tmp/model/bloom-560m \ + --dataset_file /tmp/all_datasets/alpaca_data_tenth.json \ + --transformer_type AutoModelForCausalLM \ + --training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \ + --lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`, + }, + Env: []corev1.EnvVar{ + { + Name: "HF_HOME", + Value: "/tmp/.cache", + }, + { + Name: "TRITON_CACHE_DIR", + Value: "/tmp/.triton", + }, + { + Name: "TOKENIZERS_PARALLELISM", + Value: "false", + }, + { + Name: "NCCL_DEBUG", + Value: "INFO", + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "config-volume", + MountPath: "/etc/config", + }, + { + Name: "tmp-volume", + MountPath: "/tmp", + }, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)), + }, + }, + SecurityContext: &corev1.SecurityContext{ + RunAsNonRoot: Ptr(true), + ReadOnlyRootFilesystem: Ptr(true), + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "config-volume", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: config.Name, + }, + }, + }, + }, + { + Name: "tmp-volume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + }, }, }, } diff --git a/tests/kfto/resources/hf_llm_training.py b/tests/kfto/resources/hf_llm_training.py index 7a28137a..e37557d3 100644 --- a/tests/kfto/resources/hf_llm_training.py +++ b/tests/kfto/resources/hf_llm_training.py @@ -160,8 +160,9 @@ def train_model(model, transformer_type, train_data, eval_data, tokenizer, train mlm=False, ) - # Train the model. + # Train and save the model. trainer.train() + trainer.save_model() def parse_arguments(): diff --git a/tests/kfto/support.go b/tests/kfto/support.go index ac7ea5a2..a0091404 100644 --- a/tests/kfto/support.go +++ b/tests/kfto/support.go @@ -18,9 +18,21 @@ package kfto import ( "embed" + "time" . "github.com/onsi/gomega" . "github.com/project-codeflare/codeflare-common/support" + prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1" + prometheusmodel "github.com/prometheus/common/model" +) + +type Gpu struct { + ResourceLabel string +} + +var ( + Nvidia = Gpu{ResourceLabel: "nvidia.com/gpu"} + Amd = Gpu{ResourceLabel: "amd.com/gpu"} ) //go:embed resources/* @@ -32,3 +44,21 @@ func ReadFile(t Test, fileName string) []byte { t.Expect(err).NotTo(HaveOccurred()) return file } + +func OpenShiftPrometheusNvidiaGpuUtil(test Test, namespace string) func(g Gomega) prometheusmodel.Vector { + return func(g Gomega) prometheusmodel.Vector { + prometheusApiClient := GetOpenShiftPrometheusApiClient(test) + result, warnings, err := prometheusApiClient.Query(test.Ctx(), "DCGM_FI_DEV_GPU_UTIL", time.Now(), prometheusapiv1.WithTimeout(5*time.Second)) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(warnings).Should(HaveLen(0)) + + var util prometheusmodel.Vector + for _, sample := range result.(prometheusmodel.Vector) { + if string(sample.Metric["exported_namespace"]) == namespace { + util = append(util, sample) + } + } + + return util + } +}