苏森AI

  • AI资讯
  • AI应用
  • AI工具
  • AI工作流
  • AI智能体
  • AI提示词
苏森AI
从这里开启你的AI学习旅程!
  1. 首页
  2. AI工作流
  3. 正文

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

2025-06-25 58点热度 0人点赞 0条评论

Dify 是一个开源的 LLM 应用开发平台,提供了直观的界面,结合了 AI 工作流、RAG、Agent、模型管理、可观测性功能等,让用户可以快速从原型到生产。

  • 从零开始学 Dify
  • 从零开始学 Dify-系统架构
  • 从零开始学 Dify- 帐户与租户管理系统设计揭秘
  • 从零开始学 Dify- 模型提供者系统设计实现模型统一调用接口
  • 从零开始学 Dify- RAG 知识库系统设计详解
  • 从零开始学 Dify- 对话系统的关键功能
  • 从零开始学 Dify- 工作流(Workflow)系统架构
  • 从零开始学 Dify-扫描、加载和管理模型提供者的详细过程
  • 从零开始学 Dify-详细介绍 Dify 模型运行时的核心架构
  • 从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?
  • 从零开始学 Dify - 详细介绍 Dify 工具(Tool)系统核心架构设计
  • 从零开始学 Dify - 万字详解RAG父子分段模式的原理与实现
  • 从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误

本文将深入分析 Dify 聊天助手应用的实现机制。

👆👆👆欢迎关注,一起进步👆👆👆

一、聊天助手应用创建流程图

以下流程图展示了在 Dify 中创建聊天助手应用的过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

二、应用创建代码分析

2.1 控制器层

应用创建流程从控制器层开始,主要在api/controllers/console/app/app.py文件中的AppListApi.post方法:

  1. 接收前端请求,解析参数:name、description、mode、icon_type、icon和icon_background
  2. 验证用户权限(要求用户为editor角色)
  3. 验证必要参数,特别是mode参数(可选值:chat、agent-chat、advanced-chat、workflow和completion)
  4. 调用AppService.create_app方法创建应用
  5. 返回应用信息和201状态码

2.2 服务层

服务层的实现在api/services/app_service.py文件中的create_app方法:

  1. 根据应用模式(AppMode)从default_app_templates获取默认应用模板
  2. 处理默认模型实例的获取和设置
  3. 创建App实例并设置其属性(名称、描述、模式、图标、租户ID等)
  4. 创建AppModelConfig实例并设置其属性
  5. 将应用和关联的模型配置保存到数据库
  6. 触发app_was_created事件
  7. 返回创建的应用实例

2.3 模型定义

应用创建涉及的主要数据模型定义在api/models/model.py文件中:

  1. App模型:应用的核心数据结构,包含应用的基本信息和配置
  2. AppModelConfig模型:应用的模型配置,包含模型提供商、模型ID、参数等
  3. InstalledApp模型:记录应用的安装信息,包含租户ID、应用ID、位置等
  4. Site模型:记录应用的站点信息,包含标题、图标、默认语言等

2.4 默认应用模板

默认应用模板定义在api/constants/model_template.py文件中的default_app_templates字典:

  1. 为不同的应用模式(WORKFLOW、COMPLETION、CHAT、ADVANCED_CHAT、AGENT_CHAT)定义默认配置
  2. 包含enable_site和enable_api等默认设置

2.5 事件处理

应用创建后触发的事件处理在以下文件中实现:

  1. api/events/event_handlers/create_installed_app_when_app_created.py:创建InstalledApp记录
  2. api/events/event_handlers/create_site_record_when_app_created.py:创建Site记录

这些事件处理器通过app_was_created信号连接到应用创建过程,确保在应用创建后自动创建相关记录。

三、系统数据流图

接下来将分析 Dify 项目中“聊天应用”的实现机制,涵盖消息的收发、会话管理、消息流式/阻塞返回、上下文与多轮对话、文件与反馈等核心能力。该机制是 Dify 支持 AI 聊天、客服、知识问答等场景的基础,直接影响用户体验、系统扩展性与业务创新能力。

