This commit is contained in:
David Arranz 2025-11-21 19:42:03 +01:00
parent 9a34e38557
commit bb91c34ba7
6 changed files with 62 additions and 22 deletions

View File

@ -1,9 +1,10 @@
import logging import logging
from datetime import date
from config import load_config from config import load_config
import textwrap import textwrap
from typing import Dict, Any from typing import Dict, Any, Optional
from decimal import Decimal, ROUND_HALF_UP from decimal import Decimal, ROUND_HALF_UP
from utils import limpiar_cadena, normalizar_telefono_con_plus, corregir_y_validar_email, normalizar_url_para_insert, map_tax_code, cents, money_round, tax_fraction_from_code from utils import limpiar_cadena, normalizar_telefono_con_plus, corregir_y_validar_email, normalizar_url_para_insert, map_tax_code, cents, cents4, money_round, tax_fraction_from_code
from striprtf.striprtf import rtf_to_text from striprtf.striprtf import rtf_to_text
@ -66,7 +67,8 @@ def normalize_header_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
"factuges_id": int(fd['ID_FACTURA']), "factuges_id": int(fd['ID_FACTURA']),
"reference": str(fd['REFERENCIA']), "reference": str(fd['REFERENCIA']),
"invoice_date": str(fd['FECHA_FACTURA']), # Se asigna la fecha de la subida que es cuando se va a presentar a la AEAT
"invoice_date": date.today().strftime("%Y-%m-%d"),
"operation_date": str(fd['FECHA_FACTURA']), "operation_date": str(fd['FECHA_FACTURA']),
"description": textwrap.shorten( "description": textwrap.shorten(
f"{str(fd['REFERENCIA'])} - {str(fd.get('NOMBRE')) or ''}", width=50, placeholder=""), f"{str(fd['REFERENCIA'])} - {str(fd.get('NOMBRE')) or ''}", width=50, placeholder=""),
@ -90,19 +92,51 @@ def normalize_header_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
def normalize_details_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]: def normalize_details_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
""" """
Normaliza campos del registro de origen 'fd' (factura). Normaliza campos de detalle de factura desde el registro de origen `fd`.
Escalas:
- quantity_value: escala 2 (cents)
- unit_value: escala 4 (cents4)
- discount_percentage_value: escala 2 (10 -> 1000)
- total_value: escala 4 (cents4)
- tax_amount: escala 2 (cents), calculado con redondeo a 2 decimales
""" """
config = load_config() config = load_config()
tax_code = map_tax_code(str(fd.get("DES_TIPO_IVA"))) tax_code = map_tax_code(str(fd.get("DES_TIPO_IVA")))
disc_pct = fd.get('DESCUENTO_DET')
# Calcular cuota de IVA de la línea # Calcular cuota de IVA de la línea
frac = tax_fraction_from_code(tax_code) # 0.21, 0.10, 0… frac: Optional[Decimal] = tax_fraction_from_code(
# Ttotal_det ya incluye descuento tax_code) # p.ej. Decimal('0.21') o Decimal('0')
line_base = Decimal(str(fd.get('IMPORTE_TOTAL_DET') or 0))
tax_amount = int( # --- Campos base del origen ---
(money_round(line_base * frac, 2)*100).to_integral_value()) cantidad = fd.get("CANTIDAD")
importe_unidad = fd.get("IMPORTE_UNIDAD")
# total de línea (ya con descuento)
total_det = fd.get("IMPORTE_TOTAL_DET")
concepto_rtf = fd.get("CONCEPTO")
# --- cantidad y precio unitario ---
quantity_value = None if cantidad is None else cents(
cantidad) # escala 2
unit_value = None if importe_unidad is None else cents4(
importe_unidad) # escala 4
# --- descuento % (escala 2) ---
disc_raw = fd.get("DESCUENTO_DET")
disc_pct: Optional[Decimal] = None if disc_raw is None else Decimal(
str(disc_raw))
discount_percentage_value = None if (
disc_pct is None or disc_pct == 0) else cents(disc_pct)
# --- total de línea (escala 4) ---
total_value = cents4(total_det)
# --- cuota (escala 2) ---
# calculamos sobre el total en unidades monetarias con redondeo bancario a 2 decimales
if frac is None:
tax_amount = 0
else:
base = Decimal(str(total_det or 0))
tax_amount = cents(money_round(base * frac, 2))
# Se calcula en el objeto de negocio de nuevo, comprobar si coincide # Se calcula en el objeto de negocio de nuevo, comprobar si coincide
# item_discount_amount = ( # item_discount_amount = (
@ -110,12 +144,12 @@ def normalize_details_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
return { return {
'tax_code': tax_code, 'tax_code': tax_code,
'position': int(fd['POSICION']), 'position': int(fd.get("POSICION") or 0),
'description': rtf_a_texto_plano(str(fd['CONCEPTO'])), 'description': rtf_a_texto_plano(str(concepto_rtf or "")),
'quantity_value': None if fd['CANTIDAD'] is None else int(Decimal(str(fd['CANTIDAD'] or 0)) * 100), 'quantity_value': quantity_value,
'unit_value': None if fd['IMPORTE_UNIDAD'] is None else int(Decimal(str(fd['IMPORTE_UNIDAD'] or 0)) * 10000), 'unit_value': unit_value,
'disc_pct': disc_pct, 'disc_pct': disc_pct,
'discount_percentage_value': None if disc_pct in (None, 0) else int(Decimal(str(disc_pct)) * 100), 'discount_percentage_value': discount_percentage_value,
'total_value': cents(fd.get('IMPORTE_TOTAL_DET')), 'total_value': total_value,
'tax_amount': tax_amount 'tax_amount': tax_amount
} }

