ML Pipeline Requirements
A production ML pipeline needs:
- Reproducibility: the same inputs produce the same outputs
- Versioning: track data, code, and model versions
- Orchestration: DAG execution with dependencies
- Scalability: distribute training across GPUs
- Monitoring: track experiments, metrics, and artifacts
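To make the orchestration requirement concrete, here is a minimal sketch of how a DAG of steps resolves to an execution order. It uses only the Python standard library; the step names and the dependency map are hypothetical, and a real orchestrator also schedules containers, retries failures, and runs independent steps in parallel.

```python
from graphlib import TopologicalSorter

# Hypothetical step names: each key maps to the set of steps it depends on.
dependencies = {
    "prepare_data": set(),
    "train_model": {"prepare_data"},
    "evaluate_model": {"train_model", "prepare_data"},
    "deploy_model": {"evaluate_model"},
}


def execution_order(deps):
    """Return a valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because every step here depends on the previous one, only one order is valid. Both tools compared in this article perform this kind of resolution from the dependencies declared in the pipeline definition.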
Kubeflow Pipelines vs Argo Workflows
| Feature | Kubeflow Pipelines | Argo Workflows |
|---|---|---|
| Purpose | ML-specific orchestration | General workflow engine |
| UI | Experiment tracking + pipeline viz | Workflow visualization |
| SDK | Python (kfp) | Python (hera) / YAML |
| ML features | Experiments, metrics, artifacts | Basic (via plugins) |
| Caching | Built-in step caching | Opt-in memoization (per template) |
| Model registry | Integrated | External (MLflow) |
| Complexity | High (many components) | Medium |
| Resource usage | Heavy (MySQL, MinIO, etc.) | Light |
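The caching row is easier to reason about with a small model of what step caching does: a step is skipped when the same step definition has already run with the same inputs. The sketch below is a conceptual illustration in plain Python, not the actual implementation used by either tool; `cache_key`, `run_cached`, and `fake_train` are hypothetical names, and the returned metric is a placeholder.

```python
import hashlib
import json

_cache = {}  # in-memory stand-in for a persistent cache store


def cache_key(step_name, image, inputs):
    """Derive a stable key from the step identity and its input values."""
    payload = json.dumps(
        {"step": step_name, "image": image, "inputs": inputs}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_cached(step_name, image, inputs, fn):
    """Run fn(**inputs) unless an identical invocation is cached.

    Returns (result, cache_hit).
    """
    key = cache_key(step_name, image, inputs)
    if key in _cache:
        return _cache[key], True
    result = fn(**inputs)
    _cache[key] = result
    return result, False


calls = []


def fake_train(epochs, learning_rate):
    # Placeholder for an expensive training step
    calls.append((epochs, learning_rate))
    return {"accuracy": 0.9}  # placeholder value, not a measured result


first = run_cached("train", "trainer:1", {"epochs": 10, "learning_rate": 1e-4}, fake_train)
second = run_cached("train", "trainer:1", {"epochs": 10, "learning_rate": 1e-4}, fake_train)
third = run_cached("train", "trainer:1", {"epochs": 20, "learning_rate": 1e-4}, fake_train)
print(first[1], second[1], third[1], len(calls))
```

The second call is served from the cache, while changing `epochs` produces a new key and a fresh run. Including the image in the key is a design choice in this sketch so that a new container version invalidates old results.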
Kubeflow Pipelines
Installation
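# Pipelines as deployed in a full Kubeflow install (heavy, multi-user)
kubectl apply -k "github.com/kubeflow/manifests//apps/pipeline/upstream/env/cert-manager/platform-agnostic-multi-user?ref=v1.9.0"
# Standalone pipelines only (lighter): cluster-scoped resources first, then the namespaced install
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-emissary?ref=2.2.0"

Define Pipeline (Python SDK)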
from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics


# pyarrow is needed by pandas to read and write Parquet files
@dsl.component(base_image="python:3.11", packages_to_install=["pandas", "pyarrow", "scikit-learn"])
def prepare_data(
    raw_data: Input[Dataset],
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2,
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(raw_data.path)
    # A fixed random_state keeps the split reproducible across runs
    train, test = train_test_split(df, test_size=test_size, random_state=42)
    train.to_parquet(train_data.path)
    test.to_parquet(test_data.path)


@dsl.component(base_image="pytorch/pytorch:2.4.0-cuda12.1-cudnn9-runtime")
def train_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    epochs: int = 10,
    learning_rate: float = 1e-4,
):
    import torch

    # ... training code ... (builds and fits `model`)
    # Placeholder values; log the real metrics computed during training
    metrics.log_metric("accuracy", 0.95)
    metrics.log_metric("loss", 0.05)
    torch.save(model.state_dict(), model_output.path)


@dsl.component(base_image="python:3.11")
def evaluate_model(
    model: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics],
    threshold: float = 0.9,
) -> bool:
    # ... evaluation code ... (defines `evaluate`)
    accuracy = evaluate(model, test_data)
    metrics.log_metric("test_accuracy", accuracy)
    return accuracy >= threshold


@dsl.pipeline(name="ML Training Pipeline")
def training_pipeline(epochs: int = 10, lr: float = 1e-4):
    # dsl.importer brings an existing object into the pipeline as a typed artifact
    raw = dsl.importer(
        artifact_uri="s3://data/raw.parquet",
        artifact_class=Dataset,
        reimport=False,
    )
    data_task = prepare_data(raw_data=raw.output)
    # set_gpu_limit is deprecated in the v2 SDK; set the accelerator type and limit instead
    train_task = train_model(
        train_data=data_task.outputs["train_data"],
        epochs=epochs,
        learning_rate=lr,
    ).set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1).set_memory_limit("16Gi")
    eval_task = evaluate_model(
        model=train_task.outputs["model_output"],
        test_data=data_task.outputs["test_data"],
    )
    # evaluate_model has two outputs (metrics and the return value), so the
    # return value is referenced by name. Newer kfp releases deprecate
    # dsl.Condition in favor of dsl.If.
    with dsl.Condition(eval_task.outputs["Output"] == True):
        # deploy_model is assumed to be another component defined elsewhere
        deploy_model(model=train_task.outputs["model_output"])


# Compile to YAML
compiler.Compiler().compile(training_pipeline, "pipeline.yaml")

Argo Workflows
Installation
kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.6.0/quick-start-minimal.yaml

ML Pipeline with Hera (Python SDK)
from hera.workflows import Artifact, Container, Parameter, Resources, Steps, Workflow

with Workflow(
    generate_name="ml-pipeline-",
    namespace="ml-pipelines",
    entrypoint="ml-steps",
) as w:
    # Data preparation step
    prepare = Container(
        name="prepare-data",
        image="myregistry/data-prep:latest",
        command=["python", "prepare.py"],
        args=["--output", "/data/processed"],
        resources=Resources(cpu_request=4, memory_request="8Gi"),
        outputs=[Artifact(name="processed-data", path="/data/processed")],
    )

    # Training step (GPU)
    train = Container(
        name="train-model",
        image="myregistry/trainer:latest",
        command=["torchrun", "--nproc_per_node=4", "train.py"],
        args=[
            "--data", "/data/processed",
            "--output", "/models/checkpoint",
            "--epochs", "{{inputs.parameters.epochs}}",
        ],
        # gpus maps to the nvidia.com/gpu resource by default
        resources=Resources(gpus=4, memory_limit="64Gi"),
        inputs=[
            Artifact(name="processed-data", path="/data/processed"),
            Parameter(name="epochs"),
        ],
        outputs=[Artifact(name="model", path="/models/checkpoint")],
    )

    # Evaluation step
    evaluate = Container(
        name="evaluate",
        image="myregistry/evaluator:latest",
        command=["python", "evaluate.py"],
        inputs=[Artifact(name="model", path="/models/checkpoint")],
    )

    # Steps run sequentially; step names must match the {{steps.<name>...}} references
    with Steps(name="ml-steps"):
        prepare(name="prepare-data")
        train(
            name="train-model",
            arguments=[
                Artifact(name="processed-data", from_="{{steps.prepare-data.outputs.artifacts.processed-data}}"),
                Parameter(name="epochs", value="10"),
            ],
        )
        evaluate(
            name="evaluate",
            arguments=[
                Artifact(name="model", from_="{{steps.train-model.outputs.artifacts.model}}"),
            ],
        )

# Submitting requires a reachable Argo server, configured through a
# WorkflowsService on the Workflow or Hera's global config
w.create()

Decision Framework
Choose Kubeflow Pipelines when:
- Full ML platform needed (experiments, model registry, serving)
- Team wants built-in experiment tracking
- Need pipeline caching (skip unchanged steps)
- Multi-user environment with access control
- Heavy ML workloads are the primary use case
Choose Argo Workflows when:
- Already using Argo for CI/CD or other workflows
- Want lightweight deployment (fewer dependencies)
- ML is one of many workflow types
- Prefer YAML-native configuration
- Need DAG + steps + loops + conditions flexibility
- Using MLflow separately for experiment tracking