James Tsang

James Tsang

A developer.
github
twitter
tg_channel

如何實現自己的 ChatGPT 代碼解釋器

Code Interpreter 是什麼?#

在前一段時間 ChatGPT Code Interpreter 推出後,大家應該都已經對它有了一些認識,知道它是什麼、可以做什麼,所以這裡不再重複解釋這些基礎問題,這次我們換一個角度,從流程和要素的角度來看看怎麼理解 Code Interpreter。

Mermaid Loading...

在常規的 ChatGPT 交互中,流程和要素是:Prompt => Output Text,這也是為什麼 ChatGPT 剛推出的時候馬上湧現出了 Prompt Engineering 的概念以及配套構建 Prompt 的 embedding 工作,因為它的流程短,要素簡單,構造出一個好的 Prompt 就是這個流程中的關鍵。對於 Code Interpreter 來說,它的流程和要素是這樣的:Prompt => Code ==Interpreter==> Output (Description, Image, File...)。這帶來了一些變化:

  1. Prompt 構造不再是直接面向輸出,而是生成中間代碼
  2. 需要一個代碼解釋器,能夠區分會話、執行代碼,保存中間變量等
  3. 輸出變得更加多樣,可以是圖片、文件等

為什麼要實現 Code Interpreter#

ChatGPT 已經實現了 Code Interpreter,依托於 OpenAI 的 GPT 模型,能力也很強大,為什麼還需要實現一個自己的 Code Interpreter?除了向行業領頭羊看齊以及對接公司內部模型能力外,我們還可以想一下自己來實現會有什麼增量。可以看到的典型增量有:

  1. 能夠與實時數據交互:ChatGPT 的 Code Interpreter 不具有 Plugins 的聯網能力,開啟 Code Interpreter 後就不能再選中 plugin,這導致 ChatGPT Code Interpreter 的數據實時性不足,不能做類似於 “將 2023 年蘋果公司的股市表現繪製成圖” 這樣的事情
  2. 能夠與更多環境交互:本地或雲端部署之後,就擁有了更自由的環境,無論是操作文件系統,還是調用 API,安裝 ChatGPT Code Interpreter 不支持的包等,都成為了可能

實現 Code Interpreter 的思路#

要實現 Code Interpreter 有兩個核心關注點:一是借助模型能力,比如 OpenAI API 的 Function Calling 能力產出要調用的代碼;二是擁有一個能執行 Python 代碼的環境,例如用戶指定要輸出一個正弦函數圖,那麼需要得到繪製正弦函數圖的代碼,然後送到 Python 解釋器中執行,輸出圖片然後展示給用戶。在這個過程中 LLM Agent 可能還要對結果做一些解釋和細節補充。此外還需要考慮文件 IO 以及會話中變量保存的能力。

如果用 LangChain 來實現的話,這會更加方便,這裡 Python 解釋器和文件 IO、變量保存都可以看作是 LangChain 的一個 Tool,把它塞到 LangChain 裡 Agent Executor 裡來調用。按照這個思路社區已經有了開源的實現:codebox-api ,可以把它註冊成 LangChain Tool。除了提供代碼執行的核心能力的 Tool,我們還需要一些周邊來實現:會話管理、發起 Kernel 調用、文件 IO,即每當創建一個新的會話時,就創建一個新的 Jupyter Kernel channel 來執行代碼,之後將執行結果按照輸出的類型分門別類反饋給用戶,這一部分上面的 codebox-api 的作者也把它封裝成了解決方案:codeinterpreter-api

實現 Code Interpreter 的設計#

接下來是對 codeinterpreter-api 項目的梳理拆解,看下具體怎麼設計和實現上述的思路。由於項目主要是用 LangChain 來編排整個流程的,這裡先補充說明一下項目用到 LangChain 部分的基礎概念:

LangChain 中的部分基礎概念:#

  • LangChain Agents:LangChain 中的一個基礎模塊,核心思想是用 LLM 來選擇要採取的一系列動作。與硬編碼在鏈(chains)中的動作序列不同,代理(agents)使用語言模型作為推理引擎來決定要採取的動作及其順序。
  • LangChain Tools:Tools 是 Agent 調用的能力。主要有兩個方面需要考慮:給 Agent 提供正確的工具,並以對 Agent 最有幫助的方式描述這些工具。在面向 Code Interpreter 創建自定義 StructuredTool 時需要定義:name, description, func (同步調用的函數), coroutine (異步調用的函數), args_schema (輸入的 schema)
  • LangChain Agent Executor:Agent Executor 是 Agent 的運行時。它實際上調用 Agent 並執行它選擇的動作。這個執行器還做了一些額外的降低複雜性的工作,例如處理:Agent 選擇不存在的 Tool、Tool 出錯、Agent 產生無法解析為 Tool 調用的輸出等情況

流程設計與實現#

在有了上述基礎概念之後,就可以來看怎麼基於 LangChain Agent 實現 Code Interpreter 了,我們通過下面這段代碼來看具體的執行流程:

