AI应用开发框架实战指南:从LangChain到LlamaIndex的完整攻略

AI应用开发框架为构建智能应用提供了强大的基础设施,让开发者能够快速构建复杂的AI系统。本文将深入介绍主流框架的使用方法,从基础概念到实战项目,帮助您掌握现代AI应用开发的核心技能。

🏗️ AI应用开发框架生态

📊 框架分类和定位

graph TD
A[AI应用开发框架] --> B[应用编排框架]
A --> C[数据处理框架]
A --> D[向量数据库]
A --> E[UI框架]

B --> B1[LangChain]
B --> B2[LlamaIndex]
B --> B3[Semantic Kernel]

C --> C1[Transformers]
C --> C2[Datasets]
C --> C3[Accelerate]

D --> D1[Chroma]
D --> D2[Pinecone]
D --> D3[Weaviate]
D --> D4[Qdrant]

E --> E1[Gradio]
E --> E2[Streamlit]
E --> E3[Chainlit]

🎯 核心能力对比

| 框架 | 主要用途 | 核心优势 | 学习难度 | 生态成熟度 |
| --- | --- | --- | --- | --- |
| LangChain | 通用AI应用 | 生态丰富,组件完整 | 中等 | ⭐⭐⭐⭐⭐ |
| LlamaIndex | RAG应用 | 数据处理强,索引优化 | 简单 | ⭐⭐⭐⭐ |
| Transformers | 模型使用 | 模型丰富,性能优秀 | 中等 | ⭐⭐⭐⭐⭐ |
| Gradio | 快速原型 | 简单易用,部署便捷 | 简单 | ⭐⭐⭐⭐ |
| Streamlit | 数据应用 | 交互丰富,可视化强 | 简单 | ⭐⭐⭐⭐ |

🦜 LangChain:最全面的AI应用开发框架

官方网站:https://langchain.com
GitHub:https://github.com/langchain-ai/langchain

LangChain是目前最受欢迎的AI应用开发框架,提供了构建复杂AI应用所需的全套工具。

🔧 核心概念

1. 组件架构

# LangChain核心组件
components = {
    "Models": "语言模型抽象层",
    "Prompts": "提示词模板管理",
    "Chains": "组件链式组合",
    "Agents": "智能决策代理",
    "Memory": "对话记忆管理",
    "Retrievers": "信息检索组件",
    "Tools": "外部工具集成",
    "Callbacks": "执行过程监控"
}

2. 设计哲学

  • 模块化:每个组件都可以独立使用和替换
  • 可组合:通过链式调用构建复杂应用
  • 可扩展:支持自定义组件和工具
  • 可观测:完整的执行过程追踪
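
上述"可组合"的管道思想可以用一段纯Python草图来体会(仅为示意,并非LangChain的真实实现;`Runnable`在此只是借用的假设命名):

```python
# 纯Python最小草图:演示"a | b | c"式的链式组合思想(示意,非LangChain实际实现)
class Runnable:
    def __init__(self, fn):
        self.fn = fn

    def invoke(self, x):
        return self.fn(x)

    def __or__(self, other):
        # a | b 组合成新的Runnable:先执行a,再把结果交给b
        return Runnable(lambda x: other.invoke(self.invoke(x)))

prompt = Runnable(lambda topic: f"请用一句话解释:{topic}")
fake_llm = Runnable(lambda p: f"[模型回答] {p}")  # 用假模型代替真实LLM
parser = Runnable(lambda text: text.strip())

chain = prompt | fake_llm | parser
print(chain.invoke("装饰器"))
```

每个环节都可以独立替换(换模型、换解析器),这正是"模块化+可组合"设计的价值所在。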

🚀 安装和配置

# 安装核心包
pip install langchain

# 安装社区包(更多集成)
pip install langchain-community

# 安装实验性功能
pip install langchain-experimental

# 安装特定集成
pip install langchain-openai
pip install langchain-anthropic
pip install langchain-google-genai

# 安装向量数据库支持
pip install chromadb
pip install pinecone-client

# 安装其他依赖
pip install tiktoken
pip install faiss-cpu

📝 基础使用示例

1. 简单的LLM调用

from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage

# 初始化模型
llm = ChatOpenAI(
    model="gpt-4-turbo",
    temperature=0.7,
    api_key="your-api-key"
)

# 构建消息
messages = [
    SystemMessage(content="你是一个专业的Python编程助手。"),
    HumanMessage(content="如何实现一个简单的装饰器?")
]

# 获取回复
response = llm.invoke(messages)
print(response.content)

2. 提示词模板

from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.prompts.few_shot import FewShotPromptTemplate

# 基础提示词模板
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个{role},专门帮助用户{task}。"),
    ("human", "请帮我{request}")
])

# 格式化提示词
formatted_prompt = prompt.format_messages(
    role="数据分析师",
    task="分析数据和生成报告",
    request="分析这个销售数据的趋势"
)

print(formatted_prompt)

# Few-shot提示词
examples = [
    {
        "question": "什么是机器学习?",
        "answer": "机器学习是人工智能的一个分支,通过算法让计算机从数据中学习模式。"
    },
    {
        "question": "什么是深度学习?",
        "answer": "深度学习是机器学习的子集,使用多层神经网络来处理复杂数据。"
    }
]

example_prompt = PromptTemplate(
    input_variables=["question", "answer"],
    template="问题: {question}\n答案: {answer}"
)

few_shot_prompt = FewShotPromptTemplate(
    examples=examples,
    example_prompt=example_prompt,
    prefix="以下是一些问答示例:",
    suffix="问题: {input}\n答案:",
    input_variables=["input"]
)

print(few_shot_prompt.format(input="什么是自然语言处理?"))

3. 链式组合

from langchain.chains import LLMChain, SimpleSequentialChain
from langchain.prompts import PromptTemplate

# 第一个链:生成故事大纲
outline_prompt = PromptTemplate(
    input_variables=["topic"],
    template="为以下主题创建一个故事大纲:{topic}"
)
outline_chain = LLMChain(llm=llm, prompt=outline_prompt)

# 第二个链:扩展故事
story_prompt = PromptTemplate(
    input_variables=["outline"],
    template="基于以下大纲写一个完整的短故事:{outline}"
)
story_chain = LLMChain(llm=llm, prompt=story_prompt)

# 组合链
full_chain = SimpleSequentialChain(
    chains=[outline_chain, story_chain],
    verbose=True
)

# 执行链
result = full_chain.run("一个关于AI和人类友谊的科幻故事")
print(result)

🤖 Agent开发

1. 基础Agent

from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

# 定义自定义工具
def get_weather(city: str) -> str:
    """获取指定城市的天气信息"""
    # 这里应该调用真实的天气API
    return f"{city}今天天气晴朗,温度25°C"

def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        # 注意:eval存在安全风险,生产环境应使用安全的表达式解析器
        result = eval(expression)
        return f"计算结果: {result}"
    except Exception:
        return "计算错误,请检查表达式"

# 创建工具列表
tools = [
    Tool(
        name="搜索",
        func=DuckDuckGoSearchRun().run,
        description="用于搜索最新信息和回答问题"
    ),
    Tool(
        name="天气查询",
        func=get_weather,
        description="查询指定城市的天气信息,输入城市名称"
    ),
    Tool(
        name="计算器",
        func=calculate,
        description="计算数学表达式,输入要计算的表达式"
    )
]

# 创建Agent提示词
# 工具调用型Agent依赖模型的function calling能力选择工具,
# 无需手写ReAct格式的提示词;手写ReAct格式见下一小节
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个有用的AI助手,可以调用提供的工具来回答用户问题。"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad")
])

# 创建Agent
agent = create_openai_tools_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# 使用Agent
result = agent_executor.invoke({
    "input": "北京今天天气怎么样?另外帮我计算一下 25 * 4 + 10 等于多少?"
})

print(result["output"])
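
Agent内部"思考/行动/观察"的循环骨架,可以用纯Python草图说明(`fake_model`、`react_loop`均为示意性的假设实现,真实的解析与调度由LangChain内部完成):

```python
# ReAct循环的纯Python草图(示意):解析模型输出中的Action并执行,直到出现Final Answer
def fake_model(transcript: str) -> str:
    # 假设的模型:第一步调用计算器,看到Observation后给出最终答案
    if "Observation:" not in transcript:
        return "Thought: 需要计算\nAction: 计算器\nAction Input: 25 * 4 + 10"
    return "Thought: 我现在知道最终答案了\nFinal Answer: 110"

# 工具注册表:工具名 -> 可调用函数(eval仅用于演示,存在安全风险)
tool_registry = {"计算器": lambda expr: str(eval(expr))}

def react_loop(question: str, max_iterations: int = 5) -> str:
    transcript = f"Question: {question}"
    for _ in range(max_iterations):
        output = fake_model(transcript)
        if "Final Answer:" in output:
            return output.split("Final Answer:")[-1].strip()
        # 解析 Action / Action Input 并调用对应工具
        action = output.split("Action:")[1].split("\n")[0].strip()
        action_input = output.split("Action Input:")[1].split("\n")[0].strip()
        observation = tool_registry[action](action_input)
        transcript += f"\n{output}\nObservation: {observation}"
    return "达到最大迭代次数"

print(react_loop("25 * 4 + 10 等于多少?"))
```

AgentExecutor中的max_iterations参数,对应的就是这里循环次数的上限。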

2. 高级Agent:ReAct模式

from langchain.agents import create_react_agent
from langchain import hub

# 使用预定义的ReAct提示词
react_prompt = hub.pull("hwchase17/react")

# 创建ReAct Agent
react_agent = create_react_agent(llm, tools, react_prompt)
react_executor = AgentExecutor(
    agent=react_agent,
    tools=tools,
    verbose=True,
    max_iterations=5,
    early_stopping_method="generate"
)

# 复杂任务执行
complex_task = """
请帮我完成以下任务:
1. 搜索最新的AI技术趋势
2. 查询上海的天气情况
3. 计算如果我每天学习2小时,一年能学习多少小时
4. 基于以上信息,给我一个学习AI的建议
"""

result = react_executor.invoke({"input": complex_task})
print(result["output"])

💾 记忆管理

1. 对话记忆

from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory
from langchain.chains import ConversationChain

# 缓冲记忆(保存完整对话历史)
buffer_memory = ConversationBufferMemory()

# 摘要记忆(压缩历史对话)
summary_memory = ConversationSummaryMemory(llm=llm)

# 创建对话链
conversation = ConversationChain(
    llm=llm,
    memory=buffer_memory,
    verbose=True
)

# 多轮对话
print(conversation.predict(input="你好,我叫张三"))
print(conversation.predict(input="我喜欢编程"))
print(conversation.predict(input="你还记得我的名字吗?"))

# 查看记忆内容
print("\n记忆内容:")
print(buffer_memory.buffer)

2. 向量记忆

from langchain.memory import VectorStoreRetrieverMemory
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)

# 创建向量记忆
vector_memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)

# 保存记忆
vector_memory.save_context(
    {"input": "我最喜欢的编程语言是Python"},
    {"output": "很好!Python是一门优秀的编程语言。"}
)

vector_memory.save_context(
    {"input": "我在学习机器学习"},
    {"output": "机器学习是一个很有前景的领域。"}
)

# 检索相关记忆
relevant_memories = vector_memory.load_memory_variables(
    {"prompt": "编程语言"}
)
print(relevant_memories)

🔍 RAG系统构建

1. 基础RAG实现

from langchain.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# 1. 加载文档
loader = TextLoader("knowledge_base.txt", encoding="utf-8")
documents = loader.load()

# 2. 文档分割
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len
)
splits = text_splitter.split_documents(documents)

# 3. 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# 4. 创建检索器
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 4}
)

# 5. 添加压缩检索(可选)
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# 6. 创建QA链
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=compression_retriever,
    return_source_documents=True,
    verbose=True
)

# 7. 查询
query = "什么是机器学习?"
result = qa_chain({"query": query})

print(f"问题: {query}")
print(f"答案: {result['result']}")
print("\n来源文档:")
for i, doc in enumerate(result['source_documents']):
    print(f"文档 {i+1}: {doc.page_content[:200]}...")
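
chunk_size与chunk_overlap的滑动窗口效果,可以用一个极简草图直观展示(仅为示意;RecursiveCharacterTextSplitter的真实逻辑会优先按段落、句子等分隔符递归切分,而不是固定步长):

```python
# 滑动窗口分块草图:相邻chunk之间保留chunk_overlap个字符的重叠,
# 避免关键信息恰好被切分边界截断
def split_text(text: str, chunk_size: int, chunk_overlap: int) -> list:
    step = chunk_size - chunk_overlap  # 每次前进的步长
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = split_text("0123456789" * 3, chunk_size=10, chunk_overlap=2)
print(chunks)
```

可以看到每个chunk的末尾2个字符与下一个chunk的开头重复,这就是overlap的作用。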

2. 高级RAG:混合检索(向量+关键词)

from langchain.retrievers import EnsembleRetriever
from langchain.retrievers import BM25Retriever
import jieba

# 中文分词函数
def chinese_tokenizer(text):
    return list(jieba.cut(text))

# 创建BM25检索器(关键词检索)
bm25_retriever = BM25Retriever.from_documents(
    splits,
    preprocess_func=chinese_tokenizer
)
bm25_retriever.k = 4

# 创建混合检索器
ensemble_retriever = EnsembleRetriever(
    retrievers=[vectorstore.as_retriever(), bm25_retriever],
    weights=[0.7, 0.3]  # 向量检索权重0.7,关键词检索权重0.3
)

# 使用混合检索器
hybrid_qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=ensemble_retriever,
    return_source_documents=True
)

result = hybrid_qa_chain({"query": "深度学习的应用领域"})
print(result['result'])
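
两路检索结果按权重融合的思想,可用如下简化草图说明(示意性的加权求和;EnsembleRetriever的真实实现基于加权倒数排名融合RRF,并非直接对原始分数加权):

```python
# 混合检索加权融合的纯Python草图(示意的简化打分,非EnsembleRetriever源码)
def fuse(vector_hits: dict, bm25_hits: dict, weights=(0.7, 0.3)) -> list:
    # 两路检索各返回 {文档id: 归一化得分},按权重求和后降序排列
    docs = set(vector_hits) | set(bm25_hits)
    scored = {
        d: weights[0] * vector_hits.get(d, 0.0) + weights[1] * bm25_hits.get(d, 0.0)
        for d in docs
    }
    return sorted(scored, key=scored.get, reverse=True)

ranking = fuse({"doc1": 0.9, "doc2": 0.4}, {"doc2": 0.8, "doc3": 0.6})
print(ranking)
```

权重的意义在于:只被一路检索命中的文档,最终得分会被相应地打折,两路都命中的文档更容易排到前面。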

🔧 自定义组件

1. 自定义LLM

from langchain.llms.base import LLM
from typing import Optional, List, Any
import requests

class CustomLLM(LLM):
    api_url: str
    api_key: str

    @property
    def _llm_type(self) -> str:
        return "custom"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[Any] = None,
        **kwargs: Any,
    ) -> str:
        # 自定义API调用逻辑
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        data = {
            "prompt": prompt,
            "max_tokens": kwargs.get("max_tokens", 100),
            "temperature": kwargs.get("temperature", 0.7)
        }

        response = requests.post(self.api_url, headers=headers, json=data)
        return response.json().get("text", "")

    @property
    def _identifying_params(self) -> dict:
        return {"api_url": self.api_url}

# 使用自定义LLM
custom_llm = CustomLLM(
    api_url="https://your-api-endpoint.com/generate",
    api_key="your-api-key"
)

2. 自定义工具

from langchain.tools import BaseTool
from typing import Optional, Type
from pydantic import BaseModel, Field

class WeatherInput(BaseModel):
    city: str = Field(description="城市名称")
    date: Optional[str] = Field(default=None, description="日期,格式:YYYY-MM-DD")

class WeatherTool(BaseTool):
    # 新版pydantic要求类字段带类型注解
    name: str = "weather_query"
    description: str = "查询指定城市和日期的天气信息"
    args_schema: Type[BaseModel] = WeatherInput

    def _run(self, city: str, date: Optional[str] = None) -> str:
        # 实际的天气查询逻辑
        if date:
            return f"{city}{date}的天气:晴朗,温度20-25°C"
        else:
            return f"{city}今天天气:晴朗,温度20-25°C"

    async def _arun(self, city: str, date: Optional[str] = None) -> str:
        # 异步版本
        return self._run(city, date)

# 使用自定义工具
weather_tool = WeatherTool()
result = weather_tool.run({"city": "北京", "date": "2024-01-15"})
print(result)

🦙 LlamaIndex:专业的RAG开发框架

官方网站:https://llamaindex.ai
GitHub:https://github.com/run-llama/llama_index

LlamaIndex专注于数据索引和检索,是构建RAG应用的最佳选择。

🎯 核心优势

  • 数据连接器:支持100+种数据源
  • 索引结构:多种高效的索引算法
  • 查询引擎:智能的查询处理
  • 评估工具:完整的RAG评估体系

🚀 快速开始

1. 安装和配置

# 安装核心包
pip install llama-index

# 安装特定集成
pip install llama-index-llms-openai
pip install llama-index-embeddings-openai
pip install llama-index-vector-stores-chroma

# 安装数据加载器
pip install llama-index-readers-file
pip install llama-index-readers-web

2. 基础使用

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

# 配置全局设置
Settings.llm = OpenAI(model="gpt-4-turbo", api_key="your-api-key")
Settings.embed_model = OpenAIEmbedding(api_key="your-api-key")

# 加载文档
documents = SimpleDirectoryReader("./data").load_data()

# 创建索引
index = VectorStoreIndex.from_documents(documents)

# 创建查询引擎
query_engine = index.as_query_engine()

# 查询
response = query_engine.query("什么是人工智能?")
print(response)

📊 高级索引结构

1. 层次索引

from llama_index.core import TreeIndex
from llama_index.core.node_parser import SentenceSplitter

# 创建节点解析器
node_parser = SentenceSplitter(
    chunk_size=1024,
    chunk_overlap=20
)

# 解析文档为节点
nodes = node_parser.get_nodes_from_documents(documents)

# 创建树形索引
tree_index = TreeIndex(nodes)

# 创建查询引擎
tree_query_engine = tree_index.as_query_engine(
    retriever_mode="select_leaf_embedding",
    response_mode="tree_summarize"
)

response = tree_query_engine.query("总结文档的主要内容")
print(response)

2. 关键词索引

from llama_index.core import KeywordTableIndex

# 创建关键词索引
keyword_index = KeywordTableIndex.from_documents(documents)

# 查询
keyword_query_engine = keyword_index.as_query_engine()
response = keyword_query_engine.query("机器学习算法")
print(response)

3. 知识图谱索引

from llama_index.core import KnowledgeGraphIndex
from llama_index.core.graph_stores import SimpleGraphStore

# 创建图存储
graph_store = SimpleGraphStore()

# 创建知识图谱索引
kg_index = KnowledgeGraphIndex.from_documents(
    documents,
    graph_store=graph_store,
    max_triplets_per_chunk=2
)

# 查询
kg_query_engine = kg_index.as_query_engine(
    include_text=True,
    response_mode="tree_summarize"
)

response = kg_query_engine.query("人工智能和机器学习的关系")
print(response)

🔍 高级查询技术

1. 路由查询

from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.tools import QueryEngineTool

# 创建多个查询引擎
vector_tool = QueryEngineTool.from_defaults(
    query_engine=query_engine,
    description="用于回答关于文档内容的具体问题"
)

tree_tool = QueryEngineTool.from_defaults(
    query_engine=tree_query_engine,
    description="用于总结和概括文档内容"
)

keyword_tool = QueryEngineTool.from_defaults(
    query_engine=keyword_query_engine,
    description="用于基于关键词的精确搜索"
)

# 创建路由查询引擎
router_query_engine = RouterQueryEngine(
    selector=LLMSingleSelector.from_defaults(),
    query_engine_tools=[
        vector_tool,
        tree_tool,
        keyword_tool
    ]
)

# 智能路由查询
response = router_query_engine.query("请总结人工智能的发展历程")
print(response)

2. 子问题查询

from llama_index.core.query_engine import SubQuestionQueryEngine
from llama_index.core.tools import QueryEngineTool, ToolMetadata

# 创建子问题查询引擎(metadata需为ToolMetadata对象)
query_engine_tools = [
    QueryEngineTool(
        query_engine=query_engine,
        metadata=ToolMetadata(name="ai_docs", description="AI相关文档")
    )
]

sub_question_engine = SubQuestionQueryEngine.from_defaults(
    query_engine_tools=query_engine_tools
)

# 复杂问题分解查询
complex_query = "比较监督学习和无监督学习的优缺点,并给出应用场景"
response = sub_question_engine.query(complex_query)
print(response)

📈 RAG评估

1. 评估指标

from llama_index.core.evaluation import (
    FaithfulnessEvaluator,
    RelevancyEvaluator,
    CorrectnessEvaluator
)
from llama_index.core.evaluation import BatchEvalRunner

# 创建评估器
faithfulness_evaluator = FaithfulnessEvaluator()
relevancy_evaluator = RelevancyEvaluator()
correctness_evaluator = CorrectnessEvaluator()

# 准备评估数据
eval_questions = [
    "什么是机器学习?",
    "深度学习有哪些应用?",
    "如何选择合适的算法?"
]

# 批量评估
runner = BatchEvalRunner(
    {
        "faithfulness": faithfulness_evaluator,
        "relevancy": relevancy_evaluator,
        "correctness": correctness_evaluator
    },
    workers=2
)

# 注意:await需在异步环境中执行(如用asyncio.run包装,或在notebook中)
eval_results = await runner.aevaluate_queries(
    query_engine,
    queries=eval_questions
)

# 查看结果
for query, result in eval_results.items():
    print(f"Query: {query}")
    for metric, score in result.items():
        print(f"  {metric}: {score.score}")

2. 自定义评估

from typing import Optional
from llama_index.core.evaluation import BaseEvaluator

class CustomEvaluator(BaseEvaluator):
    def _get_prompts(self) -> dict:
        return {
            "eval_template": (
                "请评估以下回答的质量(1-5分):\n"
                "问题: {query}\n"
                "回答: {response}\n"
                "评分(1-5):"
            )
        }

    def _get_prompt_modules(self) -> dict:
        return {}

    async def aevaluate(
        self,
        query: Optional[str] = None,
        response: Optional[str] = None,
        contexts: Optional[list] = None,
        **kwargs
    ) -> dict:
        eval_response = await self.llm.apredict(
            self.eval_template,
            query=query,
            response=response
        )

        # 提取分数
        try:
            score = float(eval_response.strip())
        except ValueError:
            score = 0.0

        return {"score": score, "feedback": eval_response}

# 使用自定义评估器(await需在异步环境中执行)
custom_evaluator = CustomEvaluator()
result = await custom_evaluator.aevaluate(
    query="什么是机器学习?",
    response="机器学习是人工智能的一个分支..."
)
print(result)

🗄️ 向量数据库实战

🎨 Chroma:轻量级向量数据库

1. 基础使用

import chromadb

# 创建持久化客户端
# 旧版的 chromadb.Client(Settings(chroma_db_impl="duckdb+parquet", ...)) 写法已废弃
client = chromadb.PersistentClient(path="./chroma_db")

# 创建集合
collection = client.create_collection(
    name="my_collection",
    metadata={"hnsw:space": "cosine"}
)

# 添加文档
documents = [
    "机器学习是人工智能的一个重要分支",
    "深度学习使用多层神经网络",
    "自然语言处理处理人类语言",
    "计算机视觉让机器理解图像"
]

ids = [f"doc_{i}" for i in range(len(documents))]
metadatas = [{"topic": "AI", "index": i} for i in range(len(documents))]

collection.add(
    documents=documents,
    ids=ids,
    metadatas=metadatas
)

# 查询
results = collection.query(
    query_texts=["什么是深度学习?"],
    n_results=2
)

print("查询结果:")
for i, doc in enumerate(results['documents'][0]):
    print(f"{i+1}. {doc} (距离: {results['distances'][0][i]:.3f})")

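`"hnsw:space": "cosine"` 表示集合使用余弦距离度量相似度,其含义可以用一段纯Python草图验证(距离 = 1 - 余弦相似度,值越小表示越相似):

```python
import math

# 余弦距离草图:解释 hnsw:space="cosine" 下查询结果中distances字段的含义
def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return 1 - dot / norm  # 距离越小越相似

print(cosine_distance([1.0, 0.0], [1.0, 0.0]))  # 同向向量,距离为0
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # 正交向量,距离为1
```

因此上面打印的"距离"越接近0,文档与查询语义越接近。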
2. 高级功能

from chromadb.utils import embedding_functions

# 使用自定义嵌入函数
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key="your-api-key",
    model_name="text-embedding-ada-002"
)

# 创建带自定义嵌入的集合
advanced_collection = client.create_collection(
    name="advanced_collection",
    embedding_function=openai_ef
)

# 批量操作(large_documents为待入库的大批量文本列表,此处仅作示意)
batch_size = 100
for i in range(0, len(large_documents), batch_size):
    batch_docs = large_documents[i:i+batch_size]
    batch_ids = [f"doc_{j}" for j in range(i, min(i+batch_size, len(large_documents)))]

    advanced_collection.add(
        documents=batch_docs,
        ids=batch_ids
    )

# 复杂查询
complex_results = advanced_collection.query(
    query_texts=["机器学习算法"],
    n_results=5,
    where={"topic": "AI"},
    include=["documents", "distances", "metadatas"]
)

📌 Pinecone:企业级向量数据库

1. 基础配置

from pinecone import Pinecone, ServerlessSpec

# 初始化Pinecone
pc = Pinecone(api_key="your-api-key")

# 创建索引
index_name = "ai-knowledge-base"

if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,  # OpenAI embedding维度
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )

# 连接索引
index = pc.Index(index_name)

2. 数据操作

import openai
from typing import List

def get_embeddings(texts: List[str]) -> List[List[float]]:
    """获取文本嵌入(openai<1.0 的旧版SDK写法;新版SDK应使用 client.embeddings.create)"""
    response = openai.Embedding.create(
        input=texts,
        model="text-embedding-ada-002"
    )
    return [item['embedding'] for item in response['data']]

# 准备数据
documents = [
    "人工智能是模拟人类智能的技术",
    "机器学习让计算机从数据中学习",
    "深度学习是机器学习的子集",
    "神经网络是深度学习的基础"
]

# 获取嵌入
embeddings = get_embeddings(documents)

# 准备向量数据
vectors = []
for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
    vectors.append({
        "id": f"doc_{i}",
        "values": embedding,
        "metadata": {
            "text": doc,
            "category": "AI",
            "timestamp": "2024-01-01"
        }
    })

# 批量插入
index.upsert(vectors=vectors)

# 查询
query_text = "什么是机器学习?"
query_embedding = get_embeddings([query_text])[0]

results = index.query(
    vector=query_embedding,
    top_k=3,
    include_metadata=True
)

print("查询结果:")
for match in results['matches']:
    print(f"相似度: {match['score']:.3f}")
    print(f"文本: {match['metadata']['text']}")
    print("---")

3. 高级查询

# 混合查询(向量+元数据过滤)
filtered_results = index.query(
    vector=query_embedding,
    top_k=5,
    filter={
        "category": {"$eq": "AI"},
        "timestamp": {"$gte": "2024-01-01"}
    },
    include_metadata=True
)

# 命名空间查询
index.query(
    vector=query_embedding,
    top_k=3,
    namespace="production",
    include_metadata=True
)

# 批量查询:query一次只接受一个向量,多个查询需逐个执行
query_vectors = get_embeddings([
    "深度学习的应用",
    "神经网络结构"
])

batch_results = [
    index.query(vector=v, top_k=2, include_metadata=True)
    for v in query_vectors
]

🎨 UI框架:Gradio和Streamlit

🚀 Gradio:快速AI应用原型

1. 基础界面

import gradio as gr
from transformers import pipeline

# 创建文本生成管道
generator = pipeline("text-generation", model="gpt2")

def generate_text(prompt, max_length, temperature):
    result = generator(
        prompt,
        max_length=max_length,
        temperature=temperature,
        num_return_sequences=1,
        pad_token_id=generator.tokenizer.eos_token_id
    )
    return result[0]['generated_text']

# 创建界面
iface = gr.Interface(
    fn=generate_text,
    inputs=[
        gr.Textbox(label="输入提示", placeholder="请输入文本..."),
        gr.Slider(10, 100, value=50, label="最大长度"),
        gr.Slider(0.1, 2.0, value=0.7, label="创造性")
    ],
    outputs=gr.Textbox(label="生成的文本"),
    title="AI文本生成器",
    description="使用GPT-2生成文本",
    examples=[
        ["人工智能的未来", 50, 0.7],
        ["机器学习算法", 80, 0.5]
    ]
)

iface.launch()

2. 多功能应用

import gradio as gr

# 多个AI功能(此处为占位实现,实际应接入真实模型)
def text_generation(prompt):
    # 文本生成逻辑
    return f"生成的文本基于: {prompt}"

def image_classification(image):
    # 图像分类逻辑
    return {"猫": 0.8, "狗": 0.2}

def sentiment_analysis(text):
    # 情感分析逻辑
    return {"正面": 0.7, "负面": 0.3}

# 创建多标签页界面
with gr.Blocks(title="AI工具箱") as demo:
    gr.Markdown("# 🤖 AI工具箱")

    with gr.Tab("文本生成"):
        with gr.Row():
            text_input = gr.Textbox(label="输入提示")
            text_output = gr.Textbox(label="生成结果")
        text_btn = gr.Button("生成")
        text_btn.click(text_generation, text_input, text_output)

    with gr.Tab("图像分类"):
        with gr.Row():
            image_input = gr.Image(label="上传图像")
            image_output = gr.Label(label="分类结果")
        image_btn = gr.Button("分类")
        image_btn.click(image_classification, image_input, image_output)

    with gr.Tab("情感分析"):
        with gr.Row():
            sentiment_input = gr.Textbox(label="输入文本")
            sentiment_output = gr.Label(label="情感分析")
        sentiment_btn = gr.Button("分析")
        sentiment_btn.click(sentiment_analysis, sentiment_input, sentiment_output)

demo.launch(share=True)

📊 Streamlit:数据驱动的AI应用

1. 基础应用

import streamlit as st
import pandas as pd
import plotly.express as px
from transformers import pipeline

# 页面配置
st.set_page_config(
    page_title="AI数据分析平台",
    page_icon="🤖",
    layout="wide"
)

# 标题
st.title("🤖 AI数据分析平台")
st.markdown("---")

# 侧边栏
with st.sidebar:
    st.header("配置选项")
    model_choice = st.selectbox(
        "选择模型",
        ["sentiment-analysis", "text-classification", "summarization"]
    )

    confidence_threshold = st.slider(
        "置信度阈值",
        0.0, 1.0, 0.5
    )

# 主要内容
col1, col2 = st.columns([2, 1])

with col1:
    st.header("文本分析")

    # 文本输入
    user_input = st.text_area(
        "输入要分析的文本",
        height=150,
        placeholder="请输入文本..."
    )

    if st.button("开始分析", type="primary"):
        if user_input:
            # 创建分析管道
            classifier = pipeline(model_choice)

            # 执行分析
            with st.spinner("分析中..."):
                results = classifier(user_input)

            # 显示结果
            st.success("分析完成!")

            # 结果可视化
            if isinstance(results, list) and len(results) > 0:
                result = results[0]

                # 创建数据框
                df = pd.DataFrame([
                    {"标签": result['label'], "置信度": result['score']}
                ])

                # 显示表格
                st.dataframe(df, use_container_width=True)

                # 显示图表
                fig = px.bar(
                    df,
                    x="标签",
                    y="置信度",
                    title="分析结果"
                )
                st.plotly_chart(fig, use_container_width=True)
        else:
            st.warning("请输入要分析的文本")

with col2:
    st.header("统计信息")

    if user_input:
        # 文本统计
        word_count = len(user_input.split())
        char_count = len(user_input)

        st.metric("字符数", char_count)
        st.metric("单词数", word_count)
        st.metric("平均词长", f"{char_count/word_count:.1f}" if word_count > 0 else "0")

    # 历史记录
    st.header("使用历史")
    if 'history' not in st.session_state:
        st.session_state.history = []

    if user_input and st.button("保存到历史"):
        st.session_state.history.append({
            "时间": pd.Timestamp.now().strftime("%H:%M:%S"),
            "文本": user_input[:50] + "..." if len(user_input) > 50 else user_input
        })

    if st.session_state.history:
        history_df = pd.DataFrame(st.session_state.history)
        st.dataframe(history_df, use_container_width=True)

2. 高级功能

import time

import streamlit as st
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from streamlit_chat import message
from streamlit_option_menu import option_menu

# 多页面应用
with st.sidebar:
    selected = option_menu(
        "主菜单",
        ["聊天机器人", "数据分析", "模型训练", "设置"],
        icons=["chat", "graph-up", "cpu", "gear"],
        menu_icon="cast",
        default_index=0
    )

if selected == "聊天机器人":
    st.header("🤖 AI聊天助手")

    # 初始化聊天历史
    if 'messages' not in st.session_state:
        st.session_state.messages = []

    # 显示聊天历史
    for i, msg in enumerate(st.session_state.messages):
        message(msg["content"], is_user=msg["role"] == "user", key=f"msg_{i}")

    # 用户输入
    user_input = st.chat_input("输入消息...")

    if user_input:
        # 添加用户消息
        st.session_state.messages.append({"role": "user", "content": user_input})

        # 生成AI回复(这里应该调用实际的AI模型)
        ai_response = f"我理解您说的:{user_input}。这是一个示例回复。"

        # 添加AI回复
        st.session_state.messages.append({"role": "assistant", "content": ai_response})

        # 重新运行以更新界面
        st.rerun()

elif selected == "数据分析":
    st.header("📊 数据分析")

    # 文件上传
    uploaded_file = st.file_uploader(
        "上传CSV文件",
        type=["csv"],
        help="请上传要分析的CSV文件"
    )

    if uploaded_file:
        # 读取数据
        df = pd.read_csv(uploaded_file)

        # 显示数据预览
        st.subheader("数据预览")
        st.dataframe(df.head(), use_container_width=True)

        # 数据统计
        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("行数", len(df))
        with col2:
            st.metric("列数", len(df.columns))
        with col3:
            st.metric("缺失值", df.isnull().sum().sum())

        # 选择列进行分析
        numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()

        if numeric_columns:
            selected_column = st.selectbox("选择要分析的列", numeric_columns)

            # 创建图表
            fig = go.Figure()
            fig.add_trace(go.Histogram(x=df[selected_column], name=selected_column))
            fig.update_layout(title=f"{selected_column} 分布图")

            st.plotly_chart(fig, use_container_width=True)

elif selected == "模型训练":
    st.header("🧠 模型训练")

    # 训练参数配置
    with st.expander("训练参数", expanded=True):
        col1, col2 = st.columns(2)

        with col1:
            epochs = st.number_input("训练轮数", min_value=1, max_value=100, value=10)
            batch_size = st.selectbox("批次大小", [16, 32, 64, 128])

        with col2:
            learning_rate = st.number_input("学习率", min_value=0.0001, max_value=0.1, value=0.001, format="%.4f")
            model_type = st.selectbox("模型类型", ["BERT", "RoBERTa", "DistilBERT"])

    # 训练进度
    if st.button("开始训练", type="primary"):
        progress_bar = st.progress(0)
        status_text = st.empty()

        for i in range(epochs):
            # 模拟训练过程
            progress = (i + 1) / epochs
            progress_bar.progress(progress)
            status_text.text(f"训练进度: {i+1}/{epochs} 轮")

            # 模拟训练时间
            time.sleep(0.1)

        st.success("训练完成!")

elif selected == "设置":
    st.header("⚙️ 系统设置")

    # API配置
    with st.expander("API配置"):
        api_key = st.text_input("OpenAI API Key", type="password")
        api_base = st.text_input("API Base URL", value="https://api.openai.com/v1")

    # 模型配置
    with st.expander("模型配置"):
        default_model = st.selectbox(
            "默认模型",
            ["gpt-3.5-turbo", "gpt-4", "claude-3-sonnet"]
        )
        max_tokens = st.number_input("最大Token数", min_value=100, max_value=4000, value=1000)

    # 保存设置
    if st.button("保存设置"):
        st.success("设置已保存!")

🔧 实战项目:构建完整的RAG应用

📋 项目架构

# 项目结构
project_structure = """
rag_app/
├── app.py # Streamlit主应用
├── components/
│ ├── __init__.py
│ ├── document_loader.py # 文档加载器
│ ├── vector_store.py # 向量存储
│ ├── retriever.py # 检索器
│ └── generator.py # 生成器
├── config/
│ ├── __init__.py
│ └── settings.py # 配置文件
├── data/
│ └── documents/ # 文档存储
├── requirements.txt # 依赖包
└── README.md # 项目说明
"""

🏗️ 核心组件实现

1. 配置管理

# config/settings.py
from pydantic import BaseSettings  # pydantic v2 中应改为: from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    # API配置
    openai_api_key: str
    anthropic_api_key: Optional[str] = None

    # 模型配置
    embedding_model: str = "text-embedding-ada-002"
    llm_model: str = "gpt-4-turbo"

    # 向量数据库配置
    vector_db_type: str = "chroma"  # chroma, pinecone
    chroma_persist_directory: str = "./chroma_db"
    pinecone_index_name: Optional[str] = None

    # 检索配置
    chunk_size: int = 1000
    chunk_overlap: int = 200
    top_k: int = 4

    # 生成配置
    max_tokens: int = 1000
    temperature: float = 0.7

    class Config:
        env_file = ".env"

settings = Settings()

2. 文档加载器

# components/document_loader.py
from typing import List, Union
from pathlib import Path
from langchain.document_loaders import (
    TextLoader,
    PyPDFLoader,
    UnstructuredWordDocumentLoader,
    CSVLoader
)
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

class DocumentLoader:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len
        )

    def load_file(self, file_path: Union[str, Path]) -> List[Document]:
        """加载单个文件"""
        file_path = Path(file_path)

        if file_path.suffix.lower() == '.txt':
            loader = TextLoader(str(file_path), encoding='utf-8')
        elif file_path.suffix.lower() == '.pdf':
            loader = PyPDFLoader(str(file_path))
        elif file_path.suffix.lower() in ['.doc', '.docx']:
            loader = UnstructuredWordDocumentLoader(str(file_path))
        elif file_path.suffix.lower() == '.csv':
            loader = CSVLoader(str(file_path))
        else:
            raise ValueError(f"不支持的文件类型: {file_path.suffix}")

        documents = loader.load()
        return self.text_splitter.split_documents(documents)

    def load_directory(self, directory_path: Union[str, Path]) -> List[Document]:
        """加载目录中的所有文件"""
        directory_path = Path(directory_path)
        all_documents = []

        for file_path in directory_path.rglob('*'):
            if file_path.is_file() and file_path.suffix.lower() in ['.txt', '.pdf', '.doc', '.docx', '.csv']:
                try:
                    documents = self.load_file(file_path)
                    all_documents.extend(documents)
                except Exception as e:
                    print(f"加载文件 {file_path} 时出错: {e}")

        return all_documents

3. 向量存储管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# components/vector_store.py
from typing import List
from abc import ABC, abstractmethod
from langchain.schema import Document
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from config.settings import settings

class VectorStoreBase(ABC):
    @abstractmethod
    def add_documents(self, documents: List[Document]) -> None:
        pass

    @abstractmethod
    def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        pass

    @abstractmethod
    def delete_collection(self) -> None:
        pass

class ChromaVectorStore(VectorStoreBase):
    def __init__(self, collection_name: str = "default"):
        self.embeddings = OpenAIEmbeddings(
            model=settings.embedding_model,
            api_key=settings.openai_api_key
        )
        self.collection_name = collection_name
        self.vectorstore = None
        self._initialize_store()

    def _initialize_store(self):
        """初始化向量存储"""
        self.vectorstore = Chroma(
            collection_name=self.collection_name,
            embedding_function=self.embeddings,
            persist_directory=settings.chroma_persist_directory
        )

    def add_documents(self, documents: List[Document]) -> None:
        """添加文档到向量存储"""
        if documents:
            self.vectorstore.add_documents(documents)
            self.vectorstore.persist()

    def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        """相似性搜索"""
        return self.vectorstore.similarity_search(query, k=k)

    def delete_collection(self) -> None:
        """删除集合"""
        self.vectorstore.delete_collection()

class VectorStoreFactory:
    @staticmethod
    def create_vector_store(store_type: str = "chroma", **kwargs) -> VectorStoreBase:
        if store_type == "chroma":
            return ChromaVectorStore(**kwargs)
        else:
            raise ValueError(f"不支持的向量存储类型: {store_type}")
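
ChromaVectorStore 背后的核心操作其实就是"嵌入 → 存储 → 按余弦相似度取 Top-K"。下面用纯 Python 写一个内存版示意(用字符频率向量充当玩具嵌入,仅用于理解接口语义,并非生产实现;真实系统应换成 OpenAIEmbeddings 等模型):

```python
import math

class InMemoryVectorStore:
    """极简内存向量库:演示 add_documents/similarity_search 的基本逻辑"""

    def __init__(self, embed_fn):
        self.embed_fn = embed_fn      # 文本 -> 向量 的函数
        self.docs = []                # [(向量, 原文), ...]

    def add_documents(self, texts):
        for text in texts:
            self.docs.append((self.embed_fn(text), text))

    def similarity_search(self, query, k=2):
        qv = self.embed_fn(query)
        scored = [(self._cosine(qv, dv), text) for dv, text in self.docs]
        scored.sort(key=lambda x: x[0], reverse=True)
        return [text for _, text in scored[:k]]

    @staticmethod
    def _cosine(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(x * x for x in b))
        return dot / (na * nb) if na and nb else 0.0

# 玩具嵌入:统计26个字母的出现次数(真实嵌入是语义向量,此处仅为演示)
VOCAB = "abcdefghijklmnopqrstuvwxyz"
def toy_embed(text):
    text = text.lower()
    return [text.count(ch) for ch in VOCAB]

store = InMemoryVectorStore(toy_embed)
store.add_documents([
    "langchain builds chains",
    "chroma stores vectors",
    "streamlit renders ui",
])
print(store.similarity_search("vector store", k=1))
```

无论底层是 Chroma、Pinecone 还是内存字典,对上层 RAG 管线暴露的都是这组"增/查"接口,这也是文中抽象出 VectorStoreBase 的原因。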

4. 检索器

# components/retriever.py
from typing import List, Optional
from langchain.schema import Document
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import ChatOpenAI
from components.vector_store import VectorStoreBase
from config.settings import settings

class AdvancedRetriever:
    def __init__(self, vector_store: VectorStoreBase, use_compression: bool = True):
        self.vector_store = vector_store
        self.use_compression = use_compression

        if use_compression:
            self.llm = ChatOpenAI(
                model=settings.llm_model,
                api_key=settings.openai_api_key,
                temperature=0
            )
            self.compressor = LLMChainExtractor.from_llm(self.llm)

    def retrieve(self, query: str, k: Optional[int] = None) -> List[Document]:
        """检索相关文档"""
        k = k or settings.top_k

        # 基础检索
        base_retriever = self.vector_store.vectorstore.as_retriever(
            search_kwargs={"k": k}
        )

        if self.use_compression:
            # 使用压缩检索器:先取回Top-K,再用LLM剔除与问题无关的内容
            compression_retriever = ContextualCompressionRetriever(
                base_compressor=self.compressor,
                base_retriever=base_retriever
            )
            return compression_retriever.get_relevant_documents(query)
        else:
            return base_retriever.get_relevant_documents(query)

    def retrieve_with_scores(self, query: str, k: Optional[int] = None) -> List[tuple]:
        """检索文档并返回相似度分数"""
        k = k or settings.top_k
        return self.vector_store.vectorstore.similarity_search_with_score(query, k=k)
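
ContextualCompressionRetriever 的思路是:先取回 Top-K 片段,再判断每个片段是否真的与问题相关,只保留相关部分。下面用"关键词命中"做一个不调用 LLM 的简化示意(真实实现由 LLMChainExtractor 用语言模型判断相关性,这里仅演示"检索后过滤"的流程):

```python
def compress_documents(docs, query, min_hits=1):
    """保留与查询至少共享 min_hits 个词的文档片段(LLM压缩的简化替身)"""
    query_words = set(query.lower().split())
    kept = []
    for doc in docs:
        hits = query_words & set(doc.lower().split())
        if len(hits) >= min_hits:
            kept.append(doc)
    return kept

docs = [
    "LangChain provides retrievers and chains",
    "The weather today is sunny",
    "Retrievers fetch relevant documents",
]
print(compress_documents(docs, "how do retrievers work"))
```

压缩步骤减少了送入生成模型的无关上下文,既降低 token 成本,也减少答案被噪声带偏的概率。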

5. 生成器

# components/generator.py
from typing import Dict, Any
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from components.retriever import AdvancedRetriever
from config.settings import settings

class RAGGenerator:
    def __init__(self, retriever: AdvancedRetriever):
        self.retriever = retriever
        self.llm = ChatOpenAI(
            model=settings.llm_model,
            api_key=settings.openai_api_key,
            temperature=settings.temperature,
            max_tokens=settings.max_tokens
        )

        # 自定义提示词模板
        self.prompt_template = PromptTemplate(
            input_variables=["context", "question"],
            template="""基于以下上下文信息回答问题。如果上下文中没有相关信息,请说明无法从提供的信息中找到答案。

上下文信息:
{context}

问题:{question}

回答:"""
        )

        # 创建QA链
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever.vector_store.vectorstore.as_retriever(),
            chain_type_kwargs={"prompt": self.prompt_template},
            return_source_documents=True
        )

    def generate_answer(self, question: str) -> Dict[str, Any]:
        """生成答案"""
        result = self.qa_chain({"query": question})

        return {
            "answer": result["result"],
            "source_documents": result["source_documents"],
            "question": question
        }

    def generate_streaming_answer(self, question: str):
        """流式生成答案"""
        # 检索相关文档
        docs = self.retriever.retrieve(question)

        # 构建上下文
        context = "\n\n".join(doc.page_content for doc in docs)

        # 构建提示词
        prompt = self.prompt_template.format(context=context, question=question)

        # 流式生成
        for chunk in self.llm.stream(prompt):
            yield chunk.content
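
RAG 生成环节的本质,是把检索到的片段拼进提示词再交给模型。PromptTemplate.format 做的事与下面的纯字符串操作等价(示意代码,模板文案沿用上文):

```python
# 与上文 PromptTemplate 等价的纯字符串模板
TEMPLATE = """基于以下上下文信息回答问题。如果上下文中没有相关信息,请说明无法从提供的信息中找到答案。

上下文信息:
{context}

问题:{question}

回答:"""

def build_prompt(docs, question):
    """把检索到的片段用空行连接后填入模板"""
    context = "\n\n".join(docs)
    return TEMPLATE.format(context=context, question=question)

prompt = build_prompt(["片段一的内容", "片段二的内容"], "这两个片段讲了什么?")
print(prompt)
```

理解这一点后就能明白:检索质量直接决定 {context} 的质量,而 {context} 的质量上限决定了答案的质量上限。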

🖥️ Streamlit应用界面

# app.py
import streamlit as st
from pathlib import Path
from components.document_loader import DocumentLoader
from components.vector_store import VectorStoreFactory
from components.retriever import AdvancedRetriever
from components.generator import RAGGenerator
from config.settings import settings

# 页面配置
st.set_page_config(
    page_title="智能文档问答系统",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded"
)

# 初始化会话状态
if 'vector_store' not in st.session_state:
    st.session_state.vector_store = None
if 'rag_generator' not in st.session_state:
    st.session_state.rag_generator = None
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []

# 侧边栏配置
with st.sidebar:
    st.header("📁 文档管理")

    # 文件上传
    uploaded_files = st.file_uploader(
        "上传文档",
        type=["txt", "pdf", "docx", "csv"],
        accept_multiple_files=True,
        help="支持TXT、PDF、DOCX、CSV格式"
    )

    # 处理上传的文件
    if uploaded_files and st.button("处理文档", type="primary"):
        with st.spinner("正在处理文档..."):
            # 保存上传的文件
            docs_dir = Path("./temp_docs")
            docs_dir.mkdir(exist_ok=True)

            saved_files = []
            for uploaded_file in uploaded_files:
                file_path = docs_dir / uploaded_file.name
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getbuffer())
                saved_files.append(file_path)

            # 加载文档
            loader = DocumentLoader(
                chunk_size=settings.chunk_size,
                chunk_overlap=settings.chunk_overlap
            )

            all_documents = []
            for file_path in saved_files:
                try:
                    documents = loader.load_file(file_path)
                    all_documents.extend(documents)
                except Exception as e:
                    st.error(f"处理文件 {file_path.name} 时出错: {e}")

            if all_documents:
                # 创建向量存储
                vector_store = VectorStoreFactory.create_vector_store(
                    store_type=settings.vector_db_type,
                    collection_name="uploaded_docs"
                )

                # 添加文档到向量存储
                vector_store.add_documents(all_documents)

                # 创建检索器和生成器
                retriever = AdvancedRetriever(vector_store)
                rag_generator = RAGGenerator(retriever)

                # 保存到会话状态
                st.session_state.vector_store = vector_store
                st.session_state.rag_generator = rag_generator

                st.success(f"成功处理 {len(all_documents)} 个文档片段!")

            # 清理临时文件
            for file_path in saved_files:
                file_path.unlink()

    # 系统配置
    st.header("⚙️ 系统配置")

    # 检索参数
    top_k = st.slider("检索文档数量", 1, 10, settings.top_k)
    temperature = st.slider("生成温度", 0.0, 1.0, settings.temperature, 0.1)
    max_tokens = st.number_input("最大Token数", 100, 2000, settings.max_tokens)

    # 清除历史
    if st.button("清除对话历史"):
        st.session_state.chat_history = []
        st.rerun()

# 主界面
st.title("🤖 智能文档问答系统")
st.markdown("基于RAG技术的智能文档问答,支持多种文档格式")

# 检查是否已加载文档
if st.session_state.rag_generator is None:
    st.info("👈 请先在侧边栏上传文档")

    # 显示示例
    with st.expander("💡 使用说明", expanded=True):
        st.markdown("""
### 如何使用:
1. **上传文档**:在左侧侧边栏上传您的文档文件
2. **处理文档**:点击"处理文档"按钮,系统会自动分析和索引文档内容
3. **开始问答**:在下方输入框中输入您的问题
4. **查看答案**:系统会基于文档内容生成准确的答案

### 支持的文档格式:
- 📄 TXT文本文件
- 📕 PDF文档
- 📘 Word文档(DOCX)
- 📊 CSV数据文件

### 功能特点:
- 🔍 智能检索:基于语义相似度检索相关内容
- 🧠 上下文理解:结合多个文档片段生成综合答案
- 📚 来源追踪:显示答案的具体来源文档
- 💬 对话记忆:支持多轮对话上下文
""")
else:
    # 对话界面
    st.header("💬 智能问答")

    # 显示对话历史
    for i, (question, answer, sources) in enumerate(st.session_state.chat_history):
        with st.container():
            st.markdown(f"**🙋 问题 {i+1}:** {question}")
            st.markdown(f"**🤖 回答:** {answer}")

            if sources:
                with st.expander(f"📚 参考来源 ({len(sources)}个文档片段)"):
                    for j, source in enumerate(sources):
                        st.markdown(f"**片段 {j+1}:**")
                        st.text(source.page_content[:300] + "..." if len(source.page_content) > 300 else source.page_content)
                        if hasattr(source, 'metadata') and source.metadata:
                            st.caption(f"来源:{source.metadata}")

            st.markdown("---")

    # 问题输入
    question = st.text_input(
        "请输入您的问题:",
        placeholder="例如:文档中提到了哪些关键技术?",
        key="question_input"
    )

    col1, col2 = st.columns([1, 4])

    with col1:
        ask_button = st.button("🚀 提问", type="primary", use_container_width=True)

    with col2:
        stream_mode = st.checkbox("流式输出", value=False)

    if ask_button and question:
        with st.spinner("正在思考中..."):
            try:
                if stream_mode:
                    # 流式输出
                    answer_placeholder = st.empty()
                    answer_text = ""

                    for chunk in st.session_state.rag_generator.generate_streaming_answer(question):
                        answer_text += chunk
                        answer_placeholder.markdown(f"**🤖 回答:** {answer_text}")

                    # 获取来源文档
                    docs = st.session_state.rag_generator.retriever.retrieve(question, k=top_k)

                    result = {
                        "answer": answer_text,
                        "source_documents": docs,
                        "question": question
                    }
                else:
                    # 普通输出
                    result = st.session_state.rag_generator.generate_answer(question)

                # 添加到对话历史
                st.session_state.chat_history.append((
                    result["question"],
                    result["answer"],
                    result["source_documents"]
                ))

                # 重新运行以显示新的对话
                st.rerun()

            except Exception as e:
                st.error(f"生成答案时出错:{e}")

    # 统计信息
    if st.session_state.chat_history:
        st.header("📊 对话统计")

        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("对话轮数", len(st.session_state.chat_history))

        with col2:
            total_chars = sum(len(answer) for _, answer, _ in st.session_state.chat_history)
            st.metric("总回答字数", total_chars)

        with col3:
            avg_sources = sum(len(sources) for _, _, sources in st.session_state.chat_history) / len(st.session_state.chat_history)
            st.metric("平均参考来源", f"{avg_sources:.1f}")

# 页脚
st.markdown("---")
st.markdown(
    "<div style='text-align: center; color: gray;'>"
    "🤖 智能文档问答系统 | 基于LangChain和Streamlit构建"
    "</div>",
    unsafe_allow_html=True
)

📦 部署配置

1. 依赖文件

# requirements.txt
streamlit>=1.28.0
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.10
chromadb>=0.4.0
openai>=1.10.0
tiktoken>=0.5.0
pandas>=2.0.0
numpy>=1.24.0
plotly>=5.15.0
pydantic>=2.0.0
python-dotenv>=1.0.0
pypdf>=3.0.0
python-docx>=0.8.11
unstructured>=0.10.0
jieba>=0.42.1

2. 环境配置

# .env
OPENAI_API_KEY=your-openai-api-key
ANTHROPIC_API_KEY=your-anthropic-api-key
EMBEDDING_MODEL=text-embedding-ada-002
LLM_MODEL=gpt-4-turbo
VECTOR_DB_TYPE=chroma
CHROMA_PERSIST_DIRECTORY=./chroma_db
CHUNK_SIZE=1000
CHUNK_OVERLAP=200
TOP_K=4
MAX_TOKENS=1000
TEMPERATURE=0.7

3. 启动脚本

#!/bin/bash
# run.sh

# 创建虚拟环境
python -m venv venv

# 激活虚拟环境
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows

# 安装依赖
pip install -r requirements.txt

# 启动应用
streamlit run app.py --server.port 8501

🚀 最佳实践和优化建议

1. 性能优化

# 缓存优化:相同文本的嵌入只计算一次
# (embeddings 为已初始化的嵌入模型实例,rag_generator 为已构建的生成器)
import asyncio
import functools
from typing import Any, Dict, List

from langchain.schema import Document

@functools.lru_cache(maxsize=128)
def cached_embedding(text: str) -> List[float]:
    """缓存嵌入计算结果"""
    return embeddings.embed_query(text)

# 批量处理
def batch_process_documents(documents: List[Document], batch_size: int = 10):
    """批量处理文档以提高效率"""
    for i in range(0, len(documents), batch_size):
        yield documents[i:i + batch_size]

# 异步处理:在线程池中执行同步的生成调用,避免阻塞事件循环
async def async_generate_answer(question: str) -> Dict[str, Any]:
    """异步生成答案"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        rag_generator.generate_answer,
        question
    )
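
lru_cache 的效果可以用一个计数器直观验证:相同输入第二次调用不会再触发真正的计算(这里用假的嵌入函数代替真实 API 调用,仅为演示缓存命中):

```python
import functools

call_count = 0

@functools.lru_cache(maxsize=128)
def fake_embedding(text: str):
    """假嵌入:用字符编码充当向量,并统计真实计算次数"""
    global call_count
    call_count += 1            # 只有缓存未命中时才会执行到这里
    return tuple(float(ord(c)) for c in text)

fake_embedding("hello")
fake_embedding("hello")        # 命中缓存,不增加 call_count
fake_embedding("world")
print(call_count)  # 2
```

在真实系统中,被缓存的是一次付费的嵌入 API 调用,因此对重复查询多的场景(如 FAQ 问答)收益非常明显。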

2. 错误处理

# 健壮的错误处理
import time
from typing import Any, Dict

class RAGError(Exception):
    """RAG系统自定义异常"""
    pass

def safe_generate_answer(question: str, max_retries: int = 3) -> Dict[str, Any]:
    """安全的答案生成,包含指数退避重试机制"""
    for attempt in range(max_retries):
        try:
            return rag_generator.generate_answer(question)
        except Exception as e:
            if attempt == max_retries - 1:
                raise RAGError(f"生成答案失败: {e}") from e
            time.sleep(2 ** attempt)  # 指数退避:1秒、2秒、4秒……
    # 兜底返回(正常流程要么已返回结果,要么已抛出RAGError)
    return {"answer": "抱歉,暂时无法生成答案", "source_documents": []}
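
这种重试逻辑可以脱离 LLM 单独验证:用一个前两次必然失败的假函数触发重试,并记录每次的退避延迟(延迟基数取得很小,只为观察指数增长):

```python
import time

attempts = []

def flaky_call():
    """前两次调用抛异常,第三次成功,用于演示重试"""
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("temporary failure")
    return "ok"

def retry_with_backoff(fn, max_retries=3, base_delay=0.01):
    """带指数退避的重试:每次失败后等待 base_delay * 2**attempt 秒"""
    delays = []
    for attempt in range(max_retries):
        try:
            return fn(), delays
        except Exception:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            delays.append(delay)
            time.sleep(delay)

result, delays = retry_with_backoff(flaky_call)
print(result, delays)  # ok [0.01, 0.02]
```

指数退避之所以优于固定间隔,是因为API限流或瞬时故障往往需要越来越长的恢复窗口,线性重试容易持续撞在同一堵墙上。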

3. 监控和日志

import logging
from datetime import datetime
from typing import Any, Dict

from components.generator import RAGGenerator

# 配置日志:同时输出到文件和控制台
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('rag_app.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

class MonitoredRAGGenerator(RAGGenerator):
    def generate_answer(self, question: str) -> Dict[str, Any]:
        start_time = datetime.now()
        logger.info(f"开始处理问题: {question[:50]}...")

        try:
            result = super().generate_answer(question)
            duration = (datetime.now() - start_time).total_seconds()

            logger.info(f"问题处理完成,耗时: {duration:.2f}秒")
            return result

        except Exception as e:
            logger.error(f"问题处理失败: {e}")
            raise

📈 进阶功能扩展

1. 多模态RAG

# 支持图像和文本的多模态RAG
import base64
from typing import List

from openai import OpenAI
from langchain.schema import Document
from components.retriever import AdvancedRetriever

client = OpenAI()

class MultiModalRAG:
    def __init__(self, text_vector_store, image_vector_store):
        # 分别为文本内容和图像描述维护独立的向量库与检索器
        self.text_retriever = AdvancedRetriever(text_vector_store)
        self.image_retriever = AdvancedRetriever(image_vector_store)

    def process_image(self, image_path: str) -> str:
        """处理图像并提取文本描述(使用GPT-4V分析图像)"""
        with open(image_path, "rb") as image_file:
            image_b64 = base64.b64encode(image_file.read()).decode()

        # openai>=1.0 使用 client.chat.completions.create 接口
        response = client.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "请详细描述这张图片的内容"},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}}
                    ]
                }
            ]
        )
        return response.choices[0].message.content

    def multimodal_search(self, query: str) -> List[Document]:
        """多模态搜索:分别检索文本和图像描述,再合并结果"""
        text_results = self.text_retriever.retrieve(query)
        image_results = self.image_retriever.retrieve(query)

        # 合并结果(实际应用中可按相似度分数重新排序)
        all_results = text_results + image_results
        return all_results[:10]  # 返回前10个结果

2. 实时学习和更新

# 实时学习系统:收集用户反馈,并据此调整检索策略
import logging
from datetime import datetime

from config.settings import settings

logger = logging.getLogger(__name__)

class AdaptiveRAG:
    def __init__(self):
        self.feedback_store = []
        self.performance_metrics = {}

    def collect_feedback(self, question: str, answer: str, rating: int, feedback: str):
        """收集用户反馈"""
        self.feedback_store.append({
            "question": question,
            "answer": answer,
            "rating": rating,
            "feedback": feedback,
            "timestamp": datetime.now()
        })

    def update_retrieval_strategy(self):
        """基于反馈更新检索策略"""
        # 分析反馈数据:找出低评分问题
        low_rating_questions = [
            item for item in self.feedback_store
            if item["rating"] < 3
        ]

        # 低评分问题过多时,增加检索的文档数量
        if len(low_rating_questions) > 10:
            settings.top_k = min(settings.top_k + 1, 10)
            logger.info(f"调整检索参数,top_k增加到: {settings.top_k}")
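
这种"根据反馈调参"的闭环同样可以脱离 LLM 单独验证。下面用普通字典模拟配置对象(阈值10、上限10沿用上文,settings 字典为演示用的假设):

```python
from datetime import datetime

settings = {"top_k": 4}        # 用字典模拟配置对象
feedback_store = []

def collect_feedback(question, rating):
    """记录一条带时间戳的用户反馈"""
    feedback_store.append({
        "question": question,
        "rating": rating,
        "timestamp": datetime.now(),
    })

def update_retrieval_strategy():
    """低评分(rating < 3)反馈超过10条时,将 top_k 加1(上限10)"""
    low = [f for f in feedback_store if f["rating"] < 3]
    if len(low) > 10:
        settings["top_k"] = min(settings["top_k"] + 1, 10)

for i in range(12):            # 模拟收到12条低评分反馈
    collect_feedback(f"q{i}", rating=1)
update_retrieval_strategy()
print(settings["top_k"])  # 5
```

真实系统中还应在调参后持续观察评分是否回升,避免 top_k 只增不减、检索上下文越拼越长。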

🎯 总结和选择建议

框架选择矩阵

| 使用场景 | 推荐框架组合 | 理由 |
| --- | --- | --- |
| 快速原型 | LangChain + Gradio | 开发速度快,组件丰富 |
| 企业应用 | LlamaIndex + Streamlit | 专业RAG,界面美观 |
| 研究项目 | Transformers + Jupyter | 灵活性高,可定制性强 |
| 生产环境 | LangChain + FastAPI + React | 性能稳定,可扩展性好 |

学习路径建议

1. **入门阶段(1-2周)**
   - 学习LangChain基础概念
   - 完成简单的LLM调用
   - 尝试Gradio快速原型
2. **进阶阶段(2-4周)**
   - 掌握RAG系统构建
   - 学习向量数据库使用
   - 开发完整的问答应用
3. **高级阶段(1-2个月)**
   - 优化系统性能
   - 添加监控和日志
   - 部署到生产环境

常见问题解决

1. **内存不足**:使用批量处理和缓存优化
2. **响应慢**:优化检索算法和模型选择
3. **答案质量差**:改进提示词和增加上下文
4. **成本过高**:使用本地模型或优化API调用

AI应用开发框架为构建智能应用提供了强大的基础设施。通过合理选择和组合这些框架,您可以快速构建出功能强大、性能优秀的AI应用。

下一篇预告:《AI编程助手使用指南》- 深入了解GitHub Copilot、Cursor等AI编程工具的使用技巧。

本文为AI开发工具系列文章第三篇,更多精彩内容请关注后续文章。