1from typing import Union
2import logging
3import yaml
4from types import SimpleNamespace
5from kubernetes import client, config
6from sagemaker.hyperpod.inference.config.constants import *
7from sagemaker.hyperpod.inference.config.hp_jumpstart_endpoint_config import (
8 _HPJumpStartEndpoint,
9)
10from sagemaker.hyperpod.inference.config.hp_endpoint_config import (
11 _HPEndpoint,
12)
13from sagemaker.hyperpod.common.config.metadata import Metadata
14from sagemaker.hyperpod.common.utils import (
15 handle_exception,
16 setup_logging,
17 get_default_namespace,
18 verify_kubernetes_version_compatibility,
19)
20from sagemaker.hyperpod.common.telemetry.telemetry_logging import (
21 _hyperpod_telemetry_emitter,
22)
23from sagemaker.hyperpod.common.telemetry.constants import Feature
24
25
[docs]
class HPEndpointBase:
    """Base class for HyperPod inference endpoints.

    This class provides common functionality for managing inference endpoints
    on SageMaker HyperPod clusters orchestrated by Amazon EKS. It handles
    Kubernetes API interactions for creating, listing, getting, and deleting
    inference endpoints, and offers helpers for reading pod logs and listing
    pods and namespaces.
    """

    # Set to True once the kubeconfig has been loaded so later calls skip
    # loading it again.
    is_kubeconfig_loaded = False

    @classmethod
    def get_logger(cls) -> logging.Logger:
        """Get logger instance for the class.

        **Returns:**

        logging.Logger: Logger instance for this module.
        """
        return logging.getLogger(__name__)

    @classmethod
    def verify_kube_config(cls) -> None:
        """Verify and load Kubernetes configuration.

        Loads the Kubernetes configuration if not already loaded and verifies
        Kubernetes version compatibility.
        """
        if not cls.is_kubeconfig_loaded:
            config.load_kube_config()
            cls.is_kubeconfig_loaded = True

            # Verify Kubernetes version compatibility
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm the check is meant to run only on the first load.
            verify_kubernetes_version_compatibility(cls.get_logger())

    @staticmethod
    def _resource_type_for_kind(kind: str) -> str:
        """Map a resource kind to the ``resource_type`` given to ``handle_exception``."""
        if kind == "JumpStartModel":
            return "hyp_jumpstart_endpoint"
        return "hyp_custom_endpoint"

    @classmethod
    def call_create_api(
        cls,
        metadata: Metadata,
        kind: str,
        spec: Union[_HPJumpStartEndpoint, _HPEndpoint],
        debug: bool = False,
    ):
        """Create an inference endpoint using Kubernetes API.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - metadata
             - Metadata
             - Kubernetes metadata object; its ``name`` and ``namespace`` are used for the request
           * - kind
             - str
             - Kubernetes resource kind; must be a key of ``KIND_PLURAL_MAP`` (e.g., 'JumpStartModel' -- TODO confirm the accepted values)
           * - spec
             - Union[_HPJumpStartEndpoint, _HPEndpoint]
             - Endpoint specification
           * - debug
             - bool, optional
             - Passed to ``setup_logging`` and ``handle_exception``. Defaults to False

        **Raises:**

        Any error raised while creating the object is logged and handed to
        ``handle_exception`` (defined outside this module; presumably it
        re-raises a user-facing exception).

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> from sagemaker.hyperpod.inference.config.hp_jumpstart_endpoint_config import _HPJumpStartEndpoint
              >>> from sagemaker.hyperpod.common.config.metadata import Metadata
              >>> spec = _HPJumpStartEndpoint(...)
              >>> metadata = Metadata(name="my-endpoint", namespace="default")
              >>> HPEndpointBase.call_create_api(metadata, "JumpStartModel", spec)
        """
        cls.verify_kube_config()

        logger = setup_logging(cls.get_logger(), debug)

        custom_api = client.CustomObjectsApi()

        # Unset (None) fields are dropped so they are not sent to the API.
        body = {
            "apiVersion": INFERENCE_FULL_API_VERSION,
            "kind": kind,
            "metadata": metadata.model_dump(exclude_none=True),
            "spec": spec.model_dump(exclude_none=True),
        }

        logger.debug("Creating endpoint with config:\n%s", yaml.dump(body))

        try:
            custom_api.create_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=metadata.namespace,
                plural=KIND_PLURAL_MAP[kind],
                body=body,
            )
        except Exception as e:
            logger.error(
                "Failed to create endpoint in namespace %s!", metadata.namespace
            )
            handle_exception(e, metadata.name, metadata.namespace, debug=debug)

    @classmethod
    def call_list_api(
        cls,
        kind: str,
        namespace: str,
    ):
        """List inference endpoints using Kubernetes API.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - kind
             - str
             - Kubernetes resource kind to list; must be a key of ``KIND_PLURAL_MAP``
           * - namespace
             - str
             - Kubernetes namespace to list endpoints from

        **Returns:**

        The response of ``list_namespaced_custom_object`` for the given
        namespace (the original documentation describes it as a dict with an
        ``items`` entry).

        **Raises:**

        Any error raised by the call is handed to ``handle_exception``
        (defined outside this module; presumably it re-raises).

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> endpoints = HPEndpointBase.call_list_api("JumpStartModel", "default")
              >>> print(f"Found {len(endpoints['items'])} endpoints")
        """
        cls.verify_kube_config()

        custom_api = client.CustomObjectsApi()

        try:
            return custom_api.list_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=namespace,
                plural=KIND_PLURAL_MAP[kind],
            )
        except Exception as e:
            # No single resource name applies to a list call, hence "".
            handle_exception(e, "", namespace)

    @classmethod
    def call_get_api(
        cls,
        name: str,
        kind: str,
        namespace: str,
    ):
        """Get a specific inference endpoint using Kubernetes API.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - name
             - str
             - Name of the endpoint to retrieve
           * - kind
             - str
             - Kubernetes resource kind; must be a key of ``KIND_PLURAL_MAP``
           * - namespace
             - str
             - Kubernetes namespace containing the endpoint

        **Returns:**

        The response of ``get_namespaced_custom_object`` (the original
        documentation describes it as a dict of endpoint details).

        **Raises:**

        Any error raised by the call is handed to ``handle_exception`` with
        ``operation_type='get'`` (defined outside this module; presumably it
        re-raises).

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> endpoint = HPEndpointBase.call_get_api("my-endpoint", "JumpStartModel", "default")
              >>> print(endpoint['metadata']['name'])
        """
        cls.verify_kube_config()

        custom_api = client.CustomObjectsApi()

        try:
            return custom_api.get_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=namespace,
                plural=KIND_PLURAL_MAP[kind],
                name=name,
            )
        except Exception as e:
            handle_exception(
                e,
                name,
                namespace,
                operation_type="get",
                resource_type=cls._resource_type_for_kind(kind),
            )

    def call_delete_api(
        self,
        name: str,
        kind: str,
        namespace: str,
    ):
        """Delete an inference endpoint using Kubernetes API.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - name
             - str
             - Name of the endpoint to delete
           * - kind
             - str
             - Kubernetes resource kind; must be a key of ``KIND_PLURAL_MAP``
           * - namespace
             - str
             - Kubernetes namespace containing the endpoint

        **Raises:**

        Any error raised by the call is handed to ``handle_exception`` with
        ``operation_type='delete'`` (defined outside this module; presumably
        it re-raises).

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> base = HPEndpointBase()
              >>> base.call_delete_api("my-endpoint", "JumpStartModel", "default")
        """
        self.verify_kube_config()

        custom_api = client.CustomObjectsApi()

        try:
            custom_api.delete_namespaced_custom_object(
                group=INFERENCE_GROUP,
                version=INFERENCE_API_VERSION,
                namespace=namespace,
                plural=KIND_PLURAL_MAP[kind],
                name=name,
            )
        except Exception as e:
            handle_exception(
                e,
                name,
                namespace,
                operation_type="delete",
                resource_type=self._resource_type_for_kind(kind),
            )

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_operator_logs")
    def get_operator_logs(cls, since_hours: float):
        """Get logs from the inference operator.

        Reads the logs of the first pod returned when listing pods in
        ``OPERATOR_NAMESPACE``.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - since_hours
             - float
             - Number of hours back to retrieve logs from; converted to whole seconds with ``int(3600 * since_hours)``

        **Returns:**

        str: Operator logs with timestamps

        **Raises:**

        Exception: If no pod is found in the operator namespace. Errors from
        reading the log are handed to ``handle_exception`` (defined outside
        this module; presumably it re-raises).

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> logs = HPEndpointBase.get_operator_logs(1.0)
              >>> print(logs)
              >>>
              >>> # Get logs from last 30 minutes
              >>> logs = HPEndpointBase.get_operator_logs(0.5)
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()

        pods = v1.list_namespaced_pod(namespace=OPERATOR_NAMESPACE)

        if not pods.items:
            raise Exception(
                "No pod found in namespace hyperpod-inference-operator-system"
            )

        # Get logs from first pod
        pod_name = pods.items[0].metadata.name

        try:
            # Returning from inside the try avoids referencing an unbound
            # local should handle_exception ever return instead of raising.
            return v1.read_namespaced_pod_log(
                name=pod_name,
                namespace=OPERATOR_NAMESPACE,
                timestamps=True,
                since_seconds=int(3600 * since_hours),
            )
        except Exception as e:
            handle_exception(e, pod_name, OPERATOR_NAMESPACE)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "get_logs_endpoint")
    def get_logs(
        cls,
        pod: str,
        container: str = None,
        namespace=None,
    ):
        """Get logs from a specific pod.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - pod
             - str
             - Name of the pod to get logs from
           * - container
             - str, optional
             - Container name. If not specified, uses the first container in the pod
           * - namespace
             - str, optional
             - Kubernetes namespace. If not specified, uses the result of ``get_default_namespace()``

        **Returns:**

        str: Pod logs with timestamps

        **Raises:**

        Errors from looking up the pod propagate unchanged. Errors from
        reading the log are handed to ``handle_exception`` (defined outside
        this module; presumably it re-raises).

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> logs = HPEndpointBase.get_logs("my-pod-name")
              >>> print(logs)
              >>>
              >>> # Get logs from specific container
              >>> logs = HPEndpointBase.get_logs("my-pod", container="inference")
              >>>
              >>> # Get logs from specific namespace
              >>> logs = HPEndpointBase.get_logs("my-pod", namespace="my-namespace")
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()

        if not namespace:
            namespace = get_default_namespace()

        # This lookup is deliberately left outside the try block so that its
        # errors reach the caller unmodified, as before.
        pod_details = v1.read_namespaced_pod(
            name=pod,
            namespace=namespace,
        )

        # if pod has multiple containers, get logs in the first container
        if not container:
            container = pod_details.spec.containers[0].name

        try:
            # Returning from inside the try avoids referencing an unbound
            # local should handle_exception ever return instead of raising.
            return v1.read_namespaced_pod_log(
                name=pod,
                namespace=namespace,
                container=container,
                timestamps=True,
            )
        except Exception as e:
            handle_exception(e, pod, namespace)

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_pods_endpoint")
    def list_pods(cls, namespace=None):
        """List all pods in a namespace.

        **Parameters:**

        .. list-table::
           :header-rows: 1
           :widths: 20 20 60

           * - Parameter
             - Type
             - Description
           * - namespace
             - str, optional
             - Kubernetes namespace to list pods from. If not specified, uses the result of ``get_default_namespace()``

        **Returns:**

        List[str]: List of pod names in the namespace

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> pods = HPEndpointBase.list_pods()
              >>> print(f"Found {len(pods)} pods: {pods}")
              >>>
              >>> # List pods in specific namespace
              >>> pods = HPEndpointBase.list_pods(namespace="my-namespace")
        """
        cls.verify_kube_config()

        if not namespace:
            namespace = get_default_namespace()

        v1 = client.CoreV1Api()
        response = v1.list_namespaced_pod(namespace=namespace)

        return [item.metadata.name for item in response.items]

    @classmethod
    @_hyperpod_telemetry_emitter(Feature.HYPERPOD, "list_namespaces")
    def list_namespaces(cls):
        """List all available Kubernetes namespaces.

        **Returns:**

        List[str]: List of namespace names

        .. dropdown:: Usage Examples
           :open:

           .. code-block:: python

              >>> namespaces = HPEndpointBase.list_namespaces()
              >>> print(f"Available namespaces: {namespaces}")
        """
        cls.verify_kube_config()

        v1 = client.CoreV1Api()
        response = v1.list_namespace()

        return [item.metadata.name for item in response.items]