首先可以學(xué)習(xí)一些使用langchain的項(xiàng)目:
- https://dagster.io/blog/chatgpt-langchain
- https://github.com/namuan/dr-doc-search
- https://simonwillison.net/2023/Jan/13/semantic-search-answers/
- https://github.com/slavingia/askmybook
用到的python庫:
pydantic: 數(shù)據(jù)驗(yàn)證: 參考: https://www.cnblogs.com/fengqiang626/p/13307771.html
typing: 類型約束
manifest-ml: 多個(gè)大模型(不包含所有)的統(tǒng)一調(diào)用客戶端
promptlayer: 記錄對openapi的調(diào)用, 搜索歷史, 性能追蹤.
unstructured: 若用戶在使用 0.4.9 以下版本, 讀取到的信息沒有metadata字段, 而如果大于等于0.4.9則有
tenacity: 重試
用到的python特性:
dataclass: 合于存儲數(shù)據(jù)對象(data object)的Python類, 參考 https://zhuanlan.zhihu.com/p/59657729
LLM 模型層
BaseLanguageModel : 抽象基類, 和各個(gè)模型交互的通用行為:基于用戶的輸入生成prompt
BaseLLM: 通用的基礎(chǔ)大模型基類, 增加了緩存選項(xiàng), 回調(diào)選項(xiàng), 有部分序列化能力, 持有各種參數(shù).
LLM : 和大模型的交互抽象, 所有子類都有自己的交互實(shí)現(xiàn). 對它的調(diào)用, 將直接獲取完全的prompt, 配合大模型特有的參數(shù), 如temperture, length, top_p等等, 組裝后, 利用一個(gè)client(組裝的邏輯,既可以讓專有client,即sdk吃掉,也可以組裝后, 給到httpclient后者本地進(jìn)程)發(fā)送給背后的大模型.
BaseOpenAI 為了OpenAI設(shè)計(jì)的專有類, 因?yàn)镺penAI的模型有不同的調(diào)用平臺, 如部署在azure的, 官網(wǎng)自己的. NOTICE: 如果模型是"gpt-3.5-turbo" 則, 直接返回OpenAIChat
. 否則說明是其他模型
貼一段基類的模板代碼, 分析:
- 獲取調(diào)用參數(shù)(組裝openAI模型調(diào)用的請求體,請求參數(shù))
- 獲取子prompts
- 監(jiān)測token的消耗
- 對每個(gè)prompt開啟調(diào)用(重試機(jī)制)
- 生成統(tǒng)一的結(jié)果
def _generate(
self, prompts: List[str], stop: Optional[List[str]] = None
) -> LLMResult:
"""Call out to OpenAI's endpoint with k unique prompts.
Args:
prompts: The prompts to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The full LLM output.
Example:
.. code-block:: python
response = openai.generate(["Tell me a joke."])
"""
# TODO: write a unit test for this
params = self._invocation_params
sub_prompts = self.get_sub_prompts(params, prompts, stop)
choices = []
token_usage: Dict[str, int] = {}
# Get the token usage from the response.
# Includes prompt, completion, and total tokens used.
_keys = {"completion_tokens", "prompt_tokens", "total_tokens"}
for _prompts in sub_prompts:
if self.streaming:
if len(_prompts) > 1:
raise ValueError("Cannot stream results with multiple prompts.")
params["stream"] = True
response = _streaming_response_template()
for stream_resp in completion_with_retry(
self, prompt=_prompts, **params
):
self.callback_manager.on_llm_new_token(
stream_resp["choices"][0]["text"],
verbose=self.verbose,
logprobs=stream_resp["choices"][0]["logprobs"],
)
_update_response(response, stream_resp)
choices.extend(response["choices"])
else:
response = completion_with_retry(self, prompt=_prompts, **params)
choices.extend(response["choices"])
if not self.streaming:
# Can't update token usage if streaming
update_token_usage(_keys, response, token_usage)
return self.create_llm_result(choices, prompts, token_usage)
PromptLayerOpenAI... : 只是增加了promptlayer
層
OpenAIChat : 專和OpenAI的3.5模型交互的類
貼一段參數(shù)準(zhǔn)備的代碼:
- 校驗(yàn)參數(shù)的個(gè)數(shù), 只有一個(gè)prompt
- 預(yù)設(shè)參數(shù), user, prefix, max_tokens
def _get_chat_params(
self, prompts: List[str], stop: Optional[List[str]] = None
) -> Tuple:
if len(prompts) > 1:
raise ValueError(
f"OpenAIChat currently only supports single prompt, got {prompts}"
)
messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}]
params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params}
if stop is not None:
if "stop" in params:
raise ValueError("`stop` found in both the input and default params.")
params["stop"] = stop
if params.get("max_tokens") == -1:
# for ChatGPT api, omitting max_tokens is equivalent to having no limit
del params["max_tokens"]
return messages, params
Chains 模塊
Chains是組合其他各層的膠水, 亦是用戶程序的調(diào)用入口.
Chain: 基類, 是所有chain對象的基本入口. 與用戶程序交互, 處理用戶的輸入, 準(zhǔn)備其他模塊的輸入, 提供內(nèi)存能力, chain的回調(diào)能力. 其他所有的 Chain 類都繼承自這個(gè)基類,并根據(jù)需要實(shí)現(xiàn)特定的功能。chain通過傳入string值, 來控制接受的輸入和給出的輸出. 如input_key為["abc", "def"], 那么它只會處理用戶輸入的dict里面的這兩個(gè)參數(shù), 如果output_key為["uvw"], 那么它在輸出的時(shí)候會過濾掉其他的dict值.
繼承Chain的子類主要有兩種類型:
- 通用 generic chain: 不在乎chain的具體類型, 控制chain的調(diào)用順序, 是否調(diào)用. 他們可以用來合并構(gòu)造其他的chain.
- 具體chain: 和通用chain比較來說, 他們承擔(dān)了具體的某項(xiàng)任務(wù), 可以和通用的chain組合起來使用, 也可以直接使用.
LLMChain : 針對語言模型LLM的查詢, 可以格式化prompt以及調(diào)用語言模型.
其他 Chain 類 :除了 LLMChain沃斤,還有其他繼承自 Chain 的類陨瘩,它們根據(jù)不同的需求實(shí)現(xiàn)了特定的功能。例如逊脯,有些 Chain 類可能用于處理文本數(shù)據(jù)优质,有些可能用于處理圖像數(shù)據(jù),有些可能用于處理音頻數(shù)據(jù)等军洼。這些類都繼承自 Chain 基類巩螃,并根據(jù)需要實(shí)現(xiàn)特定的輸入和輸出處理方法。
具體的chain就像是大模型在某種業(yè)務(wù)場景下的應(yīng)用模式總結(jié), 如VectorDBQA
就是利用 vector存儲和大模型, 在vectorstore中用某種相似算法找到和問題類似的doc, 之后利用大模型將doc和問題一起給到到模型, 讓大模型解釋給出結(jié)果. 杜宇這類合并文檔的任務(wù): BaseCombineDocumentsChain
有四種不同的模式.
generic chain
loading from hub
從hub里獲取某個(gè)配置好的chain, 實(shí)際的chain類型不會超過庫里已經(jīng)定義的. 舉例:
https://langchain.readthedocs.io/en/latest/modules/chains/generic/from_hub.html
從服務(wù)端拉下來一個(gè)VectorDBQA類型的chain, 本地根據(jù)拉下來的配置初始化chain.
LLM Chain
對大模型最直接的調(diào)用chain, 常常被用來組合成其他的chain.
如 APIChain中, SequentialChain中
template = """Write a {adjective} poem about {subject}."""
prompt = PromptTemplate(template=template, input_variables=["adjective", "subject"])
llm_chain = LLMChain(prompt=prompt, llm=OpenAI(temperature=0), verbose=True)
llm_chain.predict(adjective="sad", subject="ducks")
Sequentical Chain
順序執(zhí)行chains
- SimpleSequentialChain: 前者的輸出就是后者的輸入
- SequentialChain: 允許多個(gè)輸入和輸出
Serialization
所有的chain都可以持久化, 以及從持久化中恢復(fù). 具體在
langchain.chains.load_chain
方法中, load_chain_from_file
從file(虛擬file,可能是網(wǎng)絡(luò)) 中讀取到配置后, load_chain_from_config
從配置中獲取所有的chain的配置信息,
針對每一種chain都有對應(yīng)的load方法
type_to_loader_dict = {
"api_chain": _load_api_chain,
"hyde_chain": _load_hyde_chain,
"llm_chain": _load_llm_chain,
"llm_bash_chain": _load_llm_bash_chain,
"llm_checker_chain": _load_llm_checker_chain,
"llm_math_chain": _load_llm_math_chain,
"llm_requests_chain": _load_llm_requests_chain,
"pal_chain": _load_pal_chain,
"qa_with_sources_chain": _load_qa_with_sources_chain,
"stuff_documents_chain": _load_stuff_documents_chain,
"map_reduce_documents_chain": _load_map_reduce_documents_chain,
"map_rerank_documents_chain": _load_map_rerank_documents_chain,
"refine_documents_chain": _load_refine_documents_chain,
"sql_database_chain": _load_sql_database_chain,
"vector_db_qa_with_sources_chain": _load_vector_db_qa_with_sources_chain,
"vector_db_qa": _load_vector_db_qa,
}
舉例:
def _load_stuff_documents_chain(config: dict, **kwargs: Any) -> StuffDocumentsChain:
if "llm_chain" in config:
llm_chain_config = config.pop("llm_chain")
llm_chain = load_chain_from_config(llm_chain_config)
elif "llm_chain_path" in config:
llm_chain = load_chain(config.pop("llm_chain_path"))
else:
raise ValueError("One of `llm_chain` or `llm_chain_config` must be present.")
if not isinstance(llm_chain, LLMChain):
raise ValueError(f"Expected LLMChain, got {llm_chain}")
if "document_prompt" in config:
prompt_config = config.pop("document_prompt")
document_prompt = load_prompt_from_config(prompt_config)
elif "document_prompt_path" in config:
document_prompt = load_prompt(config.pop("document_prompt_path"))
else:
raise ValueError(
"One of `document_prompt` or `document_prompt_path` must be present."
)
return StuffDocumentsChain(
llm_chain=llm_chain, document_prompt=document_prompt, **config
)
更加細(xì)致的組件有:
llm的loader, prompt的loader, 等等, 分別在每個(gè)模塊下的loading.py文件中
transformation chain
提供了一個(gè)機(jī)制, 對用戶的輸入進(jìn)行修改. 舉例:
def transform_func(inputs: dict) -> dict:
text = inputs["text"]
shortened_text = "\n\n".join(text.split("\n\n")[:3])
return {"output_text": shortened_text}
transform_chain = TransformChain(input_variables=["text"], output_variables=["output_text"], transform=transform_func)
輸入層 prompt
prompt傳遞給大模型的消息模板和抽象. 一般會在prompt中放三類內(nèi)容:
- 對大模型的指示, 會話的設(shè)置, 如: 你是一個(gè)技術(shù)精湛的程序員
- 一些example, 以幫助大模型更好的理解輸入和給出輸出
- 提出的問題
BasePromptTemplate
作為基類, 暴露格式化prompt模板的方法, 返回一個(gè)prompt.
參數(shù):
- input_variables, promptTemplate內(nèi)部需要接受的參數(shù)
- output_parser: 對于大模型的返回, 可以用output_parser來解析它的返回. output_parser 的解析側(cè)重是: 持有大模型返回的消息的結(jié)構(gòu), 做信息的提取, 如返回是一個(gè)json, json有字段為realAns, resAns是prompt需要的答案, 再或者返回的數(shù)據(jù)需要以
;
切分為列表; 而不是側(cè)重網(wǎng)絡(luò)數(shù)據(jù)解析, 序列化反序列化.
@abstractmethod def format_prompt(self, **kwargs: Any) -> PromptValue:
格式化用戶輸入, 得到prompt
@root_validator() def validate_variable_names(cls, values: Dict) -> Dict:
驗(yàn)證輸入, 用戶輸入是否覆蓋了partial 輸入
需要關(guān)注的是兩個(gè)子類: PromptTemplate
, ChatPromptTemplate
前者是一次問答型業(yè)務(wù)常用的, 后者是問答聊天型業(yè)務(wù)設(shè)計(jì)的.
PromptTemplate 利用的語言自己的format能力 或者 其他庫(如web開發(fā)常用的ninja引擎), 舉例, 如從template中提取用戶需要輸入的變量, template如: i am a {someadj} student
其中someadj
就是用戶要輸入的變量.
input_variables = {
v for _, v, _, _ in Formatter().parse(template) if v is not None
}
ChatPromptTemplate :
專屬chat的prompt設(shè)計(jì), 存儲著chat的messages/templates
@classmethod
def from_role_strings(
cls, string_messages: List[Tuple[str, str]]
) -> ChatPromptTemplate:
messages = [
ChatMessagePromptTemplate(
content=PromptTemplate.from_template(template), role=role
)
for role, template in string_messages
]
return cls.from_messages(messages)
BaseMessagePromptTemplate
是chat中的一條消息/模板, 對應(yīng)消息有speaker, AI, Human,System => ChatMessage, 如下, 本質(zhì)和StringPromptTemplate沒太大區(qū)別, 只是它的含義是chat的一條message, 且不能(適合)像StringPromptTemplate一樣放多個(gè)問題在里面.
數(shù)據(jù)導(dǎo)入- loader
document_loaders
中含有大量的不同數(shù)據(jù)源的loader, loader的基本邏輯是: 連接到數(shù)據(jù)源, 拉取數(shù)據(jù), 按照指定的大小切塊.
BaseLoader
接口
Document
統(tǒng)一的數(shù)據(jù)表示, 不同的數(shù)據(jù)源的數(shù)據(jù)都要表示成Document, 方便Splitter的設(shè)計(jì)和處理.
以UnstructuredBaseLoader
為例
def _get_elements(self) -> List:
"""Get elements."""
@abstractmethod
def _get_metadata(self) -> dict:
"""Get metadata."""
def load(self) -> List[Document]:
"""Load file."""
elements = self._get_elements()
if self.mode == "elements":
docs: List[Document] = list()
for element in elements:
metadata = self._get_metadata()
# NOTE(MthwRobinson) - the attribute check is for backward compatibility
# with unstructured<0.4.9. The metadata attributed was added in 0.4.9.
if hasattr(element, "metadata"):
metadata.update(element.metadata.to_dict())
if hasattr(element, "category"):
metadata["category"] = element.category
docs.append(Document(page_content=str(element), metadata=metadata))
elif self.mode == "single":
metadata = self._get_metadata()
text = "\n\n".join([str(el) for el in elements])
docs = [Document(page_content=text, metadata=metadata)]
else:
raise ValueError(f"mode of {self.mode} not supported.")
return docs
數(shù)據(jù)訪問 - store
Docstore
訪問存儲了docs的接口, 任何的數(shù)據(jù)庫,網(wǎng)絡(luò),甚至內(nèi)存塊都可以成為一個(gè)store, 只要實(shí)現(xiàn)了search
接口, 能夠從中搜索doc.
舉例: 把wiki當(dāng)做是一個(gè)store
def search(self, search: str) -> Union[str, Document]:
"""Try to search for wiki page.
If page exists, return the page summary, and a PageWithLookups object.
If page does not exist, return similar entries.
"""
import wikipedia
try:
page_content = wikipedia.page(search).content
url = wikipedia.page(search).url
result: Union[str, Document] = Document(
page_content=page_content, metadata={"page": url}
)
except wikipedia.PageError:
result = f"Could not find [{search}]. Similar: {wikipedia.search(search)}"
except wikipedia.DisambiguationError:
result = f"Could not find [{search}]. Similar: {wikipedia.search(search)}"
return result
記憶模塊 memory
大模型在會話的時(shí)候, 多數(shù)是無狀態(tài)的, 需要調(diào)用者自己維持context. memory是通用的記憶模塊, 幫助所有基于langchain做開發(fā)的應(yīng)用維持會話的context, 或者是(a concept of state around through a user's interactions).
有兩種使用機(jī)制: 一種是可以從memory中提取一定的信息, 如messages的序列. 還可以是 在chain中直接使用memory, memory和chain本身會將 context 以某種形式(可能是經(jīng)過llm精煉過的, 不是原始的kv) 存儲下來.
舉例: ConversationBufferMemory
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save context from this conversation to buffer."""
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
if self.output_key is None:
if len(outputs) != 1:
raise ValueError(f"One output key expected, got {outputs.keys()}")
output_key = list(outputs.keys())[0]
else:
output_key = self.output_key
self.chat_memory.add_user_message(inputs[prompt_input_key])
self.chat_memory.add_ai_message(outputs[output_key])
舉例: ConversationEntityMemory
先是使用大模型從歷史對話的k條內(nèi)容中提取名詞實(shí)體, 之后作為參數(shù)
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Return history buffer."""
chain = LLMChain(llm=self.llm, prompt=self.entity_extraction_prompt)
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
buffer_string = get_buffer_string(
self.buffer[-self.k * 2 :],
human_prefix=self.human_prefix,
ai_prefix=self.ai_prefix,
)
output = chain.predict(
history=buffer_string,
input=inputs[prompt_input_key],
)
if output.strip() == "NONE":
entities = []
else:
entities = [w.strip() for w in output.split(",")] // 歷史中提取到的實(shí)體
entity_summaries = {}
for entity in entities:
entity_summaries[entity] = self.store.get(entity, "") // 獲取每個(gè)實(shí)體的值, 并且更新
self.entity_cache = entities // 將此次抽取的實(shí)體們保存下來
if self.return_messages:
buffer: Any = self.buffer[-self.k * 2 :]
else:
buffer = buffer_string
return {
self.chat_history_key: buffer,
"entities": entity_summaries,
}
將歷史保存起來: 從上一次的實(shí)體抽取中獲取每個(gè)抽取的實(shí)體, 之后利用大模型進(jìn)行總結(jié), 將每個(gè)實(shí)體的總結(jié)結(jié)果保存起來. 稍微留一下代碼chain.predict : 在langchain項(xiàng)目中, 很多用到這樣的傳參方式, 雖然chain用不到那些參數(shù), 但是這個(gè)chain實(shí)例內(nèi)的prompt會用. 對于熟悉靜態(tài)類型編程的同學(xué),一般都不太習(xí)慣這種方式, 一般情況下更多的會用一個(gè)context類來向下傳遞參數(shù)吧. todo: 優(yōu)化: 設(shè)計(jì)ParamContext來傳遞參數(shù), 使用者使用ParamConsumer來從中提取關(guān)注的信息, 這樣可以避免dict, kwargs滿天飛的場景
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save context from this conversation to buffer."""
super().save_context(inputs, outputs)
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
for entity in self.entity_cache:
chain = LLMChain(llm=self.llm, prompt=self.entity_summarization_prompt)
# key value store for entity
existing_summary = self.store.get(entity, "")
buffer_string = get_buffer_string(
self.buffer[-self.k * 2 :],
human_prefix=self.human_prefix,
ai_prefix=self.ai_prefix,
)
output = chain.predict(
summary=existing_summary,
history=buffer_string,
input=inputs[prompt_input_key],
entity=entity,
)
self.store[entity] = output.strip()
agent and tools 和其他項(xiàng)目的交互
agent
Agent
負(fù)責(zé)調(diào)用大模型, 并且決定接下來的動作. 調(diào)用大模型的結(jié)果是做決定的參考:
plan
-> _get_next_action
-> llm(專屬prompt).predict
-> _extract_tool_and_input 獲取下一個(gè)action以及要給它的輸入, 得到 AgentAction
AgentAction
: agent要執(zhí)行的動作封裝.
AgentExecutor
: 從命名上理解是 Agent的執(zhí)行環(huán)境, 執(zhí)行器. Agent
只能決定接下來要執(zhí)行的動作, 而AgentExecutor
才是具體發(fā)起執(zhí)行, 進(jìn)行執(zhí)行的執(zhí)行者. _take_next_step
-> Agent.plan
-> tool.run
tools
BaseTool
子類要重寫 run
以賦予不同的運(yùn)行邏輯
向量化 和 存儲 vector
VectorStore
: 向量存儲接口, 子類需要實(shí)現(xiàn)抽象方法(加粗的)以完成對應(yīng)引擎的訪問.
- add_texts
- add_documents
- similarity_search
- similarity_search_by_vector
- max_marginal_relevance_search
- max_marginal_relevance_search_by_vector
- from_documents
- from_texts
舉例: OpenSearchVectorStore
調(diào)用 大模型的embedding 能力, 將其向量化, 并且寫入索引. (需預(yù)先創(chuàng)建好索引, 自動創(chuàng)建的索引數(shù)據(jù)類型不對)
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
bulk_size: Bulk API request count; Default: 500
Returns:
List of ids from adding the texts into the vectorstore.
"""
embeddings = [
self.embedding_function.embed_documents(list(text))[0] for text in texts
]
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
return _bulk_ingest_embeddings(
self.client, self.index_name, embeddings, texts, metadatas
)
相似度搜索, 還是用大模型的embedding能力, 將查詢轉(zhuǎn)化為向量, 然后利用存儲引擎自身的相似度搜索能力, 搜索出文檔. 搜索完成后, 取k條結(jié)果, 返回給上層.
embedding = self.embedding_function.embed_query(query)
search_type = _get_kwargs_value(kwargs, "search_type", "approximate_search")
if search_type == "approximate_search":
size = _get_kwargs_value(kwargs, "size", 4)
search_query = _default_approximate_search_query(embedding, size, k)
elif search_type == SCRIPT_SCORING_SEARCH:
space_type = _get_kwargs_value(kwargs, "space_type", "l2")
pre_filter = _get_kwargs_value(kwargs, "pre_filter", MATCH_ALL_QUERY)
search_query = _default_script_query(embedding, space_type, pre_filter)
elif search_type == PAINLESS_SCRIPTING_SEARCH:
space_type = _get_kwargs_value(kwargs, "space_type", "l2Squared")
pre_filter = _get_kwargs_value(kwargs, "pre_filter", MATCH_ALL_QUERY)
search_query = _default_painless_scripting_query(
embedding, space_type, pre_filter
)
from_texts: 構(gòu)造vectorstore, 創(chuàng)建新索引, 且將數(shù)據(jù)寫入. 封裝了這一系列過程.
embedding
向量接口, 每個(gè)子類需要實(shí)現(xiàn)兩個(gè)方法, 分別向量化文檔和查詢. 在實(shí)現(xiàn)中要注意的細(xì)節(jié)點(diǎn)是, 和大模型交互, 控制每次發(fā)送的數(shù)據(jù)量. 以及增加重試機(jī)制.
def embed_documents(self, texts: List[str]) -> List[List[float]]:
def embed_query(self, text: str) -> List[float]: