First commit

This commit is contained in:
David Arranz 2026-01-22 11:37:35 +01:00
commit a4f5a679a7
25 changed files with 570 additions and 0 deletions

12
.dockerignore Normal file
View File

@ -0,0 +1,12 @@
.venv
__pycache__
*.pyc
*.pyo
*.pyd
.pytest_cache
.mypy_cache
.ruff_cache
.git
.gitignore
tests
*.log

22
.env.example Normal file
View File

@ -0,0 +1,22 @@
# ============================================================
# Signing Service - Environment Variables
# Copy this file to `.env` and fill the real values
# ============================================================
# --------------------
# Application
# --------------------
APP_ENV=local # local | prod
LOG_LEVEL=INFO
# --------------------
# Secret manager
# --------------------
SECRET_PROVIDER=fake # fake | google | aws | azure | hcp
GCP_PROJECT_ID= # required if SECRET_PROVIDER=google
# --------------------
# PDF signing
# --------------------
PDF_CERT_SECRET_NAME=pdf-signing-cert
PDF_CERT_PASSWORD_SECRET_NAME=pdf-signing-cert-password

66
.gitignore vendored Normal file
View File

@ -0,0 +1,66 @@
# ===========================
# Python básico
# ===========================
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.so
*.egg
*.egg-info/
dist/
build/
# ===========================
# Entornos virtuales
# ===========================
venv/
.env/
.venv/
# ===========================
# Configuración de IDEs
# ===========================
#.vscode/
.idea/
*.code-workspace
# ===========================
# Herramientas / tests
# ===========================
.mypy_cache/
.pytest_cache/
.coverage
htmlcov/
.cache/
# ===========================
# Logs del proyecto
# ===========================
logs/
*.log
# ===========================
# Ficheros de entorno
# ===========================
#environment/*.env
#environment/*.env.*
# Si quieres tener uno de ejemplo en git:
# !environment/dev.example.env
# ===========================
# Docker
# ===========================
*.pid
docker-compose.override.yml
# ===========================
# Sistemas operativos
# ===========================
.DS_Store
Thumbs.db
# ===========================
# Otros
# ===========================
.env

9
.prettierrc Normal file
View File

@ -0,0 +1,9 @@
{
"bracketSpacing": true,
"useTabs": false,
"printWidth": 100,
"tabWidth": 4,
"semi": true,
"singleQuote": false,
"rcVerbose": true
}

0
Dockerfile Normal file
View File

0
LEEME.md Normal file
View File

22
pyproject.toml Normal file
View File

@ -0,0 +1,22 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "signing-service"
version = "0.1.0"
description = "FastAPI service for signing PDF documents using external secret managers"
requires-python = ">=3.11"
dependencies = [
"python-dotenv>=1.0",
"fastapi>=0.110",
"uvicorn[standard]>=0.27",
"pydantic-settings>=2.2",
"google-cloud-secret-manager>=2.18",
"pyhanko>=0.25",
"cryptography>=42",
]
[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-cov", "ruff", "mypy"]

View File

View File

@ -0,0 +1,55 @@
from fastapi import APIRouter, UploadFile, File, Form, HTTPException
from fastapi.responses import Response
from signing_service.application.settings.container import get_settings
from signing_service.application.use_cases.sign_document import (
SignDocumentUseCase,
)
from signing_service.application.use_cases.sign_document_dto import (
SignDocumentCommand,
)
from signing_service.infrastructure.factories.secret_manager_factory import (
create_secret_manager,
)
from signing_service.infrastructure.factories.pdf_signer_factory import (
create_pdf_signer,
)
router = APIRouter(prefix="/documents", tags=["documents"])
@router.post("/sign")
async def sign_document(
file: UploadFile = File(...),
certificate_secret_name: str = Form(...),
):
try:
settings = get_settings()
pdf_bytes = await file.read()
use_case = SignDocumentUseCase(
secret_manager=create_secret_manager(),
pdf_signer=create_pdf_signer(),
cert_secret_name=settings.pdf_cert_secret_name,
cert_password_secret_name=settings.pdf_cert_password_secret_name,
)
command = SignDocumentCommand(
pdf_bytes=pdf_bytes,
certificate_secret_name=certificate_secret_name,
)
result = use_case.execute(command)
return Response(
content=result.signed_pdf_bytes,
media_type="application/pdf",
headers={
"Content-Disposition": f"attachment; filename=signed_{file.filename}"
},
)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))

