Uecko_ERP_FactuGES_sync/app/utils/last_execution_helper.py
2025-11-30 23:44:55 +01:00

70 lines
1.8 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Fecha por defecto si nunca se ha ejecutado
DEFAULT_FALLBACK = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
FMT = "%Y-%m-%d %H:%M:%S" # Formato persistido
def obtener_fecha_ultima_ejecucion(
path: str | Path,
*,
fallback: Optional[datetime] = None,
) -> datetime:
"""
Lee la última fecha de ejecución almacenada en `path`.
Retorna always-aware UTC.
- Si el archivo no existe → fallback
- Si está vacío o el formato es inválido → fallback
fallback predeterminado: DEFAULT_FALLBACK (2025-01-01)
"""
effective_fallback = fallback or DEFAULT_FALLBACK
p = Path(path)
try:
text = p.read_text(encoding="utf-8").strip()
if not text:
return effective_fallback
dt_naive = datetime.strptime(text, FMT)
return dt_naive.replace(tzinfo=timezone.utc)
except FileNotFoundError:
return effective_fallback
except ValueError:
return effective_fallback
def actualizar_fecha_ultima_ejecucion(
path: str | Path,
*,
momento: Optional[datetime] = None,
) -> None:
"""
Guarda `momento` (UTC) en `path`, creando directorios si hace falta.
- Si `momento` es None → ahora en UTC
- Si `momento` viene naive → se asume UTC
- Si trae tz → se convierte a UTC
"""
p = Path(path)
if momento is None:
momento = datetime.now(timezone.utc)
else:
if momento.tzinfo is None:
momento = momento.replace(tzinfo=timezone.utc)
else:
momento = momento.astimezone(timezone.utc)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(momento.strftime(FMT), encoding="utf-8")