苏森AI

  • 首页
  • AI资讯
  • AI应用
  • AI工作流
  • AI智能体
  • AI提示词
苏森AI
从这里开启你的AI学习旅程!
  1. 首页
  2. AI工作流
  3. 正文

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

2025-06-25 131点热度 0人点赞 0条评论

Dify 是一个开源的 LLM 应用开发平台,提供了直观的界面,结合了 AI 工作流、RAG、Agent、模型管理、可观测性功能等,让用户可以快速从原型到生产。

  • 从零开始学 Dify
  • 从零开始学 Dify-系统架构
  • 从零开始学 Dify- 帐户与租户管理系统设计揭秘
  • 从零开始学 Dify- 模型提供者系统设计实现模型统一调用接口
  • 从零开始学 Dify- RAG 知识库系统设计详解
  • 从零开始学 Dify- 对话系统的关键功能
  • 从零开始学 Dify- 工作流(Workflow)系统架构
  • 从零开始学 Dify-扫描、加载和管理模型提供者的详细过程
  • 从零开始学 Dify-详细介绍 Dify 模型运行时的核心架构
  • 从零开始学 Dify - Dify 的 RAG 系统如何有效地处理和检索大量文档?
  • 从零开始学 Dify - 详细介绍 Dify 工具(Tool)系统核心架构设计
  • 从零开始学 Dify - 万字详解RAG父子分段模式的原理与实现
  • 从零开始学 Dify - Dify Token 计算机制深度解析:如何避免 LLM 上下文长度超限错误

本文将深入分析 Dify 聊天助手应用的实现机制。

👆👆👆欢迎关注,一起进步👆👆👆

一、聊天助手应用创建流程图

以下流程图展示了在 Dify 中创建聊天助手应用的过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

二、应用创建代码分析

2.1 控制器层

应用创建流程从控制器层开始,主要在api/controllers/console/app/app.py文件中的AppListApi.post方法:

  1. 接收前端请求,解析参数:name、description、mode、icon_type、icon和icon_background
  2. 验证用户权限(要求用户为editor角色)
  3. 验证必要参数,特别是mode参数(可选值:chat、agent-chat、advanced-chat、workflow和completion)
  4. 调用AppService.create_app方法创建应用
  5. 返回应用信息和201状态码

2.2 服务层

服务层的实现在api/services/app_service.py文件中的create_app方法:

  1. 根据应用模式(AppMode)从default_app_templates获取默认应用模板
  2. 处理默认模型实例的获取和设置
  3. 创建App实例并设置其属性(名称、描述、模式、图标、租户ID等)
  4. 创建AppModelConfig实例并设置其属性
  5. 将应用和关联的模型配置保存到数据库
  6. 触发app_was_created事件
  7. 返回创建的应用实例

2.3 模型定义

应用创建涉及的主要数据模型定义在api/models/model.py文件中:

  1. App模型:应用的核心数据结构,包含应用的基本信息和配置
  2. AppModelConfig模型:应用的模型配置,包含模型提供商、模型ID、参数等
  3. InstalledApp模型:记录应用的安装信息,包含租户ID、应用ID、位置等
  4. Site模型:记录应用的站点信息,包含标题、图标、默认语言等

2.4 默认应用模板

默认应用模板定义在api/constants/model_template.py文件中的default_app_templates字典:

  1. 为不同的应用模式(WORKFLOW、COMPLETION、CHAT、ADVANCED_CHAT、AGENT_CHAT)定义默认配置
  2. 包含enable_site和enable_api等默认设置

2.5 事件处理

应用创建后触发的事件处理在以下文件中实现:

  1. api/events/event_handlers/create_installed_app_when_app_created.py:创建InstalledApp记录
  2. api/events/event_handlers/create_site_record_when_app_created.py:创建Site记录

这些事件处理器通过app_was_created信号连接到应用创建过程,确保在应用创建后自动创建相关记录。

三、系统数据流图

