.
This commit is contained in:
parent
41ebca932a
commit
b0c04bbe42
16
.env.example
16
.env.example
@ -13,10 +13,20 @@ LOG_LEVEL=INFO
|
||||
# Secret manager
|
||||
# --------------------
|
||||
SECRET_PROVIDER=infisical # fake | google | infisical
|
||||
GCP_PROJECT_ID= # required if SECRET_PROVIDER=google
|
||||
|
||||
# Infisical
|
||||
INFISICAL_HOST=https://eu.infisical.com
|
||||
INFISICAL_CLIENT_ID=35f83820-a9d3-4622-a0ab-ae6170f662fa
|
||||
INFISICAL_CLIENT_SECRET=2c158d6f77fe4fc684bdb78d0c1ed21ed7a762885f508facc575aa05c42397c5
|
||||
INFISICAL_PROJECT_ID=0bd3c2e0-39f5-4f92-8c6e-49bcc7eda896
|
||||
INFISICAL_ENV_SLUG=dev
|
||||
INFISICAL_TOKEN_AUTH=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZGVudGl0eUlkIjoiMmNjYmRkZmMtMGQyOC00ZmIwLTlhMGEtNWQyYTZiZWU3YzliIiwiaWRlbnRpdHlBY2Nlc3NUb2tlbklkIjoiZWMzNjg1OGUtNTkzYi00ZTE3LWI2ZmItN2NhODdlZDBhMmZhIiwiYXV0aFRva2VuVHlwZSI6ImlkZW50aXR5QWNjZXNzVG9rZW4iLCJpYXQiOjE3Njk2MTQ3NzQsImV4cCI6MTc3MjIwNjc3NH0.KZvzabbsgevhukPfc8LSlkOwos5eBaGvaqC2XPJEGHI
|
||||
|
||||
# Google
|
||||
GCP_PROJECT_ID=
|
||||
|
||||
# --------------------
|
||||
# PDF signing
|
||||
# --------------------
|
||||
PDF_CERT_SECRET_NAME=pdf-signing-cert
|
||||
PDF_CERT_PASSWORD_SECRET_NAME=pdf-signing-cert-password
|
||||
PDF_CERT_SECRET_NAME=pdf_cert_pfx_b64
|
||||
PDF_CERT_PASSWORD_SECRET_NAME=pdf_cert_password
|
||||
5
.infisical.json
Normal file
5
.infisical.json
Normal file
@ -0,0 +1,5 @@
|
||||
{
|
||||
"workspaceId": "0bd3c2e0-39f5-4f92-8c6e-49bcc7eda896",
|
||||
"defaultEnvironment": "dev",
|
||||
"gitBranchToEnvironmentMapping": null
|
||||
}
|
||||
42
docs/development-certificate.md
Normal file
42
docs/development-certificate.md
Normal file
@ -0,0 +1,42 @@
|
||||
## 1. Generar un certificado de prueba (local)
|
||||
|
||||
### 1.1 Generar clave privada y certificado
|
||||
|
||||
```bash
|
||||
openssl req -x509 -newkey rsa:2048 \
|
||||
-keyout dev.key \
|
||||
-out dev.crt \
|
||||
-days 90 \
|
||||
-nodes \
|
||||
-subj "/C=ES/O=ACME DEV/CN=ACME DEV TEST CERT"
|
||||
```
|
||||
|
||||
Esto genera:
|
||||
- `dev.key` → clave privada
|
||||
- `dev.crt` → certificado autofirmado
|
||||
- expiración: 90 días
|
||||
|
||||
### 1.2 Crear el archivo PFX (PKCS#12)
|
||||
|
||||
```bash
|
||||
openssl pkcs12 -export \
|
||||
-out dev.pfx \
|
||||
-inkey dev.key \
|
||||
-in dev.crt \
|
||||
-password pass:devpassword
|
||||
```
|
||||
|
||||
Resultado:
|
||||
|
||||
- archivo: `dev.pfx`
|
||||
- password: `devpassword`
|
||||
|
||||
## 2. Convertir el certificado a base64
|
||||
|
||||
```bash
|
||||
base64 dev.pfx > dev.pfx.b64
|
||||
```
|
||||
|
||||
Comprueba que:
|
||||
- el archivo no esté vacío
|
||||
- contiene texto base64 válido
|
||||
185
docs/development-secrets.md
Normal file
185
docs/development-secrets.md
Normal file
@ -0,0 +1,185 @@
|
||||
# Development Secrets & Test Certificates
|
||||
|
||||
Este documento describe **cómo generar y configurar secretos de prueba** para el
|
||||
entorno `development` del Factuges Document Signing Service.
|
||||
|
||||
⚠️ **IMPORTANTE**
|
||||
Este procedimiento es **solo para desarrollo y pruebas internas**.
|
||||
NO usar certificados reales ni secretos de producción.
|
||||
|
||||
---
|
||||
|
||||
## Objetivo
|
||||
|
||||
En `development` queremos:
|
||||
|
||||
- Probar el flujo completo de firmado de PDFs
|
||||
- Usar certificados de prueba (autofirmados)
|
||||
- Almacenar secretos en Infisical
|
||||
- Mantener el mismo flujo que en producción
|
||||
|
||||
La validez legal **NO es un objetivo en development**.
|
||||
|
||||
---
|
||||
|
||||
## Qué secretos necesita el Signing Service
|
||||
|
||||
Para firmar documentos PDF se necesitan exactamente **dos secretos**:
|
||||
|
||||
| Secreto | Descripción |
|
||||
|------|------------|
|
||||
| `PDF_CERT_PFX_B64` | Certificado PKCS#12 (`.pfx`) codificado en base64 |
|
||||
| `PDF_CERT_PASSWORD` | Password del certificado |
|
||||
|
||||
Estos secretos **solo existen en Infisical**, nunca en el repositorio.
|
||||
|
||||
---
|
||||
|
||||
## 1. Generar un certificado de prueba (local)
|
||||
|
||||
Usamos un certificado **autofirmado**, válido técnicamente pero **no legalmente**.
|
||||
|
||||
### 1.1 Generar clave privada y certificado
|
||||
|
||||
```bash
|
||||
openssl req -x509 -newkey rsa:2048 \
|
||||
-keyout dev.key \
|
||||
-out dev.crt \
|
||||
-days 3600 \
|
||||
-nodes \
|
||||
-subj "/C=ES/O=ACME DEV/CN=ACME DEV TEST CERT"
|
||||
```
|
||||
|
||||
Esto genera:
|
||||
|
||||
- dev.key → clave privada
|
||||
- dev.crt → certificado autofirmado
|
||||
- expiración: 3600 días
|
||||
|
||||
### 1.2 Crear el archivo PFX (PKCS#12)
|
||||
|
||||
```bash
|
||||
openssl pkcs12 -export \
|
||||
-out dev.pfx \
|
||||
-inkey dev.key \
|
||||
-in dev.crt \
|
||||
-password pass:devpassword
|
||||
```
|
||||
|
||||
|
||||
Resultado:
|
||||
|
||||
- archivo: dev.pfx
|
||||
- password: devpassword
|
||||
- ⚠️ Password solo para development
|
||||
|
||||
## 2. Convertir el certificado a base64
|
||||
|
||||
```bash
|
||||
base64 dev.pfx > dev.pfx.b64
|
||||
```
|
||||
|
||||
Comprueba que:
|
||||
|
||||
- el archivo no esté vacío
|
||||
- contiene texto base64 válido
|
||||
|
||||
## 3. Guardar los secretos en Infisical (environment = development)
|
||||
|
||||
### 3.1 Desde la UI de Infisical
|
||||
|
||||
- Accede a Infisical Cloud
|
||||
- Selecciona el Project
|
||||
- Selecciona el Environment: development
|
||||
- Ve a Secrets
|
||||
- Crea los siguientes secretos:
|
||||
|
||||
### Secreto 1
|
||||
|
||||
- Key: PDF_CERT_PFX_B64
|
||||
- Value: contenido de dev.pfx.b64
|
||||
- Type: secret
|
||||
|
||||
### Secreto 2
|
||||
|
||||
- Key: PDF_CERT_PASSWORD
|
||||
- Value: devpassword
|
||||
- Type: secret
|
||||
|
||||
Guarda los cambios.
|
||||
|
||||
### 3.2 (Opcional) Usando Infisical CLI
|
||||
|
||||
```bash
|
||||
infisical secrets set PDF_CERT_PFX_B64="$(cat dev.pfx.b64)"
|
||||
infisical secrets set PDF_CERT_PASSWORD="devpassword"
|
||||
```
|
||||
|
||||
|
||||
##4. Configuración local del Signing Service
|
||||
|
||||
Ejemplo de .env local:
|
||||
|
||||
```bash
|
||||
APP_ENV=development
|
||||
SECRET_PROVIDER=infisical
|
||||
|
||||
INFISICAL_PROJECT_ID=your_project_id
|
||||
INFISICAL_ENV_SLUG=development
|
||||
INFISICAL_TOKEN_AUTH=your_dev_token_auth
|
||||
|
||||
PDF_CERT_PFX_SECRET_NAME=PDF_CERT_PFX_B64
|
||||
PDF_CERT_PASSWORD_SECRET_NAME=PDF_CERT_PASSWORD
|
||||
```
|
||||
|
||||
***📌 El archivo .env NO debe commitearse.***
|
||||
|
||||
## 5. Comprobación manual
|
||||
|
||||
- 1. Arranca el Signing Service
|
||||
- 2. Llama al endpoint de firmado con un PDF de prueba
|
||||
- 3. Abre el PDF firmado con:
|
||||
- Adobe Reader
|
||||
- Okular
|
||||
- Foxit
|
||||
|
||||
Resultado esperado:
|
||||
|
||||
- La firma aparece como válida técnicamente
|
||||
- El visor muestra advertencia:
|
||||
```
|
||||
“El certificado no es de confianza”
|
||||
```
|
||||
|
||||
***✔️ Esto es correcto en development.***
|
||||
|
||||
## 6. Casos de prueba recomendados
|
||||
|
||||
En development se recomienda probar:
|
||||
|
||||
### Certificado caducado
|
||||
|
||||
- Generar un certificado con -days -1
|
||||
- Verificar que el servicio bloquea la firma
|
||||
|
||||
### Password incorrecto
|
||||
|
||||
- Cambiar PDF_CERT_PASSWORD
|
||||
- Verificar error controlado
|
||||
|
||||
### Certificado eliminado
|
||||
|
||||
- Borrar los secretos en Infisical
|
||||
- Verificar error CERT_NOT_CONFIGURED
|
||||
|
||||
### Rotación manual
|
||||
- Reemplazar PDF_CERT_PFX_B64 por otro certificado
|
||||
- Verificar que el servicio sigue firmando
|
||||
|
||||
## 7. Qué NO hacer nunca
|
||||
|
||||
❌ No usar certificados reales
|
||||
❌ No reutilizar secretos de producción
|
||||
❌ No commitear .pfx, .key, .crt
|
||||
❌ No loguear secretos
|
||||
❌ No compartir tokens de Infisical
|
||||
@ -3,7 +3,7 @@ requires = ["setuptools>=68", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "signing-service"
|
||||
name = "factuges-document-signing-service"
|
||||
version = "0.1.1"
|
||||
description = "FastAPI service for signing PDF documents using external secret managers"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
15
setup.cfg
Normal file
15
setup.cfg
Normal file
@ -0,0 +1,15 @@
|
||||
|
||||
[metadata]
|
||||
name = "factuges-document-signing-service"
|
||||
version = "0.1.1"
|
||||
description = "FastAPI service for signing PDF documents using external secret managers"
|
||||
author = Rodax Software
|
||||
author_email = info@rodax-software.com
|
||||
long_description = file: README.md
|
||||
long_description_content_type = text/markdown
|
||||
url = https://factuges.app
|
||||
license = "Propietaria"
|
||||
classifiers =
|
||||
Programming Language :: Python :: 3.11
|
||||
License :: OSI Approved :: MIT License
|
||||
Operating System :: OS Independent
|
||||
@ -23,6 +23,8 @@ router = APIRouter(prefix="/documents", tags=["documents"])
|
||||
async def sign_document(
|
||||
file: UploadFile = File(...),
|
||||
certificate_secret_name: str = Form(...),
|
||||
certificate_password_secret_name: str = Form(...),
|
||||
company_slug: str = Form(...),
|
||||
):
|
||||
try:
|
||||
settings = get_settings()
|
||||
@ -39,9 +41,11 @@ async def sign_document(
|
||||
command = SignDocumentCommand(
|
||||
pdf_bytes=pdf_bytes,
|
||||
certificate_secret_name=certificate_secret_name,
|
||||
certificate_password_secret_name=certificate_password_secret_name,
|
||||
company_slug=company_slug,
|
||||
)
|
||||
|
||||
result = use_case.execute(command)
|
||||
result = await use_case.execute(command)
|
||||
|
||||
return Response(
|
||||
content=result.signed_pdf_bytes,
|
||||
|
||||
@ -8,11 +8,19 @@ class AppSettings:
|
||||
# --------------------
|
||||
app_env: str
|
||||
log_level: str
|
||||
state_path: str
|
||||
local_tz: str
|
||||
|
||||
# --------------------
|
||||
# Secret manager
|
||||
# --------------------
|
||||
secret_provider: str
|
||||
infisical_host: str
|
||||
infisical_client_id: str | None
|
||||
infisical_client_secret: str | None
|
||||
infisical_project_id: str | None
|
||||
infisical_env_slug: str | None
|
||||
infisical_token_auth: str | None
|
||||
gcp_project_id: str | None
|
||||
|
||||
# --------------------
|
||||
|
||||
@ -46,6 +46,16 @@ def load_app_settings() -> AppSettings:
|
||||
normalize=str.upper,
|
||||
)
|
||||
|
||||
state_path = get_env(
|
||||
"STATE_PATH",
|
||||
default=".",
|
||||
)
|
||||
|
||||
local_tz = get_env(
|
||||
"LOCAL_TZ",
|
||||
default="UTC",
|
||||
)
|
||||
|
||||
# --------------------
|
||||
# Secret manager
|
||||
# --------------------
|
||||
@ -61,6 +71,36 @@ def load_app_settings() -> AppSettings:
|
||||
"Allowed values: fake | google | infisical"
|
||||
)
|
||||
|
||||
infisical_host = get_env(
|
||||
"INFISICAL_HOST",
|
||||
default="https://eu.infisical.com",
|
||||
)
|
||||
|
||||
infisical_client_id = get_env(
|
||||
"INFISICAL_CLIENT_ID",
|
||||
required=secret_provider == "infisical",
|
||||
)
|
||||
|
||||
infisical_client_secret = get_env(
|
||||
"INFISICAL_CLIENT_SECRET",
|
||||
required=secret_provider == "infisical",
|
||||
)
|
||||
|
||||
infisical_project_id = get_env(
|
||||
"INFISICAL_PROJECT_ID",
|
||||
required=secret_provider == "infisical",
|
||||
)
|
||||
|
||||
infisical_env_slug = get_env(
|
||||
"INFISICAL_ENV_SLUG",
|
||||
required=secret_provider == "infisical",
|
||||
)
|
||||
|
||||
infisical_token_auth = get_env(
|
||||
"INFISICAL_TOKEN_AUTH",
|
||||
required=secret_provider == "infisical",
|
||||
)
|
||||
|
||||
gcp_project_id = get_env(
|
||||
"GCP_PROJECT_ID",
|
||||
required=secret_provider == "google",
|
||||
@ -85,7 +125,15 @@ def load_app_settings() -> AppSettings:
|
||||
return AppSettings(
|
||||
app_env=app_env,
|
||||
log_level=log_level,
|
||||
state_path=state_path,
|
||||
local_tz=local_tz,
|
||||
secret_provider=secret_provider,
|
||||
infisical_host=infisical_host,
|
||||
infisical_client_id=infisical_client_id,
|
||||
infisical_client_secret=infisical_client_secret,
|
||||
infisical_project_id=infisical_project_id,
|
||||
infisical_env_slug=infisical_env_slug,
|
||||
infisical_token_auth=infisical_token_auth,
|
||||
gcp_project_id=gcp_project_id,
|
||||
pdf_cert_secret_name=pdf_cert_secret_name,
|
||||
pdf_cert_password_secret_name=pdf_cert_password_secret_name,
|
||||
|
||||
@ -7,3 +7,4 @@ from signing_service.application.settings.app_settings_loader import load_app_se
|
||||
@lru_cache
|
||||
def get_settings() -> AppSettings:
|
||||
return load_app_settings()
|
||||
|
||||
|
||||
85
src/signing_service/application/settings/setup_logger.py
Normal file
85
src/signing_service/application/settings/setup_logger.py
Normal file
@ -0,0 +1,85 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union
|
||||
|
||||
|
||||
def create_logger(
|
||||
name: str = "factuges-document-signing-service",
|
||||
*,
|
||||
level: int = logging.INFO,
|
||||
log_path: Optional[Union[str, Path]] = None,
|
||||
max_bytes: int = 5_000_000, # rotación opcional
|
||||
backup_count: int = 3,
|
||||
) -> logging.Logger:
|
||||
"""
|
||||
Crea un logger consistente para FactuGES Document Signing Service.
|
||||
|
||||
Reglas:
|
||||
- SIEMPRE envia logs a stdout (Docker-friendly).
|
||||
- SOLO en producción escribe también a fichero si `log_path` está definido.
|
||||
- `log_path` puede ser `str` o `Path`.
|
||||
- Evita duplicar handlers.
|
||||
"""
|
||||
|
||||
logger = logging.getLogger(name)
|
||||
logger.setLevel(level)
|
||||
|
||||
# Si ya está configurado, no duplicamos handlers
|
||||
if logger.handlers:
|
||||
return logger
|
||||
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
|
||||
# ------------------------------
|
||||
# 1) Handler de consola (siempre)
|
||||
# ------------------------------
|
||||
h_console = logging.StreamHandler(sys.stdout)
|
||||
h_console.setFormatter(formatter)
|
||||
logger.addHandler(h_console)
|
||||
|
||||
# ------------------------------
|
||||
# 2) Handler de fichero (solo prod)
|
||||
# ------------------------------
|
||||
is_production = os.getenv("ENV") == "production"
|
||||
|
||||
if log_path and is_production:
|
||||
p = Path(log_path)
|
||||
|
||||
# Aseguramos directorios
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Puedes usar FileHandler simple, pero Rotating es más seguro.
|
||||
h_file = RotatingFileHandler(
|
||||
filename=str(p),
|
||||
maxBytes=max_bytes,
|
||||
backupCount=backup_count,
|
||||
encoding="utf-8",
|
||||
)
|
||||
h_file.setFormatter(formatter)
|
||||
logger.addHandler(h_file)
|
||||
|
||||
# Verificación explícita
|
||||
try:
|
||||
test_msg = f"Log file active at: {p}"
|
||||
h_file.acquire()
|
||||
h_file.stream.write(f"{test_msg}\n")
|
||||
h_file.flush()
|
||||
h_file.release()
|
||||
|
||||
logger.info(test_msg)
|
||||
except Exception as e:
|
||||
logger.error(f"ERROR: cannot write to log file {p}: {e}")
|
||||
raise
|
||||
|
||||
return logger
|
||||
|
||||
|
||||
# logger "global" ya creado
|
||||
logger = create_logger()
|
||||
8
src/signing_service/application/settings/version.py
Normal file
8
src/signing_service/application/settings/version.py
Normal file
@ -0,0 +1,8 @@
|
||||
from importlib.metadata import version, PackageNotFoundError
|
||||
|
||||
|
||||
def get_package_version() -> str:
|
||||
try:
|
||||
return version("factuges-document-signing-service") # nombre del paquete en [metadata].name
|
||||
except PackageNotFoundError:
|
||||
return "unknown"
|
||||
@ -19,7 +19,7 @@ class SignDocumentUseCase:
|
||||
self._cert_secret_name = cert_secret_name
|
||||
self._cert_password_secret_name = cert_password_secret_name
|
||||
|
||||
def execute(self, command: SignDocumentCommand) -> SignDocumentResult:
|
||||
async def execute(self, command: SignDocumentCommand) -> SignDocumentResult:
|
||||
certificate = self._secret_manager.get_secret(
|
||||
self._cert_secret_name
|
||||
)
|
||||
@ -28,7 +28,7 @@ class SignDocumentUseCase:
|
||||
self._cert_password_secret_name
|
||||
)
|
||||
|
||||
signed_pdf = self._pdf_signer.sign(
|
||||
signed_pdf = await self._pdf_signer.sign(
|
||||
pdf_bytes=command.pdf_bytes,
|
||||
certificate=certificate,
|
||||
password=password,
|
||||
|
||||
@ -5,6 +5,8 @@ from dataclasses import dataclass
|
||||
class SignDocumentCommand:
|
||||
pdf_bytes: bytes
|
||||
certificate_secret_name: str
|
||||
certificate_password_secret_name: str
|
||||
company_slug: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
@ -3,11 +3,17 @@ from abc import ABC, abstractmethod
|
||||
|
||||
class PDFSignerPort(ABC):
|
||||
@abstractmethod
|
||||
def sign(
|
||||
async def sign(
|
||||
self,
|
||||
pdf_bytes: bytes,
|
||||
certificate: str,
|
||||
password: str,
|
||||
) -> bytes:
|
||||
"""Sign a PDF and return signed PDF bytes."""
|
||||
raise NotImplementedError
|
||||
"""
|
||||
Signs a PDF and returns signed PDF bytes.
|
||||
|
||||
- pdf_bytes: original PDF as bytes
|
||||
- certificate: base64-encoded PKCS#12 (PFX)
|
||||
- password: password for the PFX
|
||||
"""
|
||||
raise NotImplementedError
|
||||
@ -13,9 +13,7 @@ def create_pdf_signer() -> PDFSignerPort:
|
||||
PyHankoPDFSigner,
|
||||
)
|
||||
|
||||
if not settings.pdf_pfx_password:
|
||||
raise ValueError("PDF_PFX_PASSWORD is required")
|
||||
if not settings.pdf_cert_password_secret_name:
|
||||
raise ValueError("PDF_CERT_PASSWORD_SECRET_NAME is required")
|
||||
|
||||
return PyHankoPDFSigner(
|
||||
pfx_password=settings.pdf_pfx_password
|
||||
)
|
||||
return PyHankoPDFSigner()
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
from signing_service.application.settings.container import get_settings
|
||||
from signing_service.domain.ports.secret_manager import SecretManagerPort
|
||||
from signing_service.infrastructure.secrets.fake_secret_manager import (
|
||||
FakeSecretManager,
|
||||
)
|
||||
from signing_service.infrastructure.secrets.infisical_secret_manager import InfisicalSecretManager
|
||||
from signing_service.infrastructure.secrets.fake_secret_manager import FakeSecretManager
|
||||
|
||||
|
||||
def create_secret_manager() -> SecretManagerPort:
|
||||
@ -13,15 +10,30 @@ def create_secret_manager() -> SecretManagerPort:
|
||||
if provider == "fake":
|
||||
return FakeSecretManager(
|
||||
secrets={
|
||||
"pdf-signing-cert": "FAKE_CERTIFICATE_DATA"
|
||||
"pdf-signing-cert": "FAKE_CERTIFICATE_DATA",
|
||||
"pdf-signing-cert-password": "FAKE_CERTIFICATE_PASSWORD",
|
||||
|
||||
}
|
||||
)
|
||||
|
||||
if provider == "infisical":
|
||||
if provider == "infisical":
|
||||
from signing_service.infrastructure.secrets.infisical_secret_manager import InfisicalSecretManager
|
||||
|
||||
if not settings.infisical_project_id:
|
||||
raise ValueError("INFISICAL_PROJECT_ID is required for Infisical Secret Manager")
|
||||
|
||||
if not settings.infisical_token_auth:
|
||||
raise ValueError("INFISICAL_TOKEN_AUTH is required for Infisical Secret Manager")
|
||||
|
||||
if not settings.infisical_env_slug:
|
||||
raise ValueError("INFISICAL_ENV_SLUG is required for Infisical Secret Manager")
|
||||
|
||||
return InfisicalSecretManager(
|
||||
token=settings.infisical_token,
|
||||
host=settings.infisical_host,
|
||||
client_id=settings.infisical_client_id,
|
||||
client_secret=settings.infisical_client_secret,
|
||||
project_id=settings.infisical_project_id,
|
||||
environment=settings.infisical_environment,
|
||||
environment=settings.infisical_env_slug,
|
||||
)
|
||||
|
||||
if provider == "google":
|
||||
|
||||
@ -8,7 +8,7 @@ class FakePDFSigner(PDFSignerPort):
|
||||
It only simulates the behavior for testing and development.
|
||||
"""
|
||||
|
||||
def sign(self, pdf_bytes: bytes, certificate: str, password: str) -> bytes:
|
||||
async def sign(self, pdf_bytes: bytes, certificate: str, password: str) -> bytes:
|
||||
# Simulación simple:
|
||||
# añadimos un marcador para indicar "firmado"
|
||||
signature_marker = f"\n\n-- Signed with certificate: {certificate} and password: {password}".encode()
|
||||
|
||||
@ -1,18 +1,18 @@
|
||||
import base64
|
||||
from csv import writer
|
||||
import io
|
||||
from pydoc import doc
|
||||
|
||||
from pyhanko.sign import signers
|
||||
from pyhanko.sign.general import load_cert_from_pfx
|
||||
from pyhanko.pdf_utils.incremental_writer import IncrementalPdfFileWriter
|
||||
|
||||
from signing_service.domain.ports.pdf_signer import PDFSignerPort
|
||||
|
||||
|
||||
class PyHankoPDFSigner(PDFSignerPort):
|
||||
def __init__(self, pfx_password: str) -> None:
|
||||
self._pfx_password = pfx_password.encode()
|
||||
|
||||
def sign(self, pdf_bytes: bytes, certificate: str, password: str,) -> bytes:
|
||||
async def sign(self, pdf_bytes: bytes, certificate: str, password: str,) -> bytes:
|
||||
"""
|
||||
pdf_bytes: original PDF byte
|
||||
certificate: base64-encoded PKCS#12 (PFX)
|
||||
"""
|
||||
|
||||
@ -20,30 +20,23 @@ class PyHankoPDFSigner(PDFSignerPort):
|
||||
pfx_bytes = base64.b64decode(certificate)
|
||||
|
||||
# 2️⃣ Cargar clave y certificado
|
||||
private_key, cert, other_certs = load_cert_from_pfx(
|
||||
pfx_bytes, password.encode()
|
||||
)
|
||||
|
||||
signer = signers.SimpleSigner(
|
||||
signing_cert=cert,
|
||||
signing_key=private_key,
|
||||
cert_registry=signers.SimpleCertificateStore.from_certs(
|
||||
other_certs
|
||||
),
|
||||
signer = signers.SimpleSigner.load_pkcs12_data(
|
||||
pkcs12_bytes=pfx_bytes,
|
||||
passphrase=password.encode(),
|
||||
other_certs=[],
|
||||
)
|
||||
|
||||
# 3️⃣ Preparar PDF
|
||||
input_pdf = io.BytesIO(pdf_bytes)
|
||||
output_pdf = io.BytesIO()
|
||||
input_pdf = IncrementalPdfFileWriter(io.BytesIO(pdf_bytes))
|
||||
|
||||
# 4️⃣ Firmar
|
||||
signers.sign_pdf(
|
||||
signed_pdf: io.BytesIO = await signers.async_sign_pdf(
|
||||
input_pdf,
|
||||
signers.PdfSignatureMetadata(
|
||||
field_name="Signature1"
|
||||
),
|
||||
signer=signer,
|
||||
output=output_pdf,
|
||||
)
|
||||
|
||||
return output_pdf.getvalue()
|
||||
# 5️⃣ Obtener PDF firmado
|
||||
return signed_pdf.getbuffer().tobytes()
|
||||
|
||||
@ -1,17 +1,31 @@
|
||||
from http import client
|
||||
from infisical_sdk import InfisicalSDKClient
|
||||
from signing_service.domain.ports.secret_manager import SecretManagerPort
|
||||
from signing_service.application.settings.setup_logger import logger
|
||||
|
||||
class InfisicalSecretManager(SecretManagerPort):
|
||||
def __init__(self, token: str, project_id: str, environment: str):
|
||||
self._client = InfisicalSDKClient(token=token)
|
||||
def __init__(self, host: str, client_id: str, client_secret: str, project_id: str, environment: str):
|
||||
self._client = InfisicalSDKClient(
|
||||
host=host,
|
||||
token=None)
|
||||
self._client.auth.universal_auth.login(
|
||||
client_id=client_id,
|
||||
client_secret=client_secret
|
||||
)
|
||||
self._project_id = project_id
|
||||
self._environment = environment
|
||||
|
||||
def get_secret(self, name: str) -> str:
|
||||
|
||||
def get_secret(self, name: str) -> str:
|
||||
secret = self._client.secrets.get_secret_by_name(
|
||||
secret_name=name,
|
||||
project_id=self._project_id,
|
||||
environment_slug=self._environment,
|
||||
secret_path="/"
|
||||
secret_path="/rodax/",
|
||||
expand_secret_references=True, # Optional
|
||||
view_secret_value=True, # Optional
|
||||
include_imports=True, # Optional
|
||||
version=None # Optional
|
||||
)
|
||||
return secret.secret_value
|
||||
|
||||
return secret.secretValue
|
||||
@ -1,18 +1,56 @@
|
||||
from fastapi import FastAPI
|
||||
from dotenv import load_dotenv
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from dateutil import tz
|
||||
|
||||
from signing_service.application.settings.container import get_settings
|
||||
from signing_service.api.routes.sign_document import router as sign_router
|
||||
from signing_service.application.settings.setup_logger import create_logger
|
||||
from signing_service.api.routes.sign_document import router as sign_router
|
||||
from signing_service.application.settings.version import get_package_version
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
# 👇 FAIL FAST: load settings at startup
|
||||
get_settings()
|
||||
settings = get_settings()
|
||||
version = get_package_version()
|
||||
local_tz = tz.gettz(settings.local_tz)
|
||||
|
||||
state_path = Path(settings.state_path)
|
||||
|
||||
# Logging
|
||||
log_dir = state_path / "logs"
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
logger = create_logger(
|
||||
name="factuges-document-signing-service",
|
||||
log_path=log_dir / "factuges-document-signing-service.log", # Solo lo genera en producción
|
||||
|
||||
)
|
||||
|
||||
logger.info("")
|
||||
logger.info("============================================================")
|
||||
logger.info("FactuGES Document Signing Service - START ")
|
||||
logger.info("Version: %s", version)
|
||||
logger.info("UTC Now: %s", datetime.utcnow().isoformat())
|
||||
logger.info("Environment: %s", settings.app_env)
|
||||
logger.info("")
|
||||
logger.info("Log Level: %s", settings.log_level)
|
||||
logger.info("Log: %s", log_dir / "factuges-document-signing-service.log")
|
||||
logger.info("")
|
||||
logger.info("Secret Provider: %s", settings.secret_provider)
|
||||
logger.info("Infisical project ID: %s", settings.infisical_project_id)
|
||||
logger.info("Infisical environment: %s", settings.infisical_env_slug)
|
||||
logger.info("")
|
||||
|
||||
app = FastAPI(title="FactuGES Document Signing Service")
|
||||
app.include_router(sign_router)
|
||||
app.add_event_handler("startup", lambda: logger.info("Application startup complete"))
|
||||
app.add_event_handler("shutdown", lambda: logger.info("Application shutdown complete"))
|
||||
app.add_exception_handler(Exception, lambda request, exc: logger.error(f"Unhandled exception: {exc}"))
|
||||
|
||||
@app.get("/health")
|
||||
def health_check() -> dict[str, str]:
|
||||
return {"status": "ok"}
|
||||
|
||||
# Register routers
|
||||
app.include_router(sign_router)
|
||||
logger.info("API routes registered from sign_document router")
|
||||
|
||||
app.add_api_route("/health", lambda: {"status": "ok"}, methods=["GET"])
|
||||
logger.info("Health check endpoint registered at /health")
|
||||
|
||||
79
src/signing_service/test_infisical.py
Normal file
79
src/signing_service/test_infisical.py
Normal file
@ -0,0 +1,79 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from infisical_sdk import InfisicalSDKClient
|
||||
from signing_service.application.settings.container import get_settings
|
||||
|
||||
load_dotenv()
|
||||
|
||||
def main() -> int:
|
||||
settings = get_settings()
|
||||
secret_name = os.environ.get("INFISICAL_SECRET_NAME", "pdf_cert_password")
|
||||
secret_path = os.environ.get("INFISICAL_SECRET_PATH", "/rodax")
|
||||
|
||||
missing = [k for k, v in {
|
||||
"INFISICAL_SERVICE_TOKEN": settings.infisical_token_auth,
|
||||
"INFISICAL_PROJECT_ID": settings.infisical_project_id,
|
||||
}.items() if not v]
|
||||
|
||||
if missing:
|
||||
print(f"Missing env vars: {', '.join(missing)}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
print("== Infisical smoke test ==")
|
||||
|
||||
print(f"host=https://eu.infisical.com")
|
||||
|
||||
print(f"client_id={settings.infisical_client_id}")
|
||||
print(f"client_secret={settings.infisical_client_secret}")
|
||||
print(f"project_id={settings.infisical_project_id}")
|
||||
print(f"environment={settings.infisical_env_slug}")
|
||||
print(f"secret_path={secret_path}")
|
||||
print(f"secret_name={secret_name}")
|
||||
|
||||
client = InfisicalSDKClient(host="https://eu.infisical.com", token=None)
|
||||
|
||||
try:
|
||||
# 1) Login
|
||||
#client.auth.token_auth.login(
|
||||
# token=settings.infisical_token_auth
|
||||
#)
|
||||
|
||||
client.auth.universal_auth.login(
|
||||
client_id=settings.infisical_client_id,
|
||||
client_secret=settings.infisical_client_secret
|
||||
)
|
||||
|
||||
print("OK: token_auth.login")
|
||||
|
||||
# 2) Read one secret (the real test)
|
||||
secret = client.secrets.get_secret_by_name(
|
||||
secret_name=secret_name,
|
||||
project_id=settings.infisical_project_id,
|
||||
environment_slug=settings.infisical_env_slug,
|
||||
secret_path=secret_path,
|
||||
view_secret_value=True,
|
||||
)
|
||||
|
||||
print(secret)
|
||||
|
||||
value = getattr(secret, "secretValue", None)
|
||||
if not value:
|
||||
print("ERROR: Secret read returned empty value", file=sys.stderr)
|
||||
return 3
|
||||
|
||||
print("OK: get_secret_by_name")
|
||||
print(f"OK: secret_value_length={len(value)}")
|
||||
return 0
|
||||
|
||||
except Exception as e:
|
||||
# Print the most useful info without dumping secrets
|
||||
print("ERROR: Infisical smoke test failed", file=sys.stderr)
|
||||
print(f"{type(e).__name__}: {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@ -30,6 +30,7 @@ def test_sign_document_success():
|
||||
command = SignDocumentCommand(
|
||||
pdf_bytes=original_pdf,
|
||||
certificate_secret_name="pdf-signing-cert",
|
||||
certificate_password_secret_name="pdf-signing-cert-password",
|
||||
)
|
||||
|
||||
# WHEN: ejecutamos el caso de uso
|
||||
|
||||
Loading…
Reference in New Issue
Block a user