Uecko_ERP_FactuGES_sync/app/db/sync_invoices.py
2025-11-07 19:22:36 +01:00

407 lines
17 KiB
Python

import logging
import textwrap
from typing import Dict, Any, Optional, Tuple
from uuid6 import uuid7
from config import load_config
from decimal import Decimal, ROUND_HALF_UP
from . import sql_sentences as SQL
from utils import limpiar_cadena, normalizar_telefono_con_plus, corregir_y_validar_email, normalizar_url_para_insert, map_tax_code, cents, money_round, tax_fraction_from_code
from striprtf.striprtf import rtf_to_text
# =========================
# constantes
# =========================
# Compañia RODAX
cte_company_id = '5e4dc5b3-96b9-4968-9490-14bd032fec5f'
def sync_invoices(conn_factuges, conn_mysql, last_execution_date):
config = load_config()
# LIMPIAMOS LAS FACTURAS DE FACTUGES QUE HAYAN SIDO ELIMINADAS DEL PROGRAMA NUEVO DE FACTURACION, PARA QUE PUEDAN SER MODIFICADAS
# Crear un cursor para ejecutar consultas SQL
cursor_mysql = None
try:
cursor_mysql = conn_mysql.cursor()
cursor_mysql.execute(SQL.SELECT_INVOICES_DELETED)
filas = cursor_mysql.fetchall()
cursor_mysql.close()
# Crear un conjunto con los IDs [0] de los customer_inovices que debo liberar en FactuGES, porque han sido eliminadas en programa de facturación nuevo
ids_verifactu_deleted = {str(fila[0]) for fila in filas}
# logging.info(f"Customer invoices rows to be deleted: {len(ids_verifactu_deleted)}")
# Verificar si hay filas en el resultado
if ids_verifactu_deleted:
sync_delete_invoices(conn_factuges, ids_verifactu_deleted, config)
else:
logging.info(f"There are customer invoices deleted")
except Exception as e:
if cursor_mysql is not None:
cursor_mysql.close()
logging.error(f"(ERROR) Failed to fetch from database:{
config['UECKO_MYSQL_DATABASE']} - using user:{config['UECKO_MYSQL_USER']}")
logging.error(e)
raise e
# BUSCAMOS FACTURAS ENVIADAS A VERIFACTU EN FACTUGES, PARA SUBIRLAS AL NUEVO PROGRAMA DE FACTURACIÓN
# Crear un cursor para ejecutar consultas SQL
cursor_FactuGES = None
try:
cursor_FactuGES = conn_factuges.cursor()
# Ejecutar la consulta de FACTURAS_CLIENTE
cursor_FactuGES.execute(SQL.SELECT_FACTUGES_FACTURAS_CLIENTE)
filas = cursor_FactuGES.fetchall()
except Exception as e:
if cursor_FactuGES is not None:
cursor_FactuGES.close()
logging.error(f"(ERROR) Failed to fetch from database:{
config['FACTUGES_DATABASE']} - using user:{config['FACTUGES_USER']}")
logging.error(e)
raise e
# Obtener los nombres de las columnas
columnas = [desc[0] for desc in cursor_FactuGES.description]
cursor_FactuGES.close()
# Convertir las filas en diccionarios con nombres de columnas como claves
tuplas_seleccionadas = []
for fila in filas:
tupla = dict(zip(columnas, fila))
tuplas_seleccionadas.append(tupla)
# Verificar si hay filas en el resultado
if tuplas_seleccionadas:
sync_invoices_from_FACTUGES(
conn_mysql, tuplas_seleccionadas, conn_factuges, config)
else:
logging.info(
"There are no new FACTURAS rows since the last run.")
def sync_delete_invoices(conn_factuges, ids_verifactu_deleted, config):
# Eliminamos todos los IDs asociados en FactuGES que han sido eliminados así liberaremos la factura borrador y podermos modificarla de nuevo, para volverla a subir una vez hechos los cambios.
# VERIFACTU = 0 and ID_VERIFACTU = NULL
cursor_FactuGES = None
try:
cursor_FactuGES = conn_factuges.cursor()
if ids_verifactu_deleted:
logging.info(f"Liberate factures: {ids_verifactu_deleted}")
cursor_FactuGES.executemany(SQL.LIMPIAR_FACTUGES_LINK, [(
id_verifactu,) for id_verifactu in ids_verifactu_deleted])
else:
logging.info("No articles to delete.")
except Exception as e:
# Escribir el error en el archivo de errores
logging.error(str(e))
raise e # Re-lanzar la excepción para detener el procesamiento
finally:
# Cerrar la conexión
if cursor_FactuGES is not None:
cursor_FactuGES.close()
def sync_invoices_from_FACTUGES(conn_mysql, filas, conn_factuges, config):
# Insertaremos cada factura existente en las filas a la nueva estructura de tablas del programa nuevo de facturacion.
# logging.info(f"FACTURAS_CLIENTE_DETALLE rows to be processed: {len(filas)}")
cursorMySQL = None
cursor_FactuGES = None
factuges_id_anterior = None
num_fac_procesed = 0
try:
cursorMySQL = conn_mysql.cursor()
cursor_FactuGES = conn_factuges.cursor()
# Insertar datos en la tabla 'customer_invoices'
for factura_detalle in filas:
# Preparamos los campos para evitar errores
customer_fields = normalize_customer_fields(factura_detalle)
header_invoice_fields = normalize_header_invoice_fields(
factura_detalle)
details_invoice_fields = normalize_details_invoice_fields(
factura_detalle)
logging.info(
f"FACTURAS_CLIENTE DETALLLLLLLLLLESSSSSSS: {details_invoice_fields}")
factuges_id = int(factura_detalle['ID_FACTURA'])
if factuges_id_anterior is None or factuges_id_anterior != factuges_id:
# Comprobamos si existe el cliente del primer item de la factura
customer_id = get_or_create_customer(
cursorMySQL,
cte_company_id,
str(factura_detalle["ID_CLIENTE"]),
customer_fields,
)
# ---- forma de pago
pm_id = get_or_create_payment_method(
cursorMySQL,
str(factura_detalle["ID_FORMA_PAGO"]),
str(factura_detalle["DES_FORMA_PAGO"]),
)
# campos pendiente de revisar en un futuro
# xxxxxxx = str(factura_detalle['ID_FORMA_PAGO']) según este id se debe de guardar en la factura los vencimiento asociados a la forma de pago
# ---- cabecera factura
invoice_id = insert_invoice_header(
cursorMySQL, cte_company_id, customer_fields, header_invoice_fields, customer_id, pm_id, str(
factura_detalle["DES_FORMA_PAGO"])
)
# ---- impuestos cabecera
insert_header_taxes_if_any(
cursorMySQL, invoice_id, factura_detalle['IVA'], factura_detalle['RECARGO_EQUIVALENCIA'], header_invoice_fields)
# Guardamos en Factuges el id de la customer_invoice
logging.info(
f"Updating FACTURAS_CLIENTE {invoice_id} {factuges_id}")
cursor_FactuGES.execute(
SQL.UPDATE_FACTUGES_LINK, (invoice_id, factuges_id))
num_fac_procesed += 1
# Insertamos detalles y taxes correspondientes siempre
# Siempre insertamos la línea
insert_item_and_taxes(cursorMySQL, invoice_id,
details_invoice_fields)
# Asignamos el id factura anterior para no volver a inserta cabecera
factuges_id_anterior = factuges_id
logging.info(
f"FACTURAS_CLIENTE rows to be processed: {str(num_fac_procesed)}")
except Exception as e:
# Escribir el error en el archivo de errores
logging.error(str(e))
raise e # Re-lanzar la excepción para detener el procesamiento
finally:
# Cerrar la conexión
if cursorMySQL is not None:
cursorMySQL.close()
if cursor_FactuGES is not None:
cursor_FactuGES.close()
def rtf_a_texto_plano(rtf: str) -> str:
"""
Convierte RTF a texto plano usando striprtf.
"""
return rtf_to_text(rtf).strip()
def normalize_customer_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
"""
Normaliza campos del registro de origen 'fd' (factura).
"""
# >>> aquí usa tus helpers reales:
def clean_phone(v): return normalizar_telefono_con_plus(v)
def clean_web(v): return normalizar_url_para_insert(v)
def clean_tin(v): return limpiar_cadena(v)
email1_ok, email1 = corregir_y_validar_email(fd.get("EMAIL_1"))
email2_ok, email2 = corregir_y_validar_email(fd.get("EMAIL_2"))
return {
"tin": clean_tin(str(fd.get("NIF_CIF"))),
"name": str(fd.get("NOMBRE")),
"street": str(fd.get("CALLE")),
"city": str(fd.get("POBLACION")),
"province": str(fd.get("PROVINCIA")),
"postal_code": str(fd.get("CODIGO_POSTAL")),
"country": "es",
"phone_primary": clean_phone(fd.get("TELEFONO_1")),
"phone_secondary": clean_phone(fd.get("TELEFONO_2")),
"mobile_primary": clean_phone(fd.get("MOVIL_1")),
"mobile_secondary": clean_phone(fd.get("MOVIL_2")),
"email_primary": email1 if email1_ok else None,
"email_secondary": email2 if email2_ok else None,
"website": clean_web(str(fd.get("PAGINA_WEB")))
}
def normalize_header_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
"""
Normaliza campos del registro de origen 'fd' (factura).
# campos pendiente de revisar en un futuro
# xxxxxxx = str(factura_detalle['ID_EMPRESA'])
# xxxxxxx = str(factura_detalle['OBSERVACIONES'])
"""
return {
"factuges_id": int(fd['ID_FACTURA']),
"reference": str(fd['REFERENCIA']),
"invoice_date": str(fd['FECHA_FACTURA']),
"operation_date": str(fd['FECHA_FACTURA']),
"description": textwrap.shorten(
f"{str(fd['REFERENCIA'])} - {str(fd.get('NOMBRE')) or ''}", width=50, placeholder=""),
# siempre tendrán 2 decimales
'subtotal_amount_value': cents(fd.get('IMPORTE_NETO')),
'discount_amount_value': cents(fd.get('IMPORTE_DESCUENTO')),
'discount_percentage_val': int((Decimal(str(fd.get('DESCUENTO') or 0)) * 100).to_integral_value())
if fd.get('DESCUENTO') is not None else 0,
'taxable_amount_value': cents(fd.get('BASE_IMPONIBLE')),
'tax_code': map_tax_code(str(fd.get("DES_TIPO_IVA"))),
'taxes_amount_value': cents((fd.get('IMPORTE_IVA') or 0) + (fd.get('IMPORTE_RE') or 0)),
'total_amount_value': cents(fd.get('IMPORTE_TOTAL')),
'base': cents(fd.get('BASE_IMPONIBLE')),
'iva': cents(fd.get('IMPORTE_IVA')),
're': cents(fd.get('IMPORTE_RE'))
}
def normalize_details_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
"""
Normaliza campos del registro de origen 'fd' (factura).
"""
tax_code = map_tax_code(str(fd.get("DES_TIPO_IVA")))
disc_pct = fd.get('DESCUENTO_DET')
# Calcular cuota de IVA de la línea
frac = tax_fraction_from_code(tax_code) # 0.21, 0.10, 0…
# Ttotal_det ya incluye descuento
line_base = Decimal(str(fd.get('IMPORTE_TOTAL_DET') or 0))
tax_amount = int(
(money_round(line_base * frac, 2)*100).to_integral_value())
logging.info(
f"FACTURAS_CLIENTE IVAAAAAAAAAAAAAAA {str(frac)} DETALLE: {str((money_round(line_base * frac, 2)*100))}")
# Se calcula en el objeto de negocio de nuevo, comprobar si coincide
# item_discount_amount = (
# (factura_detalle['IMPORTE_UNIDAD'] or 0)*((factura_detalle['DESCUENTO'] or 0)/100))*100
return {
'tax_code': tax_code,
'position': int(fd['POSICION']),
'description': rtf_a_texto_plano(str(fd['CONCEPTO'])),
'quantity_value': None if fd['CANTIDAD'] is None else int(Decimal(str(fd['CANTIDAD'] or 0)) * 100),
'unit_value': None if fd['IMPORTE_UNIDAD'] is None else int(Decimal(str(fd['IMPORTE_UNIDAD'] or 0)) * 10000),
'disc_pct': disc_pct,
'discount_percentage_value': None if disc_pct in (None, 0) else int(Decimal(str(disc_pct)) * 100),
'total_value': cents(fd.get('IMPORTE_TOTAL_DET')),
'tax_amount': tax_amount
}
def get_or_create_customer(cur, company_id: str, factuges_customer_id: str, fields: Dict[str, Any]) -> str:
"""
Comprobamos si existe el cliente del primer item de la factura y si no lo creamos
"""
cur.execute(SQL.SELECT_CUSTOMER_BY_FACTUGES, (factuges_customer_id,))
row = cur.fetchone()
if not row or not row[0]:
customer_id = str(uuid7())
logging.info("Inserting customer %s %s %s",
factuges_customer_id, fields["tin"], fields["name"])
cur.execute(
SQL.INSERT_CUSTOMER,
(
customer_id, fields["name"], fields["tin"], fields["street"], fields["city"], fields["province"],
fields["postal_code"], fields["country"], fields["phone_primary"], fields["phone_secondary"],
fields["mobile_primary"], fields["mobile_secondary"], fields["email_primary"], fields["email_secondary"],
fields["website"], factuges_customer_id, company_id,
),
)
return customer_id
customer_id = str(row[0])
logging.info("Updating customer %s %s", factuges_customer_id, customer_id)
cur.execute(
SQL.UPDATE_CUSTOMER,
(
fields["name"], fields["tin"], fields["street"], fields["city"], fields["province"], fields["postal_code"],
fields["country"], fields["phone_primary"], fields["phone_secondary"],
fields["mobile_primary"], fields["mobile_secondary"], fields["email_primary"], fields["email_secondary"],
fields["website"], customer_id,
),
)
return customer_id
def get_or_create_payment_method(cur, factuges_payment_id: str, description: str) -> str:
"""
En el caso de que la forma de pago no exista la creamos
"""
cur.execute(SQL.SELECT_PAYMENT_METHOD_BY_FACTUGES, (factuges_payment_id,))
row = cur.fetchone()
if not row or not row[0]:
pm_id = str(uuid7())
logging.info("Inserting payment method %s %s %s",
factuges_payment_id, pm_id, description)
cur.execute(SQL.INSERT_PAYMENT_METHOD,
(pm_id, description, factuges_payment_id))
return pm_id
pm_id = str(row[0])
logging.info("Payment method exists %s -> %s", factuges_payment_id, pm_id)
return pm_id
def insert_invoice_header(cur, company_id: str, cf: Dict[str, Any], hif: Dict[str, Any], customer_id: str,
payment_method_id: str, payment_method_description: str) -> str:
"""
Inserta cabecera y devuelve invoice_id
"""
invoice_id = str(uuid7())
logging.info("Inserting invoice %s %s %s",
invoice_id, hif.get('reference'), hif.get('invoice_date'))
cur.execute(
SQL.INSERT_INVOICE,
(
invoice_id, company_id, 'draft', 'F25/', hif.get('reference'), hif.get(
'invoice_date'), hif.get('operation_date'), hif.get('description'),
hif.get('subtotal_amount_value'), hif.get('discount_amount_value'), hif.get(
'discount_percentage_val'), hif.get('taxable_amount_value'),
hif.get('taxes_amount_value'), hif.get('total_amount_value'),
customer_id, cf.get("tin"), cf.get(
'name'), cf.get('street'), cf.get('city'),
cf.get('province'), cf.get('postal_code'), 'es',
payment_method_id, payment_method_description, hif.get(
'factuges_id'), company_id
),
)
return invoice_id
def insert_header_taxes_if_any(cur, invoice_id: str, IVA: str, RECARGO: str, hif: Dict[str, Any]) -> None:
"""
Inserta impuestos de cabecera
"""
# IVA (>= 0 acepta 0% también, si no quieres registrar 0, cambia condición a > 0)
if (IVA or 0) >= 0:
cur.execute(SQL.INSERT_INVOICE_TAX,
(str(uuid7()), invoice_id, hif.get('tax_code'), hif.get('base'), hif.get('iva')))
# Recargo equivalencia
if (RECARGO or 0) > 0:
cur.execute(SQL.INSERT_INVOICE_TAX,
(str(uuid7()), invoice_id, 'rec_5_2', hif.get('base'), hif.get('re')))
def insert_item_and_taxes(cur, invoice_id: str, fields: Dict[str, Any]) -> None:
"""
Inserta línea y sus impuestos derivados.
"""
item_id = str(uuid7())
logging.info("Inserting item %s pos=%s qty=%s",
item_id, fields.get('position'), fields.get('quantity_value'))
cur.execute(
SQL.INSERT_INVOICE_ITEM,
(item_id, invoice_id, fields.get('position'), fields.get('description'), fields.get('quantity_value'),
fields.get('unit_value'), fields.get('discount_percentage_value'), None, fields.get('total_value'))
)
logging.info("Inserting item tax %s code=%s base=%s tax=%s",
item_id, fields.get('tax_code'), fields.get('total_value'), fields.get('tax_amount'))
cur.execute(
SQL.INSERT_INVOICE_ITEM_TAX, (str(uuid7()), item_id, fields.get('tax_code'),
fields.get('total_value'), fields.get('tax_amount'))
)