第10章:MLOps与部署
本章介绍如何将训练好的AI模型部署到生产环境,包括模型保存、服务化部署、容器化以及持续监控等MLOps核心实践。
10.1 模型训练 Pipeline
一个完整的机器学习项目需要规范的训练流程,确保实验可复现、结果可追踪。
10.1.1 标准训练流程
典型的模型训练pipeline包含以下阶段:
ML Pipeline 阶段:
- 数据收集:从各种来源获取原始数据
- 数据预处理:清洗、转换、特征工程
- 数据划分:训练集/验证集/测试集拆分
- 模型训练:使用训练数据拟合模型
- 模型验证:在验证集上调参
- 模型评估:在测试集上最终评估
- 模型保存:导出模型用于部署
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import json
# ========== 1. Data loading & preprocessing ==========
def load_and_preprocess_data(filepath):
    """Load a CSV dataset, engineer features, and split it into X / y.

    Args:
        filepath: path (or file-like object) readable by pandas.read_csv;
            must contain 'feature_a', 'feature_b' and 'target' columns.

    Returns:
        (X, y): feature DataFrame (without 'target') and label Series.
    """
    frame = pd.read_csv(filepath)
    # Drop every row that contains any missing value
    frame = frame.dropna()
    # Example engineered feature; epsilon avoids division by zero
    frame['feature_ratio'] = frame['feature_a'] / (frame['feature_b'] + 1e-8)
    # Separate labels from features
    labels = frame['target']
    features = frame.drop('target', axis=1)
    return features, labels
# ========== 2. Data splitting ==========
def split_data(X, y, test_size=0.2, val_size=0.1, random_state=42):
    """Three-way stratified split into train / validation / test sets.

    Args:
        X, y: features and labels.
        test_size: test fraction of the FULL dataset.
        val_size: validation fraction of the FULL dataset.
        random_state: seed for reproducible splits.

    Returns:
        X_train, X_val, X_test, y_train, y_val, y_test
    """
    # Carve off the test set first
    X_rest, X_test, y_rest, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )
    # val_size is expressed relative to the full dataset, so rescale it
    # against what remains after removing the test split.
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest,
        test_size=val_size / (1 - test_size),
        random_state=random_state,
        stratify=y_rest,
    )
    return X_train, X_val, X_test, y_train, y_val, y_test
# ========== 3. Feature preprocessing ==========
def preprocess_features(X_train, X_val, X_test):
    """Standardize all splits with a scaler fitted on the training data only.

    Returns:
        scaled train/val/test arrays plus the fitted StandardScaler
        (needed later to transform production inputs identically).
    """
    scaler = StandardScaler()
    # Fit on train only to avoid leaking val/test statistics
    train_scaled = scaler.fit_transform(X_train)
    val_scaled = scaler.transform(X_val)
    test_scaled = scaler.transform(X_test)
    return train_scaled, val_scaled, test_scaled, scaler
# ========== 4. Model training ==========
def train_model(X_train, y_train, X_val, y_val):
    """Grid-search a RandomForest over a small parameter grid.

    Selects the candidate with the highest validation accuracy.

    Returns:
        (best_model, best_params): the fitted winner and its parameters.
    """
    best_score = 0
    best_model = None
    best_params = {}
    # Enumerate the (n_estimators, max_depth) grid in a fixed order
    candidate_grid = [
        {'n_estimators': n, 'max_depth': d}
        for n in [100, 200]
        for d in [10, 20, None]
    ]
    for params in candidate_grid:
        candidate = RandomForestClassifier(
            random_state=42,
            n_jobs=-1,
            **params,
        )
        candidate.fit(X_train, y_train)
        val_score = candidate.score(X_val, y_val)
        if val_score > best_score:
            best_score = val_score
            best_model = candidate
            best_params = params
    print(f"最佳验证分数: {best_score:.4f}")
    print(f"最佳参数: {best_params}")
    return best_model, best_params
