From 31c349367c1c2374f8a5a0b0e2407f0df188fadf Mon Sep 17 00:00:00 2001 From: Carl Bender Date: Tue, 21 Apr 2026 20:00:46 -0400 Subject: [PATCH 1/2] Compatibility with Open WebUI >= 0.9.0 (async DB APIs) Open WebUI v0.9.0 converted Chats / Files / Functions DB helpers to async. Updating both plugins so they don't hit 'coroutine was never awaited' / 'object is not subscriptable' errors on startup or request. gemini_manifold.py (pipe), version 2.1.0 -> 2.2.0: * await Chats.get_chat_by_id_and_user_id * await Files.get_file_by_id (drop asyncio.to_thread wrapper) * await Files.insert_new_file (drop asyncio.to_thread wrapper) * await Functions.get_function_by_id * _fetch_and_validate_chat_history is now async * _get_toggleable_feature_status is now async (@staticmethod preserved) * _build_gen_content_config and _determine_execution_order are now async; all call sites updated. * DB fetch and cumulative-usage injection moved out of GeminiContentBuilder.__init__ (sync) into build_contents() (async). metadata_body/user_data are captured on self; metadata dict identity is preserved so mutations still propagate to the caller. * Storage.upload_file is still sync upstream, left wrapped in asyncio.to_thread intentionally. * RECOMMENDED_COMPANION_VERSION bumped to 2.2.0. gemini_manifold_companion.py (filter), version 2.1.0 -> 2.2.0: * Remove the sync Functions.get_function_valves_by_id(...) call from Filter.__init__ (it is async upstream and would raise at import time). Initialize with self.Valves() defaults; Open WebUI's filter pipeline injects the DB-backed valves on the module instance before each inlet/outlet/stream call, so configured valves are still applied at runtime. * Drop the now-unused open_webui.models.functions import. --- plugins/filters/gemini_manifold_companion.py | 20 +++--- plugins/pipes/gemini_manifold.py | 72 ++++++++++++-------- 2 files changed, 54 insertions(+), 38 deletions(-) diff --git a/plugins/filters/gemini_manifold_companion.py b/plugins/filters/gemini_manifold_companion.py index ab64fa9..cd850cd 100644 --- a/plugins/filters/gemini_manifold_companion.py +++ b/plugins/filters/gemini_manifold_companion.py @@ -6,10 +6,18 @@ author_url: https://github.com/suurt8ll funding_url: https://github.com/suurt8ll/open_webui_functions license: MIT -version: 2.1.0 +version: 2.2.0 """ -VERSION = "2.1.0" +# Changelog: +# 2.2.0 - Compatibility with Open WebUI >= 0.9.0. +# Removed the sync `Functions.get_function_valves_by_id(...)` call from +# `Filter.__init__` (the method is now `async` upstream and raised at import +# time). Valves are now initialized with defaults; Open WebUI's filter +# pipeline injects the DB-backed valves onto the module instance before +# each `inlet`/`outlet`/`stream` invocation, so behavior is preserved. + +VERSION = "2.2.0" # This filter can detect that a feature like web search or code execution is enabled in the front-end, # set the feature back to False so Open WebUI does not run it's own logic and then @@ -34,8 +42,6 @@ from collections.abc import Awaitable, Callable from typing import Any, Literal, TYPE_CHECKING, cast -from open_webui.models.functions import Functions - if TYPE_CHECKING: from loguru import Record from loguru._handler import Handler # type: ignore @@ -148,10 +154,8 @@ class Valves(BaseModel): # TODO: Support user settting through UserValves. def __init__(self): - # This hack makes the valves values available to the `__init__` method. - # TODO: Get the id from the frontmatter instead of hardcoding it. - valves = Functions.get_function_valves_by_id("gemini_manifold_companion") - self.valves = self.Valves(**(valves if valves else {})) + # Initialize valves with defaults; the framework injects DB values before each request. + self.valves = self.Valves() self.log_level = self.valves.LOG_LEVEL self._add_log_handler() log.success("Function has been initialized.") diff --git a/plugins/pipes/gemini_manifold.py b/plugins/pipes/gemini_manifold.py index b0ea4fb..896b18b 100755 --- a/plugins/pipes/gemini_manifold.py +++ b/plugins/pipes/gemini_manifold.py @@ -6,17 +6,26 @@ author_url: https://github.com/suurt8ll funding_url: https://github.com/suurt8ll/open_webui_functions license: MIT -version: 2.1.0 +version: 2.2.0 requirements: google-genai==1.65.0 """ +# Changelog: +# 2.2.0 - Compatibility with Open WebUI >= 0.9.0. +# Open WebUI converted `Chats`, `Files`, and `Functions` DB methods to `async`. +# All affected call sites are now awaited; `_fetch_and_validate_chat_history`, +# `_get_toggleable_feature_status`, `_build_gen_content_config`, and +# `_determine_execution_order` are now `async`. DB fetch and cumulative +# usage injection moved from `GeminiContentBuilder.__init__` into +# `build_contents()` since `__init__` cannot await. + # I change these only when I make a release to avoid PR merge conflicts. # If you are making a PR then please do not change these values. -VERSION = "2.1.0" +VERSION = "2.2.0" # This is the recommended version for the companion filter. # Older versions might still work, but backward compatibility is not guaranteed # during the development of this personal use plugin. -RECOMMENDED_COMPANION_VERSION = "2.1.0" +RECOMMENDED_COMPANION_VERSION = "2.2.0" # Keys `title`, `id` and `description` in the frontmatter above are used for my own development purposes. @@ -730,6 +739,8 @@ def __init__( files_api_manager: "FilesAPIManager", ): self.messages_body = messages_body + self.metadata_body = metadata_body + self.user_data = user_data self.upload_documents = (metadata_body.get("features", {}) or {}).get( "upload_documents", False ) @@ -744,21 +755,24 @@ def __init__( self.system_prompt, self.messages_body = self._extract_system_prompt( self.messages_body ) - self.messages_db = self._fetch_and_validate_chat_history( - metadata_body, user_data - ) - - # Retrieve cumulative usage from the DB history and inject it into metadata. - # This will be picked up later when constructing the final usage payload. - c_tokens, c_cost = self._retrieve_previous_usage_data() - metadata_body["cumulative_tokens"] = c_tokens - metadata_body["cumulative_cost"] = c_cost + # messages_db and cumulative usage are populated asynchronously in build_contents(). + self.messages_db: list["ChatMessageTD"] | None = None async def build_contents(self) -> list[types.Content]: """ The main public method to generate the contents list by processing all message turns concurrently and using a self-configuring status manager. """ + # Fetch chat history and cumulative usage from the DB (async APIs). + self.messages_db = await self._fetch_and_validate_chat_history( + self.metadata_body, self.user_data + ) + # Retrieve cumulative usage from the DB history and inject it into metadata. + # This will be picked up later when constructing the final usage payload. + c_tokens, c_cost = self._retrieve_previous_usage_data() + self.metadata_body["cumulative_tokens"] = c_tokens + self.metadata_body["cumulative_cost"] = c_cost + if not self.messages_db: warn_msg = ( "There was a problem retrieving the messages from the backend database. " @@ -836,7 +850,7 @@ def _extract_system_prompt( system_prompt: str | None = (system_message or {}).get("content") return system_prompt, remaining_messages # type: ignore - def _fetch_and_validate_chat_history( + async def _fetch_and_validate_chat_history( self, metadata_body: "Metadata", user_data: "UserData" ) -> list["ChatMessageTD"] | None: """ @@ -845,7 +859,7 @@ def _fetch_and_validate_chat_history( """ # 1. Fetch from database chat_id = metadata_body.get("chat_id", "") - if chat := Chats.get_chat_by_id_and_user_id( + if chat := await Chats.get_chat_by_id_and_user_id( id=chat_id, user_id=user_data["id"] ): chat_content: "ChatObjectDataTD" = chat.chat # type: ignore @@ -1535,10 +1549,9 @@ async def _get_file_data(file_id: str) -> tuple[bytes | None, str | None]: log.warning("file_id is empty. Cannot continue.") return None, None - # Run the synchronous, blocking database call in a separate thread - # to avoid blocking the main asyncio event loop. + # Await the async database call directly. try: - file_model = await asyncio.to_thread(Files.get_file_by_id, file_id) + file_model = await Files.get_file_by_id(file_id) except Exception as e: log.exception( f"An unexpected error occurred during database call for file_id {file_id}: {e}" @@ -2116,7 +2129,7 @@ async def pipe( # TODO: use the structured outputs feature to ensure a valid json at all times? # 3. Determine Execution Order - execution_order = self._determine_execution_order( + execution_order = await self._determine_execution_order( valves=valves, __metadata__=__metadata__, model_config=model_config, @@ -2639,7 +2652,7 @@ def _is_image_model(model_id: str, config: dict) -> bool: # region 2.3 GenerateContentConfig assembly - def _build_gen_content_config( + async def _build_gen_content_config( self, body: "Body", __metadata__: "Metadata", @@ -2725,7 +2738,7 @@ def _build_gen_content_config( ) # Check if reasoning can be disabled via toggle, which overrides other settings. - is_avail, is_on = self._get_toggleable_feature_status( + is_avail, is_on = await self._get_toggleable_feature_status( "gemini_reasoning_toggle", __metadata__ ) if is_avail and not is_on: @@ -2799,7 +2812,7 @@ def _build_gen_content_config( ) # Determine if URL context tool should be enabled. - is_avail, is_on = self._get_toggleable_feature_status( + is_avail, is_on = await self._get_toggleable_feature_status( "gemini_url_context_toggle", __metadata__ ) enable_url_context = valves.ENABLE_URL_CONTEXT_TOOL # Start with valve default. @@ -2831,7 +2844,7 @@ def _build_gen_content_config( ) # Determine if Google Maps grounding should be enabled. - is_avail, is_on = self._get_toggleable_feature_status( + is_avail, is_on = await self._get_toggleable_feature_status( "gemini_maps_grounding_toggle", __metadata__ ) if is_avail and is_on: @@ -2965,7 +2978,7 @@ async def _execute_generation_attempt( contents = await builder.build_contents() # 4. Configuration Building - gen_content_conf = self._build_gen_content_config( + gen_content_conf = await self._build_gen_content_config( body, __metadata__, valves, model_config ) gen_content_conf.system_instruction = builder.system_prompt @@ -3443,8 +3456,7 @@ async def _upload_image( return None log.debug("Adding the image file to the Open WebUI files database.") - file_item = await asyncio.to_thread( - Files.insert_new_file, + file_item = await Files.insert_new_file( __metadata__.get("user_id"), FileForm( id=id, @@ -4048,7 +4060,7 @@ def plugin_filter(record: "Record"): # region 2.7 Utility helpers - def _determine_execution_order( + async def _determine_execution_order( self, valves: "Pipe.Valves", __metadata__: "Metadata", @@ -4064,7 +4076,7 @@ def _determine_execution_order( has_paid_key = bool(valves.GEMINI_PAID_API_KEY) # Retrieve Toggle Statuses - vertex_available, vertex_toggled_on = self._get_toggleable_feature_status( + vertex_available, vertex_toggled_on = await self._get_toggleable_feature_status( "gemini_vertex_ai_toggle", __metadata__ ) # Vertex is only viable if toggled on AND we have a project ID @@ -4072,7 +4084,7 @@ def _determine_execution_order( vertex_available and vertex_toggled_on and bool(valves.VERTEX_PROJECT) ) - paid_toggle_available, paid_toggled_on = self._get_toggleable_feature_status( + paid_toggle_available, paid_toggled_on = await self._get_toggleable_feature_status( "gemini_paid_api", __metadata__ ) @@ -4209,7 +4221,7 @@ def _resolve_custom_params( return merged_params @staticmethod - def _get_toggleable_feature_status( + async def _get_toggleable_feature_status( filter_id: str, __metadata__: "Metadata", ) -> tuple[bool, bool]: @@ -4234,7 +4246,7 @@ def _get_toggleable_feature_status( - is_toggled_on: True if the user has the toggle ON in the UI for this request. """ # 1. Check if the filter is installed - f = Functions.get_function_by_id(filter_id) + f = await Functions.get_function_by_id(filter_id) if not f: log.warning( f"The '{filter_id}' filter is not installed. " From faec5791239070c728b32b9415fb9fe8a4ea4976 Mon Sep 17 00:00:00 2001 From: Carl Bender Date: Tue, 21 Apr 2026 20:18:33 -0400 Subject: [PATCH 2/2] Incorporate production-hardened history alignment - _fetch_and_validate_chat_history: match upstream author's resilient version - short-circuit on empty/'local' chat_id - filter out system rows - strip trailing empty assistant placeholders - soft-align on length mismatch (trim when DB > body; return None when DB < body) - build_contents: quietly log.info + fall back to in-memory payload when DB history is not ready, instead of surfacing a user-facing warning toast - gemini_manifold_companion.py: restore 'from open_webui.models.functions import Functions' import to match upstream baseline --- plugins/filters/gemini_manifold_companion.py | 2 + plugins/pipes/gemini_manifold.py | 79 +++++++++++--------- 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/plugins/filters/gemini_manifold_companion.py b/plugins/filters/gemini_manifold_companion.py index cd850cd..6e86620 100644 --- a/plugins/filters/gemini_manifold_companion.py +++ b/plugins/filters/gemini_manifold_companion.py @@ -42,6 +42,8 @@ from collections.abc import Awaitable, Callable from typing import Any, Literal, TYPE_CHECKING, cast +from open_webui.models.functions import Functions + if TYPE_CHECKING: from loguru import Record from loguru._handler import Handler # type: ignore diff --git a/plugins/pipes/gemini_manifold.py b/plugins/pipes/gemini_manifold.py index 896b18b..0b9203f 100755 --- a/plugins/pipes/gemini_manifold.py +++ b/plugins/pipes/gemini_manifold.py @@ -18,6 +18,14 @@ # `_determine_execution_order` are now `async`. DB fetch and cumulative # usage injection moved from `GeminiContentBuilder.__init__` into # `build_contents()` since `__init__` cannot await. +# `_fetch_and_validate_chat_history` now performs soft alignment instead +# of strict length validation: short-circuits on empty/"local" chat ids, +# filters out system rows, strips trailing empty assistant placeholders, +# trims DB history when it exceeds the body length, and only returns +# `None` when the DB lags behind the body (common on the first message +# of a new chat). The missing-history branch in `build_contents` now +# logs quietly and falls back to the in-memory payload rather than +# surfacing a user-facing warning toast. # I change these only when I make a release to avoid PR merge conflicts. # If you are making a PR then please do not change these values. @@ -739,8 +747,6 @@ def __init__( files_api_manager: "FilesAPIManager", ): self.messages_body = messages_body - self.metadata_body = metadata_body - self.user_data = user_data self.upload_documents = (metadata_body.get("features", {}) or {}).get( "upload_documents", False ) @@ -755,8 +761,9 @@ def __init__( self.system_prompt, self.messages_body = self._extract_system_prompt( self.messages_body ) - # messages_db and cumulative usage are populated asynchronously in build_contents(). - self.messages_db: list["ChatMessageTD"] | None = None + self.metadata_body = metadata_body + self.user_data = user_data + self.messages_db = None async def build_contents(self) -> list[types.Content]: """ @@ -774,12 +781,10 @@ async def build_contents(self) -> list[types.Content]: self.metadata_body["cumulative_cost"] = c_cost if not self.messages_db: - warn_msg = ( - "There was a problem retrieving the messages from the backend database. " - "Check the console for more details. " - "Citation filtering and file uploads will not be available." + log.info( + "Database history not ready or lengths mismatched. " + "Falling back to active memory payload." ) - self.event_emitter.emit_toast(warn_msg, "warning") # 1. Set up and launch the status manager. It will activate itself if needed. status_manager = UploadStatusManager(self.event_emitter) @@ -854,40 +859,46 @@ async def _fetch_and_validate_chat_history( self, metadata_body: "Metadata", user_data: "UserData" ) -> list["ChatMessageTD"] | None: """ - Fetches message history from the database and validates its length against the request body. - Returns the database messages or None if not found or if validation fails. + Fetches message history from the database and safely aligns its length against the request body. """ - # 1. Fetch from database chat_id = metadata_body.get("chat_id", "") + if not chat_id or chat_id == "local": + return None + if chat := await Chats.get_chat_by_id_and_user_id( id=chat_id, user_id=user_data["id"] ): chat_content: "ChatObjectDataTD" = chat.chat # type: ignore - log.trace("Fetched messages from database:", payload=chat_content.get("messages")) - # Last message is the upcoming assistant response, at this point in the logic it's empty. - messages_db = chat_content.get("messages", [])[:-1] - else: - log.warning( - f"Chat {chat_id} not found. File handling (audio, video, PDF), citation filtering, " - "and high-fidelity assistant response restoration are unavailable." - ) - return None + raw_messages_db = chat_content.get("messages", []) - # 2. Validate length against the current message body - if len(messages_db) != len(self.messages_body): - warn_msg = ( - f"Messages in the body ({len(self.messages_body)}) and " - f"messages in the database ({len(messages_db)}) do not match. " - "This is likely due to a bug in Open WebUI. " - "File handling and high-fidelity response restoration will be disabled." - ) + # 1. Filter out system messages + messages_db = [m for m in raw_messages_db if m.get("role") != "system"] - # TODO: Emit a toast to the user in the front-end. - log.warning(warn_msg) - # Invalidate the db messages if they don't match - return None + # 2. Strip trailing empty assistant placeholders + while ( + len(messages_db) > len(self.messages_body) + and messages_db[-1].get("role") == "assistant" + ): + messages_db = messages_db[:-1] + + # 3. Soft validation and alignment + body_len = len(self.messages_body) + db_len = len(messages_db) - return messages_db + if db_len != body_len: + log.debug( + f"Message count mismatch. Body: {body_len}, DB: {db_len}. Auto-aligning." + ) + if db_len > body_len: + # DB has extra messages. Take the most recent matching ones. + messages_db = messages_db[-body_len:] + else: + # DB hasn't saved the latest user message yet (common on first message of a new chat). + return None + + return messages_db + + return None async def _process_message_turn( self, i: int, message: "Message", status_queue: asyncio.Queue