Sync #65

Merged: 32 commits, merged on Jun 10, 2025.

Changes shown below are from 1 commit of the 32 in this pull request.

Commits
fdda8d7
Media display
rmusser01 Jun 7, 2025
5a5c6b2
media search isn't broken anymore
rmusser01 Jun 7, 2025
fff4a9f
and that fixes the pytest req
rmusser01 Jun 7, 2025
a2d3b62
bugfix
rmusser01 Jun 8, 2025
9991c7d
media viewing
rmusser01 Jun 8, 2025
3345cdd
Update .gitignore
rmusser01 Jun 8, 2025
e974295
Media window refactor
rmusser01 Jun 8, 2025
5acb244
new media view + collapse button
rmusser01 Jun 8, 2025
c107bdb
f
rmusser01 Jun 8, 2025
9784e99
progress
rmusser01 Jun 8, 2025
2d5906d
progress
rmusser01 Jun 8, 2025
aa511ae
cleanup
rmusser01 Jun 8, 2025
f62f4ed
fix, but weird bug...
rmusser01 Jun 8, 2025
2fa2a9c
fml
rmusser01 Jun 8, 2025
9f12dbb
rewrote chunking lib to support new logging library
rmusser01 Jun 8, 2025
53cbe35
fix chunking and summarization logging
rmusser01 Jun 8, 2025
99c2d69
webscraper
rmusser01 Jun 8, 2025
5baeabe
and bug fixed.
rmusser01 Jun 8, 2025
bf74a65
scraper
rmusser01 Jun 8, 2025
7af882b
eh
rmusser01 Jun 9, 2025
6115894
progress
rmusser01 Jun 9, 2025
c0650f8
media search in chat works
rmusser01 Jun 9, 2025
f69eb89
media search bugs
rmusser01 Jun 9, 2025
5f0fddd
Update Client_Media_DB_v2.py
rmusser01 Jun 9, 2025
c91c227
Update app.py
rmusser01 Jun 9, 2025
f01f47b
Update chat_right_sidebar.py
rmusser01 Jun 9, 2025
bf0efe4
Update chat_right_sidebar.py
rmusser01 Jun 9, 2025
99f7b50
Update chat_right_sidebar.py
rmusser01 Jun 9, 2025
ab15775
Update chat_right_sidebar.py
rmusser01 Jun 9, 2025
e4b3ad7
CSS and check for on-load
rmusser01 Jun 10, 2025
07011fd
Update tldw_cli.tcss
rmusser01 Jun 10, 2025
c0bc64c
Update tldw_cli.tcss
rmusser01 Jun 10, 2025
media search in chat works
rmusser01 committed Jun 9, 2025
commit c0650f8e409e72d0c2575f8bc7c7cfa919d6e404
44 changes: 33 additions & 11 deletions tldw_chatbook/Event_Handlers/Chat_Events/chat_events_sidebar.py
@@ -152,25 +152,25 @@ async def perform_media_search(app: 'TldwCli'):
search_fields = ['title', 'content', 'author', 'keywords', 'notes']
media_types_filter = None

# If no keywords are provided, use all keywords
# If no search criteria provided, we'll do a general search without keyword filtering
if not keywords_list and not search_term:
try:
all_keywords = db_instance.fetch_all_keywords()
keywords_list = all_keywords
logger.debug(f"No keywords provided, using all keywords: {len(keywords_list)} keywords")
except Exception as e:
logger.error(f"Error fetching all keywords: {e}")
# Continue with empty keywords list if fetching all keywords fails
logger.debug("No search term or keywords provided, performing general search")
# We'll leave both search_query and must_have_keywords as None to get all results

logger.debug(f"Media Search - Requesting page: {app.media_search_current_page}")
# logger.debug(f"Searching media DB with term: '{search_term}', fields: {search_fields}, types: {media_types_filter}") # This is a bit redundant with the one above

# Only apply keyword filtering if keywords were explicitly provided
must_have_keywords_param = keywords_list if keywords_list else None

logger.debug(f"Media Search - Parameters: search_query={search_term if search_term else 'None'}, must_have_keywords={must_have_keywords_param}")

media_items, total_matches = db_instance.search_media_db(
search_query=search_term if search_term else None,
search_fields=search_fields,
media_types=media_types_filter,
date_range=None, # No date range filtering
must_have_keywords=keywords_list if keywords_list else None,
must_have_keywords=must_have_keywords_param,
must_not_have_keywords=None,
sort_by="last_modified_desc", # Default sort order
media_ids_filter=None, # No specific media IDs to filter
@@ -202,11 +202,33 @@ async def perform_media_search(app: 'TldwCli'):
# FIX: Await the async append method.
await results_list_view.append(ListItem(Label("No media found.")))
else:
# Get all media IDs to fetch keywords in batch
media_ids = [item.get('id') for item in media_items if isinstance(item, dict) and item.get('id')]

# Fetch keywords for all media items in one batch operation
keywords_map = {}
if media_ids:
try:
keywords_map = db_instance.fetch_keywords_for_media_batch(media_ids)
logger.debug(f"Fetched keywords for {len(keywords_map)} media items")
except Exception as e:
logger.error(f"Error fetching keywords batch: {e}")
# Continue without keywords if fetching fails

for item_dict in media_items:
if isinstance(item_dict, dict):
title = item_dict.get('title', 'Untitled')
media_id = item_dict.get('media_id', 'Unknown ID')
display_label = f"{title} (ID: {media_id[:8]}...)"
author = item_dict.get('author', 'Unknown Author')
uuid_value = item_dict.get('uuid', 'Unknown')

# Get keywords for this media item
media_id = item_dict.get('id')
keywords = keywords_map.get(media_id, []) if media_id else []
keywords_str = ", ".join(keywords) if keywords else "None"

# Create a formatted display with all required fields
display_label = f"Title: {title}\nAuthor: {author}\nID: {uuid_value}\nKeywords: {keywords_str}"

list_item = ListItem(Label(display_label))
setattr(list_item, 'media_data', item_dict)
await results_list_view.append(list_item)
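
The change above replaces per-item keyword lookups with a single batch query and then formats a multi-line label per result. Below is a minimal sketch of that pattern, assuming fetch_keywords_for_media_batch takes a list of media IDs and returns a {media_id: [keyword, ...]} mapping, as the diff suggests; the helper name build_display_labels is illustrative only and not part of the commit.

# Illustrative sketch, not part of the commit. Assumes db_instance exposes
# fetch_keywords_for_media_batch(media_ids) -> {media_id: [keyword, ...]}.
from typing import Dict, List

def build_display_labels(media_items: List[dict], db_instance) -> List[str]:
    """Fetch keywords for all results in one query, then format one label per item."""
    media_ids = [item.get('id') for item in media_items
                 if isinstance(item, dict) and item.get('id')]

    keywords_map: Dict[int, List[str]] = {}
    if media_ids:
        try:
            # One round-trip instead of one query per result (avoids the N+1 pattern).
            keywords_map = db_instance.fetch_keywords_for_media_batch(media_ids)
        except Exception:
            # Degrade gracefully: show results without keywords rather than failing.
            keywords_map = {}

    labels = []
    for item in media_items:
        keywords = keywords_map.get(item.get('id'), [])
        labels.append(
            f"Title: {item.get('title', 'Untitled')}\n"
            f"Author: {item.get('author', 'Unknown Author')}\n"
            f"ID: {item.get('uuid', 'Unknown')}\n"
            f"Keywords: {', '.join(keywords) if keywords else 'None'}"
        )
    return labels
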
252 changes: 252 additions & 0 deletions tldw_chatbook/Metrics/Otel_Metrics.py
@@ -0,0 +1,252 @@
# Otel_Metrics.py
"""
A thread-safe, generic metrics library built on top of the OpenTelemetry API.
This library decouples your application's instrumentation from the observability
backend. It is configured here to export to Prometheus, but could be easily
swapped to another exporter (like OTLP) with minimal code changes.
Key Features:
- Uses the standard OpenTelemetry API for future-proofing.
- Thread-safe instrument creation.
- Configuration via standard environment variables (e.g., OTEL_SERVICE_NAME).
- Decorator that emits metrics and adds events to active traces.
- Automatic collection of system and runtime metrics.
IMPORTANT: A NOTE ON ATTRIBUTE CARDINALITY
In OpenTelemetry, labels are called 'attributes'. The same warning applies:
attributes should only be used for values with low cardinality. Do not use
user IDs, request IDs, etc., as attributes.
"""
#
# Imports
import functools
import os
import threading
import time
import logging
#
# Third-Party Libraries
from opentelemetry import metrics, trace
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.instrumentation.system_metrics import SystemMetricsInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
#
# Local Imports
#
#######################################################################################################################
#
# Statics:
# Global Meter object. The "Meter" is how you create instruments (counters, etc.)
_meter = None
#############################################################
#
# Functions:

# A thread-safe registry for dynamically created OTel instruments.
_instrument_registry = {}
_instrument_lock = threading.Lock()
_meter = None


def init_metrics():
"""
Initializes the OpenTelemetry SDK. Should be called once at startup.
Configures a Prometheus exporter and sets global resource attributes
which are attached to all emitted metrics. Configuration is read from
standard OTel environment variables.
"""
global _meter

# Use standard OTel env vars for configuration.
service_name = os.getenv("OTEL_SERVICE_NAME", "unknown_service")
service_version = os.getenv("OTEL_SERVICE_VERSION", "0.1.0")

resource = Resource(attributes={
SERVICE_NAME: service_name,
SERVICE_VERSION: service_version,
})

# The reader is the "exporter" for metrics.
# This one starts a Prometheus-compatible server.
reader = PrometheusMetricReader()
provider = MeterProvider(resource=resource, metric_readers=[reader])
metrics.set_meter_provider(provider)

_meter = metrics.get_meter("app.metrics.library")

# Automatically instrument system metrics (CPU, memory, etc.)
SystemMetricsInstrumentor().instrument()

logging.info(
f"OTel metrics initialized for service '{service_name}'. "
f"Prometheus exporter available on port 9464 at /metrics"
)


def _get_meter():
"""Returns the global meter, initializing if necessary."""
if not _meter:
logging.warning("Metrics not explicitly initialized. Calling init_metrics() with defaults.")
init_metrics()
return _meter


def _get_or_create_instrument(instrument_type, name, unit="", description=""):
"""
Internal function to get an instrument or create it if it doesn't exist.
Uses a double-checked lock for thread safety and performance.
"""
if name in _instrument_registry:
return _instrument_registry[name]

with _instrument_lock:
if name in _instrument_registry:
return _instrument_registry[name]

meter = _get_meter()
instrument = None
if instrument_type == 'counter':
instrument = meter.create_counter(name, unit=unit, description=description)
elif instrument_type == 'histogram':
instrument = meter.create_histogram(name, unit=unit, description=description)
else:
raise ValueError(f"Unsupported instrument type: {instrument_type}")

_instrument_registry[name] = instrument
return instrument


def log_counter(metric_name, value=1, labels=None, documentation=""):
"""
Increments a counter. Documentation is used only on first creation.
In OTel, 'labels' are called 'attributes'.
"""
try:
counter = _get_or_create_instrument(
'counter', metric_name, unit="1", description=documentation
)
counter.add(value, attributes=(labels or {}))
except Exception as e:
logging.error(f"Failed to log OTel counter {metric_name}: {e}")


def log_histogram(metric_name, value, labels=None, documentation=""):
"""
Records a value in a histogram. Documentation is used only on first creation.
"""
try:
histogram = _get_or_create_instrument(
'histogram', metric_name, unit="s", description=documentation
)
histogram.record(value, attributes=(labels or {}))
except Exception as e:
logging.error(f"Failed to log OTel histogram {metric_name}: {e}")


def timeit(metric_name=None, documentation="Execution time and call count of a function."):
"""
Decorator that times a function.
- Emits a histogram for duration.
- Emits a counter for calls.
- Adds a 'status' attribute for success/error.
- Adds an event to the current trace span, if one exists.
"""

def decorator(func):
base_name = metric_name or func.__name__

@functools.wraps(func)
def wrapper(*args, **kwargs):
# Get current span from the context. It's a no-op if no tracer is configured.
span = trace.get_current_span()
start_time = time.time()
status = "error"

try:
result = func(*args, **kwargs)
status = "success"
return result
finally:
elapsed_time = time.time() - start_time
common_attributes = {"function": func.__name__, "status": status}

# 1. Log metrics for aggregation
log_histogram(
metric_name=f"{base_name}_duration_seconds",
value=elapsed_time,
labels=common_attributes,
documentation="Duration of function execution in seconds."
)
log_counter(
metric_name=f"{base_name}_calls_total",
labels=common_attributes,
documentation=f"Total calls to the function."
)

# 2. Add a precise event to the active trace for debugging
span.add_event(
name=f"finished {func.__name__}",
attributes={
"duration_sec": round(elapsed_time, 4),
"status": status,
}
)

return wrapper

return decorator



# --- Example Usage ----
# --- Application Code ---
# @timeit(metric_name="data_processing")
# import time
# import logging
# from metrics_otel import init_metrics, timeit, log_counter
# def process_data(user_id):
# """A sample function to process some data."""
# logging.info(f"Processing data for user {user_id}...")
# time.sleep(0.2)
#
# if user_id % 5 == 0:
# # This is a good use of a custom counter
# log_counter(
# "special_user_processed_total",
# labels={"user_type": "vip"},
# documentation="Counter for a special type of user processing."
# )
#
# if user_id % 10 == 0:
# raise ValueError("Simulating a failure")
#
# logging.info("Done.")
#
#
# def main():
# # Initialize OpenTelemetry metrics ONCE at application start.
# # It reads configuration from environment variables.
# init_metrics()
#
# # Main application loop
# user_id = 0
# while True:
# try:
# process_data(user_id)
# except ValueError as e:
# logging.error(f"Failed to process data for user {user_id}: {e}")
#
# user_id += 1
# time.sleep(1)
#
#
# if __name__ == "__main__":
# main()

#
# End of Otel_Metrics.py
#######################################################################################################################
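
The cardinality warning in this module's docstring is its main usage constraint. A small sketch of it with the module's own log_counter follows, assuming the file is importable as tldw_chatbook.Metrics.Otel_Metrics (taken from the file path in the diff header, not verified).

# Cardinality sketch using the helpers defined above; the import path is assumed
# from the file location in this diff and has not been verified.
from tldw_chatbook.Metrics.Otel_Metrics import init_metrics, log_counter

init_metrics()  # starts the Prometheus reader on port 9464

# Good: "endpoint" and "status" each take a handful of fixed values.
log_counter(
    "media_search_requests_total",
    labels={"endpoint": "chat_sidebar", "status": "success"},
    documentation="Total media search requests, by endpoint and outcome.",
)

# Bad (left commented out): a per-user attribute creates one time series per user
# and will eventually overwhelm the metrics backend.
# log_counter("media_search_requests_total", labels={"user_id": "a1b2c3d4"})
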
103 changes: 57 additions & 46 deletions tldw_chatbook/Metrics/logger_config.py
@@ -5,6 +5,8 @@
import sys
import os
from datetime import datetime
from typing import Optional

#
# 3rd-Party Imports
from loguru import logger
@@ -15,7 +17,18 @@
#
# Functions:

log_metrics_file = '~/.local/tldw_cli/Logs/tldw_metrics_logs.json'
# Sensible default locations for logs
DEFAULT_APP_LOG_PATH = '~/.local/tldw_cli/Logs/tldw_app.log'
DEFAULT_METRICS_LOG_PATH = '~/.local/tldw_cli/Logs/tldw_metrics.json'

def _ensure_log_dir_exists(file_path: str):
"""Ensure the directory for the log file exists."""
# Expand the user's home directory if '~' is used
expanded_path = os.path.expanduser(file_path)
log_dir = os.path.dirname(expanded_path)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
return expanded_path

def retention_function(files):
"""
@@ -68,64 +81,62 @@ def serialize(value):
})


def setup_logger(args):
def setup_logger(
log_level: str = "DEBUG",
console_format: str = "{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}",
app_log_path: Optional[str] = DEFAULT_APP_LOG_PATH,
metrics_log_path: Optional[str] = DEFAULT_METRICS_LOG_PATH,
):
"""
Sets up Loguru using command-line arguments (if provided)
and configuration file settings.
Sets up Loguru sinks for console, a standard application log, and a JSON metrics log.
Args:
log_level (str): The minimum log level to output (e.g., "DEBUG", "INFO").
console_format (str): The format string for console output.
app_log_path (Optional[str]): Path for the standard text log file. If None, this sink is disabled.
metrics_log_path (Optional[str]): Path for the structured JSON metrics log. If None, this sink is disabled.
This function adds:
- A console sink with a simple human‑readable format.
- A file sink for standard logs.
- Optionally, a file sink with JSON formatting for metrics.
Returns:
The configured logger instance.
"""
# Remove any previously added sinks.
# Start with a clean slate
logger.remove()

# Determine the log level (from args; default to DEBUG)
log_level = args.log_level.upper() if hasattr(args, "log_level") else "DEBUG"

# Console sink with simple format
# 1. Console Sink (always enabled)
logger.add(
sys.stdout,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}"
level=log_level.upper(),
format=console_format
)

# Determine the file sink for standard logs.
# Prefer the command-line argument if provided; otherwise, use the config.
if hasattr(args, "log_file") and args.log_file:
file_log_path = args.log_file
logger.info(f"Log file created at: {file_log_path}")
else:
file_log_path = '~/.local/tldw_cli/Logs/tldw_app_logs.json'
logger.info(f"No logfile provided via command-line. Using default: {file_log_path}")

# Ensure directory exists
log_dir = os.path.dirname(file_log_path)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)

# Standard file sink
logger.add(
file_log_path,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}"
)

if log_metrics_file:
metrics_dir = os.path.dirname(log_metrics_file)
if metrics_dir and not os.path.exists(metrics_dir):
os.makedirs(metrics_dir, exist_ok=True)
# 2. Standard Application File Sink
if app_log_path:
path = _ensure_log_dir_exists(app_log_path)
logger.add(
path,
level=log_level.upper(),
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
rotation="10 MB", # Rotate file when it reaches 10 MB
retention="7 days", # Keep logs for 7 days
enqueue=True, # Make logging non-blocking
backtrace=True, # Show full stack trace on exceptions
diagnose=True, # Add exception variable values
)
logger.info(f"Application logs will be written to: {path}")

# 3. JSON Metrics File Sink
if metrics_log_path:
path = _ensure_log_dir_exists(metrics_log_path)
logger.add(
log_metrics_file,
level="DEBUG",
format="{time} - {level} - {message}", # Simple format for JSON sink
serialize=True, # This enables JSON serialization
path,
level="DEBUG", # Typically, you want all levels for metrics
serialize=True, # This is the key for JSON output
rotation="10 MB",
# Loguru’s built-in retention can be a simple number (e.g., 5) meaning “keep 5 files
retention=5,
retention=5, # Keeps the 5 most recent log files
enqueue=True,
)
logger.info(f"JSON metrics logs will be written to: {path}")

return logger

# def setup_logger(log_file_path="tldw_app_logs.json"):
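
The refactor replaces the argparse-driven setup with plain keyword arguments, so callers no longer need to fake an args object. A minimal sketch of the new call with the default paths from this file; the import path is assumed from the diff header.

# Minimal sketch of the keyword-based API; import path assumed from the diff header.
from tldw_chatbook.Metrics.logger_config import setup_logger

log = setup_logger(
    log_level="INFO",
    app_log_path="~/.local/tldw_cli/Logs/tldw_app.log",           # rotating text log
    metrics_log_path="~/.local/tldw_cli/Logs/tldw_metrics.json",  # serialized JSON sink
)
log.info("Logging configured: console + app log + JSON metrics")

# Passing None disables a sink, e.g. console-only logging:
# setup_logger(log_level="DEBUG", app_log_path=None, metrics_log_path=None)
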
215 changes: 215 additions & 0 deletions tldw_chatbook/Metrics/metrics.py
@@ -0,0 +1,215 @@
# metrics.py
"""
A thread-safe, generic metrics library built on top of the official
prometheus_client.
Key Features:
- Dynamically creates metrics, avoiding hardcoded names.
- Thread-safe metric creation for use in web servers.
- Ergonomic API that doesn't require repeating documentation.
- Decorators for common patterns like timing functions.
IMPORTANT: A NOTE ON LABEL CARDINALITY
Metric labels should only be used for values with a small, finite set of
possibilities (low cardinality). Using labels with high cardinality values
(e.g., user_id, request_id, file_path) will cause an explosion in the number
of time series, overwhelming your Prometheus server.
- DO use labels for: status codes, environments, machine types, API endpoints.
- DO NOT use labels for: user IDs, session IDs, trace IDs, URLs, or any
unbounded unique identifier.
"""
#
# Imports
import functools
import threading
import time
import logging
import psutil
#
# Third-party Imports
from prometheus_client import Counter, Histogram, Gauge, start_http_server
#
# Local Imports
#
######################################################################################################################
#
# Functions:

# A thread-safe registry for dynamically created metrics.
_metrics_registry = {}
_registry_lock = threading.Lock()


def _get_or_create_metric(metric_type, name, documentation, label_keys=None):
"""
Internal function to get a metric from the registry or create it if it
doesn't exist. Uses a double-checked lock for thread safety and performance.
"""
label_keys = tuple(sorted(label_keys or []))
registry_key = (metric_type, name, label_keys)

# Fast path: check if metric exists without locking.
if registry_key in _metrics_registry:
return _metrics_registry[registry_key]

# Slow path: acquire lock to safely create the metric.
with _registry_lock:
# Double-check if another thread created it while we were waiting.
if registry_key in _metrics_registry:
return _metrics_registry[registry_key]

if metric_type == 'counter':
metric = Counter(name, documentation, label_keys)
elif metric_type == 'histogram':
metric = Histogram(name, documentation, label_keys)
elif metric_type == 'gauge':
metric = Gauge(name, documentation, label_keys)
else:
raise ValueError(f"Unsupported metric type: {metric_type}")

_metrics_registry[registry_key] = metric
return metric


def log_counter(metric_name, value=1, labels=None, documentation=""):
"""
Increments a counter metric. The metric is created on first use.
Documentation is only used during the initial creation of the metric.
"""
try:
label_keys = list(labels.keys()) if labels else []
eff_labels = labels or {}
counter = _get_or_create_metric('counter', metric_name, documentation, label_keys)
counter.labels(**eff_labels).inc(value)
except Exception as e:
logging.error(f"Failed to log counter {metric_name}: {e}")


def log_histogram(metric_name, value, labels=None, documentation=""):
"""
Observes a value for a histogram metric. The metric is created on first use.
Documentation is only used during the initial creation of the metric.
"""
try:
label_keys = list(labels.keys()) if labels else []
eff_labels = labels or {}
histogram = _get_or_create_metric('histogram', metric_name, documentation, label_keys)
histogram.labels(**eff_labels).observe(value)
except Exception as e:
logging.error(f"Failed to log histogram {metric_name}: {e}")


def log_gauge(metric_name, value, labels=None, documentation=""):
"""
Sets the value of a gauge metric. The metric is created on first use.
Documentation is only used during the initial creation of the metric.
"""
try:
label_keys = list(labels.keys()) if labels else []
eff_labels = labels or {}
gauge = _get_or_create_metric('gauge', metric_name, documentation, label_keys)
gauge.labels(**eff_labels).set(value)
except Exception as e:
logging.error(f"Failed to log gauge {metric_name}: {e}")


def timeit(metric_name=None, documentation="Execution time of a function."):
"""
Decorator that times a function, logging a histogram for duration and a
counter for total calls. It also adds a 'status' label for success/error.
"""

def decorator(func):
base_name = metric_name or func.__name__

@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
status = "error" # Default to error
try:
result = func(*args, **kwargs)
status = "success"
return result
finally:
elapsed = time.time() - start
common_labels = {"function": func.__name__, "status": status}

log_histogram(
metric_name=f"{base_name}_duration_seconds",
value=elapsed,
labels=common_labels,
documentation=documentation
)

log_counter(
metric_name=f"{base_name}_calls_total",
labels=common_labels,
documentation=f"Total calls to {func.__name__}"
)

return wrapper

return decorator


def log_resource_usage():
"""Logs current CPU and Memory usage of the process as gauges."""
process = psutil.Process()
memory_mb = process.memory_info().rss / (1024 ** 2)
cpu_percent = process.cpu_percent(interval=None) # Non-blocking

log_gauge(
"process_memory_mb",
memory_mb,
documentation="Current memory usage of the process in Megabytes."
)
log_gauge(
"process_cpu_percent",
cpu_percent,
documentation="Current CPU usage of the process as a percentage."
)


def init_metrics_server(port=8000):
"""Starts the Prometheus HTTP server in a separate thread."""
start_http_server(port)
logging.info(f"Prometheus metrics server started on port {port}")


# --- Sample Usage ---
# pip install opentelemetry-sdk opentelemetry-exporter-prometheus opentelemetry-instrumentation-system-metrics
# OTEL_SERVICE_NAME=video-processor OTEL_SERVICE_VERSION=1.2.3 python main_app.py
#
# @timeit() # Uses the function name `process_data` to build metric names
# def process_data(user_id):
# """A sample function to process some data."""
# print(f"Processing data for user {user_id}...")
# time.sleep(0.5)
# if user_id % 5 == 0:
# # You can still log custom counters inside your functions
# log_counter(
# "special_user_processed_total",
# "Counter for a special type of user.",
# labels={"user_type": "vip"}
# )
# print("Done.")
#
# def main():
# # Start the metrics server once at the beginning of your app
# init_metrics_server(port=8000)
#
# # Example usage
# user_id = 0
# while True:
# process_data(user_id)
# log_resource_usage() # Log resource usage in your main loop
# user_id += 1
# time.sleep(1)
#
# if __name__ == "__main__":
# main()

#
# End of metrics.py
############################################################################################################
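
Since _get_or_create_metric uses a double-checked lock, concurrent first calls for the same metric name are safe. A small sketch exercising that from several threads, assuming the module is importable as tldw_chatbook.Metrics.metrics (path taken from the diff header).

# Thread-safety sketch; import path assumed from the diff header.
import threading
from tldw_chatbook.Metrics.metrics import init_metrics_server, log_counter

init_metrics_server(port=8000)  # exposes /metrics for Prometheus to scrape

def worker(slot: int) -> None:
    for _ in range(1000):
        # Same metric name and label keys from every thread -> one Counter is
        # created under the lock, then reused on the fast path.
        log_counter(
            "jobs_processed_total",
            labels={"worker": f"w{slot}"},
            documentation="Jobs processed, by worker slot.",
        )

threads = [threading.Thread(target=worker, args=(i % 4,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# curl localhost:8000/metrics should now show jobs_processed_total{worker="w0"} 2000.0, etc.
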
259 changes: 153 additions & 106 deletions tldw_chatbook/Metrics/metrics_logger.py
@@ -2,143 +2,190 @@
#
# Imports
import functools
import sys
import time
from datetime import datetime, timezone
from typing import Any, Optional, Dict, Union, Callable
import psutil
#
# Third-party Imports
#
# Local Imports
import logging
from loguru import logger
#
############################################################################################################
#
# Functions:

def log_counter(metric_name, labels=None, value=1):
log_entry = {
"event": metric_name,
"type": "counter",
"value": value,
"labels": labels or {},
# datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
# FIXME
"timestamp": datetime.now(timezone.utc).isoformat() + "Z"
}
logging.info("metric", extra=log_entry)


def log_histogram(metric_name, value, labels=None):
log_entry = {
"event": metric_name,
"type": "histogram",
"value": value,
"labels": labels or {},
# datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).
# FIXME
"timestamp": datetime.now(timezone.utc).isoformat() + "Z"
}
logging.info("metric", extra=log_entry)


def timeit(func):
# 1. Refined Type Hinting for clarity and correctness
LabelValue = Union[str, int, float, bool]
LabelDict = Dict[str, LabelValue]

# 2. (Gold Standard) Define a custom "METRIC" level for powerful filtering
# This allows separating metrics from regular application logs at the sink level.
logger.level("METRIC", no=25, color="<blue>", icon="📊")


def _log_metric(
metric_name: str,
metric_type: str,
value: Any,
labels: Optional[LabelDict] = None,
):
"""
Private helper to log a structured metric using idiomatic loguru binding.
"""
# 3. Bind each piece of data to the top level for a flatter, queryable JSON
bound_logger = logger.bind(
event=metric_name,
type=metric_type,
value=value,
labels=labels or {},
timestamp=datetime.now(timezone.utc).isoformat(),
)
# Use the custom METRIC level
bound_logger.log("METRIC", f"{metric_type.capitalize()} '{metric_name}': {value}")


def timeit(
metric_name: Optional[str] = None,
labels: Optional[LabelDict] = None,
log_summary: bool = True,
log_call_count: bool = False,
):
"""
Decorator that times the execution of the wrapped function
and logs the result using log_histogram. Optionally, you could also
log a counter each time the function is called.
A robust decorator that times a function, logging a histogram and status.
Args:
metric_name (str, optional): Custom name for the metric. Defaults to function name.
labels (dict, optional): Extra labels to add to the metric.
log_summary (bool): If True, logs a human-readable summary at INFO level.
log_call_count (bool): If True, also logs a counter metric for each call.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start

# Print to console (optional)
logging.info(f"{func.__name__} executed in {elapsed:.2f} seconds.")
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 4. Robust timing and status tracking
m_name = metric_name or f"{func.__name__}_duration_seconds"
all_labels = {"function": func.__name__}
if labels:
all_labels.update(labels)

start_time = time.perf_counter()
status = "success"
try:
result = func(*args, **kwargs)
return result
except Exception:
status = "failure"
raise # Re-raise the exception after marking status
finally:
elapsed_time = time.perf_counter() - start_time
final_labels = {**all_labels, "status": status}

# Log the primary histogram metric
_log_metric(m_name, "histogram", elapsed_time, final_labels)

# Optionally log a separate counter metric
if log_call_count:
counter_name = f"{func.__name__}_calls_total"
_log_metric(counter_name, "counter", 1, final_labels)

if log_summary:
logger.info(
f"Function '{func.__name__}' finished in {elapsed_time:.4f}s "
f"with status '{status}'."
)

return wrapper

return decorator


class MetricsLogger:
"""
5. A class-based API for providing context (base labels) to a set of metrics.
# Log how long the function took (histogram)
log_histogram(
metric_name=f"{func.__name__}_duration_seconds",
value=elapsed,
labels={"function": func.__name__}
)
This is useful for grouping all metrics from a specific module or request.
"""

# (Optional) log how many times the function has been called
log_counter(
metric_name=f"{func.__name__}_calls",
labels={"function": func.__name__}
)
def __init__(self, base_labels: Optional[LabelDict] = None):
self._base_labels = base_labels or {}

return result
return wrapper
# Add '@timeit' decorator to functions you want to time
def _get_labels(self, labels: Optional[LabelDict]) -> LabelDict:
"""Merge instance labels with call-specific labels."""
final_labels = self._base_labels.copy()
if labels:
final_labels.update(labels)
return final_labels

def log_counter(self, name: str, value: int = 1, labels: Optional[LabelDict] = None):
_log_metric(name, "counter", value, self._get_labels(labels))

def log_resource_usage():
process = psutil.Process()
memory = process.memory_info().rss / (1024 ** 2) # Convert to MB
cpu = process.cpu_percent(interval=0.1)
logging.info(f"Memory: {memory:.2f} MB, CPU: {cpu:.2f}%")
def log_gauge(self, name: str, value: float, labels: Optional[LabelDict] = None):
_log_metric(name, "gauge", value, self._get_labels(labels))

#
# End of Functions
############################################################################################################
def log_histogram(self, name: str, value: float, labels: Optional[LabelDict] = None):
_log_metric(name, "histogram", value, self._get_labels(labels))

# # Prometheus
# # metrics_logger.py (Prometheus version)
# from prometheus_client import Counter, Histogram, start_http_server
# import logging
# from functools import wraps
# import time
def log_resource_usage(self, labels: Optional[LabelDict] = None):
process = psutil.Process()
combined_labels = self._get_labels(labels)
self.log_gauge("process_memory_mb", process.memory_info().rss / (1024 ** 2), combined_labels)
self.log_gauge("process_cpu_percent", process.cpu_percent(interval=0.1), combined_labels)


# For convenience, a default instance for simple, one-off logging
default_metrics = MetricsLogger()
log_counter = default_metrics.log_counter
log_gauge = default_metrics.log_gauge
log_histogram = default_metrics.log_histogram
log_resource_usage = default_metrics.log_resource_usage

# # Example usage block to demonstrate the new features
# if __name__ == "__main__":
# from logger_config import setup_logger
#
# # Initialize Prometheus metrics
# VIDEOS_PROCESSED = Counter('videos_processed_total', 'Total number of videos processed', ['whisper_model', 'api_name'])
# VIDEOS_FAILED = Counter('videos_failed_total', 'Total number of videos failed to process', ['whisper_model', 'api_name'])
# TRANSCRIPTIONS_GENERATED = Counter('transcriptions_generated_total', 'Total number of transcriptions generated', ['whisper_model'])
# SUMMARIES_GENERATED = Counter('summaries_generated_total', 'Total number of summaries generated', ['whisper_model'])
# VIDEO_PROCESSING_TIME = Histogram('video_processing_time_seconds', 'Time spent processing videos', ['whisper_model', 'api_name'])
# TOTAL_PROCESSING_TIME = Histogram('total_processing_time_seconds', 'Total time spent processing all videos', ['whisper_model', 'api_name'])
# # Configure sinks. One for console, one just for metrics.
# logger.remove()
# logger.add(sys.stdout, level="INFO", format="{level.icon} {level.name}: {message}")
# logger.add(
# "test_metrics_only.json",
# level="METRIC", # This sink will ONLY capture our metrics!
# serialize=True
# )
#
# def init_metrics_server(port=8000):
# start_http_server(port)
# logger.info("--- Testing Advanced Metrics Logger ---")
#
# def log_counter(metric_name, labels=None, value=1):
# if metric_name == "videos_processed_total":
# VIDEOS_PROCESSED.labels(**(labels or {})).inc(value)
# elif metric_name == "videos_failed_total":
# VIDEOS_FAILED.labels(**(labels or {})).inc(value)
# elif metric_name == "transcriptions_generated_total":
# TRANSCRIPTIONS_GENERATED.labels(**(labels or {})).inc(value)
# elif metric_name == "summaries_generated_total":
# SUMMARIES_GENERATED.labels(**(labels or {})).inc(value)
#
# def log_histogram(metric_name, value, labels=None):
# if metric_name == "video_processing_time_seconds":
# VIDEO_PROCESSING_TIME.labels(**(labels or {})).observe(value)
# elif metric_name == "total_processing_time_seconds":
# TOTAL_PROCESSING_TIME.labels(**(labels or {})).observe(value)


# # main.py or equivalent entry point
# from metrics_logger import init_metrics_server
# # 1. Test the robust @timeit decorator
# @timeit(log_call_count=True)
# def successful_task():
# time.sleep(0.1)
#
#
# def main():
# # Start Prometheus metrics server on port 8000
# init_metrics_server(port=8000)
# @timeit
# def failing_task():
# time.sleep(0.1)
# raise ValueError("Something went wrong")
#
# # Initialize and launch your Gradio app
# create_video_transcription_tab()
#
# successful_task()
# try:
# failing_task()
# except ValueError as e:
# logger.warning(f"Caught expected exception: {e}")
#
# if __name__ == "__main__":
# main()

# prometheus.yml
# scrape_configs:
# - job_name: 'video_transcription_app'
# static_configs:
# - targets: ['localhost:8000'] # Replace with your application's host and port
# # 2. Test the class-based logger with context
# api_logger = MetricsLogger(base_labels={"component": "api", "version": "v2"})
# api_logger.log_counter("requests_total", labels={"endpoint": "/users"})
# api_logger.log_counter("requests_total", labels={"endpoint": "/data"})
#
# # 3. Test the default instance for one-off metrics
# log_resource_usage()
#
# logger.info("--- Test complete. Check 'test_metrics_only.json' ---")

#
# End of metrics_logger.py
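
Because the METRIC-level sink in the example above is serialized, each record lands as one JSON object per line with the bound fields (event, type, value, labels) under record["extra"]. A rough sketch of reading those records back; the exact layout is Loguru's serialize=True format as remembered here and should be verified against the actual output file.

# Sketch of consuming the serialized metrics sink from the example above.
# Assumes Loguru's serialize=True layout: {"text": ..., "record": {"extra": {...}, ...}}.
import json
from collections import defaultdict

durations = defaultdict(list)
with open("test_metrics_only.json") as fh:   # path used in the example sink
    for line in fh:
        extra = json.loads(line)["record"]["extra"]
        if extra.get("type") == "histogram":
            durations[extra["event"]].append(extra["value"])

for event, values in durations.items():
    print(f"{event}: n={len(values)}, avg={sum(values) / len(values):.4f}s")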