Source code for sagemaker.hyperpod.inference.hp_endpoint_base

  1from typing import Union
  2import logging
  3import yaml
  4from types import SimpleNamespace
  5from kubernetes import client, config
  6from sagemaker.hyperpod.inference.config.constants import *
  7from sagemaker.hyperpod.inference.config.hp_jumpstart_endpoint_config import (
  8    _HPJumpStartEndpoint,
  9)
 10from sagemaker.hyperpod.inference.config.hp_endpoint_config import (
 11    _HPEndpoint,
 12)
 13from sagemaker.hyperpod.common.config.metadata import Metadata
 14from sagemaker.hyperpod.common.utils import (
 15    handle_exception,
 16    setup_logging,
 17    get_default_namespace,
 18    verify_kubernetes_version_compatibility,
 19)
 20from sagemaker.hyperpod.common.telemetry.telemetry_logging import (
 21    _hyperpod_telemetry_emitter,
 22)
 23from sagemaker.hyperpod.common.telemetry.constants import Feature
 24
 25
class HPEndpointBase:
    """Base class for HyperPod inference endpoints.

    This class provides common functionality for managing inference endpoints
    on SageMaker HyperPod clusters orchestrated by Amazon EKS. It handles
    Kubernetes API interactions for creating, listing, getting, and deleting
    inference endpoints, plus helpers for reading pod logs and listing pods
    and namespaces.
    """

    # Flipped to True by verify_kube_config() after config.load_kube_config()
    # has been called, so later calls can skip reloading the kubeconfig.
    is_kubeconfig_loaded = False

    @classmethod
    def get_logger(cls) -> logging.Logger:
        """Get logger instance for the class.

        **Returns:**

        logging.Logger: Logger instance for this module.
        """
        return logging.getLogger(__name__)

    @classmethod
    def verify_kube_config(cls):
        """Verify and load Kubernetes configuration.

        Loads the Kubernetes configuration if not already loaded and verifies
        Kubernetes version compatibility.
        """
        if not cls.is_kubeconfig_loaded:
            config.load_kube_config()
            cls.is_kubeconfig_loaded = True

            # Verify Kubernetes version compatibility
            # NOTE(review): the original indentation was lost in extraction; this
            # check is assumed to run only on first load (inside the guard) --
            # confirm against the upstream module.
            verify_kubernetes_version_compatibility(cls.get_logger())

    @staticmethod
    def _resource_type_for_kind(kind: str) -> str:
        """Map a Kubernetes resource kind to the resource type passed to handle_exception."""
        # NOTE(review): usage examples in this class pass 'HPJumpStartEndpoint' as the
        # kind, while this comparison uses 'JumpStartModel' -- confirm which value
        # callers actually pass (i.e. the keys of KIND_PLURAL_MAP).
        if kind == "JumpStartModel":
            return "hyp_jumpstart_endpoint"
        return "hyp_custom_endpoint"

    @classmethod
    def call_create_api(
        cls,
        metadata: Metadata,
        kind: str,
        spec: Union[_HPJumpStartEndpoint, _HPEndpoint],
        debug: bool = False,
    ):
        """Create an inference endpoint using Kubernetes API.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - metadata
             - Metadata
             - Kubernetes metadata object containing name, namespace, labels, and annotations
           * - kind
             - str
             - Kubernetes resource kind (e.g., 'HPJumpStartEndpoint')
           * - spec
             - Union[_HPJumpStartEndpoint, _HPEndpoint]
             - Endpoint specification
           * - debug
             - bool, optional
             - Passed to ``setup_logging`` and ``handle_exception``. Defaults to False

        **Raises:**

        Exception: If endpoint creation fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> from sagemaker.hyperpod.inference.config.hp_jumpstart_endpoint_config import _HPJumpStartEndpoint
              >>> from sagemaker.hyperpod.common.config.metadata import Metadata
              >>> spec = _HPJumpStartEndpoint(...)
              >>> metadata = Metadata(name="my-endpoint", namespace="default")
              >>> HPEndpointBase.call_create_api(metadata, "HPJumpStartEndpoint", spec)
        """
        cls.verify_kube_config()

        logger = setup_logging(cls.get_logger(), debug)

        custom_api = client.CustomObjectsApi()

        # Unset (None) fields are dropped so they are not sent in the request body.
        body = {
            "apiVersion": INFERENCE_FULL_API_VERSION,
            "kind": kind,
            "metadata": metadata.model_dump(exclude_none=True),
            "spec": spec.model_dump(exclude_none=True),
        }

        logger.debug("Creating endpoint with config:\n%s", yaml.dump(body))

        try:
            custom_api.create_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=metadata.namespace,
                plural=KIND_PLURAL_MAP[kind],
                body=body,
            )
        except Exception as e:
            # Lazy %-formatting: the message is only built if the record is emitted.
            logger.error(
                "Failed to create endpoint in namespace %s!", metadata.namespace
            )
            handle_exception(e, metadata.name, metadata.namespace, debug=debug)

    @classmethod
    def call_list_api(
        cls,
        kind: str,
        namespace: str,
    ):
        """List inference endpoints using Kubernetes API.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - kind
             - str
             - Kubernetes resource kind to list
           * - namespace
             - str
             - Kubernetes namespace to list endpoints from

        **Returns:**

        dict: List of endpoints in the specified namespace

        **Raises:**

        Exception: If listing endpoints fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> endpoints = HPEndpointBase.call_list_api("HPJumpStartEndpoint", "default")
              >>> print(f"Found {len(endpoints['items'])} endpoints")
        """
        cls.verify_kube_config()

        custom_api = client.CustomObjectsApi()

        try:
            return custom_api.list_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=namespace,
                plural=KIND_PLURAL_MAP[kind],
            )
        except Exception as e:
            # No single resource name applies to a list call, hence the empty name.
            handle_exception(e, "", namespace)

    @classmethod
    def call_get_api(
        cls,
        name: str,
        kind: str,
        namespace: str,
    ):
        """Get a specific inference endpoint using Kubernetes API.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - name
             - str
             - Name of the endpoint to retrieve
           * - kind
             - str
             - Kubernetes resource kind
           * - namespace
             - str
             - Kubernetes namespace containing the endpoint

        **Returns:**

        dict: Endpoint details

        **Raises:**

        Exception: If retrieving endpoint fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> endpoint = HPEndpointBase.call_get_api("my-endpoint", "HPJumpStartEndpoint", "default")
              >>> print(endpoint['metadata']['name'])
        """
        cls.verify_kube_config()

        custom_api = client.CustomObjectsApi()

        try:
            return custom_api.get_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=namespace,
                plural=KIND_PLURAL_MAP[kind],
                name=name,
            )
        except Exception as e:
            handle_exception(
                e,
                name,
                namespace,
                operation_type="get",
                resource_type=cls._resource_type_for_kind(kind),
            )

    @classmethod
    def call_delete_api(
        cls,
        name: str,
        kind: str,
        namespace: str,
    ):
        """Delete an inference endpoint using Kubernetes API.

        This is a classmethod like the other API wrappers in this class; it can
        be called on the class or on an instance.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - name
             - str
             - Name of the endpoint to delete
           * - kind
             - str
             - Kubernetes resource kind
           * - namespace
             - str
             - Kubernetes namespace containing the endpoint

        **Raises:**

        Exception: If deleting endpoint fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> base = HPEndpointBase()
              >>> base.call_delete_api("my-endpoint", "HPJumpStartEndpoint", "default")
              >>>
              >>> # Calling on the class works as well
              >>> HPEndpointBase.call_delete_api("my-endpoint", "HPJumpStartEndpoint", "default")
        """
        cls.verify_kube_config()

        custom_api = client.CustomObjectsApi()

        try:
            custom_api.delete_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=namespace,
                plural=KIND_PLURAL_MAP[kind],
                name=name,
            )
        except Exception as e:
            handle_exception(
                e,
                name,
                namespace,
                operation_type="delete",
                resource_type=cls._resource_type_for_kind(kind),
            )

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_operator_logs")
    def get_operator_logs(cls, since_hours: float):
        """Get logs from the inference operator.

        Retrieves logs from the first pod listed in the operator namespace,
        for debugging and monitoring purposes.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - since_hours
             - float
             - Number of hours back to retrieve logs from

        **Returns:**

        str: Operator logs with timestamps

        **Raises:**

        Exception: If no operator pods found or log retrieval fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> logs = HPEndpointBase.get_operator_logs(1.0)
              >>> print(logs)
              >>>
              >>> # Get logs from last 30 minutes
              >>> logs = HPEndpointBase.get_operator_logs(0.5)
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()

        pods = v1.list_namespaced_pod(namespace=OPERATOR_NAMESPACE)

        if not pods.items:
            # Build the message from the constant actually queried so the two
            # cannot drift apart.
            raise Exception(f"No pod found in namespace {OPERATOR_NAMESPACE}")

        # Get logs from first pod
        pod_name = pods.items[0].metadata.name

        try:
            # Returning from inside the try avoids referencing an unbound local
            # should handle_exception ever return instead of raising.
            return v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=OPERATOR_NAMESPACE,
                timestamps=True,
                # The API takes whole seconds; fractional hours are truncated.
                since_seconds=int(3600 * since_hours),
            )
        except Exception as e:
            handle_exception(e, pod_name, OPERATOR_NAMESPACE)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_logs_endpoint")
    def get_logs(
        cls,
        pod: str,
        container: Union[str, None] = None,
        namespace: Union[str, None] = None,
    ):
        """Get logs from a specific pod.

        Retrieves logs from a pod associated with an inference endpoint.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - pod
             - str
             - Name of the pod to get logs from
           * - container
             - str, optional
             - Container name. If not specified, uses the first container in the pod
           * - namespace
             - str, optional
             - Kubernetes namespace. If not specified, uses the default namespace

        **Returns:**

        str: Pod logs with timestamps

        **Raises:**

        Exception: If log retrieval fails

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> logs = HPEndpointBase.get_logs("my-pod-name")
              >>> print(logs)
              >>>
              >>> # Get logs from specific container
              >>> logs = HPEndpointBase.get_logs("my-pod", container="inference")
              >>>
              >>> # Get logs from specific namespace
              >>> logs = HPEndpointBase.get_logs("my-pod", namespace="my-namespace")
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()

        if not namespace:
            namespace = get_default_namespace()

        # Read outside the try block, as before: errors from this lookup
        # propagate unchanged rather than going through handle_exception.
        pod_details = v1.read_namespaced_pod(
            name=pod,
            namespace=namespace,
        )

        # if pod has multiple containers, get logs in the first container
        if not container:
            container = pod_details.spec.containers[0].name

        try:
            # Returning from inside the try avoids referencing an unbound local
            # should handle_exception ever return instead of raising.
            return v1.read_namespaced_pod_log(
                name=pod,
                namespace=namespace,
                container=container,
                timestamps=True,
            )
        except Exception as e:
            handle_exception(e, pod, namespace)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pods_endpoint")
    def list_pods(cls, namespace=None):
        """List all pods in a namespace.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - namespace
             - str, optional
             - Kubernetes namespace to list pods from. If not specified, uses the default namespace

        **Returns:**

        List[str]: List of pod names in the namespace

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> pods = HPEndpointBase.list_pods()
              >>> print(f"Found {len(pods)} pods: {pods}")
              >>>
              >>> # List pods in specific namespace
              >>> pods = HPEndpointBase.list_pods(namespace="my-namespace")
        """
        cls.verify_kube_config()

        if not namespace:
            namespace = get_default_namespace()

        v1 = client.CoreV1Api()
        response = v1.list_namespaced_pod(namespace=namespace)

        return [item.metadata.name for item in response.items]

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_namespaces")
    def list_namespaces(cls):
        """List all available Kubernetes namespaces.

        **Returns:**

        List[str]: List of namespace names

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> namespaces = HPEndpointBase.list_namespaces()
              >>> print(f"Available namespaces: {namespaces}")
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()
        response = v1.list_namespace()

        return [item.metadata.name for item in response.items]