Source code for sagemaker.hyperpod.training.hyperpod_pytorch_job

  1from pydantic import ConfigDict, Field
  2
  3from sagemaker.hyperpod.cli.constants.command_constants import (
  4    INSTANCE_TYPE_LABEL,
  5    NEURON_RESOURCE_LIMIT_KEY,
  6    NVIDIA_GPU_RESOURCE_LIMIT_KEY,
  7    EFA_RESOURCE_LIMIT_KEY,
  8)
  9from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
 10    _HyperPodPytorchJob, HyperPodPytorchJobStatus
 11)
 12from sagemaker.hyperpod.common.config.metadata import Metadata
 13from kubernetes import client, config, stream
 14from typing import List, Optional, ClassVar
 15from sagemaker.hyperpod.common.utils import (
 16    handle_exception,
 17    get_default_namespace,
 18    setup_logging,
 19    verify_kubernetes_version_compatibility
 20)
 21from sagemaker.hyperpod.common.telemetry.telemetry_logging import (
 22    _hyperpod_telemetry_emitter,
 23)
 24from sagemaker.hyperpod.common.telemetry.constants import Feature
 25import yaml
 26import logging
 27
 28from sagemaker.hyperpod.training.quota_allocation_util import (
 29    _is_valid,
 30    _get_resources_from_compute_quotas,
 31    _get_resources_from_instance,
 32    _get_limits,
 33    _resolve_default_memory_values,
 34    _set_default_accelerators_val,
 35    _validate_accelerators_inputs,
 36    _validate_efa_inputs,
 37    _resolve_default_cpu_values,
 38    _trim_resource_requests,
 39)
 40from sagemaker.hyperpod.training.constants import INSTANCE_RESOURCES, INSTANCE_TYPE_MIG_PROFILES
 41from sagemaker.hyperpod.training.accelerator_partition_util import (
 42    _get_accelerator_partition,
 43    _set_default_accelerator_partition_val,
 44)
 45
 46TRAINING_GROUP = "sagemaker.amazonaws.com"
 47API_VERSION = "v1"
 48PLURAL = "hyperpodpytorchjobs"
 49KIND = "HyperPodPyTorchJob"
 50TRAINING_OPERATOR_NAMESPACE = "aws-hyperpod"
 51TRAINING_OPERATOR_LABEL = "hp-training-control-plane"
 52NVIDIA_RESOURCE_KEY = NVIDIA_GPU_RESOURCE_LIMIT_KEY
 53NEURON_RESOURCE_KEY = NEURON_RESOURCE_LIMIT_KEY
 54EFA_RESOURCE_KEY = EFA_RESOURCE_LIMIT_KEY
 55
