feat(RHOAIENG-26487): Cluster lifecycling via RayJob #873
base: ray-jobs-feature
@@ -20,14 +20,18 @@
 from typing import Dict, Any, Optional, Tuple
 from odh_kuberay_client.kuberay_job_api import RayjobApi

+from ..cluster.cluster import Cluster
+from ..cluster.config import ClusterConfiguration
+from ..cluster.build_ray_cluster import build_ray_cluster

 from .status import (
     RayJobDeploymentStatus,
     CodeflareRayJobStatus,
     RayJobInfo,
 )
 from . import pretty_print

 # Set up logging
 logger = logging.getLogger(__name__)
@@ -42,74 +46,110 @@ class RayJob:
     def __init__(
         self,
         job_name: str,
-        cluster_name: str,
+        cluster_name: Optional[str] = None,
+        cluster_config: Optional[ClusterConfiguration] = None,
         namespace: str = "default",
-        entrypoint: str = "None",
+        entrypoint: Optional[str] = None,
         runtime_env: Optional[Dict[str, Any]] = None,
+        shutdown_after_job_finishes: bool = True,
+        ttl_seconds_after_finished: int = 0,
+        active_deadline_seconds: Optional[int] = None,
     ):
         """
         Initialize a RayJob instance.

         Args:
-            name: The name for the Ray job
-            namespace: The Kubernetes namespace to submit the job to (default: "default")
-            cluster_name: The name of the Ray cluster to submit the job to
-            **kwargs: Additional configuration options
+            job_name: The name for the Ray job
+            cluster_name: The name of an existing Ray cluster (optional if cluster_config provided)
+            cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
+            namespace: The Kubernetes namespace (default: "default")
+            entrypoint: The Python script or command to run (required for submission)
+            runtime_env: Ray runtime environment configuration (optional)
+            shutdown_after_job_finishes: Whether to automatically clean up the cluster after job completion (default: True)
+            ttl_seconds_after_finished: Seconds to wait before cleanup after the job finishes (default: 0)
+            active_deadline_seconds: Maximum time the job can run before being terminated (optional)
         """
+        # Validate input parameters
+        if cluster_name is None and cluster_config is None:
+            raise ValueError("Either cluster_name or cluster_config must be provided")
+
+        if cluster_name is not None and cluster_config is not None:
+            raise ValueError("Cannot specify both cluster_name and cluster_config")

         self.name = job_name
         self.namespace = namespace
         self.cluster_name = cluster_name
         self.entrypoint = entrypoint
         self.runtime_env = runtime_env
+        self.shutdown_after_job_finishes = shutdown_after_job_finishes
+        self.ttl_seconds_after_finished = ttl_seconds_after_finished
+        self.active_deadline_seconds = active_deadline_seconds
+
+        # Cluster configuration
+        self._cluster_name = cluster_name
+        self._cluster_config = cluster_config
+
+        # Determine cluster name for the job
+        if cluster_config is not None:
+            # Ensure cluster config has the same namespace as the job
+            if cluster_config.namespace is None:
+                cluster_config.namespace = namespace
Review comment: If we are going to use ClusterConfiguration for creating the …
or
let's put namespace in just one place and let the RayCluster inherit it from the job. I will add this next week when updating this PR.
+        elif cluster_config.namespace != namespace:
+            logger.warning(
+                f"Cluster config namespace ({cluster_config.namespace}) differs from job namespace ({namespace})"
+            )
+
+            self.cluster_name = cluster_config.name or f"{job_name}-cluster"
+            # Update the cluster config name if it wasn't set
+            if not cluster_config.name:
+                cluster_config.name = self.cluster_name
+        else:
+            self.cluster_name = cluster_name

         # Initialize the KubeRay job API client
         self._api = RayjobApi()

         logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}")
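To make the two construction modes and the mutual-exclusion check above concrete, a hedged sketch (the import path and ClusterConfiguration arguments are assumptions; on this branch the config's name appears optional, given the f"{job_name}-cluster" fallback above):

from codeflare_sdk import RayJob, ClusterConfiguration  # import path assumed

# Mode 1: run against an existing RayCluster
job = RayJob(
    job_name="demo-job",
    cluster_name="my-existing-cluster",
    namespace="my-project",
    entrypoint="python train.py",
)

# Mode 2: let the RayJob create (and later tear down) its own cluster
job = RayJob(
    job_name="demo-job",
    cluster_config=ClusterConfiguration(num_workers=2),  # fields assumed
    namespace="my-project",
    entrypoint="python train.py",
)

# Passing both (or neither) raises ValueError per the validation above
try:
    RayJob(job_name="bad", cluster_name="c", cluster_config=ClusterConfiguration(name="c"))
except ValueError as err:
    print(err)  # Cannot specify both cluster_name and cluster_config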
-    def submit(
-        self,
-    ) -> str:
+    def submit(self) -> str:
         """
         Submit the Ray job to the Kubernetes cluster.

-        Args:
-            entrypoint: The Python script or command to run
-            runtime_env: Ray runtime environment configuration (optional)
+        The RayJob CRD will automatically:
+        - Create a new cluster if cluster_config was provided
+        - Use the existing cluster if cluster_name was provided
+        - Clean up resources based on the shutdown_after_job_finishes setting

         Returns:
             The job ID/name if submission was successful

         Raises:
-            RuntimeError: If the job has already been submitted or submission fails
+            ValueError: If entrypoint is not provided
+            RuntimeError: If job submission fails
         """
+        # Validate required parameters
+        if not self.entrypoint:
Review comment: todo: add validation for runtime_env. nit: add helper functions to validate the shape etc. of runtime_env and entrypoint.
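One hypothetical shape for the helpers suggested above (not part of this diff; the names and the allowed-key set are invented for illustration, with the keys listed being a subset of Ray's documented runtime_env fields):

    def _validate_entrypoint(self) -> None:
        # Hypothetical: entrypoint must be a non-empty command string
        if not isinstance(self.entrypoint, str) or not self.entrypoint.strip():
            raise ValueError("entrypoint must be a non-empty command string")

    def _validate_runtime_env(self) -> None:
        # Hypothetical: runtime_env, when given, must be a dict of known keys
        if self.runtime_env is None:
            return
        if not isinstance(self.runtime_env, dict):
            raise ValueError("runtime_env must be a dict")
        allowed = {"working_dir", "pip", "conda", "env_vars", "py_modules"}
        unknown = set(self.runtime_env) - allowed
        if unknown:
            raise ValueError(f"Unsupported runtime_env keys: {sorted(unknown)}")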
+            raise ValueError("entrypoint must be provided to submit a RayJob")

         # Build the RayJob custom resource
-        rayjob_cr = self._build_rayjob_cr(
-            entrypoint=self.entrypoint,
-            runtime_env=self.runtime_env,
-        )
+        rayjob_cr = self._build_rayjob_cr()

-        # Submit the job
-        logger.info(
-            f"Submitting RayJob {self.name} to RayCluster {self.cluster_name} in namespace {self.namespace}"
-        )
+        # Submit the job - the KubeRay operator handles everything else
+        logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
         result = self._api.submit_job(k8s_namespace=self.namespace, job=rayjob_cr)

         if result:
             logger.info(f"Successfully submitted RayJob {self.name}")
+            if self.shutdown_after_job_finishes:
+                logger.info(
+                    f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion"
+                )
             return self.name
         else:
             raise RuntimeError(f"Failed to submit RayJob {self.name}")
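Putting the lifecycle knobs together, a sketch of the intended end-to-end flow with an auto-created cluster (values illustrative; ClusterConfiguration fields assumed):

job = RayJob(
    job_name="nightly-train",
    cluster_config=ClusterConfiguration(num_workers=2),
    namespace="my-project",
    entrypoint="python train.py",
    runtime_env={"pip": ["torch"]},
    shutdown_after_job_finishes=True,  # cluster is deleted after the job finishes...
    ttl_seconds_after_finished=60,     # ...60 seconds after completion
    active_deadline_seconds=3600,      # hard cap on total runtime
)
job_name = job.submit()  # returns "nightly-train" on success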
-    def _build_rayjob_cr(
-        self,
-        entrypoint: str,
-        runtime_env: Optional[Dict[str, Any]] = None,
-    ) -> Dict[str, Any]:
+    def _build_rayjob_cr(self) -> Dict[str, Any]:
         """
         Build the RayJob custom resource specification.

-        This creates a minimal RayJob CR that can be extended later.
+        Build the RayJob custom resource specification using native RayJob capabilities.
         """
         # Basic RayJob custom resource structure
         rayjob_cr = {
@@ -120,17 +160,75 @@ def _build_rayjob_cr(
                 "namespace": self.namespace,
             },
             "spec": {
-                "entrypoint": entrypoint,
-                "clusterSelector": {"ray.io/cluster": self.cluster_name},
+                "entrypoint": self.entrypoint,
+                "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
+                "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
             },
         }

+        # Add active deadline if specified
+        if self.active_deadline_seconds:
+            rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds

         # Add runtime environment if specified
-        if runtime_env:
-            rayjob_cr["spec"]["runtimeEnvYAML"] = str(runtime_env)
+        if self.runtime_env:
+            rayjob_cr["spec"]["runtimeEnvYAML"] = str(self.runtime_env)
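Note that str() on a dict produces a Python repr (single quotes, None, tuples), which is not guaranteed to be valid YAML, so runtimeEnvYAML may fail to parse for some inputs. A more robust sketch, assuming PyYAML is available as a dependency:

import yaml

if self.runtime_env:
    # safe_dump emits spec-compliant YAML for plain dicts/lists/scalars
    rayjob_cr["spec"]["runtimeEnvYAML"] = yaml.safe_dump(self.runtime_env)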
+        # Configure cluster: either use existing or create new
+        if self._cluster_config is not None:
+            # Use rayClusterSpec to create a new cluster, leveraging existing build logic
+            ray_cluster_spec = self._build_ray_cluster_spec()
+            rayjob_cr["spec"]["rayClusterSpec"] = ray_cluster_spec
+            logger.info(f"RayJob will create new cluster: {self.cluster_name}")
+        else:
+            # Use clusterSelector to reference an existing cluster
+            rayjob_cr["spec"]["clusterSelector"] = {"ray.io/cluster": self.cluster_name}
+            logger.info(f"RayJob will use existing cluster: {self.cluster_name}")

         return rayjob_cr
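For reference, a rough sketch of the CR this method returns in the existing-cluster path; the apiVersion/kind/metadata fields sit in the elided hunk above, and the values here follow the standard KubeRay RayJob CRD (illustrative, not taken from this diff):

rayjob_cr = {
    "apiVersion": "ray.io/v1",
    "kind": "RayJob",
    "metadata": {"name": "demo-job", "namespace": "my-project"},
    "spec": {
        "entrypoint": "python train.py",
        "shutdownAfterJobFinishes": True,
        "ttlSecondsAfterFinished": 0,
        "clusterSelector": {"ray.io/cluster": "my-existing-cluster"},
    },
}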
+    def _build_ray_cluster_spec(self) -> Dict[str, Any]:
+        """
+        Build the RayCluster spec from ClusterConfiguration using the existing build_ray_cluster logic.
+
+        Returns:
+            Dict containing the RayCluster spec for embedding in the RayJob
+        """
+        if not self._cluster_config:
+            raise RuntimeError("No cluster configuration provided")
+
+        # Create a shallow copy of the cluster config to avoid modifying the original
+        import copy
+
+        temp_config = copy.copy(self._cluster_config)
+
+        # Ensure we get a RayCluster (not an AppWrapper) and don't write to file
+        temp_config.appwrapper = False
+        temp_config.write_to_file = False
+
+        # Create a minimal Cluster object for the build process
+        from ..cluster.cluster import Cluster
Review comment: should we move in-function imports to the top of the file? Unsure of the best Python practice there.

+        temp_cluster = Cluster.__new__(Cluster)  # Create without calling __init__
Review comment: weird syntax here, although it might just be me. I'll look into this.
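For context on the syntax flagged above: Cluster.__new__(Cluster) allocates a bare instance and skips __init__ entirely, so none of the constructor's validation or client setup runs, and required attributes must then be assigned by hand. A minimal illustration:

class Example:
    def __init__(self):
        print("init runs")  # never printed below
        self.ready = True

obj = Example.__new__(Example)  # bare instance, __init__ skipped
print(hasattr(obj, "ready"))    # False: attributes must be set manually
obj.ready = True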
+        temp_cluster.config = temp_config

+        # For now, RayJob with a new/auto-created cluster will not work with Kueue.
+        # This is due to the Kueue label not being propagated to the RayCluster.

+        # Use the existing build_ray_cluster function to generate the RayCluster
+        ray_cluster_dict = build_ray_cluster(temp_cluster)

+        # Extract just the RayCluster spec - RayJob CRD doesn't support metadata in rayClusterSpec
+        # Note: CodeFlare Operator should still create dashboard routes for the RayCluster
+        ray_cluster_spec = ray_cluster_dict["spec"]
Review comment: propagate the metadata from the RayJob here. Anything we need for the cluster, add to the RayJob.
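A speculative sketch of the propagation suggested above, shown out of context since rayClusterSpec carries no metadata of its own; labels the cluster needs (e.g. a Kueue queue label) would be copied onto the RayJob and propagated by the operator (names and keys hypothetical):

# Hypothetical: copy labels the generated cluster carries onto the RayJob CR
cluster_labels = ray_cluster_dict.get("metadata", {}).get("labels", {})
rayjob_cr["metadata"].setdefault("labels", {}).update(cluster_labels)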

+        logger.info(
+            f"Built RayCluster spec using existing build logic for cluster: {self.cluster_name}"
+        )
+        return ray_cluster_spec

     def status(
         self, print_to_console: bool = True
     ) -> Tuple[CodeflareRayJobStatus, bool]:
Review comment: Follow-on note for myself later today: can we auto-detect the namespace if running in a RHOAI workbench, like we do now for clusters?
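On the auto-detection question: a workbench pod has its serviceaccount namespace mounted at a well-known path, so one sketch (helper name hypothetical) could be:

def _detect_current_namespace(default: str = "default") -> str:
    # Inside a pod, the serviceaccount namespace is mounted at this path
    path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    try:
        with open(path) as f:
            return f.read().strip()
    except OSError:
        return default  # not running in-cluster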