from fastapi import FastAPI, HTTPException, Depends, Query from fastapi.middleware.cors import CORSMiddleware from sqlalchemy import create_engine, func, desc from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import label from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.future import select from pydantic import BaseModel from typing import Optional import os import logging from logging.handlers import RotatingFileHandler from models import Base, Registro from datetime import datetime, date # Configurar el nivel de logging para SQLAlchemy logging.getLogger('sqlalchemy').setLevel(logging.WARNING) # Directorio actual directorio_actual = os.getcwd() # Crear directorio de logs si no existe dir_logs = os.path.join(directorio_actual, 'logs') os.makedirs(dir_logs, exist_ok=True) rotating_handler = RotatingFileHandler(os.path.join(dir_logs, "server_errors.log"), maxBytes=262144000, backupCount=10) logging.basicConfig(level=logging.INFO, handlers=[rotating_handler], format='%(asctime)s - %(levelname)s - %(message)s') # Configurar la ruta de la base de datos ruta_base_datos = os.path.join(directorio_actual, "data", "registros.db") os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True) # Crear un motor síncrono para la creación de tablas sync_engine = create_engine(f'sqlite:///{ruta_base_datos}') Base.metadata.create_all(bind=sync_engine) # Crear un motor asíncrono para las operaciones de la base de datos async_engine = create_async_engine(f'sqlite+aiosqlite:///{ruta_base_datos}', echo=False, future=True) # Función síncrona para ejecutar PRAGMAs al establecer una conexión def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() cursor.execute('PRAGMA journal_mode=WAL;') cursor.close() # Escucha el evento 'connect' para aplicar el PRAGMA cada vez que se conecta a la base de datos from sqlalchemy import event event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma) # Crear una sesión asíncrona AsyncSessionLocal = sessionmaker( bind=async_engine, expire_on_commit=False, class_=AsyncSession ) # FastAPI app = FastAPI(version="1.0.0", title="MISP Top Contrib", description="

Esta API fue desarrollada para entregar información de la cantidad de contribuciones (IoC) de calidad entregadas por cada comunidad conectada a la plataforma MISP de CSIRT de Gobierno.

") # CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Modelo Pydantic para la salida de datos (sin el campo id) class RegistroOutput(BaseModel): organizacion: str dia: Optional[int] = None mes: Optional[int] = None ano: Optional[int] = None fecha_creado: Optional[date] = None cantidad_ioc: int # Modelo Pydantic para la cantidad de IoC por organización class IoCCountOutput(BaseModel): organizacion: str mes: Optional[int] = None # Hacemos opcional el campo 'mes' ano: Optional[int] = None # Hacemos opcional el campo 'ano' cantidad_total_ioc: int # Dependencia para obtener la sesión de base de datos async def get_db(): async with AsyncSessionLocal() as session: yield session # Método GET para obtener todos los registros @app.get( "/api/stats/", response_model=list[RegistroOutput], response_model_exclude_none=True, description="Obtiene el listado completo de organizaciones junto a la cantidad de IoC recopilados por período.", summary="Cantidad de contribuciones (IoC) de cada organización.", responses={ 404: { "description": "No se encontraron registros.", "content": { "application/json": { "example": {"detail": "No se encontraron registros"} } } } } ) async def leer_registros( start_date: str = Query(None, description="Fecha de inicio en formato YYYY-MM-DD"), end_date: str = Query(None, description="Fecha de fin en formato YYYY-MM-DD"), organizacion: str = Query(None, description="Organizacion a filtrar"), db: AsyncSession = Depends(get_db) ): try: # Construir la consulta base query = select(Registro.organizacion, Registro.fecha_creado, Registro.cantidad_ioc).filter(Registro.cantidad_ioc > 0) # Si se proporcionan fechas de inicio y fin, aplicar el filtro de rango de fechas if start_date: start_datetime = datetime.strptime(start_date, "%Y-%m-%d").date() query = query.filter(Registro.fecha_creado >= start_datetime) if end_date: end_datetime = datetime.strptime(end_date, "%Y-%m-%d").date() query = query.filter(Registro.fecha_creado <= end_datetime) if organizacion: query = query.filter(Registro.organizacion == organizacion.upper()) # Ordenar los resultados query = query.order_by(desc(Registro.fecha_creado)) # Ejecutar la consulta result = await db.execute(query) registros = result.fetchall() registros = [{"organizacion": r[0], "fecha_creado": r[1], "cantidad_ioc": r[2]} for r in registros] if not registros: raise HTTPException(status_code=404, detail="No se encontraron registros") return registros except HTTPException as http_err: raise http_err except Exception as e: logging.error(f"Error al obtener registros: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor") # Método GET para obtener la cantidad de IoC por organización, filtrado opcionalmente por año @app.get( "/api/stats/cantidad_por_organizacion/", response_model=list[IoCCountOutput], response_model_exclude_none=True, description="Obtiene el listado completo de organizaciones junto a la cantidad de IoC recopilados por año específico.", summary="Cantidad total de contribuciones (IoC) de cada organización (Año).", responses={ 404: { "description": "No se encontraron registros.", "content": { "application/json": { "example": {"detail": "No se encontraron registros."} } } } } ) async def cantidad_ioc_por_organizacion(ano: int = Query(None, description="Año para filtrar los resultados"), db: AsyncSession = Depends(get_db)): try: query = select( Registro.organizacion, label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc)) ).filter(Registro.cantidad_ioc > 0) if ano is not None: query = query.filter(Registro.ano == ano) query = query.group_by(Registro.organizacion).order_by( desc(label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc)))) result = await db.execute(query) resultados = result.fetchall() if not resultados: raise HTTPException(status_code=404, detail="No se encontraron registros.") return [IoCCountOutput(organizacion=r.organizacion, cantidad_total_ioc=r.cantidad_total_ioc) for r in resultados] except HTTPException as http_err: raise http_err except Exception as e: logging.error(f"Error al obtener cantidad de IoC por organización: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor") # Método GET para obtener registros filtrados por año y mes @app.get( "/api/stats/por_periodo/", response_model=list[IoCCountOutput], response_model_exclude_none=True, description="Obtiene cantidad de IoC recopilados de cada organización por un período específico.", summary="Cantidad de contribuciones (IoC) de cada organización de un período específico.", responses={ 404: { "description": "No se encontraron registros para este período.", "content": { "application/json": { "example": {"detail": "No se encontraron registros para este período"} } } } } ) async def registros_por_fecha(ano: int, mes: int, db: AsyncSession = Depends(get_db)): try: query = select( Registro.organizacion, Registro.mes, Registro.ano, label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc)) ).filter(Registro.cantidad_ioc > 0) if ano is not None: query = query.filter(Registro.ano == ano) if mes is not None: query = query.filter(Registro.mes == mes) query = query.group_by(Registro.organizacion, Registro.ano, Registro.mes).order_by( desc(label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc)))) result = await db.execute(query) registros = result.fetchall() if not registros: raise HTTPException(status_code=404, detail="No se encontraron registros para este periodo") return registros except HTTPException as http_err: raise http_err except Exception as e: logging.error(f"Error al obtener registros por fecha: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor")