Skip to content

feat(RHOAIENG-26487): Cluster lifecycling via RayJob #873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: ray-jobs-feature
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/codeflare_sdk/ray/rayjobs/pretty_print.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def print_job_status(job_info: RayJobInfo):

# Add timing information if available
if job_info.start_time:
table.add_row()
table.add_row(f"[bold]Started:[/bold] {job_info.start_time}")

# Add attempt counts if there are failures
Expand Down
168 changes: 133 additions & 35 deletions src/codeflare_sdk/ray/rayjobs/rayjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
from typing import Dict, Any, Optional, Tuple
from odh_kuberay_client.kuberay_job_api import RayjobApi

from ..cluster.cluster import Cluster
from ..cluster.config import ClusterConfiguration
from ..cluster.build_ray_cluster import build_ray_cluster

from .status import (
RayJobDeploymentStatus,
CodeflareRayJobStatus,
RayJobInfo,
)
from . import pretty_print

# Set up logging

logger = logging.getLogger(__name__)


Expand All @@ -42,74 +46,110 @@ class RayJob:
def __init__(
self,
job_name: str,
cluster_name: str,
cluster_name: Optional[str] = None,
cluster_config: Optional[ClusterConfiguration] = None,
namespace: str = "default",
entrypoint: str = "None",
entrypoint: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
shutdown_after_job_finishes: bool = True,
ttl_seconds_after_finished: int = 0,
active_deadline_seconds: Optional[int] = None,
):
"""
Initialize a RayJob instance.

Args:
name: The name for the Ray job
namespace: The Kubernetes namespace to submit the job to (default: "default")
cluster_name: The name of the Ray cluster to submit the job to
**kwargs: Additional configuration options
job_name: The name for the Ray job
cluster_name: The name of an existing Ray cluster (optional if cluster_config provided)
cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
namespace: The Kubernetes namespace (default: "default")
entrypoint: The Python script or command to run (required for submission)
runtime_env: Ray runtime environment configuration (optional)
shutdown_after_job_finishes: Whether to automatically cleanup the cluster after job completion (default: True)
ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
active_deadline_seconds: Maximum time the job can run before being terminated (optional)
"""
# Validate input parameters
if cluster_name is None and cluster_config is None:
raise ValueError("Either cluster_name or cluster_config must be provided")

if cluster_name is not None and cluster_config is not None:
raise ValueError("Cannot specify both cluster_name and cluster_config")

self.name = job_name
self.namespace = namespace
self.cluster_name = cluster_name
self.entrypoint = entrypoint
self.runtime_env = runtime_env
self.shutdown_after_job_finishes = shutdown_after_job_finishes
self.ttl_seconds_after_finished = ttl_seconds_after_finished
self.active_deadline_seconds = active_deadline_seconds

# Cluster configuration
self._cluster_name = cluster_name
self._cluster_config = cluster_config

# Determine cluster name for the job
if cluster_config is not None:
# Ensure cluster config has the same namespace as the job
if cluster_config.namespace is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow on note for myself later today: can we auto-detect namespace if running in RHOAI workbench like we do now for clusters?

cluster_config.namespace = namespace
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to use ClusterConfiguration for creating the rayClusterSpec, we should either:

  • don't take a Namespace in the ClusterConfig as Kuberay will create the Cluster in the same namespace as the RayJob

or

  • remove the requirement for k8s_namespace for the new RayJob object.

Let's put namespace in just one place and let the RayCluster inherit it from the job. I will add this next week when updating this PR

elif cluster_config.namespace != namespace:
logger.warning(
f"Cluster config namespace ({cluster_config.namespace}) differs from job namespace ({namespace})"
)

self.cluster_name = cluster_config.name or f"{job_name}-cluster"
# Update the cluster config name if it wasn't set
if not cluster_config.name:
cluster_config.name = self.cluster_name
else:
self.cluster_name = cluster_name

# Initialize the KubeRay job API client
self._api = RayjobApi()

logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}")

def submit(
self,
) -> str:
def submit(self) -> str:
    """
    Submit the Ray job to the Kubernetes cluster.

    The RayJob CRD will automatically:
    - Create a new cluster if cluster_config was provided
    - Use the existing cluster if cluster_name was provided
    - Clean up resources based on the shutdown_after_job_finishes setting

    Returns:
        The job name if submission was successful.

    Raises:
        ValueError: If entrypoint is not provided.
        RuntimeError: If job submission fails.
    """
    # An entrypoint is optional at construction time but mandatory for
    # submission — fail fast with a clear message before touching the API.
    if not self.entrypoint:
        raise ValueError("entrypoint must be provided to submit a RayJob")

    # Build the RayJob custom resource from the instance state
    # (entrypoint, runtime_env, cluster selection, TTL settings).
    rayjob_cr = self._build_rayjob_cr()

    # Submit the job — the KubeRay operator handles cluster creation,
    # scheduling, and cleanup from here on.
    logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
    result = self._api.submit_job(k8s_namespace=self.namespace, job=rayjob_cr)

    if result:
        logger.info(f"Successfully submitted RayJob {self.name}")
        if self.shutdown_after_job_finishes:
            logger.info(
                f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion"
            )
        return self.name
    else:
        raise RuntimeError(f"Failed to submit RayJob {self.name}")

