Uecko_ERP_FactuGES_sync/app/utils/last_execution_helper.py
2025-11-27 20:08:06 +01:00

61 lines
1.9 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
import os
DEFAULT_PATH = "./last_execution.txt"
FMT = "%Y-%m-%d %H:%M:%S"
def obtener_fecha_ultima_ejecucion(
path: str = DEFAULT_PATH,
*,
fallback: Optional[datetime] = None,
) -> datetime:
"""
Lee la última fecha de ejecución desde `path` y la devuelve como aware (UTC).
Si no existe o hay error de parseo, devuelve `fallback` (por defecto 2024-01-01 UTC).
"""
if fallback is None:
fallback = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
try:
with open(path, "r", encoding="utf8") as f:
fecha_str = f.read().strip()
# Se guarda como texto sin tz; interpretamos como UTC
dt_naive = datetime.strptime(fecha_str, FMT)
return dt_naive.replace(tzinfo=timezone.utc)
except FileNotFoundError:
return fallback
except ValueError:
# Formato inválido en el archivo -> usar fallback
return fallback
def actualizar_fecha_ultima_ejecucion(
path: str = DEFAULT_PATH,
*,
momento: Optional[datetime] = None,
) -> None:
"""
Escribe en `path` la fecha/hora (UTC) en formato YYYY-MM-DD HH:MM:SS.
Si `momento` es None, usa ahora en UTC.
Crea directorios intermedios si no existen.
"""
if momento is None:
momento = datetime.now(timezone.utc)
else:
# Normalizamos a UTC si viene con tz; si es naive, asumimos UTC
if momento.tzinfo is None:
momento = momento.replace(tzinfo=timezone.utc)
else:
momento = momento.astimezone(timezone.utc)
# Asegurar carpeta si `path` incluye directorios
folder = os.path.dirname(os.path.abspath(path))
if folder and not os.path.exists(folder):
os.makedirs(folder, exist_ok=True)
with open(path, "w", encoding="utf8") as f:
f.write(momento.strftime(FMT))