View File

@ -0,0 +1,54 @@
from pydantic_settings import BaseSettings
from pydantic import Field, model_validator
class AppSettings(BaseSettings):
# --------------------
# Application
# --------------------
app_env: str = Field(default="local")
log_level: str = Field(default="INFO")
# --------------------
# Secret manager
# --------------------
secret_provider: str = Field(default="fake")
gcp_project_id: str | None = None
# --------------------
# PDF signing
# --------------------
pdf_cert_secret_name: str
pdf_cert_password_secret_name: str
@model_validator(mode="after")
def validate_required_settings(self) -> "AppSettings":
"""
Fail fast if required configuration is missing.
"""
if not self.pdf_cert_secret_name:
raise ValueError("PDF_CERT_SECRET_NAME is required")
if not self.pdf_cert_password_secret_name:
raise ValueError("PDF_CERT_PASSWORD_SECRET_NAME is required")
# ---- Secret provider validation ----
if self.secret_provider == "google":
if not self.gcp_project_id:
raise ValueError(
"GCP_PROJECT_ID is required when SECRET_PROVIDER=google"
)
# ---- PDF signing validation ----
if self.app_env == "prod":
if not self.pdf_pfx_password:
raise ValueError(
"PDF_PFX_PASSWORD is required in production"
)
return self
class Config:
env_file = ".env"
case_sensitive = False

View File

@ -0,0 +1,7 @@
from functools import lru_cache
from signing_service.application.settings.app_settings import AppSettings
@lru_cache
def get_settings() -> AppSettings:
return AppSettings()

View File

@ -0,0 +1,37 @@
from signing_service.application.use_cases.sign_document_dto import (
SignDocumentCommand,
SignDocumentResult,
)
from signing_service.domain.ports.secret_manager import SecretManagerPort
from signing_service.domain.ports.pdf_signer import PDFSignerPort
class SignDocumentUseCase:
def __init__(
self,
secret_manager: SecretManagerPort,
pdf_signer: PDFSignerPort,
cert_secret_name: str,
cert_password_secret_name: str,
) -> None:
self._secret_manager = secret_manager
self._pdf_signer = pdf_signer
self._cert_secret_name = cert_secret_name
self._cert_password_secret_name = cert_password_secret_name
def execute(self, command: SignDocumentCommand) -> SignDocumentResult:
certificate = self._secret_manager.get_secret(
self._cert_secret_name
)
password = self._secret_manager.get_secret(
self._cert_password_secret_name
)
signed_pdf = self._pdf_signer.sign(
pdf_bytes=command.pdf_bytes,
certificate=certificate,
password=password,
)
return SignDocumentResult(signed_pdf_bytes=signed_pdf)

View File

@ -0,0 +1,12 @@
from dataclasses import dataclass
@dataclass(frozen=True)
class SignDocumentCommand:
pdf_bytes: bytes
certificate_secret_name: str
@dataclass(frozen=True)
class SignDocumentResult:
signed_pdf_bytes: bytes

View File

@ -0,0 +1,13 @@
from abc import ABC, abstractmethod
class PDFSignerPort(ABC):
@abstractmethod
def sign(
self,
pdf_bytes: bytes,
certificate: str,
password: str,
) -> bytes:
"""Sign a PDF and return signed PDF bytes."""
raise NotImplementedError

