跳到主要内容

🚰 Pipe 函数:创建自定义“Agents/模型”

⚠️ 重要安全警告

Pipe 函数会在您的服务器上执行任意 Python 代码。 函数的创建权限仅限于管理员。请仅安装来自信任源的函数,并在导入前仔细审核代码。恶意的 Function 可能会访问您的文件系统、窃取数据或破坏您的整个系统。有关完整详情,请参阅 Plugin 安全警告

欢迎阅读关于在 Open WebUI 中创建 Pipes 的指南!可以将 Pipes 视作向 Open WebUI 添加新模型的一种方式。在本篇文档中,我们将详细剖析 Pipe 是什么、它是如何工作的,以及您如何创建自己的 Pipe,以向您的 Open WebUI 模型中添加自定义逻辑和处理流程。我们将使用形象的比喻并探讨每一个细节,以确保您能有全面的理解。

建议使用异步函数以实现未来兼容性

Pipe 函数通常应当定义为 async(异步),以确保与未来的 Open WebUI 版本兼容。后端正在逐步走向完全的异步执行,同步函数可能会阻塞执行,或在未来的版本中导致各种问题。如有疑问,请将您的 pipe 函数设为异步。

Pipe 简介

将 Open WebUI 想象为一个管道系统,数据流经各种管道和阀门。在这个比喻中:

  • Pipes(管道)就像是插件,允许您引入新的数据流动路径,从而让您能够注入自定义的逻辑和数据处理。
  • Valves(阀门)则是您管道中可配置的部分,用以控制数据如何流过它。

通过创建 Pipe,您实际上是在 Open WebUI 框架内,精心打造一个具有您所期望特定行为的自定义模型。


理解 Pipe 结构

让我们从一个基本的、最简版的 Pipe 开始,以理解其结构:

from pydantic import BaseModel, Field

class Pipe:
    class Valves(BaseModel):
        MODEL_ID: str = Field(default="")

    def __init__(self):
        self.valves = self.Valves()

    async def pipe(self, body: dict):
        # 具体的逻辑在此处实现
        print(self.valves, body)  # 这将打印配置选项以及输入的 body 请求体
        return "Hello, World!"

Pipe 类

  • 定义Pipe 类是您定义自定义逻辑的地方。
  • 目的:作为您插件的蓝图,决定它在 Open WebUI 中的具体行为方式。

Valves:配置您的 Pipe

  • 定义ValvesPipe 内部的一个嵌套类,继承自 Pydantic 的 BaseModel
  • 目的:它包含了在使用您的 Pipe 时能够持久保存的配置选项(参数)。
  • 示例:在上述代码中,MODEL_ID 是一个配置选项,默认值为空字符串。

比喻:将 Valves 想象为真实管道系统上的旋钮,用以控制水流。在您的 Pipe 中,Valves 允许用户调整那些会影响数据流动和处理方式的设置。