接下来将分析 Dify 项目中“聊天应用”的实现机制,涵盖消息的收发、会话管理、消息流式/阻塞返回、上下文与多轮对话、文件与反馈等核心能力。该机制是 Dify 支持 AI 聊天、客服、知识问答等场景的基础,直接影响用户体验、系统扩展性与业务创新能力。

3.1 以"会话-消息"为核心的流程图

以下是 Dify 系统中主要数据流的示意图:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

3.2 相关源码定位

角色/功能
主要模块/文件
主要职责说明
API 入口
api/controllers/service_api/app/conversation.py
聊天消息的 API 接口,参数解析、鉴权、调用生成逻辑
消息生成主流程
api/core/app/apps/chat/app_generator.py
聊天消息生成主流程,队列与 worker 管理
消息队列管理
api/core/app/apps/message_based_app_queue_manager.py
消息事件推送、流式/阻塞返回
会话与消息服务
services/conversation_service.py
会话、消息的增删查改,历史消息查询
数据模型
models/model.py

、models/account.py
Conversation、Message、Account 等 ORM 定义
数据库扩展
extensions/ext_database.py
数据库会话管理
前端接口文档
web/app/components/develop/template/template_chat.zh.mdx
API 说明、参数与返回格式

3.3 代码示例

3.3.1 API 入口

文件:api/controllers/service_api/app/conversation.py

class ChatMessageApi(Resource):
    def post(self):
        args = request.get_json()
        user = get_user_from_token(request)
        result = ChatAppGenerator().generate(app_model, user, args, invoke_from=InvokeFrom.SERVICE_API)
        return result
  • 解析请求参数,鉴权,调用核心生成逻辑。

3.3.2 消息生成主流程

文件:api/core/app/apps/chat/app_generator.py

class ChatAppGenerator(MessageBasedAppGenerator):
    def generate(self, app_model, user, args, invoke_from, streaming=True):
        # 参数校验
        query = args["query"]
        # 获取/新建会话
        conversation = ConversationService.get_conversation(app_model, args.get("conversation_id"), user)
        # 创建消息
        message = Message(...)
        db.session.add(message)
        db.session.commit()
        # 初始化队列与worker
        queue_manager = MessageBasedAppQueueManager(...)
        worker_thread = threading.Thread(target=worker_with_context)
        worker_thread.start()
        # 返回流式/阻塞响应
        return self._handle_response(...)
  • 负责会话与消息的创建、队列与 worker 的初始化、响应的返回。

3.3.3 Worker 线程与消息推送

def _generate_worker(self, flask_app, application_generate_entity, queue_manager, conversation_id, message_id):
    with flask_app.app_context():
        conversation = self._get_conversation(conversation_id)
        message = self._get_message(message_id)
        runner = ChatAppRunner()
        runner.run(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message,
        )
  • Worker 线程负责实际的 LLM 调用与消息推送,支持异步处理。

3.3.4 消息队列与流式返回

文件:api/core/app/apps/message_based_app_queue_manager.py

  • 负责消息事件的推送,支持 SSE(Server-Sent Events)流式返回,提升用户体验。

四、数据模型设计 (ER图)

以下是Dify聊天助手应用的核心数据模型关系图:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

4.1 数据模型详细解释

