ruạṛ
import os
import threading
from contextvars import copy_context
from typing import Callable, Optional

from app.errors.mapper import map_exception
from app.errors.serialize import simple_error_payload
from app.utils.callbacks import call_cbs_callback
from app.utils.log_helper import set_job_id_for_context_logging
from app.utils.utility_functions import LogPrinter


def _resolve_error_override(custom):
    """Return the replacement error message carried by *custom*, or ``None``.

    A dict contributes its ``"error"`` value, falling back to ``"message"``;
    a falsy value means "no override". A string is used as-is (even when
    empty). Any other type yields ``None``.
    """
    if isinstance(custom, dict):
        # Accept either {"error": "..."} or {"message": "..."}
        return (custom.get("error") or custom.get("message")) or None
    if isinstance(custom, str):
        return custom
    return None


def run_threaded_task(
    cbs_job_id: int,
    action_name: str,
    task_func: Callable,
    on_success_data: Callable,
    on_error_message: Optional[Callable] = None,
) -> None:
    """Run ``task_func`` on a background daemon thread and report the outcome.

    The worker thread is named ``"{action_name}-{cbs_job_id}"``. If a live
    thread with that name exists and has not yet flagged itself as completed,
    a warning is logged and no new thread is started.

    Args:
        cbs_job_id: Job identifier; used in the thread name, passed to
            ``set_job_id_for_context_logging`` (as a string) and to
            ``call_cbs_callback``.
        action_name: Action label; used in the thread name and passed to
            ``call_cbs_callback``.
        task_func: Zero-argument callable executed on the worker thread.
        on_success_data: Called with the result of ``task_func``; its return
            value is sent with the ``"success"`` callback.
        on_error_message: Optional hook called with the raised exception. It
            may return a string, or a dict with an ``"error"`` or
            ``"message"`` key, to replace the message of the mapped error.
            A failure inside the hook is logged and the mapped error is
            reported unchanged.

    Returns:
        None. The function returns as soon as the thread is started (or the
        duplicate is rejected); the outcome is delivered via
        ``call_cbs_callback`` with status ``"success"`` or ``"failed"``.

    Note:
        The duplicate check and the thread start are not performed atomically,
        so two callers racing on the exact same job can still both pass the
        check. Naming and flagging the thread *before* it starts (below) only
        removes the much larger window in which a started-but-not-yet-running
        worker was invisible to the check.
    """
    thread_name = f"{action_name}-{cbs_job_id}"

    # Prevent duplicate thread if one is already running. Threads without the
    # ``task_completed`` attribute are treated as completed (not ours).
    duplicate_running = any(
        candidate.name == thread_name
        and not getattr(candidate, "task_completed", True)
        for candidate in threading.enumerate()
    )
    if duplicate_running:
        LogPrinter.warn(f"[{thread_name}] Task is already running. Duplicate not allowed.")
        return

    def internal_runner():
        # Runs inside the copied context, so this does not alter the caller's
        # context variables.
        set_job_id_for_context_logging(str(cbs_job_id))
        thread = threading.current_thread()
        pid = os.getpid()
        LogPrinter.note(f"[{thread.name}] Running on PID={pid} | Thread={thread.name}")
        try:
            result = task_func()
            data = on_success_data(result)
            LogPrinter.success(f"[{thread.name}] Task completed successfully")
            call_cbs_callback(cbs_job_id, action_name, "success", data)
        except Exception as e:
            # NOTE(review): a failure raised by the "success" callback itself
            # also lands here and triggers a "failed" callback — confirm that
            # this is intended.
            bart_err = map_exception(e)
            if on_error_message:
                try:
                    override = _resolve_error_override(on_error_message(e))
                    if override is not None:
                        # Rebuild the same error type with the custom message;
                        # assumes its constructor accepts (code, message).
                        bart_err = type(bart_err)(bart_err.code, override)
                except Exception as hook_err:
                    LogPrinter.error(f"[{thread.name}] on_error_message failed: {hook_err}")
            payload = simple_error_payload(bart_err)
            LogPrinter.error(f"[{thread.name}] Task failed: {payload}")
            call_cbs_callback(cbs_job_id, action_name, "failed", payload)
        finally:
            thread.task_completed = True
            LogPrinter.note(f"[{thread.name}] Thread marked as completed")

    # Capture the caller's current context so ContextVar values set before
    # this call are visible inside the worker.
    ctx = copy_context()

    # Spawn thread with context-aware runner. The name and the "not completed"
    # flag are assigned BEFORE start() so the duplicate check above can see
    # this worker from the moment it becomes alive, rather than only after the
    # runner has begun executing.
    new_thread = threading.Thread(
        target=ctx.run,
        args=(internal_runner,),
        name=thread_name,
        daemon=True,
    )
    new_thread.task_completed = False
    new_thread.start()
    LogPrinter.note(f"[Main] Background thread launched: {new_thread.name}")
cải xoăn