1from pydantic import ConfigDict, Field
2
3from sagemaker.hyperpod.cli.constants.command_constants import (
4 INSTANCE_TYPE_LABEL,
5 NEURON_RESOURCE_LIMIT_KEY,
6 NVIDIA_GPU_RESOURCE_LIMIT_KEY,
7 EFA_RESOURCE_LIMIT_KEY,
8)
9from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
10 _HyperPodPytorchJob, HyperPodPytorchJobStatus
11)
12from sagemaker.hyperpod.common.config.metadata import Metadata
13from kubernetes import client, config, stream
14from typing import List, Optional, ClassVar
15from sagemaker.hyperpod.common.utils import (
16 handle_exception,
17 get_default_namespace,
18 setup_logging,
19 verify_kubernetes_version_compatibility
20)
21from sagemaker.hyperpod.common.telemetry.telemetry_logging import (
22 _hyperpod_telemetry_emitter,
23)
24from sagemaker.hyperpod.common.telemetry.constants import Feature
25import yaml
26import logging
27
28from sagemaker.hyperpod.training.quota_allocation_util import (
29 _is_valid,
30 _get_resources_from_compute_quotas,
31 _get_resources_from_instance,
32 _get_limits,
33 _resolve_default_memory_values,
34 _set_default_accelerators_val,
35 _validate_accelerators_inputs,
36 _validate_efa_inputs,
37 _resolve_default_cpu_values,
38 _trim_resource_requests,
39)
40from sagemaker.hyperpod.training.constants import INSTANCE_RESOURCES, INSTANCE_TYPE_MIG_PROFILES
41from sagemaker.hyperpod.training.accelerator_partition_util import (
42 _get_accelerator_partition,
43 _set_default_accelerator_partition_val,
44)
45
# Kubernetes API coordinates of the HyperPodPyTorchJob custom resource.
KIND = "HyperPodPyTorchJob"
TRAINING_GROUP = "sagemaker.amazonaws.com"
API_VERSION = "v1"
PLURAL = "hyperpodpytorchjobs"

