misp-topcontrib/main.py

239 lines
9.2 KiB
Python
Raw Normal View History

2024-11-06 14:53:19 -03:00
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import create_engine, func, desc
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import label
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.future import select
from pydantic import BaseModel
from typing import Optional
import os
import logging
from logging.handlers import RotatingFileHandler
from models import Base, Registro
from datetime import datetime, date
# Configurar el nivel de logging para SQLAlchemy
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
# Directorio actual
directorio_actual = os.getcwd()
# Crear directorio de logs si no existe
dir_logs = os.path.join(directorio_actual, 'logs')
os.makedirs(dir_logs, exist_ok=True)
rotating_handler = RotatingFileHandler(os.path.join(dir_logs, "server_errors.log"), maxBytes=262144000, backupCount=10)
logging.basicConfig(level=logging.INFO, handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s')
# Configurar la ruta de la base de datos
ruta_base_datos = os.path.join(directorio_actual, "data", "registros.db")
os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True)
# Crear un motor síncrono para la creación de tablas
sync_engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(bind=sync_engine)
# Crear un motor asíncrono para las operaciones de la base de datos
async_engine = create_async_engine(f'sqlite+aiosqlite:///{ruta_base_datos}', echo=False, future=True)
# Función síncrona para ejecutar PRAGMAs al establecer una conexión
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute('PRAGMA journal_mode=WAL;')
cursor.close()
# Escucha el evento 'connect' para aplicar el PRAGMA cada vez que se conecta a la base de datos
from sqlalchemy import event
event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma)
# Crear una sesión asíncrona
AsyncSessionLocal = sessionmaker(
bind=async_engine,
expire_on_commit=False,
class_=AsyncSession
)
# FastAPI
app = FastAPI(version="1.0.0", title="MISP Top Contrib",
description="<p>Esta API fue desarrollada para entregar información de la cantidad de contribuciones (IoC) de calidad entregadas por cada comunidad conectada a la plataforma MISP de CSIRT de Gobierno.</p>")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Modelo Pydantic para la salida de datos (sin el campo id)
class RegistroOutput(BaseModel):
organizacion: str
dia: Optional[int] = None
mes: Optional[int] = None
ano: Optional[int] = None
fecha_creado: Optional[date] = None
cantidad_ioc: int
# Modelo Pydantic para la cantidad de IoC por organización
class IoCCountOutput(BaseModel):
organizacion: str
mes: Optional[int] = None # Hacemos opcional el campo 'mes'
ano: Optional[int] = None # Hacemos opcional el campo 'ano'
cantidad_total_ioc: int
# Dependencia para obtener la sesión de base de datos
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# Método GET para obtener todos los registros
@app.get(
"/api/stats/", response_model=list[RegistroOutput], response_model_exclude_none=True,
description="Obtiene el listado completo de organizaciones junto a la cantidad de IoC recopilados por período.",
summary="Cantidad de contribuciones (IoC) de cada organización.",
responses={
404: {
"description": "No se encontraron registros.",
"content": {
"application/json": {
"example": {"detail": "No se encontraron registros"}
}
}
}
}
)
async def leer_registros(
start_date: str = Query(None, description="Fecha de inicio en formato YYYY-MM-DD"),
end_date: str = Query(None, description="Fecha de fin en formato YYYY-MM-DD"),
organizacion: str = Query(None, description="Organizacion a filtrar"),
db: AsyncSession = Depends(get_db)
):
try:
# Construir la consulta base
query = select(Registro.organizacion, Registro.fecha_creado, Registro.cantidad_ioc).filter(Registro.cantidad_ioc > 0)
# Si se proporcionan fechas de inicio y fin, aplicar el filtro de rango de fechas
if start_date:
start_datetime = datetime.strptime(start_date, "%Y-%m-%d").date()
query = query.filter(Registro.fecha_creado >= start_datetime)
if end_date:
end_datetime = datetime.strptime(end_date, "%Y-%m-%d").date()
query = query.filter(Registro.fecha_creado <= end_datetime)
if organizacion:
query = query.filter(Registro.organizacion == organizacion.upper())
# Ordenar los resultados
query = query.order_by(desc(Registro.fecha_creado))
# Ejecutar la consulta
result = await db.execute(query)
registros = result.fetchall()
registros = [{"organizacion": r[0], "fecha_creado": r[1], "cantidad_ioc": r[2]} for r in registros]
if not registros:
raise HTTPException(status_code=404, detail="No se encontraron registros")
return registros
except HTTPException as http_err:
raise http_err
except Exception as e:
logging.error(f"Error al obtener registros: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
# Método GET para obtener la cantidad de IoC por organización, filtrado opcionalmente por año
@app.get(
"/api/stats/cantidad_por_organizacion/", response_model=list[IoCCountOutput], response_model_exclude_none=True,
description="Obtiene el listado completo de organizaciones junto a la cantidad de IoC recopilados por año específico.",
summary="Cantidad total de contribuciones (IoC) de cada organización (Año).",
responses={
404: {
"description": "No se encontraron registros.",
"content": {
"application/json": {
"example": {"detail": "No se encontraron registros."}
}
}
}
}
)
async def cantidad_ioc_por_organizacion(ano: int = Query(None, description="Año para filtrar los resultados"),
db: AsyncSession = Depends(get_db)):
try:
query = select(
Registro.organizacion,
label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))
).filter(Registro.cantidad_ioc > 0)
if ano is not None:
query = query.filter(Registro.ano == ano)
query = query.group_by(Registro.organizacion).order_by(
desc(label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))))
result = await db.execute(query)
resultados = result.fetchall()
if not resultados:
raise HTTPException(status_code=404, detail="No se encontraron registros.")
return [IoCCountOutput(organizacion=r.organizacion, cantidad_total_ioc=r.cantidad_total_ioc) for r in
resultados]
except HTTPException as http_err:
raise http_err
except Exception as e:
logging.error(f"Error al obtener cantidad de IoC por organización: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")
# Método GET para obtener registros filtrados por año y mes
@app.get(
"/api/stats/por_periodo/",
response_model=list[IoCCountOutput], response_model_exclude_none=True,
description="Obtiene cantidad de IoC recopilados de cada organización por un período específico.",
summary="Cantidad de contribuciones (IoC) de cada organización de un período específico.",
responses={
404: {
"description": "No se encontraron registros para este período.",
"content": {
"application/json": {
"example": {"detail": "No se encontraron registros para este período"}
}
}
}
}
)
async def registros_por_fecha(ano: int, mes: int, db: AsyncSession = Depends(get_db)):
try:
query = select(
Registro.organizacion,
Registro.mes,
Registro.ano,
label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))
).filter(Registro.cantidad_ioc > 0)
if ano is not None:
query = query.filter(Registro.ano == ano)
if mes is not None:
query = query.filter(Registro.mes == mes)
query = query.group_by(Registro.organizacion, Registro.ano, Registro.mes).order_by(
desc(label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))))
result = await db.execute(query)
registros = result.fetchall()
if not registros:
raise HTTPException(status_code=404, detail="No se encontraron registros para este periodo")
return registros
except HTTPException as http_err:
raise http_err
except Exception as e:
logging.error(f"Error al obtener registros por fecha: {e}")
raise HTTPException(status_code=500, detail="Error interno del servidor")