diff --git a/src/memos/api/handlers/chat_handler.py b/src/memos/api/handlers/chat_handler.py index 2e9032f11..68dcfe6fb 100644 --- a/src/memos/api/handlers/chat_handler.py +++ b/src/memos/api/handlers/chat_handler.py @@ -60,6 +60,7 @@ def __init__( self, dependencies: HandlerDependencies, chat_llms: dict[str, Any], + playground_chat_llms: dict[str, Any] | None = None, search_handler=None, add_handler=None, online_bot=None, @@ -70,6 +71,7 @@ def __init__( Args: dependencies: HandlerDependencies instance chat_llms: Dictionary mapping model names to LLM instances + playground_chat_llms: Optional model map for /chat/stream/playground search_handler: Optional SearchHandler instance (created if not provided) add_handler: Optional AddHandler instance (created if not provided) online_bot: Optional DingDing bot function for notifications @@ -89,6 +91,7 @@ def __init__( add_handler = AddHandler(dependencies) self.chat_llms = chat_llms + self.playground_chat_llms = playground_chat_llms or chat_llms self.search_handler = search_handler self.add_handler = add_handler self.online_bot = online_bot @@ -630,10 +633,11 @@ def generate_chat_response() -> Generator[str, None, None]: # Step 3: Generate streaming response from LLM try: - model = next(iter(self.chat_llms.keys())) + chat_llms = self.playground_chat_llms + model = next(iter(chat_llms.keys())) self.logger.info(f"[PLAYGROUND CHAT] Chat Playground Stream Model: {model}") start = time.time() - response_stream = self.chat_llms[model].generate_stream( + response_stream = chat_llms[model].generate_stream( current_messages, model_name_or_path=model ) diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index 03dcc8412..b9c209e61 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -157,6 +157,7 @@ def init_server() -> dict[str, Any]: graph_db_config = build_graph_db_config() llm_config = build_llm_config() chat_llm_config = build_chat_llm_config() + playground_chat_llm_config = build_chat_llm_config("PLAYGROUND_CHAT_MODEL_LIST") embedder_config = build_embedder_config() nli_client_config = build_nli_client_config() mem_reader_config = build_mem_reader_config() @@ -174,6 +175,11 @@ def init_server() -> dict[str, Any]: if os.getenv("ENABLE_CHAT_API", "false") == "true" else None ) + playground_chat_llms = ( + _init_chat_llms(playground_chat_llm_config) + if os.getenv("ENABLE_CHAT_API", "false") == "true" and playground_chat_llm_config + else chat_llms + ) embedder = EmbedderFactory.from_config(embedder_config) plugin_context = build_plugin_context( @@ -317,6 +323,7 @@ def init_server() -> dict[str, Any]: "mem_reader": mem_reader, "llm": llm, "chat_llms": chat_llms, + "playground_chat_llms": playground_chat_llms, "embedder": embedder, "reranker": reranker, "internet_retriever": internet_retriever, diff --git a/src/memos/api/handlers/config_builders.py b/src/memos/api/handlers/config_builders.py index d29429fc9..0a083e284 100644 --- a/src/memos/api/handlers/config_builders.py +++ b/src/memos/api/handlers/config_builders.py @@ -85,14 +85,17 @@ def build_llm_config() -> dict[str, Any]: ) -def build_chat_llm_config() -> list[dict[str, Any]]: +def build_chat_llm_config(env_name: str = "CHAT_MODEL_LIST") -> list[dict[str, Any]]: """ Build chat LLM configuration. Returns: Validated chat LLM configuration dictionary + Args: + env_name: Environment variable that contains the JSON chat model list. + """ - configs = json.loads(os.getenv("CHAT_MODEL_LIST", "[]")) + configs = json.loads(os.getenv(env_name, "[]")) return [ { "config_class": LLMConfigFactory.model_validate( diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index fa8a0b396..351d3a54e 100644 --- a/src/memos/api/routers/server_router.py +++ b/src/memos/api/routers/server_router.py @@ -76,10 +76,11 @@ add_handler = AddHandler(dependencies) chat_handler = ( ChatHandler( - dependencies, - components["chat_llms"], - search_handler, - add_handler, + dependencies=dependencies, + chat_llms=components["chat_llms"], + playground_chat_llms=components.get("playground_chat_llms"), + search_handler=search_handler, + add_handler=add_handler, online_bot=components.get("online_bot"), ) if os.getenv("ENABLE_CHAT_API", "false") == "true" diff --git a/src/memos/mem_scheduler/base_mixins/queue_ops.py b/src/memos/mem_scheduler/base_mixins/queue_ops.py index 13de79b3d..e8d215dc6 100644 --- a/src/memos/mem_scheduler/base_mixins/queue_ops.py +++ b/src/memos/mem_scheduler/base_mixins/queue_ops.py @@ -10,6 +10,7 @@ from memos.context.context import ( ContextThread, RequestContext, + get_current_api_path, get_current_context, get_current_trace_id, set_request_context, @@ -38,6 +39,7 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt return current_trace_id = get_current_trace_id() + current_api_path = get_current_api_path() immediate_msgs: list[ScheduleMessageItem] = [] queued_msgs: list[ScheduleMessageItem] = [] @@ -45,6 +47,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt for msg in messages: if current_trace_id: msg.trace_id = current_trace_id + if current_api_path and not getattr(msg, "api_path", None): + msg.api_path = current_api_path with suppress(Exception): self.metrics.task_enqueued(user_id=msg.user_id, task_type=msg.label) @@ -173,6 +177,7 @@ def _message_consumer(self) -> None: try: msg_context = RequestContext( trace_id=msg.trace_id, + api_path=msg.api_path, user_name=msg.user_name, ) set_request_context(msg_context) diff --git a/src/memos/mem_scheduler/base_mixins/web_log_ops.py b/src/memos/mem_scheduler/base_mixins/web_log_ops.py index 64b5348d3..7fba9916b 100644 --- a/src/memos/mem_scheduler/base_mixins/web_log_ops.py +++ b/src/memos/mem_scheduler/base_mixins/web_log_ops.py @@ -1,6 +1,7 @@ from __future__ import annotations from memos.log import get_logger +from memos.context.context import get_current_api_path from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem from memos.mem_scheduler.schemas.task_schemas import ( ADD_TASK_LABEL, @@ -28,6 +29,9 @@ def _submit_web_logs( if self.rabbitmq_config is None: return try: + current_api_path = get_current_api_path() + if current_api_path and not getattr(message, "api_path", None): + message.api_path = current_api_path logger.info( "[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish %s", message.model_dump_json(indent=2), diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index fd83ec86f..495b7b9ea 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -3,6 +3,7 @@ from collections.abc import Callable from memos.log import get_logger +from memos.context.context import get_current_api_path from memos.mem_cube.general import GeneralMemCube from memos.mem_scheduler.general_modules.base import BaseSchedulerModule from memos.mem_scheduler.schemas.general_schemas import ( @@ -125,6 +126,7 @@ def create_autofilled_log_item( log_content=log_content, current_memory_sizes=current_memory_sizes, memory_capacities=memory_capacities, + api_path=get_current_api_path(), ) return log_message diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index c11d30470..ceb89e0de 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -53,6 +53,7 @@ class ScheduleMessageItem(BaseModel, DictConversionMixin): description="user name / display name (optional)", ) info: dict | None = Field(default=None, description="user custom info") + api_path: str | None = Field(default=None, description="source HTTP API path") task_id: str | None = Field( default=None, description="Optional business-level task ID. Multiple items can share the same task_id.", @@ -94,6 +95,7 @@ def to_dict(self) -> dict: "timestamp": self.timestamp.isoformat(), "user_name": self.user_name, "task_id": self.task_id if self.task_id is not None else "", + "api_path": self.api_path if self.api_path is not None else "", "chat_history": self.chat_history if self.chat_history is not None else [], "user_context": self.user_context.model_dump(exclude_none=True) if self.user_context @@ -152,6 +154,7 @@ def _decode(val: Any) -> Any: timestamp=timestamp, user_name=_decode(data.get("user_name")), task_id=_decode(data.get("task_id")), + api_path=_decode(data.get("api_path")), chat_history=chat_history, user_context=UserContext.model_validate(raw_user_context) if raw_user_context else None, ) @@ -209,6 +212,7 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin): ) source_doc_id: str | None = Field(default=None, description="Source document ID") chat_history: list | None = Field(default=None, description="user chat history") + api_path: str | None = Field(default=None, description="source HTTP API path") def debug_info(self) -> dict[str, Any]: """Return structured debug information for logging purposes.""" diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 02cd59e8c..60dc04fcd 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -24,7 +24,7 @@ from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue from memos.mem_scheduler.task_schedule_modules.task_queue import ScheduleTaskQueue -from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube, is_cloud_env +from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube, is_playground_api from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker @@ -140,6 +140,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): # Propagate trace_id and user info to logging context for this handler execution ctx = RequestContext( trace_id=trace_id, + api_path=getattr(first_msg, "api_path", None), user_name=getattr(first_msg, "user_name", None), user_type=None, ) @@ -317,8 +318,7 @@ def _maybe_emit_task_completion( mem_cube_id = first.mem_cube_id try: - cloud_env = is_cloud_env() - if not cloud_env: + if is_playground_api(): return for task_id in task_ids: @@ -345,6 +345,7 @@ def _maybe_emit_task_completion( log_content=f"Task {task_id} completed", status="completed", source_doc_id=source_doc_id, + api_path=getattr(messages[0], "api_path", None) if messages else None, ) self.submit_web_logs(event) @@ -369,6 +370,7 @@ def _maybe_emit_task_completion( log_content=f"Task {task_id} failed: {error_msg}", status="failed", source_doc_id=source_doc_id, + api_path=getattr(messages[0], "api_path", None) if messages else None, ) self.submit_web_logs(event) except Exception: diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py index e4a88a635..81e78d69f 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/add_handler.py @@ -12,7 +12,7 @@ ) from memos.mem_scheduler.task_schedule_modules.base_handler import BaseSchedulerHandler from memos.mem_scheduler.utils.filter_utils import transform_name_to_key -from memos.mem_scheduler.utils.misc_utils import is_cloud_env +from memos.mem_scheduler.utils.misc_utils import is_playground_api if TYPE_CHECKING: @@ -38,14 +38,14 @@ def batch_handler( prepared_add_items, prepared_update_items_with_original, ) - cloud_env = is_cloud_env() + playground_api = is_playground_api() - if cloud_env: - self.send_add_log_messages_to_cloud_env( + if playground_api: + self.send_add_log_messages_to_local_env( msg, prepared_add_items, prepared_update_items_with_original ) else: - self.send_add_log_messages_to_local_env( + self.send_add_log_messages_to_memory_change( msg, prepared_add_items, prepared_update_items_with_original ) @@ -231,10 +231,10 @@ def send_add_log_messages_to_local_env( logger.info("send_add_log_messages_to_local_env: %s", len(events)) if events: self.scheduler_context.services.submit_web_logs( - events, additional_log_info="send_add_log_messages_to_cloud_env" + events, additional_log_info="send_add_log_messages_to_local_env" ) - def send_add_log_messages_to_cloud_env( + def send_add_log_messages_to_memory_change( self, msg: ScheduleMessageItem, prepared_add_items, @@ -278,7 +278,7 @@ def send_add_log_messages_to_cloud_env( if kb_log_content: logger.info( - "[DIAGNOSTIC] add_handler.send_add_log_messages_to_cloud_env: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: %s, mem_cube_id: %s, task_id: %s. KB content: %s", + "[DIAGNOSTIC] add_handler.send_add_log_messages_to_memory_change: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: %s, mem_cube_id: %s, task_id: %s. KB content: %s", msg.user_id, msg.mem_cube_id, msg.task_id, diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/feedback_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/feedback_handler.py index 173d37b50..445a50d8e 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/feedback_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/feedback_handler.py @@ -11,7 +11,7 @@ USER_INPUT_TYPE, ) from memos.mem_scheduler.task_schedule_modules.base_handler import BaseSchedulerHandler -from memos.mem_scheduler.utils.misc_utils import is_cloud_env +from memos.mem_scheduler.utils.misc_utils import is_playground_api logger = get_logger(__name__) @@ -75,8 +75,8 @@ def process_single_feedback(self, message: ScheduleMessageItem) -> None: mem_cube_id, ) - cloud_env = is_cloud_env() - if cloud_env: + playground_api = is_playground_api() + if not playground_api: record = feedback_result.get("record") if isinstance(feedback_result, dict) else {} add_records = record.get("add") if isinstance(record, dict) else [] update_records = record.get("update") if isinstance(record, dict) else [] @@ -191,6 +191,7 @@ def _extract_fields(mem_item): ) else: logger.info( - "Skipping web log for feedback. Not in a cloud environment (is_cloud_env=%s)", - cloud_env, + "Skipping memory-change web log for feedback on playground API " + "(is_playground_api=%s)", + playground_api, ) diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py index 36cc97bdf..90ace633c 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py @@ -19,7 +19,7 @@ ) from memos.mem_scheduler.task_schedule_modules.base_handler import BaseSchedulerHandler from memos.mem_scheduler.utils.filter_utils import transform_name_to_key -from memos.mem_scheduler.utils.misc_utils import is_cloud_env +from memos.mem_scheduler.utils.misc_utils import is_playground_api from memos.memories.textual.tree import TreeTextMemory @@ -268,8 +268,8 @@ def _process_memories_with_reader( "[Scheduler] merged_from provided but graph_db is unavailable; skip archiving." ) - cloud_env = is_cloud_env() - if cloud_env: + playground_api = is_playground_api() + if not playground_api: kb_log_content = [] for item in flattened_memories: metadata = getattr(item, "metadata", None) @@ -448,8 +448,8 @@ def _process_memories_with_reader( exc_info=True, ) with contextlib.suppress(Exception): - cloud_env = is_cloud_env() - if cloud_env: + playground_api = is_playground_api() + if not playground_api: if not kb_log_content: trigger_source = ( info.get("trigger_source", "Messages") if info else "Messages" diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py index 6bcf0023c..17a787895 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py @@ -5,7 +5,7 @@ the local memos_message_queue functionality in BaseScheduler. """ -from memos.context.context import get_current_trace_id +from memos.context.context import get_current_api_path, get_current_trace_id from memos.log import get_logger from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.task_schedule_modules.local_queue import SchedulerLocalQueue @@ -104,11 +104,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt return current_trace_id = get_current_trace_id() + current_api_path = get_current_api_path() for msg in messages: if current_trace_id: # Prefer current request trace_id so logs can be correlated msg.trace_id = current_trace_id + if current_api_path and not getattr(msg, "api_path", None): + msg.api_path = current_api_path msg.stream_key = self.memos_message_queue.get_stream_key( user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, task_label=msg.label ) diff --git a/src/memos/mem_scheduler/utils/misc_utils.py b/src/memos/mem_scheduler/utils/misc_utils.py index 3ce727b5c..e0ccc238d 100644 --- a/src/memos/mem_scheduler/utils/misc_utils.py +++ b/src/memos/mem_scheduler/utils/misc_utils.py @@ -1,14 +1,13 @@ import json -import os import re import traceback - from collections import defaultdict from functools import wraps from pathlib import Path import yaml +from memos.context.context import get_current_api_path from memos.log import get_logger from memos.mem_scheduler.schemas.message_schemas import ( ScheduleMessageItem, @@ -17,39 +16,15 @@ logger = get_logger(__name__) - -def _normalize_env_value(value: str | None) -> str: - """Normalize environment variable values for comparison.""" - return value.strip().lower() if isinstance(value, str) else "" - - -def is_playground_env() -> bool: - """Return True when ENV_NAME indicates a Playground environment.""" - env_name = _normalize_env_value(os.getenv("ENV_NAME")) - return env_name.startswith("playground") +PLAYGROUND_CHAT_STREAM_PATH = "/product/chat/stream/playground" -def is_cloud_env() -> bool: +def is_playground_api() -> bool: """ - Determine whether the scheduler should treat the runtime as a cloud environment. - - Rules: - - Any Playground ENV_NAME is explicitly NOT cloud. - - MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME must be set to enable cloud behavior. - - The default memos-fanout/fanout combination is treated as non-cloud. + Determine whether the scheduler should use old playground behavior. """ - if is_playground_env(): - return False - - exchange_name = _normalize_env_value(os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")) - exchange_type = _normalize_env_value(os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_TYPE")) - - if not exchange_name: - return False - - return not ( - exchange_name == "memos-fanout" and (not exchange_type or exchange_type == "fanout") - ) + api_path = get_current_api_path() + return api_path == PLAYGROUND_CHAT_STREAM_PATH def extract_json_obj(text: str): diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index a07934b8e..3123bfdee 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -18,6 +18,8 @@ logger = get_logger(__name__) +PLAYGROUND_CHAT_STREAM_PATH = "/product/chat/stream/playground" + class RabbitMQSchedulerModule(BaseSchedulerModule): @require_python_package( @@ -37,6 +39,8 @@ def __init__(self): self.rabbit_queue_name = "memos-scheduler" self.rabbitmq_exchange_name = "memos-fanout" # Default, will be overridden by config self.rabbitmq_exchange_type = FANOUT_EXCHANGE_TYPE # Default, will be overridden by config + self.rabbitmq_playground_chat_exchange_name: str | None = None + self.rabbitmq_playground_chat_exchange_type = FANOUT_EXCHANGE_TYPE self.rabbitmq_connection = None self.rabbitmq_channel = None @@ -154,6 +158,27 @@ def initialize_rabbitmq( self.rabbitmq_exchange_type = env_exchange_type logger.info(f"Using env exchange type override: {self.rabbitmq_exchange_type}") + playground_exchange_name = os.getenv( + "MEMSCHEDULER_RABBITMQ_PLAYGROUND_CHAT_EXCHANGE_NAME", "" + ).strip() + playground_exchange_type = os.getenv( + "MEMSCHEDULER_RABBITMQ_PLAYGROUND_CHAT_EXCHANGE_TYPE", + FANOUT_EXCHANGE_TYPE, + ).strip() + if playground_exchange_name: + self.rabbitmq_playground_chat_exchange_name = playground_exchange_name + self.rabbitmq_playground_chat_exchange_type = ( + playground_exchange_type or FANOUT_EXCHANGE_TYPE + ) + logger.info( + "Using playground chat exchange override: name=%s, type=%s", + self.rabbitmq_playground_chat_exchange_name, + self.rabbitmq_playground_chat_exchange_type, + ) + else: + self.rabbitmq_playground_chat_exchange_name = None + self.rabbitmq_playground_chat_exchange_type = FANOUT_EXCHANGE_TYPE + # Start connection process parameters = self.get_rabbitmq_connection_param() self.rabbitmq_connection = SelectConnection( @@ -260,16 +285,33 @@ def on_rabbitmq_channel_open(self, channel): self.rabbitmq_channel = channel logger.info("[DIAGNOSTIC] RabbitMQ channel opened") - # Setup exchange and queue + # Setup primary/direct exchange and optional playground/fanout exchange. channel.exchange_declare( exchange=self.rabbitmq_exchange_name, exchange_type=self.rabbitmq_exchange_type, durable=True, - callback=self.on_rabbitmq_exchange_declared, + callback=self.on_rabbitmq_primary_exchange_declared, ) - def on_rabbitmq_exchange_declared(self, frame): - """Called when exchange is ready.""" + def on_rabbitmq_primary_exchange_declared(self, frame): + """Called when primary exchange is ready.""" + if self.rabbitmq_playground_chat_exchange_name: + self.rabbitmq_channel.exchange_declare( + exchange=self.rabbitmq_playground_chat_exchange_name, + exchange_type=self.rabbitmq_playground_chat_exchange_type, + durable=True, + callback=self.on_rabbitmq_playground_exchange_declared, + ) + return + + self._rabbitmq_continue_queue_setup() + + def on_rabbitmq_playground_exchange_declared(self, frame): + """Called when optional playground exchange is ready.""" + self._rabbitmq_continue_queue_setup() + + def _rabbitmq_continue_queue_setup(self): + """Declare scheduler queue and bind it to the primary exchange.""" self.rabbitmq_channel.queue_declare( queue=self.rabbit_queue_name, durable=True, callback=self.on_rabbitmq_queue_declared ) @@ -289,6 +331,13 @@ def on_rabbitmq_bind_ok(self, frame): # Flush any cached publish messages now that connection is ready self._flush_cached_publish_messages() + def resolve_publish_route(self, message: dict) -> tuple[str, str]: + api_path = message.get("api_path") + if api_path == PLAYGROUND_CHAT_STREAM_PATH and self.rabbitmq_playground_chat_exchange_name: + return self.rabbitmq_playground_chat_exchange_name, "" + + return self.rabbitmq_exchange_name, "" + def on_rabbitmq_message(self, channel, method, properties, body): """Handle incoming messages. Only for test.""" try: @@ -327,34 +376,17 @@ def rabbitmq_publish_message(self, message: dict): """ import pika - exchange_name = self.rabbitmq_exchange_name - routing_key = self.rabbit_queue_name + exchange_name, routing_key = self.resolve_publish_route(message) label = message.get("label") - # Special handling for knowledgeBaseUpdate in local environment: always empty routing key - if label == "knowledgeBaseUpdate": - routing_key = "" - - # Env override: apply to all message types when MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME is set - env_exchange_name = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") - env_routing_key = os.getenv("MEMSCHEDULER_RABBITMQ_ROUTING_KEY") - if env_exchange_name: - exchange_name = env_exchange_name - routing_key = ( - env_routing_key if env_routing_key is not None and env_routing_key != "" else "" - ) - logger.info( - f"[DIAGNOSTIC] Publishing {label} message with env exchange override. " - f"Exchange: {exchange_name}, Routing Key: '{routing_key}'." - ) - logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}") - elif label == "knowledgeBaseUpdate": - # Original diagnostic logging for knowledgeBaseUpdate if NOT in cloud env - logger.info( - f"[DIAGNOSTIC] Publishing knowledgeBaseUpdate message (Local Env). " - f"Current configured Exchange: {exchange_name}, Routing Key: '{routing_key}'." - ) - logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}") + logger.info( + "[DIAGNOSTIC] Publishing %s message. api_path=%s Exchange: %s, Routing Key: '%s'.", + label, + message.get("api_path"), + exchange_name, + routing_key, + ) + logger.info(f" - Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}") with self._rabbitmq_lock: logger.info(