238 lines
9.2 KiB
Python
238 lines
9.2 KiB
Python
from fastapi import FastAPI, HTTPException, Depends, Query
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from sqlalchemy import create_engine, func, desc
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.sql import label
|
|
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
|
from sqlalchemy.future import select
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
import os
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from models import Base, Registro
|
|
from datetime import datetime, date
|
|
|
|
# Configurar el nivel de logging para SQLAlchemy
|
|
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
|
|
|
|
# Directorio actual
|
|
directorio_actual = os.getcwd()
|
|
|
|
# Crear directorio de logs si no existe
|
|
dir_logs = os.path.join(directorio_actual, 'logs')
|
|
os.makedirs(dir_logs, exist_ok=True)
|
|
|
|
rotating_handler = RotatingFileHandler(os.path.join(dir_logs, "server_errors.log"), maxBytes=262144000, backupCount=10)
|
|
logging.basicConfig(level=logging.INFO, handlers=[rotating_handler],
|
|
format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
# Configurar la ruta de la base de datos
|
|
ruta_base_datos = os.path.join(directorio_actual, "data", "registros.db")
|
|
os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True)
|
|
|
|
# Crear un motor síncrono para la creación de tablas
|
|
sync_engine = create_engine(f'sqlite:///{ruta_base_datos}')
|
|
Base.metadata.create_all(bind=sync_engine)
|
|
|
|
# Crear un motor asíncrono para las operaciones de la base de datos
|
|
async_engine = create_async_engine(f'sqlite+aiosqlite:///{ruta_base_datos}', echo=False, future=True)
|
|
|
|
# Función síncrona para ejecutar PRAGMAs al establecer una conexión
|
|
def set_sqlite_pragma(dbapi_connection, connection_record):
|
|
cursor = dbapi_connection.cursor()
|
|
cursor.execute('PRAGMA journal_mode=WAL;')
|
|
cursor.close()
|
|
|
|
# Escucha el evento 'connect' para aplicar el PRAGMA cada vez que se conecta a la base de datos
|
|
from sqlalchemy import event
|
|
event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma)
|
|
|
|
# Crear una sesión asíncrona
|
|
AsyncSessionLocal = sessionmaker(
|
|
bind=async_engine,
|
|
expire_on_commit=False,
|
|
class_=AsyncSession
|
|
)
|
|
|
|
# FastAPI
|
|
app = FastAPI(version="1.0.0", title="MISP Top Contrib",
|
|
description="<p>Esta API fue desarrollada para entregar información de la cantidad de contribuciones (IoC) de calidad entregadas por cada comunidad conectada a la plataforma MISP de CSIRT de Gobierno.</p>")
|
|
|
|
# CORS
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Modelo Pydantic para la salida de datos (sin el campo id)
|
|
class RegistroOutput(BaseModel):
|
|
organizacion: str
|
|
dia: Optional[int] = None
|
|
mes: Optional[int] = None
|
|
ano: Optional[int] = None
|
|
fecha_creado: Optional[date] = None
|
|
cantidad_ioc: int
|
|
|
|
# Modelo Pydantic para la cantidad de IoC por organización
|
|
class IoCCountOutput(BaseModel):
|
|
organizacion: str
|
|
mes: Optional[int] = None # Hacemos opcional el campo 'mes'
|
|
ano: Optional[int] = None # Hacemos opcional el campo 'ano'
|
|
cantidad_total_ioc: int
|
|
|
|
# Dependencia para obtener la sesión de base de datos
|
|
async def get_db():
|
|
async with AsyncSessionLocal() as session:
|
|
yield session
|
|
|
|
# Método GET para obtener todos los registros
|
|
@app.get(
|
|
"/api/stats/", response_model=list[RegistroOutput], response_model_exclude_none=True,
|
|
description="Obtiene el listado completo de organizaciones junto a la cantidad de IoC recopilados por período.",
|
|
summary="Cantidad de contribuciones (IoC) de cada organización.",
|
|
responses={
|
|
404: {
|
|
"description": "No se encontraron registros.",
|
|
"content": {
|
|
"application/json": {
|
|
"example": {"detail": "No se encontraron registros"}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
)
|
|
async def leer_registros(
|
|
start_date: str = Query(None, description="Fecha de inicio en formato YYYY-MM-DD"),
|
|
end_date: str = Query(None, description="Fecha de fin en formato YYYY-MM-DD"),
|
|
organizacion: str = Query(None, description="Organizacion a filtrar"),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
try:
|
|
# Construir la consulta base
|
|
query = select(Registro.organizacion, Registro.fecha_creado, Registro.cantidad_ioc).filter(Registro.cantidad_ioc > 0)
|
|
|
|
# Si se proporcionan fechas de inicio y fin, aplicar el filtro de rango de fechas
|
|
if start_date:
|
|
start_datetime = datetime.strptime(start_date, "%Y-%m-%d").date()
|
|
query = query.filter(Registro.fecha_creado >= start_datetime)
|
|
|
|
if end_date:
|
|
end_datetime = datetime.strptime(end_date, "%Y-%m-%d").date()
|
|
query = query.filter(Registro.fecha_creado <= end_datetime)
|
|
|
|
if organizacion:
|
|
query = query.filter(Registro.organizacion == organizacion.upper())
|
|
|
|
|
|
# Ordenar los resultados
|
|
query = query.order_by(desc(Registro.fecha_creado))
|
|
|
|
# Ejecutar la consulta
|
|
result = await db.execute(query)
|
|
registros = result.fetchall()
|
|
|
|
registros = [{"organizacion": r[0], "fecha_creado": r[1], "cantidad_ioc": r[2]} for r in registros]
|
|
|
|
if not registros:
|
|
raise HTTPException(status_code=404, detail="No se encontraron registros")
|
|
return registros
|
|
|
|
except HTTPException as http_err:
|
|
raise http_err
|
|
except Exception as e:
|
|
logging.error(f"Error al obtener registros: {e}")
|
|
raise HTTPException(status_code=500, detail="Error interno del servidor")
|
|
|
|
# Método GET para obtener la cantidad de IoC por organización, filtrado opcionalmente por año
|
|
@app.get(
|
|
"/api/stats/cantidad_por_organizacion/", response_model=list[IoCCountOutput], response_model_exclude_none=True,
|
|
description="Obtiene el listado completo de organizaciones junto a la cantidad de IoC recopilados por año específico.",
|
|
summary="Cantidad total de contribuciones (IoC) de cada organización (Año).",
|
|
responses={
|
|
404: {
|
|
"description": "No se encontraron registros.",
|
|
"content": {
|
|
"application/json": {
|
|
"example": {"detail": "No se encontraron registros."}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
)
|
|
async def cantidad_ioc_por_organizacion(ano: int = Query(None, description="Año para filtrar los resultados"),
|
|
db: AsyncSession = Depends(get_db)):
|
|
try:
|
|
query = select(
|
|
Registro.organizacion,
|
|
label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))
|
|
).filter(Registro.cantidad_ioc > 0)
|
|
|
|
if ano is not None:
|
|
query = query.filter(Registro.ano == ano)
|
|
|
|
query = query.group_by(Registro.organizacion).order_by(
|
|
desc(label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))))
|
|
result = await db.execute(query)
|
|
resultados = result.fetchall()
|
|
|
|
if not resultados:
|
|
raise HTTPException(status_code=404, detail="No se encontraron registros.")
|
|
|
|
return [IoCCountOutput(organizacion=r.organizacion, cantidad_total_ioc=r.cantidad_total_ioc) for r in
|
|
resultados]
|
|
except HTTPException as http_err:
|
|
raise http_err
|
|
except Exception as e:
|
|
logging.error(f"Error al obtener cantidad de IoC por organización: {e}")
|
|
raise HTTPException(status_code=500, detail="Error interno del servidor")
|
|
|
|
# Método GET para obtener registros filtrados por año y mes
|
|
@app.get(
|
|
"/api/stats/por_periodo/",
|
|
response_model=list[IoCCountOutput], response_model_exclude_none=True,
|
|
description="Obtiene cantidad de IoC recopilados de cada organización por un período específico.",
|
|
summary="Cantidad de contribuciones (IoC) de cada organización de un período específico.",
|
|
responses={
|
|
404: {
|
|
"description": "No se encontraron registros para este período.",
|
|
"content": {
|
|
"application/json": {
|
|
"example": {"detail": "No se encontraron registros para este período"}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
)
|
|
async def registros_por_fecha(ano: int, mes: int, db: AsyncSession = Depends(get_db)):
|
|
try:
|
|
query = select(
|
|
Registro.organizacion,
|
|
Registro.mes,
|
|
Registro.ano,
|
|
label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))
|
|
).filter(Registro.cantidad_ioc > 0)
|
|
|
|
if ano is not None:
|
|
query = query.filter(Registro.ano == ano)
|
|
|
|
if mes is not None:
|
|
query = query.filter(Registro.mes == mes)
|
|
|
|
query = query.group_by(Registro.organizacion, Registro.ano, Registro.mes).order_by(
|
|
desc(label("cantidad_total_ioc", func.sum(Registro.cantidad_ioc))))
|
|
|
|
result = await db.execute(query)
|
|
registros = result.fetchall()
|
|
if not registros:
|
|
raise HTTPException(status_code=404, detail="No se encontraron registros para este periodo")
|
|
return registros
|
|
except HTTPException as http_err:
|
|
raise http_err
|
|
except Exception as e:
|
|
logging.error(f"Error al obtener registros por fecha: {e}")
|
|
raise HTTPException(status_code=500, detail="Error interno del servidor")
|
|
|