from codeinterpreterapi import CodeInterpreterSession, File

async def main():
    # context manager for start/stop of the session
    async with CodeInterpreterSession(model="gpt-3.5-turbo") as session:
        # define the user request
        user_request = "Analyze this dataset and plot something interesting about it."
        files = [
            File.from_path("examples/assets/iris.csv"),
        ]
        # generate the response
        response = await session.generate_response(user_request, files=files)
        # output the response (text + image)
        response.show()

if __name__ == "__main__":
    import asyncio
    # run the async function
    asyncio.run(main())

效果如圖所示:

Kapture

執行環境和工具實例化#

通過 with 創建 session 時,就要開始實例化 jupyter kernel 以及 agent executor,以下是一些關鍵的步驟:

  1. 通過 jupyter-kernel-gateway 創建與 Jupyter kernel 通信的服務並檢測啟動成功的狀態
self.jupyter = await asyncio.create_subprocess_exec(
	python,
	"-m",
	"jupyter",
	"kernelgateway",
	"--KernelGatewayApp.ip='0.0.0.0'",
	f"--KernelGatewayApp.port={self.port}",
	stdout=out,
	stderr=out,
	cwd=".codebox",
)
self._jupyter_pids.append(self.jupyter.pid)

# ...
while True:
	try:
		response = await self.aiohttp_session.get(self.kernel_url)
		if response.status == 200:
			break
	except aiohttp.ClientConnectorError:
		pass
	except aiohttp.ServerDisconnectedError:
		pass
	if settings.VERBOSE:
		print("Waiting for kernel to start...")
	await asyncio.sleep(1)
await self._aconnect()

指定 stdout 和 stderr,同時記錄進程 pid 以及將這個 kernel 實例關聯到 session 上。在 kernel 創建好以後,再發送 HTTP 請求,建立與 kernel 之間的 websocket 連接。

  1. 創建 Agent Executor
def _agent_executor(self) -> AgentExecutor:
	return AgentExecutor.from_agent_and_tools(
		agent=self._choose_agent(),
		max_iterations=9,
		tools=self.tools,
		verbose=self.verbose,
		memory=ConversationBufferMemory(
			memory_key="chat_history",
			return_messages=True,
			chat_memory=self._history_backend(),
		),
	)

def _choose_agent(self) -> BaseSingleActionAgent:
	return (
		OpenAIFunctionsAgent.from_llm_and_tools(
			llm=self.llm,
			tools=self.tools,
			system_message=code_interpreter_system_message,
			extra_prompt_messages=[
				MessagesPlaceholder(variable_name="chat_history")
			],
		)
		# ...
	)

def _tools(self, additional_tools: list[BaseTool]) -> list[BaseTool]:
	return additional_tools + [
		StructuredTool(
			name="python",
			description="Input a string of code to a ipython interpreter. "
			"Write the entire code in a single string. This string can "
			"be really long, so you can use the `;` character to split lines. "
			"Variables are preserved between runs. ",
			func=self._run_handler, # 調用 CodeBox 同步執行
			coroutine=self._arun_handler, # 調用 CodeBox 異步執行
			args_schema=CodeInput,
		),
	]

這裡定義了使用 OpenAIFunctionsAgent,有需要我們可換成自己的 Agent,只是現在只有 OpenAI 的 API 有方便又強大的 Function Calling 能力,所以我們以這個舉例。這裡還同時指定了 Agent 以及 Agent Executor 會用到的 Tool,Tool 的定義包含名稱和描述以及其它用於執行 python 代碼的參數,其中上一步創建的 jupyter kernel 實例被 CodeBox 做了進一步封裝,再作為同步和異步調用方法傳遞到 Tool 中。

處理輸入的文本和文件#

因為 Prompt Engineering 是一個可以外置的步驟,用戶使用時應該將 Prompt 構建好再傳進來,所以這一步框架本身沒有做太多工作,只是將傳入的文本和文件做了簡單的 Prompt 附加(比如給到 LLM 用戶指定了使用哪些文件),同時將文件記錄到 CodeBox 實例中方便後續執行。

class UserRequest(HumanMessage):
    files: list[File] = []

    def __str__(self):
        return self.content

    def __repr__(self):
        return f"UserRequest(content={self.content}, files={self.files})"

def _input_handler(self, request: UserRequest) -> None:
	"""Callback function to handle user input."""
	if not request.files:
		return
	if not request.content:
		request.content = (
			"I uploaded, just text me back and confirm that you got the file(s)."
		)
	request.content += "\n**The user uploaded the following files: **\n"
	for file in request.files:
		self.input_files.append(file)
		request.content += f"[Attachment: {file.name}]\n"
		self.codebox.upload(file.name, file.content)
	request.content += "**File(s) are now available in the cwd. **\n"

執行和結果處理#

通過 Agent Executor,已經能夠實現自動的 prompt 到 code 的轉化,下面我們來看看這段代碼具體怎麼執行:

