MLOps Pipelines: Automating the ML Lifecycle
·6 min read

Build end-to-end ML pipelines with automated training and deployment, but beware: complexity breeds hidden failures

By Alex Thompson, MLOps Architect

Tags: MLOps, ML pipeline, Kubeflow

MLOps brings DevOps practices to machine learning. This guide implements production ML pipelines with Kubeflow and MLflow.

MLflow Experiment Tracking

Track experiments with automatic logging:

import os

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from sklearn.metrics import accuracy_score, f1_score

class MLflowTrainer:
    """Training with comprehensive experiment tracking"""
    def __init__(self, experiment_name="my_experiment"):
        mlflow.set_experiment(experiment_name)
        self.run = None
    
    def train(self, model, train_loader, val_loader, config):
        """Train with automatic logging"""
        with mlflow.start_run() as run:
            self.run = run
            
            # Log hyperparameters
            mlflow.log_params({
                'learning_rate': config['lr'],
                'batch_size': config['batch_size'],
                'epochs': config['epochs'],
                'optimizer': config['optimizer'],
                'model_architecture': model.__class__.__name__
            })
            
            # Log model architecture
            mlflow.set_tag("model_class", str(type(model)))
            
            # Training loop
            optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
            criterion = nn.CrossEntropyLoss()
            
            for epoch in range(config['epochs']):
                # Training
                model.train()
                train_loss = 0
                for batch_idx, (data, target) in enumerate(train_loader):
                    optimizer.zero_grad()
                    output = model(data)
                    loss = criterion(output, target)
                    loss.backward()
                    optimizer.step()
                    
                    train_loss += loss.item()
                
                # Validation
                model.eval()
                val_loss = 0
                all_preds = []
                all_targets = []
                
                with torch.no_grad():
                    for data, target in val_loader:
                        output = model(data)
                        val_loss += criterion(output, target).item()
                        
                        preds = output.argmax(dim=1)
                        all_preds.extend(preds.cpu().numpy())
                        all_targets.extend(target.cpu().numpy())
                
                # Calculate metrics
                accuracy = accuracy_score(all_targets, all_preds)
                f1 = f1_score(all_targets, all_preds, average='weighted')
                
                # Log metrics
                mlflow.log_metrics({
                    'train_loss': train_loss / len(train_loader),
                    'val_loss': val_loss / len(val_loader),
                    'val_accuracy': accuracy,
                    'val_f1': f1
                }, step=epoch)
                
                print(f"Epoch {epoch}: val_acc={accuracy:.4f}, val_f1={f1:.4f}")
                
                # ⚠️ Detect training anomalies
                if epoch > 5 and accuracy < 0.1:
                    mlflow.set_tag("status", "failed_to_learn")
                    print("⚠️ Model not learning, aborting")
                    break
            
            # Log final model
            mlflow.pytorch.log_model(model, "model")
            
            # Log model size (save to a temp file to measure it)
            model_path = "temp_model.pt"
            torch.save(model.state_dict(), model_path)
            model_size_mb = os.path.getsize(model_path) / 1e6
            mlflow.log_metric("model_size_mb", model_size_mb)
            os.remove(model_path)
            
            # Register model if performance threshold met
            if accuracy > 0.9:
                model_uri = f"runs:/{run.info.run_id}/model"
                mlflow.register_model(model_uri, "ProductionModel")
                print("✅ Model registered for production")
            else:
                print(f"⚠️ Model accuracy {accuracy:.2%} below threshold, not registered")
            
            return model

# Usage (assumes model, train_loader, and val_loader are defined elsewhere)
trainer = MLflowTrainer("image_classification")
config = {
    'lr': 0.001,
    'batch_size': 64,
    'epochs': 50,
    'optimizer': 'Adam'
}
trained_model = trainer.train(model, train_loader, val_loader, config)
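
Once a run registers a model, downstream services can load it from the registry by name and version instead of by file path. A minimal sketch, assuming the "ProductionModel" name registered above and a reachable tracking server:

import mlflow.pytorch
from mlflow.tracking import MlflowClient

# Look up the newest registered version of the model
client = MlflowClient()
latest = client.get_latest_versions("ProductionModel")[0]

# Load the PyTorch model straight from the registry URI
model = mlflow.pytorch.load_model(f"models:/ProductionModel/{latest.version}")
model.eval()  # inference mode for serving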

Kubeflow Pipeline

Orchestrate end-to-end ML workflow:

import kfp
from kfp import dsl
from kfp.components import func_to_container_op  # KFP v1 SDK (kfp 2.x replaces this with dsl.component)

@func_to_container_op
def preprocess_data(input_path: str, output_path: str) -> str:
    """Data preprocessing component"""
    import pandas as pd
    import numpy as np
    from sklearn.preprocessing import StandardScaler
    import pickle
    
    # Load raw data
    df = pd.read_csv(input_path)
    
    # Clean data
    df = df.dropna()
    df = df[df['value'] > 0]  # Remove invalid values
    
    # Feature engineering
    df['feature_ratio'] = df['feature_a'] / (df['feature_b'] + 1e-8)
    df['feature_log'] = np.log1p(df['feature_c'])
    
    # Scale features
    scaler = StandardScaler()
    feature_cols = [c for c in df.columns if c != 'target']
    df[feature_cols] = scaler.fit_transform(df[feature_cols])
    
    # Save processed data and scaler
    df.to_csv(output_path, index=False)
    with open(f"{output_path}.scaler.pkl", 'wb') as f:
        pickle.dump(scaler, f)
    
    print(f"Processed {len(df)} samples")
    return output_path

@func_to_container_op
def train_model(data_path: str, model_output_path: str, hyperparams: dict) -> str:
    """Training component"""
    import pandas as pd
    import torch
    import torch.nn as nn
    from sklearn.model_selection import train_test_split
    
    # Load data
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1).values
    y = df['target'].values
    
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Define model
    class SimpleNN(nn.Module):
        def __init__(self, input_dim, hidden_dim, output_dim):
            super().__init__()
            self.fc1 = nn.Linear(input_dim, hidden_dim)
            self.fc2 = nn.Linear(hidden_dim, output_dim)
        
        def forward(self, x):
            x = torch.relu(self.fc1(x))
            return self.fc2(x)
    
    model = SimpleNN(X_train.shape[1], hyperparams['hidden_dim'], 10)
    
    # Convert to tensors for a simple full-batch training loop
    X_train_t = torch.tensor(X_train, dtype=torch.float32)
    y_train_t = torch.tensor(y_train, dtype=torch.long)
    
    # Train
    optimizer = torch.optim.Adam(model.parameters(), lr=hyperparams['lr'])
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(hyperparams['epochs']):
        optimizer.zero_grad()
        loss = criterion(model(X_train_t), y_train_t)
        loss.backward()
        optimizer.step()
    
    # Save model
    torch.save(model.state_dict(), model_output_path)
    
    return model_output_path

@func_to_container_op
def evaluate_model(model_path: str, test_data_path: str) -> dict:
    """Evaluation component"""
    import torch
    import pandas as pd
    from sklearn.metrics import accuracy_score, classification_report
    
    # Load model and data, run inference, compute metrics
    # ... evaluation code elided; the values below are placeholders
    
    metrics = {
        'accuracy': 0.95,
        'f1_score': 0.93
    }
    
    return metrics

@func_to_container_op
def deploy_model(model_path: str, metrics: dict, threshold: float = 0.9):
    """Conditional deployment based on metrics"""
    if metrics['accuracy'] >= threshold:
        # Deploy to production
        print(f"✅ Deploying model (accuracy: {metrics['accuracy']})")
        # kubectl apply -f deployment.yaml
        # or TorchServe API call
    else:
        print(f"⚠️ Model accuracy {metrics['accuracy']} below threshold {threshold}")
        raise RuntimeError("Model performance insufficient for deployment")

@dsl.pipeline(
    name='End-to-End ML Pipeline',
    description='Automated training and deployment pipeline'
)
def ml_pipeline(
    input_data_path: str = 's3://my-bucket/raw-data.csv',
    model_registry: str = 's3://my-bucket/models/',
    accuracy_threshold: float = 0.9
):
    """Complete ML pipeline"""
    # Step 1: Preprocess
    # NB: container-local /tmp paths are illustrative; production pipelines
    # should pass files as KFP artifacts so they survive across containers
    preprocess_task = preprocess_data(
        input_path=input_data_path,
        output_path='/tmp/processed_data.csv'
    )
    
    # Step 2: Train
    hyperparams = {'lr': 0.001, 'hidden_dim': 128, 'epochs': 50}
    train_task = train_model(
        data_path=preprocess_task.output,
        model_output_path='/tmp/model.pt',
        hyperparams=hyperparams
    )
    # (no .after() needed: consuming preprocess_task.output already orders the steps)
    
    # Step 3: Evaluate
    eval_task = evaluate_model(
        model_path=train_task.output,
        test_data_path='/tmp/test_data.csv'
    )
    
    # Step 4: Deploy (conditional)
    deploy_task = deploy_model(
        model_path=train_task.output,
        metrics=eval_task.output,
        threshold=accuracy_threshold
    )

# Compile and run pipeline
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
    
    client = kfp.Client(host='http://kubeflow.example.com')
    run = client.create_run_from_pipeline_func(
        ml_pipeline,
        arguments={
            'input_data_path': 's3://data/train.csv',
            'accuracy_threshold': 0.92
        }
    )
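
The deploy_model component above leaves the actual rollout as a placeholder comment. One concrete option is TorchServe's management API, which registers a model archive over HTTP. A minimal sketch; the host and archive URL are hypothetical:

import requests

# Register a new model version via TorchServe's management endpoint (port 8081)
resp = requests.post(
    "http://torchserve.example.com:8081/models",
    params={
        "url": "https://my-bucket.s3.amazonaws.com/models/model.mar",  # hypothetical archive
        "model_name": "production_model",
        "initial_workers": "2",
    },
)
resp.raise_for_status()
print(resp.json())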

Data Versioning with DVC

Version control for datasets:

# Initialize DVC
dvc init

# Track dataset
dvc add data/train.csv
git add data/train.csv.dvc data/.gitignore
git commit -m "Add training data v1"

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push

# Create a pipeline stage (newer DVC versions replace 'dvc run' with 'dvc stage add')
dvc run -n preprocess \
  -d data/raw.csv \
  -o data/processed.csv \
  python preprocess.py

# Reproduce pipeline
dvc repro
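
Versioned datasets can also be read programmatically. A minimal sketch using the dvc.api interface, assuming the repo and remote configured above; the v1.0 tag is hypothetical:

import pandas as pd
import dvc.api

# Open the exact dataset version pinned by a Git revision
# ('v1.0' is a hypothetical tag; any commit, branch, or tag works)
with dvc.api.open('data/train.csv', rev='v1.0') as f:
    df = pd.read_csv(f)

print(f"Loaded {len(df)} rows from the v1.0 dataset")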

Continuous Training

Automatically retrain on new data:

import time
from datetime import datetime

import kfp
import schedule

class ContinuousTrainer:
    """Automated retraining system"""
    def __init__(self, data_source, model_registry):
        self.data_source = data_source
        self.model_registry = model_registry
        self.last_training_time = None
        self.performance_history = []
    
    def check_data_drift(self):
        """Detect if retraining is needed"""
        from scipy.stats import ks_2samp
        
        # Compare current production data against newly arrived data
        # (load_production_data / load_new_data are assumed helpers)
        prod_data = load_production_data()
        new_data = load_new_data(self.data_source)
        
        # Kolmogorov-Smirnov test for distribution shift
        for feature in prod_data.columns:
            statistic, pvalue = ks_2samp(prod_data[feature], new_data[feature])
            
            if pvalue < 0.05:  # significant drift (correct for multiple tests in practice)
                print(f"⚠️ Data drift detected in {feature} (p={pvalue:.4f})")
                return True
        
        return False
    
    def check_performance_degradation(self):
        """Monitor production model performance"""
        current_accuracy = get_production_accuracy()  # assumed monitoring helper
        
        if len(self.performance_history) > 0:
            baseline_accuracy = self.performance_history[0]
            
            if current_accuracy < baseline_accuracy * 0.95:
                print(f"⚠️ Performance degradation: {current_accuracy:.2%} vs {baseline_accuracy:.2%}")
                return True
        
        self.performance_history.append(current_accuracy)
        return False
    
    def trigger_training(self):
        """Execute training pipeline"""
        print(f"🔄 Triggering retraining at {datetime.now()}")
        
        # Run Kubeflow pipeline
        client = kfp.Client(host='http://kubeflow.example.com')
        run = client.create_run_from_pipeline_func(ml_pipeline, arguments={})
        
        self.last_training_time = datetime.now()
        
        # Monitor pipeline execution
        run.wait_for_run_completion(timeout=3600)
        
        print("✅ Retraining complete")
    
    def scheduled_check(self):
        """Periodic check for retraining triggers"""
        print(f"Checking retraining conditions at {datetime.now()}")
        
        if self.check_data_drift() or self.check_performance_degradation():
            self.trigger_training()
        else:
            print("No retraining needed")

# Schedule daily checks
trainer = ContinuousTrainer(data_source='s3://bucket/data', model_registry='s3://bucket/models')
schedule.every().day.at("02:00").do(trainer.scheduled_check)

while True:
    schedule.run_pending()
    time.sleep(60)  # poll every minute so the 02:00 job fires on time

Warnings ⚠️

Pipeline Complexity: Multi-stage pipelines accumulate failure modes. The 2034 "Pipeline Cascade" occurred when 300-step ML pipelines became impossible to debug.

Hidden Dependencies: When data lineage tracking breaks down, upstream schema and distribution changes propagate silently, degrading data quality and model performance without triggering alerts.

Automation Runaway: Continuous training without human oversight has deployed progressively worse models for weeks before detection; a promotion gate like the sketch below mitigates this.
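
One mitigation is a promotion gate that refuses to replace the production model unless the candidate measurably beats it. A minimal sketch; the metric keys and the require_approval flag are assumptions, not part of any library:

def promote_candidate(candidate_metrics: dict, production_metrics: dict,
                      min_gain: float = 0.0, require_approval: bool = True) -> bool:
    """Gate automated promotion: the candidate must beat production,
    and a human must sign off when require_approval is set."""
    gain = candidate_metrics['accuracy'] - production_metrics['accuracy']
    if gain <= min_gain:
        print(f"⚠️ Candidate gains only {gain:+.2%}; keeping production model")
        return False
    if require_approval:
        answer = input(f"Candidate improves accuracy by {gain:+.2%}. Promote? [y/N] ")
        if answer.strip().lower() != 'y':
            return False
    print("✅ Promoting candidate model")
    return True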

Related Chronicles: The MLOps Meltdown (2034) - Automated systems deploying broken models

Tools: Kubeflow, MLflow, DVC, Airflow, Prefect, Weights & Biases

Research: Continuous learning systems, online learning, model monitoring