4.1.1 核心实体说明

  1. Tenant(租户)

    • id:租户唯一标识符
    • name:租户名称
    • plan:订阅计划类型
    • created_at:创建时间
    • 代表使用 Dify 平台的组织或团队
    • 每个租户可以拥有多个账户(Account)
    • 关键字段:
  2. Account(账户)

    • id:账户唯一标识符
    • name:用户名
    • email:电子邮箱,用于登录
    • password:加密存储的密码
    • status:账户状态(如活跃、禁用等)
    • last_login_at:最后登录时间
    • 代表可以登录和使用 Dify 平台的用户
    • 每个账户属于一个租户,可以创建和管理多个应用(App)
    • 关键字段:
  3. App(应用)

    • id:应用唯一标识符
    • tenant_id:所属租户ID
    • name:应用名称
    • description:应用描述
    • mode:应用模式(如聊天模式、完成模式等)
    • app_model_config_id:关联的模型配置ID
    • workflow_id:关联的工作流ID
    • enable_site:是否启用网站访问
    • enable_api:是否启用API访问
    • created_at:创建时间
    • 代表在 Dify 平台上创建的 AI 应用,如聊天助手
    • 每个应用属于一个租户,由账户创建和管理
    • 关键字段:
  4. AppModelConfig(应用模型配置)

    • id:配置唯一标识符
    • app_id:关联的应用ID
    • provider:模型提供商(如OpenAI、Anthropic等)
    • model_id:使用的模型ID
    • configs:模型参数配置(JSON格式)
    • pre_prompt:系统提示词
    • opening_statement:对话开场白
    • agent_mode:Agent模式设置
    • 定义应用使用的 AI 模型和相关配置
    • 每个应用有一个对应的模型配置
    • 关键字段:
  5. Conversation(对话)

    • id:对话唯一标识符
    • app_id:关联的应用ID
    • name:对话名称
    • inputs:对话初始输入
    • from_end_user_id:发起对话的终端用户ID
    • from_account_id:发起对话的账户ID
    • status:对话状态
    • created_at:创建时间
    • updated_at:更新时间
    • 代表用户与应用之间的一次完整对话
    • 每个对话属于一个应用,包含多条消息
    • 关键字段:
  6. Message(消息)

    • id:消息唯一标识符
    • app_id:关联的应用ID
    • conversation_id:关联的对话ID
    • query:用户查询内容
    • message:完整消息内容(JSON格式)
    • answer:系统回复内容
    • status:消息处理状态
    • from_source:消息来源
    • created_at:创建时间
    • 代表对话中的一条消息,包括用户输入和系统回复
    • 每条消息属于一个对话
    • 关键字段:
  7. EndUser(终端用户)

    • id:用户唯一标识符
    • session_id:会话ID
    • type:用户类型
    • created_at:创建时间
    • 代表使用应用的最终用户
    • 一个终端用户可以发起多个对话
    • 关键字段:
  8. Workflow(工作流)

    • id:工作流唯一标识符
    • tenant_id:所属租户ID
    • app_id:关联的应用ID
    • type:工作流类型
    • version:工作流版本
    • graph:工作流图定义(文本格式)
    • 定义应用的处理流程
    • 每个工作流关联到一个应用
    • 关键字段:
  9. ConversationVariable(对话变量)

    • id:变量唯一标识符
    • conversation_id:关联的对话ID
    • key:变量名
    • value:变量值
    • 存储对话过程中产生的变量
    • 每个变量属于一个对话
    • 关键字段:

4.1.2 实体关系说明

  1. Tenant 与 Account 关系

    • 一对多关系:一个租户可以包含多个账户
    • 关系描述:"包含"
    • 实现方式:Account 表中的 tenant_id 外键关联到 Tenant 表的 id
  2. Account 与 App 关系

    • 一对多关系:一个账户可以创建和管理多个应用
    • 关系描述:"创建/管理"
    • 实现方式:通过中间表或权限表实现
  3. App 与 AppModelConfig 关系

    • 一对一关系:一个应用有一个对应的模型配置
    • 关系描述:"配置"
    • 实现方式:App 表中的 app_model_config_id 外键关联到 AppModelConfig 表的 id,同时 AppModelConfig 表中的 app_id 外键关联到 App 表的 id
  4. App 与 Conversation 关系

    • 一对多关系:一个应用可以包含多个对话
    • 关系描述:"包含"
    • 实现方式:Conversation 表中的 app_id 外键关联到 App 表的 id
  5. Conversation 与 Message 关系

    • 一对多关系:一个对话包含多条消息
    • 关系描述:"包含"
    • 实现方式:Message 表中的 conversation_id 外键关联到 Conversation 表的 id
  6. Conversation 与 ConversationVariable 关系

    • 一对多关系:一个对话可以包含多个变量
    • 关系描述:"包含"
    • 实现方式:ConversationVariable 表中的 conversation_id 外键关联到 Conversation 表的 id
  7. App 与 Workflow 关系

    • 一对多关系:一个应用可以关联多个工作流(不同版本)
    • 关系描述:"关联"
    • 实现方式:Workflow 表中的 app_id 外键关联到 App 表的 id,同时 App 表中的 workflow_id 外键关联到当前使用的 Workflow 表的 id
  8. EndUser 与 Conversation 关系

    • 一对多关系:一个终端用户可以发起多个对话
    • 关系描述:"发起"
    • 实现方式:Conversation 表中的 from_end_user_id 外键关联到 EndUser 表的 id