# ========== 5. Model evaluation ==========
def evaluate_model(model, X_test, y_test, label_names=None):
    """Print a classification report / confusion matrix and return summary metrics.

    Args:
        model: fitted classifier exposing .predict.
        X_test, y_test: held-out evaluation split.
        label_names: optional display names for the classes.

    Returns:
        dict with accuracy plus weighted precision / recall / F1.
    """
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    y_pred = model.predict(X_test)
    print("分类报告:")
    print(classification_report(y_test, y_pred, target_names=label_names))
    print("混淆矩阵:")
    print(confusion_matrix(y_test, y_pred))
    # Weighted averages account for class imbalance
    return {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred, average='weighted'),
        'recall': recall_score(y_test, y_pred, average='weighted'),
        'f1_score': f1_score(y_test, y_pred, average='weighted'),
    }
# ========== 6. End-to-end pipeline ==========
def run_ml_pipeline(data_path, output_dir='./model_output'):
    """Run the full training pipeline and persist the model bundle + a log.

    Args:
        data_path: path to the input CSV file.
        output_dir: directory for saved artifacts (created if missing).

    Returns:
        dict bundling model, scaler, feature names, params and metrics.
    """
    import os
    os.makedirs(output_dir, exist_ok=True)

    # Data preparation
    features, labels = load_and_preprocess_data(data_path)
    X_train, X_val, X_test, y_train, y_val, y_test = split_data(features, labels)
    X_train_s, X_val_s, X_test_s, scaler = preprocess_features(X_train, X_val, X_test)

    # Model selection on the validation split
    model, params = train_model(X_train_s, y_train, X_val_s, y_val)

    # Final evaluation on the untouched test split
    metrics = evaluate_model(model, X_test_s, y_test)

    # Persist model + preprocessing + metadata as a single bundle so the
    # serving side can restore everything consistently.
    model_data = {
        'model': model,
        'scaler': scaler,
        'feature_names': list(features.columns),
        'best_params': params,
        'metrics': metrics,
    }
    joblib.dump(model_data, f'{output_dir}/model_bundle.pkl')

    # Write a JSON training record for lightweight experiment tracking
    with open(f'{output_dir}/training_log.json', 'w') as f:
        json.dump({
            'params': params,
            'metrics': metrics,
            'train_size': len(X_train),
            'val_size': len(X_val),
            'test_size': len(X_test),
        }, f, indent=2)

    print(f"模型已保存到 {output_dir}")
    return model_data
# 使用示例
# run_ml_pipeline('data.csv')
10.1.2 实验跟踪
实验跟踪帮助记录每次训练的参数、指标和 artifact,方便对比和复现。
主流实验跟踪工具:
- Weights & Biases (W&B):功能全面,可视化优秀
- MLflow:开源,可自托管,生态丰富
- TensorBoard:TensorFlow配套,轻量级
- Neptune:专业MLOps平台
Weights & Biases 示例
# pip install wandb
import wandb
# Log in (required on first run only)
# wandb.login()
# Initialize the tracking run: project, run name, and hyperparameters
wandb.init(
    project="my-classifier",
    name="experiment-1",
    config={
        "learning_rate": 0.001,
        "batch_size": 32,
        "epochs": 10,
        "model": "ResNet50"
    }
)
config = wandb.config
# Training loop
for epoch in range(config.epochs):
    # ... training code ...
    train_loss = 0.5 - epoch * 0.01  # dummy value for illustration
    val_acc = 0.7 + epoch * 0.02  # dummy value for illustration
    # Log per-epoch metrics to the W&B dashboard
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_accuracy": val_acc
    })
# Upload the model file as a run artifact
wandb.save("model.pth")
# Mark the run as finished
wandb.finish()
MLflow 示例
# pip install mlflow
import mlflow
import mlflow.sklearn
# Select (or create) the experiment to log under
mlflow.set_experiment("my-experiment")
with mlflow.start_run():
    # Log hyperparameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)
    # Train the model
    model = RandomForestClassifier(n_estimators=100, max_depth=10)
    model.fit(X_train, y_train)
    # Log metrics
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)
    # Log the model itself (stored under the run's artifact path "model")
    mlflow.sklearn.log_model(model, "model")
    # Log an arbitrary artifact file
    mlflow.log_artifact("data.csv")