View File

@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
class SecretManagerPort(ABC):
@abstractmethod
def get_secret(self, name: str) -> str:
"""Retrieve a secret by name."""
raise NotImplementedError

View File

@ -0,0 +1,21 @@
from signing_service.application.settings.container import get_settings
from signing_service.domain.ports.pdf_signer import PDFSignerPort
from signing_service.infrastructure.pdf.fake_pdf_signer import FakePDFSigner
def create_pdf_signer() -> PDFSignerPort:
settings = get_settings()
if settings.app_env == "local":
return FakePDFSigner()
from signing_service.infrastructure.pdf.pyhanko_pdf_signer import (
PyHankoPDFSigner,
)
if not settings.pdf_pfx_password:
raise ValueError("PDF_PFX_PASSWORD is required")
return PyHankoPDFSigner(
pfx_password=settings.pdf_pfx_password
)

View File

@ -0,0 +1,39 @@
from signing_service.application.settings.container import get_settings
from signing_service.domain.ports.secret_manager import SecretManagerPort
from signing_service.infrastructure.secrets.fake_secret_manager import (
FakeSecretManager,
)
from signing_service.infrastructure.secrets.infisical_secret_manager import InfisicalSecretManager
def create_secret_manager() -> SecretManagerPort:
settings = get_settings()
provider = settings.secret_provider.lower()
if provider == "fake":
return FakeSecretManager(
secrets={
"pdf-signing-cert": "FAKE_CERTIFICATE_DATA"
}
)
if provider == "infisical":
return InfisicalSecretManager(
token=settings.infisical_token,
project_id=settings.infisical_project_id,
environment=settings.infisical_environment,
)
if provider == "google":
from signing_service.infrastructure.secrets.google_secret_manager import (
GoogleSecretManager,
)
if not settings.gcp_project_id:
raise ValueError("GCP_PROJECT_ID is required for Google Secret Manager")
return GoogleSecretManager(
project_id=settings.gcp_project_id
)
raise ValueError(f"Unsupported secret provider: {provider}")

View File

@ -0,0 +1,16 @@
from signing_service.domain.ports.pdf_signer import PDFSignerPort
class FakePDFSigner(PDFSignerPort):
"""
Fake implementation of PDFSignerPort.
It does NOT sign a real PDF.
It only simulates the behavior for testing and development.
"""
def sign(self, pdf_bytes: bytes, certificate: str, password: str) -> bytes:
# Simulación simple:
# añadimos un marcador para indicar "firmado"
signature_marker = f"\n\n-- Signed with certificate: {certificate} and password: {password}".encode()
return pdf_bytes + signature_marker

View File

@ -0,0 +1,49 @@
import base64
import io
from pyhanko.sign import signers
from pyhanko.sign.general import load_cert_from_pfx
from signing_service.domain.ports.pdf_signer import PDFSignerPort
class PyHankoPDFSigner(PDFSignerPort):
def __init__(self, pfx_password: str) -> None:
self._pfx_password = pfx_password.encode()
def sign(self, pdf_bytes: bytes, certificate: str, password: str,) -> bytes:
"""
certificate: base64-encoded PKCS#12 (PFX)
"""
# 1⃣ Decodificar certificado
pfx_bytes = base64.b64decode(certificate)
# 2⃣ Cargar clave y certificado
private_key, cert, other_certs = load_cert_from_pfx(
pfx_bytes, password.encode()
)
signer = signers.SimpleSigner(
signing_cert=cert,
signing_key=private_key,
cert_registry=signers.SimpleCertificateStore.from_certs(
other_certs
),
)
# 3⃣ Preparar PDF
input_pdf = io.BytesIO(pdf_bytes)
output_pdf = io.BytesIO()
# 4⃣ Firmar
signers.sign_pdf(
input_pdf,
signers.PdfSignatureMetadata(
field_name="Signature1"
),
signer=signer,
output=output_pdf,
)
return output_pdf.getvalue()

