Code Interpreter 是什麼?#
在前一段時間 ChatGPT Code Interpreter 推出後,大家應該都已經對它有了一些認識,知道它是什麼、可以做什麼,所以這裡不再重複解釋這些基礎問題,這次我們換一個角度,從流程和要素的角度來看看怎麼理解 Code Interpreter。
在常規的 ChatGPT 交互中,流程和要素是:Prompt => Output Text,這也是為什麼 ChatGPT 剛推出的時候馬上湧現出了 Prompt Engineering 的概念以及配套構建 Prompt 的 embedding 工作,因為它的流程短,要素簡單,構造出一個好的 Prompt 就是這個流程中的關鍵。對於 Code Interpreter 來說,它的流程和要素是這樣的:Prompt => Code ==Interpreter==> Output (Description, Image, File...)。這帶來了一些變化:
- Prompt 構造不再是直接面向輸出,而是生成中間代碼
- 需要一個代碼解釋器,能夠區分會話、執行代碼,保存中間變量等
- 輸出變得更加多樣,可以是圖片、文件等
為什麼要實現 Code Interpreter#
ChatGPT 已經實現了 Code Interpreter,依托於 OpenAI 的 GPT 模型,能力也很強大,為什麼還需要實現一個自己的 Code Interpreter?除了向行業領頭羊看齊以及對接公司內部模型能力外,我們還可以想一下自己來實現會有什麼增量。可以看到的典型增量有:
- 能夠與實時數據交互:ChatGPT 的 Code Interpreter 不具有 Plugins 的聯網能力,開啟 Code Interpreter 後就不能再選中 plugin,這導致 ChatGPT Code Interpreter 的數據實時性不足,不能做類似於 “將 2023 年蘋果公司的股市表現繪製成圖” 這樣的事情
- 能夠與更多環境交互:本地或雲端部署之後,就擁有了更自由的環境,無論是操作文件系統,還是調用 API,安裝 ChatGPT Code Interpreter 不支持的包等,都成為了可能
實現 Code Interpreter 的思路#
要實現 Code Interpreter 有兩個核心關注點:一是借助模型能力,比如 OpenAI API 的 Function Calling 能力產出要調用的代碼;二是擁有一個能執行 Python 代碼的環境,例如用戶指定要輸出一個正弦函數圖,那麼需要得到繪製正弦函數圖的代碼,然後送到 Python 解釋器中執行,輸出圖片然後展示給用戶。在這個過程中 LLM Agent 可能還要對結果做一些解釋和細節補充。此外還需要考慮文件 IO 以及會話中變量保存的能力。
如果用 LangChain 來實現的話,這會更加方便,這裡 Python 解釋器和文件 IO、變量保存都可以看作是 LangChain 的一個 Tool,把它塞到 LangChain 裡 Agent Executor 裡來調用。按照這個思路社區已經有了開源的實現:codebox-api ,可以把它註冊成 LangChain Tool。除了提供代碼執行的核心能力的 Tool,我們還需要一些周邊來實現:會話管理、發起 Kernel 調用、文件 IO,即每當創建一個新的會話時,就創建一個新的 Jupyter Kernel channel 來執行代碼,之後將執行結果按照輸出的類型分門別類反饋給用戶,這一部分上面的 codebox-api 的作者也把它封裝成了解決方案:codeinterpreter-api。
實現 Code Interpreter 的設計#
接下來是對 codeinterpreter-api 項目的梳理拆解,看下具體怎麼設計和實現上述的思路。由於項目主要是用 LangChain 來編排整個流程的,這裡先補充說明一下項目用到 LangChain 部分的基礎概念:
LangChain 中的部分基礎概念:#
- LangChain Agents:LangChain 中的一個基礎模塊,核心思想是用 LLM 來選擇要採取的一系列動作。與硬編碼在鏈(chains)中的動作序列不同,代理(agents)使用語言模型作為推理引擎來決定要採取的動作及其順序。
- LangChain Tools:Tools 是 Agent 調用的能力。主要有兩個方面需要考慮:給 Agent 提供正確的工具,並以對 Agent 最有幫助的方式描述這些工具。在面向 Code Interpreter 創建自定義 StructuredTool 時需要定義:name, description, func (同步調用的函數), coroutine (異步調用的函數), args_schema (輸入的 schema)
- LangChain Agent Executor:Agent Executor 是 Agent 的運行時。它實際上調用 Agent 並執行它選擇的動作。這個執行器還做了一些額外的降低複雜性的工作,例如處理:Agent 選擇不存在的 Tool、Tool 出錯、Agent 產生無法解析為 Tool 調用的輸出等情況
流程設計與實現#
在有了上述基礎概念之後,就可以來看怎麼基於 LangChain Agent 實現 Code Interpreter 了,我們通過下面這段代碼來看具體的執行流程:
from codeinterpreterapi import CodeInterpreterSession, File
async def main():
# context manager for start/stop of the session
async with CodeInterpreterSession(model="gpt-3.5-turbo") as session:
# define the user request
user_request = "Analyze this dataset and plot something interesting about it."
files = [
File.from_path("examples/assets/iris.csv"),
]
# generate the response
response = await session.generate_response(user_request, files=files)
# output the response (text + image)
response.show()
if __name__ == "__main__":
import asyncio
# run the async function
asyncio.run(main())
效果如圖所示:
執行環境和工具實例化#
通過 with
創建 session 時,就要開始實例化 jupyter kernel 以及 agent executor,以下是一些關鍵的步驟:
- 通過
jupyter-kernel-gateway
創建與 Jupyter kernel 通信的服務並檢測啟動成功的狀態
self.jupyter = await asyncio.create_subprocess_exec(
python,
"-m",
"jupyter",
"kernelgateway",
"--KernelGatewayApp.ip='0.0.0.0'",
f"--KernelGatewayApp.port={self.port}",
stdout=out,
stderr=out,
cwd=".codebox",
)
self._jupyter_pids.append(self.jupyter.pid)
# ...
while True:
try:
response = await self.aiohttp_session.get(self.kernel_url)
if response.status == 200:
break
except aiohttp.ClientConnectorError:
pass
except aiohttp.ServerDisconnectedError:
pass
if settings.VERBOSE:
print("Waiting for kernel to start...")
await asyncio.sleep(1)
await self._aconnect()
指定 stdout 和 stderr,同時記錄進程 pid 以及將這個 kernel 實例關聯到 session 上。在 kernel 創建好以後,再發送 HTTP 請求,建立與 kernel 之間的 websocket 連接。
- 創建 Agent Executor
def _agent_executor(self) -> AgentExecutor:
return AgentExecutor.from_agent_and_tools(
agent=self._choose_agent(),
max_iterations=9,
tools=self.tools,
verbose=self.verbose,
memory=ConversationBufferMemory(
memory_key="chat_history",
return_messages=True,
chat_memory=self._history_backend(),
),
)
def _choose_agent(self) -> BaseSingleActionAgent:
return (
OpenAIFunctionsAgent.from_llm_and_tools(
llm=self.llm,
tools=self.tools,
system_message=code_interpreter_system_message,
extra_prompt_messages=[
MessagesPlaceholder(variable_name="chat_history")
],
)
# ...
)
def _tools(self, additional_tools: list[BaseTool]) -> list[BaseTool]:
return additional_tools + [
StructuredTool(
name="python",
description="Input a string of code to a ipython interpreter. "
"Write the entire code in a single string. This string can "
"be really long, so you can use the `;` character to split lines. "
"Variables are preserved between runs. ",
func=self._run_handler, # 調用 CodeBox 同步執行
coroutine=self._arun_handler, # 調用 CodeBox 異步執行
args_schema=CodeInput,
),
]
這裡定義了使用 OpenAIFunctionsAgent,有需要我們可換成自己的 Agent,只是現在只有 OpenAI 的 API 有方便又強大的 Function Calling 能力,所以我們以這個舉例。這裡還同時指定了 Agent 以及 Agent Executor 會用到的 Tool,Tool 的定義包含名稱和描述以及其它用於執行 python 代碼的參數,其中上一步創建的 jupyter kernel 實例被 CodeBox 做了進一步封裝,再作為同步和異步調用方法傳遞到 Tool 中。
處理輸入的文本和文件#
因為 Prompt Engineering 是一個可以外置的步驟,用戶使用時應該將 Prompt 構建好再傳進來,所以這一步框架本身沒有做太多工作,只是將傳入的文本和文件做了簡單的 Prompt 附加(比如給到 LLM 用戶指定了使用哪些文件),同時將文件記錄到 CodeBox 實例中方便後續執行。
class UserRequest(HumanMessage):
files: list[File] = []
def __str__(self):
return self.content
def __repr__(self):
return f"UserRequest(content={self.content}, files={self.files})"
def _input_handler(self, request: UserRequest) -> None:
"""Callback function to handle user input."""
if not request.files:
return
if not request.content:
request.content = (
"I uploaded, just text me back and confirm that you got the file(s)."
)
request.content += "\n**The user uploaded the following files: **\n"
for file in request.files:
self.input_files.append(file)
request.content += f"[Attachment: {file.name}]\n"
self.codebox.upload(file.name, file.content)
request.content += "**File(s) are now available in the cwd. **\n"
執行和結果處理#
通過 Agent Executor,已經能夠實現自動的 prompt 到 code 的轉化,下面我們來看看這段代碼具體怎麼執行:
def _connect(self) -> None:
response = requests.post(
f"{self.kernel_url}/kernels",
headers={"Content-Type": "application/json"},
timeout=90,
)
self.kernel_id = response.json()["id"]
if self.kernel_id is None:
raise Exception("Could not start kernel")
self.ws = ws_connect_sync(f"{self.ws_url}/kernels/{self.kernel_id}/channels")
首先要通過 websocket 與特定的 kernel 進行連接,
self.ws.send(
json.dumps(
{
"header": {
"msg_id": (msg_id := uuid4().hex),
"msg_type": "execute_request",
},
"content": {
"code": code,
# ...
},
# ...
}
)
)
之後通過 websocket 將代碼發送給 kernel 執行,
while True:
# ...
if (
received_msg["header"]["msg_type"] == "stream"
and received_msg["parent_header"]["msg_id"] == msg_id
):
msg = received_msg["content"]["text"].strip()
if "Requirement already satisfied:" in msg:
continue
result += msg + "\n"
if settings.VERBOSE:
print("Output:\n", result)
elif (
received_msg["header"]["msg_type"] == "execute_result"
and received_msg["parent_header"]["msg_id"] == msg_id
):
result += received_msg["content"]["data"]["text/plain"].strip() + "\n"
if settings.VERBOSE:
print("Output:\n", result)
elif received_msg["header"]["msg_type"] == "display_data":
if "image/png" in received_msg["content"]["data"]:
return CodeBoxOutput(
type="image/png",
content=received_msg["content"]["data"]["image/png"],
)
if "text/plain" in received_msg["content"]["data"]:
return CodeBoxOutput(
type="text",
content=received_msg["content"]["data"]["text/plain"],
)
return CodeBoxOutput(
type="error",
content="Could not parse output",
)
之後就是處理 channel message 中的一系列返回了:
- msg_type: stream,msg 中有 Requirement already satisfied: 則追加輸出內容,繼續等待 ws 返回
- msg_type: execute_result,將
msg["content"]["data"]["text/plain"]
追加到輸出內容,繼續等待 - msg_type: display_data,獲取
msg["content"]["data"]
,如果有 image/png,則把 png 包在 CodeBoxOutput 裡返回,如果是 text/plain,同理返回。否則返回錯誤類型的輸出,顯示無法解析輸出結果 - msg_type: status, execution_state: idle:代碼執行成功但沒有輸出結果
- msg_type: error:直接報錯
輸出結果#
在得到上述 CodeBoxOutput
輸出後,就可以對輸出進行處理了:對於文本類型的輸出,不需要額外的處理,在運行過程中就通過 stdout 輸出出來了;對於文件系統的操作,因為是通過 jupyter 直接進行的,所以也不需要在框架中額外處理,會自動落成本地的文件,直到需要對輸出文件進行描述、解釋,才需要做額外的處理。對於圖片類型的輸出,會在執行過程中將返回的圖片 base64 保存在會話的 out_files
中,最後做輸出處理時再轉換為 Python 中標準的 Image 類型,之後通過 IPython 的 display 方法展示出來。
def get_image(self):
# ...
img_io = BytesIO(self.content)
img = Image.open(img_io)
# Convert image to RGB if it's not
if img.mode not in ("RGB", "L"): # L is for greyscale images
img = img.convert("RGB")
return img
def show_image(self):
img = self.get_image()
# Display the image
try:
# Try to get the IPython shell if available.
shell = get_ipython().__class__.__name__ # type: ignore
# If the shell is in a Jupyter notebook or similar.
if shell == "ZMQInteractiveShell" or shell == "Shell":
from IPython.display import display # type: ignore
display(img)
else:
img.show()
except NameError:
img.show()
內部落地的設想#
如果要在公司內部落地 Code Interpreter,以下是一些我們可以關注到的點:
- 服務化或方案化
- 為平台已有模塊提供基礎執行能力或組件
- 給到內部需要做 Code Interpreter 的團隊相關方案
- 對接內部模型
- 對接內部系統和環境,實現自動 API 調用
- 支持非 LangChain 的開放技術棧
寫在最後#
這次只是從大體流程和設計上過了一下 Code Interpreter 的實現方案,但裡面還有很多微小但重要的細節沒有討論到,比如:怎麼在網頁而不是本地進行輸出、遇到執行報錯後怎麼反饋給模型重新生成代碼、遇到依賴包未安裝怎麼自動安裝然後重新執行等,這都是一個健壯的 Code Interpreter 必須要考慮和實現的東西,社區的方案目前也是 MVP 版本,對 edge case 的處理和考慮離實際生產應用還有一些差距,要實現一個完善、可用於生產的 Code Interpreter,還有不少路要走,手還得弄得更髒些。