3.1 以"会话-消息"为核心的流程图

以下是 Dify 系统中主要数据流的示意图:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

3.2 相关源码定位

角色/功能
主要模块/文件
主要职责说明
API 入口
api/controllers/service_api/app/conversation.py
聊天消息的 API 接口,参数解析、鉴权、调用生成逻辑
消息生成主流程
api/core/app/apps/chat/app_generator.py
聊天消息生成主流程,队列与 worker 管理
消息队列管理
api/core/app/apps/message_based_app_queue_manager.py
消息事件推送、流式/阻塞返回
会话与消息服务
services/conversation_service.py
会话、消息的增删查改,历史消息查询
数据模型
models/model.py

、models/account.py
Conversation、Message、Account 等 ORM 定义
数据库扩展
extensions/ext_database.py
数据库会话管理
前端接口文档
web/app/components/develop/template/template_chat.zh.mdx
API 说明、参数与返回格式

3.3 代码示例

3.3.1 API 入口

文件:api/controllers/service_api/app/conversation.py

class ChatMessageApi(Resource):
    def post(self):
        args = request.get_json()
        user = get_user_from_token(request)
        result = ChatAppGenerator().generate(app_model, user, args, invoke_from=InvokeFrom.SERVICE_API)
        return result
  • 解析请求参数,鉴权,调用核心生成逻辑。

3.3.2 消息生成主流程

文件:api/core/app/apps/chat/app_generator.py

class ChatAppGenerator(MessageBasedAppGenerator):
    def generate(self, app_model, user, args, invoke_from, streaming=True):
        # 参数校验
        query = args["query"]
        # 获取/新建会话
        conversation = ConversationService.get_conversation(app_model, args.get("conversation_id"), user)
        # 创建消息
        message = Message(...)
        db.session.add(message)
        db.session.commit()
        # 初始化队列与worker
        queue_manager = MessageBasedAppQueueManager(...)
        worker_thread = threading.Thread(target=worker_with_context)
        worker_thread.start()
        # 返回流式/阻塞响应
        return self._handle_response(...)
  • 负责会话与消息的创建、队列与 worker 的初始化、响应的返回。

3.3.3 Worker 线程与消息推送

def _generate_worker(self, flask_app, application_generate_entity, queue_manager, conversation_id, message_id):
    with flask_app.app_context():
        conversation = self._get_conversation(conversation_id)
        message = self._get_message(message_id)
        runner = ChatAppRunner()
        runner.run(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message,
        )
  • Worker 线程负责实际的 LLM 调用与消息推送,支持异步处理。

3.3.4 消息队列与流式返回

文件:api/core/app/apps/message_based_app_queue_manager.py

  • 负责消息事件的推送,支持 SSE(Server-Sent Events)流式返回,提升用户体验。

四、数据模型设计 (ER图)

以下是Dify聊天助手应用的核心数据模型关系图:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

4.1 数据模型详细解释