View File

@ -0,0 +1,12 @@
from signing_service.domain.ports.secret_manager import SecretManagerPort
class FakeSecretManager(SecretManagerPort):
def __init__(self, secrets: dict[str, str]) -> None:
self._secrets = secrets
def get_secret(self, name: str) -> str:
try:
return self._secrets[name]
except KeyError:
raise ValueError(f"Secret '{name}' not found")

View File

@ -0,0 +1,23 @@
from google.cloud import secretmanager
from signing_service.domain.ports.secret_manager import SecretManagerPort
class GoogleSecretManager(SecretManagerPort):
def __init__(self, project_id: str) -> None:
self._project_id = project_id
self._client = secretmanager.SecretManagerServiceClient()
def get_secret(self, name: str) -> str:
"""
Retrieve the latest version of a secret from Google Cloud Secret Manager.
"""
secret_path = (
f"projects/{self._project_id}/secrets/{name}/versions/latest"
)
response = self._client.access_secret_version(
request={"name": secret_path}
)
return response.payload.data.decode("utf-8")

View File

@ -0,0 +1,17 @@
from infisical_sdk import InfisicalSDKClient
from signing_service.domain.ports.secret_manager import SecretManagerPort
class InfisicalSecretManager(SecretManagerPort):
def __init__(self, token: str, project_id: str, environment: str):
self._client = InfisicalSDKClient(token=token)
self._project_id = project_id
self._environment = environment
def get_secret(self, name: str) -> str:
secret = self._client.secrets.get_secret_by_name(
secret_name=name,
project_id=self._project_id,
environment_slug=self._environment,
secret_path="/"
)
return secret.secret_value

View File

@ -0,0 +1,18 @@
from fastapi import FastAPI
from dotenv import load_dotenv
from signing_service.application.settings.container import get_settings
from signing_service.api.routes.sign_document import router as sign_router
load_dotenv()
# 👇 FAIL FAST: load settings at startup
get_settings()
app = FastAPI(title="Signing Service")
app.include_router(sign_router)
@app.get("/health")
def health_check() -> dict[str, str]:
return {"status": "ok"}

View File

@ -0,0 +1,58 @@
import pytest
from signing_service.application.use_cases.sign_document import (
SignDocumentUseCase,
)
from signing_service.application.use_cases.sign_document_dto import (
SignDocumentCommand,
)
from signing_service.infrastructure.pdf.fake_pdf_signer import FakePDFSigner
from signing_service.infrastructure.secrets.fake_secret_manager import (
FakeSecretManager,
)
def test_sign_document_success():
# GIVEN: dependencias fake
fake_secrets = {
"pdf-signing-cert": "FAKE_CERTIFICATE_DATA"
}
secret_manager = FakeSecretManager(fake_secrets)
pdf_signer = FakePDFSigner()
use_case = SignDocumentUseCase(
secret_manager=secret_manager,
pdf_signer=pdf_signer,
)
original_pdf = b"%PDF-1.4 fake pdf content"
command = SignDocumentCommand(
pdf_bytes=original_pdf,
certificate_secret_name="pdf-signing-cert",
)
# WHEN: ejecutamos el caso de uso
result = use_case.execute(command)
# THEN: el PDF está "firmado"
assert result.signed_pdf_bytes.startswith(original_pdf)
assert b"FAKE_CERTIFICATE_DATA" in result.signed_pdf_bytes
def test_sign_document_secret_not_found():
secret_manager = FakeSecretManager(secrets={})
pdf_signer = FakePDFSigner()
use_case = SignDocumentUseCase(
secret_manager=secret_manager,
pdf_signer=pdf_signer,
)
command = SignDocumentCommand(
pdf_bytes=b"fake pdf",
certificate_secret_name="missing-secret",
)
with pytest.raises(ValueError):
use_case.execute(command)