# Launch the MLflow UI with: mlflow ui
10.2 模型保存与加载
选择合适的模型格式对部署至关重要。不同格式在兼容性、性能和易用性上各有优劣。
10.2.1 格式对比
| 格式 | 适用框架 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| Pickle/Joblib | 通用(Python) | 简单易用,保存完整对象 | Python版本依赖,安全性风险 | Python原型开发 |
| ONNX | 跨框架 | 跨平台,推理优化 | 部分算子不支持 | 生产部署,跨语言 |
| TorchScript | PyTorch | 脱离Python依赖,可序列化 | 动态图支持有限 | PyTorch生产部署 |
| SavedModel | TensorFlow | TF Serving原生支持 | TF专用 | TensorFlow生产部署 |
| GGML/GGUF | LLM | 量化支持,llama.cpp兼容 | 主要用于LLM | 大模型本地部署 |
10.2.2 Pickle/Joblib(Scikit-learn)
import pickle
import joblib
from sklearn.ensemble import RandomForestClassifier
# Train a model
model = RandomForestClassifier()
model.fit(X_train, y_train)
# Option 1: pickle
# NOTE(review): pickle/joblib files can execute arbitrary code on load —
# never load model files from untrusted sources.
with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)
with open('model.pkl', 'rb') as f:
    loaded_model = pickle.load(f)
# Option 2: joblib (recommended — more efficient for numpy arrays)
joblib.dump(model, 'model.joblib')
loaded_model = joblib.load('model.joblib')
# Predict with the restored model
predictions = loaded_model.predict(X_test)
10.2.3 PyTorch 模型保存
import torch
import torch.nn as nn
# Option 1: save only the parameters (state_dict) — recommended
torch.save(model.state_dict(), 'model_weights.pth')
# Load: rebuild the architecture first, then restore the weights
model = MyModel()
model.load_state_dict(torch.load('model_weights.pth'))
model.eval()
# Option 2: save the complete model (architecture + weights)
# NOTE(review): torch.load unpickles arbitrary objects — only load files
# you trust (newer PyTorch offers weights_only=True to harden this).
torch.save(model, 'model_complete.pth')
model = torch.load('model_complete.pth')
# Option 3: save a checkpoint including optimizer state (to resume training)
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss,
}
torch.save(checkpoint, 'checkpoint.pth')
# Resume training from the checkpoint
checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
10.2.4 ONNX 格式导出
ONNX(Open Neural Network Exchange)是跨框架的开放格式,便于在不同平台部署。
# Export a PyTorch model to ONNX
import torch
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)  # adjust to the model's input shape
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    # Allow a variable batch size at inference time
    dynamic_axes={
        'input': {0: 'batch_size'},
        'output': {0: 'batch_size'}
    }
)
# Validate the exported ONNX graph
import onnx
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)
# Run inference with ONNX Runtime
import onnxruntime as ort
import numpy as np
session = ort.InferenceSession("model.onnx")
input_name = session.get_inputs()[0].name
# Prepare an input matching the exported dtype/shape
data = np.random.randn(1, 3, 224, 224).astype(np.float32)
# Inference
outputs = session.run(None, {input_name: data})
print(outputs[0])
ONNX优势:
- 跨平台:可在移动端、嵌入式设备、浏览器运行
- 推理加速:支持TensorRT、ONNX Runtime等高性能推理引擎
- 可视化:使用Netron工具可直观查看模型结构
10.2.5 TorchScript
TorchScript将PyTorch模型转换为可序列化的中间表示,支持C++部署。
import torch
# Option 1: tracing — records the ops executed for one example input
model.eval()
example_input = torch.randn(1, 3, 224, 224)
traced_model = torch.jit.trace(model, example_input)
# Save the traced module
traced_model.save("model_traced.pt")
# Load it back and run inference
loaded_model = torch.jit.load("model_traced.pt")
output = loaded_model(example_input)
# Option 2: scripting — supports data-dependent control flow
class MyModule(torch.nn.Module):
    def forward(self, x):
        """Return x unchanged when its element sum is positive, else -x."""
        total = x.sum()
        if total > 0:
            return x
        return -x
