跳到主要内容

管道 (Pipes)

管道 (Pipes)

管道 (Pipes) 是独立的函数,用于处理输入并生成响应,它们可以通过调用一个或多个大语言模型 (LLM) 或外部服务来处理任务,然后再将最终结果返回给用户。您可以通过管道实现的操作包括:检索增强生成 (RAG)、向非 OpenAI 格式的 LLM 服务商(例如 Anthropic、Azure OpenAI 或 Google)发送请求,或者直接在您的 Web UI 界面中执行特定函数。管道可以直接作为 Function 托管,也可以部署在 Pipelines 服务器上运行。我们已在 Pipelines 仓库中维护了一份管道示例列表。其通用工作流如下图所示。

Pipe 工作流

在您的 WebUI 中配置的 Pipes 会作为一个带有 “External(外部)”标记的新模型显示在模型列表中。下图展示了两个 Pipe 模型(Database RAG PipelineDOOM)与两个自托管模型的并列显示效果:

WebUI 中的 Pipe 模型


流式响应格式

Pipes 可以返回单个 str 纯文本,也可以返回一个迭代器/生成器。在流式传输时,每个 yield 产出的项可以为:

  • 普通字符串 (str) —— 被视为助手可见的文本内容,并在到达时直接附加到消息末尾。这是最简单的形式,也是绝大多数智能体流水线在处理常规文本输出时应当采用的方式。

  • 兼容 OpenAI 规范的 SSE chunk 字典 —— 结构与标准 /v1/chat/completions 流式响应包完全一致,例如:

    {"choices": [{"delta": {"content": "..."}, "finish_reason": None}]}

    当您需要设置除 content 之外的其他字段时(例如在最后一个分块上设置 finish_reason 标志),请使用此形式。

对于一个自包含的流式输出,请在末尾 yield 一个代表流结束的终止分块来关闭它:

yield {"choices": [{"delta": {}, "finish_reason": "stop"}]}

finish_reason 字段应当仅出现一次(在流的最末尾),且对于自行处理其内部工具执行的流水线来说,它的值应当始终为 "stop" —— 而绝非 "tool_calls"(详情请参阅下一节)。


独立智能体与 delta.tool_calls

这是在构建独立智能体流水线(如 LangChain、LlamaIndex、自定义规划器,或任何自行执行其内部工具并流式返回结果的系统)时,最容易踩中的超级大坑

在响应分块中返回 delta.tool_calls 代表 “客户端,请帮我执行这个工具调用”。当 Open WebUI 的中间件截获到该字段时,系统工具执行器便会接管该调用并运行它,追加一条 role: "tool" 消息,并向同一个 pipeline 发起一次接续请求。系统会在一个由 CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES(约 30 次)控制的上限内循环执行此操作。

如果您的 pipeline 在内部已经执行完了该工具,此时再输出 delta.tool_calls 会导致 Open WebUI 尝试再次去执行它 —— 由于 pipeline 在每次重试中持续输出相同的调用,这会导致您在触发重试上限前,在界面上看到 30 份一模一样的响应层叠在一起。如果您在流式处理的中途设置了 finish_reason: "tool_calls",也会发生完全一样的情况。

黄金法则:

  • 模型正在调用一个应当由 Open WebUI 平台来运行的工具 → 输出 delta.tool_calls,以 finish_reason: "tool_calls" 结束,让 Open WebUI 中间件执行该工具并重新进入您的 pipeline。
  • pipeline 正在运行一个管理和调用自身工具的独立智能体 → 千万不要输出 delta.tool_calls。将工具的执行过程作为文本内容,利用下面介绍的 <details type="tool_calls"> 标签进行渲染。

将工具执行渲染为消息内容

Open WebUI 自带的服务端工具调用路径,会将已完成的工具执行作为 <details type="tool_calls"> HTML 块渲染在消息内容中。您也可以从智能体 pipeline 中输出完全一致的 HTML 块,从而获得一模一样的 “Called <tool>”(调用了某工具)的标签按钮,并支持点击展开查看入参和出参:

import html
import json

call_id = "call_123"
name = "get_weather_test"
arguments = {"location": "SF"}
result = {"temp_c": 22}

details_block = (
    f'<details type="tool_calls" done="true" '
    f'id="{call_id}" name="{name}" '
    f'arguments="{html.escape(json.dumps(arguments))}">\n'
    f'<summary>Tool Executed</summary>\n'
    f'{html.escape(json.dumps(result, ensure_ascii=False))}\n'
    f'</details>\n'
)

直接将 details_block 作为内容 yield 产出即可 —— 可以直接作为一个字符串(在 Pipelines 服务器上最简单),也可以放在 delta.content 字典中:

# 最简单 —— 适用于 Pipelines 服务器:
yield details_block

# 或者作为一个显式的 OpenAI chunk 字典:
yield {"choices": [{"delta": {"content": details_block}, "finish_reason": None}]}

对于一个运行了一个工具的自包含智能体,其最终的流式输出完整实现如下所示:

def pipe(self, user_message, model_id, messages, body):
    # 1. 工具执行前的叙述
    yield {"choices": [{"delta": {"role": "assistant", "content": "正在为您查询天气… "}, "finish_reason": None}]}

    # 2. 智能体在内部运行了工具(省略具体实现)
    call_id = "call_123"
    name = "get_weather_test"
    arguments = {"location": "SF"}
    result = {"temp_c": 22}

    # 3. 将工具执行过程渲染为一个 <details> HTML 块 —— 切勿使用 delta.tool_calls
    details_block = (
        f'<details type="tool_calls" done="true" '
        f'id="{call_id}" name="{name}" '
        f'arguments="{html.escape(json.dumps(arguments))}">\n'
        f'<summary>Tool Executed</summary>\n'
        f'{html.escape(json.dumps(result, ensure_ascii=False))}\n'
        f'</details>\n'
    )
    yield details_block

    # 4. 工具执行后的后续回复
    yield "查询完毕,当前天气是 22°C。"

    # 5. 单个流终止分块
    yield {"choices": [{"delta": {}, "finish_reason": "stop"}]}

LangChain 智能体示例

将一个 LangChain 智能体接入此模式的范例 —— 丢弃 AIMessageChunk 上的 tool_calls 属性,并将 ToolMessage 渲染为 <details> 块:

import html
import json

for chunk in agent.stream({"messages": messages}, stream_mode=["updates", "messages"]):
    if chunk["type"] != "messages":
        continue
    message = chunk["data"][0]

    if isinstance(message, AIMessageChunk):
        # 仅流式输出正文文本内容 —— 彻底丢弃 message.tool_calls 字段
        if message.content:
            yield message.content

    elif isinstance(message, ToolMessage):
        args = getattr(message, "args", {}) or {}
        details = (
            f'<details type="tool_calls" done="true" '
            f'id="{message.tool_call_id}" name="{message.name}" '
            f'arguments="{html.escape(json.dumps(args))}">\n'
            f'<summary>Tool Executed</summary>\n'
            f'{html.escape(json.dumps(message.content, ensure_ascii=False, default=str))}\n'
            f'</details>\n'
        )
        yield details

# 单个流终止分块
yield {"choices": [{"delta": {}, "finish_reason": "stop"}]}

参考讨论:open-webui #23957 详细分析了由于工具重复调用引发的症状及相应的解决方案细节。

This content is for informational purposes only and does not constitute a warranty, guarantee, or contractual commitment. Open WebUI is provided "as is." See your license for applicable terms.