4.1.1 核心实体说明

  1. Tenant(租户)

    • id:租户唯一标识符
    • name:租户名称
    • plan:订阅计划类型
    • created_at:创建时间
    • 代表使用 Dify 平台的组织或团队
    • 每个租户可以拥有多个账户(Account)
    • 关键字段:
  2. Account(账户)

    • id:账户唯一标识符
    • name:用户名
    • email:电子邮箱,用于登录
    • password:加密存储的密码
    • status:账户状态(如活跃、禁用等)
    • last_login_at:最后登录时间
    • 代表可以登录和使用 Dify 平台的用户
    • 每个账户属于一个租户,可以创建和管理多个应用(App)
    • 关键字段:
  3. App(应用)

    • id:应用唯一标识符
    • tenant_id:所属租户ID
    • name:应用名称
    • description:应用描述
    • mode:应用模式(如聊天模式、完成模式等)
    • app_model_config_id:关联的模型配置ID
    • workflow_id:关联的工作流ID
    • enable_site:是否启用网站访问
    • enable_api:是否启用API访问
    • created_at:创建时间
    • 代表在 Dify 平台上创建的 AI 应用,如聊天助手
    • 每个应用属于一个租户,由账户创建和管理
    • 关键字段:
  4. AppModelConfig(应用模型配置)

    • id:配置唯一标识符
    • app_id:关联的应用ID
    • provider:模型提供商(如OpenAI、Anthropic等)
    • model_id:使用的模型ID
    • configs:模型参数配置(JSON格式)
    • pre_prompt:系统提示词
    • opening_statement:对话开场白
    • agent_mode:Agent模式设置
    • 定义应用使用的 AI 模型和相关配置
    • 每个应用有一个对应的模型配置
    • 关键字段:
  5. Conversation(对话)

    • id:对话唯一标识符
    • app_id:关联的应用ID
    • name:对话名称
    • inputs:对话初始输入
    • from_end_user_id:发起对话的终端用户ID
    • from_account_id:发起对话的账户ID
    • status:对话状态
    • created_at:创建时间
    • updated_at:更新时间
    • 代表用户与应用之间的一次完整对话
    • 每个对话属于一个应用,包含多条消息
    • 关键字段:
  6. Message(消息)

    • id:消息唯一标识符
    • app_id:关联的应用ID
    • conversation_id:关联的对话ID
    • query:用户查询内容
    • message:完整消息内容(JSON格式)
    • answer:系统回复内容
    • status:消息处理状态
    • from_source:消息来源
    • created_at:创建时间
    • 代表对话中的一条消息,包括用户输入和系统回复
    • 每条消息属于一个对话
    • 关键字段:
  7. EndUser(终端用户)

    • id:用户唯一标识符
    • session_id:会话ID
    • type:用户类型
    • created_at:创建时间
    • 代表使用应用的最终用户
    • 一个终端用户可以发起多个对话
    • 关键字段:
  8. Workflow(工作流)

    • id:工作流唯一标识符
    • tenant_id:所属租户ID
    • app_id:关联的应用ID
    • type:工作流类型
    • version:工作流版本
    • graph:工作流图定义(文本格式)
    • 定义应用的处理流程
    • 每个工作流关联到一个应用
    • 关键字段:
  9. ConversationVariable(对话变量)

    • id:变量唯一标识符
    • conversation_id:关联的对话ID
    • key:变量名
    • value:变量值
    • 存储对话过程中产生的变量
    • 每个变量属于一个对话
    • 关键字段:

4.1.2 实体关系说明

  1. Tenant 与 Account 关系

    • 一对多关系:一个租户可以包含多个账户
    • 关系描述:"包含"
    • 实现方式:Account 表中的 tenant_id 外键关联到 Tenant 表的 id
  2. Account 与 App 关系

    • 一对多关系:一个账户可以创建和管理多个应用
    • 关系描述:"创建/管理"
    • 实现方式:通过中间表或权限表实现
  3. App 与 AppModelConfig 关系

    • 一对一关系:一个应用有一个对应的模型配置
    • 关系描述:"配置"
    • 实现方式:App 表中的 app_model_config_id 外键关联到 AppModelConfig 表的 id,同时 AppModelConfig 表中的 app_id 外键关联到 App 表的 id
  4. App 与 Conversation 关系

    • 一对多关系:一个应用可以包含多个对话
    • 关系描述:"包含"
    • 实现方式:Conversation 表中的 app_id 外键关联到 App 表的 id
  5. Conversation 与 Message 关系

    • 一对多关系:一个对话包含多条消息
    • 关系描述:"包含"
    • 实现方式:Message 表中的 conversation_id 外键关联到 Conversation 表的 id
  6. Conversation 与 ConversationVariable 关系

    • 一对多关系:一个对话可以包含多个变量
    • 关系描述:"包含"
    • 实现方式:ConversationVariable 表中的 conversation_id 外键关联到 Conversation 表的 id
  7. App 与 Workflow 关系

    • 一对多关系:一个应用可以关联多个工作流(不同版本)
    • 关系描述:"关联"
    • 实现方式:Workflow 表中的 app_id 外键关联到 App 表的 id,同时 App 表中的 workflow_id 外键关联到当前使用的 Workflow 表的 id
  8. EndUser 与 Conversation 关系

    • 一对多关系:一个终端用户可以发起多个对话
    • 关系描述:"发起"
    • 实现方式:Conversation 表中的 from_end_user_id 外键关联到 EndUser 表的 id

