Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions plugins/filters/gemini_manifold_companion.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@
author_url: https://github.com/suurt8ll
funding_url: https://github.com/suurt8ll/open_webui_functions
license: MIT
version: 2.1.0
version: 2.2.0
"""

VERSION = "2.1.0"
# Changelog:
# 2.2.0 - Compatibility with Open WebUI >= 0.9.0.
# Removed the sync `Functions.get_function_valves_by_id(...)` call from
# `Filter.__init__` (the method is now `async` upstream and raised at import
# time). Valves are now initialized with defaults; Open WebUI's filter
# pipeline injects the DB-backed valves onto the module instance before
# each `inlet`/`outlet`/`stream` invocation, so behavior is preserved.

VERSION = "2.2.0"

# This filter can detect that a feature like web search or code execution is enabled in the front-end,
# set the feature back to False so Open WebUI does not run it's own logic and then
Expand Down Expand Up @@ -148,10 +156,8 @@ class Valves(BaseModel):
# TODO: Support user settting through UserValves.

def __init__(self):
# This hack makes the valves values available to the `__init__` method.
# TODO: Get the id from the frontmatter instead of hardcoding it.
valves = Functions.get_function_valves_by_id("gemini_manifold_companion")
self.valves = self.Valves(**(valves if valves else {}))
# Initialize valves with defaults; the framework injects DB values before each request.
self.valves = self.Valves()
self.log_level = self.valves.LOG_LEVEL
self._add_log_handler()
log.success("Function has been initialized.")
Expand Down
143 changes: 83 additions & 60 deletions plugins/pipes/gemini_manifold.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,34 @@
author_url: https://github.com/suurt8ll
funding_url: https://github.com/suurt8ll/open_webui_functions
license: MIT
version: 2.1.0
version: 2.2.0
requirements: google-genai==1.65.0
"""

# Changelog:
# 2.2.0 - Compatibility with Open WebUI >= 0.9.0.
# Open WebUI converted `Chats`, `Files`, and `Functions` DB methods to `async`.
# All affected call sites are now awaited; `_fetch_and_validate_chat_history`,
# `_get_toggleable_feature_status`, `_build_gen_content_config`, and
# `_determine_execution_order` are now `async`. DB fetch and cumulative
# usage injection moved from `GeminiContentBuilder.__init__` into
# `build_contents()` since `__init__` cannot await.
# `_fetch_and_validate_chat_history` now performs soft alignment instead
# of strict length validation: short-circuits on empty/"local" chat ids,
# filters out system rows, strips trailing empty assistant placeholders,
# trims DB history when it exceeds the body length, and only returns
# `None` when the DB lags behind the body (common on the first message
# of a new chat). The missing-history branch in `build_contents` now
# logs quietly and falls back to the in-memory payload rather than
# surfacing a user-facing warning toast.

# I change these only when I make a release to avoid PR merge conflicts.
# If you are making a PR then please do not change these values.
VERSION = "2.1.0"
VERSION = "2.2.0"
# This is the recommended version for the companion filter.
# Older versions might still work, but backward compatibility is not guaranteed
# during the development of this personal use plugin.
RECOMMENDED_COMPANION_VERSION = "2.1.0"
RECOMMENDED_COMPANION_VERSION = "2.2.0"


