Skip to main content
πŸŽ“ Claude Code Masterclass Learn AI-assisted development on Udemy β€” plus the companion book on Leanpub & Amazon. Start Learning
ML Pipelines on Kubernetes: Kubeflow vs Argo Workflows
AI

ML Pipelines on Kubernetes: Kubeflow vs Argo Workflows

Build reproducible ML training pipelines on Kubernetes. Compare Kubeflow Pipelines and Argo Workflows for data prep, training, and evaluation.

LB
Luca Berton
Β· 1 min read

ML Pipeline Requirements

A production ML pipeline needs:

  1. Reproducibility β€” same inputs = same outputs
  2. Versioning β€” track data, code, model versions
  3. Orchestration β€” DAG execution with dependencies
  4. Scalability β€” distribute training across GPUs
  5. Monitoring β€” track experiments, metrics, artifacts

Kubeflow Pipelines vs Argo Workflows

FeatureKubeflow PipelinesArgo Workflows
PurposeML-specific orchestrationGeneral workflow engine
UIExperiment tracking + pipeline vizWorkflow visualization
SDKPython (kfp)Python (hera) / YAML
ML featuresExperiments, metrics, artifactsBasic (via plugins)
CachingBuilt-in step cachingManual implementation
Model registryIntegratedExternal (MLflow)
ComplexityHigh (many components)Medium
Resource usageHeavy (MySQL, MinIO, etc.)Light

Kubeflow Pipelines

Installation

# Full Kubeflow (heavy)
kubectl apply -k "github.com/kubeflow/manifests//apps/pipeline/upstream/env/cert-manager/platform-agnostic-multi-user?ref=v1.9.0"

# Standalone pipelines only (lighter)
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-emissary?ref=2.2.0"

Define Pipeline (Python SDK)

from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset, Model, Metrics

@dsl.component(base_image="python:3.11", packages_to_install=["pandas", "scikit-learn"])
def prepare_data(
    raw_data: Input[Dataset],
    train_data: Output[Dataset],
    test_data: Output[Dataset],
    test_size: float = 0.2,
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_parquet(raw_data.path)
    train, test = train_test_split(df, test_size=test_size, random_state=42)
    train.to_parquet(train_data.path)
    test.to_parquet(test_data.path)

@dsl.component(base_image="pytorch/pytorch:2.4.0-cuda12.1-cudnn9-runtime")
def train_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    epochs: int = 10,
    learning_rate: float = 1e-4,
):
    import torch
    # ... training code ...
    metrics.log_metric("accuracy", 0.95)
    metrics.log_metric("loss", 0.05)
    torch.save(model.state_dict(), model_output.path)

@dsl.component(base_image="python:3.11")
def evaluate_model(
    model: Input[Model],
    test_data: Input[Dataset],
    metrics: Output[Metrics],
    threshold: float = 0.9,
) -> bool:
    # ... evaluation code ...
    accuracy = evaluate(model, test_data)
    metrics.log_metric("test_accuracy", accuracy)
    return accuracy >= threshold

@dsl.pipeline(name="ML Training Pipeline")
def training_pipeline(epochs: int = 10, lr: float = 1e-4):
    data_task = prepare_data(raw_data=dsl.importer(uri="s3://data/raw.parquet"))

    train_task = train_model(
        train_data=data_task.outputs["train_data"],
        epochs=epochs,
        learning_rate=lr,
    ).set_gpu_limit(1).set_memory_limit("16Gi")

    eval_task = evaluate_model(
        model=train_task.outputs["model_output"],
        test_data=data_task.outputs["test_data"],
    )

    with dsl.Condition(eval_task.output == True):
        deploy_model(model=train_task.outputs["model_output"])

# Compile to YAML
compiler.Compiler().compile(training_pipeline, "pipeline.yaml")

Argo Workflows

Installation

kubectl create namespace argo
kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.6.0/quick-start-minimal.yaml

ML Pipeline with Hera (Python SDK)

from hera.workflows import Workflow, Steps, Container, Parameter, Artifact

with Workflow(
    generate_name="ml-pipeline-",
    namespace="ml-pipelines",
    entrypoint="ml-steps",
) as w:
    # Data preparation step
    prepare = Container(
        name="prepare-data",
        image="myregistry/data-prep:latest",
        command=["python", "prepare.py"],
        args=["--output", "/data/processed"],
        resources={"requests": {"memory": "8Gi", "cpu": "4"}},
        outputs=[Artifact(name="processed-data", path="/data/processed")],
    )

    # Training step (GPU)
    train = Container(
        name="train-model",
        image="myregistry/trainer:latest",
        command=["torchrun", "--nproc_per_node=4", "train.py"],
        args=[
            "--data", "/data/processed",
            "--output", "/models/checkpoint",
            "--epochs", "{{inputs.parameters.epochs}}",
        ],
        resources={
            "limits": {"nvidia.com/gpu": "4", "memory": "64Gi"},
        },
        inputs=[
            Artifact(name="processed-data", path="/data/processed"),
            Parameter(name="epochs"),
        ],
        outputs=[Artifact(name="model", path="/models/checkpoint")],
    )

    # Evaluation step
    evaluate = Container(
        name="evaluate",
        image="myregistry/evaluator:latest",
        command=["python", "evaluate.py"],
        inputs=[Artifact(name="model", path="/models/checkpoint")],
    )

    with Steps(name="ml-steps"):
        prepare(arguments={"parameters": []})
        train(
            arguments={
                "artifacts": [{"name": "processed-data", "from": "{{steps.prepare-data.outputs.artifacts.processed-data}}"}],
                "parameters": [{"name": "epochs", "value": "10"}],
            }
        )
        evaluate(
            arguments={
                "artifacts": [{"name": "model", "from": "{{steps.train-model.outputs.artifacts.model}}"}],
            }
        )

w.create()

Decision Framework

Choose Kubeflow Pipelines when:

  • βœ… Full ML platform needed (experiments, model registry, serving)
  • βœ… Team wants built-in experiment tracking
  • βœ… Need pipeline caching (skip unchanged steps)
  • βœ… Multi-user environment with access control
  • βœ… Heavy ML workloads are primary use case

Choose Argo Workflows when:

  • βœ… Already using Argo for CI/CD or other workflows
  • βœ… Want lightweight deployment (fewer dependencies)
  • βœ… ML is one of many workflow types
  • βœ… Prefer YAML-native configuration
  • βœ… Need DAG + steps + loops + conditions flexibility
  • βœ… Using MLflow separately for experiment tracking

Free 30-min AI & Cloud consultation

Book Now