🎯 本章目标
本章将带你完成3个完整项目:
- 项目1:文件系统助手 - 让Claude能安全地读写本地文件
- 项目2:数据库查询助手 - 自然语言查询SQL数据库
- 项目3:API聚合网关 - 统一访问多个外部API
每个项目都包含完整代码、配置说明和最佳实践。
📁 项目1:文件系统助手
构建一个安全的文件系统助手,让Claude能够读取、写入、搜索本地文件,同时通过Roots机制限制访问范围。
项目结构
filesystem-assistant/
├── server.py # MCP Server主文件
├── file_utils.py # 文件操作工具
├── security.py # 安全验证模块
├── requirements.txt # 依赖
└── claude_config.json # Claude Desktop配置
1. 核心代码:server.py
#!/usr/bin/env python3
"""
MCP 文件系统助手 Server
功能:安全地读写本地文件、搜索内容、分析代码
"""
import asyncio
import json
import os
import re
from pathlib import Path
from typing import AsyncIterator, Optional, List
from dataclasses import dataclass
from contextlib import asynccontextmanager
from mcp.server import Server
from mcp.types import (
Resource,
Tool,
TextContent,
ErrorData,
INTERNAL_ERROR,
INVALID_PARAMS,
)
from mcp.server.stdio import stdio_server
# Configuration
# Name handed to the MCP Server constructor.
APP_NAME = "filesystem-assistant"
# File suffixes the assistant is willing to read, write, search or analyze.
ALLOWED_EXTENSIONS = {'.txt', '.md', '.py', '.js', '.json', '.yaml', '.yml', '.csv', '.html', '.css'}
# Upper bound, in bytes, for files that are read or searched.
MAX_FILE_SIZE = 1024 * 1024  # 1MB
class SecurityManager:
    """Path validation and secret redaction for the file-system tools.

    Responsibilities:
      * confine paths to a list of allowed root directories,
      * restrict file types to ALLOWED_EXTENSIONS,
      * mask credential-looking substrings in text handed back to the client.

    NOTE: when no roots are configured every path is considered allowed
    (see is_path_allowed); callers that need confinement must pass roots.
    """

    _FILE_SCHEME = "file://"

    # (compiled pattern, replacement) pairs, applied in order by
    # sanitize_content. Compiled once instead of on every call.
    _SECRET_PATTERNS = [
        (re.compile(r'sk-[a-zA-Z0-9]{48}'), 'API_KEY_HIDDEN'),
        (re.compile(r'[a-zA-Z0-9_-]{20,}\.[a-zA-Z0-9_-]{20,}\.[a-zA-Z0-9_-]{20,}'), 'JWT_TOKEN_HIDDEN'),
        (re.compile(r'password\s*=\s*["\'][^"\']+["\']'), 'password=HIDDEN'),
    ]

    def __init__(self, roots: Optional[List[str]] = None):
        """Store the allowed roots as resolved absolute paths.

        Args:
            roots: directory paths or ``file://`` URIs; ``None`` or empty
                means "no restriction".
        """
        self.roots: List[Path] = [
            Path(self._strip_scheme(str(root))).resolve() for root in (roots or [])
        ]

    @classmethod
    def _strip_scheme(cls, uri: str) -> str:
        """Drop a leading ``file://`` only (not occurrences later in the string)."""
        if uri.startswith(cls._FILE_SCHEME):
            return uri[len(cls._FILE_SCHEME):]
        return uri

    def is_path_allowed(self, target_path: str) -> bool:
        """Return True if the resolved path lies inside one of the roots.

        With no roots configured this returns True for every path.
        """
        if not self.roots:
            return True
        target = Path(target_path).resolve()
        for root in self.roots:
            try:
                target.relative_to(root)
            except ValueError:
                continue
            return True
        return False

    def validate_file_path(self, file_path: str, must_exist: bool = False) -> Path:
        """Validate a file path and return it resolved.

        Args:
            file_path: path supplied by the client.
            must_exist: when True the path must be an existing regular file.

        Returns:
            The resolved path, i.e. the same location that was checked.

        Raises:
            ValueError: a path component is ``..`` or starts with ``~``, or
                the suffix is not in ALLOWED_EXTENSIONS.
            PermissionError: the path is outside the allowed roots.
            FileNotFoundError: ``must_exist`` is set and the file is missing.
        """
        # Inspect whole components, so that a legal name such as "a..b.txt"
        # is not mistaken for a parent-directory reference.
        parts = Path(file_path).parts
        if '..' in parts or any(part.startswith('~') for part in parts):
            raise ValueError("路径包含非法字符")
        if not self.is_path_allowed(file_path):
            raise PermissionError(f"访问被拒绝: {file_path} 不在允许的Roots范围内")
        # Resolve before the suffix check: a symlink named "x.txt" must not
        # give access to a target of a different type.
        path = Path(file_path).resolve()
        if path.suffix.lower() not in ALLOWED_EXTENSIONS:
            raise ValueError(f"不支持的文件类型: {path.suffix}")
        if must_exist and not path.is_file():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        return path

    def sanitize_content(self, content: str) -> str:
        """Return ``content`` with API keys, JWT-like tokens and passwords masked."""
        for pattern, replacement in self._SECRET_PATTERNS:
            content = pattern.sub(replacement, content)
        return content
class FileManager:
    """File operations behind the MCP tools; every path goes through a SecurityManager."""

    def __init__(self, security: SecurityManager):
        self.security = security

    @staticmethod
    def _check_size(path: Path) -> None:
        """Raise ValueError if the file is larger than MAX_FILE_SIZE bytes."""
        file_size = path.stat().st_size
        if file_size > MAX_FILE_SIZE:
            raise ValueError(f"文件过大 ({file_size} bytes),最大支持 {MAX_FILE_SIZE} bytes")

    def _require_directory(self, directory: str) -> Path:
        """Return ``directory`` as a Path after access and existence checks.

        Raises:
            PermissionError: the directory is outside the allowed roots.
            FileNotFoundError: the path is not an existing directory.
        """
        if not self.security.is_path_allowed(directory):
            raise PermissionError("访问被拒绝")
        dir_path = Path(directory)
        if not dir_path.is_dir():
            raise FileNotFoundError(f"目录不存在: {directory}")
        return dir_path

    async def read_file(self, file_path: str, offset: int = 0, limit: Optional[int] = None) -> str:
        """Read a text file and return a redacted slice of it.

        Args:
            file_path: file to read; validated by the SecurityManager.
            offset: number of characters to skip from the start.
            limit: maximum number of characters to return; ``None`` or a
                non-positive value means "to the end".

        Raises:
            ValueError: the file exceeds MAX_FILE_SIZE (plus anything
                validate_file_path raises).
        """
        path = self.security.validate_file_path(file_path, must_exist=True)
        self._check_size(path)
        with open(path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        # Redact before slicing so a window boundary can never cut a secret
        # in half and leak the remainder. Slicing the decoded text also
        # avoids seek() with an arbitrary offset on a text-mode file.
        content = self.security.sanitize_content(content)
        start = max(offset or 0, 0)
        end = start + limit if limit and limit > 0 else None
        return content[start:end]

    async def write_file(self, file_path: str, content: str, append: bool = False) -> dict:
        """Write or append ``content`` to a file, creating parent directories.

        Returns:
            ``{"success": True, "path": <str>, "size": <characters written>}``.
        """
        path = self.security.validate_file_path(file_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        mode = 'a' if append else 'w'
        with open(path, mode, encoding='utf-8') as f:
            f.write(content)
        return {"success": True, "path": str(path), "size": len(content)}

    async def search_files(self, directory: str, pattern: str, file_pattern: str = "*") -> List[dict]:
        """Regex-search (case-insensitive) the allowed files below ``directory``.

        Args:
            directory: directory to walk recursively.
            pattern: regular expression; an invalid one raises ``re.error``.
            file_pattern: glob passed to ``Path.rglob``.

        Returns:
            One dict per matching file with keys ``file``, ``matches``
            (total count) and ``contexts`` (up to 5 excerpts).
        """
        dir_path = self._require_directory(directory)
        regex = re.compile(pattern, re.IGNORECASE)
        results = []
        for file_path in dir_path.rglob(file_pattern):
            # Same case-insensitive suffix rule as validate_file_path.
            if file_path.suffix.lower() not in ALLOWED_EXTENSIONS:
                continue
            try:
                if not file_path.is_file() or file_path.stat().st_size > MAX_FILE_SIZE:
                    continue
                # A symlink inside the tree may point outside the roots.
                if not self.security.is_path_allowed(str(file_path)):
                    continue
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except OSError:
                # Unreadable entry (permissions, broken link, race): skip it,
                # the search is best-effort per file.
                continue
            # Search the redacted text so excerpts cannot expose secrets
            # that read_file would hide.
            content = self.security.sanitize_content(content)
            matches = list(regex.finditer(content))
            if not matches:
                continue
            contexts = []
            for match in matches[:5]:
                start = max(0, match.start() - 50)
                end = min(len(content), match.end() + 50)
                context = content[start:end].replace('\n', ' ')
                contexts.append(f"...{context}...")
            results.append({
                "file": str(file_path),
                "matches": len(matches),
                "contexts": contexts
            })
        return results

    async def analyze_code(self, file_path: str) -> dict:
        """Return line statistics and, for ``.py`` files, imports/functions/classes.

        Comment detection is based on a leading ``#`` for every file type.
        """
        path = self.security.validate_file_path(file_path, must_exist=True)
        self._check_size(path)
        with open(path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        lines = content.split('\n')
        stripped_lines = [line.strip() for line in lines]
        comment_count = sum(1 for s in stripped_lines if s.startswith('#'))
        blank_count = sum(1 for s in stripped_lines if not s)
        analysis = {
            "file": str(path),
            "language": path.suffix,
            "total_lines": len(lines),
            "code_lines": len(lines) - comment_count - blank_count,
            "comment_lines": comment_count,
            "blank_lines": blank_count,
            "imports": [],
            "functions": [],
            "classes": []
        }
        if path.suffix == '.py':
            for stripped in stripped_lines:
                if stripped.startswith(('import ', 'from ')):
                    analysis["imports"].append(stripped)
                elif stripped.startswith(('def ', 'async def ')):
                    signature = stripped.split('def ', 1)[1]
                    analysis["functions"].append(signature.split('(')[0].strip())
                elif stripped.startswith('class '):
                    class_name = stripped[6:].split('(')[0].strip(': ')
                    analysis["classes"].append(class_name)
        return analysis

    async def list_directory(self, directory: str, recursive: bool = False) -> List[dict]:
        """List directory entries with type, size (files only) and mtime."""
        dir_path = self._require_directory(directory)
        items = []
        iterator = dir_path.rglob('*') if recursive else dir_path.iterdir()
        for item in iterator:
            try:
                stat = item.stat()
            except OSError:
                # e.g. a dangling symlink: skip it rather than fail the listing.
                continue
            items.append({
                "path": str(item),
                "type": "directory" if item.is_dir() else "file",
                "size": stat.st_size if item.is_file() else None,
                "modified": stat.st_mtime
            })
        return items
class FileSystemServer:
    """MCP server that exposes the FileManager operations as resources and tools."""

    # Default for read_file's "limit": used in the advertised schema and
    # applied when the client omits the argument, so the two cannot drift.
    DEFAULT_READ_LIMIT = 10000
    # list_directory prints at most this many entries.
    MAX_LISTED_ITEMS = 50

    def __init__(self):
        self.server = Server(APP_NAME)
        # Both are created in run(), once the stdio transport is open.
        self.security: Optional[SecurityManager] = None
        self.file_manager: Optional[FileManager] = None
        self._setup_handlers()

    @staticmethod
    def _render_search(results: List[dict]) -> str:
        """Format search_files results as plain text."""
        if not results:
            return "未找到匹配内容"
        chunks = [f"找到 {len(results)} 个匹配文件:\n\n"]
        for r in results:
            chunks.append(f"文件: {r['file']}\n")
            chunks.append(f"匹配数: {r['matches']}\n")
            chunks.extend(f" {ctx}\n" for ctx in r['contexts'])
            chunks.append("\n")
        return "".join(chunks)

    @staticmethod
    def _render_analysis(analysis: dict) -> str:
        """Format an analyze_code result as plain text."""
        chunks = [
            f"代码分析: {analysis['file']}\n\n",
            f"语言: {analysis['language']}\n",
            f"总行数: {analysis['total_lines']}\n",
            f"代码行: {analysis['code_lines']}\n",
            f"注释行: {analysis['comment_lines']}\n\n",
        ]
        if analysis['functions']:
            chunks.append(f"函数: {', '.join(analysis['functions'])}\n")
        if analysis['classes']:
            chunks.append(f"类: {', '.join(analysis['classes'])}\n")
        return "".join(chunks)

    @classmethod
    def _render_listing(cls, directory: str, items: List[dict]) -> str:
        """Format a list_directory result, noting when entries were cut off."""
        chunks = [f"目录: {directory}\n", f"共 {len(items)} 个项目\n\n"]
        for item in items[:cls.MAX_LISTED_ITEMS]:
            icon = "[DIR]" if item['type'] == 'directory' else "[FILE]"
            chunks.append(f"{icon} {item['path']}\n")
        hidden = len(items) - cls.MAX_LISTED_ITEMS
        if hidden > 0:
            chunks.append(f"... 其余 {hidden} 项未显示\n")
        return "".join(chunks)

    def _setup_handlers(self):
        """Register resource and tool handlers on the underlying Server."""

        @self.server.list_resources()
        async def list_resources() -> List[Resource]:
            if not self.security or not self.security.roots:
                return []
            return [
                Resource(uri=f"file://{root}", name=root.name, mimeType="inode/directory")
                for root in self.security.roots
                if root.exists()
            ]

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            # The SDK may pass a URL object rather than a plain str; str()
            # makes the prefix handling work for both.
            raw = str(uri)
            file_path = raw[len("file://"):] if raw.startswith("file://") else raw
            return await self.file_manager.read_file(file_path)

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            return [
                Tool(
                    name="read_file",
                    description="读取文件内容,支持偏移量和读取长度限制",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string", "description": "文件路径"},
                            "offset": {"type": "integer", "default": 0},
                            "limit": {"type": "integer", "default": self.DEFAULT_READ_LIMIT}
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="write_file",
                    description="写入或追加内容到文件",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string", "description": "文件路径"},
                            "content": {"type": "string", "description": "文件内容"},
                            "append": {"type": "boolean", "default": False}
                        },
                        "required": ["path", "content"]
                    }
                ),
                Tool(
                    name="search_files",
                    description="在目录中搜索文件内容(正则表达式)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "directory": {"type": "string", "description": "搜索目录"},
                            "pattern": {"type": "string", "description": "搜索模式(正则)"},
                            "file_pattern": {"type": "string", "default": "*"}
                        },
                        "required": ["directory", "pattern"]
                    }
                ),
                Tool(
                    name="analyze_code",
                    description="分析代码文件,提取结构和统计信息",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string", "description": "代码文件路径"}
                        },
                        "required": ["path"]
                    }
                ),
                Tool(
                    name="list_directory",
                    description="列出目录内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "directory": {"type": "string", "description": "目录路径"},
                            "recursive": {"type": "boolean", "default": False}
                        },
                        "required": ["directory"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict):
            # Failures are reported to the client as a text result instead
            # of propagating out of the handler.
            try:
                if name == "read_file":
                    text = await self.file_manager.read_file(
                        arguments["path"],
                        arguments.get("offset", 0),
                        # Honour the default advertised in the schema.
                        arguments.get("limit", self.DEFAULT_READ_LIMIT)
                    )
                elif name == "write_file":
                    result = await self.file_manager.write_file(
                        arguments["path"],
                        arguments["content"],
                        arguments.get("append", False)
                    )
                    text = f"文件写入成功\n路径: {result['path']}\n大小: {result['size']} 字符"
                elif name == "search_files":
                    results = await self.file_manager.search_files(
                        arguments["directory"],
                        arguments["pattern"],
                        arguments.get("file_pattern", "*")
                    )
                    text = self._render_search(results)
                elif name == "analyze_code":
                    analysis = await self.file_manager.analyze_code(arguments["path"])
                    text = self._render_analysis(analysis)
                elif name == "list_directory":
                    items = await self.file_manager.list_directory(
                        arguments["directory"],
                        arguments.get("recursive", False)
                    )
                    text = self._render_listing(arguments["directory"], items)
                else:
                    raise ValueError(f"未知工具: {name}")
                return [TextContent(type="text", text=text)]
            except Exception as e:
                return [TextContent(type="text", text=f"错误: {str(e)}")]

    async def run(self):
        """Serve over stdio until the transport closes."""
        async with stdio_server() as (read_stream, write_stream):
            # NOTE: no roots are passed here, so SecurityManager allows every
            # path; pass a roots list to confine the tools to directories.
            self.security = SecurityManager()
            self.file_manager = FileManager(self.security)
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
# Script entry point: build the server and serve over stdio until it exits.
if __name__ == "__main__":
    asyncio.run(FileSystemServer().run())
2. requirements.txt
mcp>=1.0.0
3. Claude Desktop配置
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["/path/to/filesystem-assistant/server.py"]
}
}
}
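提示:Roots由MCP客户端(如Claude Desktop)提供。注意:上面的示例在run()中以SecurityManager()(未传入roots)初始化,此时is_path_allowed对任何路径都返回True;要真正把操作限制在指定目录内,需要把客户端提供的roots传给SecurityManager(roots)。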
🗄️ 项目2:数据库查询助手
构建一个自然语言查询SQL数据库的助手,支持只读查询、结果格式化和查询解释。
项目结构
database-assistant/
├── server.py # MCP Server
├── query_engine.py # 查询引擎
├── sql_validator.py # SQL验证器
└── requirements.txt
核心代码:server.py
#!/usr/bin/env python3
"""MCP 数据库查询助手 Server"""
import asyncio
import json
import re
import sqlite3
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from mcp.server.stdio import stdio_server
import aiosqlite
# Name handed to the MCP Server constructor.
APP_NAME = "database-assistant"
class SQLValidator:
    """Text-level filter that only lets read-only looking SQL through.

    The check is deliberately conservative: a blocked keyword is rejected
    even inside a string literal, and UNION / comments / ``;`` are refused.
    """

    DANGEROUS_KEYWORDS = [
        'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER',
        'TRUNCATE', 'GRANT', 'REVOKE', 'EXEC', 'EXECUTE',
        'UNION', '--', '/*', '*/', ';', 'xp_', 'sp_'
    ]
    ALLOWED_PREFIXES = ['SELECT', 'WITH', 'EXPLAIN', 'SHOW', 'DESCRIBE']

    @staticmethod
    def _contains(keyword: str, upper_sql: str) -> bool:
        """Tell whether ``keyword`` occurs in the upper-cased statement.

        Alphabetic keywords must match as whole words, so identifiers such
        as ``created_at`` are not flagged. Prefix tokens (``xp_``, ``sp_``)
        must start a word. Everything else is a plain substring test.
        """
        # Compare in upper case: the statement was upper-cased, so a
        # lower-case token such as 'xp_' would otherwise never match.
        token = keyword.upper()
        if token.isalpha():
            return re.search(rf"\b{token}\b", upper_sql) is not None
        if token[0].isalpha():
            return re.search(rf"\b{re.escape(token)}", upper_sql) is not None
        return token in upper_sql

    @classmethod
    def validate(cls, sql: str) -> tuple[bool, str]:
        """Validate a statement.

        Returns:
            ``(True, "验证通过")`` when accepted, otherwise ``(False, reason)``.
        """
        normalized = ' '.join(sql.split())
        upper_sql = normalized.upper()
        for keyword in cls.DANGEROUS_KEYWORDS:
            if cls._contains(keyword, upper_sql):
                return False, f"检测到危险关键字: {keyword}"
        # "WITH ... REPLACE INTO ..." writes data but contains none of the
        # keywords above; REPLACE alone is not blocked because replace() is
        # also a scalar function.
        if re.search(r"\bREPLACE\s+INTO\b", upper_sql):
            return False, "检测到危险关键字: REPLACE INTO"
        # Take the leading run of letters so "SELECT*FROM t" is recognised.
        leading = re.match(r"[A-Z]+", upper_sql)
        if leading:
            first_word = leading.group(0)
        else:
            first_word = upper_sql.split()[0] if upper_sql else ''
        if first_word not in cls.ALLOWED_PREFIXES:
            return False, f"不支持的查询类型: {first_word}"
        return True, "验证通过"
class SchemaManager:
    """Reads and caches the table/column layout of a SQLite database."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._schema_cache: Optional[Dict] = None

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + name.replace('"', '""') + '"'

    async def get_schema(self, force_refresh: bool = False) -> Dict:
        """Return ``{"tables": {name: {"columns": [...]}}}``.

        Each column dict has ``name``, ``type``, ``nullable``, ``default``
        and ``primary_key``. The result is cached until ``force_refresh``
        is true.
        """
        if self._schema_cache is not None and not force_refresh:
            return self._schema_cache
        schema = {"tables": {}}
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            )
            tables = await cursor.fetchall()
            for (table_name,) in tables:
                # Quote the name: tables may contain spaces or reserved
                # words, which would break an unquoted PRAGMA argument.
                quoted = self._quote_identifier(table_name)
                cursor = await db.execute(f"PRAGMA table_info({quoted})")
                columns = await cursor.fetchall()
                # Row layout used below: index 1 name, 2 type, 3 notnull
                # flag, 4 default value, 5 primary-key position.
                schema["tables"][table_name] = {
                    "columns": [
                        {
                            "name": col[1],
                            "type": col[2],
                            "nullable": not col[3],
                            "default": col[4],
                            "primary_key": bool(col[5])
                        }
                        for col in columns
                    ]
                }
        self._schema_cache = schema
        return schema
class QueryEngine:
    """Validates, executes and formats read-only queries against one database."""

    # format_results prints at most this many rows.
    MAX_DISPLAY_ROWS = 100

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.schema_manager = SchemaManager(db_path)

    async def execute_query(self, sql: str) -> Dict:
        """Run a validated query and return its rows.

        Returns:
            Dict with ``columns`` (names), ``rows`` (list of dicts),
            ``row_count`` and ``execution_time`` (seconds).

        Raises:
            ValueError: SQLValidator rejected the statement.
        """
        is_valid, message = SQLValidator.validate(sql)
        if not is_valid:
            raise ValueError(f"SQL验证失败: {message}")
        async with aiosqlite.connect(self.db_path) as db:
            # Defence in depth: have SQLite itself refuse writes on this
            # connection, independent of the text filter above.
            await db.execute("PRAGMA query_only = ON")
            db.row_factory = aiosqlite.Row
            start_time = datetime.now()
            cursor = await db.execute(sql)
            rows = await cursor.fetchall()
            columns = [description[0] for description in cursor.description] if cursor.description else []
            execution_time = (datetime.now() - start_time).total_seconds()
            results = [{col: row[col] for col in columns} for row in rows]
        return {
            "columns": columns,
            "rows": results,
            "row_count": len(results),
            "execution_time": execution_time
        }

    def format_results(self, results: Dict) -> str:
        """Render a result dict from execute_query as a text table.

        At most MAX_DISPLAY_ROWS rows are printed; when rows are left out a
        trailing note says so.
        """
        if not results["rows"]:
            return "查询返回0行数据"
        columns = results["columns"]
        shown = results["rows"][:self.MAX_DISPLAY_ROWS]
        # Column width = widest of header and displayed cells, plus padding.
        col_widths = {
            col: max(len(col), max(len(str(row.get(col, ""))) for row in shown)) + 2
            for col in columns
        }
        header = "|".join(col.center(col_widths[col]) for col in columns)
        lines = [header, "-" * len(header)]
        lines.extend(
            "|".join(str(row.get(col, "")).ljust(col_widths[col]) for col in columns)
            for row in shown
        )
        text = "\n".join(lines)
        text += f"\n\n共 {results['row_count']} 行,执行时间: {results['execution_time']:.3f}s"
        if len(results["rows"]) > len(shown):
            text += f"\n(仅显示前 {len(shown)} 行)"
        return text
class DatabaseServer:
    """MCP server exposing a SQLite database through resources and two tools."""

    def __init__(self, db_path: str):
        self.server = Server(APP_NAME)
        self.engine = QueryEngine(db_path)
        self._setup_handlers()

    def _setup_handlers(self):
        """Register resource and tool handlers on the underlying Server."""

        @self.server.list_resources()
        async def list_resources() -> List[Resource]:
            schema = await self.engine.schema_manager.get_schema()
            resources = [Resource(uri="schema://database", name="数据库结构", mimeType="application/json")]
            resources.extend(
                Resource(
                    uri=f"table://{table_name}",
                    name=f"表: {table_name}",
                    mimeType="application/json"
                )
                for table_name in schema["tables"]
            )
            return resources

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            # The SDK may pass a URL object rather than a plain str.
            raw = str(uri)
            schema = await self.engine.schema_manager.get_schema()
            if raw.rstrip("/") == "schema://database":
                return json.dumps(schema, indent=2, ensure_ascii=False)
            if raw.startswith("table://"):
                requested = raw[len("table://"):].rstrip("/")
                # Only query names that exist in the schema, so the URI text
                # is never spliced into SQL unchecked. Comparison ignores
                # case in case the URI was normalised on the way in.
                known = {name.lower(): name for name in schema["tables"]}
                table_name = known.get(requested.lower())
                if table_name is None:
                    raise ValueError(f"未知资源: {raw}")
                quoted = '"' + table_name.replace('"', '""') + '"'
                results = await self.engine.execute_query(f"SELECT * FROM {quoted} LIMIT 10")
                return self.engine.format_results(results)
            raise ValueError(f"未知资源: {raw}")

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            return [
                Tool(
                    name="execute_query",
                    description="执行SQL查询(只读)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "sql": {"type": "string", "description": "SQL查询语句(仅SELECT)"}
                        },
                        "required": ["sql"]
                    }
                ),
                Tool(
                    name="get_schema",
                    description="获取数据库结构信息",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table": {"type": "string", "description": "指定表名(可选)"}
                        }
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict):
            # Failures are reported to the client as a text result.
            try:
                if name == "execute_query":
                    results = await self.engine.execute_query(arguments["sql"])
                    text = self.engine.format_results(results)
                elif name == "get_schema":
                    schema = await self.engine.schema_manager.get_schema()
                    table = arguments.get("table")
                    if table:
                        # An unknown table is an error; silently returning
                        # the whole schema would look like a valid answer.
                        if table not in schema["tables"]:
                            raise ValueError(f"表不存在: {table}")
                        schema = {"tables": {table: schema["tables"][table]}}
                    # ensure_ascii=False keeps non-ASCII names readable, as
                    # in the schema:// resource.
                    text = json.dumps(schema, indent=2, ensure_ascii=False)
                else:
                    raise ValueError(f"未知工具: {name}")
                return [TextContent(type="text", text=text)]
            except Exception as e:
                return [TextContent(type="text", text=f"错误: {str(e)}")]

    async def run(self):
        """Serve over stdio until the transport closes."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
# Script entry point: the first CLI argument is the database file,
# falling back to "example.db" when none is given.
if __name__ == "__main__":
    import sys
    cli_args = sys.argv[1:]
    db_path = cli_args[0] if cli_args else "example.db"
    asyncio.run(DatabaseServer(db_path).run())
Claude Desktop配置
{
"mcpServers": {
"database": {
"command": "python",
"args": [
"/path/to/database-assistant/server.py",
"/path/to/your/database.db"
]
}
}
}
🌐 项目3:API聚合网关
构建一个API聚合网关,统一访问多个外部API,支持缓存和错误降级。
核心代码:server.py
#!/usr/bin/env python3
"""MCP API聚合网关 Server"""
import asyncio
import json
import os
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import aiohttp
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from mcp.server.stdio import stdio_server
# Name handed to the MCP Server constructor.
APP_NAME = "api-gateway"
class CacheManager:
    """In-memory cache for API responses with a per-entry expiry time.

    Expired entries are removed lazily, when they are next looked up.
    """

    def __init__(self, default_ttl: int = 300):
        """
        Args:
            default_ttl: lifetime in seconds used when set() gets no ttl.
        """
        self._cache = {}
        self.default_ttl = default_ttl

    def _make_key(self, prefix: str, *args, **kwargs) -> str:
        """Build a deterministic key (MD5 hex digest) from prefix and arguments."""
        key_data = f"{prefix}:{args}:{sorted(kwargs.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None if absent or expired."""
        entry = self._cache.get(key)
        if entry is None:
            return None
        if datetime.now() < entry["expires"]:
            return entry["data"]
        # Expired: evict so the dict does not keep stale data.
        del self._cache[key]
        return None

    def set(self, key: str, data: Any, ttl: Optional[int] = None):
        """Store ``data`` under ``key`` for ``ttl`` seconds.

        ``ttl=None`` uses the default; ``ttl=0`` (or negative) stores an
        entry that is already expired instead of falling back to the default.
        """
        if ttl is None:
            ttl = self.default_ttl
        self._cache[key] = {
            "data": data,
            "expires": datetime.now() + timedelta(seconds=ttl)
        }
class APIClient:
    """Generic HTTP JSON client with optional API key and fallback data."""

    # Query-string parameter that carries the API key. Subclasses override
    # it to match the service they talk to.
    API_KEY_PARAM = 'apikey'

    def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 10):
        """
        Args:
            base_url: service root; a trailing slash is removed.
            api_key: sent as query parameter API_KEY_PARAM when set.
            timeout: total request timeout in seconds.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout

    def _build_params(self, params: Optional[Dict]) -> Dict:
        """Return a copy of ``params`` with the API key added.

        A copy is used so the caller's dict is never modified.
        """
        merged = dict(params) if params else {}
        if self.api_key:
            merged[self.API_KEY_PARAM] = self.api_key
        return merged

    async def request(self, method: str, endpoint: str, params: Dict = None, fallback_data: Any = None) -> Dict:
        """Perform a request and return the decoded JSON body.

        Args:
            method: HTTP method.
            endpoint: path appended to base_url.
            params: query parameters.
            fallback_data: returned instead of raising when the request
                fails or the status is not 200.

        Raises:
            Exception: on failure when no fallback_data was given.
        """
        url = f"{self.base_url}{endpoint}"
        query = self._build_params(params)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method=method,
                    url=url,
                    params=query,
                    timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    if response.status == 200:
                        return await response.json()
                    status = response.status
        except Exception as e:
            if fallback_data is not None:
                return fallback_data
            raise Exception(f"请求失败: {str(e)}") from e
        # Non-200 response.
        if fallback_data is not None:
            return fallback_data
        raise Exception(f"请求失败: HTTP {status}")


class WeatherClient(APIClient):
    """Client for a WeatherAPI.com-style service."""

    # WeatherAPI.com expects the key in the "key" query parameter.
    API_KEY_PARAM = 'key'

    async def get_current(self, city: str) -> Dict:
        """Return current weather for ``city``, or an error dict on failure."""
        return await self.request(
            'GET', '/current.json',
            params={'q': city},
            fallback_data={"error": "天气服务暂时不可用", "city": city}
        )


class NewsClient(APIClient):
    """Client for a NewsAPI-style service."""

    # NewsAPI expects the key in the "apiKey" query parameter.
    API_KEY_PARAM = 'apiKey'

    async def get_top_headlines(self, country: str = 'us', category: str = None) -> Dict:
        """Return top headlines, or an error-status dict on failure."""
        params = {'country': country}
        if category:
            params['category'] = category
        return await self.request(
            'GET', '/v2/top-headlines',
            params=params,
            fallback_data={"status": "error", "articles": []}
        )


class StockClient(APIClient):
    """Client for the stock quote service."""

    async def get_quote(self, symbol: str) -> Dict:
        """Return the quote for ``symbol``, or an error dict on failure."""
        return await self.request(
            'GET', '/quote',
            params={'symbol': symbol},
            fallback_data={"symbol": symbol, "error": "股票服务暂时不可用"}
        )
class APIGatewayServer:
    """MCP server that fronts the weather, news and stock API clients.

    Service URLs and keys come from the environment variables
    WEATHER_API_URL / WEATHER_API_KEY, NEWS_API_URL / NEWS_API_KEY and
    STOCK_API_URL / STOCK_API_KEY.
    """

    # Seconds a successful weather response stays cached.
    WEATHER_CACHE_TTL = 600

    def __init__(self):
        self.server = Server(APP_NAME)
        self.cache = CacheManager(default_ttl=300)
        self.weather = WeatherClient(
            # https: the API key travels in the query string.
            base_url=os.getenv('WEATHER_API_URL', 'https://api.weatherapi.com/v1'),
            api_key=os.getenv('WEATHER_API_KEY')
        )
        self.news = NewsClient(
            base_url=os.getenv('NEWS_API_URL', 'https://newsapi.org'),
            api_key=os.getenv('NEWS_API_KEY')
        )
        self.stock = StockClient(
            base_url=os.getenv('STOCK_API_URL', 'https://api.example.com/stock'),
            api_key=os.getenv('STOCK_API_KEY')
        )
        self._setup_handlers()

    async def _fetch_weather(self, city: str, use_cache: bool = True) -> Dict:
        """Return weather data for ``city``, consulting the cache first.

        Only responses without an "error" key are cached.
        """
        cache_key = self.cache._make_key("weather", city)
        if use_cache:
            cached = self.cache.get(cache_key)
            if cached is not None:
                return cached
        data = await self.weather.get_current(city)
        if "error" not in data:
            self.cache.set(cache_key, data, ttl=self.WEATHER_CACHE_TTL)
        return data

    def _setup_handlers(self):
        """Register the tool handlers on the underlying Server."""

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            return [
                Tool(
                    name="get_weather",
                    description="获取指定城市的当前天气",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "city": {"type": "string", "description": "城市名称"},
                            "use_cache": {"type": "boolean", "default": True}
                        },
                        "required": ["city"]
                    }
                ),
                Tool(
                    name="get_news",
                    description="获取头条新闻",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "country": {"type": "string", "default": "us"},
                            "category": {"type": "string"}
                        }
                    }
                ),
                Tool(
                    name="get_stock_price",
                    description="获取股票价格",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "symbol": {"type": "string", "description": "股票代码"}
                        },
                        "required": ["symbol"]
                    }
                ),
                Tool(
                    name="batch_get_data",
                    description="批量获取多种数据",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "weather_city": {"type": "string"},
                            "news_category": {"type": "string"},
                            "stock_symbol": {"type": "string"}
                        }
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict):
            # Failures are reported to the client as a text result.
            try:
                if name == "get_weather":
                    data = await self._fetch_weather(
                        arguments["city"], arguments.get("use_cache", True)
                    )
                    text = self._format_weather(data)
                elif name == "get_news":
                    data = await self.news.get_top_headlines(
                        arguments.get("country", "us"),
                        arguments.get("category")
                    )
                    text = self._format_news(data)
                elif name == "get_stock_price":
                    data = await self.stock.get_quote(arguments["symbol"].upper())
                    text = self._format_stock(data)
                elif name == "batch_get_data":
                    text = await self._batch(arguments)
                else:
                    raise ValueError(f"未知工具: {name}")
                return [TextContent(type="text", text=text)]
            except Exception as e:
                return [TextContent(type="text", text=f"错误: {str(e)}")]

    async def _batch(self, arguments: dict) -> str:
        """Fetch the requested data kinds concurrently and format them."""
        # (formatter, coroutine) pairs for every requested kind; each one
        # goes through the same path as its single-purpose tool.
        jobs = []
        if arguments.get("weather_city"):
            jobs.append((self._format_weather, self._fetch_weather(arguments["weather_city"])))
        if arguments.get("news_category"):
            jobs.append((self._format_news, self.news.get_top_headlines(category=arguments["news_category"])))
        if arguments.get("stock_symbol"):
            jobs.append((self._format_stock, self.stock.get_quote(arguments["stock_symbol"].upper())))
        if not jobs:
            return "批量数据获取结果:\n\n未指定任何数据项"
        labels = {self._format_weather: "weather", self._format_news: "news", self._format_stock: "stock"}
        # return_exceptions=True: one failing service must not hide the others.
        results = await asyncio.gather(*(job for _, job in jobs), return_exceptions=True)
        chunks = ["批量数据获取结果:\n\n"]
        for (formatter, _), result in zip(jobs, results):
            if isinstance(result, Exception):
                chunks.append(f"{labels[formatter]}: 错误 - {str(result)}\n\n")
            else:
                chunks.append(formatter(result) + "\n\n")
        return "".join(chunks)

    def _format_weather(self, data: Dict) -> str:
        """Format a weather response (or its error fallback) as text."""
        if "error" in data:
            return f"天气: {data['error']}"
        current = data.get("current", {})
        location = data.get("location", {})
        return "\n".join([
            f"天气 - {location.get('name', 'Unknown')}",
            f"温度: {current.get('temp_c')}°C",
            f"天气: {current.get('condition', {}).get('text', 'N/A')}",
            f"湿度: {current.get('humidity')}%",
        ])

    def _format_news(self, data: Dict) -> str:
        """Format up to five headlines, or a failure message."""
        if data.get("status") != "ok":
            return f"新闻获取失败: {data.get('message', 'Unknown')}"
        articles = data.get("articles", [])
        if not articles:
            return "暂无新闻"
        chunks = [f"最新新闻 ({len(articles)} 条)\n\n"]
        for i, article in enumerate(articles[:5], 1):
            chunks.append(f"{i}. {article.get('title', 'N/A')}\n")
            chunks.append(f" 来源: {article.get('source', {}).get('name', 'N/A')}\n\n")
        return "".join(chunks)

    def _format_stock(self, data: Dict) -> str:
        """Format a stock quote; a non-numeric change is shown as N/A."""
        if "error" in data:
            return f"股票: {data['error']}"
        symbol = data.get("symbol", "N/A")
        price = data.get("price", "N/A")
        try:
            # APIs may deliver the change as a string; None or garbage must
            # not crash the formatter.
            change_text = f"{float(data.get('change', 0)):+.2f}"
        except (TypeError, ValueError):
            change_text = "N/A"
        return f"股票 - {symbol}\n价格: ${price}\n涨跌: {change_text}"

    async def run(self):
        """Serve over stdio until the transport closes."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
# Script entry point: build the gateway and serve over stdio until it exits.
if __name__ == "__main__":
    asyncio.run(APIGatewayServer().run())
Docker部署配置
Dockerfile:
# Base image: Python 3.11, slim variant.
FROM python:3.11-slim
WORKDIR /app
# Install dependencies before copying the code, so this layer is reused
# when only server.py changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
# Do not buffer Python's stdout/stderr.
ENV PYTHONUNBUFFERED=1
CMD ["python", "server.py"]
docker-compose.yml:
version: '3.8'
services:
  api-gateway:
    build: .
    # The server talks MCP over stdin/stdout; keep stdin open so it does
    # not see end-of-input immediately after start.
    stdin_open: true
    environment:
      - WEATHER_API_KEY=${WEATHER_API_KEY}
      - NEWS_API_KEY=${NEWS_API_KEY}
      # Read by server.py for the stock client.
      - STOCK_API_KEY=${STOCK_API_KEY}
    restart: unless-stopped
🔗 与LangChain集成
在LangChain中使用MCP Server作为工具:
# langchain_integration.py
from langchain.tools import BaseTool
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI
from pydantic import Field
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import asyncio
class MCPTool(BaseTool):
    """LangChain tool that forwards a single string input to one MCP server tool.

    A new server process and session are started for every call.
    """

    name: str = Field(default="mcp_tool")
    description: str = Field(default="MCP工具")
    server_params: StdioServerParameters = Field(default=None)
    tool_name: str = Field(default="")
    # Name of the MCP tool argument that receives the input string. It must
    # match the target tool's input schema (e.g. "city" for get_weather).
    argument_name: str = Field(default="query")

    async def _arun(self, query: str) -> str:
        """Call the MCP tool and return the text parts of its result."""
        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                result = await session.call_tool(self.tool_name, {self.argument_name: query})
                # Join every text part; a result may be empty or hold
                # several content items.
                texts = [part.text for part in result.content if getattr(part, "text", None)]
                return "\n".join(texts)

    def _run(self, query: str) -> str:
        """Synchronous wrapper; must not be called from a running event loop."""
        return asyncio.run(self._arun(query))


# Create the MCP tool
weather_server = StdioServerParameters(
    command="python",
    args=["/path/to/api-gateway/server.py"]
)
weather_tool = MCPTool(
    name="get_weather",
    description="获取指定城市的天气信息",
    server_params=weather_server,
    tool_name="get_weather",
    # get_weather's schema requires "city", not "query".
    argument_name="city"
)

# Use it from an agent
llm = ChatOpenAI(temperature=0)
agent = initialize_agent(
    [weather_tool],
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)
response = agent.run("北京今天天气怎么样?")
print(response)
🔌 与OpenAI集成
在OpenAI Function Calling中使用MCP工具:
# openai_integration.py
import json
from openai import OpenAI
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
client = OpenAI()


async def chat_with_mcp():
    """Ask the model one question and execute the MCP tool calls it requests.

    Starts the API gateway server as a subprocess, advertises its tools to
    the model as functions, and prints the text result of each tool call.
    """
    # Connect to the MCP server
    server_params = StdioServerParameters(
        command="python",
        args=["/path/to/api-gateway/server.py"]
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Fetch the tool definitions. list_tools() returns a result
            # object; the tool list itself is its .tools attribute.
            listing = await session.list_tools()
            functions = [
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema
                    }
                }
                for tool in listing.tools
            ]
            # Conversation
            messages = [{"role": "user", "content": "北京天气怎么样?"}]
            response = client.chat.completions.create(
                model="gpt-4",
                messages=messages,
                tools=functions
            )
            # Handle every tool call the model asked for, not just the first
            for tool_call in response.choices[0].message.tool_calls or []:
                tool_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)
                # Run the MCP tool
                result = await session.call_tool(tool_name, arguments)
                for part in result.content:
                    text = getattr(part, "text", None)
                    if text:
                        print(text)
📚 学习资源
官方资源
- MCP官方文档 - 最权威的技术文档
- MCP GitHub - 官方示例库
- MCP协议规范 - 详细的协议定义
社区资源
- 官方Server示例库 - 各种实用Server示例
- Python SDK - Python开发SDK
- MCP Discord社区 - 开发者交流
推荐文章
- 《MCP协议设计哲学》
- 《构建生产级MCP Server最佳实践》
- 《MCP与Function Calling对比分析》
🎓 课程总结
MCP的价值
- 标准化:统一的AI工具调用协议
- 安全性:细粒度的权限控制
- 可扩展:易于添加新功能
- 生态:丰富的工具生态
学习路径回顾
- 理解MCP核心概念(Resources、Tools、Prompts)
- 掌握Server和Client开发
- 学习高级特性(Progress、Cancellation、Sampling)
- 实践最佳实践(安全、性能、错误处理)
- 完成实战项目开发
未来展望
- MCP协议将持续演进
- 更多语言和平台支持
- 更丰富的工具生态
- 与更多AI平台集成
🎉 恭喜完成MCP入门教程!
现在你已经掌握了MCP的核心知识和实践技能,可以开始构建自己的MCP Server了!