# Keys `title`, `id` and `description` in the frontmatter above are used for my own development purposes.
Expand Down Expand Up @@ -744,28 +761,30 @@ def __init__(
self.system_prompt, self.messages_body = self._extract_system_prompt(
self.messages_body
)
self.messages_db = self._fetch_and_validate_chat_history(
metadata_body, user_data
)

# Retrieve cumulative usage from the DB history and inject it into metadata.
# This will be picked up later when constructing the final usage payload.
c_tokens, c_cost = self._retrieve_previous_usage_data()
metadata_body["cumulative_tokens"] = c_tokens
metadata_body["cumulative_cost"] = c_cost
self.metadata_body = metadata_body
self.user_data = user_data
self.messages_db = None

async def build_contents(self) -> list[types.Content]:
"""
The main public method to generate the contents list by processing all
message turns concurrently and using a self-configuring status manager.
"""
# Fetch chat history and cumulative usage from the DB (async APIs).
self.messages_db = await self._fetch_and_validate_chat_history(
self.metadata_body, self.user_data
)
# Retrieve cumulative usage from the DB history and inject it into metadata.
# This will be picked up later when constructing the final usage payload.
c_tokens, c_cost = self._retrieve_previous_usage_data()
self.metadata_body["cumulative_tokens"] = c_tokens
self.metadata_body["cumulative_cost"] = c_cost

if not self.messages_db:
warn_msg = (
"There was a problem retrieving the messages from the backend database. "
"Check the console for more details. "
"Citation filtering and file uploads will not be available."
log.info(
"Database history not ready or lengths mismatched. "
"Falling back to active memory payload."
)
self.event_emitter.emit_toast(warn_msg, "warning")

# 1. Set up and launch the status manager. It will activate itself if needed.
status_manager = UploadStatusManager(self.event_emitter)
Expand Down Expand Up @@ -836,44 +855,50 @@ def _extract_system_prompt(
system_prompt: str | None = (system_message or {}).get("content")
return system_prompt, remaining_messages # type: ignore

def _fetch_and_validate_chat_history(
async def _fetch_and_validate_chat_history(
self, metadata_body: "Metadata", user_data: "UserData"
) -> list["ChatMessageTD"] | None:
"""
Fetches message history from the database and validates its length against the request body.
Returns the database messages or None if not found or if validation fails.
Fetches message history from the database and safely aligns its length against the request body.
"""
# 1. Fetch from database
chat_id = metadata_body.get("chat_id", "")
if chat := Chats.get_chat_by_id_and_user_id(
if not chat_id or chat_id == "local":
return None

if chat := await Chats.get_chat_by_id_and_user_id(
id=chat_id, user_id=user_data["id"]
):
chat_content: "ChatObjectDataTD" = chat.chat # type: ignore
log.trace("Fetched messages from database:", payload=chat_content.get("messages"))
# Last message is the upcoming assistant response, at this point in the logic it's empty.
messages_db = chat_content.get("messages", [])[:-1]
else:
log.warning(
f"Chat {chat_id} not found. File handling (audio, video, PDF), citation filtering, "
"and high-fidelity assistant response restoration are unavailable."
)
return None
raw_messages_db = chat_content.get("messages", [])

# 2. Validate length against the current message body
if len(messages_db) != len(self.messages_body):
warn_msg = (
f"Messages in the body ({len(self.messages_body)}) and "
f"messages in the database ({len(messages_db)}) do not match. "
"This is likely due to a bug in Open WebUI. "
"File handling and high-fidelity response restoration will be disabled."
)
# 1. Filter out system messages
messages_db = [m for m in raw_messages_db if m.get("role") != "system"]

# TODO: Emit a toast to the user in the front-end.
log.warning(warn_msg)
# Invalidate the db messages if they don't match
return None
# 2. Strip trailing empty assistant placeholders
while (
len(messages_db) > len(self.messages_body)
and messages_db[-1].get("role") == "assistant"
):
messages_db = messages_db[:-1]

return messages_db
# 3. Soft validation and alignment
body_len = len(self.messages_body)
db_len = len(messages_db)

if db_len != body_len:
log.debug(
f"Message count mismatch. Body: {body_len}, DB: {db_len}. Auto-aligning."
)
if db_len > body_len:
# DB has extra messages. Take the most recent matching ones.
messages_db = messages_db[-body_len:]
else:
# DB hasn't saved the latest user message yet (common on first message of a new chat).
return None

return messages_db

return None

async def _process_message_turn(
self, i: int, message: "Message", status_queue: asyncio.Queue
Expand Down Expand Up @@ -1535,10 +1560,9 @@ async def _get_file_data(file_id: str) -> tuple[bytes | None, str | None]:
log.warning("file_id is empty. Cannot continue.")
return None, None

# Run the synchronous, blocking database call in a separate thread
# to avoid blocking the main asyncio event loop.
# Await the async database call directly.
try:
file_model = await asyncio.to_thread(Files.get_file_by_id, file_id)
file_model = await Files.get_file_by_id(file_id)
except Exception as e:
log.exception(
f"An unexpected error occurred during database call for file_id {file_id}: {e}"
Expand Down Expand Up @@ -2116,7 +2140,7 @@ async def pipe(
# TODO: use the structured outputs feature to ensure a valid json at all times?

# 3. Determine Execution Order
execution_order = self._determine_execution_order(
execution_order = await self._determine_execution_order(
valves=valves,
__metadata__=__metadata__,
model_config=model_config,
Expand Down Expand Up @@ -2639,7 +2663,7 @@ def _is_image_model(model_id: str, config: dict) -> bool:

# region 2.3 GenerateContentConfig assembly

def _build_gen_content_config(
async def _build_gen_content_config(
self,
body: "Body",
__metadata__: "Metadata",
Expand Down Expand Up @@ -2725,7 +2749,7 @@ def _build_gen_content_config(
)

# Check if reasoning can be disabled via toggle, which overrides other settings.
is_avail, is_on = self._get_toggleable_feature_status(
is_avail, is_on = await self._get_toggleable_feature_status(
"gemini_reasoning_toggle", __metadata__
)
if is_avail and not is_on:
Expand Down Expand Up @@ -2799,7 +2823,7 @@ def _build_gen_content_config(
)

# Determine if URL context tool should be enabled.
is_avail, is_on = self._get_toggleable_feature_status(
is_avail, is_on = await self._get_toggleable_feature_status(
"gemini_url_context_toggle", __metadata__
)
enable_url_context = valves.ENABLE_URL_CONTEXT_TOOL # Start with valve default.
Expand Down Expand Up @@ -2831,7 +2855,7 @@ def _build_gen_content_config(
)

# Determine if Google Maps grounding should be enabled.
is_avail, is_on = self._get_toggleable_feature_status(
is_avail, is_on = await self._get_toggleable_feature_status(
"gemini_maps_grounding_toggle", __metadata__
)
if is_avail and is_on:
Expand Down Expand Up @@ -2965,7 +2989,7 @@ async def _execute_generation_attempt(
contents = await builder.build_contents()

# 4. Configuration Building
gen_content_conf = self._build_gen_content_config(
gen_content_conf = await self._build_gen_content_config(
body, __metadata__, valves, model_config
)
gen_content_conf.system_instruction = builder.system_prompt
Expand Down Expand Up @@ -3443,8 +3467,7 @@ async def _upload_image(
return None

log.debug("Adding the image file to the Open WebUI files database.")
file_item = await asyncio.to_thread(
Files.insert_new_file,
file_item = await Files.insert_new_file(
__metadata__.get("user_id"),
FileForm(
id=id,
Expand Down Expand Up @@ -4048,7 +4071,7 @@ def plugin_filter(record: "Record"):

# region 2.7 Utility helpers

def _determine_execution_order(
async def _determine_execution_order(
self,
valves: "Pipe.Valves",
__metadata__: "Metadata",
Expand All @@ -4064,15 +4087,15 @@ def _determine_execution_order(
has_paid_key = bool(valves.GEMINI_PAID_API_KEY)

# Retrieve Toggle Statuses
vertex_available, vertex_toggled_on = self._get_toggleable_feature_status(
vertex_available, vertex_toggled_on = await self._get_toggleable_feature_status(
"gemini_vertex_ai_toggle", __metadata__
)
# Vertex is only viable if toggled on AND we have a project ID
can_use_vertex = (
vertex_available and vertex_toggled_on and bool(valves.VERTEX_PROJECT)
)

paid_toggle_available, paid_toggled_on = self._get_toggleable_feature_status(
paid_toggle_available, paid_toggled_on = await self._get_toggleable_feature_status(
"gemini_paid_api", __metadata__
)

Expand Down Expand Up @@ -4209,7 +4232,7 @@ def _resolve_custom_params(
return merged_params

@staticmethod
def _get_toggleable_feature_status(
async def _get_toggleable_feature_status(
filter_id: str,
__metadata__: "Metadata",
) -> tuple[bool, bool]:
Expand All @@ -4234,7 +4257,7 @@ def _get_toggleable_feature_status(
- is_toggled_on: True if the user has the toggle ON in the UI for this request.
"""
# 1. Check if the filter is installed
f = Functions.get_function_by_id(filter_id)
f = await Functions.get_function_by_id(filter_id)
if not f:
log.warning(
f"The '{filter_id}' filter is not installed. "
Expand Down
Loading