# Compile the module via scripting and serialize it
scripted_model = torch.jit.script(MyModule())
scripted_model.save("model_scripted.pt")
10.3 部署方式
模型部署方式根据应用场景和规模不同而各异,从简单的API服务到云原生部署。
10.3.1 REST API 部署(FastAPI)
FastAPI是现代Python Web框架,提供高性能API服务,适合模型部署。
# Install: pip install fastapi uvicorn python-multipart
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import uvicorn
# Create the FastAPI application
app = FastAPI(
    title="ML Prediction API",
    description="机器学习模型预测服务",
    version="1.0.0"
)
# Load the model bundle ONCE at startup — not per request
model_bundle = joblib.load('model_bundle.pkl')
model = model_bundle['model']
scaler = model_bundle['scaler']
feature_names = model_bundle['feature_names']
# Request/response schemas (validated by Pydantic)
class PredictionRequest(BaseModel):
    # Raw feature vector; order must match feature_names
    features: list[float]

    class Config:
        # Example payload shown in the auto-generated OpenAPI docs
        json_schema_extra = {
            "example": {
                "features": [5.1, 3.5, 1.4, 0.2]
            }
        }


class PredictionResponse(BaseModel):
    prediction: int            # predicted class label
    probability: list[float]   # per-class probabilities
    confidence: float          # maximum class probability
# Health-check endpoint for load balancers / container orchestrators
@app.get("/health")
def health_check():
    """Report service liveness and whether the model is loaded."""
    return {"status": "healthy", "model_loaded": model is not None}
# Prediction endpoint
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    """Predict a class label (plus probabilities) for one feature vector.

    Raises:
        HTTPException 400: wrong number of features.
        HTTPException 500: unexpected failure during inference.
    """
    # Validate dimensionality BEFORE the try block. In the original code
    # this 400 was raised inside the try, caught by `except Exception`,
    # and re-surfaced as a 500 — masking the client error.
    if len(request.features) != len(feature_names):
        raise HTTPException(
            status_code=400,
            detail=f"Expected {len(feature_names)} features, got {len(request.features)}"
        )
    try:
        # Apply the same scaling used at training time
        features = np.array(request.features).reshape(1, -1)
        features_scaled = scaler.transform(features)
        # Predict label and class probabilities
        prediction = model.predict(features_scaled)[0]
        probabilities = model.predict_proba(features_scaled)[0]
        confidence = float(np.max(probabilities))
        return PredictionResponse(
            prediction=int(prediction),
            probability=probabilities.tolist(),
            confidence=confidence
        )
    except HTTPException:
        # Let deliberate HTTP errors propagate with their own status code
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Batch prediction endpoint
@app.post("/predict/batch")
def predict_batch(requests: list[PredictionRequest]):
    """Predict labels for a batch of feature vectors in one call.

    NOTE(review): unlike /predict, this endpoint does not validate the
    per-row feature count — malformed rows surface as a 500; confirm
    whether 400-style validation is wanted here too.
    """
    features = np.array([r.features for r in requests])
    features_scaled = scaler.transform(features)
    predictions = model.predict(features_scaled)
    return {"predictions": predictions.tolist()}