class HyperPodPytorchJob(_HyperPodPytorchJob):
    """HyperPod PyTorch job for distributed training on Amazon SageMaker HyperPod clusters.

    This class provides methods to create, manage, and monitor PyTorch training jobs
    on SageMaker HyperPod clusters orchestrated by Amazon EKS.

    """

    # Set by ``verify_kube_config`` after the first successful kubeconfig load so
    # later calls skip reloading it.
    is_kubeconfig_loaded: ClassVar[bool] = False

    # Reject unknown fields rather than silently ignoring them.
    model_config = ConfigDict(extra="forbid")

    metadata: Metadata = Field(
        description="The metadata of the HyperPodPytorchJob",
    )

    status: Optional[HyperPodPytorchJobStatus] = Field(
        default=None, description="The status of the HyperPodPytorchJob"
    )

    @classmethod
    def get_logger(cls):
        """Return the module-level logger used by this class."""
        return logging.getLogger(__name__)

    @classmethod
    def verify_kube_config(cls):
        """Load the kubeconfig on first use and check Kubernetes version compatibility.

        Both the load and the compatibility check run only on the first call;
        subsequent calls are no-ops while ``is_kubeconfig_loaded`` is True.
        """
        if not cls.is_kubeconfig_loaded:
            config.load_kube_config()
            cls.is_kubeconfig_loaded = True

            # Verify Kubernetes version compatibility
            verify_kubernetes_version_compatibility(cls.get_logger())

    @classmethod
    def _extract_numeric_value(cls, value):
        """Extract the leading numeric part of a quantity string, e.g. ``'1.5Gi'`` -> ``1.5``.

        The unit suffix is discarded, not converted. Returns ``None`` for falsy
        input or when the value does not start with a number.
        """
        if not value:
            return None
        import re
        match = re.match(r'^([0-9]*\.?[0-9]+)', str(value))
        return float(match.group(1)) if match else None

    @classmethod
    def _first_present(cls, mapping, keys, cast):
        """Return ``cast(mapping[key])`` for the first key in ``keys`` with a truthy value.

        Returns ``None`` when none of the keys has a truthy value.
        """
        for key in keys:
            value = mapping.get(key)
            if value:
                return cast(value)
        return None

    @classmethod
    def _process_replica_resources(cls, data):
        """Process and validate replica resource configuration.

        Reads the first container's ``requests``/``limits`` from ``data`` (a replica
        spec dict), validates them against the instance type taken from the node
        selector, and writes the computed requests/limits back into ``data``.

        Args:
            data: Replica spec as a plain dict (``replicas``, ``template`` ...).

        Returns:
            The updated ``data`` dict, or ``None`` when no instance type is set.

        Raises:
            ValueError: If there are no containers, accelerators are requested
                without an instance type, the resource combination is invalid,
                or a required key is missing.
        """
        try:
            node_count = data.get('replicas', None)

            # Extract nested configuration with validation
            template = data.get('template', {})
            spec = template.get('spec', {})
            node_selector = spec.get('nodeSelector', {})
            instance_type = node_selector.get(INSTANCE_TYPE_LABEL) if node_selector else None

            containers = spec.get('containers', [])

            if not containers:
                raise ValueError("No containers found in template spec")

            container = containers[0]
            resources = container.get('resources', {})
            requests = resources.get('requests', {})
            limits = resources.get('limits', {})

            # Check if accelerators are specified without instance_type.
            # Key order also defines lookup priority below.
            accel_keys = ['accelerators', NVIDIA_RESOURCE_KEY, NEURON_RESOURCE_KEY]
            has_accelerators = any(requests.get(k) and str(requests.get(k)) != "0" for k in accel_keys)
            has_accelerators_limit = any(limits.get(k) and str(limits.get(k)) != "0" for k in accel_keys)
            if not instance_type and (has_accelerators or has_accelerators_limit):
                raise ValueError("instance_type is required when specifying accelerator resources")

            if not instance_type:
                return None

            accelerators = cls._first_present(requests, accel_keys, int)

            # Extract resource values
            vcpu = cls._first_present(requests, ('cpu', 'vcpu'), float)
            vcpu_limit = cls._first_present(limits, ('cpu', 'vcpu'), float)

            memory = cls._extract_numeric_value(requests.get('memory'))
            memory_limit = cls._extract_numeric_value(limits.get('memory'))

            accelerators_limit = cls._first_present(limits, accel_keys, int)

            acc_req, acc_lim = _set_default_accelerators_val(instance_type, accelerators, accelerators_limit)
            _validate_accelerators_inputs(instance_type, acc_req, acc_lim)

            efa_interfaces = cls._first_present(requests, (EFA_RESOURCE_KEY,), int)
            efa_interfaces_limit = cls._first_present(limits, (EFA_RESOURCE_KEY,), int)

            _validate_efa_inputs(instance_type, efa_interfaces, efa_interfaces_limit)

            accelerator_partition_type, accelerator_partition_count, accelerator_partition_limit = (
                _get_accelerator_partition(requests, limits)
            )

            # Validate configuration
            valid, error = _is_valid(
                vcpu, memory, acc_req, acc_lim, node_count, instance_type,
                accelerator_partition_type, accelerator_partition_count, accelerator_partition_limit,
            )
            if not valid:
                raise ValueError(error)

            acc_partition_req, acc_partition_lim = _set_default_accelerator_partition_val(
                accelerator_partition_count, accelerator_partition_limit
            )

            requests_values = _get_resources_from_compute_quotas(
                instance_type, vcpu, memory, acc_req,
                accelerator_partition_type, acc_partition_req, efa_interfaces,
            )
            if requests_values is None:
                # The compute-quota path produced nothing: fall back to the instance's
                # resources and take the accelerator limit from them.
                requests_values = _get_resources_from_instance(instance_type, node_count=1)
                _trim_resource_requests(instance_type, requests_values)
                if NVIDIA_RESOURCE_KEY in requests_values:
                    acc_lim = requests_values[NVIDIA_RESOURCE_KEY]
                elif NEURON_RESOURCE_KEY in requests_values:
                    acc_lim = requests_values[NEURON_RESOURCE_KEY]

            efa_lim = requests_values.get(EFA_RESOURCE_KEY)
            if efa_lim is not None:
                efa_lim = int(efa_lim)

            limits_values = _get_limits(
                instance_type, vcpu_limit, memory_limit, acc_lim,
                accelerator_partition_type, acc_partition_lim, efa_lim,
            )
            _resolve_default_memory_values(instance_type, requests_values, limits_values)
            _resolve_default_cpu_values(instance_type, requests_values)

            # Update data with calculated values
            data['template']['spec']['containers'][0]['resources']['requests'] = requests_values
            data['template']['spec']['containers'][0]['resources']['limits'] = limits_values

            return data
        except KeyError as e:
            raise ValueError(f"Missing required configuration key: {str(e)}") from e

    @classmethod
    def _get_container_resources(cls, replica_spec):
        """Extract container resources from replica spec.

        Returns:
            ``(requests, limits)`` of the first container of ``replica_spec``.
        """
        container_resources = replica_spec['template']['spec']['containers'][0]['resources']
        return container_resources['requests'], container_resources['limits']

    @classmethod
    def allocate_quotas_if_applicable(cls, spec):
        """Resolve resource requests/limits for the first replica spec of ``spec``.

        The spec is dumped to a dict, processed by ``_process_replica_resources``,
        and the resulting requests/limits are assigned back onto ``spec``.

        Args:
            spec: A ``_HyperPodPytorchJob`` spec model.

        Returns:
            ``spec``, updated in place when allocation succeeds, otherwise unchanged.

        Raises:
            ValueError: If the resource configuration is invalid.
        """
        try:
            spec_dict = spec.model_dump()
            replica_spec = spec_dict['replicaSpecs'][0]
            cls._process_replica_resources(replica_spec)

            # Update the original spec object directly
            requests, limits = cls._get_container_resources(replica_spec)
            spec.replicaSpecs[0].template.spec.containers[0].resources.requests = requests
            spec.replicaSpecs[0].template.spec.containers[0].resources.limits = limits

            return spec
        except ValueError:
            # Invalid configuration must reach the caller; re-raise unchanged so the
            # original traceback is preserved.
            raise
        except Exception as e:
            # Allocation is best-effort: on any other failure keep the original spec,
            # but leave a trace for debugging instead of swallowing it silently.
            cls.get_logger().debug("Skipping quota allocation, keeping original spec: %s", e)
            return spec

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "create_pytorchjob")
    def create(self, debug=False):
        """Create and submit the HyperPod PyTorch job to the Kubernetes cluster.

        **Parameters:**

        .. list-table::
            :header-rows: 1
            :widths: 20 20 60

            * - Parameter
              - Type
              - Description
            * - debug
              - bool, optional
              - Enable debug logging. Defaults to False.

        **Raises:**

        Exception: If the job creation fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
            :open:

            .. code-block:: python

                >>> job = HyperPodPytorchJob(metadata=Metadata(name="my-job"), ...)
                >>> job.create()
                >>>
                >>> # Create with debug logging
                >>> job.create(debug=True)
        """
        self.verify_kube_config()

        logger = self.get_logger()
        logger = setup_logging(logger, debug)

        spec = _HyperPodPytorchJob(**self.model_dump(by_alias=True, exclude_none=True))

        if not self.metadata.namespace:
            self.metadata.namespace = get_default_namespace()

        spec = self.allocate_quotas_if_applicable(spec)
        if spec.replicaSpecs[0].replicas is None or spec.replicaSpecs[0].replicas == 0:
            spec.replicaSpecs[0].replicas = 1  # default value

        # Named ``job_config`` rather than ``config`` so it does not shadow the
        # ``kubernetes.config`` module used elsewhere in this class.
        job_config = {
            "apiVersion": f"{TRAINING_GROUP}/{API_VERSION}",
            "kind": KIND,
            "metadata": self.metadata.model_dump(exclude_none=True),
            "spec": spec.model_dump(exclude_none=True),
        }

        custom_api = client.CustomObjectsApi()
        logger.debug(
            "Deploying HyperPodPytorchJob with config:\n%s",
            yaml.dump(job_config),
        )

        try:
            custom_api.create_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=self.metadata.namespace,
                plural=PLURAL,
                body=job_config,
            )
            logger.info(f"Successfully submitted HyperPodPytorchJob '{self.metadata.name}'!")
        except Exception as e:
            logger.error(f"Failed to create HyperPodPytorchJob {self.metadata.name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace, debug=debug)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pytorchjobs")
    def list(cls, namespace=None) -> List["HyperPodPytorchJob"]:
        """
        List all HyperPod PyTorch jobs in the specified namespace.

        **Parameters:**

        .. list-table::
            :header-rows: 1
            :widths: 20 20 60

            * - Parameter
              - Type
              - Description
            * - namespace
              - str, optional
              - The Kubernetes namespace to list jobs from. If None, uses the default namespace from current context.

        **Returns:**

        List[HyperPodPytorchJob]: List of HyperPodPytorchJob instances found in the namespace

        **Raises:**

        Exception: If the Kubernetes API call fails or jobs cannot be retrieved

        Notes
        -----
        This method requires a valid kubeconfig to be available and will
        automatically load it if not already loaded.

        .. dropdown:: Usage Examples
            :open:

            .. code-block:: python

                >>> jobs = HyperPodPytorchJob.list()
                >>> print(f"Found {len(jobs)} jobs")
                >>>
                >>> # List jobs in specific namespace
                >>> jobs = HyperPodPytorchJob.list(namespace="my-namespace")
        """
        cls.verify_kube_config()

        if namespace is None:
            namespace = get_default_namespace()

        logger = cls.get_logger()
        logger = setup_logging(logger)

        custom_api = client.CustomObjectsApi()

        try:
            hp_job_list = custom_api.list_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=namespace,
                plural=PLURAL,
            )
            return _load_hp_job_list(hp_job_list)
        except Exception as e:
            logger.error("Failed to list HyperpodPytorchJobs!")
            handle_exception(e, "", namespace)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "delete_pytorchjob")
    def delete(self):
        """Delete the HyperPod PyTorch job from the Kubernetes cluster.

        After the job is deleted, the ConfigMap ``training-config-<job name>`` in the
        same namespace is also removed on a best-effort basis.

        **Raises:**

        Exception: If the job deletion fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
            :open:

            .. code-block:: python

                >>> job = HyperPodPytorchJob.get("my-job")
                >>> job.delete()
        """
        self.verify_kube_config()

        logger = self.get_logger()
        logger = setup_logging(logger)

        custom_api = client.CustomObjectsApi()

        try:
            custom_api.delete_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=self.metadata.namespace,
                plural=PLURAL,
                name=self.metadata.name,
            )
            logger.info(f"Successfully deleted HyperPodPytorchJob '{self.metadata.name}'!")
        except Exception as e:
            logger.error(f"Failed to delete HyperPodPytorchJob {self.metadata.name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace,
                             operation_type='delete', resource_type='training_job')

        # Clean up associated ConfigMap created during job submission
        configmap_name = f"training-config-{self.metadata.name}"
        try:
            client.CoreV1Api().delete_namespaced_config_map(
                name=configmap_name,
                namespace=self.metadata.namespace,
            )
            logger.info(f"Deleted ConfigMap '{configmap_name}'")
        except Exception as e:
            # The ConfigMap may not exist (e.g. non-recipe jobs). Cleanup is
            # best-effort, so record why it was skipped instead of failing.
            logger.debug("Skipped deleting ConfigMap '%s': %s", configmap_name, e)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "exec_pytorchjob")
    def exec_command(self, command: List[str], pod: Optional[str] = None,
                     all_pods: bool = False, container: Optional[str] = None):
        """Execute a command in one or all pods associated with this job.

        Args:
            command: Command and arguments to execute, e.g. ``["ls", "-l"]``.
            pod: Name of the target pod; must be one of ``list_pods()``. Ignored
                when ``all_pods`` is True.
            all_pods: Run the command in every pod of the job and concatenate the
                outputs, each preceded by a ``=== Pod: <name> ===`` header line.
            container: Container to exec into. Defaults to the first container of
                the first replica spec.

        Returns:
            The command output, combined across pods when ``all_pods`` is True.

        Raises:
            RuntimeError: If no pods exist for this job. Other failures (including
                an unknown ``pod``) are passed to ``handle_exception``.
        """

        self.verify_kube_config()

        logger = self.get_logger()
        logger = setup_logging(logger)

        namespace = self.metadata.namespace
        job_name = self.metadata.name

        pods = self.list_pods()
        if not pods:
            logger.error(f"No pods found for training job {job_name} in namespace {namespace}")
            raise RuntimeError(f"No pods found for training job {job_name} in namespace {namespace}")

        if container is None:
            container = self.replicaSpecs[0].template.spec.containers[0].name

        try:
            if all_pods:
                output = ""
                for pod_name in pods:
                    output += f"=== Pod: {pod_name} ===\n"
                    output += self._exec_command_on_pod(pod_name, command, container)
                    output += "\n"
                logger.info(f"Successfully executed command on all pods for job {job_name}")
                return output
            else:
                if pod not in pods:
                    logger.error(f"Pod {pod} not found in job {job_name}")
                    raise ValueError(f"Pod '{pod}' not found in job '{job_name}'")

                result = self._exec_command_on_pod(pod, command, container)
                logger.info(f"Successfully executed command on pod {pod}")
                return result

        except Exception as e:
            logger.error(f"Failed to execute command on job {job_name}")
            handle_exception(e, job_name, namespace)

    def _exec_command_on_pod(self, pod: str, command: List[str], container: Optional[str] = None):
        """Run ``command`` in ``pod`` (this job's namespace) and return its stdout/stderr output.

        Raises:
            RuntimeError: If the API reports the pod has no host assigned, i.e.
                the pod is not running.
        """
        from kubernetes.client.exceptions import ApiException
        try:
            return stream.stream(
                client.CoreV1Api().connect_get_namespaced_pod_exec,
                stderr=True,
                stdout=True,
                name=pod,
                namespace=self.metadata.namespace,
                command=command,
                container=container
            )
        except ApiException as e:
            if e.status == 400 and "does not have a host assigned" in str(e.body):
                raise RuntimeError(
                    f"Cannot exec into pod '{pod}': pod is not running (no host assigned). "
                    f"The job may have already completed or the pod is still pending."
                ) from e
            raise

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_pytorchjob")
    def get(cls, name, namespace=None) -> "HyperPodPytorchJob":
        """Get a specific HyperPod PyTorch job by name.

        **Parameters:**

        .. list-table::
            :header-rows: 1
            :widths: 20 20 60

            * - Parameter
              - Type
              - Description
            * - name
              - str
              - The name of the HyperPod PyTorch job to retrieve
            * - namespace
              - str, optional
              - The Kubernetes namespace to search in. If None, uses the default namespace from current context.

        **Returns:**

        HyperPodPytorchJob: The requested HyperPod PyTorch job instance

        **Raises:**

        Exception: If the job is not found or Kubernetes API call fails

        .. dropdown:: Usage Examples
            :open:

            .. code-block:: python

                >>> job = HyperPodPytorchJob.get("my-job")
                >>> print(job.metadata.name)
                >>>
                >>> # Get job from specific namespace
                >>> job = HyperPodPytorchJob.get("my-job", namespace="my-namespace")
        """
        cls.verify_kube_config()

        if namespace is None:
            namespace = get_default_namespace()

        logger = cls.get_logger()
        logger = setup_logging(logger)

        custom_api = client.CustomObjectsApi()

        try:
            response = custom_api.get_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=namespace,
                plural=PLURAL,
                name=name,
                _request_timeout=10,
            )
            return _load_hp_job(response)
        except AttributeError as e:
            # An AttributeError mentioning "getheaders" is treated as "resource not found".
            if "getheaders" in str(e):
                raise Exception(
                    f"Resource '{name}' not found in namespace '{namespace}'. "
                    f"Please check the resource name and namespace."
                ) from e
            raise
        except Exception as e:
            handle_exception(e, name, namespace,
                             operation_type='get', resource_type='training_job')

    def refresh(self) -> "HyperPodPytorchJob":
        """Refresh the job status by fetching the latest state from the Kubernetes cluster.

        If the cluster object has no ``status`` yet, ``self.status`` is set to None.

        **Returns:**

        HyperPodPytorchJob: The updated job instance with refreshed status

        **Raises:**

        Exception: If the refresh operation fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
            :open:

            .. code-block:: python

                >>> job = HyperPodPytorchJob.get("my-job")
                >>> updated_job = job.refresh()
                >>> print(updated_job.status)
        """
        self.verify_kube_config()

        logger = self.get_logger()
        logger = setup_logging(logger)

        custom_api = client.CustomObjectsApi()

        try:
            response = custom_api.get_namespaced_custom_object(
                group=TRAINING_GROUP,
                version=API_VERSION,
                namespace=self.metadata.namespace,
                plural=PLURAL,
                name=self.metadata.name,
            )
            # A freshly submitted job may not report a status yet; store None in that
            # case instead of failing with a KeyError.
            raw_status = response.get("status")
            self.status = (
                HyperPodPytorchJobStatus.model_validate(raw_status, by_name=True)
                if raw_status is not None
                else None
            )
            # Return self so the documented ``updated_job = job.refresh()`` works.
            return self
        except Exception as e:
            logger.error(f"Failed to refresh HyperPodPytorchJob {self.metadata.name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pods_pytorchjob")
    def list_pods(self) -> List[str]:
        """List all pods associated with this HyperPod PyTorch job.

        Pods are selected with the label selector ``HPJob=<job name>``.

        **Returns:**

        List[str]: List of pod names associated with this job

        **Raises:**

        Exception: If listing pods fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
            :open:

            .. code-block:: python

                >>> job = HyperPodPytorchJob.get("my-job")
                >>> pods = job.list_pods()
                >>> print(f"Found {len(pods)} pods: {pods}")
        """
        self.verify_kube_config()

        logger = self.get_logger()
        logger = setup_logging(logger)

        try:
            # NOTE(review): verify_kube_config() already loaded the kubeconfig once;
            # this re-reads it on every call. Kept to preserve existing behavior.
            config.load_kube_config()
            v1 = client.CoreV1Api()

            response = v1.list_namespaced_pod(
                self.metadata.namespace,
                label_selector=f"HPJob={self.metadata.name}",
            )
            pods = [pod.metadata.name for pod in response.items]
            return pods
        except Exception as e:
            logger.error(f"Failed to list pod in namespace {self.metadata.namespace}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace)

    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_pytorchjob_logs_from_pod")
    def get_logs_from_pod(self, pod_name: str, container: Optional[str] = None) -> str:
        """Get logs from a specific pod associated with this HyperPod PyTorch job.

        **Parameters:**

        .. list-table::
            :header-rows: 1
            :widths: 20 20 60

            * - Parameter
              - Type
              - Description
            * - pod_name
              - str
              - The name of the pod to get logs from
            * - container
              - str, optional
              - The container name within the pod. If None, uses the first container.

        **Returns:**

        str: The log output from the specified pod and container

        **Raises:**

        Exception: If getting logs fails or Kubernetes API call fails

        .. dropdown:: Usage Examples
            :open:

            .. code-block:: python

                >>> job = HyperPodPytorchJob.get("my-job")
                >>> pods = job.list_pods()
                >>> logs = job.get_logs_from_pod(pods[0])
                >>> print(logs)
                >>>
                >>> # Get logs from specific container
                >>> logs = job.get_logs_from_pod(pods[0], container="pytorch")
        """
        self.verify_kube_config()

        logger = self.get_logger()
        logger = setup_logging(logger)

        if container is None:
            # If container name is not set, get logs from the first container in the pod
            container = self.replicaSpecs[0].template.spec.containers[0].name

        try:
            # NOTE(review): redundant with verify_kube_config(); kept to preserve behavior.
            config.load_kube_config()
            v1 = client.CoreV1Api()

            response = v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=self.metadata.namespace,
                timestamps=True,
                container=container,
                _preload_content=False,
            )
            logs = response.data.decode("utf-8")
            return logs
        except Exception as e:
            logger.error(f"Failed to get logs from pod {pod_name}!")
            handle_exception(e, self.metadata.name, self.metadata.namespace)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_operator_logs_pytorchjob")
    def get_operator_logs(cls, since_hours: float):
        """Return recent logs of the HyperPod training operator pod.

        Pods in ``TRAINING_OPERATOR_NAMESPACE`` matching the ``TRAINING_OPERATOR_LABEL``
        selector are listed, and the logs of the first one are read with timestamps.

        Args:
            since_hours: How far back to fetch logs, in hours. Fractions are allowed
                and converted to whole seconds.

        Returns:
            The log output of the operator pod.

        Raises:
            Exception: If no operator pod is found. Errors while reading logs are
                passed to ``handle_exception``.
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()

        # Get pods with the training operator label directly
        pods = v1.list_namespaced_pod(
            namespace=TRAINING_OPERATOR_NAMESPACE,
            label_selector=TRAINING_OPERATOR_LABEL,
        )

        if not pods.items:
            raise Exception(
                f"No training operator pod found with label {TRAINING_OPERATOR_LABEL}"
            )

        # Use the first pod found
        pod_name = pods.items[0].metadata.name

        try:
            # Return from inside the try. If handle_exception ever returned instead of
            # raising, this avoids referencing an unassigned local afterwards.
            return v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=TRAINING_OPERATOR_NAMESPACE,
                timestamps=True,
                since_seconds=int(3600 * since_hours),
            )
        except Exception as e:
            handle_exception(e, pod_name, TRAINING_OPERATOR_NAMESPACE)
def list_accelerator_partition_types(instance_type: str) -> List[str]:
    """List accelerator partition types currently allocatable for an instance type.

    The instance type is validated locally first. Then cluster nodes labelled
    ``node.kubernetes.io/instance-type=<instance_type>`` are inspected, and every
    profile from ``INSTANCE_TYPE_MIG_PROFILES[instance_type]`` whose
    ``nvidia.com/<profile>`` allocatable count is positive on at least one node
    is reported.

    Args:
        instance_type: Instance type name; must be a key of ``INSTANCE_RESOURCES``.

    Returns:
        Sorted list of available partition types (possibly empty).

    Raises:
        ValueError: If the instance type is unknown or does not support partitions.
        RuntimeError: If querying the cluster fails.
    """
    if instance_type not in INSTANCE_RESOURCES:
        raise ValueError(f"Invalid instance type '{instance_type}'")

    if instance_type not in INSTANCE_TYPE_MIG_PROFILES:
        raise ValueError(f"Instance type '{instance_type}' does not support accelerator partitions")

    # Load the kubeconfig only after the local validation, so invalid input is
    # reported without requiring cluster access.
    config.load_kube_config()

    try:
        possible_partition_types = set(INSTANCE_TYPE_MIG_PROFILES[instance_type])
        available_partition_types = set()

        v1 = client.CoreV1Api()
        label_selector = f"node.kubernetes.io/instance-type={instance_type}"
        nodes = v1.list_node(label_selector=label_selector).items

        for node in nodes:
            if not node.status or not node.status.allocatable:
                continue

            # Only check profiles not already found on an earlier node.
            for partition_type in possible_partition_types - available_partition_types:
                resource_key = f"nvidia.com/{partition_type}"
                allocatable_partitions = node.status.allocatable.get(resource_key)
                if allocatable_partitions and int(allocatable_partitions) > 0:
                    available_partition_types.add(partition_type)

            if available_partition_types == possible_partition_types:
                break  # every profile found; remaining nodes cannot add anything

        return sorted(available_partition_types)

    except Exception as e:
        raise RuntimeError(f"Failed to query cluster for accelerator partitions: {e}") from e


def _load_hp_job(response: dict) -> HyperPodPytorchJob:
    """Build a ``HyperPodPytorchJob`` from a custom-object API response dict.

    ``status`` is parsed only when the response contains it; otherwise it is None.
    """
    spec = _HyperPodPytorchJob.model_validate(response["spec"], by_name=True)
    metadata = Metadata(**response["metadata"])

    if "status" in response:
        status = HyperPodPytorchJobStatus.model_validate(
            response["status"], by_name=True
        )
    else:
        status = None

    job = HyperPodPytorchJob(
        metadata=metadata,
        status=status,
        **spec.model_dump(by_alias=True, exclude_none=True),
    )
    return job


def _load_hp_job_list(response: dict) -> List[HyperPodPytorchJob]:
    """Build ``HyperPodPytorchJob`` objects for every entry in ``response["items"]``."""
    return [_load_hp_job(hp_job) for hp_job in response["items"]]