知识模块
🤖 Agent 知识模块
十一、Agent 应用场景
数据分析 Agent

数据分析 Agent

数据分析 Agent 能够理解自然语言查询、自动生成 SQL、创建可视化图表、发现数据洞察,让数据分析变得人人可用。


一、核心能力

1.1 能力矩阵

┌─────────────────────────────────────────────────────────────┐
│                    数据分析 Agent 能力矩阵                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                 数据查询能力                         │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 自然语言 │ │ SQL 生成 │ │ 多表关联 │            │  │
│   │  │ 理解     │ │          │ │ 查询     │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 查询优化 │ │ 结果解释 │ │ 异常检测 │            │  │
│   │  │          │ │          │ │          │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                 数据可视化能力                       │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 图表推荐 │ │ 自动绘图 │ │ 交互式   │            │  │
│   │  │          │ │          │ │ 图表     │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 仪表盘   │ │ 报告生成 │ │ 导出分享 │            │  │
│   │  │ 构建     │ │          │ │          │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                 洞察发现能力                         │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 趋势分析 │ │ 对比分析 │ │ 异常识别 │            │  │
│   │  │          │ │          │ │          │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 归因分析 │ │ 预测分析 │ │ 建议生成 │            │  │
│   │  │          │ │          │ │          │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.2 典型应用场景

场景用户输入示例Agent 输出
销售分析"上个月各产品线销售额是多少"SQL + 图表 + 分析报告
用户行为"最近7天的日活跃用户趋势"折线图 + 增长分析
运营监控"今天的订单异常情况"异常列表 + 原因分析
财务报表"生成Q3财务摘要"结构化报表 + 关键指标

二、架构设计

2.1 整体架构

┌─────────────────────────────────────────────────────────────┐
│                    数据分析 Agent 架构                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                    用户交互层                        │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 自然语言 │ │ 图表展示 │ │ 报告导出 │            │  │
│   │  │ 输入     │ │ 界面     │ │          │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   └─────────────────────────────────────────────────────┘  │
│                          │                                  │
│                          ↓                                  │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                    Agent 核心                        │  │
│   │  ┌───────────────────────────────────────────────┐  │  │
│   │  │              Query Understanding               │  │  │
│   │  │  解析自然语言、理解分析意图、提取关键实体     │  │  │
│   │  └───────────────────────────────────────────────┘  │  │
│   │                         │                            │  │
│   │  ┌───────────────────────────────────────────────┐  │  │
│   │  │              SQL Generator                     │  │  │
│   │  │  Schema 感知、SQL 生成、语法校验、安全检查    │  │  │
│   │  └───────────────────────────────────────────────┘  │  │
│   │                         │                            │  │
│   │  ┌───────────────────────────────────────────────┐  │  │
│   │  │              Insight Generator                 │  │  │
│   │  │  数据分析、洞察提取、建议生成                 │  │  │
│   │  └───────────────────────────────────────────────┘  │  │
│   │                         │                            │  │
│   │  ┌───────────────────────────────────────────────┐  │  │
│   │  │              Visualization Engine              │  │  │
│   │  │  图表推荐、配置生成、交互支持                 │  │  │
│   │  └───────────────────────────────────────────────┘  │  │
│   └─────────────────────────────────────────────────────┘  │
│                          │                                  │
│                          ↓                                  │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                    数据层                           │  │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │  │
│   │  │ 数据库   │ │ 数据仓库 │ │ 数据湖   │            │  │
│   │  │ MySQL    │ │ BigQuery │ │ S3       │            │  │
│   │  │ Postgres │ │ Snowflake│ │          │            │  │
│   │  └──────────┘ └──────────┘ └──────────┘            │  │
│   │  ┌──────────┐ ┌──────────┐                         │  │
│   │  │ Schema   │ │ 元数据   │                         │  │
│   │  │ 缓存     │ │ 管理     │                         │  │
│   │  └──────────┘ └──────────┘                         │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.2 核心代码实现

"""
数据分析 Agent 实现
"""
 
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from langchain_openai import ChatOpenAI
 
 
@dataclass
class QueryResult:
    """查询结果"""
    sql: str                    # 生成的 SQL
    data: List[Dict]            # 查询数据
    columns: List[str]          # 列名
    row_count: int              # 行数
    execution_time: float       # 执行时间
 
 
@dataclass
class Insight:
    """数据洞察"""
    title: str                  # 洞察标题
    description: str            # 描述
    importance: str             # 重要程度 (high/medium/low)
    recommendation: Optional[str] = None  # 建议
 
 