# Run with: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
FastAPI部署要点:
- 使用Pydantic模型验证输入输出
- 模型在启动时加载,避免每次请求重复加载
- 添加健康检查端点便于监控
- 使用uvicorn作为ASGI服务器
- 生产环境建议使用gunicorn + uvicorn workers
测试API
# Start the service
uvicorn main:app --host 0.0.0.0 --port 8000
# Open the auto-generated API docs
open http://localhost:8000/docs
# Test the prediction endpoint
curl -X POST "http://localhost:8000/predict" \
  -H "Content-Type: application/json" \
  -d '{"features": [5.1, 3.5, 1.4, 0.2]}'
10.3.2 Docker 容器化部署
容器化确保开发、测试、生产环境一致性,是生产部署的标准做法。
Dockerfile 示例
# Official slim Python base image
FROM python:3.9-slim
# Working directory inside the container
WORKDIR /app
# Copy the dependency manifest first to leverage Docker layer caching
COPY requirements.txt .
# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the model artifact and application code
COPY model_bundle.pkl .
COPY main.py .
# Port the API listens on
EXPOSE 8000
# Health check.
# BUG FIX: python:*-slim images do not ship curl, so the original
# curl-based check always failed; probe /health with the Python
# standard library instead.
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
# Start the API server
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml
version: '3.8'
services:
  # Model-serving API built from the local Dockerfile
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/app/model_bundle.pkl
      - LOG_LEVEL=info
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
  # Optional: Redis cache
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  # Optional: Prometheus monitoring
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
# Build the image
docker build -t my-ml-api:latest .
# Run the container
docker run -d -p 8000:8000 --name ml-api my-ml-api:latest
# Or bring everything up with docker-compose
docker-compose up -d
10.3.3 云服务平台
云平台提供托管的机器学习服务,简化部署和运维。
主流云ML平台:
- AWS SageMaker:端到端ML平台,Notebook→训练→部署
- 阿里云PAI:国内领先的机器学习平台,支持大模型
- Google Vertex AI:GCP的ML平台,AutoML能力强
- Azure Machine Learning:微软云ML解决方案
- Hugging Face Inference API:零代码部署Transformer模型
AWS SageMaker 部署示例
import boto3
import sagemaker
from sagemaker.pytorch import PyTorchModel
# Configuration: execution role and S3 location of the packaged model
role = sagemaker.get_execution_role()
s3_model_path = 's3://my-bucket/model.tar.gz'
# Define the model; entry_point names the inference handler script
pytorch_model = PyTorchModel(
    model_data=s3_model_path,
    role=role,
    framework_version='1.13',
    py_version='py39',
    entry_point='inference.py'
)
# Deploy a real-time inference endpoint
predictor = pytorch_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large',
    endpoint_name='my-endpoint'
)
# Invoke the endpoint
result = predictor.predict(data)
# Clean up — endpoints bill while they are running
predictor.delete_endpoint()
10.3.4 边缘部署
边缘部署适用于低延迟、离线、隐私敏感等场景。
边缘推理工具:
- TensorRT:NVIDIA GPU高性能推理
- ONNX Runtime:跨平台,支持多种硬件
- TensorFlow Lite:移动端和嵌入式
- OpenVINO:Intel硬件优化
- llama.cpp:大模型CPU推理
TensorRT 优化示例
# Optimize a PyTorch model with TensorRT (via torch2trt)
import torch
from torch2trt import torch2trt
# Load the trained model onto the GPU in eval mode
model = torch.load('model.pth').eval().cuda()
# Example input fixing the input shape for conversion
x = torch.randn(1, 3, 224, 224).cuda()
# Convert to a TensorRT-backed module
model_trt = torch2trt(model, [x])
# Inference
y = model_trt(x)
# Save the optimized weights
torch.save(model_trt.state_dict(), 'model_trt.pth')
TensorFlow Lite 转换
import tensorflow as tf
# Convert a SavedModel to TFLite
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model_dir')
converter.optimizations = [tf.lite.Optimize.DEFAULT]  # enable default (quantization) optimizations
tflite_model = converter.convert()
# Write the flatbuffer to disk
with open('model.tflite', 'wb') as f:
    f.write(tflite_model)
