File size: 4,422 Bytes
			
			| a8b3f00 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | import logging
import threading
import time
from typing import Any, Optional
from flask import Flask, current_app
from pydantic import BaseModel, ConfigDict
from configs import dify_config
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.queue_entities import QueueMessageReplaceEvent
from core.moderation.base import ModerationAction, ModerationOutputsResult
from core.moderation.factory import ModerationFactory
logger = logging.getLogger(__name__)
class ModerationRule(BaseModel):
    """Configuration of one output-moderation rule.

    ``OutputModeration.moderation`` passes ``type`` to ``ModerationFactory`` as
    its ``name`` argument and ``config`` as its ``config`` argument.
    """

    # Name of the moderation implementation the factory should build.
    type: str
    # Settings for that implementation, forwarded to the factory as-is.
    config: dict[str, Any]
class OutputModeration(BaseModel):
    """Moderate LLM output, either incrementally while it streams or all at once.

    Streaming use:
        Each token passed to ``append_new_token`` is appended to ``buffer``.
        The first call starts a background worker thread. The worker moderates
        the whole buffer whenever enough new text has accumulated. When the
        text is flagged, it publishes a ``QueueMessageReplaceEvent`` on
        ``queue_manager``.

    One-shot use:
        ``moderation_completion`` moderates a complete text synchronously and
        returns the text that should be shown.
    """

    tenant_id: str
    app_id: str
    rule: ModerationRule
    queue_manager: AppQueueManager

    # Background worker, started lazily by append_new_token().
    thread: Optional[threading.Thread] = None
    # Cleared by stop_thread(); the worker loop exits once it observes False.
    thread_running: bool = True
    # Text accumulated so far (streamed tokens, or the full completion).
    buffer: str = ""
    # Set by moderation_completion() once `buffer` holds the complete text.
    is_final_chunk: bool = False
    # Preset response recorded by the worker when moderation chose DIRECT_OUTPUT.
    final_output: Optional[str] = None

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def should_direct_output(self) -> bool:
        """Return True once the worker has recorded a DIRECT_OUTPUT preset response."""
        return self.final_output is not None

    def get_final_output(self) -> str:
        """Return the recorded preset response, or "" if none was recorded."""
        return self.final_output or ""

    def append_new_token(self, token: str) -> None:
        """Append a streamed token to the buffer, starting the worker on first use."""
        self.buffer += token
        if self.thread is None:
            self.thread = self.start_thread()

    def moderation_completion(self, completion: str, public_event: bool = False) -> str:
        """Moderate a complete text synchronously.

        This also stores ``completion`` as the buffer and marks it as the final
        chunk. A worker that is still running will then moderate this text at
        most once more and exit.

        Args:
            completion: The full text to moderate.
            public_event: When True and the text is flagged, also publish a
                ``QueueMessageReplaceEvent`` carrying the replacement text.

        Returns:
            ``completion`` unchanged if moderation failed or did not flag it.
            Otherwise the preset response (DIRECT_OUTPUT) or the moderated
            text (any other action).
        """
        # Buffer first, flag second: the worker relies on this order.
        self.buffer = completion
        self.is_final_chunk = True

        result = self.moderation(tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=completion)
        if not result or not result.flagged:
            return completion

        if result.action == ModerationAction.DIRECT_OUTPUT:
            final_output = result.preset_response
        else:
            final_output = result.text

        if public_event:
            self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE)
        return final_output

    def start_thread(self) -> threading.Thread:
        """Start and return the worker thread, bound to the current Flask app.

        Must be called inside a Flask application context. The worker receives
        the real app object (not the ``current_app`` proxy) so it can push its
        own app context.
        """
        buffer_size = dify_config.MODERATION_BUFFER_SIZE
        thread = threading.Thread(
            target=self.worker,
            kwargs={
                "flask_app": current_app._get_current_object(),
                # With a size <= 0 the worker's "not enough new text yet" check
                # would never hold, so it would re-moderate without sleeping.
                # Clamping to 1 makes it moderate only when new text has arrived.
                "buffer_size": max(buffer_size, 1),
            },
        )
        thread.start()
        return thread

    def stop_thread(self) -> None:
        """Ask a running worker to exit.

        The worker notices on its next loop check. The thread is not joined.
        """
        if self.thread is not None and self.thread.is_alive():
            self.thread_running = False

    def worker(self, flask_app: Flask, buffer_size: int) -> None:
        """Background loop that moderates the streaming buffer.

        The loop polls once per second. It moderates the whole buffer when
        either of these holds:

        - at least ``buffer_size`` new characters have arrived since the
          previous check (or the buffer got shorter);
        - ``is_final_chunk`` is set.

        When the text is flagged, it publishes a ``QueueMessageReplaceEvent``
        with the replacement text. It exits when any of these happens:

        - ``thread_running`` is cleared;
        - a DIRECT_OUTPUT result arrives;
        - the final buffer has been moderated.
        """
        with flask_app.app_context():
            current_length = 0
            while self.thread_running:
                # Read the flag before the buffer. moderation_completion() stores
                # the buffer before setting the flag, so a set flag means the
                # buffer read below is the complete text.
                is_final = self.is_final_chunk
                moderation_buffer = self.buffer
                buffer_length = len(moderation_buffer)
                if not is_final:
                    chunk_length = buffer_length - current_length
                    if 0 <= chunk_length < buffer_size:
                        time.sleep(1)
                        continue

                current_length = buffer_length
                result = self.moderation(
                    tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=moderation_buffer
                )

                if result and result.flagged:
                    if result.action == ModerationAction.DIRECT_OUTPUT:
                        final_output = result.preset_response
                        self.final_output = final_output
                    else:
                        # Keep any text that streamed in while moderation was running.
                        final_output = result.text + self.buffer[buffer_length:]

                    # Trigger the replace event unless we were stopped meanwhile.
                    if self.thread_running:
                        self.queue_manager.publish(
                            QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE
                        )

                    if result.action == ModerationAction.DIRECT_OUTPUT:
                        break

                if is_final:
                    # The complete text has now been moderated. Looping again would
                    # re-moderate (and possibly re-publish) the same buffer with no
                    # pause until stop_thread() is called.
                    break

    def moderation(self, tenant_id: str, app_id: str, moderation_buffer: str) -> Optional[ModerationOutputsResult]:
        """Run the configured moderation rule on ``moderation_buffer``.

        Returns:
            The moderation result, or None if building the moderator or running
            it raised. In that case the error and its traceback are logged.
            Callers treat None the same as "not flagged".
        """
        try:
            moderation_factory = ModerationFactory(
                name=self.rule.type, app_id=app_id, tenant_id=tenant_id, config=self.rule.config
            )
            result: ModerationOutputsResult = moderation_factory.moderation_for_outputs(moderation_buffer)
        except Exception as e:
            logger.exception("Moderation Output error: %s", e)
            return None
        return result
 | 
