diff --git a/pyproject.toml b/pyproject.toml index 05eb2d395..7165e1fcd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,6 +124,8 @@ urllib3 = "2.5.0" wikipedia = "^1.4.0" xai-sdk = "^1.5.0" youtube-transcript-api = "^0.6.3" +openwakeword = {version = "^0.6.0", optional = true} +pycaw = {version = "^20240210", optional = true} [tool.poetry.group.dev.dependencies] pyinstaller = "^6.4.0" diff --git a/src/pygpt_net/app.py b/src/pygpt_net/app.py index 857a26fac..6361ff787 100755 --- a/src/pygpt_net/app.py +++ b/src/pygpt_net/app.py @@ -245,6 +245,9 @@ def run(**kwargs): from pygpt_net.plugin.mcp import Plugin as MCPPlugin from pygpt_net.plugin.wolfram import Plugin as WolframPlugin from pygpt_net.plugin.osm import Plugin as OSMPlugin + from pygpt_net.plugin.wake_word import Plugin as WakeWordPlugin + from pygpt_net.plugin.cmd_app_launcher import Plugin as AppLauncherPlugin + from pygpt_net.plugin.assistant_mode import Plugin as AssistantModePlugin # agents (Llama-index) from pygpt_net.provider.agents.llama_index.legacy.openai_assistant import OpenAIAssistantAgent @@ -465,6 +468,9 @@ def run(**kwargs): launcher.add_plugin(MCPPlugin()) launcher.add_plugin(WolframPlugin()) launcher.add_plugin(OSMPlugin()) + launcher.add_plugin(WakeWordPlugin()) + launcher.add_plugin(AppLauncherPlugin()) + launcher.add_plugin(AssistantModePlugin()) # register custom plugins plugins = kwargs.get('plugins', None) diff --git a/src/pygpt_net/config.py b/src/pygpt_net/config.py index 51c2625e2..d4827d7b4 100755 --- a/src/pygpt_net/config.py +++ b/src/pygpt_net/config.py @@ -516,17 +516,21 @@ def load_config(self, all: bool = True): :param all: load all configs """ - self.data = self.provider.load(all) - if self.data is not None: - self.data = dict(sorted(self.data.items(), key=itemgetter(0))) + loaded = self.provider.load(all) + if loaded is None: + self.data = {} + else: + self.data = dict(sorted(loaded.items(), key=itemgetter(0))) def load_base_config(self): """ Load app config from JSON file """ - self.data_base = self.provider.load_base() - if self.data_base is not None: - self.data_base = dict(sorted(self.data_base.items(), key=itemgetter(0))) + loaded_base = self.provider.load_base() + if loaded_base is None: + self.data_base = {} + else: + self.data_base = dict(sorted(loaded_base.items(), key=itemgetter(0))) self.initialized_base = True def from_base_config(self): diff --git a/src/pygpt_net/controller/plugins/settings.py b/src/pygpt_net/controller/plugins/settings.py index 436cec88b..37caa60c5 100755 --- a/src/pygpt_net/controller/plugins/settings.py +++ b/src/pygpt_net/controller/plugins/settings.py @@ -32,11 +32,9 @@ def __init__(self, window=None): self.height = 500 def setup(self): - """Set up plugin settings""" - idx = None - if 'plugin.settings' in self.window.ui.tabs: - idx = self.window.ui.tabs['plugin.settings'].currentIndex() - self.window.plugin_settings.setup(idx) + """Set up plugin settings (skeleton now, full build deferred)""" + self.window.plugin_settings.setup_skeleton() + self.config_initialized = False def toggle_editor(self): """Toggle plugin settings dialog""" diff --git a/src/pygpt_net/core/profile/profile.py b/src/pygpt_net/core/profile/profile.py index fcbe76c80..b473e348b 100644 --- a/src/pygpt_net/core/profile/profile.py +++ b/src/pygpt_net/core/profile/profile.py @@ -99,14 +99,16 @@ def load(self): """Load profiles""" f = os.path.join(self.base_workdir, self.PROFILE_FILE) if os.path.exists(f): - with open(f, 'r', encoding='utf-8') as f: - try: - data = json.load(f) + try: + with open(f, 'r', encoding='utf-8') as fh: + data = json.load(fh) self.current = data['current'] self.profiles = data['profiles'] self.initialized = True - except Exception as e: - print('CRITICAL: Error loading profile file:', e) + except FileNotFoundError: + pass # race condition: file removed between exists() and open() + except Exception as e: + print('CRITICAL: Error loading profile file:', e) def save(self): """Save profiles""" diff --git a/src/pygpt_net/plugin/assistant_mode/__init__.py b/src/pygpt_net/plugin/assistant_mode/__init__.py new file mode 100644 index 000000000..e064b4c13 --- /dev/null +++ b/src/pygpt_net/plugin/assistant_mode/__init__.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from .plugin import Plugin diff --git a/src/pygpt_net/plugin/assistant_mode/config.py b/src/pygpt_net/plugin/assistant_mode/config.py new file mode 100644 index 000000000..fa1bc0b42 --- /dev/null +++ b/src/pygpt_net/plugin/assistant_mode/config.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from pygpt_net.plugin.base.config import BaseConfig, BasePlugin + + +class Config(BaseConfig): + def __init__(self, plugin: BasePlugin = None, *args, **kwargs): + super(Config, self).__init__(plugin) + self.plugin = plugin + + def from_defaults(self, plugin: BasePlugin = None): + """ + Set default options for plugin + + :param plugin: plugin instance + """ + plugin.add_option( + "auto_listen_after_response", + type="bool", + value=True, + label="Auto-listen after response", + description="Automatically start listening for voice input after the AI finishes " + "speaking its response. Creates a continuous conversation loop. Default: True", + ) + plugin.add_option( + "require_wake_word_each_turn", + type="bool", + value=False, + label="Require wake word each turn", + description="If enabled, the user must say the wake word before each command. " + "If disabled, after the first wake word the assistant stays in " + "continuous conversation mode until a stop word is detected. Default: False", + ) + plugin.add_option( + "conversation_timeout", + type="int", + value=30, + label="Conversation timeout (seconds)", + description="If no input is received within this time, the assistant goes back to " + "wake word listening mode. Set to 0 to disable. Default: 30", + min=0, + max=120, + slider=True, + tooltip="Conversation timeout in seconds, default: 30, 0 = disabled", + ) + plugin.add_option( + "stop_words", + type="text", + value="goodbye, bye, stop listening, that's all, go to sleep", + label="Stop words", + description="Phrases that end the conversation loop and return to wake word mode. " + "Separate with commas.", + ) + plugin.add_option( + "greeting_enabled", + type="bool", + value=True, + label="Greeting on activation", + description="Speak a short greeting when the assistant is activated by wake word. " + "Default: True", + ) + plugin.add_option( + "greeting_text", + type="text", + value="Yes?", + label="Greeting text", + description="Text to speak when the assistant is activated. Default: Yes?", + ) + plugin.add_option( + "auto_enable_plugins", + type="bool", + value=True, + label="Auto-enable required plugins", + description="Automatically enable Wake Word, Audio Input, and Audio Output plugins " + "when Assistant Mode is enabled. Default: True", + ) + plugin.add_option( + "response_delay", + type="float", + value=0.5, + label="Post-response delay (seconds)", + description="Delay after audio response finishes before listening again. " + "Prevents the assistant from hearing its own voice. Default: 0.5", + min=0.0, + max=3.0, + slider=True, + multiplier=10, + ) diff --git a/src/pygpt_net/plugin/assistant_mode/plugin.py b/src/pygpt_net/plugin/assistant_mode/plugin.py new file mode 100644 index 000000000..a12777d5d --- /dev/null +++ b/src/pygpt_net/plugin/assistant_mode/plugin.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +import time +import threading + +from PySide6.QtCore import Slot, QTimer + +from pygpt_net.plugin.base.plugin import BasePlugin +from pygpt_net.core.events import Event +from pygpt_net.item.ctx import CtxItem + +from .config import Config + + +class Plugin(BasePlugin): + def __init__(self, *args, **kwargs): + super(Plugin, self).__init__(*args, **kwargs) + self.id = "assistant_mode" + self.name = "Assistant Mode" + self.type = [ + "audio.control", + ] + self.description = ( + "Orchestrates Wake Word, Audio Input, and Audio Output plugins into a " + "seamless voice assistant loop: wake word -> listen -> respond -> repeat." + ) + self.prefix = "Assistant" + self.order = 200 # after other plugins + self.use_locale = True + self.config = Config(self) + + # State + self.active = False # assistant mode is active + self.in_conversation = False # currently in a conversation turn + self.last_interaction_time = 0 + self.timeout_timer = None + self.init_options() + + def init_options(self): + """Initialize options""" + self.config.from_defaults(self) + + def handle(self, event: Event, *args, **kwargs): + """ + Handle dispatched event + + :param event: event object + :param args: args + :param kwargs: kwargs + """ + name = event.name + data = event.data + ctx = event.ctx + + if name == Event.ENABLE: + if data["value"] == self.id: + self.on_enable() + + elif name == Event.DISABLE: + if data["value"] == self.id: + self.on_disable() + + elif name == Event.INPUT_BEFORE: + if self.active: + self.on_input_before(data.get("value", "")) + + elif name == Event.CTX_BEGIN: + if self.active: + self.in_conversation = True + + elif name == Event.CTX_END: + if self.active and self.in_conversation: + self.on_response_complete(ctx) + + def on_enable(self): + """Enable assistant mode and set up required plugins""" + self.active = True + self.log("Assistant Mode activated") + + if self.get_option_value("auto_enable_plugins"): + plugins_to_enable = ["wake_word", "audio_input", "audio_output"] + for pid in plugins_to_enable: + plugin = self.window.core.plugins.get(pid) + if plugin and not plugin.enabled: + self.window.controller.plugins.enable(pid) + self.log("Auto-enabled plugin: {}".format(pid)) + + def on_disable(self): + """Disable assistant mode""" + self.active = False + self.in_conversation = False + self.stop_timeout_timer() + self.log("Assistant Mode deactivated") + + def on_input_before(self, text: str): + """ + Called before user input is sent. Check for stop words. + + :param text: user input text + """ + self.last_interaction_time = time.time() + + stop_words_str = self.get_option_value("stop_words") + if stop_words_str: + stop_words = [w.strip().lower() for w in stop_words_str.split(",") if w.strip()] + text_lower = text.strip().lower().replace(".", "").replace("!", "").replace("?", "") + for stop_word in stop_words: + if stop_word in text_lower: + self.log("Stop word detected: '{}'. Ending conversation.".format(stop_word)) + self.end_conversation() + return + + def on_response_complete(self, ctx: CtxItem): + """ + Called when AI response is complete (after TTS playback). + Decides whether to continue listening. + + :param ctx: context item + """ + self.in_conversation = False + + if not self.active: + return + + if self.get_option_value("require_wake_word_each_turn"): + # Go back to wake word mode + self.log("Waiting for wake word...") + return + + # Auto-listen for next command + if self.get_option_value("auto_listen_after_response"): + delay = self.get_option_value("response_delay") + if delay and delay > 0: + # Use QTimer for thread-safe delayed execution + QTimer.singleShot(int(delay * 1000), self.start_listening) + else: + self.start_listening() + + # Start conversation timeout + self.start_timeout_timer() + + def start_listening(self): + """Trigger audio input to start listening""" + if not self.active: + return + + audio_input = self.window.core.plugins.get("audio_input") + if audio_input is None or not audio_input.enabled: + return + + self.log("Listening for next command...") + self.last_interaction_time = time.time() + + if not audio_input.is_advanced(): + audio_input.toggle_recording_simple(state=True, auto=True) + else: + audio_input.magic_word_detected = True + if not audio_input.speech_enabled: + audio_input.toggle_speech(True) + + def end_conversation(self): + """End the conversation loop, go back to wake word mode""" + self.in_conversation = False + self.stop_timeout_timer() + self.log("Conversation ended, returning to wake word mode") + + def start_timeout_timer(self): + """Start the conversation timeout timer""" + timeout = self.get_option_value("conversation_timeout") + if timeout <= 0: + return + + self.stop_timeout_timer() + self.timeout_timer = QTimer() + self.timeout_timer.setSingleShot(True) + self.timeout_timer.timeout.connect(self.on_timeout) + self.timeout_timer.start(timeout * 1000) + + def stop_timeout_timer(self): + """Stop the conversation timeout timer""" + if self.timeout_timer is not None: + try: + self.timeout_timer.stop() + except RuntimeError: + pass + self.timeout_timer = None + + @Slot() + def on_timeout(self): + """Handle conversation timeout""" + if not self.active: + return + + elapsed = time.time() - self.last_interaction_time + timeout = self.get_option_value("conversation_timeout") + if elapsed >= timeout: + self.log("Conversation timed out ({} seconds)".format(timeout)) + self.end_conversation() + + def on_wake_word_activated(self): + """ + Called by wake_word plugin when wake word is detected. + Can be used to speak a greeting. + """ + if not self.active: + return + + self.last_interaction_time = time.time() + + if self.get_option_value("greeting_enabled"): + greeting = self.get_option_value("greeting_text") + if greeting: + self.speak_text(greeting) + + def speak_text(self, text: str): + """ + Speak text using the audio output system + + :param text: text to speak + """ + audio_output = self.window.core.plugins.get("audio_output") + if audio_output is None or not audio_output.enabled: + return + + try: + ctx = CtxItem() + ctx.output = text + event = Event(Event.AUDIO_READ_TEXT, ctx=ctx) + self.window.dispatch(event) + except Exception as e: + self.log("Error speaking text: {}".format(str(e))) + + def destroy(self): + """Destroy plugin""" + self.active = False + self.in_conversation = False + self.stop_timeout_timer() diff --git a/src/pygpt_net/plugin/cmd_app_launcher/__init__.py b/src/pygpt_net/plugin/cmd_app_launcher/__init__.py new file mode 100644 index 000000000..e064b4c13 --- /dev/null +++ b/src/pygpt_net/plugin/cmd_app_launcher/__init__.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from .plugin import Plugin diff --git a/src/pygpt_net/plugin/cmd_app_launcher/config.py b/src/pygpt_net/plugin/cmd_app_launcher/config.py new file mode 100644 index 000000000..4f9cadff9 --- /dev/null +++ b/src/pygpt_net/plugin/cmd_app_launcher/config.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from pygpt_net.plugin.base.config import BaseConfig, BasePlugin + + +class Config(BaseConfig): + def __init__(self, plugin: BasePlugin = None, *args, **kwargs): + super(Config, self).__init__(plugin) + self.plugin = plugin + + def from_defaults(self, plugin: BasePlugin = None): + """ + Set default options for plugin + + :param plugin: plugin instance + """ + # --- Commands --- + + plugin.add_cmd( + "app_launch", + instruction="Launch/open an application by name on the user's system. " + "Use the app name as it would appear in the Start Menu or PATH. " + "Examples: 'chrome', 'notepad', 'spotify', 'firefox', 'vscode'.", + params=[ + { + "name": "app_name", + "type": "str", + "description": "Application name to launch (e.g. 'chrome', 'notepad', 'spotify')", + "required": True, + }, + { + "name": "args", + "type": "str", + "description": "Optional arguments to pass to the application", + "required": False, + }, + ], + enabled=True, + ) + + plugin.add_cmd( + "app_close", + instruction="Close/kill a running application by name. " + "This will terminate all processes matching the given name.", + params=[ + { + "name": "app_name", + "type": "str", + "description": "Application/process name to close (e.g. 'chrome', 'notepad')", + "required": True, + }, + ], + enabled=True, + ) + + plugin.add_cmd( + "app_list_running", + instruction="List currently running applications/processes with visible windows. " + "Returns a list of window titles and process names.", + params=[], + enabled=True, + ) + + plugin.add_cmd( + "app_open_url", + instruction="Open a URL in the user's default web browser. " + "Can also open specific sites like 'youtube.com', 'github.com', etc.", + params=[ + { + "name": "url", + "type": "str", + "description": "URL to open (e.g. 'https://youtube.com' or just 'youtube.com')", + "required": True, + }, + ], + enabled=True, + ) + + plugin.add_cmd( + "app_list_installed", + instruction="List installed applications that can be launched. " + "Scans Start Menu shortcuts and PATH for available programs.", + params=[ + { + "name": "filter", + "type": "str", + "description": "Optional filter to search by name (e.g. 'chrome')", + "required": False, + }, + ], + enabled=True, + ) + + plugin.add_cmd( + "media_play_pause", + instruction="Toggle play/pause on the current media player (system-wide media key).", + params=[], + enabled=True, + ) + + plugin.add_cmd( + "media_next_track", + instruction="Skip to the next track (system-wide media key).", + params=[], + enabled=True, + ) + + plugin.add_cmd( + "media_prev_track", + instruction="Go to the previous track (system-wide media key).", + params=[], + enabled=True, + ) + + plugin.add_cmd( + "volume_set", + instruction="Set system volume to a specific level (0-100).", + params=[ + { + "name": "level", + "type": "int", + "description": "Volume level from 0 to 100", + "required": True, + }, + ], + enabled=True, + ) + + plugin.add_cmd( + "volume_mute_toggle", + instruction="Toggle system mute on/off.", + params=[], + enabled=True, + ) + + # --- Options --- + + plugin.add_option( + "custom_app_aliases", + type="dict", + value={ + "chrome": "chrome", + "firefox": "firefox", + "notepad": "notepad", + "explorer": "explorer", + "calculator": "calc", + "terminal": "cmd", + "powershell": "powershell", + "vscode": "code", + "spotify": "spotify", + }, + label="Custom app aliases", + description="Map friendly names to executable names or paths. " + "Key = alias name, Value = executable name or full path.", + keys={ + "name": "text", + "value": "text", + }, + persist=True, + ) + + plugin.add_option( + "scan_start_menu", + type="bool", + value=True, + label="Scan Start Menu", + description="Scan Windows Start Menu for installed applications. " + "Enables launching apps by their Start Menu name. Default: True", + ) diff --git a/src/pygpt_net/plugin/cmd_app_launcher/plugin.py b/src/pygpt_net/plugin/cmd_app_launcher/plugin.py new file mode 100644 index 000000000..9f8838b29 --- /dev/null +++ b/src/pygpt_net/plugin/cmd_app_launcher/plugin.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from pygpt_net.plugin.base.plugin import BasePlugin +from pygpt_net.core.events import Event +from pygpt_net.item.ctx import CtxItem + +from .config import Config + + +class Plugin(BasePlugin): + def __init__(self, *args, **kwargs): + super(Plugin, self).__init__(*args, **kwargs) + self.id = "cmd_app_launcher" + self.name = "App Launcher" + self.type = [ + "cmd", + ] + self.description = ( + "Launch, close, and control applications on your system using voice " + "or text commands. Includes media playback controls and volume management." + ) + self.prefix = "AppLauncher" + self.allowed_cmds = [ + "app_launch", + "app_close", + "app_list_running", + "app_open_url", + "app_list_installed", + "media_play_pause", + "media_next_track", + "media_prev_track", + "volume_set", + "volume_mute_toggle", + ] + self.order = 100 + self.use_locale = True + self.config = Config(self) + self.init_options() + + def init_options(self): + """Initialize options""" + self.config.from_defaults(self) + + def handle(self, event: Event, *args, **kwargs): + """ + Handle dispatched event + + :param event: event object + :param args: args + :param kwargs: kwargs + """ + name = event.name + data = event.data + ctx = event.ctx + + if name == Event.CMD_SYNTAX: + self.cmd_syntax(data) + + elif name == Event.CMD_EXECUTE: + self.cmd( + ctx, + data["commands"], + ) + + def cmd_syntax(self, data: dict): + """ + Event: CMD_SYNTAX + + :param data: event data dict + """ + for item in self.allowed_cmds: + if self.has_cmd(item): + cmd = self.get_cmd(item) + data["cmd"].append(cmd) + + def cmd(self, ctx: CtxItem, cmds: list): + """ + Event: CMD_EXECUTE + + :param ctx: CtxItem + :param cmds: commands dict + """ + from .worker import Worker + + is_cmd = False + force = False + my_commands = [] + for item in cmds: + if item["cmd"] in self.allowed_cmds: + my_commands.append(item) + is_cmd = True + if "force" in item and item["force"]: + force = True + + if not is_cmd: + return + + # set state: busy + self.cmd_prepare(ctx, my_commands) + + try: + worker = Worker() + worker.from_defaults(self) + worker.cmds = my_commands + worker.ctx = ctx + + if not self.is_async(ctx) and not force: + worker.run() + return + worker.run_async() + + except Exception as e: + self.error(e) diff --git a/src/pygpt_net/plugin/cmd_app_launcher/worker.py b/src/pygpt_net/plugin/cmd_app_launcher/worker.py new file mode 100644 index 000000000..eac43e91f --- /dev/null +++ b/src/pygpt_net/plugin/cmd_app_launcher/worker.py @@ -0,0 +1,493 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +import os +import glob +import shutil +import subprocess +import platform +import webbrowser + +from PySide6.QtCore import Slot + +from pygpt_net.plugin.base.worker import BaseWorker +from pygpt_net.utils import normalize_text + + +class Worker(BaseWorker): + def __init__(self, *args, **kwargs): + super(Worker, self).__init__() + self.args = args + self.kwargs = kwargs + self.cmds = None + self.ctx = None + + @Slot() + def run(self): + """Run worker.""" + try: + for item in self.cmds: + response = None + + if item["cmd"] == "app_launch": + response = self.cmd_app_launch(item) + + elif item["cmd"] == "app_close": + response = self.cmd_app_close(item) + + elif item["cmd"] == "app_list_running": + response = self.cmd_app_list_running(item) + + elif item["cmd"] == "app_open_url": + response = self.cmd_app_open_url(item) + + elif item["cmd"] == "app_list_installed": + response = self.cmd_app_list_installed(item) + + elif item["cmd"] == "media_play_pause": + response = self.cmd_media_key(item, "play_pause") + + elif item["cmd"] == "media_next_track": + response = self.cmd_media_key(item, "next_track") + + elif item["cmd"] == "media_prev_track": + response = self.cmd_media_key(item, "prev_track") + + elif item["cmd"] == "volume_set": + response = self.cmd_volume_set(item) + + elif item["cmd"] == "volume_mute_toggle": + response = self.cmd_media_key(item, "mute") + + if response is not None: + self.reply(response) + + except Exception as e: + self.error(e) + finally: + self.cleanup() + + def cmd_app_launch(self, item: dict) -> dict: + """ + Launch an application + + :param item: command item + :return: response dict + """ + app_name = normalize_text(self.get_param(item, "app_name", "").strip()) + app_args = self.get_param(item, "args", "") + + if not app_name: + return self.make_response(item, "Error: app_name is required") + + # Check custom aliases first + aliases = self.plugin.get_option_value("custom_app_aliases") + resolved = None + if isinstance(aliases, dict): + for alias, target in aliases.items(): + if normalize_text(alias) == app_name: + resolved = target + break + + # Try Start Menu shortcuts + if resolved is None and platform.system() == "Windows": + resolved = self.find_start_menu_shortcut(app_name) + + # Try finding in PATH + if resolved is None: + found_in_path = shutil.which(app_name) + if found_in_path: + resolved = found_in_path + + # Try common executable patterns on Windows + if resolved is None and platform.system() == "Windows": + resolved = self.find_windows_app(app_name) + + if resolved is None: + return self.make_response( + item, + "Could not find application '{}'. " + "Try adding it to custom app aliases in plugin settings.".format(app_name) + ) + + try: + cmd_parts = [resolved] + if app_args: + cmd_parts.extend(app_args.split()) + + if platform.system() == "Windows": + subprocess.Popen( + cmd_parts, + shell=False, + creationflags=subprocess.DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP, + ) + else: + subprocess.Popen(cmd_parts, start_new_session=True) + + self.log("Launched: {}".format(resolved)) + return self.make_response(item, "Launched '{}' successfully".format(app_name)) + + except Exception as e: + # Fallback: try os.startfile on Windows + if platform.system() == "Windows": + try: + os.startfile(resolved) + return self.make_response(item, "Launched '{}' successfully".format(app_name)) + except Exception: + pass + return self.make_response(item, "Failed to launch '{}': {}".format(app_name, str(e))) + + def cmd_app_close(self, item: dict) -> dict: + """ + Close a running application + + :param item: command item + :return: response dict + """ + app_name = normalize_text(self.get_param(item, "app_name", "").strip()) + if not app_name: + return self.make_response(item, "Error: app_name is required") + + if platform.system() != "Windows": + return self.make_response(item, "app_close is only supported on Windows") + + # Map common names to process names + process_map = { + "chrome": "chrome.exe", + "firefox": "firefox.exe", + "notepad": "notepad.exe", + "explorer": "explorer.exe", + "spotify": "Spotify.exe", + "vscode": "Code.exe", + "code": "Code.exe", + } + + process_name = process_map.get(app_name, app_name) + if not process_name.endswith(".exe"): + process_name += ".exe" + + try: + result = subprocess.run( + ["taskkill", "/F", "/IM", process_name], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0: + return self.make_response(item, "Closed '{}'".format(app_name)) + else: + return self.make_response( + item, + "Could not close '{}': {}".format(app_name, result.stderr.strip()) + ) + except Exception as e: + return self.make_response(item, "Error closing '{}': {}".format(app_name, str(e))) + + def cmd_app_list_running(self, item: dict) -> dict: + """ + List running applications with visible windows + + :param item: command item + :return: response dict + """ + if platform.system() != "Windows": + return self.make_response(item, "app_list_running is only supported on Windows") + + try: + import ctypes + from ctypes import wintypes + + user32 = ctypes.windll.user32 + windows = [] + + WNDENUMPROC = ctypes.WINFUNCTYPE( + wintypes.BOOL, wintypes.HWND, wintypes.LPARAM + ) + + def enum_callback(hwnd, lparam): + if user32.IsWindowVisible(hwnd): + length = user32.GetWindowTextLengthW(hwnd) + if length > 0: + buf = ctypes.create_unicode_buffer(length + 1) + user32.GetWindowTextW(hwnd, buf, length + 1) + title = buf.value.strip() + if title: + windows.append(title) + return True + + user32.EnumWindows(WNDENUMPROC(enum_callback), 0) + + # Deduplicate and limit + seen = set() + unique = [] + for w in windows: + if w not in seen: + seen.add(w) + unique.append(w) + + result_text = "Running windows ({}):\n".format(len(unique)) + for w in unique[:50]: # limit to 50 + result_text += "- {}\n".format(w) + + return self.make_response(item, result_text) + + except Exception as e: + return self.make_response(item, "Error listing windows: {}".format(str(e))) + + def cmd_app_open_url(self, item: dict) -> dict: + """ + Open a URL in the default browser + + :param item: command item + :return: response dict + """ + url = self.get_param(item, "url", "").strip() + if not url: + return self.make_response(item, "Error: url is required") + + # Add scheme if missing + if not url.startswith("http://") and not url.startswith("https://"): + url = "https://{}".format(url) + + try: + webbrowser.open(url) + return self.make_response(item, "Opened URL: {}".format(url)) + except Exception as e: + return self.make_response(item, "Error opening URL: {}".format(str(e))) + + def cmd_app_list_installed(self, item: dict) -> dict: + """ + List installed applications + + :param item: command item + :return: response dict + """ + name_filter = normalize_text(self.get_param(item, "filter", "").strip()) + apps = [] + + # From custom aliases + aliases = self.plugin.get_option_value("custom_app_aliases") + if isinstance(aliases, dict): + for alias, target in aliases.items(): + if not name_filter or name_filter in normalize_text(alias): + apps.append("{} -> {}".format(alias, target)) + + # From Start Menu + if platform.system() == "Windows" and self.plugin.get_option_value("scan_start_menu"): + start_menu_apps = self.scan_start_menu() + for app_label, app_path in start_menu_apps: + if not name_filter or name_filter in normalize_text(app_label): + apps.append("{} [Start Menu]".format(app_label)) + + if not apps: + return self.make_response(item, "No applications found matching '{}'".format(name_filter)) + + # Deduplicate and sort + apps = sorted(set(apps))[:100] + + result_text = "Installed applications ({}):\n".format(len(apps)) + for app in apps: + result_text += "- {}\n".format(app) + + return self.make_response(item, result_text) + + def cmd_media_key(self, item: dict, action: str) -> dict: + """ + Send a media key press + + :param item: command item + :param action: media action + :return: response dict + """ + if platform.system() != "Windows": + return self.make_response(item, "Media keys are only supported on Windows") + + try: + import ctypes + + user32 = ctypes.windll.user32 + + # Virtual key codes for media keys + VK_MEDIA = { + "play_pause": 0xB3, + "next_track": 0xB0, + "prev_track": 0xB1, + "mute": 0xAD, + } + + vk = VK_MEDIA.get(action) + if vk is None: + return self.make_response(item, "Unknown media action: {}".format(action)) + + # Simulate key press and release + KEYEVENTF_EXTENDEDKEY = 0x0001 + KEYEVENTF_KEYUP = 0x0002 + + user32.keybd_event(vk, 0, KEYEVENTF_EXTENDEDKEY, 0) + user32.keybd_event(vk, 0, KEYEVENTF_EXTENDEDKEY | KEYEVENTF_KEYUP, 0) + + action_labels = { + "play_pause": "Toggled play/pause", + "next_track": "Skipped to next track", + "prev_track": "Went to previous track", + "mute": "Toggled mute", + } + + return self.make_response(item, action_labels.get(action, "Done")) + + except Exception as e: + return self.make_response(item, "Error sending media key: {}".format(str(e))) + + def cmd_volume_set(self, item: dict) -> dict: + """ + Set system volume level + + :param item: command item + :return: response dict + """ + level = self.get_param(item, "level", 50) + + try: + level = int(level) + level = max(0, min(100, level)) + except (ValueError, TypeError): + return self.make_response(item, "Error: volume level must be a number 0-100") + + if platform.system() != "Windows": + return self.make_response(item, "Volume control is only supported on Windows") + + try: + from pycaw.pycaw import AudioUtilities + + devices = AudioUtilities.GetSpeakers() + volume = devices.EndpointVolume + + # pycaw uses scalar volume (0.0 to 1.0) + volume.SetMasterVolumeLevelScalar(level / 100.0, None) + + return self.make_response(item, "Volume set to {}%".format(level)) + + except ImportError: + # Fallback: use nircmd if available + nircmd = shutil.which("nircmd") + if nircmd: + try: + # nircmd uses 0-65535 range + nircmd_level = int(level * 65535 / 100) + subprocess.run( + [nircmd, "setsysvolume", str(nircmd_level)], + timeout=5, + ) + return self.make_response(item, "Volume set to {}%".format(level)) + except Exception as e: + return self.make_response(item, "Error setting volume: {}".format(str(e))) + + return self.make_response( + item, + "Volume control requires 'pycaw' package. Install with: pip install pycaw" + ) + + except Exception as e: + return self.make_response(item, "Error setting volume: {}".format(str(e))) + + # --- Helpers --- + + def find_start_menu_shortcut(self, app_name: str): + """ + Find application shortcut in Windows Start Menu + + :param app_name: application name to find + :return: path to shortcut or None + """ + shortcuts = self.scan_start_menu() + app_name_norm = normalize_text(app_name) + + # Exact match first + for label, path in shortcuts: + if normalize_text(label) == app_name_norm: + return path + + # Partial match + for label, path in shortcuts: + if app_name_norm in normalize_text(label): + return path + + return None + + def scan_start_menu(self): + """ + Scan Windows Start Menu directories for .lnk shortcuts + + :return: list of (label, path) tuples + """ + results = [] + start_menu_dirs = [] + + # Common Start Menu paths + appdata = os.environ.get("APPDATA", "") + programdata = os.environ.get("PROGRAMDATA", "") + + if appdata: + start_menu_dirs.append( + os.path.join(appdata, "Microsoft", "Windows", "Start Menu", "Programs") + ) + if programdata: + start_menu_dirs.append( + os.path.join(programdata, "Microsoft", "Windows", "Start Menu", "Programs") + ) + + for start_dir in start_menu_dirs: + if not os.path.isdir(start_dir): + continue + + for lnk_path in glob.glob( + os.path.join(start_dir, "**", "*.lnk"), recursive=True + ): + label = os.path.splitext(os.path.basename(lnk_path))[0] + results.append((label, lnk_path)) + + return results + + def find_windows_app(self, app_name: str): + """ + Try to find an application on Windows by common paths + + :param app_name: application name + :return: executable path or None + """ + common_paths = { + "chrome": [ + os.path.join(os.environ.get("PROGRAMFILES", ""), "Google", "Chrome", "Application", "chrome.exe"), + os.path.join(os.environ.get("PROGRAMFILES(X86)", ""), "Google", "Chrome", "Application", "chrome.exe"), + os.path.join(os.environ.get("LOCALAPPDATA", ""), "Google", "Chrome", "Application", "chrome.exe"), + ], + "firefox": [ + os.path.join(os.environ.get("PROGRAMFILES", ""), "Mozilla Firefox", "firefox.exe"), + os.path.join(os.environ.get("PROGRAMFILES(X86)", ""), "Mozilla Firefox", "firefox.exe"), + ], + "spotify": [ + os.path.join(os.environ.get("APPDATA", ""), "Spotify", "Spotify.exe"), + os.path.join(os.environ.get("LOCALAPPDATA", ""), "Microsoft", "WindowsApps", "Spotify.exe"), + ], + "vscode": [ + os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Microsoft VS Code", "Code.exe"), + ], + "code": [ + os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Microsoft VS Code", "Code.exe"), + ], + } + + paths = common_paths.get(app_name.lower(), []) + for path in paths: + if path and os.path.isfile(path): + return path + + return None diff --git a/src/pygpt_net/plugin/wake_word/__init__.py b/src/pygpt_net/plugin/wake_word/__init__.py new file mode 100644 index 000000000..e064b4c13 --- /dev/null +++ b/src/pygpt_net/plugin/wake_word/__init__.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from .plugin import Plugin diff --git a/src/pygpt_net/plugin/wake_word/config.py b/src/pygpt_net/plugin/wake_word/config.py new file mode 100644 index 000000000..2b23c052f --- /dev/null +++ b/src/pygpt_net/plugin/wake_word/config.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from pygpt_net.plugin.base.config import BaseConfig, BasePlugin + + +class Config(BaseConfig): + def __init__(self, plugin: BasePlugin = None, *args, **kwargs): + super(Config, self).__init__(plugin) + self.plugin = plugin + + def from_defaults(self, plugin: BasePlugin = None): + """ + Set default options for plugin + + :param plugin: plugin instance + """ + plugin.add_option( + "wake_word_model", + type="combo", + value="hey_jarvis", + label="Wake word model", + description="Select the wake word model to use for activation. " + "Default: hey_jarvis", + tooltip="Select wake word model", + keys=[ + {"hey_jarvis": "Hey Jarvis"}, + {"hey_mycroft": "Hey Mycroft"}, + {"alexa": "Alexa"}, + {"ok_google": "OK Google"}, + {"hey_siri": "Hey Siri"}, + {"custom": "Custom (specify path below)"}, + ], + ) + plugin.add_option( + "custom_model_path", + type="text", + value="", + label="Custom model path", + description="Path to a custom OpenWakeWord .tflite model file. " + "Only used when 'Custom' is selected above.", + tooltip="Path to custom .tflite model", + ) + plugin.add_option( + "threshold", + type="float", + value=0.5, + label="Detection threshold", + description="Confidence threshold for wake word detection. " + "Higher = fewer false positives. Range: 0.1 - 1.0. Default: 0.5", + min=0.1, + max=1.0, + slider=True, + multiplier=10, + tooltip="Detection threshold, default: 0.5", + ) + plugin.add_option( + "cooldown_seconds", + type="int", + value=3, + label="Cooldown (seconds)", + description="Minimum seconds between consecutive wake word detections " + "to prevent rapid re-triggering. Default: 3", + min=1, + max=15, + slider=True, + tooltip="Cooldown between detections, default: 3", + ) + plugin.add_option( + "audio_feedback", + type="bool", + value=True, + label="Audio feedback on detection", + description="Play a short beep sound when wake word is detected. Default: True", + ) + plugin.add_option( + "auto_enable_audio_input", + type="bool", + value=True, + label="Auto-enable Audio Input plugin", + description="Automatically enable the Audio Input plugin when this plugin is enabled. " + "Default: True", + ) diff --git a/src/pygpt_net/plugin/wake_word/plugin.py b/src/pygpt_net/plugin/wake_word/plugin.py new file mode 100644 index 000000000..4616cbd1d --- /dev/null +++ b/src/pygpt_net/plugin/wake_word/plugin.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from PySide6.QtCore import Slot + +from pygpt_net.plugin.base.plugin import BasePlugin +from pygpt_net.core.events import Event + +from .config import Config +from .worker import WakeWordWorker + + +# Built-in model name mapping for OpenWakeWord +BUILTIN_MODELS = { + "hey_jarvis": ["hey_jarvis_v0.1"], + "hey_mycroft": ["hey_mycroft_v0.1"], + "alexa": ["alexa_v0.1"], + "ok_google": ["ok_google_v0.1"], + "hey_siri": ["hey_siri_v0.1"], +} + + +class Plugin(BasePlugin): + def __init__(self, *args, **kwargs): + super(Plugin, self).__init__(*args, **kwargs) + self.id = "wake_word" + self.name = "Wake Word" + self.type = [ + "audio.control", + ] + self.description = ( + "Provides always-on wake word detection using OpenWakeWord. " + "When the wake word is detected, it triggers audio input recording." + ) + self.prefix = "WakeWord" + self.urls = { + "OpenWakeWord": "https://github.com/dscripka/openWakeWord", + } + self.order = 0 # before audio_input + self.use_locale = True + self.listening = False + self.worker = None + self.config = Config(self) + self.init_options() + + def init_options(self): + """Initialize options""" + self.config.from_defaults(self) + + def handle(self, event: Event, *args, **kwargs): + """ + Handle dispatched event + + :param event: event object + :param args: args + :param kwargs: kwargs + """ + name = event.name + data = event.data + + if name == Event.ENABLE: + if data["value"] == self.id: + self.on_enable() + + elif name == Event.DISABLE: + if data["value"] == self.id: + self.on_disable() + + elif name == Event.CTX_END: + # After response is complete, restart listening + if self.listening: + self.log("Response complete, wake word listener active") + + def on_enable(self): + """Start wake word listener when plugin is enabled""" + if self.get_option_value("auto_enable_audio_input"): + self.window.controller.plugins.enable("audio_input") + + self.start_listener() + + def on_disable(self): + """Stop wake word listener when plugin is disabled""" + self.stop_listener() + + def start_listener(self): + """Start the wake word background listener thread""" + if self.listening: + return + + self.listening = True + + try: + worker = WakeWordWorker() + worker.from_defaults(self) + worker.threshold = self.get_option_value("threshold") + worker.cooldown_seconds = self.get_option_value("cooldown_seconds") + + # Resolve model paths + model_key = self.get_option_value("wake_word_model") + if model_key == "custom": + custom_path = self.get_option_value("custom_model_path") + if custom_path: + worker.model_paths = [custom_path] + elif model_key in BUILTIN_MODELS: + # OpenWakeWord handles built-in models by name + worker.model_paths = None # use defaults or specify + else: + worker.model_paths = None + + # Connect signals + worker.signals.wake_word_detected.connect(self.handle_wake_word_detected) + + self.worker = worker + worker.run_async() + + self.log("Wake word listener started") + + except Exception as e: + self.listening = False + self.error(e) + + def stop_listener(self): + """Stop the wake word background listener thread""" + self.listening = False + self.worker = None + self.log("Wake word listener stopped") + + @Slot(str) + def handle_wake_word_detected(self, model_name: str): + """ + Handle wake word detection event + + :param model_name: name of the detected wake word model + """ + self.log("Wake word detected: {}".format(model_name)) + + # Play audio feedback if enabled + if self.get_option_value("audio_feedback"): + self.play_detection_sound() + + # Notify assistant mode if active + self.notify_assistant_mode() + + # Trigger audio input recording + self.trigger_audio_input() + + def trigger_audio_input(self): + """Trigger the audio input plugin to start recording""" + audio_input = self.window.core.plugins.get("audio_input") + if audio_input is None or not audio_input.enabled: + self.log("Audio Input plugin is not enabled, cannot trigger recording") + return + + # For simple mode: toggle recording on + if not audio_input.is_advanced(): + audio_input.toggle_recording_simple(state=True, auto=True) + else: + # For advanced mode: set magic word as detected and enable listening + audio_input.magic_word_detected = True + if not audio_input.speech_enabled: + audio_input.toggle_speech(True) + + def notify_assistant_mode(self): + """Notify the assistant mode plugin about wake word activation""" + assistant = self.window.core.plugins.get("assistant_mode") + if assistant is not None and assistant.enabled: + assistant.on_wake_word_activated() + + def play_detection_sound(self): + """Play a short beep sound to indicate wake word detection""" + try: + from PySide6.QtMultimedia import QSoundEffect + from PySide6.QtCore import QUrl + import os + + # Use system beep as fallback + beep_path = os.path.join( + self.window.core.config.get_app_path(), + "data", "audio", "wake_word_beep.wav", + ) + if os.path.exists(beep_path): + effect = QSoundEffect() + effect.setSource(QUrl.fromLocalFile(beep_path)) + effect.play() + else: + # Fallback: system bell + print("\a", end="", flush=True) + except Exception: + # Silent fallback + print("\a", end="", flush=True) + + def destroy(self): + """Destroy plugin and stop listener""" + self.stop_listener() diff --git a/src/pygpt_net/plugin/wake_word/worker.py b/src/pygpt_net/plugin/wake_word/worker.py new file mode 100644 index 000000000..41ba3a36e --- /dev/null +++ b/src/pygpt_net/plugin/wake_word/worker.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +import time + +import numpy as np +from PySide6.QtCore import Slot, Signal + +from pygpt_net.plugin.base.worker import BaseWorker, BaseSignals + + +class WorkerSignals(BaseSignals): + wake_word_detected = Signal(str) + + +class WakeWordWorker(BaseWorker): + def __init__(self, *args, **kwargs): + super(WakeWordWorker, self).__init__() + self.signals = WorkerSignals() + self.args = args + self.kwargs = kwargs + self.plugin = None + self.model_paths = None + self.threshold = 0.5 + self.cooldown_seconds = 3 + + @Slot() + def run(self): + """Run wake word listener in background thread.""" + oww = None + stream = None + pa = None + try: + import openwakeword + from openwakeword.model import Model as OWWModel + import pyaudio + + # Load model + openwakeword.utils.download_models() + + model_kwargs = {} + if self.model_paths: + model_kwargs["wakeword_models"] = self.model_paths + oww = OWWModel(**model_kwargs) + + # Audio stream config + chunk_size = 1280 # 80ms at 16kHz + sample_rate = 16000 + + pa = pyaudio.PyAudio() + stream = pa.open( + format=pyaudio.paInt16, + channels=1, + rate=sample_rate, + input=True, + frames_per_buffer=chunk_size, + ) + + self.log("Wake word listener started") + self.status("Listening for wake word...") + + last_detection_time = 0 + + while self.plugin and self.plugin.listening and not self.is_stopped(): + try: + audio_data = stream.read(chunk_size, exception_on_overflow=False) + audio_array = np.frombuffer(audio_data, dtype=np.int16) + + # Run prediction + prediction = oww.predict(audio_array) + + for model_name, score in prediction.items(): + if score >= self.threshold: + now = time.time() + if now - last_detection_time >= self.cooldown_seconds: + last_detection_time = now + self.log( + "Wake word detected: {} (score: {:.2f})".format( + model_name, score + ) + ) + if self.signals is not None: + self.signals.wake_word_detected.emit(model_name) + # Reset model state after detection + oww.reset() + break + except OSError: + # Audio buffer overflow, skip + continue + + except ImportError as e: + self.error( + "Wake word dependencies not installed. " + "Run: pip install openwakeword pyaudio. Error: {}".format(e) + ) + except Exception as e: + self.error(e) + finally: + if stream is not None: + try: + stream.stop_stream() + stream.close() + except Exception: + pass + if pa is not None: + try: + pa.terminate() + except Exception: + pass + self.log("Wake word listener stopped") + self.status("") + self.destroyed() + self.cleanup() diff --git a/src/pygpt_net/ui/dialog/plugins.py b/src/pygpt_net/ui/dialog/plugins.py index fe3484eb3..62021fa3f 100755 --- a/src/pygpt_net/ui/dialog/plugins.py +++ b/src/pygpt_net/ui/dialog/plugins.py @@ -42,6 +42,49 @@ def __init__(self, window=None): self.dialog_id = "plugin_settings" self.max_list_width = 250 + def setup_skeleton(self): + """ + Create minimal plugin settings UI structure for startup. + + Registers empty tab widget, plugin list, and dialog so that + other parts of the system can reference them safely. Full widget + building is deferred to the first dialog open via setup(). + """ + from PySide6.QtGui import QStandardItemModel + + # empty tab widget + self.window.ui.tabs['plugin.settings'] = QTabWidget() + self.window.ui.tabs['plugin.settings.tabs'] = {} + + # empty plugin list + list_id = 'plugin.list' + self.window.ui.nodes[list_id] = PluginList(self.window, list_id) + self.window.ui.models[list_id] = self.create_model(self.window) + self.window.ui.nodes[list_id].setModel(self.window.ui.models[list_id]) + + # populate plugin list data + sorted_ids = self.window.core.plugins.get_ids(sort=True) + get_plugin = self.window.core.plugins.get + data = {} + for plugin_id in sorted_ids: + data[plugin_id] = get_plugin(plugin_id) + self.update_list(list_id, data) + + # create placeholder dialog + self.window.ui.nodes['plugin.settings.cmd.footer'] = HelpLabel(trans('cmd.tip')) + splitter = QSplitter(Qt.Horizontal) + splitter.addWidget(self.window.ui.nodes[list_id]) + splitter.addWidget(self.window.ui.tabs['plugin.settings']) + self.window.ui.splitters['dialog.plugins'] = splitter + + self.window.ui.dialog[self.dialog_id] = PluginSettingsDialog(self.window, self.dialog_id) + layout = QVBoxLayout() + main_layout = QHBoxLayout() + main_layout.addWidget(splitter) + layout.addLayout(main_layout) + self.window.ui.dialog[self.dialog_id].setLayout(layout) + self.window.ui.dialog[self.dialog_id].setWindowTitle(trans('dialog.plugin_settings')) + def setup(self, idx=None): """ Setup plugin settings dialog @@ -76,7 +119,7 @@ def setup(self, idx=None): footer.addWidget(self.window.ui.nodes['plugin.settings.btn.defaults.app']) footer.addWidget(self.window.ui.nodes['plugin.settings.btn.save']) - # plugins tabs + # plugins tabs - recreate for full setup self.window.ui.tabs['plugin.settings'] = QTabWidget() self.window.ui.tabs['plugin.settings.tabs'] = {} @@ -304,8 +347,9 @@ def extract_option_tabs(self, options: dict) -> list: option = options[key] if 'tab' in option: tab = option['tab'] - if tab == "" or tab is None: + if tab is None or tab == "": is_default = True + continue if tab not in keys: keys.append(tab) else: diff --git a/src/pygpt_net/utils.py b/src/pygpt_net/utils.py index 8952c7e28..2f95b2701 100755 --- a/src/pygpt_net/utils.py +++ b/src/pygpt_net/utils.py @@ -13,6 +13,7 @@ import os import re import math +import unicodedata from datetime import datetime from contextlib import contextmanager @@ -493,4 +494,25 @@ def _strip_trailing_zeros(s: str) -> str: # Format, trim zeros, apply custom decimal separator, attach suffix out = f"{d:.{decimals}f}" out = _strip_trailing_zeros(out).replace(".", decimal_sep) - return f"{sign}{out}{suffixes[idx]}" \ No newline at end of file + return f"{sign}{out}{suffixes[idx]}" + + +def normalize_text(text: str) -> str: + """ + Normalize text for locale-safe, case-insensitive comparison. + + Handles Turkish I/İ/ı and other accented characters correctly by + applying casefold + NFKD decomposition + combining-mark removal. + + Examples: + normalize_text("YARDIMI") == normalize_text("yardımı") # True + normalize_text("İstanbul") == normalize_text("istanbul") # True + normalize_text("Notepad++") == normalize_text("notepad++") # True + """ + folded = text.casefold() + normalized = unicodedata.normalize("NFKD", folded) + stripped = "".join( + ch for ch in normalized if unicodedata.category(ch) != "Mn" + ) + stripped = stripped.replace("\u0131", "i") + return stripped \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 0b21037a9..215c4f041 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,4 +14,11 @@ def set_env_vars(): def reload_attachment_module(): import pygpt_net.item.attachment as mod importlib.reload(mod) - yield \ No newline at end of file + yield + +@pytest.fixture(autouse=True) +def reset_global_locale(): + """Reset global locale in utils to prevent state leaking between tests.""" + import pygpt_net.utils as utils_mod + yield + utils_mod.locale = None \ No newline at end of file diff --git a/tests/core/attachments/test_attachments_context.py b/tests/core/attachments/test_attachments_context.py index e0a2f460a..ecb5163e3 100644 --- a/tests/core/attachments/test_attachments_context.py +++ b/tests/core/attachments/test_attachments_context.py @@ -214,7 +214,7 @@ def test_store_content(mock_window): with patch('builtins.open', mock_open()) as mock_file: context = Context(mock_window) result, _ = context.store_content(attachment, "test_dir") - assert result == "test_dir/url.txt" + assert result == os.path.join("test_dir", "url.txt") def test_index_attachment(mock_window): diff --git a/tests/core/bridge/test_bridge.py b/tests/core/bridge/test_bridge.py index 362346f9a..9e0de806e 100644 --- a/tests/core/bridge/test_bridge.py +++ b/tests/core/bridge/test_bridge.py @@ -253,5 +253,5 @@ def test_apply_rate_limit_with_sleep(monkeypatch): b.apply_rate_limit() assert sleep_mock.call_count == 1 sleep_arg = sleep_mock.call_args[0][0] - assert sleep_arg == pytest.approx(extra.total_seconds(), rel=1e-3) + assert sleep_arg == pytest.approx(extra.total_seconds(), abs=0.01) window.core.debug.debug.assert_called() \ No newline at end of file diff --git a/tests/core/bridge/test_bridge_worker.py b/tests/core/bridge/test_bridge_worker.py index c83d5f775..a2c9fdb46 100644 --- a/tests/core/bridge/test_bridge_worker.py +++ b/tests/core/bridge/test_bridge_worker.py @@ -23,6 +23,12 @@ from pygpt_net.core.events import KernelEvent, Event +class MetaObj: + def __init__(self): + self.group = None + self.additional_ctx_current = None + + class CtxObj: def __init__(self, reply=False, meta=None): self.reply = reply @@ -118,7 +124,7 @@ def test_handle_additional_context_has_no_context(): attachment = AttachmentStub(has_context=False) worker.window = SimpleNamespace(core=SimpleNamespace(config=MagicMock()), controller=SimpleNamespace(chat=SimpleNamespace(attachment=attachment))) ctx = ContextObj() - ctx.ctx.meta = "m" + ctx.ctx.meta = MetaObj() ctx.prompt = "p" worker.context = ctx worker.handle_additional_context() @@ -131,7 +137,7 @@ def test_handle_additional_context_empty_ad_context(): attachment = AttachmentStub(has_context=True, context_value="", mode_value="query") worker.window = SimpleNamespace(core=SimpleNamespace(config=MagicMock()), controller=SimpleNamespace(chat=SimpleNamespace(attachment=attachment))) ctx = ContextObj() - ctx.ctx.meta = "m" + ctx.ctx.meta = MetaObj() ctx.prompt = "p" worker.context = ctx worker.handle_additional_context() @@ -144,7 +150,7 @@ def test_handle_additional_context_query_mode_sets_hidden_input(): attachment = AttachmentStub(has_context=True, context_value="ADCTX", mode_value="query") worker.window = SimpleNamespace(core=SimpleNamespace(config=MagicMock()), controller=SimpleNamespace(chat=SimpleNamespace(attachment=attachment))) ctx = ContextObj() - ctx.ctx.meta = "m" + ctx.ctx.meta = MetaObj() ctx.prompt = "p" worker.context = ctx worker.handle_additional_context() @@ -157,7 +163,7 @@ def test_handle_additional_context_agent_mode_sets_hidden_input(): attachment = AttachmentStub(has_context=True, context_value="ADCTX", mode_value="full") worker.window = SimpleNamespace(core=SimpleNamespace(config=MagicMock()), controller=SimpleNamespace(chat=SimpleNamespace(attachment=attachment))) ctx = ContextObj() - ctx.ctx.meta = "m" + ctx.ctx.meta = MetaObj() ctx.prompt = "p" worker.context = ctx worker.mode = MODE_AGENT_LLAMA @@ -171,7 +177,7 @@ def test_handle_additional_context_full_mode_no_hidden_input(): attachment = AttachmentStub(has_context=True, context_value="ADCTX", mode_value="full") worker.window = SimpleNamespace(core=SimpleNamespace(config=MagicMock()), controller=SimpleNamespace(chat=SimpleNamespace(attachment=attachment))) ctx = ContextObj() - ctx.ctx.meta = "m" + ctx.ctx.meta = MetaObj() ctx.prompt = "p" worker.context = ctx worker.mode = None diff --git a/tests/core/render/plain/test_renderer_plain.py b/tests/core/render/plain/test_renderer_plain.py index ecb1acd8a..adfaf08f6 100755 --- a/tests/core/render/plain/test_renderer_plain.py +++ b/tests/core/render/plain/test_renderer_plain.py @@ -244,7 +244,9 @@ def test_append_timestamp(mock_window): ctx = CtxItem() meta = CtxMeta() ctx.input_timestamp = 1234567890 - assert render.append_timestamp(ctx, text).startswith("00:31:30: test") is True + from datetime import datetime + expected_hour = datetime.fromtimestamp(1234567890).strftime("%H:%M:%S") + assert render.append_timestamp(ctx, text).startswith(expected_hour + ": test") is True def test_pre_format_text(mock_window): diff --git a/tests/mocks.py b/tests/mocks.py index 69d516421..4a30f1198 100755 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -9,14 +9,17 @@ # Updated Date: 2024.04.17 01:00:00 # # ================================================== # +import os +import tempfile + import pytest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from PySide6.QtWidgets import QMainWindow from pygpt_net.config import Config @pytest.fixture -def mock_window(): +def mock_window(tmp_path_factory): window = MagicMock(spec=QMainWindow) window.STATE_IDLE = 'idle' window.STATE_BUSY = 'busy' @@ -25,7 +28,9 @@ def mock_window(): window.stateChanged = MagicMock() window.idx_logger_message = MagicMock() window.core = MagicMock() - window.core.config = Config(window) # real config object + with patch('pygpt_net.core.profile.profile.Profile.init'): + window.core.config = Config(window) # real config object + window.core.config.path = str(tmp_path_factory.mktemp("config")) # isolate from real filesystem window.core.config.initialized = True # prevent initializing config window.core.config.init = MagicMock() # mock init method to prevent init window.core.config.load = MagicMock() # mock load method to prevent loading diff --git a/tests/plugin/assistant_mode/test_plugin_assistant_mode.py b/tests/plugin/assistant_mode/test_plugin_assistant_mode.py new file mode 100644 index 000000000..6f5abb1a4 --- /dev/null +++ b/tests/plugin/assistant_mode/test_plugin_assistant_mode.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from unittest.mock import MagicMock, patch + +from pygpt_net.core.events import Event +from pygpt_net.item.ctx import CtxItem +from tests.mocks import mock_window +from pygpt_net.plugin.assistant_mode import Plugin + + +def test_options(mock_window): + """Test plugin options are correctly initialized""" + plugin = Plugin(window=mock_window) + plugin.init_options() + options = plugin.setup() + assert "auto_listen_after_response" in options + assert "require_wake_word_each_turn" in options + assert "conversation_timeout" in options + assert "stop_words" in options + assert "greeting_enabled" in options + assert "greeting_text" in options + assert "auto_enable_plugins" in options + assert "response_delay" in options + + +def test_plugin_id(mock_window): + """Test plugin id and name""" + plugin = Plugin(window=mock_window) + assert plugin.id == "assistant_mode" + assert plugin.name == "Assistant Mode" + assert "audio.control" in plugin.type + + +def test_enable_activates(mock_window): + """Test enabling the plugin sets active state""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + + # Mock plugins + mock_window.core.plugins.get.return_value = MagicMock(enabled=False) + + event = Event() + event.name = Event.ENABLE + event.data = {"value": "assistant_mode"} + event.ctx = None + plugin.handle(event) + + assert plugin.active is True + + +def test_disable_deactivates(mock_window): + """Test disabling the plugin clears active state""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + plugin.in_conversation = True + + event = Event() + event.name = Event.DISABLE + event.data = {"value": "assistant_mode"} + event.ctx = None + plugin.handle(event) + + assert plugin.active is False + assert plugin.in_conversation is False + + +def test_enable_other_plugin_no_effect(mock_window): + """Test enabling another plugin does not activate assistant mode""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + + event = Event() + event.name = Event.ENABLE + event.data = {"value": "some_other_plugin"} + event.ctx = None + plugin.handle(event) + + assert plugin.active is False + + +def test_ctx_begin_sets_conversation(mock_window): + """Test CTX_BEGIN sets in_conversation flag""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + + ctx = CtxItem() + event = Event() + event.name = Event.CTX_BEGIN + event.data = {} + event.ctx = ctx + plugin.handle(event) + + assert plugin.in_conversation is True + + +def test_ctx_begin_inactive_no_effect(mock_window): + """Test CTX_BEGIN has no effect when inactive""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = False + + ctx = CtxItem() + event = Event() + event.name = Event.CTX_BEGIN + event.data = {} + event.ctx = ctx + plugin.handle(event) + + assert plugin.in_conversation is False + + +def test_stop_word_detection(mock_window): + """Test stop word detection in input""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + plugin.in_conversation = True + + event = Event() + event.name = Event.INPUT_BEFORE + event.data = {"value": "Goodbye!"} + event.ctx = None + plugin.handle(event) + + # After stop word, conversation should end + assert plugin.in_conversation is False + + +def test_no_stop_word_keeps_conversation(mock_window): + """Test normal input does not end conversation""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + plugin.in_conversation = True + + event = Event() + event.name = Event.INPUT_BEFORE + event.data = {"value": "What's the weather?"} + event.ctx = None + plugin.handle(event) + + assert plugin.in_conversation is True + + +def test_response_complete_with_wake_word_requirement(mock_window): + """Test response complete when wake word required each turn""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + plugin.in_conversation = True + + # Set require wake word each turn + plugin.options["require_wake_word_each_turn"]["value"] = True + + ctx = CtxItem() + event = Event() + event.name = Event.CTX_END + event.data = {} + event.ctx = ctx + plugin.handle(event) + + # Should NOT auto-listen, just wait for wake word + assert plugin.in_conversation is False + + +def test_response_complete_auto_listen(mock_window): + """Test response complete triggers auto-listen""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + plugin.in_conversation = True + plugin.options["require_wake_word_each_turn"]["value"] = False + plugin.options["response_delay"]["value"] = 0 # no delay for test + + # Mock audio input + audio_input_mock = MagicMock() + audio_input_mock.enabled = True + audio_input_mock.is_advanced.return_value = False + mock_window.core.plugins.get.return_value = audio_input_mock + + # Mock start_listening to avoid QTimer + plugin.start_listening = MagicMock() + plugin.start_timeout_timer = MagicMock() + + ctx = CtxItem() + event = Event() + event.name = Event.CTX_END + event.data = {} + event.ctx = ctx + plugin.handle(event) + + assert plugin.in_conversation is False + plugin.start_listening.assert_called_once() + + +def test_start_listening_simple_mode(mock_window): + """Test start_listening triggers simple mode recording""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + + audio_input_mock = MagicMock() + audio_input_mock.enabled = True + audio_input_mock.is_advanced.return_value = False + mock_window.core.plugins.get.return_value = audio_input_mock + + plugin.start_listening() + + audio_input_mock.toggle_recording_simple.assert_called_once_with(state=True, auto=True) + + +def test_start_listening_advanced_mode(mock_window): + """Test start_listening triggers advanced mode""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + + audio_input_mock = MagicMock() + audio_input_mock.enabled = True + audio_input_mock.is_advanced.return_value = True + audio_input_mock.speech_enabled = False + mock_window.core.plugins.get.return_value = audio_input_mock + + plugin.start_listening() + + assert audio_input_mock.magic_word_detected is True + audio_input_mock.toggle_speech.assert_called_once_with(True) + + +def test_start_listening_inactive(mock_window): + """Test start_listening does nothing when inactive""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = False + + audio_input_mock = MagicMock() + mock_window.core.plugins.get.return_value = audio_input_mock + + plugin.start_listening() + + audio_input_mock.toggle_recording_simple.assert_not_called() + + +def test_wake_word_activation_with_greeting(mock_window): + """Test wake word activation speaks greeting""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + + # Mock audio output + audio_output_mock = MagicMock() + audio_output_mock.enabled = True + mock_window.core.plugins.get.return_value = audio_output_mock + + plugin.on_wake_word_activated() + + # Should dispatch AUDIO_READ_TEXT event + mock_window.dispatch.assert_called() + + +def test_wake_word_activation_no_greeting(mock_window): + """Test wake word activation without greeting""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = True + plugin.options["greeting_enabled"]["value"] = False + + plugin.on_wake_word_activated() + + # Should NOT dispatch any event + mock_window.dispatch.assert_not_called() + + +def test_wake_word_activation_inactive(mock_window): + """Test wake word activation when plugin inactive""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.active = False + + plugin.on_wake_word_activated() + + mock_window.dispatch.assert_not_called() + + +def test_destroy(mock_window): + """Test destroy cleans up state""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.active = True + plugin.in_conversation = True + + plugin.destroy() + + assert plugin.active is False + assert plugin.in_conversation is False + + +def test_default_values(mock_window): + """Test default configuration values""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + assert plugin.get_option_value("auto_listen_after_response") is True + assert plugin.get_option_value("require_wake_word_each_turn") is False + assert plugin.get_option_value("conversation_timeout") == 30 + assert plugin.get_option_value("greeting_enabled") is True + assert plugin.get_option_value("greeting_text") == "Yes?" + assert plugin.get_option_value("auto_enable_plugins") is True + assert plugin.get_option_value("response_delay") == 0.5 diff --git a/tests/plugin/cmd_app_launcher/test_plugin_app_launcher.py b/tests/plugin/cmd_app_launcher/test_plugin_app_launcher.py new file mode 100644 index 000000000..2a34661f6 --- /dev/null +++ b/tests/plugin/cmd_app_launcher/test_plugin_app_launcher.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +import platform +from unittest.mock import MagicMock, patch + +from pygpt_net.core.events import Event +from pygpt_net.item.ctx import CtxItem +from tests.mocks import mock_window +from pygpt_net.plugin.cmd_app_launcher import Plugin +from pygpt_net.plugin.cmd_app_launcher.worker import Worker + + +def test_options(mock_window): + """Test plugin options are correctly initialized""" + plugin = Plugin(window=mock_window) + plugin.init_options() + options = plugin.setup() + assert "cmd.app_launch" in options + assert "cmd.app_close" in options + assert "cmd.app_list_running" in options + assert "cmd.app_open_url" in options + assert "cmd.app_list_installed" in options + assert "cmd.media_play_pause" in options + assert "cmd.media_next_track" in options + assert "cmd.media_prev_track" in options + assert "cmd.volume_set" in options + assert "cmd.volume_mute_toggle" in options + assert "custom_app_aliases" in options + assert "scan_start_menu" in options + + +def test_plugin_id(mock_window): + """Test plugin id and name""" + plugin = Plugin(window=mock_window) + assert plugin.id == "cmd_app_launcher" + assert plugin.name == "App Launcher" + assert "cmd" in plugin.type + + +def test_allowed_cmds(mock_window): + """Test all commands are in allowed list""" + plugin = Plugin(window=mock_window) + expected = [ + "app_launch", "app_close", "app_list_running", + "app_open_url", "app_list_installed", + "media_play_pause", "media_next_track", "media_prev_track", + "volume_set", "volume_mute_toggle", + ] + for cmd in expected: + assert cmd in plugin.allowed_cmds + + +def test_cmd_syntax(mock_window): + """Test CMD_SYNTAX event appends commands""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + + data = {"cmd": []} + event = Event() + event.name = Event.CMD_SYNTAX + event.data = data + event.ctx = None + plugin.handle(event) + + cmd_names = [c["cmd"] for c in data["cmd"]] + assert "app_launch" in cmd_names + assert "app_close" in cmd_names + assert "app_open_url" in cmd_names + + +def test_cmd_execute_dispatches(mock_window): + """Test CMD_EXECUTE creates worker for matching commands""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + + ctx = CtxItem() + ctx.async_disabled = True + + commands = [ + {"cmd": "app_open_url", "params": {"url": "youtube.com"}}, + ] + + data = {"commands": commands} + event = Event() + event.name = Event.CMD_EXECUTE + event.data = data + event.ctx = ctx + + # Mock worker to prevent actual execution + with patch("pygpt_net.plugin.cmd_app_launcher.plugin.Plugin.is_async", return_value=False): + with patch("pygpt_net.plugin.cmd_app_launcher.worker.Worker.run") as mock_run: + plugin.handle(event) + # Worker.run should have been called + mock_run.assert_called_once() + + +def test_worker_open_url(mock_window): + """Test worker opens URL correctly""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + worker = Worker() + worker.plugin = plugin + worker.window = mock_window + worker.signals = MagicMock() + + item = {"cmd": "app_open_url", "params": {"url": "youtube.com"}} + + with patch("webbrowser.open") as mock_open: + result = worker.cmd_app_open_url(item) + mock_open.assert_called_once_with("https://youtube.com") + assert "Opened URL" in result["result"] + + +def test_worker_open_url_with_scheme(mock_window): + """Test worker preserves existing URL scheme""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + worker = Worker() + worker.plugin = plugin + worker.window = mock_window + worker.signals = MagicMock() + + item = {"cmd": "app_open_url", "params": {"url": "https://example.com"}} + + with patch("webbrowser.open") as mock_open: + result = worker.cmd_app_open_url(item) + mock_open.assert_called_once_with("https://example.com") + + +def test_worker_open_url_empty(mock_window): + """Test worker handles empty URL""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + worker = Worker() + worker.plugin = plugin + worker.window = mock_window + worker.signals = MagicMock() + + item = {"cmd": "app_open_url", "params": {"url": ""}} + result = worker.cmd_app_open_url(item) + assert "Error" in result["result"] + + +def test_worker_app_launch_not_found(mock_window): + """Test worker handles app not found""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + worker = Worker() + worker.plugin = plugin + worker.window = mock_window + worker.signals = MagicMock() + + item = {"cmd": "app_launch", "params": {"app_name": "nonexistent_app_xyz123"}} + + with patch("shutil.which", return_value=None): + with patch.object(worker, "find_start_menu_shortcut", return_value=None): + with patch.object(worker, "find_windows_app", return_value=None): + result = worker.cmd_app_launch(item) + assert "Could not find" in result["result"] + + +def test_worker_app_launch_empty_name(mock_window): + """Test worker handles empty app name""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + worker = Worker() + worker.plugin = plugin + worker.window = mock_window + worker.signals = MagicMock() + + item = {"cmd": "app_launch", "params": {"app_name": ""}} + result = worker.cmd_app_launch(item) + assert "Error" in result["result"] + + +def test_worker_app_list_installed(mock_window): + """Test worker lists installed apps""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + worker = Worker() + worker.plugin = plugin + worker.window = mock_window + worker.signals = MagicMock() + + item = {"cmd": "app_list_installed", "params": {"filter": "chrome"}} + + with patch.object(worker, "scan_start_menu", return_value=[]): + result = worker.cmd_app_list_installed(item) + # Should find "chrome" in custom aliases + assert "chrome" in result["result"].lower() + + +def test_worker_volume_set_bounds(mock_window): + """Test worker clamps volume to 0-100""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + worker = Worker() + worker.plugin = plugin + worker.window = mock_window + worker.signals = MagicMock() + + # Test with out-of-bounds value + item = {"cmd": "volume_set", "params": {"level": 150}} + + with patch("platform.system", return_value="Windows"): + try: + result = worker.cmd_volume_set(item) + # Should clamp to 100, not error + assert "Error" not in result.get("result", "") or "pycaw" in result.get("result", "") + except ImportError: + pass # pycaw not installed in test env diff --git a/tests/plugin/wake_word/test_plugin_wake_word.py b/tests/plugin/wake_word/test_plugin_wake_word.py new file mode 100644 index 000000000..cc3af2f91 --- /dev/null +++ b/tests/plugin/wake_word/test_plugin_wake_word.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# ================================================== # +# This file is a part of PYGPT package # +# Website: https://pygpt.net # +# GitHub: https://github.com/szczyglis-dev/py-gpt # +# MIT License # +# Created By : PYGPT Contributors # +# Updated Date: 2026.03.11 00:00:00 # +# ================================================== # + +from unittest.mock import MagicMock, patch + +from pygpt_net.core.events import Event +from pygpt_net.item.ctx import CtxItem +from tests.mocks import mock_window +from pygpt_net.plugin.wake_word import Plugin + + +def test_options(mock_window): + """Test plugin options are correctly initialized""" + plugin = Plugin(window=mock_window) + plugin.init_options() + options = plugin.setup() + assert "wake_word_model" in options + assert "custom_model_path" in options + assert "threshold" in options + assert "cooldown_seconds" in options + assert "audio_feedback" in options + assert "auto_enable_audio_input" in options + + +def test_plugin_id(mock_window): + """Test plugin id and name""" + plugin = Plugin(window=mock_window) + assert plugin.id == "wake_word" + assert plugin.name == "Wake Word" + assert "audio.control" in plugin.type + + +def test_default_threshold(mock_window): + """Test default threshold value""" + plugin = Plugin(window=mock_window) + plugin.init_options() + threshold = plugin.get_option_value("threshold") + assert threshold == 0.5 + + +def test_default_cooldown(mock_window): + """Test default cooldown value""" + plugin = Plugin(window=mock_window) + plugin.init_options() + cooldown = plugin.get_option_value("cooldown_seconds") + assert cooldown == 3 + + +def test_default_model(mock_window): + """Test default wake word model""" + plugin = Plugin(window=mock_window) + plugin.init_options() + model = plugin.get_option_value("wake_word_model") + assert model == "hey_jarvis" + + +def test_handle_enable(mock_window): + """Test plugin enable triggers audio input and listener""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + + # Mock start_listener to prevent actual thread creation + plugin.start_listener = MagicMock() + + event = Event() + event.name = Event.ENABLE + event.data = {"value": "wake_word"} + event.ctx = None + plugin.handle(event) + + # Should enable audio_input plugin + mock_window.controller.plugins.enable.assert_called_with("audio_input") + # Should start listener + plugin.start_listener.assert_called_once() + + +def test_handle_disable(mock_window): + """Test plugin disable stops listener""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + + plugin.listening = True + plugin.stop_listener = MagicMock() + + event = Event() + event.name = Event.DISABLE + event.data = {"value": "wake_word"} + event.ctx = None + plugin.handle(event) + + plugin.stop_listener.assert_called_once() + + +def test_handle_enable_other_plugin(mock_window): + """Test that enabling another plugin does not trigger wake word""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.setup() + plugin.start_listener = MagicMock() + + event = Event() + event.name = Event.ENABLE + event.data = {"value": "some_other_plugin"} + event.ctx = None + plugin.handle(event) + + plugin.start_listener.assert_not_called() + + +def test_trigger_audio_input_simple(mock_window): + """Test trigger_audio_input in simple mode""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + audio_input_mock = MagicMock() + audio_input_mock.enabled = True + audio_input_mock.is_advanced.return_value = False + mock_window.core.plugins.get.return_value = audio_input_mock + + plugin.trigger_audio_input() + + audio_input_mock.toggle_recording_simple.assert_called_once_with(state=True, auto=True) + + +def test_trigger_audio_input_advanced(mock_window): + """Test trigger_audio_input in advanced mode""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + audio_input_mock = MagicMock() + audio_input_mock.enabled = True + audio_input_mock.is_advanced.return_value = True + audio_input_mock.speech_enabled = False + mock_window.core.plugins.get.return_value = audio_input_mock + + plugin.trigger_audio_input() + + assert audio_input_mock.magic_word_detected is True + audio_input_mock.toggle_speech.assert_called_once_with(True) + + +def test_trigger_audio_input_disabled(mock_window): + """Test trigger_audio_input when audio input is disabled""" + plugin = Plugin(window=mock_window) + plugin.init_options() + + audio_input_mock = MagicMock() + audio_input_mock.enabled = False + mock_window.core.plugins.get.return_value = audio_input_mock + + # Should not raise, just log + plugin.trigger_audio_input() + audio_input_mock.toggle_recording_simple.assert_not_called() + + +def test_destroy(mock_window): + """Test destroy stops listener""" + plugin = Plugin(window=mock_window) + plugin.init_options() + plugin.listening = True + + plugin.destroy() + + assert plugin.listening is False diff --git a/tests/provider/core/config/test_provider_config_json.py b/tests/provider/core/config/test_provider_config_json.py index 844c72074..46429d21d 100755 --- a/tests/provider/core/config/test_provider_config_json.py +++ b/tests/provider/core/config/test_provider_config_json.py @@ -24,15 +24,18 @@ def test_install(mock_window): provider.path = mock_window.core.config.path with patch('os.path.exists') as os_path_exists: os_path_exists.return_value = True - provider.install() - os_path_exists.assert_called() + with patch('builtins.open', mock_open(read_data='{"foo": "bar"}')): + provider.install() + os_path_exists.assert_called() def test_get_version(mock_window): """Test get version""" provider = JsonFileProvider(mock_window) provider.path = mock_window.core.config.get_path() - assert provider.get_version() is not None + data = json.dumps({"__meta__": {"version": "1.0.0"}}) + with patch('builtins.open', mock_open(read_data=data)): + assert provider.get_version() is not None def test_load(mock_window): diff --git a/tests/provider/core/model/test_provider_model_json.py b/tests/provider/core/model/test_provider_model_json.py index 1982fd687..e99dd146a 100755 --- a/tests/provider/core/model/test_provider_model_json.py +++ b/tests/provider/core/model/test_provider_model_json.py @@ -23,15 +23,18 @@ def test_install(mock_window): provider = JsonFileProvider(mock_window) with patch('os.path.exists') as os_path_exists: os_path_exists.return_value = True - provider.install() - os_path_exists.assert_called_once() + with patch('builtins.open', mock_open(read_data='{"items": {}}')): + provider.install() + os_path_exists.assert_called_once() def test_get_version(mock_window): """Test get version""" provider = JsonFileProvider(mock_window) provider.path = mock_window.core.config.get_path() - assert provider.get_version() is not None + data = json.dumps({"__meta__": {"version": "1.0.0"}}) + with patch('builtins.open', mock_open(read_data=data)): + assert provider.get_version() is not None def test_load(mock_window):