10.4 模型监控与迭代
模型上线后需要持续监控和迭代,确保性能不随时间衰减。
10.4.1 模型漂移检测
数据漂移和概念漂移会导致模型性能下降,需要及时发现。
漂移类型:
- 数据漂移(Data Drift):输入数据分布发生变化
- 概念漂移(Concept Drift):特征与标签的关系变化
- 标签漂移(Label Drift):输出分布发生变化
import numpy as np
from scipy import stats
def detect_data_drift(reference_data, current_data, threshold=0.05):
    """Detect per-feature data drift with the two-sample KS test.

    Args:
        reference_data: baseline samples (e.g. training distribution),
            shape (n_samples, n_features).
        current_data: fresh samples (e.g. production traffic) with the
            same feature layout as reference_data.
        threshold: p-value below which a feature is flagged as drifted.

    Returns:
        list of dicts with keys 'feature', 'p_value', 'drift_detected',
        one per feature column.
    """
    results = []
    for idx in range(reference_data.shape[1]):
        # Compare the two empirical distributions of this feature
        _, p_val = stats.ks_2samp(
            reference_data[:, idx],
            current_data[:, idx],
        )
        results.append({
            'feature': idx,
            'p_value': p_val,
            'drift_detected': p_val < threshold,
        })
    return results
# 使用示例
# drift_results = detect_data_drift(X_train, X_production)
10.4.2 A/B测试
A/B测试用于比较不同模型版本的效果,确保新模型更优后再全量上线。
import random
from enum import Enum
import hashlib


class ModelVersion(Enum):
    """Model variants participating in the A/B test."""
    CONTROL = "v1"    # control group (current production version)
    TREATMENT = "v2"  # treatment group (candidate version)


# Model router: split traffic between versions by a stable per-user hash
def route_request(user_id: str, traffic_split: float = 0.1):
    """Deterministically assign a user to a model version.

    Args:
        user_id: stable user identifier.
        traffic_split: fraction (0-1) of users routed to the treatment model.

    Returns:
        ModelVersion: the same version for a given user_id on every call,
        in every process.
    """
    # BUG FIX: the built-in hash() is salted per process (PYTHONHASHSEED),
    # so the original code routed the same user differently across
    # restarts and workers. A cryptographic digest is stable everywhere.
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 10000 / 10000
    if bucket < traffic_split:
        return ModelVersion.TREATMENT
    return ModelVersion.CONTROL
# Prediction endpoint with A/B routing
@app.post("/predict")
def predict_with_ab_test(request: PredictionRequest, user_id: str):
    """Route the request to model v1 or v2, predict, and log the outcome."""
    # 20% of users are routed to the treatment model
    version = route_request(user_id, traffic_split=0.2)
    if version == ModelVersion.CONTROL:
        prediction = model_v1.predict(request.features)
    else:
        prediction = model_v2.predict(request.features)
    # Record the assignment and result for offline A/B analysis
    log_prediction(user_id, version.value, request.features, prediction)
    return {
        "prediction": prediction,
        "model_version": version.value
    }
10.4.3 持续训练(CT)流程
持续训练(Continuous Training)是MLOps的核心,实现模型的自动更新。
CT Pipeline 组件:
- 数据触发器:新数据到达或漂移检测触发
- 自动化训练:使用新数据重新训练模型
- 自动验证:测试集评估、基准对比
- 人工审核:关键模型人工确认
- 自动部署:通过验证后自动上线
- 监控反馈:持续监控新模型表现
# Simplified continuous-training (CT) pipeline example
import datetime
from dataclasses import dataclass


@dataclass
class ModelArtifact:
    """Versioned bundle of a trained model plus its evaluation metadata."""
    model: object                     # the fitted estimator
    version: str                      # e.g. "v3.20240101"
    metrics: dict                     # evaluation metrics keyed by name
    training_date: datetime.datetime  # when the training run finished