View File

@ -140,14 +140,13 @@ LIMPIAR_FACTUGES_LINK = (
# OPCION A SACAMOS EL RESUMEN DE LA TAXES DE LA CABECERA # OPCION A SACAMOS EL RESUMEN DE LA TAXES DE LA CABECERA
consulta_sql_customer_invoices_issue = ( consulta_sql_customer_invoices_issue = (
f"SELECT ci.id, ci.series, ci.invoice_number, ci.invoice_date, ci.description, ci.customer_tin, ci.customer_name, ci.total_amount_value, ci.total_amount_scale, ci.reference, " f"SELECT ci.id, ci.series, ci.invoice_number, ci.invoice_date, ci.description, ci.customer_tin, ci.customer_name, ci.total_amount_value, ci.total_amount_scale, ci.reference, "
f"cit.taxable_amount_scale, cit.taxes_amount_scale, cit.tax_code, sum(cit.taxable_amount_value) as taxable_amount_value, sum(cit.taxes_amount_value) as taxes_amount_value, " f"cit.taxable_amount_scale, cit.taxes_amount_scale, cit.tax_code, cit.taxable_amount_value, cit.taxes_amount_value, "
f"vr.id as vrId, vr.uuid, vr.estado " f"vr.id as vrId, vr.uuid, vr.estado "
f"FROM customer_invoices as ci " f"FROM customer_invoices as ci "
f"LEFT JOIN customer_invoice_taxes cit on (ci.id = cit.invoice_id) " f"LEFT JOIN customer_invoice_taxes cit on (ci.id = cit.invoice_id) "
f"LEFT JOIN verifactu_records vr on (ci.id = vr.invoice_id) " f"LEFT JOIN verifactu_records vr on (ci.id = vr.invoice_id) "
f"WHERE (ci.is_proforma = 0) AND (ci.status= 'issued') " f"WHERE (ci.is_proforma = 0) AND (ci.status= 'issued') "
f"AND (vr.estado <> 'Correcto') " f"AND (vr.estado <> 'Correcto') "
f"group by 1,2,3,4,5,6,7,8,9,10,11 "
f"order by reference" f"order by reference"
) )

View File

@ -241,7 +241,7 @@ def insert_invoice_header(cur: str, cf: Dict[str, Any], hif: Dict[str, Any], cus
invoice_id = str(uuid7()) invoice_id = str(uuid7())
logging.info("Inserting invoice %s %s %s %s", logging.info("Inserting invoice %s %s %s %s",
invoice_id, hif.get('reference'), hif.get('invoice_date'), config['CTE_STATUS_INVOICE']) invoice_id, hif.get('reference'), hif.get('invoice_date'), hif.get('operation_date'), config['CTE_STATUS_INVOICE'])
cur.execute( cur.execute(
SQL.INSERT_INVOICE, SQL.INSERT_INVOICE,
( (

View File

@ -108,8 +108,9 @@ def procesar_factura_verifactu(
respuesta = crear_factura(factura, config) respuesta = crear_factura(factura, config)
if respuesta.get("status") == 200 and respuesta.get("ok"): if respuesta.get("status") == 200 and respuesta.get("ok"):
data = respuesta.get("data") data = respuesta.get("data")
qr_verifactu = f"data:image/png;base64,{data.get('qr', '')}"
cursor_mysql.execute(SQL.update_verifactu_records_with_invoiceId, (data.get("estado"), data.get( cursor_mysql.execute(SQL.update_verifactu_records_with_invoiceId, (data.get("estado"), data.get(
"uuid"), data.get("url"), data.get("qr"), factura.get("id"))) "uuid"), data.get("url"), qr_verifactu, factura.get("id")))
logging.info( logging.info(
f">>> Factura {factura.get("reference")} registrada en Verifactu") f">>> Factura {factura.get("reference")} registrada en Verifactu")
return True return True

View File

@ -5,7 +5,7 @@ from .send_orders_mail import send_orders_mail
from .text_converter import text_converter, limpiar_cadena from .text_converter import text_converter, limpiar_cadena
from .send_rest_api import validar_nif, estado_factura, crear_factura from .send_rest_api import validar_nif, estado_factura, crear_factura
from .tax_catalog_helper import TaxCatalog, get_default_tax_catalog, map_tax_code, calc_item_tax_amount, tax_fraction_from_code from .tax_catalog_helper import TaxCatalog, get_default_tax_catalog, map_tax_code, calc_item_tax_amount, tax_fraction_from_code
from .importes_helper import unscale_to_str, unscale_to_decimal, cents, money_round from .importes_helper import unscale_to_str, unscale_to_decimal, cents, cents4, money_round
from .telefonos_helper import normalizar_telefono_con_plus from .telefonos_helper import normalizar_telefono_con_plus
from .mails_helper import corregir_y_validar_email from .mails_helper import corregir_y_validar_email
from .websites_helper import normalizar_url_para_insert from .websites_helper import normalizar_url_para_insert

View File

@ -2,6 +2,12 @@ from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Optional from typing import Any, Optional
def cents4(value: Optional[Decimal | float | int]) -> int:
"""Convierte a centésimas (valor * 10000). Soporta None."""
v = Decimal(str(value or 0))
return int((v * 10000).to_integral_value(rounding=ROUND_HALF_UP))
def cents(value: Optional[Decimal | float | int]) -> int: def cents(value: Optional[Decimal | float | int]) -> int:
"""Convierte a centésimas (valor * 100). Soporta None.""" """Convierte a centésimas (valor * 100). Soporta None."""
v = Decimal(str(value or 0)) v = Decimal(str(value or 0))