4.1.3 数据模型设计特点

  1. 多租户架构:通过 Tenant 实体实现多租户隔离,每个租户拥有自己的账户、应用和数据

  2. 灵活的应用配置:通过 AppModelConfig 实现对不同模型和参数的灵活配置

  3. 完整的对话历史:通过 Conversation 和 Message 实体存储完整的对话历史和上下文

  4. 工作流支持:通过 Workflow 实体支持复杂的 AI 处理流程定义

  5. 变量存储:通过 ConversationVariable 支持对话过程中的变量存储和使用

  6. 用户区分:区分平台用户(Account)和应用终端用户(EndUser)

五、关键流程

5.1 消息处理流程图

以下流程图展示了 Dify 聊天助手应用处理用户消息的过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

5.2 消息处理源代码实现

消息处理的入口点在控制器层,主要由ChatApi类处理:

# api/controllers/web/completion.py
class ChatApi(WebApiResource):
    def post(self, app_model, end_user):
        app_mode = AppMode.value_of(app_model.mode)
        if app_mode notin {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
            raise NotChatAppError()

        parser = reqparse.RequestParser()
        parser.add_argument("inputs", type=dict, required=True, location="json")
        parser.add_argument("query", type=str, required=True, location="json")
        parser.add_argument("files", type=list, required=False, location="json")
        parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
        parser.add_argument("conversation_id", type=uuid_value, location="json")
        parser.add_argument("parent_message_id", type=uuid_value, required=False, location="json")
        parser.add_argument("retriever_from", type=str, required=False, default="web_app", location="json")

        args = parser.parse_args()

        streaming = args["response_mode"] == "streaming"
        args["auto_generate_name"] = False

        try:
            response = AppGenerateService.generate(
                app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.WEB_APP, streaming=streaming
            )

            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        # 异常处理...

消息生成的核心服务在AppGenerateService类中:

# api/services/app_generate_service.py
class AppGenerateService:
    @classmethod
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ):
        # 系统级别限流检查
        if dify_config.BILLING_ENABLED:
            # 检查是否是免费计划
            # ...

        # 应用级别限流
        max_active_request = AppGenerateService._get_max_active_requests(app_model)
        rate_limit = RateLimit(app_model.id, max_active_request)
        request_id = RateLimit.gen_request_key()
        try:
            request_id = rate_limit.enter(request_id)
            # 根据应用模式选择不同的生成器
            if app_model.mode == AppMode.COMPLETION.value:
                return rate_limit.generate(
                    CompletionAppGenerator.convert_to_event_stream(
                        CompletionAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
                # Agent聊天模式
                # ...
            elif app_model.mode == AppMode.CHAT.value:
                # 普通聊天模式
                # ...
            elif app_model.mode == AppMode.ADVANCED_CHAT.value:
                # 高级聊天模式
                # ...
            # ...
        except RateLimitError as e:
            raise InvokeRateLimitError(str(e))
        # ...

具体的消息生成逻辑在各个 AppGenerator 类中实现,以 ChatAppGenerator 为例:

# api/core/app/apps/chat/app_generator.py
class ChatAppGenerator(MessageBasedAppGenerator):
    def generate(
        self,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
        # 验证查询参数
        ifnot args.get("query"):
            raise ValueError("query is required")

        query = args["query"]
        inputs = args["inputs"]
        extras = {"auto_generate_conversation_name": args.get("auto_generate_name", True)}

        # 获取或创建对话
        conversation = None
        conversation_id = args.get("conversation_id")
        if conversation_id:
            conversation = ConversationService.get_conversation(
                app_model=app_model, conversation_id=conversation_id, user=user
            )
        # 获取应用模型配置
        app_model_config = self._get_app_model_config(app_model=app_model, conversation=conversation)

        # 验证覆盖模型配置
        # ...

        # 解析文件
        # ...

        # 转换为应用配置
        # ...

        # 初始化应用生成实体
        application_generate_entity = ChatAppGenerateEntity(
            task_id=str(uuid.uuid4()),
            app_config=app_config,
            model_conf=ModelConfigConverter.convert(app_config),
            file_upload_config=file_extra_config,
            conversation_id=conversation.id if conversation elseNone,
            inputs=self._prepare_user_inputs(
                user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
            ),
            query=query,
            files=list(file_objs),
            parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
            user_id=user.id,
            invoke_from=invoke_from,
            extras=extras,
            trace_manager=trace_manager,
            stream=streaming,
        )

        # 初始化生成记录
        (conversation, message) = self._init_generate_records(application_generate_entity, conversation)

        # 初始化队列管理器
        queue_manager = MessageBasedAppQueueManager(
            task_id=application_generate_entity.task_id,
            user_id=application_generate_entity.user_id,
            invoke_from=application_generate_entity.invoke_from,
            conversation_id=conversation.id,
            app_mode=conversation.mode,
            message_id=message.id,
        )

        # 在新线程中执行生成任务
        worker_thread = threading.Thread(target=worker_with_context)
        worker_thread.start()

        # 返回响应或流生成器
        response = self._handle_response(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message,
            user=user,
            stream=streaming,
        )

        return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

5.3 Conversation 管理流程

以下流程图详细展示了 Conversation 的管理过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

5.4 Conversation 管理源代码实现

Conversation 管理的核心实现在ConversationService类中:

# api/services/conversation_service.py
class ConversationService:
    @classmethod
    def get_conversation(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        conversation = (
            db.session.query(Conversation)
            .filter(
                Conversation.id == conversation_id,
                Conversation.app_id == app_model.id,
                Conversation.from_source == ("api"if isinstance(user, EndUser) else"console"),
                Conversation.from_end_user_id == (user.id if isinstance(user, EndUser) elseNone),
                Conversation.from_account_id == (user.id if isinstance(user, Account) elseNone),
                Conversation.is_deleted == False,
            )
            .first()
        )

        ifnot conversation:
            raise ConversationNotExistsError()

        return conversation
    
    @classmethod
    def rename(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]], name: str, auto_generate: bool):
        conversation = cls.get_conversation(app_model, conversation_id, user)

        if auto_generate:
            return cls.auto_generate_name(app_model, conversation)
        else:
            conversation.name = name
            conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
            db.session.commit()

        return conversation
    
    @classmethod
    def auto_generate_name(cls, app_model: App, conversation: Conversation):
        # 获取对话的第一条消息
        message = (
            db.session.query(Message)
            .filter(Message.app_id == app_model.id, Message.conversation_id == conversation.id)
            .order_by(Message.created_at.asc())
            .first()
        )

        ifnot message:
            raise MessageNotExistsError()

        # 生成对话名称
        try:
            name = LLMGenerator.generate_conversation_name(
                app_model.tenant_id, message.query, conversation.id, app_model.id
            )
            conversation.name = name
        except:
            pass

        db.session.commit()

        return conversation
    
    @classmethod
    def delete(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        conversation = cls.get_conversation(app_model, conversation_id, user)

        conversation.is_deleted = True
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
        db.session.commit()

在消息生成过程中,Conversation 的创建和更新主要在 ChatAppGenerator 类的 _init_generate_records 方法中实现:

# api/core/app/apps/chat/app_generator.py (简化示例)
def _init_generate_records(self, application_generate_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
    # 获取或创建对话
    ifnot conversation:
        # 创建新对话
        conversation = Conversation(
            id=str(uuid.uuid4()),
            app_id=application_generate_entity.app_config.app_id,
            app_model_config_id=application_generate_entity.app_config.app_model_config_id,
            model_provider=application_generate_entity.model_conf.model_provider,
            model_id=application_generate_entity.model_conf.model_id,
            mode=application_generate_entity.app_config.mode,
            name="",  # 初始名称为空
            inputs=application_generate_entity.inputs,
            status="created",
            from_source=("api"if application_generate_entity.invoke_from == InvokeFrom.API else"console"),
            from_end_user_id=(application_generate_entity.user_id if application_generate_entity.invoke_from == InvokeFrom.API elseNone),
            from_account_id=(application_generate_entity.user_id if application_generate_entity.invoke_from != InvokeFrom.API elseNone),
            invoke_from=application_generate_entity.invoke_from.value,
        )
        db.session.add(conversation)
    else:
        # 更新现有对话
        conversation.status = "processing"
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
    
    # 创建消息记录
    message = Message(
        id=str(uuid.uuid4()),
        conversation_id=conversation.id,
        app_id=application_generate_entity.app_config.app_id,
        model_provider=application_generate_entity.model_conf.model_provider,
        model_id=application_generate_entity.model_conf.model_id,
        query=application_generate_entity.query,
        inputs=application_generate_entity.inputs,
        message_files=[],
        answer="",
        parent_message_id=application_generate_entity.parent_message_id,
        user_id=application_generate_entity.user_id,
        from_source=("api"if application_generate_entity.invoke_from == InvokeFrom.API else"console"),
        from_end_user_id=(application_generate_entity.user_id if application_generate_entity.invoke_from == InvokeFrom.API elseNone),
        from_account_id=(application_generate_entity.user_id if application_generate_entity.invoke_from != InvokeFrom.API elseNone),
    )
    db.session.add(message)
    db.session.commit()
    
    return conversation, message

对话变量的管理在消息处理完成后进行:

# api/core/app/apps/chat/app_runner.py (简化示例)
def _handle_variables(self, conversation: Conversation, variables: dict):
    # 处理对话变量
    for var_name, var_value in variables.items():
        # 检查变量是否已存在
        variable = (
            db.session.query(ConversationVariable)
            .filter(
                ConversationVariable.conversation_id == conversation.id,
                ConversationVariable.name == var_name,
            )
            .first()
        )
        
        if variable:
            # 更新现有变量
            variable.value = var_value
        else:
            # 创建新变量
            variable = ConversationVariable(
                id=str(uuid.uuid4()),
                conversation_id=conversation.id,
                app_id=conversation.app_id,
                name=var_name,
                value=var_value,
            )
            db.session.add(variable)
    
    db.session.commit()

六、对话状态图

以下状态图展示了 Dify 聊天助手应用中对话的状态变化:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

6.1 对话状态源代码实现

对话状态在数据库模型中的定义:

# api/models/model.py
class Conversation(Base):
    __tablename__ = "conversations"
    # ...
    status = db.Column(db.String(255), nullable=False)
    # ...

对话状态的初始化在_init_generate_records方法中实现:

# api/core/app/apps/message_based_app_generator.py
def _init_generate_records(self, application_generate_entity: Union[ChatAppGenerateEntity, ...], conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
    # ...
    ifnot conversation:
        conversation = Conversation(
            # ...
            status="normal",  # 初始状态为normal
            # ...
        )
        db.session.add(conversation)
        db.session.commit()
        db.session.refresh(conversation)
    else:
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
        db.session.commit()
    # ...

在 ChatAppGenerator 中,对话状态在处理消息前更新为 "processing":

# api/core/app/apps/chat/app_generator.py
def _init_generate_records(self, application_generate_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
    # 获取或创建对话
    ifnot conversation:
        # 创建新对话
        conversation = Conversation(
            # ...
            status="created",
            # ...
        )
        db.session.add(conversation)
    else:
        # 更新现有对话
        conversation.status = "processing"
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
    # ...

在消息处理完成后,对话状态更新为 "completed":

# api/core/app/apps/chat/app_runner.py
def run(self, application_generate_entity: ChatAppGenerateEntity, queue_manager: AppQueueManager, conversation: Conversation, message: Message) -> None:
    # ...
    # 处理完成后更新对话状态
    conversation.status = "completed"
    db.session.commit()

如果处理过程中出现错误,对话状态会更新为 "error":

# 错误处理
try:
    # 处理消息
    # ...
except Exception as e:
    # 更新对话状态为错误
    conversation.status = "error"
    db.session.commit()
    # 记录错误日志
    logger.exception("Error when processing message")

七、消息处理时序图

以下时序图展示了用户与 Dify 聊天助手应用交互的详细过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

7.1 消息处理时序源代码实现

消息处理的时序在代码中主要体现在以下几个关键部分:

  1. 控制器层接收请求并调用服务层:
# api/controllers/web/completion.py
class ChatApi(WebApiResource):
    def post(self, app_model, end_user):
        # 解析请求参数
        parser = reqparse.RequestParser()
        # ...
        args = parser.parse_args()
        
        try:
            # 调用AppGenerateService生成回复
            response = AppGenerateService.generate(
                app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.WEB_APP, streaming=streaming
            )
            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        # 异常处理...
  1. 服务层根据应用模式选择不同的生成器:
# api/services/app_generate_service.py
class AppGenerateService:
    @classmethod
    def generate(cls, app_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke_from: InvokeFrom, streaming: bool = True):
        # 限流检查
        # ...
        
        # 根据应用模式选择不同的生成器
        if app_model.mode == AppMode.CHAT.value:
            return rate_limit.generate(
                ChatAppGenerator.convert_to_event_stream(
                    ChatAppGenerator().generate(
                        app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                    ),
                ),
                request_id=request_id,
            )
        # 其他模式...
  1. 生成器初始化对话和消息记录:
# api/core/app/apps/chat/app_generator.py
def generate(self, app_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke_from: InvokeFrom, streaming: bool = True):
    # 获取或创建对话
    conversation = None
    conversation_id = args.get("conversation_id")
    if conversation_id:
        conversation = ConversationService.get_conversation(
            app_model=app_model, conversation_id=conversation_id, user=user
        )
    
    # 初始化应用生成实体
    application_generate_entity = ChatAppGenerateEntity(
        # 参数设置
        # ...
    )
    
    # 初始化生成记录
    (conversation, message) = self._init_generate_records(application_generate_entity, conversation)
    
    # 在新线程中执行生成任务
    worker_thread = threading.Thread(target=worker_with_context)
    worker_thread.start()
    
    # 返回响应或流生成器
    response = self._handle_response(
        application_generate_entity=application_generate_entity,
        queue_manager=queue_manager,
        conversation=conversation,
        message=message,
        user=user,
        stream=streaming,
    )
    
    return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
  1. 工作线程中执行实际的生成任务:
# api/core/app/apps/chat/app_generator.py
def _generate_worker(self, flask_app: Flask, application_generate_entity: ChatAppGenerateEntity, queue_manager: AppQueueManager, conversation_id: str, message_id: str) -> None:
    with flask_app.app_context():
        try:
            # 获取对话和消息
            conversation = self._get_conversation(conversation_id)
            message = self._get_message(message_id)
            if message isNone:
                raise MessageNotExistsError("Message not exists")

            # 运行聊天应用
            runner = ChatAppRunner()
            runner.run(
                application_generate_entity=application_generate_entity,
                queue_manager=queue_manager,
                conversation=conversation,
                message=message,
            )
        except GenerateTaskStoppedError:
            pass
        # 异常处理...
        finally:
            db.session.close()
  1. 消息处理完成后更新状态:
# api/core/app/apps/chat/app_runner.py (简化示例)
def run(self, application_generate_entity, queue_manager, conversation, message):
    # 执行RAG检索(如果启用)
    # 准备模型输入
    # 调用LLM生成回复
    # 保存回复到消息
    
    # 更新对话状态
    conversation.status = "completed"
    conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
    db.session.commit()
    
    # 处理对话变量(如果有)
    if new_variables:
        for var_name, var_value in new_variables.items():
            # 创建或更新对话变量
            # ...

八、数据生命周期

为了更好地理解 Dify 系统中数据的完整生命周期,展示主要数据实体在系统中的创建、使用、更新和删除过程。

8.1 App 数据生命周期

以下图表展示了 App 实体的完整生命周期,以及与其他数据模型的关联:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

8.2 Conversation 数据生命周期

以下图表展示了 Conversation 实体的完整生命周期,以及与 Message 和 ConversationVariable 的关联:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

九、总结

Dify 提供了一个全面的 LLM 应用开发平台,通过直观的界面和强大的后端支持,使开发者能够快速构建和部署AI应用。

Dify 聊天应用机制以“会话-消息”为核心,结合异步队列、流式返回、丰富的扩展表设计,实现了高效、可扩展的智能对话服务。其与认证、插件、知识库等模块协作,形成了强大的智能应用生态。

参考资料

https://github.com/langgenius/dify (v1.4.1)

本篇文章来源于微信公众号: HelloTech技术派
标签: Agent Dify RAG 工作流
最后更新:2025-07-20

苏森

这个人很懒,什么都没留下

点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复
最新 热点 随机
最新 热点 随机
AI智能体扣子(Coze)工作流实战,1分钟生成100篇阅读量10万+AI漫画公众号文章,保姆级教程 17个作品涨粉超110万!用AI制作人生哲理视频,百万点赞火的一塌糊涂(保姆级实战教程) 独家首发!挑战用Coze做治愈系动态视频(风景/动漫/乡村),核心工作流搭建思路拆解(附体验链接) Coze+剪映视频工作流,1分钟制作治愈老爷爷/老奶奶视频,每条都是10w+(附提示词和体验链接) 打造10万+爆文的新法宝:Coze+DeepSeek工作流全攻略,手把手教你搭建 Coze+剪映视频工作流,一分钟打造爆款养生视频,干货分享,价值4位数 Coze+DeepSeek+剪映打造爆款国学文化视频工作流,进阶版保姆级教程,助力自媒体运营一路开挂 重磅发布 | 挑战用Coze做“如果书籍会说话”读书视频,20天涨粉10万,书单赛道新型玩法,附核心工作流搭建思路拆解
基于扣子 (Coze):AI 智能体搭建【作业批改复习助手】工作流企业新基建:MCP + LLM + Agent 8大架构,将打通AI Agent的“神经中枢”落地实操!2025年,建议你一定要用AI搭建自媒体全自动工作流值得收藏!3个黄金提示词公式,让 AI 从 “答非所问” 变 “超级助理”AI智能体扣子(Coze)工作流搭建,3分钟自动生成100篇知识图文,保姆级教程基于Dify动态解析异构银行流水:架构拆解→风控报告生成别卷了!用 Dify 搭建你的专属 AI 数据分析报告助手AI智能体:一键生成爆款历史人物短视频,66万粉丝
Prompt老跑偏?教你写出模型真正听得懂的提示词 手把手教你从0搭建一个智能体,全部跟下来你就Agent入门了!(超详细的讲解) Coze+剪映视频工作流,一分钟打造爆款养生视频,干货分享,价值4位数 23招教你掌握大模型提示词技巧 7 天爆涨10 万粉!治愈系中年大叔这样做(附制作思路) Dify工作流实践:从文档生成思维导图 AI智能体:一键生成爆款历史人物短视频,66万粉丝 【AI应用教学】如何使用AI生成解说漫画视频的全流程分享
标签聚合
小红书 DeepSeek Agent ChatGPT n8n Prompt 智能体 FastGPT Dify 提示词 coze 工作流 飞书 mcp 扣子 豆包

COPYRIGHT © 2025 苏森AI SOOSON.COM. ALL RIGHTS RESERVED.

站点地图