class ContinuousTrainingPipeline:
    """Minimal CT loop: drift check -> retrain -> gated validate-and-deploy."""

    def __init__(self):
        # Artifact currently serving traffic (None before first deploy)
        self.current_model = None
        # Every artifact that passed validation, oldest first
        self.model_history = []

    def should_retrain(self, drift_score: float, min_samples: int = 1000) -> bool:
        """Return True when drift is severe enough to warrant retraining."""
        # NOTE(review): min_samples is currently unused — confirm intent.
        return drift_score > 0.5  # drift-score threshold

    def train_new_model(self, new_data) -> ModelArtifact:
        """Train and evaluate a fresh model, returning a versioned artifact."""
        X, y = self.preprocess(new_data)
        # Hold out a test split for the deployment gates
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
        fitted = self.train(X_train, y_train)
        scores = self.evaluate(fitted, X_test, y_test)
        # Version tag: sequence number plus training date
        tag = f"v{len(self.model_history) + 1}.{datetime.datetime.now():%Y%m%d}"
        return ModelArtifact(
            model=fitted,
            version=tag,
            metrics=scores,
            training_date=datetime.datetime.now(),
        )

    def validate_and_deploy(self, new_artifact: ModelArtifact) -> bool:
        """Gate the candidate on quality checks; deploy only if it passes."""
        # Gate 1: must retain at least 95% of the incumbent's F1 score
        if self.current_model:
            old_f1 = self.current_model.metrics.get('f1', 0)
            new_f1 = new_artifact.metrics.get('f1', 0)
            if new_f1 <= old_f1 * 0.95:
                print(f"新模型性能不足: {new_f1:.4f} vs {old_f1:.4f}")
                return False
        # Gate 2: absolute accuracy floor
        if new_artifact.metrics.get('accuracy', 0) < 0.8:
            print("准确率低于阈值,拒绝部署")
            return False
        # All gates passed: promote the candidate
        self.deploy(new_artifact)
        self.current_model = new_artifact
        self.model_history.append(new_artifact)
        print(f"成功部署模型 {new_artifact.version}")
        return True

    def run_pipeline(self, new_data, drift_score: float):
        """End-to-end CT cycle: decide, retrain, then validate-and-deploy."""
        if not self.should_retrain(drift_score):
            print("未达到重训练条件")
            return
        print("触发持续训练...")
        self.validate_and_deploy(self.train_new_model(new_data))

    # Hooks below must be implemented for a concrete use case.
    def preprocess(self, data):
        pass

    def train(self, X, y):
        pass

    def evaluate(self, model, X, y):
        pass

    def deploy(self, artifact):
        pass
MLOps 工具链推荐
| 用途 | 开源方案 | 商业方案 |
|---|---|---|
| Pipeline编排 | Kubeflow, Airflow | AWS Step Functions |
| 实验跟踪 | MLflow, DVC | Weights & Biases |
| 特征存储 | Feast | Tecton |
| 模型注册 | MLflow Model Registry | SageMaker Model Registry |
| 监控 | Evidently, WhyLabs | Fiddler, Arize |
| 部署 | Seldon, KServe | SageMaker Endpoints |
本章小结
本章涵盖了AI模型从训练到生产的完整MLOps流程:
- 训练Pipeline:标准化流程 + 实验跟踪确保可复现
- 模型保存:ONNX适合跨平台,TorchScript适合PyTorch生产
- 服务部署:FastAPI提供高性能API,Docker保证环境一致
- 边缘部署:TensorRT、ONNX Runtime提供低延迟推理
- 持续运维:漂移检测 + A/B测试 + 持续训练确保模型长期有效
MLOps的核心思想是将软件工程的最佳实践应用到机器学习项目中,实现自动化、可监控、可迭代的AI应用交付。
学习建议:
- 从小项目开始实践完整的部署流程
- 使用云平台托管服务降低运维复杂度
- 建立完善的监控和告警机制
- 关注模型安全和隐私保护