# Namespace and label selector used to locate the training-operator pod(s).
TRAINING_OPERATOR_NAMESPACE = "aws-hyperpod"
TRAINING_OPERATOR_LABEL = "hp-training-control-plane"
# Short module-local aliases for the container resource keys imported from
# command_constants; used when reading and writing requests/limits below.
EFA_RESOURCE_KEY = EFA_RESOURCE_LIMIT_KEY
NEURON_RESOURCE_KEY = NEURON_RESOURCE_LIMIT_KEY
NVIDIA_RESOURCE_KEY = NVIDIA_GPU_RESOURCE_LIMIT_KEY
55
[docs]
class HyperPodPytorchJob(_HyperPodPytorchJob):
    """HyperPod PyTorch job for distributed training on Amazon SageMaker HyperPod clusters.

    This class provides methods to create, manage, and monitor PyTorch training jobs
    on SageMaker HyperPod clusters orchestrated by Amazon EKS.

    """

    # Set after the first successful kubeconfig load so later calls skip it.
    is_kubeconfig_loaded: ClassVar[bool] = False

    model_config = ConfigDict(extra="forbid")

    metadata: Metadata = Field(
        description="The metadata of the HyperPodPytorchJob",
    )

    status: Optional[HyperPodPytorchJobStatus] = Field(
        default=None, description="The status of the HyperPodPytorchJob"
    )

    @classmethod
    def get_logger(cls):
        """Return the module-level logger used by this class."""
        return logging.getLogger(__name__)

    @classmethod
    def verify_kube_config(cls):
        """Load the local kubeconfig once per process and check version compatibility.

        The kubeconfig load and the Kubernetes version-compatibility check both
        run only on the first call; later calls are no-ops.
        """
        if not cls.is_kubeconfig_loaded:
            config.load_kube_config()
            cls.is_kubeconfig_loaded = True

            # Verify Kubernetes version compatibility
            verify_kubernetes_version_compatibility(cls.get_logger())

    @classmethod
    def _extract_numeric_value(cls, value):
        """Extract the leading numeric part of a quantity string, e.g. '1.5Gi' -> 1.5.

        Returns None for empty/None input or when no leading number is found.
        The unit suffix is discarded (NOTE(review): '512Mi' yields 512.0, so
        callers presumably expect values already expressed in Gi — confirm).
        """
        if not value:
            return None
        import re
        match = re.match(r'^([0-9]*\.?[0-9]+)', str(value))
        return float(match.group(1)) if match else None

    @staticmethod
    def _first_present(mapping, keys, convert):
        """Return ``convert(mapping[key])`` for the first key with a truthy value, else None."""
        for key in keys:
            value = mapping.get(key)
            if value:
                return convert(value)
        return None

    @classmethod
    def _process_replica_resources(cls, data):
        """Process and validate replica resource configuration.

        Reads the first container's ``resources`` from a replica-spec dict (as
        produced by ``model_dump()``), validates accelerator, EFA and
        accelerator-partition inputs against the node-selector instance type,
        and writes fully resolved ``requests`` and ``limits`` back into ``data``.

        Args:
            data: Replica-spec dict with optional ``replicas`` and a
                ``template.spec`` holding ``nodeSelector`` and ``containers``.

        Returns:
            The mutated ``data`` dict, or ``None`` when no instance type is
            selected (``data`` is then left unchanged).

        Raises:
            ValueError: If no containers are present, accelerators are requested
                without an instance type, a validation helper rejects the inputs,
                or a required key is missing.
        """
        try:
            node_count = data.get('replicas', None)

            # model_dump() keeps unset fields as None, so use ``or {}`` to treat
            # present-but-None keys (e.g. requests set but limits unset) as empty.
            template = data.get('template') or {}
            spec = template.get('spec') or {}
            node_selector = spec.get('nodeSelector') or {}
            instance_type = node_selector.get(INSTANCE_TYPE_LABEL)

            containers = spec.get('containers') or []
            if not containers:
                raise ValueError("No containers found in template spec")

            container = containers[0]
            resources = container.get('resources') or {}
            requests = resources.get('requests') or {}
            limits = resources.get('limits') or {}

            # Check if accelerators are specified without instance_type
            accel_keys = ('accelerators', NVIDIA_RESOURCE_KEY, NEURON_RESOURCE_KEY)

            def _is_set(mapping, key):
                value = mapping.get(key)
                return bool(value) and str(value) != "0"

            has_accelerators = any(
                _is_set(requests, key) or _is_set(limits, key) for key in accel_keys
            )
            if not instance_type and has_accelerators:
                raise ValueError("instance_type is required when specifying accelerator resources")

            if not instance_type:
                return None

            # Extract resource values ('accelerators' wins over vendor keys,
            # 'cpu' wins over 'vcpu').
            accelerators = cls._first_present(requests, accel_keys, int)
            vcpu = cls._first_present(requests, ('cpu', 'vcpu'), float)
            vcpu_limit = cls._first_present(limits, ('cpu', 'vcpu'), float)
            memory = cls._extract_numeric_value(requests.get('memory'))
            memory_limit = cls._extract_numeric_value(limits.get('memory'))
            accelerators_limit = cls._first_present(limits, accel_keys, int)

            acc_req, acc_lim = _set_default_accelerators_val(instance_type, accelerators, accelerators_limit)
            _validate_accelerators_inputs(instance_type, acc_req, acc_lim)

            efa_interfaces = cls._first_present(requests, (EFA_RESOURCE_KEY,), int)
            efa_interfaces_limit = cls._first_present(limits, (EFA_RESOURCE_KEY,), int)
            _validate_efa_inputs(instance_type, efa_interfaces, efa_interfaces_limit)

            accelerator_partition_type, accelerator_partition_count, accelerator_partition_limit = (
                _get_accelerator_partition(requests, limits)
            )

            # Validate configuration
            valid, error = _is_valid(vcpu, memory, acc_req, acc_lim, node_count, instance_type,
                                     accelerator_partition_type, accelerator_partition_count,
                                     accelerator_partition_limit)
            if not valid:
                raise ValueError(error)

            acc_partition_req, acc_partition_lim = _set_default_accelerator_partition_val(
                accelerator_partition_count, accelerator_partition_limit
            )

            requests_values = _get_resources_from_compute_quotas(
                instance_type, vcpu, memory, acc_req, accelerator_partition_type,
                acc_partition_req, efa_interfaces,
            )
            if requests_values is None:
                # No explicit quotas: fall back to the instance's resources and
                # align the accelerator limit with what is requested.
                requests_values = _get_resources_from_instance(instance_type, node_count=1)
                _trim_resource_requests(instance_type, requests_values)
                if NVIDIA_RESOURCE_KEY in requests_values:
                    acc_lim = requests_values[NVIDIA_RESOURCE_KEY]
                elif NEURON_RESOURCE_KEY in requests_values:
                    acc_lim = requests_values[NEURON_RESOURCE_KEY]

            efa_lim = requests_values.get(EFA_RESOURCE_KEY)
            if efa_lim is not None:
                efa_lim = int(efa_lim)

            limits_values = _get_limits(instance_type, vcpu_limit, memory_limit, acc_lim,
                                        accelerator_partition_type, acc_partition_lim, efa_lim)
            _resolve_default_memory_values(instance_type, requests_values, limits_values)
            _resolve_default_cpu_values(instance_type, requests_values)

            # Update data with calculated values. Re-attach ``resources`` in case
            # it was a fresh dict substituted for a None value above.
            resources['requests'] = requests_values
            resources['limits'] = limits_values
            container['resources'] = resources

            return data
        except KeyError as e:
            raise ValueError(f"Missing required configuration key: {str(e)}") from e

    @classmethod
    def _get_container_resources(cls, replica_spec):
        """Return ``(requests, limits)`` of the first container in a replica-spec dict."""
        container_resources = replica_spec['template']['spec']['containers'][0]['resources']
        return container_resources['requests'], container_resources['limits']

    @classmethod
    def allocate_quotas_if_applicable(cls, spec):
        """Resolve and apply container resource requests/limits for the first replica spec.

        Args:
            spec: A ``_HyperPodPytorchJob`` model; it is updated in place.

        Returns:
            The same ``spec`` object, updated when allocation succeeded or
            unchanged when it was not applicable.

        Raises:
            ValueError: If the resource configuration is invalid.
        """
        try:
            spec_dict = spec.model_dump()
            replica_spec = spec_dict['replicaSpecs'][0]
            cls._process_replica_resources(replica_spec)

            # Update the original spec object directly
            requests, limits = cls._get_container_resources(replica_spec)
            container = spec.replicaSpecs[0].template.spec.containers[0]
            container.resources.requests = requests
            container.resources.limits = limits

            return spec
        except ValueError:
            # Validation errors must reach the caller; re-raise unchanged so the
            # original traceback is preserved.
            raise
        except Exception as e:
            # Best effort: any other failure leaves the spec untouched, but keep
            # a trace of why allocation was skipped.
            cls.get_logger().debug("Skipping resource quota allocation: %s", e)
            return spec

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "create_pytorchjob")
    def create(self, debug=False):
        """Create and submit the HyperPod PyTorch job to the Kubernetes cluster.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - debug
             - bool, optional
             - Enable debug logging. Defaults to False.

        **Raises:**

        Exception: If the job creation fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> job = HyperPodPytorchJob(metadata=Metadata(name="my-job"), ...)
              >>> job.create()
              >>>
              >>> # Create with debug logging
              >>> job.create(debug=True)
        """
        self.verify_kube_config()

        logger = setup_logging(self.get_logger(), debug)

        spec = _HyperPodPytorchJob(**self.model_dump(by_alias=True, exclude_none=True))

        if not self.metadata.namespace:
            self.metadata.namespace = get_default_namespace()

        spec = self.allocate_quotas_if_applicable(spec)
        if not spec.replicaSpecs[0].replicas:  # None or 0
            spec.replicaSpecs[0].replicas = 1  # default value

        # Named ``body`` so it does not shadow the kubernetes ``config`` module.
        body = {
            "apiVersion": f"{TRAINING_GROUP}/{API_VERSION}",
            "kind": KIND,
            "metadata": self.metadata.model_dump(exclude_none=True),
            "spec": spec.model_dump(exclude_none=True),
        }

        custom_api = client.CustomObjectsApi()
        logger.debug(
            "Deploying HyperPodPytorchJob with config:\n%s",
            yaml.dump(body),
        )

        try:
            custom_api.create_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=self.metadata.namespace,
                plural=PLURAL,
                body=body,
            )
            logger.info(f"Successfully submitted HyperPodPytorchJob '{self.metadata.name}'!")
        except Exception as e:
            logger.error(f"Failed to create HyperPodPytorchJob {self.metadata.name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace, debug=debug)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pytorchjobs")
    def list(cls, namespace=None) -> List["HyperPodPytorchJob"]:
        """
        List all HyperPod PyTorch jobs in the specified namespace.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - namespace
             - str, optional
             - The Kubernetes namespace to list jobs from. If None, uses the default namespace from current context.

        **Returns:**

        List[HyperPodPytorchJob]: List of HyperPodPytorchJob instances found in the namespace

        **Raises:**

        Exception: If the Kubernetes API call fails or jobs cannot be retrieved

        Notes
        -----
        This method requires a valid kubeconfig to be available and will
        automatically load it if not already loaded.

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> jobs = HyperPodPytorchJob.list()
              >>> print(f"Found {len(jobs)} jobs")
              >>>
              >>> # List jobs in specific namespace
              >>> jobs = HyperPodPytorchJob.list(namespace="my-namespace")
        """
        cls.verify_kube_config()

        if namespace is None:
            namespace = get_default_namespace()

        logger = setup_logging(cls.get_logger())

        custom_api = client.CustomObjectsApi()

        try:
            hp_job_list = custom_api.list_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=namespace,
                plural=PLURAL,
            )
            return _load_hp_job_list(hp_job_list)
        except Exception as e:
            logger.error("Failed to list HyperpodPytorchJobs!")
            handle_exception(e, "", namespace)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "delete_pytorchjob")
    def delete(self):
        """Delete the HyperPod PyTorch job from the Kubernetes cluster.

        Also attempts, best-effort, to delete the ConfigMap named
        ``training-config-<job name>`` in the job's namespace; failures of that
        cleanup are logged at debug level and otherwise ignored.

        **Raises:**

        Exception: If the job deletion fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> job = HyperPodPytorchJob.get("my-job")
              >>> job.delete()
        """
        self.verify_kube_config()

        logger = setup_logging(self.get_logger())

        custom_api = client.CustomObjectsApi()

        try:
            custom_api.delete_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=self.metadata.namespace,
                plural=PLURAL,
                name=self.metadata.name,
            )
            logger.info(f"Successful deleted HyperPodPytorchJob '{self.metadata.name}'!")
        except Exception as e:
            logger.error(f"Failed to delete HyperPodPytorchJob {self.metadata.name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace,
                             operation_type='delete', resource_type='training_job')

        # Clean up associated ConfigMap created during job submission
        configmap_name = f"training-config-{self.metadata.name}"
        try:
            client.CoreV1Api().delete_namespaced_config_map(
                name=configmap_name,
                namespace=self.metadata.namespace,
            )
            logger.info(f"Deleted ConfigMap '{configmap_name}'")
        except Exception as e:
            # ConfigMap may not exist (e.g. non-recipe jobs). Cleanup must not
            # fail the deletion, but record the reason for troubleshooting.
            logger.debug("Could not delete ConfigMap '%s': %s", configmap_name, e)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "exec_pytorchjob")
    def exec_command(self, command: List[str], pod: Optional[str] = None,
                     all_pods: bool = False, container: Optional[str] = None):
        """Execute a command in one or all pods associated with this job.

        Args:
            command: Command and arguments to run, e.g. ``["ls", "-la"]``.
            pod: Name of the pod to target; required unless ``all_pods`` is True.
            all_pods: When True, run the command in every pod of the job and
                concatenate the outputs, each preceded by a ``=== Pod: <name> ===``
                header.
            container: Container to exec into. Defaults to the first container of
                the first replica spec.

        Returns:
            The command output as returned by the exec stream (concatenated
            across pods when ``all_pods`` is True).

        Raises:
            RuntimeError: If the job has no pods.
        """

        self.verify_kube_config()

        logger = setup_logging(self.get_logger())

        namespace = self.metadata.namespace
        job_name = self.metadata.name

        pods = self.list_pods()
        if not pods:
            logger.error(f"No pods found for training job {job_name} in namespace {namespace}")
            raise RuntimeError(f"No pods found for training job {job_name} in namespace {namespace}")

        if container is None:
            container = self.replicaSpecs[0].template.spec.containers[0].name

        try:
            if all_pods:
                chunks = []
                for pod_name in pods:
                    chunks.append(f"=== Pod: {pod_name} ===\n")
                    chunks.append(self._exec_command_on_pod(pod_name, command, container))
                    chunks.append("\n")
                logger.info(f"Successfully executed command on all pods for job {job_name}")
                return "".join(chunks)

            if pod is None:
                raise ValueError("Either 'pod' must be specified or 'all_pods' must be True")

            if pod not in pods:
                logger.error(f"Pod {pod} not found in job {job_name}")
                raise ValueError(f"Pod '{pod}' not found in job '{job_name}'")

            result = self._exec_command_on_pod(pod, command, container)
            logger.info(f"Successfully executed command on pod {pod}")
            return result

        except Exception as e:
            logger.error(f"Failed to execute command on job {job_name}")
            handle_exception(e, job_name, namespace)

    def _exec_command_on_pod(self, pod: str, command: List[str], container: Optional[str] = None):
        """Run ``command`` in ``pod`` via the Kubernetes exec API and return its output.

        Raises:
            RuntimeError: If the API reports the pod has no host assigned
                (HTTP 400), i.e. it is not running.
            ApiException: For any other Kubernetes API error.
        """
        from kubernetes.client.exceptions import ApiException
        try:
            return stream.stream(
                client.CoreV1Api().connect_get_namespaced_pod_exec,
                stderr=True,
                stdout=True,
                name=pod,
                namespace=self.metadata.namespace,
                command=command,
                container=container,
            )
        except ApiException as e:
            if e.status == 400 and "does not have a host assigned" in str(e.body):
                raise RuntimeError(
                    f"Cannot exec into pod '{pod}': pod is not running (no host assigned). "
                    f"The job may have already completed or the pod is still pending."
                ) from e
            raise

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_pytorchjob")
    def get(cls, name, namespace=None) -> "HyperPodPytorchJob":
        """Get a specific HyperPod PyTorch job by name.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - name
             - str
             - The name of the HyperPod PyTorch job to retrieve
           * - namespace
             - str, optional
             - The Kubernetes namespace to search in. If None, uses the default namespace from current context.

        **Returns:**

        HyperPodPytorchJob: The requested HyperPod PyTorch job instance

        **Raises:**

        Exception: If the job is not found or Kubernetes API call fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> job = HyperPodPytorchJob.get("my-job")
              >>> print(job.metadata.name)
              >>>
              >>> # Get job from specific namespace
              >>> job = HyperPodPytorchJob.get("my-job", namespace="my-namespace")
        """
        cls.verify_kube_config()

        if namespace is None:
            namespace = get_default_namespace()

        setup_logging(cls.get_logger())

        custom_api = client.CustomObjectsApi()

        try:
            response = custom_api.get_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=namespace,
                plural=PLURAL,
                name=name,
                _request_timeout=10,
            )
            return _load_hp_job(response)
        except AttributeError as e:
            # An AttributeError mentioning 'getheaders' is surfaced to the user as
            # "not found". NOTE(review): presumably a client/urllib3 incompatibility
            # while building the 404 error — confirm.
            if "getheaders" in str(e):
                raise Exception(
                    f"Resource '{name}' not found in namespace '{namespace}'. "
                    f"Please check the resource name and namespace."
                ) from e
            raise
        except Exception as e:
            handle_exception(e, name, namespace,
                             operation_type='get', resource_type='training_job')

    def refresh(self) -> "HyperPodPytorchJob":
        """Refresh the job status by fetching the latest state from the Kubernetes cluster.

        If the fetched object has no ``status`` yet, ``self.status`` is set to None.

        **Returns:**

        HyperPodPytorchJob: The updated job instance with refreshed status

        **Raises:**

        Exception: If the refresh operation fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> job = HyperPodPytorchJob.get("my-job")
              >>> updated_job = job.refresh()
              >>> print(updated_job.status)
        """
        self.verify_kube_config()

        logger = setup_logging(self.get_logger())

        custom_api = client.CustomObjectsApi()

        try:
            response = custom_api.get_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=self.metadata.namespace,
                plural=PLURAL,
                name=self.metadata.name,
            )
            raw_status = response.get("status")
            self.status = (
                HyperPodPytorchJobStatus.model_validate(raw_status, by_name=True)
                if raw_status is not None
                else None
            )
            return self
        except Exception as e:
            logger.error(f"Failed to refresh HyperPodPytorchJob {self.metadata.name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pods_pytorchjob")
    def list_pods(self) -> List[str]:
        """List all pods associated with this HyperPod PyTorch job.

        Pods are selected with the label selector ``HPJob=<job name>``.

        **Returns:**

        List[str]: List of pod names associated with this job

        **Raises:**

        Exception: If listing pods fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> job = HyperPodPytorchJob.get("my-job")
              >>> pods = job.list_pods()
              >>> print(f"Found {len(pods)} pods: {pods}")
        """
        # Loads the kubeconfig once; no need to reload it per call.
        self.verify_kube_config()

        logger = setup_logging(self.get_logger())

        try:
            v1 = client.CoreV1Api()

            response = v1.list_namespaced_pod(
                self.metadata.namespace,
                label_selector=f"HPJob={self.metadata.name}",
            )
            return [pod.metadata.name for pod in response.items]
        except Exception as e:
            logger.error(f"Failed to list pod in namespace {self.metadata.namespace}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_pytorchjob_logs_from_pod")
    def get_logs_from_pod(self, pod_name: str, container: Optional[str] = None) -> str:
        """Get logs from a specific pod associated with this HyperPod PyTorch job.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - pod_name
             - str
             - The name of the pod to get logs from
           * - container
             - str, optional
             - The container name within the pod. If None, uses the first container.

        **Returns:**

        str: The log output (with timestamps) from the specified pod and container

        **Raises:**

        Exception: If getting logs fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> job = HyperPodPytorchJob.get("my-job")
              >>> pods = job.list_pods()
              >>> logs = job.get_logs_from_pod(pods[0])
              >>> print(logs)
              >>>
              >>> # Get logs from specific container
              >>> logs = job.get_logs_from_pod(pods[0], container="pytorch")
        """
        # Loads the kubeconfig once; no need to reload it per call.
        self.verify_kube_config()

        logger = setup_logging(self.get_logger())

        if container is None:
            # If container name is not set, get logs from the first container in the pod
            container = self.replicaSpecs[0].template.spec.containers[0].name

        try:
            v1 = client.CoreV1Api()

            # _preload_content=False returns the raw response; decode it ourselves.
            response = v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=self.metadata.namespace,
                timestamps=True,
                container=container,
                _preload_content=False,
            )
            return response.data.decode("utf-8")
        except Exception as e:
            logger.error(f"Failed to get logs from pod {pod_name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_operator_logs_pytorchjob")
    def get_operator_logs(cls, since_hours: float):
        """Return recent logs from the HyperPod training operator pod.

        The first pod in ``TRAINING_OPERATOR_NAMESPACE`` matching the
        ``TRAINING_OPERATOR_LABEL`` selector is used.

        Args:
            since_hours: Look-back window in hours; converted to whole seconds
                (truncated) for the Kubernetes API.

        Returns:
            The operator pod's log text with timestamps.

        Raises:
            Exception: If no operator pod is found, or if the pod lookup or the
                log request fails.
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()

        # Get pods with the training operator label directly
        pods = v1.list_namespaced_pod(
            namespace=TRAINING_OPERATOR_NAMESPACE,
            label_selector=TRAINING_OPERATOR_LABEL,
        )

        if not pods.items:
            raise Exception(
                f"No training operator pod found with label {TRAINING_OPERATOR_LABEL}"
            )

        # Use the first pod found
        pod_name = pods.items[0].metadata.name

        try:
            # Return from inside the try so ``logs`` is never read unbound when
            # handle_exception does not raise.
            return v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=TRAINING_OPERATOR_NAMESPACE,
                timestamps=True,
                since_seconds=int(3600 * since_hours),
            )
        except Exception as e:
            handle_exception(e, pod_name, TRAINING_OPERATOR_NAMESPACE)
740
741
def list_accelerator_partition_types(instance_type: str) -> List[str]:
    """List accelerator partition types currently allocatable for an instance type.

    A partition type is reported when at least one node labelled
    ``node.kubernetes.io/instance-type=<instance_type>`` advertises a positive
    allocatable count for the ``nvidia.com/<partition_type>`` resource.

    Args:
        instance_type: Instance type name, e.g. a GPU instance listed in
            ``INSTANCE_TYPE_MIG_PROFILES``.

    Returns:
        Sorted list of available partition type names (possibly empty).

    Raises:
        ValueError: If ``instance_type`` is unknown or does not support
            accelerator partitions.
        RuntimeError: If querying the cluster fails.
    """
    # Validate the argument before touching the environment so a bad instance
    # type fails fast even without a usable kubeconfig.
    if instance_type not in INSTANCE_RESOURCES:
        raise ValueError(f"Invalid instance type '{instance_type}'")

    if instance_type not in INSTANCE_TYPE_MIG_PROFILES:
        raise ValueError(f"Instance type '{instance_type}' does not support accelerator partitions")

    config.load_kube_config()

    try:
        possible_partition_types = set(INSTANCE_TYPE_MIG_PROFILES[instance_type])
        available_partition_types = set()

        v1 = client.CoreV1Api()
        label_selector = f"node.kubernetes.io/instance-type={instance_type}"
        nodes = v1.list_node(label_selector=label_selector).items

        for node in nodes:
            allocatable = node.status.allocatable if node.status else None
            if not allocatable:
                continue

            # Only probe partition types not already found on an earlier node.
            for partition_type in possible_partition_types - available_partition_types:
                allocatable_partitions = allocatable.get(f"nvidia.com/{partition_type}")
                if allocatable_partitions and int(allocatable_partitions) > 0:
                    available_partition_types.add(partition_type)

            if available_partition_types == possible_partition_types:
                break  # nothing left to discover

        return sorted(available_partition_types)

    except Exception as e:
        raise RuntimeError(f"Failed to query cluster for accelerator partitions: {e}") from e
777
778
def _load_hp_job(response: dict) -> HyperPodPytorchJob:
    """Build a ``HyperPodPytorchJob`` from a raw custom-object API response dict.

    ``spec`` and ``metadata`` are required keys; ``status`` is optional and
    becomes ``None`` when absent.
    """
    job_spec = _HyperPodPytorchJob.model_validate(response["spec"], by_name=True)
    job_metadata = Metadata(**response["metadata"])
    job_status = (
        HyperPodPytorchJobStatus.model_validate(response["status"], by_name=True)
        if "status" in response
        else None
    )

    return HyperPodPytorchJob(
        metadata=job_metadata,
        status=job_status,
        **job_spec.model_dump(by_alias=True, exclude_none=True),
    )
798
799
def _load_hp_job_list(response: dict) -> List[HyperPodPytorchJob]:
    """Convert every entry of a custom-object list response's ``items`` into a job, in order."""
    return [_load_hp_job(item) for item in response["items"]]