数据分析 Agent
数据分析 Agent 能够理解自然语言查询、自动生成 SQL、创建可视化图表、发现数据洞察,让数据分析变得人人可用。
一、核心能力
1.1 能力矩阵
┌─────────────────────────────────────────────────────────────┐
│ 数据分析 Agent 能力矩阵 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 数据查询能力 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 自然语言 │ │ SQL 生成 │ │ 多表关联 │ │ │
│ │ │ 理解 │ │ │ │ 查询 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 查询优化 │ │ 结果解释 │ │ 异常检测 │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 数据可视化能力 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 图表推荐 │ │ 自动绘图 │ │ 交互式 │ │ │
│ │ │ │ │ │ │ 图表 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 仪表盘 │ │ 报告生成 │ │ 导出分享 │ │ │
│ │ │ 构建 │ │ │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 洞察发现能力 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 趋势分析 │ │ 对比分析 │ │ 异常识别 │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 归因分析 │ │ 预测分析 │ │ 建议生成 │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘1.2 典型应用场景
| 场景 | 用户输入示例 | Agent 输出 |
|---|---|---|
| 销售分析 | "上个月各产品线销售额是多少" | SQL + 图表 + 分析报告 |
| 用户行为 | "最近7天的日活跃用户趋势" | 折线图 + 增长分析 |
| 运营监控 | "今天的订单异常情况" | 异常列表 + 原因分析 |
| 财务报表 | "生成Q3财务摘要" | 结构化报表 + 关键指标 |
二、架构设计
2.1 整体架构
┌─────────────────────────────────────────────────────────────┐
│ 数据分析 Agent 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 用户交互层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 自然语言 │ │ 图表展示 │ │ 报告导出 │ │ │
│ │ │ 输入 │ │ 界面 │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Agent 核心 │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Query Understanding │ │ │
│ │ │ 解析自然语言、理解分析意图、提取关键实体 │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ SQL Generator │ │ │
│ │ │ Schema 感知、SQL 生成、语法校验、安全检查 │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Insight Generator │ │ │
│ │ │ 数据分析、洞察提取、建议生成 │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Visualization Engine │ │ │
│ │ │ 图表推荐、配置生成、交互支持 │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 数据层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 数据库 │ │ 数据仓库 │ │ 数据湖 │ │ │
│ │ │ MySQL │ │ BigQuery │ │ S3 │ │ │
│ │ │ Postgres │ │ Snowflake│ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Schema │ │ 元数据 │ │ │
│ │ │ 缓存 │ │ 管理 │ │ │
│ │ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘2.2 核心代码实现
"""
数据分析 Agent 实现
"""
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from langchain_openai import ChatOpenAI
@dataclass
class QueryResult:
"""查询结果"""
sql: str # 生成的 SQL
data: List[Dict] # 查询数据
columns: List[str] # 列名
row_count: int # 行数
execution_time: float # 执行时间
@dataclass
class Insight:
"""数据洞察"""
title: str # 洞察标题
description: str # 描述
importance: str # 重要程度 (high/medium/low)
recommendation: Optional[str] = None # 建议
class DataAnalysisAgent:
"""数据分析 Agent"""
def __init__(
self,
llm=None,
db_connection=None,
schema_cache=None
):
self.llm = llm or ChatOpenAI(model="gpt-4", temperature=0)
self.db = db_connection
self.schema_cache = schema_cache
async def analyze(
self,
query: str,
context: Optional[Dict] = None
) -> Dict[str, Any]:
"""
执行数据分析
Args:
query: 自然语言查询
context: 额外上下文
Returns:
分析结果
"""
# 1. 理解查询意图
query_intent = await self._understand_query(query)
# 2. 获取相关 Schema
relevant_schema = await self._get_relevant_schema(query)
# 3. 生成 SQL
sql = await self._generate_sql(query, relevant_schema)
# 4. 执行查询
result = await self._execute_sql(sql)
# 5. 生成洞察
insights = await self._generate_insights(query, result)
# 6. 推荐可视化
visualization = await self._recommend_visualization(query, result)
return {
"query": query,
"intent": query_intent,
"sql": sql,
"data": result.data[:100], # 限制返回行数
"insights": insights,
"visualization": visualization,
"metadata": {
"row_count": result.row_count,
"execution_time": result.execution_time
}
}
async def _understand_query(self, query: str) -> Dict:
"""理解查询意图"""
prompt = f"""
分析以下数据分析查询的意图:
查询:{query}
请识别:
1. 分析类型(趋势分析/对比分析/统计汇总/异常检测)
2. 时间范围
3. 关注的指标
4. 筛选条件
5. 分组维度
以 JSON 格式输出。
"""
response = await self.llm.ainvoke(prompt)
# 解析 JSON
import json
try:
return json.loads(response.content)
except:
return {"raw": response.content}
async def _get_relevant_schema(self, query: str) -> str:
"""获取相关 Schema"""
if not self.schema_cache:
return ""
# 使用语义搜索找到相关的表和列
relevant_tables = await self.schema_cache.search(query, k=3)
schema_text = ""
for table in relevant_tables:
schema_text += f"\n表: {table['name']}\n"
schema_text += f"描述: {table.get('description', 'N/A')}\n"
schema_text += "列:\n"
for col in table.get('columns', []):
schema_text += f" - {col['name']} ({col['type']}): {col.get('description', '')}\n"
return schema_text
async def _generate_sql(
self,
query: str,
schema: str
) -> str:
"""生成 SQL"""
prompt = f"""
根据用户查询和数据库 Schema 生成 SQL。
数据库 Schema:
{schema}
用户查询:{query}
要求:
1. 生成标准 SQL
2. 添加必要的过滤条件
3. 使用合适的聚合函数
4. 考虑性能优化
5. 只输出 SQL,不要输出其他内容
SQL:
"""
response = await self.llm.ainvoke(prompt)
# 提取 SQL
sql = response.content.strip()
if sql.startswith("```"):
sql = sql.split("\n", 1)[1].rsplit("```", 1)[0].strip()
return sql
async def _execute_sql(self, sql: str) -> QueryResult:
"""执行 SQL"""
import time
if not self.db:
# 返回模拟数据
return QueryResult(
sql=sql,
data=[{"value": 100}, {"value": 200}],
columns=["value"],
row_count=2,
execution_time=0.1
)
start_time = time.time()
# 执行查询
cursor = self.db.cursor()
cursor.execute(sql)
columns = [desc[0] for desc in cursor.description]
data = cursor.fetchall()
execution_time = time.time() - start_time
return QueryResult(
sql=sql,
data=[dict(zip(columns, row)) for row in data],
columns=columns,
row_count=len(data),
execution_time=execution_time
)
async def _generate_insights(
self,
query: str,
result: QueryResult
) -> List[Insight]:
"""生成数据洞察"""
# 数据摘要
data_summary = self._summarize_data(result)
prompt = f"""
基于查询结果生成数据洞察。
原始查询:{query}
数据摘要:
{data_summary}
请生成 2-3 条关键洞察,每条洞察包含:
1. 标题(简短)
2. 描述(详细)
3. 重要程度(high/medium/low)
4. 可选建议
以 JSON 数组格式输出。
"""
response = await self.llm.ainvoke(prompt)
# 解析洞察
import json
try:
insights_data = json.loads(response.content)
return [
Insight(
title=i.get("title", ""),
description=i.get("description", ""),
importance=i.get("importance", "medium"),
recommendation=i.get("recommendation")
)
for i in insights_data
]
except:
return [Insight(
title="数据洞察",
description=response.content,
importance="medium"
)]
def _summarize_data(self, result: QueryResult) -> str:
"""数据摘要"""
if not result.data:
return "无数据"
summary_parts = []
# 基本信息
summary_parts.append(f"总行数: {result.row_count}")
summary_parts.append(f"列: {', '.join(result.columns)}")
# 前几行示例
summary_parts.append("前几行数据:")
for i, row in enumerate(result.data[:5]):
summary_parts.append(f" {row}")
return "\n".join(summary_parts)
async def _recommend_visualization(
self,
query: str,
result: QueryResult
) -> Dict:
"""推荐可视化"""
prompt = f"""
为以下数据推荐最佳可视化方式。
查询:{query}
列:{result.columns}
数据示例:{result.data[:3]}
请推荐:
1. 图表类型(line/bar/pie/table/scatter/area)
2. X 轴字段
3. Y 轴字段
4. 颜色/分组字段(可选)
5. 图表标题
以 JSON 格式输出。
"""
response = await self.llm.ainvoke(prompt)
import json
try:
return json.loads(response.content)
except:
return {
"type": "table",
"title": "查询结果"
}
# ========== Schema 缓存 ==========
class SchemaCache:
"""Schema 缓存和检索"""
def __init__(self, db_connection=None, vector_store=None):
self.db = db_connection
self.vector_store = vector_store
self.tables = []
async def load_schema(self):
"""加载 Schema"""
if not self.db:
return
# 获取所有表
cursor = self.db.cursor()
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
for (table_name,) in tables:
# 获取表结构
cursor.execute(f"DESCRIBE {table_name}")
columns = cursor.fetchall()
table_info = {
"name": table_name,
"columns": [
{
"name": col[0],
"type": col[1],
"description": ""
}
for col in columns
]
}
self.tables.append(table_info)
# 添加到向量存储
if self.vector_store:
await self._index_table(table_info)
async def _index_table(self, table_info: Dict):
"""索引表信息"""
text = f"表 {table_info['name']}: " + ", ".join([
f"{col['name']} ({col['type']})"
for col in table_info['columns']
])
# self.vector_store.add_texts([text], [table_info])
async def search(self, query: str, k: int = 3) -> List[Dict]:
"""搜索相关表"""
# 简单关键词匹配
results = []
query_lower = query.lower()
for table in self.tables:
score = 0
if table['name'].lower() in query_lower:
score += 10
for col in table.get('columns', []):
if col['name'].lower() in query_lower:
score += 5
if score > 0:
results.append((score, table))
results.sort(key=lambda x: x[0], reverse=True)
return [r[1] for r in results[:k]]三、可视化推荐
"""
可视化推荐引擎
"""
from typing import Dict, List
class VisualizationRecommender:
"""可视化推荐器"""
# 图表类型规则
CHART_RULES = {
"trend": {
"keywords": ["趋势", "变化", "增长", "下降", "随时间"],
"chart_types": ["line", "area"],
"requires_time": True
},
"comparison": {
"keywords": ["对比", "比较", "区别", "差异"],
"chart_types": ["bar", "grouped_bar"],
"requires_time": False
},
"distribution": {
"keywords": ["分布", "占比", "比例", "构成"],
"chart_types": ["pie", "donut"],
"requires_time": False
},
"ranking": {
"keywords": ["排名", "排行", "Top", "前几"],
"chart_types": ["bar", "horizontal_bar"],
"requires_time": False
},
"correlation": {
"keywords": ["关系", "相关", "影响"],
"chart_types": ["scatter"],
"requires_time": False
}
}
def recommend(
self,
query: str,
columns: List[str],
data_sample: List[Dict]
) -> Dict:
"""
推荐可视化方式
Args:
query: 用户查询
columns: 列名
data_sample: 数据样本
Returns:
可视化配置
"""
# 1. 检测查询类型
query_type = self._detect_query_type(query)
# 2. 检测数据类型
data_types = self._detect_data_types(columns, data_sample)
# 3. 推荐图表
chart_config = self._recommend_chart(
query_type,
columns,
data_types
)
return chart_config
def _detect_query_type(self, query: str) -> str:
"""检测查询类型"""
query_lower = query.lower()
for query_type, rules in self.CHART_RULES.items():
for keyword in rules["keywords"]:
if keyword in query_lower:
return query_type
return "summary"
def _detect_data_types(
self,
columns: List[str],
data_sample: List[Dict]
) -> Dict[str, str]:
"""检测数据类型"""
data_types = {}
for col in columns:
values = [row.get(col) for row in data_sample if row.get(col) is not None]
if not values:
data_types[col] = "unknown"
continue
# 检查是否为数值
try:
[float(v) for v in values]
data_types[col] = "numeric"
except:
# 检查是否为日期
import re
date_pattern = r"\d{4}[-/]\d{2}[-/]\d{2}"
if re.match(date_pattern, str(values[0])):
data_types[col] = "date"
else:
data_types[col] = "categorical"
return data_types
def _recommend_chart(
self,
query_type: str,
columns: List[str],
data_types: Dict[str, str]
) -> Dict:
"""推荐图表配置"""
# 找到数值列和分类列
numeric_cols = [c for c in columns if data_types.get(c) == "numeric"]
categorical_cols = [c for c in columns if data_types.get(c) == "categorical"]
date_cols = [c for c in columns if data_types.get(c) == "date"]
# 根据查询类型推荐
if query_type == "trend" and date_cols and numeric_cols:
return {
"type": "line",
"x": date_cols[0],
"y": numeric_cols[0],
"title": "趋势图"
}
elif query_type == "distribution" and numeric_cols:
return {
"type": "pie",
"dimension": categorical_cols[0] if categorical_cols else columns[0],
"measure": numeric_cols[0],
"title": "分布图"
}
elif query_type == "comparison" and numeric_cols:
return {
"type": "bar",
"x": categorical_cols[0] if categorical_cols else columns[0],
"y": numeric_cols[0],
"title": "对比图"
}
elif query_type == "ranking" and numeric_cols:
return {
"type": "horizontal_bar",
"x": numeric_cols[0],
"y": categorical_cols[0] if categorical_cols else columns[0],
"title": "排名图"
}
else:
return {
"type": "table",
"title": "数据表"
}四、面试问答
Q1: 数据分析 Agent 的核心难点是什么?
回答要点:
- SQL 生成准确性:理解自然语言到 SQL 的映射
- Schema 感知:理解数据库结构和语义
- 数据安全:防止 SQL 注入和敏感数据泄露
- 性能优化:生成的 SQL 需要高效执行
Q2: 如何提升 SQL 生成准确率?
回答要点:
- Schema 增强:提供详细的字段描述和示例值
- Few-shot 学习:提供相似查询的 SQL 示例
- 校验机制:SQL 语法检查和执行测试
- 用户反馈:记录成功案例,持续优化
Q3: 如何处理数据安全问题?
回答要点:
| 安全措施 | 实现方式 |
|---|---|
| SQL 注入防护 | 参数化查询、SQL 白名单 |
| 权限控制 | 基于用户角色的数据访问控制 |
| 敏感数据过滤 | 识别并脱敏敏感字段 |
| 审计日志 | 记录所有查询操作 |
五、小结
数据分析 Agent 的关键要素:
- SQL 生成:自然语言到 SQL 的准确转换
- Schema 理解:理解数据库结构和语义
- 洞察生成:从数据中发现有价值的信息
- 可视化:合适的图表呈现数据