class DataAnalysisAgent:
    """数据分析 Agent"""
    
    def __init__(
        self,
        llm=None,
        db_connection=None,
        schema_cache=None
    ):
        self.llm = llm or ChatOpenAI(model="gpt-4", temperature=0)
        self.db = db_connection
        self.schema_cache = schema_cache
    
    async def analyze(
        self,
        query: str,
        context: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        执行数据分析
        
        Args:
            query: 自然语言查询
            context: 额外上下文
            
        Returns:
            分析结果
        """
        # 1. 理解查询意图
        query_intent = await self._understand_query(query)
        
        # 2. 获取相关 Schema
        relevant_schema = await self._get_relevant_schema(query)
        
        # 3. 生成 SQL
        sql = await self._generate_sql(query, relevant_schema)
        
        # 4. 执行查询
        result = await self._execute_sql(sql)
        
        # 5. 生成洞察
        insights = await self._generate_insights(query, result)
        
        # 6. 推荐可视化
        visualization = await self._recommend_visualization(query, result)
        
        return {
            "query": query,
            "intent": query_intent,
            "sql": sql,
            "data": result.data[:100],  # 限制返回行数
            "insights": insights,
            "visualization": visualization,
            "metadata": {
                "row_count": result.row_count,
                "execution_time": result.execution_time
            }
        }
    
    async def _understand_query(self, query: str) -> Dict:
        """理解查询意图"""
        prompt = f"""
分析以下数据分析查询的意图:
 
查询:{query}
 
请识别:
1. 分析类型(趋势分析/对比分析/统计汇总/异常检测)
2. 时间范围
3. 关注的指标
4. 筛选条件
5. 分组维度
 
以 JSON 格式输出。
"""
        
        response = await self.llm.ainvoke(prompt)
        
        # 解析 JSON
        import json
        try:
            return json.loads(response.content)
        except:
            return {"raw": response.content}
    
    async def _get_relevant_schema(self, query: str) -> str:
        """获取相关 Schema"""
        if not self.schema_cache:
            return ""
        
        # 使用语义搜索找到相关的表和列
        relevant_tables = await self.schema_cache.search(query, k=3)
        
        schema_text = ""
        for table in relevant_tables:
            schema_text += f"\n表: {table['name']}\n"
            schema_text += f"描述: {table.get('description', 'N/A')}\n"
            schema_text += "列:\n"
            for col in table.get('columns', []):
                schema_text += f"  - {col['name']} ({col['type']}): {col.get('description', '')}\n"
        
        return schema_text
    
    async def _generate_sql(
        self,
        query: str,
        schema: str
    ) -> str:
        """生成 SQL"""
        prompt = f"""
根据用户查询和数据库 Schema 生成 SQL。
 
数据库 Schema:
{schema}
 
用户查询:{query}
 
要求:
1. 生成标准 SQL
2. 添加必要的过滤条件
3. 使用合适的聚合函数
4. 考虑性能优化
5. 只输出 SQL,不要输出其他内容
 
SQL:
"""
        
        response = await self.llm.ainvoke(prompt)
        
        # 提取 SQL
        sql = response.content.strip()
        if sql.startswith("```"):
            sql = sql.split("\n", 1)[1].rsplit("```", 1)[0].strip()
        
        return sql
    
    async def _execute_sql(self, sql: str) -> QueryResult:
        """执行 SQL"""
        import time
        
        if not self.db:
            # 返回模拟数据
            return QueryResult(
                sql=sql,
                data=[{"value": 100}, {"value": 200}],
                columns=["value"],
                row_count=2,
                execution_time=0.1
            )
        
        start_time = time.time()
        
        # 执行查询
        cursor = self.db.cursor()
        cursor.execute(sql)
        columns = [desc[0] for desc in cursor.description]
        data = cursor.fetchall()
        
        execution_time = time.time() - start_time
        
        return QueryResult(
            sql=sql,
            data=[dict(zip(columns, row)) for row in data],
            columns=columns,
            row_count=len(data),
            execution_time=execution_time
        )
    
    async def _generate_insights(
        self,
        query: str,
        result: QueryResult
    ) -> List[Insight]:
        """生成数据洞察"""
        # 数据摘要
        data_summary = self._summarize_data(result)
        
        prompt = f"""
基于查询结果生成数据洞察。
 
原始查询:{query}
 
数据摘要:
{data_summary}
 
请生成 2-3 条关键洞察,每条洞察包含:
1. 标题(简短)
2. 描述(详细)
3. 重要程度(high/medium/low)
4. 可选建议
 
以 JSON 数组格式输出。
"""
        
        response = await self.llm.ainvoke(prompt)
        
        # 解析洞察
        import json
        try:
            insights_data = json.loads(response.content)
            return [
                Insight(
                    title=i.get("title", ""),
                    description=i.get("description", ""),
                    importance=i.get("importance", "medium"),
                    recommendation=i.get("recommendation")
                )
                for i in insights_data
            ]
        except:
            return [Insight(
                title="数据洞察",
                description=response.content,
                importance="medium"
            )]
    
    def _summarize_data(self, result: QueryResult) -> str:
        """数据摘要"""
        if not result.data:
            return "无数据"
        
        summary_parts = []
        
        # 基本信息
        summary_parts.append(f"总行数: {result.row_count}")
        summary_parts.append(f"列: {', '.join(result.columns)}")
        
        # 前几行示例
        summary_parts.append("前几行数据:")
        for i, row in enumerate(result.data[:5]):
            summary_parts.append(f"  {row}")
        
        return "\n".join(summary_parts)
    
    async def _recommend_visualization(
        self,
        query: str,
        result: QueryResult
    ) -> Dict:
        """推荐可视化"""
        prompt = f"""
为以下数据推荐最佳可视化方式。
 
查询:{query}
列:{result.columns}
数据示例:{result.data[:3]}
 
请推荐:
1. 图表类型(line/bar/pie/table/scatter/area)
2. X 轴字段
3. Y 轴字段
4. 颜色/分组字段(可选)
5. 图表标题
 
以 JSON 格式输出。
"""
        
        response = await self.llm.ainvoke(prompt)
        
        import json
        try:
            return json.loads(response.content)
        except:
            return {
                "type": "table",
                "title": "查询结果"
            }
 
 
# ========== Schema 缓存 ==========
 
class SchemaCache:
    """Schema 缓存和检索"""
    
    def __init__(self, db_connection=None, vector_store=None):
        self.db = db_connection
        self.vector_store = vector_store
        self.tables = []
    
    async def load_schema(self):
        """加载 Schema"""
        if not self.db:
            return
        
        # 获取所有表
        cursor = self.db.cursor()
        cursor.execute("SHOW TABLES")
        tables = cursor.fetchall()
        
        for (table_name,) in tables:
            # 获取表结构
            cursor.execute(f"DESCRIBE {table_name}")
            columns = cursor.fetchall()
            
            table_info = {
                "name": table_name,
                "columns": [
                    {
                        "name": col[0],
                        "type": col[1],
                        "description": ""
                    }
                    for col in columns
                ]
            }
            
            self.tables.append(table_info)
            
            # 添加到向量存储
            if self.vector_store:
                await self._index_table(table_info)
    
    async def _index_table(self, table_info: Dict):
        """索引表信息"""
        text = f"表 {table_info['name']}: " + ", ".join([
            f"{col['name']} ({col['type']})"
            for col in table_info['columns']
        ])
        
        # self.vector_store.add_texts([text], [table_info])
    
    async def search(self, query: str, k: int = 3) -> List[Dict]:
        """搜索相关表"""
        # 简单关键词匹配
        results = []
        query_lower = query.lower()
        
        for table in self.tables:
            score = 0
            if table['name'].lower() in query_lower:
                score += 10
            for col in table.get('columns', []):
                if col['name'].lower() in query_lower:
                    score += 5
            
            if score > 0:
                results.append((score, table))
        
        results.sort(key=lambda x: x[0], reverse=True)
        return [r[1] for r in results[:k]]

三、可视化推荐

"""
可视化推荐引擎
"""
 
from typing import Dict, List
 
 
class VisualizationRecommender:
    """可视化推荐器"""
    
    # 图表类型规则
    CHART_RULES = {
        "trend": {
            "keywords": ["趋势", "变化", "增长", "下降", "随时间"],
            "chart_types": ["line", "area"],
            "requires_time": True
        },
        "comparison": {
            "keywords": ["对比", "比较", "区别", "差异"],
            "chart_types": ["bar", "grouped_bar"],
            "requires_time": False
        },
        "distribution": {
            "keywords": ["分布", "占比", "比例", "构成"],
            "chart_types": ["pie", "donut"],
            "requires_time": False
        },
        "ranking": {
            "keywords": ["排名", "排行", "Top", "前几"],
            "chart_types": ["bar", "horizontal_bar"],
            "requires_time": False
        },
        "correlation": {
            "keywords": ["关系", "相关", "影响"],
            "chart_types": ["scatter"],
            "requires_time": False
        }
    }
    
    def recommend(
        self,
        query: str,
        columns: List[str],
        data_sample: List[Dict]
    ) -> Dict:
        """
        推荐可视化方式
        
        Args:
            query: 用户查询
            columns: 列名
            data_sample: 数据样本
            
        Returns:
            可视化配置
        """
        # 1. 检测查询类型
        query_type = self._detect_query_type(query)
        
        # 2. 检测数据类型
        data_types = self._detect_data_types(columns, data_sample)
        
        # 3. 推荐图表
        chart_config = self._recommend_chart(
            query_type,
            columns,
            data_types
        )
        
        return chart_config
    
    def _detect_query_type(self, query: str) -> str:
        """检测查询类型"""
        query_lower = query.lower()
        
        for query_type, rules in self.CHART_RULES.items():
            for keyword in rules["keywords"]:
                if keyword in query_lower:
                    return query_type
        
        return "summary"
    
    def _detect_data_types(
        self,
        columns: List[str],
        data_sample: List[Dict]
    ) -> Dict[str, str]:
        """检测数据类型"""
        data_types = {}
        
        for col in columns:
            values = [row.get(col) for row in data_sample if row.get(col) is not None]
            
            if not values:
                data_types[col] = "unknown"
                continue
            
            # 检查是否为数值
            try:
                [float(v) for v in values]
                data_types[col] = "numeric"
            except:
                # 检查是否为日期
                import re
                date_pattern = r"\d{4}[-/]\d{2}[-/]\d{2}"
                if re.match(date_pattern, str(values[0])):
                    data_types[col] = "date"
                else:
                    data_types[col] = "categorical"
        
        return data_types
    
    def _recommend_chart(
        self,
        query_type: str,
        columns: List[str],
        data_types: Dict[str, str]
    ) -> Dict:
        """推荐图表配置"""
        # 找到数值列和分类列
        numeric_cols = [c for c in columns if data_types.get(c) == "numeric"]
        categorical_cols = [c for c in columns if data_types.get(c) == "categorical"]
        date_cols = [c for c in columns if data_types.get(c) == "date"]
        
        # 根据查询类型推荐
        if query_type == "trend" and date_cols and numeric_cols:
            return {
                "type": "line",
                "x": date_cols[0],
                "y": numeric_cols[0],
                "title": "趋势图"
            }
        elif query_type == "distribution" and numeric_cols:
            return {
                "type": "pie",
                "dimension": categorical_cols[0] if categorical_cols else columns[0],
                "measure": numeric_cols[0],
                "title": "分布图"
            }
        elif query_type == "comparison" and numeric_cols:
            return {
                "type": "bar",
                "x": categorical_cols[0] if categorical_cols else columns[0],
                "y": numeric_cols[0],
                "title": "对比图"
            }
        elif query_type == "ranking" and numeric_cols:
            return {
                "type": "horizontal_bar",
                "x": numeric_cols[0],
                "y": categorical_cols[0] if categorical_cols else columns[0],
                "title": "排名图"
            }
        else:
            return {
                "type": "table",
                "title": "数据表"
            }

四、面试问答

Q1: 数据分析 Agent 的核心难点是什么?

回答要点:

  1. SQL 生成准确性:理解自然语言到 SQL 的映射
  2. Schema 感知:理解数据库结构和语义
  3. 数据安全:防止 SQL 注入和敏感数据泄露
  4. 性能优化:生成的 SQL 需要高效执行

Q2: 如何提升 SQL 生成准确率?

回答要点:

  • Schema 增强:提供详细的字段描述和示例值
  • Few-shot 学习:提供相似查询的 SQL 示例
  • 校验机制:SQL 语法检查和执行测试
  • 用户反馈:记录成功案例,持续优化

Q3: 如何处理数据安全问题?

回答要点:

安全措施实现方式
SQL 注入防护参数化查询、SQL 白名单
权限控制基于用户角色的数据访问控制
敏感数据过滤识别并脱敏敏感字段
审计日志记录所有查询操作

五、小结

数据分析 Agent 的关键要素:

  1. SQL 生成:自然语言到 SQL 的准确转换
  2. Schema 理解:理解数据库结构和语义
  3. 洞察生成:从数据中发现有价值的信息
  4. 可视化:合适的图表呈现数据