4.1.3 数据模型设计特点

  1. 多租户架构:通过 Tenant 实体实现多租户隔离,每个租户拥有自己的账户、应用和数据

  2. 灵活的应用配置:通过 AppModelConfig 实现对不同模型和参数的灵活配置

  3. 完整的对话历史:通过 Conversation 和 Message 实体存储完整的对话历史和上下文

  4. 工作流支持:通过 Workflow 实体支持复杂的 AI 处理流程定义

  5. 变量存储:通过 ConversationVariable 支持对话过程中的变量存储和使用

  6. 用户区分:区分平台用户(Account)和应用终端用户(EndUser)

五、关键流程

5.1 消息处理流程图

以下流程图展示了 Dify 聊天助手应用处理用户消息的过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

5.2 消息处理源代码实现

消息处理的入口点在控制器层,主要由ChatApi类处理:

# api/controllers/web/completion.py
class ChatApi(WebApiResource):
    def post(self, app_model, end_user):
        app_mode = AppMode.value_of(app_model.mode)
        if app_mode notin {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
            raise NotChatAppError()

        parser = reqparse.RequestParser()
        parser.add_argument("inputs", type=dict, required=True, location="json")
        parser.add_argument("query", type=str, required=True, location="json")
        parser.add_argument("files", type=list, required=False, location="json")
        parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
        parser.add_argument("conversation_id", type=uuid_value, location="json")
        parser.add_argument("parent_message_id", type=uuid_value, required=False, location="json")
        parser.add_argument("retriever_from", type=str, required=False, default="web_app", location="json")

        args = parser.parse_args()

        streaming = args["response_mode"] == "streaming"
        args["auto_generate_name"] = False

        try:
            response = AppGenerateService.generate(
                app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.WEB_APP, streaming=streaming
            )

            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        # 异常处理...

消息生成的核心服务在AppGenerateService类中:

# api/services/app_generate_service.py
class AppGenerateService:
    @classmethod
    def generate(
        cls,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ):
        # 系统级别限流检查
        if dify_config.BILLING_ENABLED:
            # 检查是否是免费计划
            # ...

        # 应用级别限流
        max_active_request = AppGenerateService._get_max_active_requests(app_model)
        rate_limit = RateLimit(app_model.id, max_active_request)
        request_id = RateLimit.gen_request_key()
        try:
            request_id = rate_limit.enter(request_id)
            # 根据应用模式选择不同的生成器
            if app_model.mode == AppMode.COMPLETION.value:
                return rate_limit.generate(
                    CompletionAppGenerator.convert_to_event_stream(
                        CompletionAppGenerator().generate(
                            app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                        ),
                    ),
                    request_id=request_id,
                )
            elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
                # Agent聊天模式
                # ...
            elif app_model.mode == AppMode.CHAT.value:
                # 普通聊天模式
                # ...
            elif app_model.mode == AppMode.ADVANCED_CHAT.value:
                # 高级聊天模式
                # ...
            # ...
        except RateLimitError as e:
            raise InvokeRateLimitError(str(e))
        # ...

具体的消息生成逻辑在各个 AppGenerator 类中实现,以 ChatAppGenerator 为例:

# api/core/app/apps/chat/app_generator.py
class ChatAppGenerator(MessageBasedAppGenerator):
    def generate(
        self,
        app_model: App,
        user: Union[Account, EndUser],
        args: Mapping[str, Any],
        invoke_from: InvokeFrom,
        streaming: bool = True,
    ) -> Union[Mapping[str, Any], Generator[Mapping[str, Any] | str, None, None]]:
        # 验证查询参数
        ifnot args.get("query"):
            raise ValueError("query is required")

        query = args["query"]
        inputs = args["inputs"]
        extras = {"auto_generate_conversation_name": args.get("auto_generate_name", True)}

        # 获取或创建对话
        conversation = None
        conversation_id = args.get("conversation_id")
        if conversation_id:
            conversation = ConversationService.get_conversation(
                app_model=app_model, conversation_id=conversation_id, user=user
            )
        # 获取应用模型配置
        app_model_config = self._get_app_model_config(app_model=app_model, conversation=conversation)

        # 验证覆盖模型配置
        # ...

        # 解析文件
        # ...

        # 转换为应用配置
        # ...

        # 初始化应用生成实体
        application_generate_entity = ChatAppGenerateEntity(
            task_id=str(uuid.uuid4()),
            app_config=app_config,
            model_conf=ModelConfigConverter.convert(app_config),
            file_upload_config=file_extra_config,
            conversation_id=conversation.id if conversation elseNone,
            inputs=self._prepare_user_inputs(
                user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
            ),
            query=query,
            files=list(file_objs),
            parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
            user_id=user.id,
            invoke_from=invoke_from,
            extras=extras,
            trace_manager=trace_manager,
            stream=streaming,
        )

        # 初始化生成记录
        (conversation, message) = self._init_generate_records(application_generate_entity, conversation)

        # 初始化队列管理器
        queue_manager = MessageBasedAppQueueManager(
            task_id=application_generate_entity.task_id,
            user_id=application_generate_entity.user_id,
            invoke_from=application_generate_entity.invoke_from,
            conversation_id=conversation.id,
            app_mode=conversation.mode,
            message_id=message.id,
        )

        # 在新线程中执行生成任务
        worker_thread = threading.Thread(target=worker_with_context)
        worker_thread.start()

        # 返回响应或流生成器
        response = self._handle_response(
            application_generate_entity=application_generate_entity,
            queue_manager=queue_manager,
            conversation=conversation,
            message=message,
            user=user,
            stream=streaming,
        )

        return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

5.3 Conversation 管理流程

以下流程图详细展示了 Conversation 的管理过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

5.4 Conversation 管理源代码实现

Conversation 管理的核心实现在ConversationService类中:

# api/services/conversation_service.py
class ConversationService:
    @classmethod
    def get_conversation(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        conversation = (
            db.session.query(Conversation)
            .filter(
                Conversation.id == conversation_id,
                Conversation.app_id == app_model.id,
                Conversation.from_source == ("api"if isinstance(user, EndUser) else"console"),
                Conversation.from_end_user_id == (user.id if isinstance(user, EndUser) elseNone),
                Conversation.from_account_id == (user.id if isinstance(user, Account) elseNone),
                Conversation.is_deleted == False,
            )
            .first()
        )

        ifnot conversation:
            raise ConversationNotExistsError()

        return conversation
    
    @classmethod
    def rename(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]], name: str, auto_generate: bool):
        conversation = cls.get_conversation(app_model, conversation_id, user)

        if auto_generate:
            return cls.auto_generate_name(app_model, conversation)
        else:
            conversation.name = name
            conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
            db.session.commit()

        return conversation
    
    @classmethod
    def auto_generate_name(cls, app_model: App, conversation: Conversation):
        # 获取对话的第一条消息
        message = (
            db.session.query(Message)
            .filter(Message.app_id == app_model.id, Message.conversation_id == conversation.id)
            .order_by(Message.created_at.asc())
            .first()
        )

        ifnot message:
            raise MessageNotExistsError()

        # 生成对话名称
        try:
            name = LLMGenerator.generate_conversation_name(
                app_model.tenant_id, message.query, conversation.id, app_model.id
            )
            conversation.name = name
        except:
            pass

        db.session.commit()

        return conversation
    
    @classmethod
    def delete(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        conversation = cls.get_conversation(app_model, conversation_id, user)

        conversation.is_deleted = True
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
        db.session.commit()

在消息生成过程中,Conversation 的创建和更新主要在 ChatAppGenerator 类的 _init_generate_records 方法中实现:

# api/core/app/apps/chat/app_generator.py (简化示例)
def _init_generate_records(self, application_generate_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
    # 获取或创建对话
    ifnot conversation:
        # 创建新对话
        conversation = Conversation(
            id=str(uuid.uuid4()),
            app_id=application_generate_entity.app_config.app_id,
            app_model_config_id=application_generate_entity.app_config.app_model_config_id,
            model_provider=application_generate_entity.model_conf.model_provider,
            model_id=application_generate_entity.model_conf.model_id,
            mode=application_generate_entity.app_config.mode,
            name="",  # 初始名称为空
            inputs=application_generate_entity.inputs,
            status="created",
            from_source=("api"if application_generate_entity.invoke_from == InvokeFrom.API else"console"),
            from_end_user_id=(application_generate_entity.user_id if application_generate_entity.invoke_from == InvokeFrom.API elseNone),
            from_account_id=(application_generate_entity.user_id if application_generate_entity.invoke_from != InvokeFrom.API elseNone),
            invoke_from=application_generate_entity.invoke_from.value,
        )
        db.session.add(conversation)
    else:
        # 更新现有对话
        conversation.status = "processing"
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
    
    # 创建消息记录
    message = Message(
        id=str(uuid.uuid4()),
        conversation_id=conversation.id,
        app_id=application_generate_entity.app_config.app_id,
        model_provider=application_generate_entity.model_conf.model_provider,
        model_id=application_generate_entity.model_conf.model_id,
        query=application_generate_entity.query,
        inputs=application_generate_entity.inputs,
        message_files=[],
        answer="",
        parent_message_id=application_generate_entity.parent_message_id,
        user_id=application_generate_entity.user_id,
        from_source=("api"if application_generate_entity.invoke_from == InvokeFrom.API else"console"),
        from_end_user_id=(application_generate_entity.user_id if application_generate_entity.invoke_from == InvokeFrom.API elseNone),
        from_account_id=(application_generate_entity.user_id if application_generate_entity.invoke_from != InvokeFrom.API elseNone),
    )
    db.session.add(message)
    db.session.commit()
    
    return conversation, message

对话变量的管理在消息处理完成后进行:

# api/core/app/apps/chat/app_runner.py (简化示例)
def _handle_variables(self, conversation: Conversation, variables: dict):
    # 处理对话变量
    for var_name, var_value in variables.items():
        # 检查变量是否已存在
        variable = (
            db.session.query(ConversationVariable)
            .filter(
                ConversationVariable.conversation_id == conversation.id,
                ConversationVariable.name == var_name,
            )
            .first()
        )
        
        if variable:
            # 更新现有变量
            variable.value = var_value
        else:
            # 创建新变量
            variable = ConversationVariable(
                id=str(uuid.uuid4()),
                conversation_id=conversation.id,
                app_id=conversation.app_id,
                name=var_name,
                value=var_value,
            )
            db.session.add(variable)
    
    db.session.commit()

六、对话状态图

以下状态图展示了 Dify 聊天助手应用中对话的状态变化:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

6.1 对话状态源代码实现

对话状态在数据库模型中的定义:

# api/models/model.py
class Conversation(Base):
    __tablename__ = "conversations"
    # ...
    status = db.Column(db.String(255), nullable=False)
    # ...

对话状态的初始化在_init_generate_records方法中实现:

# api/core/app/apps/message_based_app_generator.py
def _init_generate_records(self, application_generate_entity: Union[ChatAppGenerateEntity, ...], conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
    # ...
    ifnot conversation:
        conversation = Conversation(
            # ...
            status="normal",  # 初始状态为normal
            # ...
        )
        db.session.add(conversation)
        db.session.commit()
        db.session.refresh(conversation)
    else:
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
        db.session.commit()
    # ...

在 ChatAppGenerator 中,对话状态在处理消息前更新为 "processing":

# api/core/app/apps/chat/app_generator.py
def _init_generate_records(self, application_generate_entity: ChatAppGenerateEntity, conversation: Optional[Conversation] = None) -> tuple[Conversation, Message]:
    # 获取或创建对话
    ifnot conversation:
        # 创建新对话
        conversation = Conversation(
            # ...
            status="created",
            # ...
        )
        db.session.add(conversation)
    else:
        # 更新现有对话
        conversation.status = "processing"
        conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
    # ...

在消息处理完成后,对话状态更新为 "completed":

# api/core/app/apps/chat/app_runner.py
def run(self, application_generate_entity: ChatAppGenerateEntity, queue_manager: AppQueueManager, conversation: Conversation, message: Message) -> None:
    # ...
    # 处理完成后更新对话状态
    conversation.status = "completed"
    db.session.commit()

如果处理过程中出现错误,对话状态会更新为 "error":

# 错误处理
try:
    # 处理消息
    # ...
except Exception as e:
    # 更新对话状态为错误
    conversation.status = "error"
    db.session.commit()
    # 记录错误日志
    logger.exception("Error when processing message")

七、消息处理时序图

以下时序图展示了用户与 Dify 聊天助手应用交互的详细过程:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

7.1 消息处理时序源代码实现

消息处理的时序在代码中主要体现在以下几个关键部分:

  1. 控制器层接收请求并调用服务层:
# api/controllers/web/completion.py
class ChatApi(WebApiResource):
    def post(self, app_model, end_user):
        # 解析请求参数
        parser = reqparse.RequestParser()
        # ...
        args = parser.parse_args()
        
        try:
            # 调用AppGenerateService生成回复
            response = AppGenerateService.generate(
                app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.WEB_APP, streaming=streaming
            )
            return helper.compact_generate_response(response)
        except services.errors.conversation.ConversationNotExistsError:
            raise NotFound("Conversation Not Exists.")
        # 异常处理...
  1. 服务层根据应用模式选择不同的生成器:
# api/services/app_generate_service.py
class AppGenerateService:
    @classmethod
    def generate(cls, app_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke_from: InvokeFrom, streaming: bool = True):
        # 限流检查
        # ...
        
        # 根据应用模式选择不同的生成器
        if app_model.mode == AppMode.CHAT.value:
            return rate_limit.generate(
                ChatAppGenerator.convert_to_event_stream(
                    ChatAppGenerator().generate(
                        app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                    ),
                ),
                request_id=request_id,
            )
        # 其他模式...
  1. 生成器初始化对话和消息记录:
# api/core/app/apps/chat/app_generator.py
def generate(self, app_model: App, user: Union[Account, EndUser], args: Mapping[str, Any], invoke_from: InvokeFrom, streaming: bool = True):
    # 获取或创建对话
    conversation = None
    conversation_id = args.get("conversation_id")
    if conversation_id:
        conversation = ConversationService.get_conversation(
            app_model=app_model, conversation_id=conversation_id, user=user
        )
    
    # 初始化应用生成实体
    application_generate_entity = ChatAppGenerateEntity(
        # 参数设置
        # ...
    )
    
    # 初始化生成记录
    (conversation, message) = self._init_generate_records(application_generate_entity, conversation)
    
    # 在新线程中执行生成任务
    worker_thread = threading.Thread(target=worker_with_context)
    worker_thread.start()
    
    # 返回响应或流生成器
    response = self._handle_response(
        application_generate_entity=application_generate_entity,
        queue_manager=queue_manager,
        conversation=conversation,
        message=message,
        user=user,
        stream=streaming,
    )
    
    return ChatAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)
  1. 工作线程中执行实际的生成任务:
# api/core/app/apps/chat/app_generator.py
def _generate_worker(self, flask_app: Flask, application_generate_entity: ChatAppGenerateEntity, queue_manager: AppQueueManager, conversation_id: str, message_id: str) -> None:
    with flask_app.app_context():
        try:
            # 获取对话和消息
            conversation = self._get_conversation(conversation_id)
            message = self._get_message(message_id)
            if message isNone:
                raise MessageNotExistsError("Message not exists")

            # 运行聊天应用
            runner = ChatAppRunner()
            runner.run(
                application_generate_entity=application_generate_entity,
                queue_manager=queue_manager,
                conversation=conversation,
                message=message,
            )
        except GenerateTaskStoppedError:
            pass
        # 异常处理...
        finally:
            db.session.close()
  1. 消息处理完成后更新状态:
# api/core/app/apps/chat/app_runner.py (简化示例)
def run(self, application_generate_entity, queue_manager, conversation, message):
    # 执行RAG检索(如果启用)
    # 准备模型输入
    # 调用LLM生成回复
    # 保存回复到消息
    
    # 更新对话状态
    conversation.status = "completed"
    conversation.updated_at = datetime.now(UTC).replace(tzinfo=None)
    db.session.commit()
    
    # 处理对话变量(如果有)
    if new_variables:
        for var_name, var_value in new_variables.items():
            # 创建或更新对话变量
            # ...

八、数据生命周期

为了更好地理解 Dify 系统中数据的完整生命周期,展示主要数据实体在系统中的创建、使用、更新和删除过程。

8.1 App 数据生命周期

以下图表展示了 App 实体的完整生命周期,以及与其他数据模型的关联:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

8.2 Conversation 数据生命周期

以下图表展示了 Conversation 实体的完整生命周期,以及与 Message 和 ConversationVariable 的关联:

从零开始学 Dify - 万字详解 Dify 聊天助手应用的实现机制

九、总结

Dify 提供了一个全面的 LLM 应用开发平台,通过直观的界面和强大的后端支持,使开发者能够快速构建和部署AI应用。

Dify 聊天应用机制以“会话-消息”为核心,结合异步队列、流式返回、丰富的扩展表设计,实现了高效、可扩展的智能对话服务。其与认证、插件、知识库等模块协作,形成了强大的智能应用生态。

参考资料

https://github.com/langgenius/dify (v1.4.1)

本篇文章来源于微信公众号: HelloTech技术派
标签: Agent Dify RAG 工作流
最后更新:2025-07-20

苏森

这个人很懒,什么都没留下

点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复
最新 热点 随机
最新 热点 随机
我替你们试过了,这才是Nano Banana在国内最爽的玩法 即梦AI图片、视频无水印保存教程:跟即梦水印说再见吧!亲测有效,上手超简单。 生成图片有水印怎么办?豆包最新无水印图片保存技巧(手机/网页端) 90%的人都不知道,这套提示词公式,让我AI生成的画面准确率提升5倍 1小时用AI工具搞定一支《浪浪山》风格茶饮广告片 别人花一周爬数据,我用Crawlee只花了十分钟! 保姆级n8n教程来了:手把手教你打造一个AI生成内容并自动发布公众号的工作流 一线中小学教师的10个豆包AI教学指令公式+实操示例
基于Dify的RAG知识库搭建 治愈性vlog又火了,3招教你搞定人物一致性 100%免费无限次!创作还能领现金,这款国产AI工具正在闷声搞事情 音视频秒变爆款图文,支持自动配图,金句标注,让你的创作效率疯狂加倍 如何用AI制作全息城市投影,手把手教会你(附提示词) 用 AI 三步搞定专业排版:从Prompt到代码,一键生成惊艳杂志风 23招教你掌握大模型提示词技巧 Coze+剪映视频工作流,一分钟打造爆款养生视频,干货分享,价值4位数
标签聚合
coze Gemini DeepSeek 工作流 Dify 豆包 nano-banana Agent Prompt 扣子 飞书 n8n 提示词 小红书 ChatGPT 智能体

COPYRIGHT © 2025 苏森AI SOOSON.COM. ALL RIGHTS RESERVED.

站点地图