__init__ 方法 {#the-init-method}

  • 定义Pipe 类的构造函数。
  • 目的:初始化 Pipe 的状态并建立任何必要的组件。
  • 最佳实践:保持简单;在这里主要只需初始化 self.valves
def __init__(self):
    self.valves = self.Valves()

pipe 函数

  • 定义:您的自定义逻辑所在的底层核心函数。
  • 参数
    • body:包含输入数据的字典。
  • 目的:使用您的自定义逻辑处理输入数据并返回结果。
async def pipe(self, body: dict):
    # 具体的逻辑在此处实现
    print(self.valves, body)  # 这将打印配置选项以及输入的 body 请求体
    return "Hello, World!"

注意:始终将 Valves 类放在您 Pipe 类的最顶部,紧接着是 __init__ 方法,然后是 pipe 函数。这种结构确保了清晰性与一致性。


利用 Pipes 创建多个模型

如果您希望您的 Pipe 在 Open WebUI 内部创建多个模型怎么办?您可以通过在 Pipe 类中定义一个 pipes 函数或变量来实现。这种形式在非正式场合被称为 manifold(歧管),它允许您的 Pipe 代表多个模型。

以下是实现方法:

from pydantic import BaseModel, Field

class Pipe:
    class Valves(BaseModel):
        MODEL_ID: str = Field(default="")

    def __init__(self):
        self.valves = self.Valves()

    def pipes(self):
        return [
            {"id": "model_id_1", "name": "model_1"},
            {"id": "model_id_2", "name": "model_2"},
            {"id": "model_id_3", "name": "model_3"},
        ]

    async def pipe(self, body: dict):
        # 具体的逻辑在此处实现
        print(self.valves, body)  # 打印配置选项以及输入的 body 请求体
        model = body.get("model", "")
        return f"{model}: Hello, World!"

说明

  • pipes 函数

    • 返回一个字典列表。
    • 每个字典代表一个包含唯一 idname 键的模型。
    • 这些模型将分别显示在 Open WebUI 的模型选择器中。
  • 更新后的 pipe 函数

    • 根据所选的模型处理输入。
    • 在本示例中,它在返回的字符串中包含了模型名称。

示例:OpenAI 代理 Pipe

让我们深入一个非常实用的示例,我们将创建一个将请求代理(proxy)到 OpenAI API 的 Pipe。该 Pipe 会从 OpenAI 获取可用的模型,并允许用户通过 Open WebUI 与它们进行交互。

from pydantic import BaseModel, Field
import httpx


class Pipe:
    class Valves(BaseModel):
        NAME_PREFIX: str = Field(
            default="OPENAI/",
            description="加在模型名称前面的前缀。",
        )
        OPENAI_API_BASE_URL: str = Field(
            default="https://api.openai.com/v1",
            description="访问 OpenAI API 端点的基础 URL。",
        )
        OPENAI_API_KEY: str = Field(
            default="",
            description="用于验证发往 OpenAI API 请求的 API 密钥。",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def pipes(self):
        if not self.valves.OPENAI_API_KEY:
            return [{"id": "error", "name": "未提供 API 密钥。"}]

        headers = {
            "Authorization": f"Bearer {self.valves.OPENAI_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    f"{self.valves.OPENAI_API_BASE_URL}/models", headers=headers
                )
                r.raise_for_status()
                models = r.json()

            return [
                {
                    "id": model["id"],
                    "name": f'{self.valves.NAME_PREFIX}{model.get("name", model["id"])}',
                }
                for model in models["data"]
                if "gpt" in model["id"]
            ]
        except Exception:
            return [
                {
                    "id": "error",
                    "name": "获取模型出错。请检查您的 API 密钥。",
                }
            ]

    async def pipe(self, body: dict, __user__: dict):
        print(f"pipe:{__name__}")
        headers = {
            "Authorization": f"Bearer {self.valves.OPENAI_API_KEY}",
            "Content-Type": "application/json",
        }

        # 从模型名称中提取实际的 model id
        model_id = body["model"][body["model"].find(".") + 1 :]

        # 在请求体中更新 model id
        payload = {**body, "model": model_id}
        url = f"{self.valves.OPENAI_API_BASE_URL}/chat/completions"

        try:
            if body.get("stream", False):
                async def event_stream():
                    async with httpx.AsyncClient(timeout=None) as client:
                        async with client.stream(
                            "POST", url, json=payload, headers=headers
                        ) as r:
                            r.raise_for_status()
                            async for line in r.aiter_lines():
                                yield line

                return event_stream()

            async with httpx.AsyncClient(timeout=None) as client:
                r = await client.post(url, json=payload, headers=headers)
                r.raise_for_status()
                return r.json()
        except Exception as e:
            return f"Error: {e}"
推荐使用异步的 HTTP 客户端

此示例使用了 httpx.AsyncClient 而不是经典的 requests 库,因为 pipes()pipe() 都是在 Open WebUI 的异步事件循环中运行的。如果在 async def 方法中调用同步的 requests 库,会阻塞整个事件循环(在流式传输时甚至会阻塞整个流的持续时间),从而使实例上其他并发请求全部陷入停滞。httpx 是异步原生的,并且已经是现成的依赖项,可以直接作为常见模式的无缝替代品。

如果您必须在异步处理器中使用同步的第三方库,请使用 await anyio.to_thread.run_sync(...) 包装该阻塞调用,以便它在工作线程(worker thread)而非主事件循环上运行。

详细拆解

Valves 配置

  • NAME_PREFIX
    • 为 Open WebUI 中显示的模型名称添加前缀。
    • 默认值:"OPENAI/"
  • OPENAI_API_BASE_URL
    • 指定 OpenAI API 的基础 URL。
    • 默认值:"https://api.openai.com/v1"
  • OPENAI_API_KEY
    • 您的 OpenAI API 密钥,用于身份验证。
    • 默认值:""(空字符串;必须在使用前提供)。

pipes 函数

  • 目的:获取所有可用的 OpenAI 模型,并使它们可以在 Open WebUI 中被访问。

  • 流程

    1. 检查 API 密钥:确保已经配置了 API 密钥。
    2. 获取模型:向 OpenAI API 发起 GET 请求以检索可用模型。
    3. 过滤模型:仅返回 id 中包含 "gpt" 的模型。
    4. 错误处理:如果出现问题,返回一条错误信息。
  • 返回格式:一个包含每个模型的 idname 的字典列表。

pipe 函数

  • 目的:处理发往所选 OpenAI 模型的请求并返回响应。

  • 参数

    • body:包含请求数据。
    • __user__:包含用户数据(在此示例中未直接使用,但对于身份验证或记录日志非常有用)。
  • 流程

    1. 准备请求头:使用 API 密钥及 content-type 设置 headers。
    2. 提取模型 ID:从选择的模型名称中剥离出实际的模型 ID。
    3. 准备 Payload 负载:用正确的模型 ID 更新 body。
    4. 发起 API 请求:使用 httpx.AsyncClient 向 OpenAI API 的聊天完成端点(chat completions endpoint)发送 POST 请求。
    5. 流式处理:如果 streamTrue,返回一个异步生成器,用于不断产生来自 upstream 响应的 SSE 数据行。
    6. 错误处理:捕获异常并返回错误信息。

扩展代理 Pipe

您可以通过调整 pipespipe 函数中的 API 端点、请求头以及特定的处理逻辑,轻松地修改此代理 Pipe,以支持像 Anthropic、Perplexity 等其他服务提供商。

正在构建一个自包含的 Agent?切勿发送 delta.tool_calls

如果您的 Pipe 封装了一个在内部执行 tools(如 LangChain、LlamaIndex 或自定义规划器等)然后将最终答案流式传回聊天的 Agent,那么在数据流中发送 delta.tool_calls 会触发 Open WebUI 的工具执行重试循环 — 中间件会将 delta.tool_calls 视为“请客户端为我执行此操作”,然后循环重新进入您的 pipe,从而重复产生响应,最多重试 CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES(约 30)次。

对于这类自包含的 Agent,请改将工具执行渲染为 <details type="tool_calls"> 内容块 — 这与 Open WebUI 自身在内部工具执行后发出的形状完全一致。具体的完整模式、LangChain 示例以及路线抉择的经验法则,请参见 Pipes → 自包含的 Agents 与 delta.tool_calls 章节。


使用 Open WebUI 内部函数

有时,您可能希望在您的 Pipe 内部,利用 Open WebUI 本身的内部函数。您可以直接从 open_webui 包中导入这些函数。需要记住的是,尽管概率较低,但出于优化目的,内部函数在未来可能会发生变化,因此请始终参考最新的官方开发文档。

以下是您如何在 Pipe 中使用 Open WebUI 内部函数的示例:

from pydantic import BaseModel, Field
from fastapi import Request

from open_webui.models.users import Users
from open_webui.utils.chat import generate_chat_completion

class Pipe:
    def __init__(self):
        pass

    async def pipe(
        self,
        body: dict,
        __user__: dict,
        __request__: Request,
    ) -> str:
        # 使用带有更新后签名的统一端点
        user = await Users.get_user_by_id(__user__["id"])
        body["model"] = "llama3.2:latest"
        return await generate_chat_completion(__request__, body, user)

说明

  • 导入

    • 来自 open_webui.models.usersUsers:用于获取用户信息。
    • 来自 open_webui.utils.chatgenerate_chat_completion:使用内部逻辑来生成聊天完成响应。
  • 异步的 pipe 函数

    • 参数
      • body:模型的输入数据。
      • __user__:包含用户信息的字典。
      • __request__:来自 FastAPI 的请求对象(generate_chat_completion 所必需的)。
    • 流程
      1. 获取用户对象:使用用户 ID 检索完整的 user 对象。
      2. 设定模型:指定要使用的具体模型。
      3. 生成响应:调用 generate_chat_completion 来处理输入并产生输出。

重要注意事项

  • 函数签名:请经常查阅最新的 Open WebUI 源码或开发文档,以获取最精确的函数签名和参数要求。
  • 最佳实践:务必优雅地处理所有的异常和错误,以确保顺畅的用户体验。

常见问题解答

Q1: 为什么我应该在 Open WebUI 中使用 Pipes?

:Pipes 允许您在 Open WebUI 中添加带有自定义逻辑和数据处理的新“模型”。这是一个非常灵活的插件系统,可以让您集成外部 API、自定义模型行为并打造创新功能,而完全无需更改核心代码库。


Q2: 什么是 Valves,为什么它们很重要?

:Valves 是您的 Pipe 中可配置的参数。它们就像是决定您 Pipe 如何运转的设置或控制项。通过调整 Valves,您无需修改任何底层代码即可改变 Pipe 的行为。


Q3: 我可以创建不带 Valves 的 Pipe 吗?

:可以。如果您的 Pipe 不需要任何持久保存的配置选项,您完全可以创建一个简单的 Pipe 而无需定义 Valves 类。然而,出于灵活性和未来可扩展性的考虑,包含 Valves 是一种良好的开发实践。


Q4: 在使用 API 密钥时,如何确保我的 Pipe 是安全的?

:千万不要将 API 密钥等敏感信息硬编码到您的 Pipe 源码中。相反,请使用 Valves 来安全地输入并存储 API 密钥。确保您的代码恰当地处理这些密钥,避免打印日志或无意泄露它们。


Q5: pipe 和 pipes 函数之间有什么区别?

  • pipe 函数:核心的主函数,您在此处处理输入数据并生成输出。它处理的是单个模型的逻辑。
  • pipes 函数:允许您的 Pipe 通过返回一个模型定义列表,从而代表多个模型。每个模型都将在 Open WebUI 中作为独立模型显示。

Q6: 我该如何在 Pipe 中处理错误?

:在您的 pipepipes 函数中使用 try-except 代码块来捕获异常。返回有意义的错误提示信息,或者优雅地处理错误,以确保用户能清楚地知晓发生了什么问题。


Q7: 我可以在 Pipe 中使用外部库吗?

:可以。您可以根据需要在代码中导入并使用各种外部 Python 库。请确保在您的运行环境中,所有对应的依赖项都已正确安装和配置。


Q8: 我该如何测试我的 Pipe?

:您可以通过在开发环境中运行 Open WebUI 并在界面上选择您的自定义模型来进行测试。验证您的 Pipe 在面对各种输入及不同配置时,是否能如期运转。


Q9: 组织我的 Pipe 代码有什么最佳实践吗?

:是的,遵循以下原则:

  • 始终将 Valves 类保持在您 Pipe 类的顶部。
  • __init__ 方法中初始化变量,主要是 self.valves
  • pipe 函数放置在 __init__ 方法之后。
  • 使用清晰且具描述性的变量名。
  • 为您的代码编写清晰的注释。

Q10: 我可以在哪里找到最新的 Open WebUI 文档?

:访问 Open WebUI 的官方 GitHub 仓库或官方文档网站,以获取最新的信息,包括如果发生任何重大更改时的函数签名、示例以及迁移指南。


结论

通过学习本篇指南,您现在应当对如何在 Open WebUI 中创建和使用 Pipes 有了全面的理解。Pipes 提供了一种强大而简便的方法来扩展和自定义 Open WebUI 的能力,以满足您的特定业务需求。无论是集成外部 API、添加新模型,还是注入复杂的业务逻辑,Pipes 都为您提供了实现这一切的极佳灵活性。

请记住:

  • 在您的 Pipe 类中使用清晰且一致的结构
  • 利用 Valves 来提供可配置的配置选项。
  • 优雅地处理错误以提升用户体验。
  • 查阅最新的开发文档以适应未来的更新与调整。

祝您编码愉快,尽情享受使用 Pipes 扩展 Open WebUI 的旅程!

This content is for informational purposes only and does not constitute a warranty, guarantee, or contractual commitment. Open WebUI is provided "as is." See your license for applicable terms.