def _connect(self) -> None:
	response = requests.post(
		f"{self.kernel_url}/kernels",
		headers={"Content-Type": "application/json"},
		timeout=90,
	)
	self.kernel_id = response.json()["id"]
	if self.kernel_id is None:
		raise Exception("Could not start kernel")

	self.ws = ws_connect_sync(f"{self.ws_url}/kernels/{self.kernel_id}/channels")

首先要通過 websocket 與特定的 kernel 進行連接,

self.ws.send(
	json.dumps(
		{
			"header": {
				"msg_id": (msg_id := uuid4().hex),
				"msg_type": "execute_request",
			},
			"content": {
				"code": code,
				# ...
			},
			# ...
		}
	)
)

之後通過 websocket 將代碼發送給 kernel 執行,

while True:
    # ...
	if (
		received_msg["header"]["msg_type"] == "stream"
		and received_msg["parent_header"]["msg_id"] == msg_id
	):
		msg = received_msg["content"]["text"].strip()
		if "Requirement already satisfied:" in msg:
			continue
		result += msg + "\n"
		if settings.VERBOSE:
			print("Output:\n", result)

	elif (
		received_msg["header"]["msg_type"] == "execute_result"
		and received_msg["parent_header"]["msg_id"] == msg_id
	):
		result += received_msg["content"]["data"]["text/plain"].strip() + "\n"
		if settings.VERBOSE:
			print("Output:\n", result)

	elif received_msg["header"]["msg_type"] == "display_data":
		if "image/png" in received_msg["content"]["data"]:
			return CodeBoxOutput(
				type="image/png",
				content=received_msg["content"]["data"]["image/png"],
			)
		if "text/plain" in received_msg["content"]["data"]:
			return CodeBoxOutput(
				type="text",
				content=received_msg["content"]["data"]["text/plain"],
			)
		return CodeBoxOutput(
			type="error",
			content="Could not parse output",
		)

之後就是處理 channel message 中的一系列返回了:

  • msg_type: stream,msg 中有 Requirement already satisfied: 則追加輸出內容,繼續等待 ws 返回
  • msg_type: execute_result,將 msg["content"]["data"]["text/plain"] 追加到輸出內容,繼續等待
  • msg_type: display_data,獲取 msg["content"]["data"],如果有 image/png,則把 png 包在 CodeBoxOutput 裡返回,如果是 text/plain,同理返回。否則返回錯誤類型的輸出,顯示無法解析輸出結果
  • msg_type: status, execution_state: idle:代碼執行成功但沒有輸出結果
  • msg_type: error:直接報錯

輸出結果#

在得到上述 CodeBoxOutput 輸出後,就可以對輸出進行處理了:對於文本類型的輸出,不需要額外的處理,在運行過程中就通過 stdout 輸出出來了;對於文件系統的操作,因為是通過 jupyter 直接進行的,所以也不需要在框架中額外處理,會自動落成本地的文件,直到需要對輸出文件進行描述、解釋,才需要做額外的處理。對於圖片類型的輸出,會在執行過程中將返回的圖片 base64 保存在會話的 out_files 中,最後做輸出處理時再轉換為 Python 中標準的 Image 類型,之後通過 IPython 的 display 方法展示出來。

def get_image(self):
    # ...
	img_io = BytesIO(self.content)
	img = Image.open(img_io)

	# Convert image to RGB if it's not
	if img.mode not in ("RGB", "L"):  # L is for greyscale images
		img = img.convert("RGB")

	return img

def show_image(self):
	img = self.get_image()
	# Display the image
	try:
		# Try to get the IPython shell if available.
		shell = get_ipython().__class__.__name__  # type: ignore
		# If the shell is in a Jupyter notebook or similar.
		if shell == "ZMQInteractiveShell" or shell == "Shell":
			from IPython.display import display  # type: ignore

			display(img)
		else:
			img.show()
	except NameError:
		img.show()

內部落地的設想#

如果要在公司內部落地 Code Interpreter,以下是一些我們可以關注到的點:

  1. 服務化或方案化
    1. 為平台已有模塊提供基礎執行能力或組件
    2. 給到內部需要做 Code Interpreter 的團隊相關方案
  2. 對接內部模型
  3. 對接內部系統和環境,實現自動 API 調用
  4. 支持非 LangChain 的開放技術棧

寫在最後#

這次只是從大體流程和設計上過了一下 Code Interpreter 的實現方案,但裡面還有很多微小但重要的細節沒有討論到,比如:怎麼在網頁而不是本地進行輸出、遇到執行報錯後怎麼反饋給模型重新生成代碼、遇到依賴包未安裝怎麼自動安裝然後重新執行等,這都是一個健壯的 Code Interpreter 必須要考慮和實現的東西,社區的方案目前也是 MVP 版本,對 edge case 的處理和考慮離實際生產應用還有一些差距,要實現一個完善、可用於生產的 Code Interpreter,還有不少路要走,手還得弄得更髒些。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。