def _build_rayjob_cr(
self,
entrypoint: str,
runtime_env: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
def _build_rayjob_cr(self) -> Dict[str, Any]:
"""
Build the RayJob custom resource specification.

This creates a minimal RayJob CR that can be extended later.
Build the RayJob custom resource specification using native RayJob capabilities.
"""
# Basic RayJob custom resource structure
rayjob_cr = {
Expand All @@ -120,17 +160,75 @@ def _build_rayjob_cr(
"namespace": self.namespace,
},
"spec": {
"entrypoint": entrypoint,
"clusterSelector": {"ray.io/cluster": self.cluster_name},
"entrypoint": self.entrypoint,
"shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
"ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
},
}

# Add active deadline if specified
if self.active_deadline_seconds:
rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds

# Add runtime environment if specified
if runtime_env:
rayjob_cr["spec"]["runtimeEnvYAML"] = str(runtime_env)
if self.runtime_env:
rayjob_cr["spec"]["runtimeEnvYAML"] = str(self.runtime_env)

# Configure cluster: either use existing or create new
if self._cluster_config is not None:
# Use rayClusterSpec to create a new cluster - leverage existing build logic
ray_cluster_spec = self._build_ray_cluster_spec()
rayjob_cr["spec"]["rayClusterSpec"] = ray_cluster_spec
logger.info(f"RayJob will create new cluster: {self.cluster_name}")
else:
# Use clusterSelector to reference existing cluster
rayjob_cr["spec"]["clusterSelector"] = {"ray.io/cluster": self.cluster_name}
logger.info(f"RayJob will use existing cluster: {self.cluster_name}")

return rayjob_cr

def _build_ray_cluster_spec(self) -> Dict[str, Any]:
    """
    Build a RayCluster spec from the stored ClusterConfiguration.

    Reuses the existing build_ray_cluster() helper so that clusters created
    by a RayJob are built exactly like standalone clusters.

    Returns:
        Dict containing the RayCluster spec for embedding in the RayJob CR
        under ``spec.rayClusterSpec``.

    Raises:
        RuntimeError: If no cluster configuration was provided at init time.
    """
    if not self._cluster_config:
        raise RuntimeError("No cluster configuration provided")

    import copy

    # A shallow copy is sufficient here: only top-level attributes are
    # overridden below, and the caller's ClusterConfiguration must not be
    # mutated.
    temp_config = copy.copy(self._cluster_config)

    # Force a plain RayCluster (not an AppWrapper) and keep the build
    # in-memory rather than writing YAML to a file.
    temp_config.appwrapper = False
    temp_config.write_to_file = False

    # build_ray_cluster() expects a Cluster object. Create a bare instance
    # via __new__ to skip Cluster.__init__ and its side effects, then attach
    # the adjusted config.
    from ..cluster.cluster import Cluster

    temp_cluster = Cluster.__new__(Cluster)
    temp_cluster.config = temp_config

    # NOTE(review): RayJob with a new/auto-created cluster will not yet work
    # with Kueue — the Kueue label is not propagated to the RayCluster.
    ray_cluster_dict = build_ray_cluster(temp_cluster)

    # The RayJob CRD does not support metadata inside rayClusterSpec, so only
    # the spec portion is embedded. The CodeFlare Operator should still create
    # dashboard routes for the resulting RayCluster.
    ray_cluster_spec = ray_cluster_dict["spec"]

    logger.info(
        f"Built RayCluster spec using existing build logic for cluster: {self.cluster_name}"
    )
    return ray_cluster_spec

def status(
self, print_to_console: bool = True
) -> Tuple[CodeflareRayJobStatus, bool]:
Expand Down
3 changes: 3 additions & 0 deletions src/codeflare_sdk/ray/rayjobs/test_pretty_print.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def test_print_job_status_running_format(mocker):
call("[bold]Status:[/bold] Running"),
call("[bold]RayCluster:[/bold] test-cluster"),
call("[bold]Namespace:[/bold] test-ns"),
call(), # Empty row before timing info
call("[bold]Started:[/bold] 2025-07-28T11:37:07Z"),
]
mock_inner_table.add_row.assert_has_calls(expected_calls)
Expand Down Expand Up @@ -166,6 +167,7 @@ def test_print_job_status_complete_format(mocker):
call("[bold]Status:[/bold] Complete"),
call("[bold]RayCluster:[/bold] prod-cluster"),
call("[bold]Namespace:[/bold] prod-ns"),
call(), # Empty row before timing info
call("[bold]Started:[/bold] 2025-07-28T11:37:07Z"),
]
mock_inner_table.add_row.assert_has_calls(expected_calls)
Expand Down Expand Up @@ -215,6 +217,7 @@ def test_print_job_status_failed_with_attempts_format(mocker):
call("[bold]Status:[/bold] Failed"),
call("[bold]RayCluster:[/bold] test-cluster"),
call("[bold]Namespace:[/bold] test-ns"),
call(), # Empty row before timing info
call("[bold]Started:[/bold] 2025-07-28T11:37:07Z"),
call("[bold]Failed Attempts:[/bold] 3"), # Failed attempts should be shown
]
Expand Down
Loading
Loading