实战案例2:自动化内容生成平台
构建一个功能完善的AI内容生成平台,支持文章生成、视频脚本、社交媒体内容批量生产。
一、项目概述
1.1 内容创作者痛点分析
在当今信息爆炸的时代,内容创作者面临着前所未有的挑战:
- 创意枯竭:长期持续产出导致灵感耗尽,难以保持内容新鲜感
- 时间成本:从选题到成稿需要数小时甚至数天,效率低下
- 多平台适配:同一内容需适配公众号、小红书、抖音、Twitter等不同平台格式
- 批量需求:电商、营销场景下需要大批量生成商品描述、推广文案
- 质量把控:AI生成内容质量不稳定,需要人工反复调整
1.2 平台核心功能
| 功能模块 | 功能描述 | 适用场景 |
|---|---|---|
| 智能生成 | 根据主题、风格、字数要求自动生成内容 | 日常内容创作 |
| 模板管理 | 自定义提示词模板,保存常用配置 | 固定格式内容 |
| 批量处理 | Excel/CSV导入批量生成,支持并发处理 | 电商、营销活动 |
| 多平台导出 | 一键导出适配不同平台的格式 | 全平台运营 |
| 历史管理 | 保存生成记录,支持版本对比和重新编辑 | 内容迭代优化 |
1.3 支持的内容类型
📄 公众号文章
长文创作,支持Markdown格式,自动分段、配图建议
📕 小红书文案
带emoji的短文案,自动添加热门标签,排版优化
🎬 短视频脚本
分镜脚本,包含镜头提示、时长、BGM建议
🐦 Twitter推文
280字符限制优化,话题标签推荐,线程支持
1.4 技术架构图
┌─────────────────────────────────────────────────────────────┐
│ 前端层 (React 18) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 生成页面 │ │ 模板管理 │ │ 批量处理 │ │ 历史记录 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
│ REST API / WebSocket
┌─────────────────────────────────────────────────────────────┐
│ API网关层 (FastAPI) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 内容路由 │ │ 模板路由 │ │ 批量路由 │ │ 用户路由 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 业务服务层 │ │ 异步任务队列 │ │ 数据存储层 │
│ ┌─────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │
│ │生成服务 │ │ │ │ Celery │ │ │ │PostgreSQL │ │
│ │模板管理 │ │◄──┤ │ Worker │ │ │ │ 主库 │ │
│ │批量处理 │ │ │ └───────────┘ │ │ └───────────┘ │
│ │导出服务 │ │ │ ┌───────────┐ │ │ ┌───────────┐ │
│ └─────────┘ │ │ │ Redis │ │ │ │ Redis │ │
│ │ │ │ Broker │ │ │ │ 缓存 │ │
└───────────────┘ │ └───────────┘ │ │ └───────────┘ │
└─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ LLM服务层 │
│ ┌───────────┐ │
│ │ OpenAI │ │
│ │ Claude │ │
│ │ 文心一言 │ │
│ └───────────┘ │
└─────────────────┘
二、技术选型
| 层级 | 技术选型 | 版本 | 选择理由 |
|---|---|---|---|
| 前端框架 | React | 18.x | 组件化开发,生态丰富,Concurrent特性提升性能 |
| UI组件库 | Ant Design | 5.x | 企业级组件,支持主题定制,文档完善 |
| 后端框架 | FastAPI | 0.100+ | 高性能异步框架,自动生成API文档,类型安全 |
| 异步任务 | Celery | 5.x | 成熟的分布式任务队列,支持定时任务和任务监控 |
| LLM接口 | OpenAI/Claude/文心 | 最新 | 支持多模型切换,根据场景选择最优模型 |
| 数据库 | PostgreSQL | 15.x | 支持JSON字段,全文搜索,稳定可靠 |
| 缓存 | Redis | 7.x | 高性能缓存,同时作为Celery消息队列 |
| 文件存储 | 本地/OSS | - | 灵活配置,支持阿里云、AWS等云存储 |
| 部署 | Docker Compose | 2.x | 一键部署,环境隔离,便于扩展 |
三、开发环境准备
3.1 环境要求
- Python 3.10+
- Node.js 18+
- Docker & Docker Compose
- PostgreSQL 15+
- Redis 7+
3.2 环境搭建步骤
步骤1:安装Python依赖
# 创建虚拟环境
python -m venv venv
# 激活虚拟环境
# Windows:
venv\Scripts\activate
# macOS/Linux:
source venv/bin/activate
# 安装依赖
pip install fastapi uvicorn sqlalchemy psycopg2-binary celery redis python-dotenv openai anthropic pandas openpyxl
步骤2:安装前端依赖
# 使用create-react-app创建项目
npx create-react-app frontend --template typescript
cd frontend
# 安装Ant Design和其他依赖
npm install antd axios react-router-dom @ant-design/icons
# 启动开发服务器
npm start
步骤3:启动数据库和Redis(Docker)
# docker-compose.dev.yml
version: '3.8'
services:
postgres:
image: postgres:15-alpine
environment:
POSTGRES_USER: content_gen
POSTGRES_PASSWORD: your_password
POSTGRES_DB: content_generator
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:
# 启动服务
docker-compose -f docker-compose.dev.yml up -d
四、后端开发
4.1 项目结构
content-generator/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI应用入口
│ ├── config.py # 配置文件
│ ├── database.py # 数据库连接
│ ├── models/ # 数据模型
│ │ ├── __init__.py
│ │ ├── content.py # 内容模型
│ │ ├── template.py # 模板模型
│ │ └── user.py # 用户模型
│ ├── services/ # 业务逻辑层
│ │ ├── __init__.py
│ │ ├── generator.py # 生成服务核心
│ │ ├── template_manager.py # 模板管理
│ │ ├── batch_processor.py # 批量处理
│ │ └── exporter.py # 导出服务
│ ├── tasks/ # Celery异步任务
│ │ └── generation.py
│ └── routers/ # API路由
│ ├── content.py
│ ├── template.py
│ └── batch.py
├── templates/ # 提示词模板
│ ├── article.txt
│ ├── xiaohongshu.txt
│ ├── video_script.txt
│ └── twitter.txt
├── alembic/ # 数据库迁移
├── requirements.txt
└── docker-compose.yml
4.2 配置文件 (config.py)
"""
应用配置文件
包含数据库、Redis、LLM等所有配置项
"""
import os
from dotenv import load_dotenv
from typing import Optional
# 加载环境变量
load_dotenv()
class Settings:
"""应用配置类"""
# ========== 应用基础配置 ==========
APP_NAME: str = "AI内容生成平台"
DEBUG: bool = os.getenv("DEBUG", "false").lower() == "true"
# ========== 数据库配置 ==========
# PostgreSQL连接配置
POSTGRES_USER: str = os.getenv("POSTGRES_USER", "content_gen")
POSTGRES_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "password")
POSTGRES_HOST: str = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
POSTGRES_DB: str = os.getenv("POSTGRES_DB", "content_generator")
# SQLAlchemy数据库URL
DATABASE_URL: str = (
f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}"
f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
)
# ========== Redis配置 ==========
# Redis用于缓存和Celery消息队列
REDIS_HOST: str = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB: int = int(os.getenv("REDIS_DB", "0"))
REDIS_PASSWORD: Optional[str] = os.getenv("REDIS_PASSWORD")
# Celery Redis URL
CELERY_BROKER_URL: str = (
f"redis://{f':{REDIS_PASSWORD}@' if REDIS_PASSWORD else ''}"
f"{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
)
CELERY_RESULT_BACKEND: str = CELERY_BROKER_URL
# ========== LLM配置 ==========
# 默认使用的模型
DEFAULT_LLM_PROVIDER: str = os.getenv("DEFAULT_LLM_PROVIDER", "openai")
# OpenAI配置
OPENAI_API_KEY: Optional[str] = os.getenv("OPENAI_API_KEY")
OPENAI_BASE_URL: Optional[str] = os.getenv("OPENAI_BASE_URL")
OPENAI_MODEL: str = os.getenv("OPENAI_MODEL", "gpt-4")
# Claude配置
ANTHROPIC_API_KEY: Optional[str] = os.getenv("ANTHROPIC_API_KEY")
ANTHROPIC_MODEL: str = os.getenv("ANTHROPIC_MODEL", "claude-3-sonnet-20240229")
# 文心一言配置
WENXIN_API_KEY: Optional[str] = os.getenv("WENXIN_API_KEY")
WENXIN_SECRET_KEY: Optional[str] = os.getenv("WENXIN_SECRET_KEY")
# ========== 生成配置 ==========
# 默认生成参数
DEFAULT_MAX_TOKENS: int = 2000
DEFAULT_TEMPERATURE: float = 0.7
# 批量生成配置
BATCH_CONCURRENCY: int = 5 # 并发任务数
BATCH_CHUNK_SIZE: int = 100 # 每批处理数量
# ========== 文件存储配置 ==========
# 存储类型: local 或 oss
STORAGE_TYPE: str = os.getenv("STORAGE_TYPE", "local")
LOCAL_STORAGE_PATH: str = os.getenv("LOCAL_STORAGE_PATH", "./uploads")
# 阿里云OSS配置
OSS_ACCESS_KEY: Optional[str] = os.getenv("OSS_ACCESS_KEY")
OSS_SECRET_KEY: Optional[str] = os.getenv("OSS_SECRET_KEY")
OSS_BUCKET: Optional[str] = os.getenv("OSS_BUCKET")
OSS_ENDPOINT: Optional[str] = os.getenv("OSS_ENDPOINT")
# ========== 安全配置 ==========
SECRET_KEY: str = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7 # 7天
# 全局配置实例
settings = Settings()
4.3 数据模型 (models/content.py)
"""
内容数据模型
定义内容表结构和相关操作
"""
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Enum as SQLEnum, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
Base = declarative_base()
class ContentType(str, Enum):
"""内容类型枚举"""
ARTICLE = "article" # 公众号文章
XIAOHONGSHU = "xiaohongshu" # 小红书文案
VIDEO_SCRIPT = "video_script" # 视频脚本
TWITTER = "twitter" # Twitter推文
CUSTOM = "custom" # 自定义
class ContentStatus(str, Enum):
"""内容状态枚举"""
PENDING = "pending" # 等待生成
GENERATING = "generating" # 生成中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
class Content(Base):
"""
内容主表
存储生成的内容基本信息和元数据
"""
__tablename__ = "contents"
id = Column(Integer, primary_key=True, index=True)
# 基本信息
title = Column(String(500), nullable=True, comment="内容标题")
content_type = Column(SQLEnum(ContentType), nullable=False, comment="内容类型")
status = Column(SQLEnum(ContentStatus), default=ContentStatus.PENDING, comment="生成状态")
# 生成内容
generated_content = Column(Text, nullable=True, comment="生成的内容")
raw_response = Column(Text, nullable=True, comment="LLM原始响应")
# 生成参数
topic = Column(String(500), nullable=False, comment="生成主题")
params = Column(JSON, default=dict, comment="生成参数(JSON格式)")
# 使用模板ID(可选)
template_id = Column(Integer, ForeignKey("templates.id"), nullable=True, comment="使用的模板ID")
# 元数据
word_count = Column(Integer, nullable=True, comment="字数统计")
generation_time = Column(Integer, nullable=True, comment="生成耗时(毫秒)")
llm_provider = Column(String(50), nullable=True, comment="使用的LLM提供商")
llm_model = Column(String(100), nullable=True, comment="使用的模型")
# 批量任务关联
batch_id = Column(String(100), nullable=True, index=True, comment="批量任务ID")
batch_index = Column(Integer, nullable=True, comment="批量任务中的序号")
# 用户关联(可选)
user_id = Column(Integer, ForeignKey("users.id"), nullable=True, comment="用户ID")
# 时间戳
created_at = Column(DateTime, default=datetime.utcnow, comment="创建时间")
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment="更新时间")
# 关联关系
template = relationship("Template", back_populates="contents")
user = relationship("User", back_populates="contents")
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式,用于API返回"""
return {
"id": self.id,
"title": self.title,
"content_type": self.content_type.value,
"status": self.status.value,
"generated_content": self.generated_content,
"topic": self.topic,
"params": self.params,
"template_id": self.template_id,
"word_count": self.word_count,
"generation_time": self.generation_time,
"llm_provider": self.llm_provider,
"llm_model": self.llm_model,
"batch_id": self.batch_id,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
class Template(Base):
"""
提示词模板表
存储用户自定义的生成模板
"""
__tablename__ = "templates"
id = Column(Integer, primary_key=True, index=True)
# 模板基本信息
name = Column(String(200), nullable=False, comment="模板名称")
description = Column(String(500), nullable=True, comment="模板描述")
content_type = Column(SQLEnum(ContentType), nullable=False, comment="适用内容类型")
# 模板内容
system_prompt = Column(Text, nullable=True, comment="系统提示词")
user_prompt_template = Column(Text, nullable=False, comment="用户提示词模板")
# 默认参数
default_params = Column(JSON, default=dict, comment="默认参数(JSON)")
# 模板变量定义
variables = Column(JSON, default=list, comment="模板变量列表")
# 状态
is_public = Column(Integer, default=0, comment="是否公开(0=私有,1=公开)")
is_system = Column(Integer, default=0, comment="是否系统模板")
# 统计信息
use_count = Column(Integer, default=0, comment="使用次数")
# 用户关联
user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
# 时间戳
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 关联关系
contents = relationship("Content", back_populates="template")
user = relationship("User", back_populates="templates")
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"description": self.description,
"content_type": self.content_type.value,
"system_prompt": self.system_prompt,
"user_prompt_template": self.user_prompt_template,
"default_params": self.default_params,
"variables": self.variables,
"is_public": bool(self.is_public),
"is_system": bool(self.is_system),
"use_count": self.use_count,
"created_at": self.created_at.isoformat() if self.created_at else None,
}
class BatchTask(Base):
"""
批量任务表
记录批量生成任务的执行状态
"""
__tablename__ = "batch_tasks"
id = Column(Integer, primary_key=True, index=True)
# 任务标识
batch_id = Column(String(100), unique=True, index=True, comment="批量任务ID")
task_name = Column(String(200), nullable=True, comment="任务名称")
# 任务状态
status = Column(String(50), default="pending", comment="任务状态")
total_count = Column(Integer, default=0, comment="总任务数")
completed_count = Column(Integer, default=0, comment="已完成数")
failed_count = Column(Integer, default=0, comment="失败数")
# 任务参数
content_type = Column(String(50), nullable=False, comment="内容类型")
template_id = Column(Integer, nullable=True, comment="使用的模板ID")
params = Column(JSON, default=dict, comment="生成参数")
# 源文件信息
source_file = Column(String(500), nullable=True, comment="上传的源文件路径")
result_file = Column(String(500), nullable=True, comment="结果文件路径")
# 错误信息
error_message = Column(Text, nullable=True, comment="错误信息")
# 时间戳
created_at = Column(DateTime, default=datetime.utcnow)
started_at = Column(DateTime, nullable=True, comment="开始时间")
completed_at = Column(DateTime, nullable=True, comment="完成时间")
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"batch_id": self.batch_id,
"task_name": self.task_name,
"status": self.status,
"total_count": self.total_count,
"completed_count": self.completed_count,
"failed_count": self.failed_count,
"progress": round(self.completed_count / self.total_count * 100, 2) if self.total_count > 0 else 0,
"content_type": self.content_type,
"source_file": self.source_file,
"result_file": self.result_file,
"created_at": self.created_at.isoformat() if self.created_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
}
4.4 生成服务核心 (services/generator.py)
"""
内容生成服务核心模块
负责与LLM交互,执行内容生成任务
"""
import os
import re
import time
import json
from typing import Dict, Any, Optional, List, AsyncGenerator
from pathlib import Path
import openai
from anthropic import Anthropic
from app.config import settings
from app.models.content import ContentType, ContentStatus
class LLMProvider:
"""LLM提供商基类"""
async def generate(self, prompt: str, system_prompt: Optional[str] = None, **kwargs) -> str:
"""生成内容,子类必须实现"""
raise NotImplementedError
class OpenAIProvider(LLMProvider):
"""OpenAI GPT模型提供商"""
def __init__(self):
self.client = openai.AsyncOpenAI(
api_key=settings.OPENAI_API_KEY,
base_url=settings.OPENAI_BASE_URL
)
self.model = settings.OPENAI_MODEL
async def generate(
self,
prompt: str,
system_prompt: Optional[str] = None,
max_tokens: int = None,
temperature: float = None,
**kwargs
) -> str:
"""调用OpenAI API生成内容"""
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
max_tokens=max_tokens or settings.DEFAULT_MAX_TOKENS,
temperature=temperature or settings.DEFAULT_TEMPERATURE,
**kwargs
)
return response.choices[0].message.content
class ClaudeProvider(LLMProvider):
"""Anthropic Claude模型提供商"""
def __init__(self):
self.client = Anthropic(api_key=settings.ANTHROPIC_API_KEY)
self.model = settings.ANTHROPIC_MODEL
async def generate(
self,
prompt: str,
system_prompt: Optional[str] = None,
max_tokens: int = None,
temperature: float = None,
**kwargs
) -> str:
"""调用Claude API生成内容"""
response = await self.client.messages.create(
model=self.model,
max_tokens=max_tokens or settings.DEFAULT_MAX_TOKENS,
temperature=temperature or settings.DEFAULT_TEMPERATURE,
system=system_prompt,
messages=[{"role": "user", "content": prompt}],
**kwargs
)
return response.content[0].text
class LLMFactory:
"""LLM提供商工厂类"""
_providers = {
"openai": OpenAIProvider,
"claude": ClaudeProvider,
}
@classmethod
def get_provider(cls, provider_name: str = None) -> LLMProvider:
"""获取LLM提供商实例"""
provider_name = provider_name or settings.DEFAULT_LLM_PROVIDER
provider_class = cls._providers.get(provider_name)
if not provider_class:
raise ValueError(f"不支持的LLM提供商: {provider_name}")
return provider_class()
class PromptTemplate:
"""提示词模板管理器"""
# 模板文件存放目录
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates"
# 默认模板缓存
_default_templates: Dict[ContentType, Dict[str, str]] = {}
@classmethod
def load_template(cls, content_type: ContentType) -> Dict[str, str]:
"""加载默认提示词模板"""
if content_type in cls._default_templates:
return cls._default_templates[content_type]
template_file = cls.TEMPLATES_DIR / f"{content_type.value}.txt"
if not template_file.exists():
# 返回默认模板
return cls._get_default_template(content_type)
content = template_file.read_text(encoding="utf-8")
# 解析模板,分离system和user部分
system_prompt = None
user_template = content
if "【SYSTEM】" in content:
parts = content.split("【SYSTEM】", 1)[1].split("【USER】", 1)
system_prompt = parts[0].strip()
user_template = parts[1].strip() if len(parts) > 1 else ""
template = {
"system_prompt": system_prompt,
"user_template": user_template
}
cls._default_templates[content_type] = template
return template
@classmethod
def _get_default_template(cls, content_type: ContentType) -> Dict[str, str]:
"""获取内置默认模板"""
templates = {
ContentType.ARTICLE: {
"system_prompt": "你是一位专业的内容创作专家,擅长撰写高质量的文章。",
"user_template": """请根据以下主题撰写一篇文章:
主题:{topic}
风格:{style}
字数:约{word_count}字
目标读者:{audience}
要求:
1. 标题吸引人,包含核心关键词
2. 结构清晰,有引言、正文、结论
3. 语言流畅,逻辑严密
4. 使用Markdown格式输出
请直接输出文章内容:"""
},
ContentType.XIAOHONGSHU: {
"system_prompt": "你是一位小红书爆款文案写手,擅长创作吸引人的种草文案。",
"user_template": """请为以下产品/主题创作一篇小红书文案:
主题:{topic}
风格:{style}
关键词:{keywords}
要求:
1. 标题包含emoji,吸引眼球
2. 正文使用emoji增加趣味性
3. 添加5-10个相关标签
4. 语气亲切自然,像朋友推荐
5. 结尾引导互动
请直接输出文案:"""
},
ContentType.VIDEO_SCRIPT: {
"system_prompt": "你是一位资深短视频编导,擅长创作 viral 视频脚本。",
"user_template": """请为以下主题创作一个短视频脚本:
主题:{topic}
时长:{duration}秒
风格:{style}
平台:{platform}
请按以下格式输出:
【视频标题】
【镜头1】
画面:
台词/字幕:
时长:X秒
【镜头2】
..."""
},
ContentType.TWITTER: {
"system_prompt": "你是一位Twitter/X内容专家,擅长在280字符内表达精彩观点。",
"user_template": """请为以下主题创作一条Twitter推文:
主题:{topic}
语气:{tone}
是否需要线程:{need_thread}
要求:
1. 控制在280字符以内
2. 添加相关hashtags
3. 开头吸引人,提高点击率
4. 可以提问或引导互动
请直接输出推文内容:"""
},
}
return templates.get(content_type, templates[ContentType.ARTICLE])
@classmethod
def render_template(
cls,
template: str,
variables: Dict[str, Any]
) -> str:
"""渲染模板,替换变量"""
result = template
for key, value in variables.items():
placeholder = "{" + key + "}"
result = result.replace(placeholder, str(value))
return result
class ContentGenerator:
"""
内容生成器主类
整合模板加载、LLM调用、结果处理
"""
def __init__(self, provider_name: str = None):
"""
初始化生成器
Args:
provider_name: LLM提供商名称,默认使用配置
"""
self.provider = LLMFactory.get_provider(provider_name)
self.provider_name = provider_name or settings.DEFAULT_LLM_PROVIDER
async def generate(
self,
content_type: ContentType,
topic: str,
params: Dict[str, Any] = None,
template_id: int = None,
system_prompt: str = None,
) -> Dict[str, Any]:
"""
生成内容主方法
Args:
content_type: 内容类型
topic: 生成主题
params: 生成参数
template_id: 自定义模板ID(可选)
system_prompt: 系统提示词(覆盖默认)
Returns:
包含生成结果的字典
"""
params = params or {}
start_time = time.time()
# 1. 准备提示词
prompt_data = await self._prepare_prompt(
content_type=content_type,
topic=topic,
params=params,
template_id=template_id,
system_prompt=system_prompt,
)
# 2. 调用LLM生成
try:
generated_text = await self.provider.generate(
prompt=prompt_data["user_prompt"],
system_prompt=prompt_data.get("system_prompt"),
max_tokens=params.get("max_tokens"),
temperature=params.get("temperature"),
)
except Exception as e:
raise GenerationError(f"LLM生成失败: {str(e)}")
# 3. 后处理
processed_result = self._post_process(
content_type=content_type,
raw_text=generated_text,
)
generation_time = int((time.time() - start_time) * 1000)
return {
"title": processed_result.get("title", ""),
"content": processed_result.get("content", generated_text),
"raw_response": generated_text,
"word_count": len(generated_text),
"generation_time": generation_time,
"llm_provider": self.provider_name,
"llm_model": self._get_model_name(),
"metadata": processed_result.get("metadata", {}),
}
async def _prepare_prompt(
self,
content_type: ContentType,
topic: str,
params: Dict[str, Any],
template_id: int = None,
system_prompt: str = None,
) -> Dict[str, str]:
"""准备提示词"""
# 如果有自定义模板,从数据库加载
if template_id:
# 这里应该查询数据库获取模板
# 简化处理,使用默认模板
template = PromptTemplate.load_template(content_type)
else:
template = PromptTemplate.load_template(content_type)
# 构建变量字典
variables = {
"topic": topic,
**params
}
# 渲染用户提示词
user_prompt = PromptTemplate.render_template(
template["user_template"],
variables
)
return {
"system_prompt": system_prompt or template.get("system_prompt"),
"user_prompt": user_prompt,
}
def _post_process(
self,
content_type: ContentType,
raw_text: str,
) -> Dict[str, Any]:
"""
生成结果后处理
提取标题、清理格式、提取元数据等
"""
result = {
"content": raw_text,
"title": "",
"metadata": {}
}
# 提取标题(第一行或Markdown标题)
lines = raw_text.strip().split('\n')
# 尝试提取Markdown标题
title_match = re.search(r'^#\s+(.+)$', raw_text, re.MULTILINE)
if title_match:
result["title"] = title_match.group(1).strip()
elif lines:
# 使用第一行作为标题(如果不是空行)
first_line = lines[0].strip()
if first_line and not first_line.startswith('```'):
result["title"] = first_line[:100] # 限制长度
# 根据内容类型特殊处理
if content_type == ContentType.XIAOHONGSHU:
# 提取标签
tags = re.findall(r'#([^#\s]+)', raw_text)
result["metadata"]["tags"] = tags
elif content_type == ContentType.VIDEO_SCRIPT:
# 提取镜头数量
shots = re.findall(r'【镜头\d+】', raw_text)
result["metadata"]["shot_count"] = len(shots)
elif content_type == ContentType.TWITTER:
# 检查字符数
result["metadata"]["char_count"] = len(raw_text)
result["metadata"]["is_valid"] = len(raw_text) <= 280
return result
def _get_model_name(self) -> str:
"""获取当前使用的模型名称"""
if self.provider_name == "openai":
return settings.OPENAI_MODEL
elif self.provider_name == "claude":
return settings.ANTHROPIC_MODEL
return "unknown"
async def generate_stream(
self,
content_type: ContentType,
topic: str,
params: Dict[str, Any] = None,
) -> AsyncGenerator[str, None]:
"""
流式生成内容
用于实时显示生成进度
"""
# 简化实现,实际应该使用LLM的stream API
result = await self.generate(content_type, topic, params)
# 模拟流式输出
content = result["content"]
chunk_size = 10
for i in range(0, len(content), chunk_size):
yield content[i:i+chunk_size]
class GenerationError(Exception):
"""生成错误异常"""
pass
4.5 模板管理服务 (services/template_manager.py)
"""
提示词模板管理服务
提供模板的增删改查和变量解析功能
"""
import re
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from app.models.content import Template, ContentType
class TemplateManager:
"""模板管理器"""
def __init__(self, db: Session):
"""
初始化模板管理器
Args:
db: 数据库会话
"""
self.db = db
def create_template(
self,
name: str,
content_type: ContentType,
user_prompt_template: str,
description: str = None,
system_prompt: str = None,
default_params: Dict = None,
user_id: int = None,
is_public: bool = False,
) -> Template:
"""
创建新模板
Args:
name: 模板名称
content_type: 内容类型
user_prompt_template: 用户提示词模板
description: 模板描述
system_prompt: 系统提示词
default_params: 默认参数
user_id: 创建用户ID
is_public: 是否公开
Returns:
创建的模板对象
"""
# 解析模板变量
variables = self._extract_variables(user_prompt_template)
template = Template(
name=name,
description=description,
content_type=content_type,
system_prompt=system_prompt,
user_prompt_template=user_prompt_template,
default_params=default_params or {},
variables=variables,
is_public=1 if is_public else 0,
user_id=user_id,
)
self.db.add(template)
self.db.commit()
self.db.refresh(template)
return template
def get_template(self, template_id: int) -> Optional[Template]:
"""
获取单个模板
Args:
template_id: 模板ID
Returns:
模板对象或None
"""
return self.db.query(Template).filter(Template.id == template_id).first()
def list_templates(
self,
content_type: ContentType = None,
user_id: int = None,
include_public: bool = True,
skip: int = 0,
limit: int = 100,
) -> List[Template]:
"""
获取模板列表
Args:
content_type: 按内容类型筛选
user_id: 按用户筛选
include_public: 是否包含公开模板
skip: 分页偏移
limit: 分页大小
Returns:
模板列表
"""
query = self.db.query(Template)
# 筛选条件
filters = []
if content_type:
filters.append(Template.content_type == content_type)
# 用户权限:自己的模板 + 公开模板 + 系统模板
if user_id is not None:
if include_public:
query = query.filter(
(Template.user_id == user_id) |
(Template.is_public == 1) |
(Template.is_system == 1)
)
else:
query = query.filter(Template.user_id == user_id)
# 排序:系统模板优先,然后按使用次数
query = query.order_by(
Template.is_system.desc(),
Template.use_count.desc(),
Template.created_at.desc()
)
return query.offset(skip).limit(limit).all()
def update_template(
self,
template_id: int,
user_id: int,
**kwargs
) -> Optional[Template]:
"""
更新模板
Args:
template_id: 模板ID
user_id: 操作用户ID(用于权限验证)
**kwargs: 更新的字段
Returns:
更新后的模板对象
"""
template = self.get_template(template_id)
if not template:
return None
# 权限检查:只能修改自己的模板(系统模板除外)
if template.is_system == 0 and template.user_id != user_id:
raise PermissionError("无权修改此模板")
# 更新字段
allowed_fields = [
"name", "description", "system_prompt",
"user_prompt_template", "default_params", "is_public"
]
for field in allowed_fields:
if field in kwargs:
setattr(template, field, kwargs[field])
# 如果更新了模板内容,重新解析变量
if "user_prompt_template" in kwargs:
template.variables = self._extract_variables(
kwargs["user_prompt_template"]
)
self.db.commit()
self.db.refresh(template)
return template
def delete_template(self, template_id: int, user_id: int) -> bool:
"""
删除模板
Args:
template_id: 模板ID
user_id: 操作用户ID
Returns:
是否删除成功
"""
template = self.get_template(template_id)
if not template:
return False
# 权限检查
if template.is_system == 1:
raise PermissionError("不能删除系统模板")
if template.user_id != user_id:
raise PermissionError("无权删除此模板")
self.db.delete(template)
self.db.commit()
return True
def increment_use_count(self, template_id: int):
"""增加模板使用次数"""
template = self.get_template(template_id)
if template:
template.use_count += 1
self.db.commit()
def _extract_variables(self, template: str) -> List[str]:
"""
从模板中提取变量名
支持的格式:{variable_name}
Args:
template: 模板字符串
Returns:
变量名列表
"""
pattern = r'\{([a-zA-Z_][a-zA-Z0-9_]*)\}'
variables = re.findall(pattern, template)
return list(set(variables)) # 去重
def validate_template(self, template: str) -> Dict[str, Any]:
"""
验证模板有效性
Args:
template: 模板字符串
Returns:
验证结果
"""
result = {
"valid": True,
"variables": [],
"errors": []
}
# 检查是否为空
if not template or not template.strip():
result["valid"] = False
result["errors"].append("模板不能为空")
return result
# 提取变量
result["variables"] = self._extract_variables(template)
# 检查变量名是否合法
for var in result["variables"]:
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', var):
result["errors"].append(f"非法变量名: {var}")
if result["errors"]:
result["valid"] = False
return result
def get_template_variables(self, template_id: int) -> List[Dict[str, Any]]:
"""
获取模板的变量定义
Args:
template_id: 模板ID
Returns:
变量定义列表
"""
template = self.get_template(template_id)
if not template:
return []
variables = []
for var_name in template.variables:
var_info = {
"name": var_name,
"type": "string", # 默认类型
"required": True,
"default": template.default_params.get(var_name, ""),
}
variables.append(var_info)
return variables
def render_preview(
self,
template_id: int,
sample_data: Dict[str, Any]
) -> Dict[str, str]:
"""
渲染模板预览
Args:
template_id: 模板ID
sample_data: 示例数据
Returns:
渲染结果
"""
template = self.get_template(template_id)
if not template:
return {"error": "模板不存在"}
# 合并默认参数和示例数据
data = {**template.default_params, **sample_data}
# 渲染提示词
user_prompt = template.user_prompt_template
for key, value in data.items():
placeholder = "{" + key + "}"
user_prompt = user_prompt.replace(placeholder, str(value))
return {
"system_prompt": template.system_prompt,
"user_prompt": user_prompt,
"variables_used": list(data.keys()),
}
4.6 批量处理服务 (services/batch_processor.py)
"""
批量内容处理服务
支持Excel/CSV导入,并发生成,结果导出
"""
import os
import uuid
import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional
from pathlib import Path
import pandas as pd
from sqlalchemy.orm import Session
from app.config import settings
from app.models.content import Content, ContentType, ContentStatus, BatchTask
from app.services.generator import ContentGenerator
class BatchProcessor:
"""批量处理器"""
# 支持的文件格式
SUPPORTED_FORMATS = ['.xlsx', '.xls', '.csv']
# 必需的列
REQUIRED_COLUMNS = ['topic']
def __init__(self, db: Session):
"""
初始化批量处理器
Args:
db: 数据库会话
"""
self.db = db
def create_batch_task(
self,
content_type: ContentType,
params: Dict[str, Any],
template_id: int = None,
task_name: str = None,
) -> BatchTask:
"""
创建批量任务记录
Args:
content_type: 内容类型
params: 生成参数
template_id: 模板ID
task_name: 任务名称
Returns:
批量任务对象
"""
batch_id = str(uuid.uuid4())
task = BatchTask(
batch_id=batch_id,
task_name=task_name or f"批量任务-{datetime.now().strftime('%Y%m%d%H%M%S')}",
content_type=content_type.value,
template_id=template_id,
params=params,
status="pending",
)
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def parse_upload_file(self, file_path: str) -> List[Dict[str, Any]]:
"""
解析上传的Excel/CSV文件
Args:
file_path: 文件路径
Returns:
数据列表,每项是一个字典
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
if path.suffix.lower() not in self.SUPPORTED_FORMATS:
raise ValueError(f"不支持的文件格式: {path.suffix}")
# 读取文件
try:
if path.suffix.lower() == '.csv':
df = pd.read_csv(file_path, encoding='utf-8')
else:
df = pd.read_excel(file_path)
except Exception as e:
# 尝试其他编码
df = pd.read_csv(file_path, encoding='gbk')
# 检查必需列
columns = [col.lower() for col in df.columns]
for required in self.REQUIRED_COLUMNS:
if required not in columns:
raise ValueError(f"缺少必需列: {required}")
# 转换为字典列表
records = df.to_dict('records')
# 标准化列名(小写)
normalized_records = []
for record in records:
normalized = {k.lower(): v for k, v in record.items()}
normalized_records.append(normalized)
return normalized_records
def download_template(self, content_type: ContentType) -> str:
"""
下载批量导入模板
Args:
content_type: 内容类型
Returns:
模板文件路径
"""
# 根据内容类型生成不同的模板
if content_type == ContentType.ARTICLE:
columns = ['topic', 'style', 'audience', 'word_count', 'keywords']
sample_data = [
{
'topic': '人工智能在医疗领域的应用',
'style': '科普',
'audience': '普通大众',
'word_count': 1500,
'keywords': 'AI,医疗,健康科技'
},
{
'topic': '如何培养高效学习习惯',
'style': '干货',
'audience': '学生群体',
'word_count': 1200,
'keywords': '学习方法,效率,自律'
}
]
elif content_type == ContentType.XIAOHONGSHU:
columns = ['topic', 'style', 'keywords', 'emoji_level']
sample_data = [
{
'topic': '这款面霜真的太好用了',
'style': '种草',
'keywords': '护肤,面霜,敏感肌',
'emoji_level': 'high'
},
{
'topic': '周末探店|藏在巷子里的宝藏咖啡馆',
'style': '探店',
'keywords': '咖啡,周末,探店',
'emoji_level': 'medium'
}
]
elif content_type == ContentType.VIDEO_SCRIPT:
columns = ['topic', 'duration', 'style', 'platform']
sample_data = [
{
'topic': '3分钟学会制作完美煎蛋',
'duration': 60,
'style': '教程',
'platform': '抖音'
},
{
'topic': '我的晨间日常vlog',
'duration': 120,
'style': 'vlog',
'platform': 'B站'
}
]
else:
columns = ['topic']
sample_data = [{'topic': '示例主题'}]
# 创建DataFrame
df = pd.DataFrame(sample_data)
# 确保目录存在
template_dir = Path(settings.LOCAL_STORAGE_PATH) / 'templates'
template_dir.mkdir(parents=True, exist_ok=True)
# 保存文件
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_path = template_dir / f'batch_template_{content_type.value}_{timestamp}.xlsx'
df.to_excel(output_path, index=False)
return str(output_path)
async def process_batch(
self,
batch_id: str,
records: List[Dict[str, Any]],
content_type: ContentType,
params: Dict[str, Any],
template_id: int = None,
provider: str = None,
) -> Dict[str, Any]:
"""
处理批量生成任务
Args:
batch_id: 批量任务ID
records: 数据记录列表
content_type: 内容类型
params: 基础生成参数
template_id: 模板ID
provider: LLM提供商
Returns:
处理结果统计
"""
# 获取任务记录
task = self.db.query(BatchTask).filter(
BatchTask.batch_id == batch_id
).first()
if not task:
raise ValueError(f"任务不存在: {batch_id}")
# 更新任务状态
task.status = "processing"
task.total_count = len(records)
task.started_at = datetime.utcnow()
self.db.commit()
# 创建生成器
generator = ContentGenerator(provider)
# 创建内容记录
content_records = []
for idx, record in enumerate(records):
content = Content(
content_type=content_type,
topic=record.get('topic', ''),
params={**params, **record}, # 合并参数
template_id=template_id,
batch_id=batch_id,
batch_index=idx,
status=ContentStatus.PENDING,
)
self.db.add(content)
content_records.append(content)
self.db.commit()
# 并发处理
semaphore = asyncio.Semaphore(settings.BATCH_CONCURRENCY)
async def process_one(content: Content, record: Dict):
"""处理单个任务"""
async with semaphore:
try:
# 合并参数:全局参数 + 行数据
merged_params = {**params}
merged_params.update(record)
# 生成内容
result = await generator.generate(
content_type=content_type,
topic=record.get('topic', ''),
params=merged_params,
template_id=template_id,
)
# 更新记录
content.generated_content = result['content']
content.title = result['title']
content.status = ContentStatus.COMPLETED
content.word_count = result['word_count']
content.generation_time = result['generation_time']
content.llm_provider = result['llm_provider']
content.llm_model = result['llm_model']
# 更新任务进度
task.completed_count += 1
except Exception as e:
content.status = ContentStatus.FAILED
content.generated_content = f"生成失败: {str(e)}"
task.failed_count += 1
self.db.commit()
# 执行所有任务
tasks = [
process_one(content, record)
for content, record in zip(content_records, records)
]
await asyncio.gather(*tasks)
# 完成任务
task.status = "completed"
task.completed_at = datetime.utcnow()
self.db.commit()
return {
"batch_id": batch_id,
"total": len(records),
"completed": task.completed_count,
"failed": task.failed_count,
}
def export_results(
self,
batch_id: str,
format: str = 'xlsx'
) -> str:
"""
导出批量生成结果
Args:
batch_id: 批量任务ID
format: 导出格式 (xlsx/csv)
Returns:
导出文件路径
"""
# 查询所有相关内容
contents = self.db.query(Content).filter(
Content.batch_id == batch_id
).order_by(Content.batch_index).all()
if not contents:
raise ValueError("没有可导出的内容")
# 构建数据
data = []
for content in contents:
data.append({
'序号': content.batch_index + 1,
'主题': content.topic,
'标题': content.title,
'内容': content.generated_content,
'字数': content.word_count,
'状态': content.status.value,
'生成时间': content.generation_time,
'创建时间': content.created_at.strftime('%Y-%m-%d %H:%M:%S'),
})
df = pd.DataFrame(data)
# 确保目录存在
export_dir = Path(settings.LOCAL_STORAGE_PATH) / 'exports'
export_dir.mkdir(parents=True, exist_ok=True)
# 保存文件
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
if format == 'csv':
output_path = export_dir / f'batch_result_{batch_id}_{timestamp}.csv'
df.to_csv(output_path, index=False, encoding='utf-8-sig')
else:
output_path = export_dir / f'batch_result_{batch_id}_{timestamp}.xlsx'
df.to_excel(output_path, index=False)
# 更新任务记录
task = self.db.query(BatchTask).filter(
BatchTask.batch_id == batch_id
).first()
if task:
task.result_file = str(output_path)
self.db.commit()
return str(output_path)
def get_task_status(self, batch_id: str) -> Optional[Dict[str, Any]]:
"""
获取任务状态
Args:
batch_id: 批量任务ID
Returns:
任务状态信息
"""
task = self.db.query(BatchTask).filter(
BatchTask.batch_id == batch_id
).first()
if not task:
return None
return task.to_dict()
4.7 Celery异步任务 (tasks/generation.py)
"""
Celery异步任务定义
处理耗时的内容生成任务
"""
import asyncio
from celery import Celery
from celery.signals import task_prerun, task_postrun
from app.config import settings
from app.database import SessionLocal
from app.services.generator import ContentGenerator
from app.services.batch_processor import BatchProcessor
from app.models.content import Content, ContentType, ContentStatus
# 创建Celery应用实例
celery_app = Celery(
"content_generator",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND,
)
# Celery配置
celery_app.conf.update(
# 任务序列化
task_serializer="json",
accept_content=["json"],
result_serializer="json",
# 时区设置
timezone="Asia/Shanghai",
enable_utc=True,
# 任务执行设置
task_track_started=True,
task_time_limit=600, # 任务最大执行时间10分钟
task_soft_time_limit=300, # 软限制5分钟
# 结果存储设置
result_expires=3600 * 24 * 7, # 结果保留7天
# Worker设置
worker_prefetch_multiplier=1, # 每个worker一次只取一个任务
worker_max_tasks_per_child=1000, # 每个worker子进程最大任务数
# 任务路由
task_routes={
"tasks.generation.generate_content": {"queue": "generation"},
"tasks.generation.process_batch": {"queue": "batch"},
},
# 任务默认速率限制
task_default_rate_limit="100/m",
)
@celery_app.task(bind=True, max_retries=3)
def generate_content_task(
self,
content_id: int,
content_type: str,
topic: str,
params: dict = None,
template_id: int = None,
provider: str = None,
):
"""
单条内容生成任务
Args:
content_id: 内容记录ID
content_type: 内容类型字符串
topic: 生成主题
params: 生成参数
template_id: 模板ID
provider: LLM提供商
"""
db = SessionLocal()
try:
# 获取内容记录
content = db.query(Content).filter(Content.id == content_id).first()
if not content:
return {"error": "内容记录不存在"}
# 更新状态为生成中
content.status = ContentStatus.GENERATING
db.commit()
# 创建生成器并生成内容
generator = ContentGenerator(provider)
# 异步生成
result = asyncio.run(generator.generate(
content_type=ContentType(content_type),
topic=topic,
params=params or {},
template_id=template_id,
))
# 更新记录
content.generated_content = result['content']
content.title = result['title']
content.raw_response = result['raw_response']
content.status = ContentStatus.COMPLETED
content.word_count = result['word_count']
content.generation_time = result['generation_time']
content.llm_provider = result['llm_provider']
content.llm_model = result['llm_model']
db.commit()
return {
"content_id": content_id,
"status": "completed",
"title": result['title'],
"word_count": result['word_count'],
}
except Exception as exc:
# 更新失败状态
if content:
content.status = ContentStatus.FAILED
content.generated_content = f"生成失败: {str(exc)}"
db.commit()
# 重试机制
if self.request.retries < 3:
raise self.retry(exc=exc, countdown=60)
return {
"content_id": content_id,
"status": "failed",
"error": str(exc),
}
finally:
db.close()
@celery_app.task(bind=True)
def process_batch_task(
self,
batch_id: str,
records: list,
content_type: str,
params: dict = None,
template_id: int = None,
provider: str = None,
):
"""
批量内容生成任务
Args:
batch_id: 批量任务ID
records: 数据记录列表
content_type: 内容类型
params: 基础参数
template_id: 模板ID
provider: LLM提供商
"""
db = SessionLocal()
try:
processor = BatchProcessor(db)
# 运行异步处理
result = asyncio.run(processor.process_batch(
batch_id=batch_id,
records=records,
content_type=ContentType(content_type),
params=params or {},
template_id=template_id,
provider=provider,
))
# 导出结果
export_path = processor.export_results(batch_id)
return {
"batch_id": batch_id,
**result,
"export_path": export_path,
}
except Exception as exc:
# 更新任务状态
from app.models.content import BatchTask
task = db.query(BatchTask).filter(BatchTask.batch_id == batch_id).first()
if task:
task.status = "failed"
task.error_message = str(exc)
db.commit()
raise
finally:
db.close()
@celery_app.task
def cleanup_old_results():
"""定时任务:清理过期的生成结果"""
from datetime import datetime, timedelta
db = SessionLocal()
try:
# 删除30天前的失败记录
cutoff = datetime.utcnow() - timedelta(days=30)
deleted = db.query(Content).filter(
Content.status == ContentStatus.FAILED,
Content.created_at < cutoff
).delete()
db.commit()
return {"deleted_count": deleted}
finally:
db.close()
# 定时任务配置
celery_app.conf.beat_schedule = {
"cleanup-old-results": {
"task": "tasks.generation.cleanup_old_results",
"schedule": 3600 * 24, # 每天执行一次
},
}
# 任务信号处理
@task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **extras):
"""任务开始前的处理"""
print(f"[Task Started] {task.name} - ID: {task_id}")
@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **extras):
"""任务完成后的处理"""
print(f"[Task Finished] {task.name} - ID: {task_id} - State: {state}")
4.8 API路由 (routers/content.py)
"""
内容生成API路由
提供RESTful接口供前端调用
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, UploadFile, File
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.database import get_db
from app.models.content import Content, ContentType, ContentStatus
from app.services.generator import ContentGenerator
from app.services.batch_processor import BatchProcessor
from app.tasks.generation import generate_content_task, process_batch_task
router = APIRouter(prefix="/api/content", tags=["内容生成"])
# ========== 请求/响应模型 ==========
class GenerateRequest(BaseModel):
"""单条生成请求"""
content_type: str = Field(..., description="内容类型")
topic: str = Field(..., min_length=1, max_length=500, description="生成主题")
params: dict = Field(default={}, description="生成参数")
template_id: Optional[int] = Field(None, description="模板ID")
provider: Optional[str] = Field(None, description="LLM提供商")
async_mode: bool = Field(default=False, description="是否异步执行")
class GenerateResponse(BaseModel):
"""生成响应"""
id: int
title: str
content: str
content_type: str
status: str
word_count: int
generation_time: int
class BatchCreateRequest(BaseModel):
"""批量任务创建请求"""
content_type: str
params: dict = {}
template_id: Optional[int] = None
task_name: Optional[str] = None
class BatchStatusResponse(BaseModel):
"""批量任务状态响应"""
batch_id: str
task_name: str
status: str
total_count: int
completed_count: int
failed_count: int
progress: float
# ========== API端点 ==========
@router.post("/generate", response_model=GenerateResponse)
async def generate_content(
request: GenerateRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
):
"""
生成单条内容
- 支持同步和异步两种模式
- 同步模式直接返回结果
- 异步模式返回任务ID,通过状态接口查询
"""
# 验证内容类型
try:
content_type = ContentType(request.content_type)
except ValueError:
raise HTTPException(status_code=400, detail="无效的内容类型")
# 创建内容记录
content = Content(
content_type=content_type,
topic=request.topic,
params=request.params,
template_id=request.template_id,
status=ContentStatus.PENDING,
)
db.add(content)
db.commit()
db.refresh(content)
if request.async_mode:
# 异步模式:提交Celery任务
task = generate_content_task.delay(
content_id=content.id,
content_type=request.content_type,
topic=request.topic,
params=request.params,
template_id=request.template_id,
provider=request.provider,
)
return {
"id": content.id,
"title": "",
"content": "",
"content_type": request.content_type,
"status": "pending",
"word_count": 0,
"generation_time": 0,
"task_id": task.id, # Celery任务ID
}
# 同步模式:直接生成
generator = ContentGenerator(request.provider)
try:
result = await generator.generate(
content_type=content_type,
topic=request.topic,
params=request.params,
template_id=request.template_id,
)
# 更新记录
content.generated_content = result['content']
content.title = result['title']
content.status = ContentStatus.COMPLETED
content.word_count = result['word_count']
content.generation_time = result['generation_time']
content.llm_provider = result['llm_provider']
content.llm_model = result['llm_model']
db.commit()
db.refresh(content)
return {
"id": content.id,
"title": content.title,
"content": content.generated_content,
"content_type": content.content_type.value,
"status": content.status.value,
"word_count": content.word_count,
"generation_time": content.generation_time,
}
except Exception as e:
content.status = ContentStatus.FAILED
db.commit()
raise HTTPException(status_code=500, detail=f"生成失败: {str(e)}")
@router.get("/generate/{content_id}/status")
async def get_generation_status(
content_id: int,
db: Session = Depends(get_db),
):
"""获取生成任务状态(异步模式使用)"""
content = db.query(Content).filter(Content.id == content_id).first()
if not content:
raise HTTPException(status_code=404, detail="内容不存在")
return {
"id": content.id,
"status": content.status.value,
"title": content.title,
"word_count": content.word_count,
"generation_time": content.generation_time,
}
@router.get("/history", response_model=List[dict])
async def get_generation_history(
content_type: Optional[str] = None,
status: Optional[str] = None,
skip: int = 0,
limit: int = 20,
db: Session = Depends(get_db),
):
"""获取生成历史记录"""
query = db.query(Content).order_by(Content.created_at.desc())
if content_type:
query = query.filter(Content.content_type == content_type)
if status:
query = query.filter(Content.status == status)
contents = query.offset(skip).limit(limit).all()
return [content.to_dict() for content in contents]
@router.post("/batch/create")
async def create_batch_task(
request: BatchCreateRequest,
db: Session = Depends(get_db),
):
"""创建批量生成任务"""
try:
content_type = ContentType(request.content_type)
except ValueError:
raise HTTPException(status_code=400, detail="无效的内容类型")
processor = BatchProcessor(db)
task = processor.create_batch_task(
content_type=content_type,
params=request.params,
template_id=request.template_id,
task_name=request.task_name,
)
return {
"batch_id": task.batch_id,
"task_name": task.task_name,
"status": task.status,
}
@router.post("/batch/{batch_id}/upload")
async def upload_batch_file(
batch_id: str,
file: UploadFile = File(...),
db: Session = Depends(get_db),
):
"""上传批量处理文件"""
processor = BatchProcessor(db)
# 保存上传文件
import shutil
from pathlib import Path
from app.config import settings
upload_dir = Path(settings.LOCAL_STORAGE_PATH) / 'uploads'
upload_dir.mkdir(parents=True, exist_ok=True)
file_path = upload_dir / f"{batch_id}_{file.filename}"
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 解析文件
try:
records = processor.parse_upload_file(str(file_path))
except Exception as e:
raise HTTPException(status_code=400, detail=f"文件解析失败: {str(e)}")
return {
"batch_id": batch_id,
"filename": file.filename,
"record_count": len(records),
"preview": records[:3], # 预览前3条
}
@router.post("/batch/{batch_id}/start")
async def start_batch_task(
batch_id: str,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
):
"""启动批量生成任务"""
from app.models.content import BatchTask
task = db.query(BatchTask).filter(BatchTask.batch_id == batch_id).first()
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
# 从上传的文件中读取记录
# 这里简化处理,实际应该从文件重新读取
records = [] # 需要从之前上传的文件中解析
# 提交Celery任务
process_batch_task.delay(
batch_id=batch_id,
records=records,
content_type=task.content_type,
params=task.params,
template_id=task.template_id,
)
return {"message": "批量任务已启动", "batch_id": batch_id}
@router.get("/batch/{batch_id}/status", response_model=BatchStatusResponse)
async def get_batch_status(
batch_id: str,
db: Session = Depends(get_db),
):
"""获取批量任务状态"""
processor = BatchProcessor(db)
status = processor.get_task_status(batch_id)
if not status:
raise HTTPException(status_code=404, detail="任务不存在")
return status
@router.get("/batch/{batch_id}/export")
async def export_batch_results(
batch_id: str,
format: str = "xlsx",
db: Session = Depends(get_db),
):
"""导出批量生成结果"""
processor = BatchProcessor(db)
try:
file_path = processor.export_results(batch_id, format)
from fastapi.responses import FileResponse
return FileResponse(
file_path,
filename=f"batch_result_{batch_id}.{format}",
media_type="application/octet-stream",
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/template/download/{content_type}")
async def download_batch_template(content_type: str):
"""下载批量导入模板"""
try:
ct = ContentType(content_type)
except ValueError:
raise HTTPException(status_code=400, detail="无效的内容类型")
from fastapi.responses import FileResponse
from app.services.batch_processor import BatchProcessor
# 创建一个临时数据库会话来使用processor
from app.database import SessionLocal
db = SessionLocal()
try:
processor = BatchProcessor(db)
file_path = processor.download_template(ct)
return FileResponse(
file_path,
filename=f"template_{content_type}.xlsx",
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
finally:
db.close()
4.9 主应用入口 (main.py)
"""
FastAPI应用主入口
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.database import engine, Base
from app.routers import content, template, batch
# 创建数据库表
Base.metadata.create_all(bind=engine)
# 创建FastAPI应用
app = FastAPI(
title=settings.APP_NAME,
description="AI内容生成平台API",
version="1.0.0",
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应限制具体域名
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(content.router)
app.include_router(template.router, prefix="/api")
app.include_router(batch.router, prefix="/api")
@app.get("/")
async def root():
"""根路径"""
return {
"name": settings.APP_NAME,
"version": "1.0.0",
"docs": "/docs",
}
@app.get("/health")
async def health_check():
"""健康检查接口"""
return {"status": "healthy"}
五、前端开发
5.1 项目结构
frontend/
├── public/
│ └── index.html
├── src/
│ ├── components/
│ │ ├── Generator/ # 生成器组件
│ │ │ ├── TopicInput.jsx # 主题输入
│ │ │ ├── StyleSelector.jsx # 风格选择
│ │ │ ├── GenerateButton.jsx # 生成按钮
│ │ │ └── ResultPreview.jsx # 结果预览
│ │ ├── TemplateEditor/ # 模板编辑器
│ │ │ ├── VariableList.jsx
│ │ │ └── PromptEditor.jsx
│ │ ├── BatchUpload/ # 批量上传
│ │ │ ├── FileUpload.jsx
│ │ │ └── ProgressMonitor.jsx
│ │ └── HistoryList/ # 历史记录
│ │ └── ContentCard.jsx
│ ├── pages/
│ │ ├── Generate.jsx # 主生成页面
│ │ ├── Templates.jsx # 模板管理
│ │ ├── Batch.jsx # 批量生成
│ │ └── History.jsx # 历史记录
│ ├── api/
│ │ └── content.js # API封装
│ ├── hooks/
│ │ ├── useGenerator.js # 生成逻辑Hook
│ │ └── useWebSocket.js # WebSocket Hook
│ ├── utils/
│ │ └── constants.js # 常量定义
│ ├── App.jsx
│ └── index.js
└── package.json
5.2 API封装 (api/content.js)
/**
* 内容生成API封装
* 统一处理所有与后端的交互
*/
import axios from 'axios';
// API基础URL
const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:8000/api';
// 创建axios实例
const apiClient = axios.create({
baseURL: API_BASE_URL,
timeout: 120000, // 2分钟超时(生成任务可能耗时较长)
headers: {
'Content-Type': 'application/json',
},
});
// 请求拦截器
apiClient.interceptors.request.use(
(config) => {
// 可以在这里添加认证token
const token = localStorage.getItem('token');
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
},
(error) => Promise.reject(error)
);
// 响应拦截器
apiClient.interceptors.response.use(
(response) => response.data,
(error) => {
console.error('API Error:', error);
return Promise.reject(error.response?.data || error.message);
}
);
/**
* 内容生成API
*/
export const contentApi = {
/**
* 生成单条内容
* @param {Object} params - 生成参数
* @returns {Promise} 生成结果
*/
generate: (params) => apiClient.post('/content/generate', params),
/**
* 获取生成状态
* @param {number} contentId - 内容ID
* @returns {Promise} 状态信息
*/
getStatus: (contentId) => apiClient.get(`/content/generate/${contentId}/status`),
/**
* 获取生成历史
* @param {Object} filters - 筛选条件
* @returns {Promise} 历史记录列表
*/
getHistory: (filters = {}) => apiClient.get('/content/history', { params: filters }),
/**
* 获取内容详情
* @param {number} id - 内容ID
* @returns {Promise} 内容详情
*/
getDetail: (id) => apiClient.get(`/content/${id}`),
/**
* 删除内容
* @param {number} id - 内容ID
* @returns {Promise}
*/
delete: (id) => apiClient.delete(`/content/${id}`),
};
/**
* 模板管理API
*/
export const templateApi = {
/**
* 获取模板列表
* @param {Object} params - 查询参数
* @returns {Promise} 模板列表
*/
getList: (params = {}) => apiClient.get('/templates', { params }),
/**
* 获取模板详情
* @param {number} id - 模板ID
* @returns {Promise} 模板详情
*/
getDetail: (id) => apiClient.get(`/templates/${id}`),
/**
* 创建模板
* @param {Object} data - 模板数据
* @returns {Promise} 创建的模板
*/
create: (data) => apiClient.post('/templates', data),
/**
* 更新模板
* @param {number} id - 模板ID
* @param {Object} data - 更新数据
* @returns {Promise}
*/
update: (id, data) => apiClient.put(`/templates/${id}`, data),
/**
* 删除模板
* @param {number} id - 模板ID
* @returns {Promise}
*/
delete: (id) => apiClient.delete(`/templates/${id}`),
/**
* 预览模板
* @param {number} id - 模板ID
* @param {Object} sampleData - 示例数据
* @returns {Promise} 渲染结果
*/
preview: (id, sampleData) => apiClient.post(`/templates/${id}/preview`, sampleData),
};
/**
* 批量处理API
*/
export const batchApi = {
/**
* 创建批量任务
* @param {Object} params - 任务参数
* @returns {Promise} 任务信息
*/
createTask: (params) => apiClient.post('/content/batch/create', params),
/**
* 上传文件
* @param {string} batchId - 批量任务ID
* @param {File} file - 上传的文件
* @returns {Promise} 解析结果
*/
uploadFile: (batchId, file) => {
const formData = new FormData();
formData.append('file', file);
return apiClient.post(`/content/batch/${batchId}/upload`, formData, {
headers: { 'Content-Type': 'multipart/form-data' },
});
},
/**
* 启动批量任务
* @param {string} batchId - 批量任务ID
* @returns {Promise}
*/
startTask: (batchId) => apiClient.post(`/content/batch/${batchId}/start`),
/**
* 获取任务状态
* @param {string} batchId - 批量任务ID
* @returns {Promise} 任务状态
*/
getStatus: (batchId) => apiClient.get(`/content/batch/${batchId}/status`),
/**
* 导出结果
* @param {string} batchId - 批量任务ID
* @param {string} format - 导出格式
* @returns {Promise} 文件下载
*/
exportResults: (batchId, format = 'xlsx') =>
apiClient.get(`/content/batch/${batchId}/export`, {
params: { format },
responseType: 'blob',
}),
/**
* 下载导入模板
* @param {string} contentType - 内容类型
* @returns {Promise} 文件下载
*/
downloadTemplate: (contentType) =>
apiClient.get(`/content/template/download/${contentType}`, {
responseType: 'blob',
}),
};
export default {
content: contentApi,
template: templateApi,
batch: batchApi,
};
5.3 主生成页面 (pages/Generate.jsx)
import React, { useState, useCallback } from 'react';
import {
Card, Form, Input, Select, Button, message,
Row, Col, Tabs, Slider, Switch, Space, Tag
} from 'antd';
import {
SendOutlined, CopyOutlined, SaveOutlined,
DownloadOutlined, ClearOutlined
} from '@ant-design/icons';
import ReactMarkdown from 'react-markdown';
import { contentApi } from '../api/content';
const { TextArea } = Input;
const { Option } = Select;
const { TabPane } = Tabs;
// 内容类型选项
const CONTENT_TYPES = [
{ value: 'article', label: '📄 公众号文章', color: 'blue' },
{ value: 'xiaohongshu', label: '📕 小红书文案', color: 'red' },
{ value: 'video_script', label: '🎬 短视频脚本', color: 'purple' },
{ value: 'twitter', label: '🐦 Twitter推文', color: 'cyan' },
];
// 风格选项
const STYLE_OPTIONS = {
article: ['专业', '科普', '故事', '干货', '轻松', '严肃'],
xiaohongshu: ['种草', '探店', '教程', '测评', '穿搭', '美食'],
video_script: ['教程', 'vlog', '剧情', '口播', '搞笑', '情感'],
twitter: ['观点', '分享', '提问', '吐槽', '干货', '热点'],
};
/**
* 主生成页面组件
* 提供内容生成的表单和预览功能
*/
const Generate = () => {
const [form] = Form.useForm();
const [loading, setLoading] = useState(false);
const [result, setResult] = useState(null);
const [contentType, setContentType] = useState('article');
const [activeTab, setActiveTab] = useState('edit');
// 处理内容类型变更
const handleTypeChange = (value) => {
setContentType(value);
// 重置风格选择
form.setFieldsValue({ style: undefined });
};
// 提交生成请求
const handleGenerate = useCallback(async (values) => {
setLoading(true);
try {
const response = await contentApi.generate({
content_type: values.contentType,
topic: values.topic,
params: {
style: values.style,
audience: values.audience,
word_count: values.wordCount,
temperature: values.temperature,
...values.extraParams,
},
template_id: values.templateId,
provider: values.provider,
});
setResult(response);
setActiveTab('preview');
message.success('生成成功!');
} catch (error) {
message.error('生成失败: ' + (error.message || '未知错误'));
} finally {
setLoading(false);
}
}, []);
// 复制内容到剪贴板
const handleCopy = useCallback(() => {
if (result?.content) {
navigator.clipboard.writeText(result.content);
message.success('已复制到剪贴板');
}
}, [result]);
// 清空表单
const handleClear = useCallback(() => {
form.resetFields();
setResult(null);
setActiveTab('edit');
}, [form]);
// 导出为文件
const handleExport = useCallback((format) => {
if (!result?.content) return;
const blob = new Blob([result.content], { type: 'text/plain' });
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `content_${Date.now()}.${format}`;
a.click();
URL.revokeObjectURL(url);
}, [result]);
return (
{/* 左侧:配置表单 */}
{/* 主题输入 */}
{/* 风格选择 */}
{/* 目标读者 */}
{/* 字数控制 */}
{/* 创意度 */}
{/* 模型选择 */}
{/* 操作按钮 */}
}
loading={loading}
size="large"
>
开始生成
}
onClick={handleClear}
>
清空
{/* 右侧:结果预览 */}
}
onClick={handleCopy}
>
复制
}
onClick={() => handleExport('md')}
>
导出
)}
>
在左侧配置参数并点击"开始生成"...
{result ? (
{result.word_count} 字
{result.generation_time}ms
{result.llm_provider}
{result.content}
) : (
暂无生成结果
)}
{result ? (
{result.content}
) : (
暂无生成结果
)}
);
};
export default Generate;
5.4 批量生成页面 (pages/Batch.jsx)
import React, { useState, useCallback, useEffect } from 'react';
import {
Card, Steps, Upload, Button, Table, Progress,
message, Tag, Space, Descriptions, Alert, Modal
} from 'antd';
import {
UploadOutlined, DownloadOutlined, PlayCircleOutlined,
FileExcelOutlined, CheckCircleOutlined, ClockCircleOutlined
} from '@ant-design/icons';
import { batchApi } from '../api/content';
const { Step } = Steps;
const { Dragger } = Upload;
/**
* 批量生成页面
* 支持Excel/CSV上传、批量生成、结果导出
*/
const Batch = () => {
const [currentStep, setCurrentStep] = useState(0);
const [batchId, setBatchId] = useState(null);
const [fileList, setFileList] = useState([]);
const [uploadData, setUploadData] = useState(null);
const [taskStatus, setTaskStatus] = useState(null);
const [loading, setLoading] = useState(false);
const [polling, setPolling] = useState(false);
// 创建批量任务
const createTask = useCallback(async () => {
try {
const response = await batchApi.createTask({
content_type: 'article',
task_name: `批量任务_${new Date().toLocaleString()}`,
});
setBatchId(response.batch_id);
setCurrentStep(1);
message.success('任务创建成功,请上传文件');
} catch (error) {
message.error('创建任务失败');
}
}, []);
// 文件上传配置
const uploadProps = {
name: 'file',
multiple: false,
action: null, // 自定义上传
accept: '.xlsx,.xls,.csv',
fileList,
beforeUpload: async (file) => {
if (!batchId) {
message.error('请先创建任务');
return false;
}
setLoading(true);
try {
const response = await batchApi.uploadFile(batchId, file);
setUploadData(response);
setFileList([file]);
message.success(`成功解析 ${response.record_count} 条记录`);
return false; // 阻止默认上传
} catch (error) {
message.error('文件解析失败');
return false;
} finally {
setLoading(false);
}
},
onRemove: () => {
setFileList([]);
setUploadData(null);
},
};
// 开始批量生成
const startGeneration = useCallback(async () => {
if (!batchId) return;
setLoading(true);
try {
await batchApi.startTask(batchId);
setCurrentStep(3);
setPolling(true);
message.success('批量生成已启动');
} catch (error) {
message.error('启动失败');
} finally {
setLoading(false);
}
}, [batchId]);
// 轮询任务状态
useEffect(() => {
if (!polling || !batchId) return;
const interval = setInterval(async () => {
try {
const status = await batchApi.getStatus(batchId);
setTaskStatus(status);
if (status.status === 'completed' || status.status === 'failed') {
setPolling(false);
setCurrentStep(4);
if (status.status === 'completed') {
message.success('批量生成完成!');
}
}
} catch (error) {
console.error('获取状态失败', error);
}
}, 2000);
return () => clearInterval(interval);
}, [polling, batchId]);
// 导出结果
const exportResults = useCallback(async (format = 'xlsx') => {
if (!batchId) return;
try {
const blob = await batchApi.exportResults(batchId, format);
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `batch_result_${batchId}.${format}`;
a.click();
URL.revokeObjectURL(url);
message.success('导出成功');
} catch (error) {
message.error('导出失败');
}
}, [batchId]);
// 下载模板
const downloadTemplate = useCallback(async () => {
try {
const blob = await batchApi.downloadTemplate('article');
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'batch_template.xlsx';
a.click();
URL.revokeObjectURL(url);
} catch (error) {
message.error('下载模板失败');
}
}, []);
// 预览表格列
const previewColumns = uploadData?.preview?.[0]
? Object.keys(uploadData.preview[0]).map(key => ({
title: key,
dataIndex: key,
key,
ellipsis: true,
}))
: [];
const steps = [
{
title: '创建任务',
content: (
}
onClick={createTask}
size="large"
>
创建批量任务
),
},
{
title: '上传文件',
content: (
} onClick={downloadTemplate}>
下载导入模板
点击或拖拽文件到此区域上传
支持 .xlsx, .xls, .csv 格式,文件大小不超过 10MB
{uploadData && (
)}
),
},
{
title: '确认预览',
content: (
{uploadData?.preview && (
<>
}
onClick={startGeneration}
loading={loading}
>
开始生成
>
)}
),
},
{
title: '生成中',
content: (
{taskStatus && (
{taskStatus.batch_id}
{taskStatus.task_name}
{taskStatus.status}
} color="success">
成功: {taskStatus.completed_count}
} color="processing">
总计: {taskStatus.total_count}
)}
),
},
{
title: '完成',
content: (
0 ? "warning" : "success"}
title={taskStatus?.failed_count > 0 ? "部分生成完成" : "全部生成完成"}
subTitle={`成功: ${taskStatus?.completed_count} / ${taskStatus?.total_count}`}
extra={[
}
onClick={() => exportResults('xlsx')}
>
导出Excel
,
,
,
]}
/>
),
},
];
return (
{steps.map(item => (
))}
{steps[currentStep].content}
);
};
export default Batch;
六、提示词工程详解
6.1 文章生成模板
【SYSTEM】
你是一位专业的{style}领域作家,拥有丰富的内容创作经验。你擅长撰写深入浅出、逻辑清晰的文章,能够准确把握读者需求。
【USER】
请根据以下要求撰写一篇文章:
主题:{topic}
目标读者:{audience}
写作风格:{style}
预期字数:约{word_count}字
文章结构要求:
1. 标题(20字以内,吸引眼球,包含核心关键词)
2. 引言(100-150字,提出问题或引出话题)
3. 正文(3-5个小节,每节有小标题)
- 提供具体案例或数据支撑
- 使用通俗易懂的语言
- 适当使用对比、类比等修辞手法
4. 结论(总结观点,给出行动建议或思考)
格式要求:
- 使用Markdown格式
- 重要观点使用加粗
- 适当使用列表和引用
- 不要生成目录
请直接输出完整的文章内容:
6.2 小红书文案模板
【SYSTEM】
你是一位小红书爆款文案写手,擅长创作真实、亲切、有感染力的种草内容。你熟悉小红书的调性,懂得如何用emoji和排版增加内容吸引力。
【USER】
请为以下产品/主题创作一篇小红书文案:
主题:{topic}
风格调性:{style}
核心关键词:{keywords}
emoji使用程度:{emoji_level}
创作要求:
【标题】
- 包含1-2个emoji
- 15字以内
- 制造好奇心或提供价值承诺
【正文】
- 开头:个人化场景引入(2-3句话)
- 中间:产品/体验亮点介绍(3-5点)
- 结尾:真实感受+互动引导
- 根据emoji_level添加适量emoji
【标签】
生成5-8个相关标签,格式:#标签名
【排版要求】
- 适当使用换行
- 重要内容可加emoji强调
- 避免大段文字
请直接输出文案内容:
6.3 视频脚本模板
【SYSTEM】
你是一位资深短视频编导,精通各类短视频平台的算法推荐机制。你擅长设计抓人眼球的开场、节奏紧凑的内容和有效的互动结尾。
【USER】
请为以下主题创作一个短视频脚本:
视频主题:{topic}
目标时长:{duration}秒
内容风格:{style}
发布平台:{platform}
脚本格式:
【视频标题】(15字以内,含关键词)
【开场画面】(前3秒,必须抓人眼球)
画面描述:
台词/字幕:
时长:X秒
【正文镜头】
每个镜头包含:
- 镜头编号
- 画面描述(景别、动作、场景)
- 台词/字幕(口语化,不超过20字/句)
- 时长(秒)
- 转场方式(如需要)
【结尾画面】
- 总结/引导关注/CTA
- 预留1-2秒给观众反应
【BGM建议】
推荐音乐风格或具体曲目
【字幕样式建议】
颜色、位置、特效等
请按以上格式输出完整脚本:
七、批量处理功能详解
7.1 功能流程
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 1.下载模板 │───▶│ 2.填写数据 │───▶│ 3.上传文件 │
└─────────────┘ └─────────────┘ └─────────────┘
│
┌─────────────┐ ┌─────────────┐ ┌──────▼──────┐
│ 6.导出结果 │◀───│ 5.监控进度 │◀───│ 4.启动任务 │
└─────────────┘ └─────────────┘ └─────────────┘
7.2 Excel模板格式
| topic(必需) | style | audience | word_count | keywords |
|---|---|---|---|---|
| 人工智能医疗应用 | 科普 | 普通大众 | 1500 | AI,医疗,健康 |
| 高效学习方法 | 干货 | 学生 | 1200 | 学习,效率 |
八、高级功能
8.1 内容风格迁移
# 风格迁移服务示例
class StyleTransferService:
"""风格迁移服务 - 将内容转换为指定风格"""
STYLE_PROMPTS = {
"professional": "使用专业、严谨的商务语言",
"casual": "使用轻松、亲切的口语化表达",
"humorous": "加入幽默元素,让内容更有趣",
"poetic": "使用优美的文学语言,富有意境",
"technical": "深入技术细节,面向专业读者",
}
async def transfer(self, content: str, target_style: str) -> str:
prompt = f"""
请将以下内容转换为{target_style}风格:
原内容:
{content}
要求:{self.STYLE_PROMPTS.get(target_style, target_style)}
保持原意不变,只调整表达方式。
"""
# 调用LLM进行风格转换
...
8.2 SEO关键词优化
class SEOOptimizer:
"""SEO优化服务"""
def optimize_title(self, title: str, keywords: List[str]) -> str:
"""优化标题,确保包含核心关键词"""
# 检查关键词覆盖
covered = [k for k in keywords if k in title]
# 如果覆盖不足,重写标题
...
def generate_meta_description(self, content: str) -> str:
"""生成Meta Description"""
# 提取摘要,控制在150字符内
...
def suggest_keywords(self, content: str) -> List[str]:
"""基于内容推荐关键词"""
# 使用NLP提取关键词
...
8.3 敏感词检测
class SensitiveWordDetector:
"""敏感词检测服务"""
def __init__(self):
# 加载敏感词库
self.sensitive_words = self._load_word_list()
def detect(self, text: str) -> Dict:
"""检测敏感词"""
found = []
for word in self.sensitive_words:
if word in text:
found.append(word)
return {
"has_sensitive": len(found) > 0,
"words_found": found,
"risk_level": "high" if len(found) > 3 else "medium" if len(found) > 0 else "low"
}
def replace(self, text: str, replacement: str = "***") -> str:
"""替换敏感词"""
for word in self.sensitive_words:
text = text.replace(word, replacement)
return text
九、部署上线
9.1 Docker配置
# docker-compose.yml
version: '3.8'
services:
# 后端API服务
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://content_gen:password@postgres:5432/content_generator
- REDIS_HOST=redis
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- postgres
- redis
volumes:
- ./uploads:/app/uploads
command: uvicorn app.main:app --host 0.0.0.0 --port 8000
# Celery Worker
worker:
build: .
environment:
- DATABASE_URL=postgresql://content_gen:password@postgres:5432/content_generator
- REDIS_HOST=redis
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- postgres
- redis
command: celery -A app.tasks.generation worker --loglevel=info
# Celery Beat(定时任务)
beat:
build: .
environment:
- REDIS_HOST=redis
depends_on:
- redis
command: celery -A app.tasks.generation beat --loglevel=info
# 前端服务
frontend:
build: ./frontend
ports:
- "3000:80"
depends_on:
- api
# 数据库
postgres:
image: postgres:15-alpine
environment:
POSTGRES_USER: content_gen
POSTGRES_PASSWORD: password
POSTGRES_DB: content_generator
volumes:
- postgres_data:/var/lib/postgresql/data
# Redis
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:
9.2 环境变量清单
# .env 文件模板
# 应用配置
DEBUG=false
SECRET_KEY=your-secret-key-here
# 数据库
POSTGRES_USER=content_gen
POSTGRES_PASSWORD=your_secure_password
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=content_generator
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
# LLM配置
OPENAI_API_KEY=sk-xxxxxxxx
OPENAI_MODEL=gpt-4
ANTHROPIC_API_KEY=sk-ant-xxxxxxxx
# 可选:阿里云OSS配置
OSS_ACCESS_KEY=your-access-key
OSS_SECRET_KEY=your-secret-key
OSS_BUCKET=your-bucket
OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com
9.3 Nginx配置
server {
listen 80;
server_name your-domain.com;
# 前端静态文件
location / {
root /var/www/frontend;
try_files $uri $uri/ /index.html;
}
# API代理
location /api/ {
proxy_pass http://localhost:8000/api/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# 上传文件访问
location /uploads/ {
alias /var/www/uploads/;
}
}
# HTTPS配置(使用Certbot自动生成)
server {
listen 443 ssl http2;
server_name your-domain.com;
ssl_certificate /etc/letsencrypt/live/your-domain.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/your-domain.com/privkey.pem;
# ... 其他配置同上
}
十、完整项目代码
requirements.txt
# Web框架
fastapi==0.104.1
uvicorn[standard]==0.24.0
# 数据库
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
alembic==1.12.1
# 异步任务
celery==5.3.4
redis==5.0.1
# LLM
openai==1.3.5
anthropic==0.7.7
# 数据处理
pandas==2.1.3
openpyxl==3.1.2
# 工具
python-dotenv==1.0.0
pydantic==2.5.2
python-multipart==0.0.6
# 开发依赖
pytest==7.4.3
pytest-asyncio==0.21.1
black==23.11.0
前端 package.json
{
"name": "content-generator-frontend",
"version": "1.0.0",
"private": true,
"dependencies": {
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-router-dom": "^6.20.0",
"react-markdown": "^9.0.1",
"antd": "^5.11.5",
"@ant-design/icons": "^5.2.6",
"axios": "^1.6.2"
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test"
},
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
}
}
总结
本案例完整实现了一个自动化内容生成平台,主要特点:
- 模块化架构:前后端分离,服务层清晰,易于扩展
- 多LLM支持:支持OpenAI、Claude等多种模型,可灵活切换
- 模板系统:可配置的提示词模板,支持变量替换和预览
- 批量处理:Excel导入导出,并发生成,WebSocket进度推送
- 异步架构:Celery处理耗时任务,提升系统响应能力
- 生产就绪:Docker部署、环境隔离、日志监控
下一步可以扩展的功能:
- 用户权限管理和多租户支持
- A/B测试功能,对比不同提示词效果
- 内容评分和质量反馈循环
- 接入更多内容平台API,实现一键发布