Uecko_ERP_FactuGES_sync/app/db/sync_catalog.py
2025-08-28 10:51:05 +02:00

220 lines
8.7 KiB
Python

import logging
from uuid import uuid4
from config import load_config
def sync_catalog(conn_factuges, conn_mysql, last_execution_date):
config = load_config()
logging.info(f"Tarifa: {config['FACTUGES_NOMBRE_TARIFA']}")
logging.info(f"Precio punto: {config['FACTUGES_PRECIO_PUNTO']}")
# Construir la consulta SQL con la condición de fecha de modificación
consulta_sql_art_modificados = (
f"SELECT art.id || '' AS id, art.tarifa as tarifa, COALESCE(art.referencia,'') AS referencia, "
f"TRIM(COALESCE(art.familia, '') || ' ' || COALESCE(art.referencia_prov, '') || ' ' || COALESCE(art.descripcion, '')) as descripcion_es, "
f"TRIM(COALESCE(art_idioma_en.descripcion, '')) AS descripcion_en, "
f"TRUNC(art.precio_coste * 100) || '' AS puntos, "
f"TRUNC(ROUND(art.precio_coste * {
config['FACTUGES_PRECIO_PUNTO']}, 2) * 100) || '' AS pvp "
f"FROM articulos AS art "
f"LEFT JOIN ARTICULOS_IDIOMAS AS art_idioma_en ON art.id = art_idioma_en.id_articulo AND art_idioma_en.id_idioma = 2 "
f"WHERE "
f"(art.eliminado = 0) AND "
f"(art.tarifa = '{config['FACTUGES_NOMBRE_TARIFA']}') "
f"AND (art.FECHA_MODIFICACION > '{last_execution_date}')"
)
consulta_sql_all_tarifa = (
f"SELECT art.id || '' AS id, art.tarifa as tarifa "
f"FROM articulos AS art "
f"WHERE "
f"(art.eliminado = 0) AND "
f"(art.tarifa = '{config['FACTUGES_NOMBRE_TARIFA']}') "
)
# Crear un cursor para ejecutar consultas SQL
cursor_FactuGES = None
try:
cursor_FactuGES = conn_factuges.cursor()
# Ejecutar la consulta de articulos modificados
cursor_FactuGES.execute(consulta_sql_art_modificados)
filas = cursor_FactuGES.fetchall()
except Exception as e:
if cursor_FactuGES is not None:
cursor_FactuGES.close()
logging.error(f"(ERROR) Failed to fetch from database:{
config['FACTUGES_DATABASE']} - using user:{config['FACTUGES_USER']}")
logging.error(e)
raise e
# Obtener los nombres de las columnas
columnas = [desc[0] for desc in cursor_FactuGES.description]
cursor_FactuGES.close()
# Convertir las filas en diccionarios con nombres de columnas como claves
tuplas_seleccionadas = []
for fila in filas:
tupla = dict(zip(columnas, fila))
tuplas_seleccionadas.append(tupla)
logging.info(f"Catalog rows to be processed: {len(tuplas_seleccionadas)}")
# Verificar si hay filas en el resultado
if tuplas_seleccionadas:
insertar_datos(conn_mysql, tuplas_seleccionadas, config)
else:
logging.info(
"There are no new or modified catalog rows since the last run.")
# Verificamos que en el catálogo de mysql solo hay los artículos del catálogo de FactuGES
# es decir, que si un artículo lo asignan a otra tarifa debe desaparecer del catálogo mysql
try:
cursor_FactuGES.execute(consulta_sql_all_tarifa)
filas = cursor_FactuGES.fetchall()
cursor_FactuGES.close()
# Crear un conjunto con los IDs [0] de los artículos en FactuGES para una búsqueda rápida
ids_factuges = {str(fila[0]) for fila in filas}
logging.info(f"{config['FACTUGES_NOMBRE_TARIFA']} rows to be processed: {
len(ids_factuges)}")
# Verificar si hay filas en el resultado
if ids_factuges:
eliminar_datos(conn_mysql, ids_factuges, config)
else:
logging.info(f"There are no rows in the {
config['FACTUGES_NOMBRE_TARIFA']}.")
except Exception as e:
if cursor_FactuGES is not None:
cursor_FactuGES.close()
logging.error(f"(ERROR) Failed to fetch from database:{
config['FACTUGES_DATABASE']} - using user:{config['FACTUGES_USER']}")
logging.error(e)
raise e
def eliminar_datos(conn_mysql, ids_factuges, config):
# Recorrer todos los articulos del catálogo web para ver si estan en filas, si no están se eliminan
select_all_catalog_query = (
"SELECT catalog.id, catalog.id_article FROM catalog"
)
delete_catalog_query = (
"DELETE FROM catalog WHERE catalog.id_article = %s"
)
cursorMySQL = None
try:
cursorMySQL = conn_mysql.cursor()
cursorMySQL.execute(select_all_catalog_query)
catalog_rows = cursorMySQL.fetchall()
logging.info(
f">>>>Comprobar que todos los artículos del catálogo existen en FactuGES")
ids_a_eliminar = [
catalog_row[1] # id_article
for catalog_row in catalog_rows
if str(catalog_row[1]) not in ids_factuges
]
if ids_a_eliminar:
logging.info(f"Deleting articles: {ids_a_eliminar}")
cursorMySQL.executemany(delete_catalog_query, [(
id_article,) for id_article in ids_a_eliminar])
else:
logging.info("No articles to delete.")
except Exception as e:
# Escribir el error en el archivo de errores
logging.error(str(e))
raise e # Re-lanzar la excepción para detener el procesamiento
finally:
# Cerrar la conexión
if cursorMySQL is not None:
cursorMySQL.close()
def insertar_datos(conn_mysql, filas, config):
insert_catalog_query = (
"INSERT INTO catalog (id, catalog_name, id_article, points, retail_price, created_at, updated_at) "
"VALUES (%s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"
)
insert_translation_query = (
"INSERT INTO catalog_translations (id, lang_code, catalog_id, description) "
"VALUES (%s, %s, %s, %s)"
)
update_catalog_query = (
"UPDATE catalog set "
"points = %s, "
"retail_price = %s, "
"updated_at = Now() "
"WHERE id_article=%s"
)
update_translation_query = (
"UPDATE catalog_translations SET "
"description = %s "
"WHERE lang_code = %s AND "
"catalog_id IN (SELECT catalog.id FROM catalog WHERE catalog.id_article = %s)"
)
select_catalog_query = (
"SELECT count(catalog.id) FROM catalog WHERE catalog.id_article = %s"
)
cursorMySQL = None
try:
cursorMySQL = conn_mysql.cursor()
# Insertar datos en la tabla 'catalog'
for articulo in filas:
# Generar un ID único para la tabla catalog
id_catalog = str(uuid4())
id_article = int(articulo['ID'])
points = int(articulo['PUNTOS'])
retail_price = int(articulo['PVP'])
tarifa = config['FACTUGES_NOMBRE_TARIFA']
cursorMySQL.execute(select_catalog_query, (id_article, ))
row_count = cursorMySQL.fetchone()
is_new = row_count[0] < 1
if is_new:
logging.info(f"Inserting article {id_article} {tarifa}")
cursorMySQL.execute(
insert_catalog_query, (id_catalog, tarifa, id_article, points, retail_price))
else:
logging.info(f"Updating article {id_article} {tarifa}")
cursorMySQL.execute(update_catalog_query,
(points, retail_price, id_article))
# Insertar traducciones en la tabla 'catalog_translations'
for lang_code, desc_key in [('es', 'DESCRIPCION_ES'), ('en', 'DESCRIPCION_EN')]:
descripcion_traducida = articulo.get(desc_key, '')
if descripcion_traducida:
if (is_new):
logging.info(f"Inserting translation {
lang_code} {descripcion_traducida}")
# Generar un ID único para cada traducción
id_translation = str(uuid4())
cursorMySQL.execute(
insert_translation_query, (id_translation, lang_code, id_catalog, descripcion_traducida))
else:
logging.info(f"Updating translation {
lang_code} {descripcion_traducida}")
cursorMySQL.execute(
update_translation_query, (descripcion_traducida, lang_code, id_article))
except Exception as e:
# Escribir el error en el archivo de errores
logging.error(str(e))
raise e # Re-lanzar la excepción para detener el procesamiento
finally:
# Cerrar la conexión
if cursorMySQL is not None:
cursorMySQL.close()