Tests #63

Merged
merged 20 commits into from
Jun 7, 2025
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
@@ -26,7 +26,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest
pip install pytest pytest_asyncio hypothesis
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Test with pytest
run: |
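The new pytest_asyncio and hypothesis dependencies back the test modules added below. The async suites use a module-level marker pattern (visible in the new Event_Handlers tests); a minimal, self-contained sketch of that pattern, not taken from the PR itself:

# Minimal pytest + pytest-asyncio shape used by the new async test modules (illustrative names only).
import asyncio

import pytest

pytestmark = pytest.mark.asyncio  # every test in the module runs on an event loop


async def test_handler_is_awaited():
    await asyncio.sleep(0)  # stands in for awaiting a handler under test
    assert True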
4 changes: 3 additions & 1 deletion README.md
@@ -26,6 +26,7 @@ Current status: Working/In-Progress
- Launch with `python3 -m tldw_chatbook.app` (working on pip packaging)
- Inspiration from [Elia Chat](https://github.com/darrenburns/elia)

#### Chat Features
<details>
<summary> Chat Features </summary>

@@ -57,6 +58,7 @@ Current status: Working/In-Progress
- Support for searching/loading/editing/saving Notes from the Notes Database.
</details>
#### Notes & Media Features
<details>
<summary> Notes & Media Features </summary>
@@ -75,7 +77,7 @@ Current status: Working/In-Progress
-
</details>
#### Local LLM Management Features
<details>
<summary> Local LLM Inference </summary>
File renamed without changes.
File renamed without changes.
@@ -14,11 +14,11 @@

import requests

from tldw_cli.tldw_app.DB.Media_DB import Database, DatabaseError
from tldw_chatbook.DB.Client_Media_DB_v2 import MediaDatabase as Database, DatabaseError

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from tldw_cli.tldw_app.DB.Sync_Client import ClientSyncEngine, SYNC_BATCH_SIZE # Your client sync engine
from tldw_chatbook.DB.Sync_Client import ClientSyncEngine, SYNC_BATCH_SIZE # Your client sync engine



File renamed without changes.
File renamed without changes.
File renamed without changes.
887 changes: 258 additions & 629 deletions Tests/ChaChaNotesDB/test_chachanotes_db.py

Large diffs are not rendered by default.

1,128 changes: 1,128 additions & 0 deletions Tests/ChaChaNotesDB/test_chachanotes_db_properties.py

Large diffs are not rendered by default.
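This property-based suite is not rendered here, but it follows the same Hypothesis pattern shown in full for Tests/Media_DB/test_media_db_properties.py further down: generate inputs with strategies, drive the database API, and assert round-trip and soft-delete invariants. A minimal, self-contained sketch of that pattern, using a toy in-memory store instead of the project's DB classes (all names below are illustrative only):

# Illustrative only: a toy store stands in for the real DB; the actual file's contents are not shown above.
from dataclasses import dataclass, field
from typing import Dict, Optional

from hypothesis import given, strategies as st


@dataclass
class ToyStore:
    rows: Dict[int, dict] = field(default_factory=dict)
    next_id: int = 1

    def add(self, title: str, content: str) -> int:
        row_id, self.next_id = self.next_id, self.next_id + 1
        self.rows[row_id] = {"title": title, "content": content, "deleted": False}
        return row_id

    def get(self, row_id: int) -> Optional[dict]:
        row = self.rows.get(row_id)
        return None if row is None or row["deleted"] else row

    def soft_delete(self, row_id: int) -> None:
        self.rows[row_id]["deleted"] = True


st_required_text = st.text(min_size=1, max_size=50).map(str.strip).filter(bool)


@given(title=st_required_text, content=st.text(min_size=1, max_size=200))
def test_add_then_get_roundtrip(title, content):
    # Property: anything added must come back with identical fields.
    store = ToyStore()
    row_id = store.add(title, content)
    assert store.get(row_id) == {"title": title, "content": content, "deleted": False}


@given(title=st_required_text, content=st.text(min_size=1, max_size=200))
def test_soft_delete_hides_row(title, content):
    # Property: a soft-deleted row is no longer visible through get().
    store = ToyStore()
    row_id = store.add(title, content)
    store.soft_delete(row_id)
    assert store.get(row_id) is None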

478 changes: 323 additions & 155 deletions Tests/Character_Chat/test_character_chat.py

Large diffs are not rendered by default.

1,023 changes: 311 additions & 712 deletions Tests/Chat/test_chat_functions.py

Large diffs are not rendered by default.

221 changes: 121 additions & 100 deletions Tests/Chat/test_prompt_template_manager.py
@@ -1,142 +1,163 @@
# Tests/Chat/test_prompt_template_manager.py
# test_prompt_template_manager.py

import pytest
import json
from pathlib import Path
from unittest.mock import patch, mock_open

# Local Imports from this project
from tldw_chatbook.Chat.prompt_template_manager import (
PromptTemplate,
load_template,
apply_template_to_string,
get_available_templates,
DEFAULT_RAW_PASSTHROUGH_TEMPLATE,
_loaded_templates # For clearing cache in tests
_loaded_templates,
DEFAULT_RAW_PASSTHROUGH_TEMPLATE
)


# Fixture to clear the template cache before each test
# --- Test Setup ---

@pytest.fixture(autouse=True)
def clear_template_cache():
"""Fixture to clear the template cache before each test."""
original_templates = _loaded_templates.copy()
_loaded_templates.clear()
# Re-add the default passthrough because it's normally added at module load
# Ensure the default is always there for tests that might rely on it
_loaded_templates["raw_passthrough"] = DEFAULT_RAW_PASSTHROUGH_TEMPLATE
yield
# Restore original cache state if needed, though clearing is usually sufficient
_loaded_templates.clear()
_loaded_templates.update(original_templates)


@pytest.fixture
def mock_templates_dir(tmp_path: Path):
templates_dir = tmp_path / "prompt_templates_test"
def mock_templates_dir(tmp_path, monkeypatch):
"""Creates a temporary directory for prompt templates and patches the module-level constant."""
templates_dir = tmp_path / "prompt_templates"
templates_dir.mkdir()

# Create a valid template file
valid_template_data = {
"name": "test_valid",
"description": "A valid test template",
"system_message_template": "System: {sys_var}",
"user_message_content_template": "User: {user_var} - {message_content}"
# Patch the PROMPT_TEMPLATES_DIR constant in the target module
monkeypatch.setattr('tldw_chatbook.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR', templates_dir)

# Create some dummy template files
template1_data = {
"name": "test_template_1",
"description": "A simple test template.",
"user_message_content_template": "User said: {{message_content}}"
}
with open(templates_dir / "test_valid.json", "w") as f:
json.dump(valid_template_data, f)
(templates_dir / "test_template_1.json").write_text(json.dumps(template1_data))

# Create an invalid JSON template file
with open(templates_dir / "test_invalid_json.json", "w") as f:
f.write("{'name': 'invalid', 'description': 'bad json'") # Invalid JSON
template2_data = {
"name": "test_template_2",
"system_message_template": "System context: {{system_context}}",
"user_message_content_template": "{{message_content}}"
}
(templates_dir / "test_template_2.json").write_text(json.dumps(template2_data))

# Create an empty template file (valid JSON but might be handled as error by Pydantic)
with open(templates_dir / "test_empty.json", "w") as f:
json.dump({}, f)
# Create a malformed JSON file
(templates_dir / "malformed.json").write_text("{'invalid': 'json'")

return templates_dir


@pytest.mark.unit
def test_load_template_success(mock_templates_dir):
with patch("tldw_app.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR", mock_templates_dir):
template = load_template("test_valid")
assert template is not None
assert template.name == "test_valid"
assert template.system_message_template == "System: {sys_var}"
# Test caching
template_cached = load_template("test_valid")
assert template_cached is template # Should be the same object from cache
# --- Test Cases ---

class TestPromptTemplateManager:

@pytest.mark.unit
def test_load_template_not_found(mock_templates_dir):
with patch("tldw_app.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR", mock_templates_dir):
def test_load_template_success(self, mock_templates_dir):
"""Test successfully loading a valid template file."""
template = load_template("test_template_1")
assert template is not None
assert isinstance(template, PromptTemplate)
assert template.name == "test_template_1"
assert template.user_message_content_template == "User said: {{message_content}}"

def test_load_template_not_found(self, mock_templates_dir):
"""Test loading a template that does not exist."""
template = load_template("non_existent_template")
assert template is None


@pytest.mark.unit
def test_load_template_invalid_json(mock_templates_dir):
with patch("tldw_app.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR", mock_templates_dir):
template = load_template("test_invalid_json")
assert template is None # Should fail to parse


@pytest.mark.unit
def test_load_template_empty_json_fails_validation(mock_templates_dir):
with patch("tldw_app.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR", mock_templates_dir):
template = load_template("test_empty")
# Pydantic will raise validation error because 'name' is missing,
# load_template should catch this and return None.
def test_load_template_malformed_json(self, mock_templates_dir):
"""Test loading a template from a file with invalid JSON."""
template = load_template("malformed")
assert template is None

def test_load_template_caching(self, mock_templates_dir):
"""Test that a loaded template is cached and not re-read from disk."""
template1 = load_template("test_template_1")
assert "test_template_1" in _loaded_templates

@pytest.mark.unit
# Inside test_apply_template_to_string():
def test_apply_template_to_string():
template_str_jinja = "Hello {{name}}, welcome to {{place}}." # Use Jinja
data_full = {"name": "Alice", "place": "Wonderland"}
assert apply_template_to_string(template_str_jinja, data_full) == "Hello Alice, welcome to Wonderland."
# Modify the file on disk
(mock_templates_dir / "test_template_1.json").write_text(json.dumps({"name": "modified"}))

template_partial_jinja = "Hello {{name}}." # Use Jinja
data_partial = {"name": "Bob"}
assert apply_template_to_string(template_partial_jinja, data_partial) == "Hello Bob."
# Load again - should return the cached version
template2 = load_template("test_template_1")
assert template2 is not None
assert template2.name == "test_template_1" # Original name from cache
assert template2 == template1

# Test with missing data - Jinja renders empty for missing by default if not strict
assert apply_template_to_string(template_partial_jinja, {}) == "Hello ."

# Test with None template string
assert apply_template_to_string(None, data_full) == ""


@pytest.mark.unit
def test_get_available_templates(mock_templates_dir):
with patch("tldw_app.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR", mock_templates_dir):
def test_get_available_templates(self, mock_templates_dir):
"""Test discovering available templates from the directory."""
available = get_available_templates()
assert isinstance(available, list)
assert "test_valid" in available
assert "test_invalid_json" in available
assert "test_empty" in available
assert len(available) == 3


@pytest.mark.unit
def test_get_available_templates_no_dir():
with patch("tldw_app.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR", Path("/non/existent/dir")):
available = get_available_templates()
assert available == []
assert set(available) == {"test_template_1", "test_template_2", "malformed"}

def test_get_available_templates_no_dir(self, tmp_path, monkeypatch):
"""Test getting templates when the directory doesn't exist."""
non_existent_dir = tmp_path / "non_existent_dir"
monkeypatch.setattr('tldw_chatbook.Chat.prompt_template_manager.PROMPT_TEMPLATES_DIR', non_existent_dir)
assert get_available_templates() == []

@pytest.mark.unit
def test_default_raw_passthrough_template():
assert DEFAULT_RAW_PASSTHROUGH_TEMPLATE is not None
assert DEFAULT_RAW_PASSTHROUGH_TEMPLATE.name == "raw_passthrough"
data = {"message_content": "test content", "original_system_message_from_request": "system content"}

# User message template (is "{{message_content}}")
assert apply_template_to_string(DEFAULT_RAW_PASSTHROUGH_TEMPLATE.user_message_content_template,
data) == "test content"
# System message template (is "{{original_system_message_from_request}}")
assert apply_template_to_string(DEFAULT_RAW_PASSTHROUGH_TEMPLATE.system_message_template,
data) == "system content"

data_empty_sys = {"original_system_message_from_request": ""}
assert apply_template_to_string(DEFAULT_RAW_PASSTHROUGH_TEMPLATE.system_message_template,
data_empty_sys) == ""

data_missing_sys = {"message_content": "some_content"} # original_system_message_from_request is missing
assert apply_template_to_string(DEFAULT_RAW_PASSTHROUGH_TEMPLATE.system_message_template,
data_missing_sys) == "" # Jinja renders missing as empty

def test_default_passthrough_template_is_available(self):
"""Test that the default 'raw_passthrough' template is loaded."""
template = load_template("raw_passthrough")
assert template is not None
assert template.name == "raw_passthrough"
assert template.user_message_content_template == "{{message_content}}"


class TestTemplateRendering:

def test_apply_template_to_string_success(self):
"""Test basic successful rendering."""
template_str = "Hello, {{ name }}!"
data = {"name": "World"}
result = apply_template_to_string(template_str, data)
assert result == "Hello, World!"

def test_apply_template_to_string_missing_placeholder(self):
"""Test rendering when a placeholder in the template is not in the data."""
template_str = "Hello, {{ name }}! Your age is {{ age }}."
data = {"name": "World"} # 'age' is missing
result = apply_template_to_string(template_str, data)
assert result == "Hello, World! Your age is ." # Jinja renders missing variables as empty strings

def test_apply_template_with_none_input_string(self):
"""Test that a None template string returns an empty string."""
data = {"name": "World"}
result = apply_template_to_string(None, data)
assert result == ""

def test_apply_template_with_complex_data(self):
"""Test rendering with more complex data structures like lists and dicts."""
template_str = "User: {{ user.name }}. Items: {% for item in items %}{{ item }}{% if not loop.last %}, {% endif %}{% endfor %}."
data = {
"user": {"name": "Alice"},
"items": ["apple", "banana", "cherry"]
}
result = apply_template_to_string(template_str, data)
assert result == "User: Alice. Items: apple, banana, cherry."

def test_safe_render_prevents_unsafe_operations(self):
"""Test that the sandboxed environment prevents access to unsafe attributes."""
# Attempt to access a private attribute or a method that could be unsafe
template_str = "Unsafe access: {{ my_obj.__class__ }}"

class MyObj: pass

data = {"my_obj": MyObj()}

# In a sandboxed environment, this should raise a SecurityError, which our wrapper catches.
# The wrapper then returns the original string.
result = apply_template_to_string(template_str, data)
assert result == template_str
553 changes: 0 additions & 553 deletions Tests/DB/test_sqlite_db.py

This file was deleted.

File renamed without changes.
301 changes: 301 additions & 0 deletions Tests/Event_Handlers/Chat_Events/test_chat_events.py
@@ -0,0 +1,301 @@
# /tests/Event_Handlers/Chat_Events/test_chat_events.py

import pytest
from unittest.mock import AsyncMock, MagicMock, patch, call

from rich.text import Text
# Mock Textual UI elements before they are imported by the module under test
from textual.widgets import (
Button, Input, TextArea, Static, Select, Checkbox, ListView, ListItem, Label
)
from textual.containers import VerticalScroll
from textual.css.query import QueryError

# Mock DB Errors
from tldw_chatbook.DB.ChaChaNotes_DB import ConflictError, CharactersRAGDBError, InputError

# Functions to test
from tldw_chatbook.Event_Handlers.Chat_Events.chat_events import (
handle_chat_send_button_pressed,
handle_chat_action_button_pressed,
handle_chat_new_conversation_button_pressed,
handle_chat_save_current_chat_button_pressed,
handle_chat_load_character_button_pressed,
handle_chat_clear_active_character_button_pressed,
# ... import other handlers as you write tests for them
)
from tldw_chatbook.Widgets.chat_message import ChatMessage

pytestmark = pytest.mark.asyncio


# A very comprehensive mock app fixture is needed here
@pytest.fixture
def mock_app():
app = AsyncMock()

# Mock services and DBs
app.chachanotes_db = MagicMock()
app.notes_service = MagicMock()
app.notes_service._get_db.return_value = app.chachanotes_db
app.media_db = MagicMock()

# Mock core app properties
app.API_IMPORTS_SUCCESSFUL = True
app.app_config = {
"api_settings": {
"openai": {"streaming": True, "api_key_env_var": "OPENAI_API_KEY"},
"anthropic": {"streaming": False, "api_key": "xyz-key"}
},
"chat_defaults": {"system_prompt": "Default system prompt."},
"USERS_NAME": "Tester"
}

# Mock app state
app.current_chat_conversation_id = None
app.current_chat_is_ephemeral = True
app.current_chat_active_character_data = None
app.current_ai_message_widget = None

# Mock app methods
app.query_one = MagicMock()
app.notify = AsyncMock()
app.copy_to_clipboard = MagicMock()
app.set_timer = MagicMock()
app.run_worker = MagicMock()
app.chat_wrapper = AsyncMock()

# Timers
app._conversation_search_timer = None

# --- Set up mock widgets ---
# This is complex; a helper function simplifies it.
def setup_mock_widgets(q_one_mock):
widgets = {
"#chat-input": MagicMock(spec=TextArea, text="User message", is_mounted=True),
"#chat-log": AsyncMock(spec=VerticalScroll, is_mounted=True),
"#chat-api-provider": MagicMock(spec=Select, value="OpenAI"),
"#chat-api-model": MagicMock(spec=Select, value="gpt-4"),
"#chat-system-prompt": MagicMock(spec=TextArea, text="UI system prompt"),
"#chat-temperature": MagicMock(spec=Input, value="0.7"),
"#chat-top-p": MagicMock(spec=Input, value="0.9"),
"#chat-min-p": MagicMock(spec=Input, value="0.1"),
"#chat-top-k": MagicMock(spec=Input, value="40"),
"#chat-llm-max-tokens": MagicMock(spec=Input, value="1024"),
"#chat-llm-seed": MagicMock(spec=Input, value=""),
"#chat-llm-stop": MagicMock(spec=Input, value=""),
"#chat-llm-response-format": MagicMock(spec=Select, value="text"),
"#chat-llm-n": MagicMock(spec=Input, value="1"),
"#chat-llm-user-identifier": MagicMock(spec=Input, value=""),
"#chat-llm-logprobs": MagicMock(spec=Checkbox, value=False),
"#chat-llm-top-logprobs": MagicMock(spec=Input, value=""),
"#chat-llm-logit-bias": MagicMock(spec=TextArea, text="{}"),
"#chat-llm-presence-penalty": MagicMock(spec=Input, value="0.0"),
"#chat-llm-frequency-penalty": MagicMock(spec=Input, value="0.0"),
"#chat-llm-tools": MagicMock(spec=TextArea, text="[]"),
"#chat-llm-tool-choice": MagicMock(spec=Input, value=""),
"#chat-llm-fixed-tokens-kobold": MagicMock(spec=Checkbox, value=False),
"#chat-strip-thinking-tags-checkbox": MagicMock(spec=Checkbox, value=True),
"#chat-character-search-results-list": AsyncMock(spec=ListView),
"#chat-character-name-edit": MagicMock(spec=Input),
"#chat-character-description-edit": MagicMock(spec=TextArea),
"#chat-character-personality-edit": MagicMock(spec=TextArea),
"#chat-character-scenario-edit": MagicMock(spec=TextArea),
"#chat-character-system-prompt-edit": MagicMock(spec=TextArea),
"#chat-character-first-message-edit": MagicMock(spec=TextArea),
"#chat-right-sidebar": MagicMock(), # Mock container
}

def query_one_side_effect(selector, _type=None):
# Special case for querying within the sidebar
if isinstance(selector, MagicMock) and hasattr(selector, 'query_one'):
return selector.query_one(selector, _type)

if selector in widgets:
return widgets[selector]

# Allow querying for sub-widgets inside a container like the right sidebar
if widgets["#chat-right-sidebar"].query_one.call_args:
inner_selector = widgets["#chat-right-sidebar"].query_one.call_args[0][0]
if inner_selector in widgets:
return widgets[inner_selector]

raise QueryError(f"Widget not found by mock: {selector}")

q_one_mock.side_effect = query_one_side_effect

# Make the sidebar mock also use the main query_one logic
widgets["#chat-right-sidebar"].query_one.side_effect = lambda sel, _type: widgets[sel]

setup_mock_widgets(app.query_one)

return app


# Mock external dependencies used in chat_events.py
@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.ccl')
@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.os')
@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.ChatMessage', new_callable=AsyncMock)
async def test_handle_chat_send_button_pressed_basic(mock_chat_message_class, mock_os, mock_ccl, mock_app):
"""Test a basic message send operation."""
mock_os.environ.get.return_value = "fake-key"

await handle_chat_send_button_pressed(mock_app, MagicMock())

# Assert UI updates
mock_app.query_one("#chat-input").clear.assert_called_once()
mock_app.query_one("#chat-log").mount.assert_any_call(mock_chat_message_class.return_value) # Mounts user message
mock_app.query_one("#chat-log").mount.assert_any_call(mock_app.current_ai_message_widget) # Mounts AI placeholder

# Assert worker is called
mock_app.run_worker.assert_called_once()

# Assert chat_wrapper is called with correct parameters by the worker
worker_lambda = mock_app.run_worker.call_args[0][0]
worker_lambda() # Execute the lambda to trigger the call to chat_wrapper

mock_app.chat_wrapper.assert_called_once()
wrapper_kwargs = mock_app.chat_wrapper.call_args.kwargs
assert wrapper_kwargs['message'] == "User message"
assert wrapper_kwargs['api_endpoint'] == "OpenAI"
assert wrapper_kwargs['api_key'] == "fake-key"
assert wrapper_kwargs['system_message'] == "UI system prompt"
assert wrapper_kwargs['streaming'] is True # From config


@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.ccl')
@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.os')
@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.ChatMessage', new_callable=AsyncMock)
async def test_handle_chat_send_with_active_character(mock_chat_message_class, mock_os, mock_ccl, mock_app):
"""Test that an active character's system prompt overrides the UI."""
mock_os.environ.get.return_value = "fake-key"
mock_app.current_chat_active_character_data = {
'name': 'TestChar',
'system_prompt': 'You are TestChar.'
}

await handle_chat_send_button_pressed(mock_app, MagicMock())

worker_lambda = mock_app.run_worker.call_args[0][0]
worker_lambda()

wrapper_kwargs = mock_app.chat_wrapper.call_args.kwargs
assert wrapper_kwargs['system_message'] == "You are TestChar."


async def test_handle_new_conversation_button_pressed(mock_app):
"""Test that the new chat button clears state and UI."""
# Set some state to ensure it's cleared
mock_app.current_chat_conversation_id = "conv_123"
mock_app.current_chat_is_ephemeral = False
mock_app.current_chat_active_character_data = {'name': 'char'}

await handle_chat_new_conversation_button_pressed(mock_app, MagicMock())

mock_app.query_one("#chat-log").remove_children.assert_called_once()
assert mock_app.current_chat_conversation_id is None
assert mock_app.current_chat_is_ephemeral is True
assert mock_app.current_chat_active_character_data is None
# Check that a UI field was reset
assert mock_app.query_one("#chat-system-prompt").text == "Default system prompt."


@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.ccl')
@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.display_conversation_in_chat_tab_ui',
new_callable=AsyncMock)
async def test_handle_save_current_chat_button_pressed(mock_display_conv, mock_ccl, mock_app):
"""Test saving an ephemeral chat."""
mock_app.current_chat_is_ephemeral = True
mock_app.current_chat_conversation_id = None

# Setup mock messages in the chat log
mock_msg1 = MagicMock(spec=ChatMessage, role="User", message_text="Hello", generation_complete=True,
image_data=None, image_mime_type=None)
mock_msg2 = MagicMock(spec=ChatMessage, role="AI", message_text="Hi", generation_complete=True, image_data=None,
image_mime_type=None)
mock_app.query_one("#chat-log").query.return_value = [mock_msg1, mock_msg2]

mock_ccl.create_conversation.return_value = "new_conv_id"

await handle_chat_save_current_chat_button_pressed(mock_app, MagicMock())

mock_ccl.create_conversation.assert_called_once()
create_kwargs = mock_ccl.create_conversation.call_args.kwargs
assert create_kwargs['title'].startswith("Chat: Hello...")
assert len(create_kwargs['initial_messages']) == 2
assert create_kwargs['initial_messages'][0]['content'] == "Hello"

assert mock_app.current_chat_conversation_id == "new_conv_id"
assert mock_app.current_chat_is_ephemeral is False
mock_app.notify.assert_called_with("Chat saved successfully!", severity="information")
mock_display_conv.assert_called_once_with(mock_app, "new_conv_id")


@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.ccl')
async def test_handle_chat_action_button_pressed_edit_and_save(mock_ccl, mock_app):
"""Test the edit->save workflow for a chat message."""
mock_button = MagicMock(spec=Button, classes=["edit-button"])
mock_action_widget = AsyncMock(spec=ChatMessage)
mock_action_widget.message_text = "Original text"
mock_action_widget.message_id_internal = "msg_123"
mock_action_widget.message_version_internal = 0
mock_action_widget._editing = False # Start in non-editing mode
mock_static_text = mock_action_widget.query_one.return_value

# --- 1. First press: Start editing ---
await handle_chat_action_button_pressed(mock_app, mock_button, mock_action_widget)

mock_action_widget.mount.assert_called_once() # Mounts the TextArea
assert mock_action_widget._editing is True
assert "💾" in mock_button.label # Check for save emoji

# --- 2. Second press: Save edit ---
mock_action_widget._editing = True # Simulate being in editing mode
mock_edit_area = MagicMock(spec=TextArea, text="New edited text")
mock_action_widget.query_one.return_value = mock_edit_area
mock_ccl.edit_message_content.return_value = True

await handle_chat_action_button_pressed(mock_app, mock_button, mock_action_widget)

mock_edit_area.remove.assert_called_once()
assert mock_action_widget.message_text == "New edited text"
assert isinstance(mock_static_text.update.call_args[0][0], Text)
assert mock_static_text.update.call_args[0][0].plain == "New edited text"

mock_ccl.edit_message_content.assert_called_with(
mock_app.chachanotes_db, "msg_123", "New edited text", 0
)
assert mock_action_widget.message_version_internal == 1 # Version incremented
assert "✏️" in mock_button.label # Check for edit emoji


@patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.load_character_and_image')
async def test_handle_chat_load_character_with_greeting(mock_load_char, mock_app):
"""Test that loading a character into an empty, ephemeral chat posts a greeting."""
mock_app.current_chat_is_ephemeral = True
mock_app.query_one("#chat-log").query.return_value = [] # Empty chat log

char_data = {
'id': 'char_abc', 'name': 'Greeter', 'first_message': 'Hello, adventurer!'
}
mock_load_char.return_value = (char_data, None, None)

# Mock the list item from the character search list
mock_list_item = MagicMock(spec=ListItem)
mock_list_item.character_id = 'char_abc'
mock_app.query_one("#chat-character-search-results-list").highlighted_child = mock_list_item

with patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events.ChatMessage',
new_callable=AsyncMock) as mock_chat_msg_class:
await handle_chat_load_character_button_pressed(mock_app, MagicMock())

# Assert character data is loaded
assert mock_app.current_chat_active_character_data == char_data

# Assert greeting message was created and mounted
mock_chat_msg_class.assert_called_with(
message='Hello, adventurer!',
role='Greeter',
generation_complete=True
)
mock_app.query_one("#chat-log").mount.assert_called_once_with(mock_chat_msg_class.return_value)
227 changes: 227 additions & 0 deletions Tests/Event_Handlers/Chat_Events/test_chat_events_sidebar.py
@@ -0,0 +1,227 @@
# /tests/Event_Handlers/Chat_Events/test_chat_events_sidebar.py

import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from textual.widgets import Button, Input, ListView, TextArea, ListItem, Label
from textual.css.query import QueryError

# Functions to test
from tldw_chatbook.Event_Handlers.Chat_Events.chat_events_sidebar import (
_disable_media_copy_buttons,
perform_media_sidebar_search,
handle_chat_media_search_input_changed,
handle_chat_media_load_selected_button_pressed,
handle_chat_media_copy_title_button_pressed,
handle_chat_media_copy_content_button_pressed,
handle_chat_media_copy_author_button_pressed,
handle_chat_media_copy_url_button_pressed,
)

pytestmark = pytest.mark.asyncio


@pytest.fixture
def mock_app():
"""Provides a comprehensive mock of the TldwCli app."""
app = AsyncMock()

# Mock UI components
app.query_one = MagicMock()
mock_results_list = AsyncMock(spec=ListView)
mock_review_display = AsyncMock(spec=TextArea)
mock_copy_title_btn = MagicMock(spec=Button)
mock_copy_content_btn = MagicMock(spec=Button)
mock_copy_author_btn = MagicMock(spec=Button)
mock_copy_url_btn = MagicMock(spec=Button)
mock_search_input = MagicMock(spec=Input)

# Configure query_one to return the correct mock widget
def query_one_side_effect(selector, _type):
if selector == "#chat-media-search-results-listview":
return mock_results_list
if selector == "#chat-media-content-display":
return mock_review_display
if selector == "#chat-media-copy-title-button":
return mock_copy_title_btn
if selector == "#chat-media-copy-content-button":
return mock_copy_content_btn
if selector == "#chat-media-copy-author-button":
return mock_copy_author_btn
if selector == "#chat-media-copy-url-button":
return mock_copy_url_btn
if selector == "#chat-media-search-input":
return mock_search_input
raise QueryError(f"Widget not found: {selector}")

app.query_one.side_effect = query_one_side_effect

# Mock DB and state
app.media_db = MagicMock()
app.current_sidebar_media_item = None

# Mock app methods
app.notify = AsyncMock()
app.copy_to_clipboard = MagicMock()
app.set_timer = MagicMock()
app.run_worker = MagicMock()

# For debouncing timer
app._media_sidebar_search_timer = None

return app


async def test_disable_media_copy_buttons(mock_app):
"""Test that all copy buttons are disabled and the current item is cleared."""
await _disable_media_copy_buttons(mock_app)

assert mock_app.current_sidebar_media_item is None
assert mock_app.query_one("#chat-media-copy-title-button", Button).disabled is True
assert mock_app.query_one("#chat-media-copy-content-button", Button).disabled is True
assert mock_app.query_one("#chat-media-copy-author-button", Button).disabled is True
assert mock_app.query_one("#chat-media-copy-url-button", Button).disabled is True


async def test_perform_media_sidebar_search_with_results(mock_app):
"""Test searching with a term that returns results."""
mock_media_items = [
{'title': 'Test Title 1', 'media_id': 'id12345678'},
{'title': 'Test Title 2', 'media_id': 'id87654321'},
]
mock_app.media_db.search_media_db.return_value = mock_media_items
mock_results_list = mock_app.query_one("#chat-media-search-results-listview", ListView)

with patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_events_sidebar.ListItem',
side_effect=ListItem) as mock_list_item_class:
await perform_media_sidebar_search(mock_app, "test term")

mock_results_list.clear.assert_called_once()
mock_app.query_one("#chat-media-content-display", TextArea).clear.assert_called_once()
mock_app.media_db.search_media_db.assert_called_once()
assert mock_results_list.append.call_count == 2

# Check that ListItem was called with a Label containing the correct text
first_call_args = mock_list_item_class.call_args_list[0].args
assert isinstance(first_call_args[0], Label)
assert "Test Title 1" in first_call_args[0].renderable


async def test_perform_media_sidebar_search_no_results(mock_app):
"""Test searching with a term that returns no results."""
mock_app.media_db.search_media_db.return_value = []
mock_results_list = mock_app.query_one("#chat-media-search-results-listview", ListView)

await perform_media_sidebar_search(mock_app, "no results term")

mock_results_list.append.assert_called_once()
# The call argument is a ListItem, which contains a Label. We check the Label's content.
call_arg = mock_results_list.append.call_args[0][0]
assert isinstance(call_arg, ListItem)
assert call_arg.children[0].renderable == "No media found."


async def test_perform_media_sidebar_search_empty_term(mock_app):
"""Test that an empty search term clears results and does not search."""
await perform_media_sidebar_search(mock_app, "")
mock_app.media_db.search_media_db.assert_not_called()
mock_app.query_one("#chat-media-search-results-listview", ListView).clear.assert_called_once()


async def test_handle_chat_media_search_input_changed_debouncing(mock_app):
"""Test that input changes are debounced via set_timer."""
mock_timer = MagicMock()
mock_app._media_sidebar_search_timer = mock_timer
mock_input_widget = MagicMock(spec=Input, value=" new search ")

await handle_chat_media_search_input_changed(mock_app, mock_input_widget)

mock_timer.stop.assert_called_once()
mock_app.set_timer.assert_called_once()
# Check that run_worker is part of the callback, which calls perform_media_sidebar_search
callback_lambda = mock_app.set_timer.call_args[0][1]
# We can't easily execute the lambda here, but we can verify it's set.
assert callable(callback_lambda)


async def test_handle_chat_media_load_selected_button_pressed(mock_app):
"""Test loading a selected media item into the display."""
media_data = {
'title': 'Loaded Title', 'author': 'Author Name', 'media_type': 'Article',
'url': 'http://example.com', 'content': 'This is the full content.'
}
mock_list_item = MagicMock(spec=ListItem)
mock_list_item.media_data = media_data

mock_results_list = mock_app.query_one("#chat-media-search-results-listview", ListView)
mock_results_list.highlighted_child = mock_list_item

await handle_chat_media_load_selected_button_pressed(mock_app, MagicMock())

assert mock_app.current_sidebar_media_item == media_data
mock_app.query_one("#chat-media-content-display", TextArea).load_text.assert_called_once()
loaded_text = mock_app.query_one("#chat-media-content-display", TextArea).load_text.call_args[0][0]
assert "Title: Loaded Title" in loaded_text
assert "Author: Author Name" in loaded_text
assert "This is the full content." in loaded_text

assert mock_app.query_one("#chat-media-copy-title-button", Button).disabled is False


async def test_handle_chat_media_load_selected_no_selection(mock_app):
"""Test load button when nothing is selected."""
mock_results_list = mock_app.query_one("#chat-media-search-results-listview", ListView)
mock_results_list.highlighted_child = None

await handle_chat_media_load_selected_button_pressed(mock_app, MagicMock())

mock_app.notify.assert_called_with("No media item selected.", severity="warning")
mock_app.query_one("#chat-media-content-display", TextArea).clear.assert_called_once()
assert mock_app.query_one("#chat-media-copy-title-button", Button).disabled is True


async def test_handle_copy_buttons_with_data(mock_app):
"""Test all copy buttons when data is available."""
media_data = {'title': 'Copy Title', 'content': 'Copy Content', 'author': 'Copy Author', 'url': 'http://copy.url'}
mock_app.current_sidebar_media_item = media_data

# Test copy title
await handle_chat_media_copy_title_button_pressed(mock_app, MagicMock())
mock_app.copy_to_clipboard.assert_called_with('Copy Title')
mock_app.notify.assert_called_with("Title copied to clipboard.")

# Test copy content
await handle_chat_media_copy_content_button_pressed(mock_app, MagicMock())
mock_app.copy_to_clipboard.assert_called_with('Copy Content')
mock_app.notify.assert_called_with("Content copied to clipboard.")

# Test copy author
await handle_chat_media_copy_author_button_pressed(mock_app, MagicMock())
mock_app.copy_to_clipboard.assert_called_with('Copy Author')
mock_app.notify.assert_called_with("Author copied to clipboard.")

# Test copy URL
await handle_chat_media_copy_url_button_pressed(mock_app, MagicMock())
mock_app.copy_to_clipboard.assert_called_with('http://copy.url')
mock_app.notify.assert_called_with("URL copied to clipboard.")


async def test_handle_copy_buttons_no_data(mock_app):
"""Test copy buttons when data is not available."""
mock_app.current_sidebar_media_item = None

# Test copy title
await handle_chat_media_copy_title_button_pressed(mock_app, MagicMock())
mock_app.notify.assert_called_with("No media title to copy.", severity="warning")

# Test copy content
await handle_chat_media_copy_content_button_pressed(mock_app, MagicMock())
mock_app.notify.assert_called_with("No media content to copy.", severity="warning")

# Test copy author
await handle_chat_media_copy_author_button_pressed(mock_app, MagicMock())
mock_app.notify.assert_called_with("No media author to copy.", severity="warning")

# Test copy URL
await handle_chat_media_copy_url_button_pressed(mock_app, MagicMock())
mock_app.notify.assert_called_with("No media URL to copy.", severity="warning")
158 changes: 158 additions & 0 deletions Tests/Event_Handlers/Chat_Events/test_chat_streaming_events.py
@@ -0,0 +1,158 @@
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from rich.text import Text
from textual.containers import VerticalScroll
from textual.widgets import Static, TextArea

from tldw_chatbook.Event_Handlers.worker_events import StreamingChunk, StreamDone
from tldw_chatbook.Constants import TAB_CHAT, TAB_CCP

# Functions to test (they are methods on the app, so we test them as such)
from tldw_chatbook.Event_Handlers.Chat_Events.chat_streaming_events import (
handle_streaming_chunk,
handle_stream_done
)

pytestmark = pytest.mark.asyncio


@pytest.fixture
def mock_app():
"""Provides a mock app instance ('self' for the handlers)."""
app = AsyncMock()

# Mock logger and config
app.loguru_logger = MagicMock()
app.app_config = {"chat_defaults": {"strip_thinking_tags": True}}

# Mock UI state and components
app.current_tab = TAB_CHAT
mock_static_text = AsyncMock(spec=Static)
mock_chat_message_widget = AsyncMock()
mock_chat_message_widget.is_mounted = True
mock_chat_message_widget.message_text = ""
mock_chat_message_widget.query_one.return_value = mock_static_text
app.current_ai_message_widget = mock_chat_message_widget

mock_chat_log = AsyncMock(spec=VerticalScroll)
mock_chat_input = AsyncMock(spec=TextArea)

app.query_one = MagicMock(side_effect=lambda sel, type: mock_chat_log if sel == "#chat-log" else mock_chat_input)

# Mock DB and state
app.chachanotes_db = MagicMock()
app.current_chat_conversation_id = "conv_123"
app.current_chat_is_ephemeral = False

# Mock app methods
app.notify = AsyncMock()

return app


async def test_handle_streaming_chunk_appends_text(mock_app):
"""Test that a streaming chunk appends text and updates the widget."""
event = StreamingChunk(text_chunk="Hello, ")
mock_app.current_ai_message_widget.message_text = "Initial."

await handle_streaming_chunk(mock_app, event)

assert mock_app.current_ai_message_widget.message_text == "Initial.Hello, "

# Check that update is called with the full, escaped text
with patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_streaming_events.escape_markup',
return_value="Escaped: Initial.Hello, ") as mock_escape:
await handle_streaming_chunk(mock_app, event)
mock_escape.assert_called_with("Initial.Hello, Hello, ")
mock_app.current_ai_message_widget.query_one().update.assert_called_with("Escaped: Initial.Hello, ")

# Check that scroll_end is called
mock_app.query_one.assert_called_with("#chat-log", VerticalScroll)
mock_app.query_one().scroll_end.assert_called()


async def test_handle_stream_done_success_and_save(mock_app):
"""Test successful stream completion and saving to DB."""
event = StreamDone(full_text="This is the final response.", error=None)
mock_app.current_ai_message_widget.role = "AI"

with patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_streaming_events.ccl') as mock_ccl:
# Mock DB returns for getting the saved message details
mock_ccl.add_message_to_conversation.return_value = "msg_abc"
mock_app.chachanotes_db.get_message_by_id.return_value = {'id': 'msg_abc', 'version': 0}

await handle_stream_done(mock_app, event)

# Assert UI update
mock_app.current_ai_message_widget.query_one().update.assert_called_with("This is the final response.")
mock_app.current_ai_message_widget.mark_generation_complete.assert_called_once()

# Assert DB call
mock_ccl.add_message_to_conversation.assert_called_once_with(
mock_app.chachanotes_db, "conv_123", "AI", "This is the final response."
)
assert mock_app.current_ai_message_widget.message_id_internal == 'msg_abc'
assert mock_app.current_ai_message_widget.message_version_internal == 0

# Assert state reset
assert mock_app.current_ai_message_widget is None
mock_app.query_one().focus.assert_called_once()


async def test_handle_stream_done_with_tag_stripping(mock_app):
"""Test that <think> tags are stripped from the final text before saving."""
full_text = "<think>I should start.</think>This is the actual response.<think>I am done now.</think>"
expected_text = "This is the actual response."
event = StreamDone(full_text=full_text, error=None)
mock_app.app_config["chat_defaults"]["strip_thinking_tags"] = True

with patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_streaming_events.ccl') as mock_ccl:
await handle_stream_done(mock_app, event)

# Check that the saved text is the stripped version
mock_ccl.add_message_to_conversation.assert_called_once()
saved_text = mock_ccl.add_message_to_conversation.call_args[0][3]
assert saved_text == expected_text

# Check that the displayed text is also the stripped version (escaped)
mock_app.current_ai_message_widget.query_one().update.assert_called_with(expected_text)


async def test_handle_stream_done_with_error(mock_app):
"""Test stream completion when an error occurred."""
event = StreamDone(full_text="Partial response.", error="API limit reached")

with patch('tldw_chatbook.Event_Handlers.Chat_Events.chat_streaming_events.ccl') as mock_ccl:
await handle_stream_done(mock_app, event)

# Assert UI is updated with error message
mock_static_widget = mock_app.current_ai_message_widget.query_one()
mock_static_widget.update.assert_called_once()
update_call_arg = mock_static_widget.update.call_args[0][0]
assert isinstance(update_call_arg, Text)
assert "Partial response." in update_call_arg.plain
assert "Stream Error" in update_call_arg.plain
assert "API limit reached" in update_call_arg.plain

# Assert role is changed and DB is NOT called
assert mock_app.current_ai_message_widget.role == "System"
mock_ccl.add_message_to_conversation.assert_not_called()

# Assert state reset
assert mock_app.current_ai_message_widget is None


async def test_handle_stream_done_no_widget(mock_app):
"""Test graceful handling when the AI widget is missing."""
mock_app.current_ai_message_widget = None
event = StreamDone(full_text="Some text", error="Some error")

await handle_stream_done(mock_app, event)

# Just ensure it doesn't crash and notifies about the error
mock_app.notify.assert_called_once_with(
"Stream error (display widget missing): Some error",
severity="error",
timeout=10
)
File renamed without changes.
Empty file.
82 changes: 0 additions & 82 deletions Tests/MediaDB2/conftest.py

This file was deleted.

668 changes: 0 additions & 668 deletions Tests/MediaDB2/test_sqlite_db.py

This file was deleted.

Empty file added Tests/Media_DB/__init__.py
Empty file.
File renamed without changes.
348 changes: 348 additions & 0 deletions Tests/Media_DB/test_media_db_properties.py
@@ -0,0 +1,348 @@
# test_media_db_properties.py
#
# Property-based tests for the Media_DB_v2 library using Hypothesis.
# These tests verify the logical correctness and invariants of the database
# operations across a wide range of generated data.
#
# Imports
from datetime import datetime, timezone, timedelta
from typing import Iterator, Callable, Any, Generator
import pytest
import uuid
from pathlib import Path
#
# Third-Party Imports
from hypothesis import given, strategies as st, settings, HealthCheck, assume
#
# Local Imports
# Adjust the import path based on your project structure
from tldw_chatbook.DB.Client_Media_DB_v2 import (
MediaDatabase,
InputError,
DatabaseError,
ConflictError, fetch_keywords_for_media, empty_trash
)
#
#######################################################################################################################
#
# --- Hypothesis Settings ---

# A custom profile for database-intensive tests.
# It increases the deadline and suppresses health checks that are common
# but expected in I/O-heavy testing scenarios.
settings.register_profile(
"db_test_suite",
deadline=2000,
suppress_health_check=[
HealthCheck.too_slow,
HealthCheck.function_scoped_fixture,
HealthCheck.data_too_large,
]
)
settings.load_profile("db_test_suite")


# --- Pytest Fixtures ---


@pytest.fixture
def db_factory(tmp_path: Path) -> Generator[Callable[[], MediaDatabase], Any, None]:
"""
A factory that creates fresh, isolated MediaDatabase instances on demand.
Manages cleanup of all created instances.
"""
created_dbs = []

def _create_db_instance() -> MediaDatabase:
db_file = tmp_path / f"prop_test_{uuid.uuid4().hex}.db"
client_id = f"client_{uuid.uuid4().hex[:8]}"
db = MediaDatabase(db_path=db_file, client_id=client_id)
created_dbs.append(db)
return db

yield _create_db_instance

# Teardown: close all connections that were created by the factory
for db in created_dbs:
db.close_connection()

@pytest.fixture
def db_instance(db_factory: Callable[[], MediaDatabase]) -> MediaDatabase:
"""
Provides a single, fresh MediaDatabase instance for a test function.
This fixture uses the `db_factory` to create and manage the instance.
"""
return db_factory()

# --- Hypothesis Strategies ---

# Strategy for generating text that is guaranteed to have non-whitespace content.
st_required_text = st.text(min_size=1, max_size=50).map(lambda s: s.strip()).filter(lambda s: len(s) > 0)

# Strategy for a single, clean keyword.
st_keyword_text = st.text(
alphabet=st.characters(whitelist_categories=["L", "N", "S", "P"]),
min_size=2,
max_size=20
).map(lambda s: s.strip()).filter(lambda s: len(s) > 0)

# Strategy for generating a list of unique, case-insensitive keywords.
st_keywords_list = st.lists(
st_keyword_text,
min_size=1,
max_size=5,
unique_by=lambda s: s.lower()
).filter(lambda l: len(l) > 0) # Ensure list is not empty after filtering


# A composite strategy to generate a valid dictionary of media data for creation.
@st.composite
def st_media_data(draw):
"""Generates a dictionary of plausible data for a new media item."""
return {
"title": draw(st_required_text),
"content": draw(st.text(min_size=10, max_size=500)),
"media_type": draw(st.sampled_from(['article', 'video', 'obsidian_note', 'pdf'])),
"author": draw(st.one_of(st.none(), st.text(min_size=3, max_size=30))),
"keywords": draw(st_keywords_list)
}


# --- Property Test Classes ---

class TestMediaItemProperties:
"""Property-based tests for the core Media item lifecycle."""

@given(media_data=st_media_data())
def test_media_item_roundtrip(self, db_instance: MediaDatabase, media_data: dict):
"""
Property: A media item, once added, should be retrievable with the same data.
"""
media_data["content"] += f" {uuid.uuid4().hex}"

media_id, media_uuid, msg = db_instance.add_media_with_keywords(**media_data)

assert "added" in msg
assert media_id is not None
assert media_uuid is not None

retrieved = db_instance.get_media_by_id(media_id)
assert retrieved is not None

assert retrieved['title'] == media_data['title']
assert retrieved['content'] == media_data['content']
assert retrieved['type'] == media_data['media_type']
assert retrieved['author'] == media_data['author']
assert retrieved['version'] == 1
assert not retrieved['deleted']

linked_keywords = {kw.lower().strip() for kw in fetch_keywords_for_media(media_id, db_instance)}
expected_keywords = {kw.lower().strip() for kw in media_data['keywords']}
assert linked_keywords == expected_keywords

# FIX: The get_all_document_versions function defaults to NOT including content.
# We must explicitly request it for the assertion to work.
doc_versions = db_instance.get_all_document_versions(media_id, include_content=True)
assert len(doc_versions) == 1
assert doc_versions[0]['version_number'] == 1
assert doc_versions[0]['content'] == media_data['content']

# ... other tests in this class are correct ...
@given(initial_media=st_media_data(), update_media=st_media_data())
def test_update_increments_version_and_changes_data(self, db_instance: MediaDatabase, initial_media: dict,
update_media: dict):
initial_media["content"] += f" initial_{uuid.uuid4().hex}"
update_media["content"] += f" update_{uuid.uuid4().hex}"
media_id, media_uuid, _ = db_instance.add_media_with_keywords(**initial_media)
original = db_instance.get_media_by_id(media_id)
media_id_up, media_uuid_up, msg = db_instance.add_media_with_keywords(
url=original['url'],
overwrite=True,
**update_media
)
assert media_id_up == media_id
assert media_uuid_up == media_uuid
assert "updated" in msg
updated = db_instance.get_media_by_id(media_id)
assert updated is not None
assert updated['version'] == original['version'] + 1
assert updated['title'] == update_media['title']
assert updated['content'] == update_media['content']
doc_versions = db_instance.get_all_document_versions(media_id)
assert len(doc_versions) == 2

@given(media_data=st_media_data())
def test_soft_delete_makes_item_unfindable_by_default(self, db_instance: MediaDatabase, media_data: dict):
unique_word = f"hypothesis_{uuid.uuid4().hex}"
media_data["content"] = f"{media_data['content']} {unique_word}"
media_id, _, _ = db_instance.add_media_with_keywords(**media_data)
original = db_instance.get_media_by_id(media_id)
assert original is not None
db_instance.soft_delete_media(media_id)
assert db_instance.get_media_by_id(media_id) is None
results, total = db_instance.search_media_db(search_query=unique_word)
assert total == 0
raw_record = db_instance.get_media_by_id(media_id, include_deleted=True)
assert raw_record is not None
assert raw_record['deleted'] == 1
assert raw_record['version'] == original['version'] + 1


class TestSearchProperties:
@given(media_data=st_media_data())
def test_search_finds_item_by_its_properties(self, db_instance: MediaDatabase, media_data: dict):
unique_word = f"hypothesis_{uuid.uuid4().hex}"
media_data["content"] = f"{media_data['content']} {unique_word}"
media_id, _, _ = db_instance.add_media_with_keywords(**media_data)
results, total = db_instance.search_media_db(search_query=unique_word, search_fields=['content'])
assert total == 1
assert results[0]['id'] == media_id
keyword_to_find = media_data["keywords"][0]
results, total = db_instance.search_media_db(search_query=None, must_have_keywords=[keyword_to_find],
media_ids_filter=[media_id])
assert total == 1
assert results[0]['id'] == media_id
results, total = db_instance.search_media_db(search_query=None, media_types=[media_data['media_type']],
media_ids_filter=[media_id])
assert total == 1
assert results[0]['id'] == media_id

@given(item1=st_media_data(), item2=st_media_data())
def test_search_isolates_results_correctly(self, db_instance: MediaDatabase, item1: dict, item2: dict):
item1_kws = set(kw.lower() for kw in item1['keywords'])
item2_kws = set(kw.lower() for kw in item2['keywords'])
assume(item1_kws.isdisjoint(item2_kws))
item1["content"] += f" item1_{uuid.uuid4().hex}"
item2["content"] += f" item2_{uuid.uuid4().hex}"
id1, _, _ = db_instance.add_media_with_keywords(**item1)
id2, _, _ = db_instance.add_media_with_keywords(**item2)
keyword_to_find = item1['keywords'][0]
results, total = db_instance.search_media_db(search_query=None, must_have_keywords=[keyword_to_find],
media_ids_filter=[id1, id2])
assert total == 1
assert results[0]['id'] == id1

@given(media_data=st_media_data())
def test_soft_deleted_item_is_not_in_fts_search(self, db_instance: MediaDatabase, media_data: dict):
unique_term = f"fts_{uuid.uuid4().hex}"
media_data['title'] = f"{media_data['title']} {unique_term}"
media_data['content'] += f" {uuid.uuid4().hex}"
media_id, _, _ = db_instance.add_media_with_keywords(**media_data)
results, total = db_instance.search_media_db(search_query=unique_term)
assert total == 1
was_deleted = db_instance.soft_delete_media(media_id)
assert was_deleted is True
results, total = db_instance.search_media_db(search_query=unique_term)
assert total == 0


class TestIdempotencyAndConstraints:
"""Tests for idempotency of operations and enforcement of DB constraints."""

@settings(deadline=None)
@given(media_data=st_media_data())
def test_mark_as_trash_is_idempotent(self, db_instance: MediaDatabase, media_data: dict):
"""
Property: Marking an item as trash multiple times has the same effect as
marking it once. The version should only increment on the first call.
"""
media_data["content"] += f" {uuid.uuid4().hex}"
media_id, _, _ = db_instance.add_media_with_keywords(**media_data)

assert db_instance.mark_as_trash(media_id) is True
item_v2 = db_instance.get_media_by_id(media_id, include_trash=True)
assert item_v2['version'] == 2

assert db_instance.mark_as_trash(media_id) is False
item_still_v2 = db_instance.get_media_by_id(media_id, include_trash=True)
assert item_still_v2['version'] == 2

@given(
media1=st_media_data(),
media2=st_media_data(),
url_part1=st.uuids().map(str),
url_part2=st.uuids().map(str),
)
def test_add_media_with_conflicting_hash_is_handled(self,
db_instance: MediaDatabase,
media1: dict,
media2: dict,
url_part1: str,
url_part2: str):
# Ensure URLs will be different, a highly unlikely edge case otherwise
assume(url_part1 != url_part2)
# Ensure titles are different to test a metadata-only update.
assume(media1['title'] != media2['title'])

# Make content identical to trigger a content hash conflict.
media2['content'] = media1['content']

# Use the deterministic UUIDs from Hypothesis to build the URLs.
media1['url'] = f"http://example.com/{url_part1}"
media2['url'] = f"http://example.com/{url_part2}"

id1, _, _ = db_instance.add_media_with_keywords(**media1)

# 1. Test with overwrite=False. Should fail due to conflict.
id2, _, msg2 = db_instance.add_media_with_keywords(**media2, overwrite=False)
assert id2 is None
assert "already exists. Overwrite not enabled." in msg2

# 2. Test with overwrite=True. Should update the existing item's metadata.
id3, _, msg3 = db_instance.add_media_with_keywords(**media2, overwrite=True)
assert id3 == id1
assert "updated" in msg3

# 3. Verify the metadata was actually updated in the database.
final_item = db_instance.get_media_by_id(id1)
assert final_item is not None
assert final_item['title'] == media2['title']


class TestTimeBasedAndSearchQueries:
# ... other tests in this class are correct ...

@given(days=st.integers(min_value=1, max_value=365))
def test_empty_trash_respects_time_threshold(self, db_instance: MediaDatabase, days: int):
"""
Property: `empty_trash` should only soft-delete items whose `trash_date`
is older than the specified threshold.
"""
media_id, _, _ = db_instance.add_media_with_keywords(
title="Trash Test", content=f"...{uuid.uuid4().hex}", media_type="article", keywords=["test"])

# This call handles versioning correctly, bumping version to 2
db_instance.mark_as_trash(media_id)
item_v2 = db_instance.get_media_by_id(media_id, include_trash=True)

past_date = datetime.now(timezone.utc) - timedelta(days=days + 1)

# FIX: The manual update MUST comply with the database triggers.
# This means we have to increment the version and supply a client_id.
# This makes the test setup robust.
with db_instance.transaction():
db_instance.execute_query(
"UPDATE Media SET trash_date = ?, version = ?, client_id = ?, last_modified = ? WHERE id = ?",
(
past_date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
item_v2['version'] + 1, # Manually increment version for this setup step
'test_setup_client',
db_instance._get_current_utc_timestamp_str(),
media_id
)
)

# Now the item is at version 3.
# `empty_trash` will find this item and call `soft_delete_media`,
# which will correctly read version 3 and update to version 4.
processed_count, _ = empty_trash(db_instance=db_instance, days_threshold=days)
assert processed_count == 1

final_item = db_instance.get_media_by_id(media_id, include_trash=True, include_deleted=True)
assert final_item['deleted'] == 1
assert final_item['version'] == 4 # Initial: 1, Trash: 2, Manual Date Change: 3, Delete: 4


#
# End of test_media_db_properties.py
#######################################################################################################################
460 changes: 460 additions & 0 deletions Tests/Media_DB/test_media_db_v2.py

Large diffs are not rendered by default.

@@ -12,8 +12,8 @@
import requests
#
# Local Imports
from .test_sqlite_db import get_entity_version
from tldw_cli.tldw_app.DB.Sync_Client import ClientSyncEngine
from .test_media_db_v2 import get_entity_version
from tldw_chatbook.DB.Sync_Client import ClientSyncEngine
#
#######################################################################################################################
#
Empty file added Tests/Prompts_DB/__init__.py
Empty file.
1,102 changes: 1,102 additions & 0 deletions Tests/Prompts_DB/tests_prompts_db.py

Large diffs are not rendered by default.

574 changes: 574 additions & 0 deletions Tests/Prompts_DB/tests_prompts_db_properties.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -54,6 +54,8 @@ transformers = ["transformers"]
dev = [ # Example for development dependencies
"pytest",
"textual-dev", # For Textual development tools
"hypothesis",
"pytest_asyncio",
]


64 changes: 35 additions & 29 deletions tldw_chatbook/DB/ChaChaNotes_DB.py
@@ -1901,6 +1901,7 @@ def search_character_cards(self, search_term: str, limit: int = 10) -> List[Dict
Raises:
CharactersRAGDBError: For database errors during the search.
"""
safe_search_term = f'"{search_term}"'
query = """
SELECT cc.*
FROM character_cards_fts fts
@@ -1914,7 +1915,7 @@ def search_character_cards(self, search_term: str, limit: int = 10) -> List[Dict
rows = cursor.fetchall()
return [self._deserialize_row_fields(row, self._CHARACTER_CARD_JSON_FIELDS) for row in rows if row]
except CharactersRAGDBError as e:
logger.error(f"Error searching character cards for '{search_term}': {e}")
logger.error(f"Error searching character cards for '{safe_search_term}': {e}")
raise

# --- Conversation Methods ---
@@ -2296,6 +2297,7 @@ def search_conversations_by_title(self, title_query: str, character_id: Optional
if not title_query.strip():
logger.warning("Empty title_query provided for conversation search. Returning empty list.")
return []
safe_search_term = f'"{title_query}"'
base_query = """
SELECT c.*
FROM conversations_fts fts
@@ -2315,7 +2317,7 @@ def search_conversations_by_title(self, title_query: str, character_id: Optional
cursor = self.execute_query(base_query, tuple(params_list))
return [dict(row) for row in cursor.fetchall()]
except CharactersRAGDBError as e:
logger.error(f"Error searching conversations for title '{title_query}': {e}")
logger.error(f"Error searching conversations for title '{safe_search_term}': {e}")
raise

# --- Message Methods ---
@@ -2429,27 +2431,25 @@ def get_messages_for_conversation(self, conversation_id: str, limit: int = 100,
order_by_timestamp: str = "ASC") -> List[Dict[str, Any]]:
"""
Lists messages for a specific conversation.
Returns non-deleted messages, ordered by `timestamp` according to `order_by_timestamp`.
Includes all fields, including `image_data` and `image_mime_type`.
Args:
conversation_id: The UUID of the conversation.
limit: Maximum number of messages to return. Defaults to 100.
offset: Number of messages to skip. Defaults to 0.
order_by_timestamp: Sort order for 'timestamp' field ("ASC" or "DESC").
Defaults to "ASC".
Returns:
A list of message dictionaries. Can be empty.
Raises:
InputError: If `order_by_timestamp` has an invalid value.
CharactersRAGDBError: For database errors.
Crucially, it also ensures the parent conversation is not soft-deleted.
"""
if order_by_timestamp.upper() not in ["ASC", "DESC"]:
raise InputError("order_by_timestamp must be 'ASC' or 'DESC'.")
query = f"SELECT id, conversation_id, parent_message_id, sender, content, image_data, image_mime_type, timestamp, ranking, last_modified, version, client_id, deleted FROM messages WHERE conversation_id = ? AND deleted = 0 ORDER BY timestamp {order_by_timestamp} LIMIT ? OFFSET ?" # Explicitly list columns

# The new query joins with conversations to check its 'deleted' status.
query = f"""
SELECT m.id, m.conversation_id, m.parent_message_id, m.sender, m.content,
m.image_data, m.image_mime_type, m.timestamp, m.ranking,
m.last_modified, m.version, m.client_id, m.deleted
FROM messages m
JOIN conversations c ON m.conversation_id = c.id
WHERE m.conversation_id = ?
AND m.deleted = 0
AND c.deleted = 0
ORDER BY m.timestamp {order_by_timestamp}
LIMIT ? OFFSET ?
"""
try:
cursor = self.execute_query(query, (conversation_id, limit, offset))
return [dict(row) for row in cursor.fetchall()]
@@ -2667,6 +2667,7 @@ def search_messages_by_content(self, content_query: str, conversation_id: Option
Raises:
CharactersRAGDBError: For database search errors.
"""
safe_search_term = f'"{content_query}"'
base_query = """
SELECT m.*
FROM messages_fts fts
@@ -2686,7 +2687,7 @@ def search_messages_by_content(self, content_query: str, conversation_id: Option
cursor = self.execute_query(base_query, tuple(params_list))
return [dict(row) for row in cursor.fetchall()]
except CharactersRAGDBError as e:
logger.error(f"Error searching messages for content '{content_query}': {e}")
logger.error(f"Error searching messages for content '{safe_search_term}': {e}")
raise

# --- Keyword, KeywordCollection, Note Methods (CRUD + Search) ---
@@ -3252,7 +3253,8 @@ def search_keywords(self, search_term: str, limit: int = 10) -> List[Dict[str, A
Returns:
A list of matching keyword dictionaries.
"""
return self._search_generic_items_fts("keywords_fts", "keywords", "keyword", search_term, limit)
safe_search_term = f'"{search_term}"'
return self._search_generic_items_fts("keywords_fts", "keywords", "keyword", safe_search_term, limit)

# Keyword Collections
def add_keyword_collection(self, name: str, parent_id: Optional[int] = None) -> Optional[int]:
@@ -3372,7 +3374,8 @@ def soft_delete_keyword_collection(self, collection_id: int, expected_version: i
)

def search_keyword_collections(self, search_term: str, limit: int = 10) -> List[Dict[str, Any]]:
return self._search_generic_items_fts("keyword_collections_fts", "keyword_collections", "name", search_term,
safe_search_term = f'"{search_term}"'
return self._search_generic_items_fts("keyword_collections_fts", "keyword_collections", "name", safe_search_term,
limit)

# Notes (Now with UUID and specific methods)
@@ -3536,19 +3539,21 @@ def soft_delete_note(self, note_id: str, expected_version: int) -> bool | None:

def search_notes(self, search_term: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Searches notes_fts (title and content). Corrected JOIN condition."""
# notes_fts matches against title and content
# FTS table column group: notes_fts
# Content table: notes, content_rowid: rowid (maps to notes.rowid)
# FTS5 requires wrapping terms with special characters in double quotes
# to be treated as a literal phrase.
safe_search_term = f'"{search_term}"'

query = """
SELECT main.*
FROM notes_fts fts
JOIN notes main ON fts.rowid = main.rowid -- Corrected Join condition
WHERE fts.notes_fts MATCH ? \
JOIN notes main ON fts.rowid = main.rowid
WHERE fts.notes_fts MATCH ?
AND main.deleted = 0
ORDER BY rank LIMIT ? \
ORDER BY rank LIMIT ?
"""
try:
cursor = self.execute_query(query, (search_term, limit))
# Pass the quoted string as the parameter
cursor = self.execute_query(query, (safe_search_term, limit))
return [dict(row) for row in cursor.fetchall()]
except CharactersRAGDBError as e:
logger.error(f"Error searching notes for '{search_term}': {e}")
@@ -3824,6 +3829,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
logger.debug(
f"Exception in nested transaction block on thread {threading.get_ident()}: {exc_type.__name__}. Outermost transaction will handle rollback if this exception propagates.")


# Return False to re-raise any exceptions that occurred within the `with` block,
# allowing them to be handled by the caller or to propagate further up.
# This is standard behavior for context managers.
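The recurring `safe_search_term = f'"{search_term}"'` change above exists because FTS5 parses unquoted input as query syntax, so terms containing characters such as `-` or `:` can raise a syntax error instead of matching; wrapping the term in double quotes makes it a literal phrase. A minimal standalone sketch (in-memory FTS5 table, not the real schema):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE VIRTUAL TABLE notes_fts USING fts5(title, content)")
conn.execute("INSERT INTO notes_fts VALUES (?, ?)", ("hello-world", "a note about hello-world"))

raw_term = "hello-world"        # '-' is treated as FTS5 query syntax when unquoted
safe_term = f'"{raw_term}"'     # double quotes force a literal phrase match

try:
    conn.execute("SELECT rowid FROM notes_fts WHERE notes_fts MATCH ?", (raw_term,)).fetchall()
except sqlite3.OperationalError as e:
    print("unquoted term rejected:", e)   # typically an 'fts5: syntax error' message

rows = conn.execute("SELECT rowid FROM notes_fts WHERE notes_fts MATCH ?", (safe_term,)).fetchall()
print("quoted phrase matched rowids:", [r[0] for r in rows])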
513 changes: 225 additions & 288 deletions tldw_chatbook/DB/Client_Media_DB_v2.py

Large diffs are not rendered by default.

136 changes: 65 additions & 71 deletions tldw_chatbook/DB/Prompts_DB.py
Original file line number Diff line number Diff line change
@@ -1121,7 +1121,8 @@ def soft_delete_keyword(self, keyword_text: str) -> bool:
def get_prompt_by_id(self, prompt_id: int, include_deleted: bool = False) -> Optional[Dict]:
query = "SELECT * FROM Prompts WHERE id = ?"
params = [prompt_id]
if not include_deleted: query += " AND deleted = 0"
if not include_deleted:
query += " AND deleted = 0"
try:
cursor = self.execute_query(query, tuple(params))
result = cursor.fetchone()
@@ -1133,7 +1134,8 @@ def get_prompt_by_id(self, prompt_id: int, include_deleted: bool = False) -> Opt
def get_prompt_by_uuid(self, prompt_uuid: str, include_deleted: bool = False) -> Optional[Dict]:
query = "SELECT * FROM Prompts WHERE uuid = ?"
params = [prompt_uuid]
if not include_deleted: query += " AND deleted = 0"
if not include_deleted:
query += " AND deleted = 0"
try:
cursor = self.execute_query(query, tuple(params))
result = cursor.fetchone()
@@ -1145,7 +1147,8 @@ def get_prompt_by_uuid(self, prompt_uuid: str, include_deleted: bool = False) ->
def get_prompt_by_name(self, name: str, include_deleted: bool = False) -> Optional[Dict]:
query = "SELECT * FROM Prompts WHERE name = ?"
params = [name]
if not include_deleted: query += " AND deleted = 0"
if not include_deleted:
query += " AND deleted = 0"
try:
cursor = self.execute_query(query, tuple(params))
result = cursor.fetchone()
@@ -1231,7 +1234,7 @@ def fetch_keywords_for_prompt(self, prompt_id: int, include_deleted: bool = Fals

def search_prompts(self,
search_query: Optional[str],
search_fields: Optional[List[str]] = None, # e.g. ['name', 'details', 'keywords']
search_fields: Optional[List[str]] = None, # e.g. ['name', 'details', 'keywords']
page: int = 1,
results_per_page: int = 20,
include_deleted: bool = False
@@ -1240,96 +1243,86 @@ def search_prompts(self,
if results_per_page < 1: raise ValueError("Results per page must be >= 1")

if search_query and not search_fields:
search_fields = ["name", "details", "system_prompt", "user_prompt", "author"] # Default FTS fields
search_fields = ["name", "details", "system_prompt", "user_prompt", "author"]
elif not search_fields:
search_fields = []

offset = (page - 1) * results_per_page

base_select_parts = ["p.id", "p.uuid", "p.name", "p.author", "p.details",
"p.system_prompt", "p.user_prompt", "p.last_modified", "p.version", "p.deleted"]
count_select = "COUNT(DISTINCT p.id)"
base_from = "FROM Prompts p"
joins = []
base_select = "SELECT p.*"
count_select = "SELECT COUNT(p.id)"
from_clause = "FROM Prompts p"
conditions = []
params = []

if not include_deleted:
conditions.append("p.deleted = 0")

fts_search_active = False
if search_query:
fts_query_parts = []
if "name" in search_fields: fts_query_parts.append("name")
if "author" in search_fields: fts_query_parts.append("author")
if "details" in search_fields: fts_query_parts.append("details")
if "system_prompt" in search_fields: fts_query_parts.append("system_prompt")
if "user_prompt" in search_fields: fts_query_parts.append("user_prompt")

# FTS on prompt fields
if fts_query_parts:
fts_search_active = True
if not any("prompts_fts fts_p" in j_item for j_item in joins):
joins.append("JOIN prompts_fts fts_p ON fts_p.rowid = p.id")
# Build FTS query: field1:query OR field2:query ...
# For simple matching, just use the query directly if FTS table covers all these.
# The FTS table definition needs to match these fields.
# Assuming prompts_fts has 'name', 'author', 'details', 'system_prompt', 'user_prompt'
conditions.append("fts_p.prompts_fts MATCH ?")
params.append(search_query) # User provides FTS syntax or simple terms

# FTS on keywords (if specified in search_fields)
# --- Robust FTS search using subqueries ---
if search_query and search_fields:
matching_prompt_ids = set()
text_search_fields = {"name", "author", "details", "system_prompt", "user_prompt"}

# Search in prompt text fields
if any(field in text_search_fields for field in search_fields):
try:
cursor = self.execute_query("SELECT rowid FROM prompts_fts WHERE prompts_fts MATCH ?", (search_query,))
matching_prompt_ids.update(row['rowid'] for row in cursor.fetchall())
except sqlite3.Error as e:
logging.error(f"FTS search on prompts failed: {e}", exc_info=True)
raise DatabaseError(f"FTS search on prompts failed: {e}") from e


# Search in keywords
if "keywords" in search_fields:
fts_search_active = True
# Join for keywords
if not any("PromptKeywordLinks pkl" in j_item for j_item in joins):
joins.append("JOIN PromptKeywordLinks pkl ON p.id = pkl.prompt_id")
if not any("PromptKeywordsTable pkw" in j_item for j_item in joins):
joins.append("JOIN PromptKeywordsTable pkw ON pkl.keyword_id = pkw.id AND pkw.deleted = 0")
if not any("prompt_keywords_fts fts_k" in j_item for j_item in joins):
joins.append("JOIN prompt_keywords_fts fts_k ON fts_k.rowid = pkw.id")

conditions.append("fts_k.prompt_keywords_fts MATCH ?")
params.append(search_query) # Match against keywords

order_by_clause_str = "ORDER BY p.last_modified DESC, p.id DESC"
if fts_search_active:
# FTS results are naturally sorted by relevance (rank) by SQLite.
# We can select rank if needed for explicit sorting or display.
if "fts_p.rank AS relevance_score" not in " ".join(base_select_parts) and "fts_p" in " ".join(joins) :
base_select_parts.append("fts_p.rank AS relevance_score") # Add if fts_p is used
elif "fts_k.rank AS relevance_score_kw" not in " ".join(base_select_parts) and "fts_k" in " ".join(joins):
base_select_parts.append("fts_k.rank AS relevance_score_kw") # Add if fts_k is used
# A more complex ranking might be needed if both prompt and keyword FTS are active.
# For now, default sort or rely on SQLite's combined FTS rank if multiple MATCH clauses are used.
order_by_clause_str = "ORDER BY p.last_modified DESC, p.id DESC" # Fallback, FTS rank is implicit

final_select_stmt = f"SELECT DISTINCT {', '.join(base_select_parts)}"
join_clause = " ".join(list(dict.fromkeys(joins))) # Unique joins
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
try:
# 1. Find keyword IDs matching the query
kw_cursor = self.execute_query("SELECT rowid FROM prompt_keywords_fts WHERE prompt_keywords_fts MATCH ?", (search_query,))
matching_keyword_ids = {row['rowid'] for row in kw_cursor.fetchall()}

# 2. Find prompt IDs linked to those keywords
if matching_keyword_ids:
placeholders = ','.join('?' * len(matching_keyword_ids))
link_cursor = self.execute_query(
f"SELECT DISTINCT prompt_id FROM PromptKeywordLinks WHERE keyword_id IN ({placeholders})",
tuple(matching_keyword_ids)
)
matching_prompt_ids.update(row['prompt_id'] for row in link_cursor.fetchall())
except sqlite3.Error as e:
logging.error(f"FTS search on keywords failed: {e}", exc_info=True)
raise DatabaseError(f"FTS search on keywords failed: {e}") from e

if not matching_prompt_ids:
return [], 0 # No matches found, short-circuit

# Add the final ID list to the main query conditions
id_placeholders = ','.join('?' * len(matching_prompt_ids))
conditions.append(f"p.id IN ({id_placeholders})")
params.extend(list(matching_prompt_ids))

# --- Build and Execute Final Query ---
where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
order_by_clause = "ORDER BY p.last_modified DESC, p.id DESC"

try:
count_sql = f"SELECT {count_select} {base_from} {join_clause} {where_clause}"
count_cursor = self.execute_query(count_sql, tuple(params))
total_matches = count_cursor.fetchone()[0]
# Get total count
count_sql = f"{count_select} {from_clause} {where_clause}"
total_matches = self.execute_query(count_sql, tuple(params)).fetchone()[0]

results_list = []
if total_matches > 0 and offset < total_matches:
results_sql = f"{final_select_stmt} {base_from} {join_clause} {where_clause} {order_by_clause_str} LIMIT ? OFFSET ?"
if total_matches > 0:
# Get paginated results
results_sql = f"{base_select} {from_clause} {where_clause} {order_by_clause} LIMIT ? OFFSET ?"
paginated_params = tuple(params + [results_per_page, offset])
results_cursor = self.execute_query(results_sql, paginated_params)
results_list = [dict(row) for row in results_cursor.fetchall()]
# If keywords need to be attached to each result
# Attach keywords to each result
for res_dict in results_list:
res_dict['keywords'] = self.fetch_keywords_for_prompt(res_dict['id'], include_deleted=False)

return results_list, total_matches
except sqlite3.Error as e:
if "no such table: prompts_fts" in str(e).lower() or "no such table: prompt_keywords_fts" in str(e).lower():
logging.error(f"FTS table missing in {self.db_path_str}. Search may fail or be incomplete.")
# Fallback to LIKE search or raise error
# For now, let it fail and be caught by generic error.
logging.error(f"DB error during prompt search in '{self.db_path_str}': {e}", exc_info=True)
except (DatabaseError, sqlite3.Error) as e:
logging.error(f"DB error during prompt search: {e}", exc_info=True)
raise DatabaseError(f"Failed to search prompts: {e}") from e

# --- Sync Log Access Methods ---
@@ -1355,6 +1348,7 @@ def get_sync_log_entries(self, since_change_id: int = 0, limit: Optional[int] =
logger.error(f"Error fetching sync_log entries: {e}")
raise DatabaseError("Failed to fetch sync_log entries") from e


def delete_sync_log_entries(self, change_ids: List[int]) -> int:
if not change_ids: return 0
if not all(isinstance(cid, int) for cid in change_ids):
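The rewritten `search_prompts` above drops the fragile multi-JOIN FTS query in favour of two preliminary FTS lookups (prompt text, then keywords mapped through the link table), merges the matching ids, and filters the final query with `p.id IN (...)`. A minimal sketch of that pattern against a toy schema (table and column names are illustrative, not the project's):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE Prompts (id INTEGER PRIMARY KEY, name TEXT, deleted INTEGER DEFAULT 0);
CREATE TABLE PromptKeywordLinks (prompt_id INTEGER, keyword_id INTEGER);
CREATE VIRTUAL TABLE prompts_fts USING fts5(name);
CREATE VIRTUAL TABLE prompt_keywords_fts USING fts5(keyword);
""")
conn.execute("INSERT INTO Prompts VALUES (1, 'summarise article', 0), (2, 'translate text', 0)")
conn.execute("INSERT INTO prompts_fts (rowid, name) SELECT id, name FROM Prompts")
conn.execute("INSERT INTO prompt_keywords_fts (rowid, keyword) VALUES (10, 'summarise')")
conn.execute("INSERT INTO PromptKeywordLinks VALUES (2, 10)")

search_query = "summarise"
matching_prompt_ids = set()

# Step 1: FTS over the prompt text fields.
matching_prompt_ids.update(row["rowid"] for row in conn.execute(
    "SELECT rowid FROM prompts_fts WHERE prompts_fts MATCH ?", (search_query,)))

# Step 2: FTS over keyword text, then map matching keyword ids to linked prompt ids.
kw_ids = [row["rowid"] for row in conn.execute(
    "SELECT rowid FROM prompt_keywords_fts WHERE prompt_keywords_fts MATCH ?", (search_query,))]
if kw_ids:
    placeholders = ",".join("?" * len(kw_ids))
    matching_prompt_ids.update(row["prompt_id"] for row in conn.execute(
        f"SELECT DISTINCT prompt_id FROM PromptKeywordLinks WHERE keyword_id IN ({placeholders})",
        kw_ids))

# Step 3: one simple final query filtered by the collected ids -- no FTS JOINs needed.
if matching_prompt_ids:
    placeholders = ",".join("?" * len(matching_prompt_ids))
    rows = conn.execute(
        f"SELECT * FROM Prompts WHERE deleted = 0 AND id IN ({placeholders}) ORDER BY id",
        tuple(matching_prompt_ids)).fetchall()
    print([dict(r) for r in rows])   # id 1 via text match, id 2 via keyword match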
5 changes: 1 addition & 4 deletions tldw_chatbook/DB/Sync_Client.py
Original file line number Diff line number Diff line change
@@ -13,10 +13,7 @@
# Third-Party Imports
#
# Local Imports
try:
from tldw_cli.tldw_app.DB.Media_DB import Database, ConflictError, DatabaseError, InputError
except ImportError:
logger.error("ERROR: Could not import the 'Media_DB' library. Make sure Media_DB.py is accessible.")
from tldw_chatbook.DB.Client_Media_DB_v2 import MediaDatabase as Database, ConflictError, DatabaseError, InputError
#
#######################################################################################################################
#
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Optional

from cv2 import data
#from cv2 import data
from textual.containers import Container
from textual.css.query import QueryError
from textual.widgets import Input, TextArea, RichLog
@@ -33,7 +33,8 @@
__all__ = [
# ─── Ollama ───────────────────────────────────────────────────────────────
"handle_ollama_nav_button_pressed",
"handle_ollama_list_models_button_pressed",
# FIXME
#"handle_ollama_list_models_button_pressed",
"handle_ollama_show_model_button_pressed",
"handle_ollama_delete_model_button_pressed",
"handle_ollama_copy_model_button_pressed",
@@ -106,57 +107,58 @@ async def handle_ollama_nav_button_pressed(app: "TldwCli") -> None:
app.notify("An unexpected error occurred while switching to Ollama view.", severity="error")


async def handle_ollama_list_models_button_pressed(app: "TldwCli") -> None:
"""Handles the 'List Models' button press for Ollama."""
logger = getattr(app, "loguru_logger", logging.getLogger(__name__))
logger.debug("Ollama 'List Models' button pressed.")
try:
base_url_input = app.query_one("#ollama-server-url", Input)
log_output_widget = app.query_one("#ollama-combined-output", RichLog)

base_url = base_url_input.value.strip()
if not base_url:
app.notify("Ollama Server URL is required.", severity="error")
base_url_input.focus()
return

log_output_widget.clear()
_update_ollama_combined_output(app, f"Attempting to list models from: {base_url}...")

app.run_worker(
_worker_ollama_list_models,
base_url,
thread=True,
name=f"ollama_list_models_{time.monotonic()}",
group="ollama_api",
description="Listing Ollama local models",
on_success=partial(_on_list_models_success, app),
on_error=partial(_on_ollama_worker_error, app, "list_models")
)
if logging.error:
log_output_widget.write(f"Error listing models: {logging.error}")

if logging.error: # This is the original error check, the one above is newly added by script
log_output_widget.write(f"Error listing models: {logging.error}")
app.notify("Error listing Ollama models.", severity="error")
elif data and data.get('models'):
try:
# Assuming 'data' is the JSON response, and 'models' is a list within it.
formatted_models = json.dumps(data['models'], indent=2)
log_output_widget.write(formatted_models)
app.notify(f"Successfully listed {len(data['models'])} Ollama models.")
except (TypeError, KeyError, json.JSONDecodeError) as e:
log_output_widget.write(f"Error processing model list response: {e}\nRaw data: {data}")
app.notify("Error processing model list from Ollama.", severity="error")
else:
log_output_widget.write("No models found or unexpected response.")
app.notify("No Ollama models found or unexpected response.", severity="warning")
except QueryError as e: # pragma: no cover
logger.error(f"QueryError in handle_ollama_list_models_button_pressed: {e}", exc_info=True)
app.notify("Error accessing Ollama UI elements for listing models.", severity="error")
except Exception as e: # pragma: no cover
logger.error(f"Unexpected error in handle_ollama_list_models_button_pressed: {e}", exc_info=True)
app.notify("An unexpected error occurred while listing Ollama models.", severity="error")
# FIXME
# async def handle_ollama_list_models_button_pressed(app: "TldwCli") -> None:
# """Handles the 'List Models' button press for Ollama."""
# logger = getattr(app, "loguru_logger", logging.getLogger(__name__))
# logger.debug("Ollama 'List Models' button pressed.")
# try:
# base_url_input = app.query_one("#ollama-server-url", Input)
# log_output_widget = app.query_one("#ollama-combined-output", RichLog)
#
# base_url = base_url_input.value.strip()
# if not base_url:
# app.notify("Ollama Server URL is required.", severity="error")
# base_url_input.focus()
# return
#
# log_output_widget.clear()
# _update_ollama_combined_output(app, f"Attempting to list models from: {base_url}...")
#
# app.run_worker(
# _worker_ollama_list_models,
# base_url,
# thread=True,
# name=f"ollama_list_models_{time.monotonic()}",
# group="ollama_api",
# description="Listing Ollama local models",
# on_success=partial(_on_list_models_success, app),
# on_error=partial(_on_ollama_worker_error, app, "list_models")
# )
# if logging.error:
# log_output_widget.write(f"Error listing models: {logging.error}")
#
# if logging.error: # This is the original error check, the one above is newly added by script
# log_output_widget.write(f"Error listing models: {logging.error}")
# app.notify("Error listing Ollama models.", severity="error")
# elif data and data.get('models'):
# try:
# # Assuming 'data' is the JSON response, and 'models' is a list within it.
# formatted_models = json.dumps(data['models'], indent=2)
# log_output_widget.write(formatted_models)
# app.notify(f"Successfully listed {len(data['models'])} Ollama models.")
# except (TypeError, KeyError, json.JSONDecodeError) as e:
# log_output_widget.write(f"Error processing model list response: {e}\nRaw data: {data}")
# app.notify("Error processing model list from Ollama.", severity="error")
# else:
# log_output_widget.write("No models found or unexpected response.")
# app.notify("No Ollama models found or unexpected response.", severity="warning")
# except QueryError as e: # pragma: no cover
# logger.error(f"QueryError in handle_ollama_list_models_button_pressed: {e}", exc_info=True)
# app.notify("Error accessing Ollama UI elements for listing models.", severity="error")
# except Exception as e: # pragma: no cover
# logger.error(f"Unexpected error in handle_ollama_list_models_button_pressed: {e}", exc_info=True)
# app.notify("An unexpected error occurred while listing Ollama models.", severity="error")


async def handle_ollama_show_model_button_pressed(app: "TldwCli") -> None:
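For reference while `handle_ollama_list_models_button_pressed` stays commented out (it relied on the stray `cv2` `data` import and treated `logging.error` as a value), here is a minimal, UI-free sketch of the underlying operation. It assumes a standard Ollama server exposing `GET /api/tags` that returns a JSON object with a `models` list, and it deliberately leaves the Textual worker/widget wiring out of scope:

import json

import requests


def fetch_ollama_models(base_url: str, timeout: float = 10.0) -> list[dict]:
    """Return the raw model entries reported by the Ollama server."""
    resp = requests.get(f"{base_url.rstrip('/')}/api/tags", timeout=timeout)
    resp.raise_for_status()
    return resp.json().get("models", [])


def format_models(models: list[dict]) -> str:
    """Pretty-print the list for a log widget; empty input yields a clear message."""
    if not models:
        return "No models found or unexpected response."
    return json.dumps(models, indent=2)


if __name__ == "__main__":
    try:
        # 11434 is Ollama's default port; adjust for your setup.
        print(format_models(fetch_ollama_models("http://localhost:11434")))
    except requests.RequestException as exc:
        print(f"Error listing models: {exc}")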