236 lines
9.4 KiB
Python
236 lines
9.4 KiB
Python
import textwrap
|
|
from datetime import date
|
|
from decimal import ROUND_HALF_UP, Decimal
|
|
from typing import Any, Dict, Optional
|
|
|
|
from striprtf.striprtf import rtf_to_text
|
|
|
|
from app.config import load_config, logger
|
|
from app.utils import (
|
|
apply_discount_cents4,
|
|
cents,
|
|
cents4,
|
|
corregir_y_validar_email,
|
|
limpiar_cadena,
|
|
map_iva_code,
|
|
map_rec_by_iva_code,
|
|
money_round,
|
|
normalizar_telefono_con_plus,
|
|
normalizar_url_para_insert,
|
|
tax_fraction_from_code,
|
|
unscale_to_decimal,
|
|
)
|
|
|
|
|
|
def rtf_a_texto_plano(rtf: str) -> str:
|
|
"""
|
|
Convierte RTF a texto plano usando striprtf.
|
|
"""
|
|
return rtf_to_text(rtf).strip()
|
|
|
|
|
|
def normalize_customer_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Normaliza campos del registro de origen 'fd' (factura).
|
|
"""
|
|
config = load_config()
|
|
|
|
# >>> aquí usa tus helpers reales:
|
|
def clean_phone(v): return normalizar_telefono_con_plus(v)
|
|
def clean_web(v): return normalizar_url_para_insert(v)
|
|
def clean_tin(v): return limpiar_cadena(v)
|
|
|
|
email1_ok, email1 = corregir_y_validar_email(fd.get("EMAIL_1"))
|
|
email2_ok, email2 = corregir_y_validar_email(fd.get("EMAIL_2"))
|
|
|
|
return {
|
|
"is_company": config["CTE_IS_COMPANY"],
|
|
"tin": clean_tin(str(fd.get("NIF_CIF"))),
|
|
"name": str(fd.get("NOMBRE")),
|
|
"street": str(fd.get("CALLE")),
|
|
"city": str(fd.get("POBLACION")),
|
|
"province": str(fd.get("PROVINCIA")),
|
|
"postal_code": str(fd.get("CODIGO_POSTAL")),
|
|
"country": config['CTE_COUNTRY_CODE'],
|
|
"language_code": config['CTE_LANGUAGE_CODE'],
|
|
"phone_primary": clean_phone(fd.get("TELEFONO_1")),
|
|
"phone_secondary": clean_phone(fd.get("TELEFONO_2")),
|
|
"mobile_primary": clean_phone(fd.get("MOVIL_1")),
|
|
"mobile_secondary": clean_phone(fd.get("MOVIL_2")),
|
|
"email_primary": email1 if email1_ok else None,
|
|
"email_secondary": email2 if email2_ok else None,
|
|
"website": clean_web(str(fd.get("PAGINA_WEB")))
|
|
}
|
|
|
|
|
|
def normalize_header_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Normaliza campos del registro de origen 'fd' (factura).
|
|
|
|
# campos pendiente de revisar en un futuro
|
|
# xxxxxxx = str(factura_detalle['ID_EMPRESA'])
|
|
# xxxxxxx = str(factura_detalle['OBSERVACIONES'])
|
|
"""
|
|
config = load_config()
|
|
|
|
iva_code = map_iva_code(str(fd.get("DES_TIPO_IVA")))
|
|
# Cuota de IVA de la factura
|
|
iva_frac: Optional[Decimal] = tax_fraction_from_code(iva_code) # p.ej. Decimal('0.21') o Decimal('0')
|
|
iva_percentage_value = None if (
|
|
iva_frac is None or iva_frac == 0) else cents4(iva_frac)
|
|
iva_amount = cents(fd.get('IMPORTE_IVA'))
|
|
|
|
# Cuota de RE de la factura
|
|
rec_code = None
|
|
rec_frac = None
|
|
rec_percentage_value = None
|
|
rec_amount = None
|
|
importe_re = Decimal(str(fd.get("IMPORTE_RE") or 0))
|
|
if importe_re > 0:
|
|
rec_code = map_rec_by_iva_code(iva_code)
|
|
rec_frac: Optional[Decimal] = tax_fraction_from_code(rec_code) # p.ej. Decimal('0.05') o Decimal('0.01')
|
|
rec_percentage_value = None if (rec_frac is None or rec_frac == 0) else cents4(rec_frac)
|
|
rec_amount = cents(fd.get('IMPORTE_RE'))
|
|
|
|
return {
|
|
"company_id": config['CTE_COMPANY_ID'],
|
|
"is_proforma": config["CTE_IS_PROFORMA"],
|
|
"status": config["CTE_STATUS_INVOICE"],
|
|
"series": config['CTE_SERIE'],
|
|
|
|
"factuges_id": int(fd['ID_FACTURA']),
|
|
"reference": str(fd['REFERENCIA']),
|
|
# Se asigna la fecha de la subida que es cuando se va a presentar a la AEAT
|
|
"invoice_date": date.today().strftime("%Y-%m-%d"),
|
|
"operation_date": str(fd['FECHA_FACTURA']),
|
|
"description": textwrap.shorten(
|
|
f"{str(fd['REFERENCIA'])} - {str(fd.get('NOMBRE')) or ''}", width=50, placeholder="…"),
|
|
"notes": str(fd["OBSERVACIONES"]),
|
|
# siempre tendrán 2 decimales
|
|
'subtotal_amount_value': cents(fd.get('IMPORTE_NETO')),
|
|
'discount_amount_value': cents(fd.get('IMPORTE_DESCUENTO')),
|
|
'discount_percentage_val': int((Decimal(str(fd.get('DESCUENTO') or 0)) * 100).to_integral_value())
|
|
if fd.get('DESCUENTO') is not None else 0,
|
|
'taxable_amount_value': cents(fd.get('BASE_IMPONIBLE')),
|
|
'iva_code': iva_code,
|
|
'iva_percentage_value': iva_percentage_value,
|
|
'rec_code': rec_code,
|
|
'rec_percentage_value': rec_percentage_value,
|
|
'base': cents(fd.get('BASE_IMPONIBLE')),
|
|
'iva': iva_amount,
|
|
're': rec_amount,
|
|
'taxes_amount_value': cents((fd.get('IMPORTE_IVA') or 0) + (fd.get('IMPORTE_RE') or 0)),
|
|
'total_amount_value': cents(fd.get('IMPORTE_TOTAL')),
|
|
|
|
'retention_code': None,
|
|
'retention_percentage_value': None,
|
|
'retention_amount_value': None,
|
|
}
|
|
|
|
|
|
def normalize_details_invoice_fields(fd: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Normaliza campos de detalle de factura desde el registro de origen `fd`.
|
|
Escalas:
|
|
- quantity_value: escala 2 (cents)
|
|
- unit_value: escala 4 (cents4)
|
|
- discount_percentage_value: escala 2 (10 -> 1000)
|
|
- total_value: escala 4 (cents4)
|
|
- iva_amount: escala 2 (cents), calculado con redondeo a 2 decimales
|
|
-
|
|
"""
|
|
config = load_config()
|
|
|
|
# --- Campos base del origen ---
|
|
cantidad = fd.get("CANTIDAD")
|
|
importe_unidad = fd.get("IMPORTE_UNIDAD")
|
|
c = Decimal(str(cantidad or 0))
|
|
u = Decimal(str(importe_unidad or 0))
|
|
subtotal_amount_value = int((c * u * 10000).to_integral_value(rounding=ROUND_HALF_UP))
|
|
|
|
# total de línea (ya con descuento)
|
|
total_det = fd.get("IMPORTE_TOTAL_DET")
|
|
concepto_rtf = fd.get("CONCEPTO")
|
|
|
|
# --- cantidad y precio unitario ---
|
|
quantity_value = None if cantidad is None else cents(
|
|
cantidad) # escala 2
|
|
unit_value = None if importe_unidad is None else cents4(
|
|
importe_unidad) # escala 4
|
|
|
|
# --- descuento de linea % (escala 2) ---
|
|
disc_raw = fd.get("DESCUENTO_DET")
|
|
disc_pct: Optional[Decimal] = None if disc_raw is None else Decimal(
|
|
str(disc_raw))
|
|
discount_percentage_value = None if (
|
|
disc_pct is None or disc_pct == 0) else cents(disc_pct)
|
|
discount_amount_value, subtotal_amount_with_dto = apply_discount_cents4(subtotal_amount_value, disc_pct)
|
|
|
|
# --- descuento global por linea % (escala 2) ---
|
|
disc_global_raw = fd.get("DESCUENTO")
|
|
disc_global_pct: Optional[Decimal] = None if disc_global_raw is None else Decimal(str(disc_global_raw))
|
|
global_percentage_value = None if (disc_global_pct is None or disc_global_pct == 0) else cents(disc_global_pct)
|
|
global_discount_amount_value, taxable_amount_value = apply_discount_cents4(
|
|
subtotal_amount_with_dto, disc_global_pct)
|
|
|
|
total_discount_amount_value = discount_amount_value + global_discount_amount_value
|
|
|
|
# --- total de línea (escala 4) ---
|
|
total_value = cents4(total_det)
|
|
# DEBE SER LO MISMO LO CALCULADO QUE LO QUE NOS VIENE POR BD DE FACTUGES
|
|
logger.info("total con dto linea calculado: %s - total que llega: %s", subtotal_amount_with_dto, total_value)
|
|
# la base imponible sobre la que calcular impuestos es el neto menos el dto de linea y dto global
|
|
base = unscale_to_decimal(taxable_amount_value, 4)
|
|
|
|
logger.info("base imponible calculada: %s - subtotal: %s - descuentodto: %s - descuentoglobal: %s",
|
|
base, subtotal_amount_value, discount_amount_value, global_discount_amount_value)
|
|
|
|
# Calcular cuota de IVA de la línea
|
|
iva_code = map_iva_code(str(fd.get("DES_TIPO_IVA")))
|
|
iva_frac: Optional[Decimal] = tax_fraction_from_code(iva_code) # p.ej. Decimal('0.21') o Decimal('0')
|
|
iva_percentage_value = None if (
|
|
iva_frac is None or iva_frac == 0) else cents4(iva_frac)
|
|
iva_amount_c4 = 0
|
|
if iva_frac:
|
|
iva_amount_c4 = cents4(money_round(base * iva_frac, 2)) # escala 4
|
|
|
|
# Cuota cuota RE de la línea
|
|
rec_code = None
|
|
rec_frac: Optional[Decimal] = None
|
|
rec_percentage_value = None
|
|
rec_amount_c4 = 0
|
|
importe_re = Decimal(str(fd.get("IMPORTE_RE") or 0))
|
|
if importe_re > 0:
|
|
rec_code = map_rec_by_iva_code(iva_code)
|
|
rec_frac = tax_fraction_from_code(rec_code) # p.ej. Decimal('0.05') o Decimal('0.01')
|
|
if rec_frac and rec_frac != 0:
|
|
rec_percentage_value = cents4(rec_frac) # escala 4 del % (0.052 -> 520)
|
|
rec_amount_c4 = cents4(base * rec_frac) # escala 4
|
|
|
|
taxes_amount_value = iva_amount_c4 + rec_amount_c4
|
|
|
|
total_value = taxable_amount_value + taxes_amount_value
|
|
|
|
return {
|
|
'position': int(fd.get("POSICION") or 0),
|
|
'description': rtf_a_texto_plano(str(concepto_rtf or "")),
|
|
'quantity_value': quantity_value,
|
|
'unit_value': unit_value,
|
|
'subtotal_amount_value': subtotal_amount_value,
|
|
'disc_pct': disc_pct,
|
|
'discount_percentage_value': discount_percentage_value,
|
|
'discount_amount_value': discount_amount_value,
|
|
'global_percentage_value': global_percentage_value,
|
|
'global_discount_amount_value': global_discount_amount_value,
|
|
'total_discount_amount_value': total_discount_amount_value,
|
|
'taxable_amount_value': taxable_amount_value,
|
|
'total_value': total_value,
|
|
'iva_code': iva_code,
|
|
'iva_percentage_value': iva_percentage_value,
|
|
'iva_amount_value': iva_amount_c4,
|
|
'rec_code': rec_code,
|
|
'rec_percentage_value': rec_percentage_value,
|
|
'rec_amount_value': rec_amount_c4,
|
|
'taxes_amount_value': taxes_amount_value,
|
|
}
|