# main.py import os import re import logging import traceback import urllib3 import asyncio import time from datetime import datetime, timedelta from typing import Optional, List, Annotated import requests from fastapi import FastAPI, Request, Body, Header, Depends from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, ConfigDict from logging.handlers import RotatingFileHandler from sqlalchemy import select, event from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError, OperationalError from contextlib import asynccontextmanager from pymisp import PyMISP, PyMISPError import config from models import Base, ModificadosEv, VerificacionFalsoPositivo urllib3.disable_warnings() directorio_actual = os.getcwd() dir_logs = os.path.join(directorio_actual, "logs") os.makedirs(dir_logs, exist_ok=True) rotating_handler = RotatingFileHandler( os.path.join(dir_logs, "misp_webhooks.log"), maxBytes=262144000, # 250 MB backupCount=10 ) logging.basicConfig( level=logging.INFO, handlers=[rotating_handler], format='%(asctime)s - %(levelname)s - %(message)s' ) ruta_base_datos = os.path.join(directorio_actual, "data", "procesados.db") os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True) DATABASE_URL = f"sqlite+aiosqlite:///{ruta_base_datos}" async_engine = create_async_engine(DATABASE_URL, echo=False, future=True) def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() # WAL y busy_timeout cursor.execute('PRAGMA journal_mode=WAL;') cursor.execute('PRAGMA busy_timeout = 5000;') cursor.close() event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma) AsyncSessionLocal = sessionmaker( bind=async_engine, expire_on_commit=False, class_=AsyncSession ) # Crear tablas al iniciar async def init_db(): async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Definimos el ciclo de vida como un generador @asynccontextmanager async def lifespan(app: FastAPI): try: await init_db() logging.info("Base de datos inicializada (async).") except (OperationalError, Exception) as e: logging.error("No se logró establecer conexión con DB. Se utilizará archivo local. Error: " + str(e)) # Yield indica el punto de inicio de la app yield # Aquí puedes agregar limpieza si es necesario app = FastAPI(lifespan=lifespan, version="1.0", title="MISP-WEBHOOK", description="Webhooks for MISP", swagger_ui_parameters={"supportedSubmitMethods": []} ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class InputModel(BaseModel): model_config = ConfigDict(extra="forbid") class EventData(InputModel): event_id: str event_uuid: str event_attribute_count: str class ResponseData(BaseModel): event_id: Optional[str] = None status: Optional[str] = None detail: Optional[str] = None async def get_db(): async with AsyncSessionLocal() as session: yield session disable_ids_data = Annotated[ EventData, Body( openapi_examples={ "fix_event": { "summary": "Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.", "description": "

Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.

", "value": { "event_id": "string", "event_uuid": "string", "event_attribute_count": "0" }, }, }, ), ] event_data_responses = { 200: { "description": "Success", "content": { "application/json": { "examples": { "Procesado": { "summary": "Respuesta Procesado", "value": { "event_id": "1231445", "status": "Procesado" } }, "Omitido": { "summary": "Respuesta Omitido", "value": { "event_id": "1231445", "status": "Omitido" } }, }, } } }, 401: { "description": "Invalid Credentials", "content": { "application/json": { "examples": { "Formato de token inválido.": { "summary": "Formato de token inválido.", "value": { "detail": "Formato de token inválido." } }, "Valor de token inválido.": { "summary": "Valor de token inválido.", "value": { "detail": "Valor de token inválido." } }, }, } } }, 500: { "description": "Internal Error", "content": { "application/json": { "examples": { "Error al procesar datos": { "summary": "Error al procesar datos", "value": { "detail": "Error al procesar datos: " } }, "La respuesta de MISP no contiene 'Attribute'": { "summary": "La respuesta de MISP no contiene 'Attribute'", "value": { "detail": "Error en la respuesta de MISP: 'Attribute' no encontrado." } }, }, } } }, } misp = PyMISP( url=config.MISP_CONFIG['misp_url'], key=config.MISP_CONFIG['misp_authkey'], ssl=False # Cambia a True si tienes SSL válido ) tipos_ktip = { "ip": ['ip-src','ip-dst'], "domain": ['domain','hostname'], "hash": ['sha256'], "url": ['url'] } # Concurrencia de 2-3 no suele requerir un single-writer, pero si quieres # evitar colisiones de escritura, lo agregamos. writer_lock = asyncio.Lock() async def async_check_falso_positivo_en_bd(ioc_value: str, ioc_type: str, days_valid: int = 1, db: AsyncSession = None) -> Optional[bool]: """ Lectura asíncrona en la BD, con reintentos si hay 'database is locked'. """ if not db: return None intento = 0 max_retries = 5 while True: try: limite_tiempo = datetime.now() - timedelta(days=days_valid) stmt = ( select(VerificacionFalsoPositivo) .where( VerificacionFalsoPositivo.ioc_value == ioc_value, VerificacionFalsoPositivo.ioc_type == ioc_type, VerificacionFalsoPositivo.fecha_verificacion >= limite_tiempo ) .order_by(VerificacionFalsoPositivo.fecha_verificacion.desc()) ) result = await db.scalars(stmt) existe = result.first() if existe: return existe.es_falso_positivo return None except OperationalError as e: if "database is locked" in str(e).lower(): intento += 1 if intento > max_retries: raise logging.warning(f"BD bloqueada en check_falso_positivo_en_bd, reintento #{intento}") await asyncio.sleep(0.5) else: raise async def async_guardar_falso_positivo_en_bd(ioc_value: str, ioc_type: str, es_fp: bool, db_factory): """ Escritura asíncrona con reintentos, y lock asíncrono para single-writer (opcional). """ intento = 0 max_retries = 5 while True: try: async with writer_lock: # single-writer, evita colisiones si 2-3 llamadas hacen writes a la vez async with db_factory() as db_session: nuevo = VerificacionFalsoPositivo( ioc_value=ioc_value, ioc_type=ioc_type, fecha_verificacion=datetime.now(), es_falso_positivo=es_fp ) db_session.add(nuevo) await db_session.commit() break except OperationalError as e: if "database is locked" in str(e).lower(): intento += 1 if intento > max_retries: raise logging.warning(f"BD bloqueada en guardar_falso_positivo_en_bd, reintento #{intento}") await asyncio.sleep(0.5) else: raise def sync_verificacion_ktip(ioc: str, tipo_ioc: str) -> bool: """ Llamada sincrónica (requests) a KTIP. Llamaremos a esta función con to_thread. """ url = config.KTIP_CONFIG['url_base'] headers = {'x-api-key': config.KTIP_CONFIG['api_key']} if str(ioc).lower().startswith("https://www.virustotal.com/gui"): return False if str(ioc).lower() in config.FP_COMUNES or any(str(ioc).lower().endswith(item) for item in config.FP_COMUNES): return True if tipo_ioc in tipos_ktip['ip']: url += f"ip?request={ioc}" elif tipo_ioc in tipos_ktip['domain']: url += f"domain?request={ioc}" elif tipo_ioc in tipos_ktip['hash']: url += f"hash?request={ioc}" elif tipo_ioc in tipos_ktip['url']: patron = re.compile(r'^https?://[^/]+/?$') if bool(patron.match(ioc)): url += f"domain?request={ioc}" else: url += f"url?request={ioc}" else: return False try: resp = requests.get(url, headers=headers, verify=False) logging.info(f"Respondió KTIP con IoC: {ioc}") if resp.status_code == 200: data = resp.json() if data.get('Zone') == 'Green': if tipo_ioc in tipos_ktip['url']: resp_temp = requests.get(ioc, verify=False) if resp_temp.status_code == 200: content = resp_temp.text.lower() if 'html' in content: return False return True elif resp.status_code == 403: logging.info(f"No proceso KTIP - IoC: {ioc}") return False except Exception as e: logging.error(f"Error verificacion_ktip: {e}") return False def sync_eliminar_atributo_o_objeto(a, object_id, evento_id): """ Llamada sincrónica a PyMISP. La usaremos con to_thread. """ IOC_TIPOS_OMITIR = [ 'comment','text','other','datetime','attachment','port','size-in-bytes', 'counter','integer','cpe','float','hex','phone-number','boolean', 'anonymised','pgp-public-key','pgp-private-key' ] try: logging.info(f"Falso Positivo encontrado : {a['type']}->{a['value']} en evento #{evento_id}") if object_id is None: logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}") misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"}) else: objeto_existe = misp.direct_call(f"objects/view/{object_id}") if 'Object' not in objeto_existe: logging.warning(f"El objeto ID {object_id} no existe o ya fue eliminado.") return False if a['type'] == 'sha256': logging.info(f"Eliminando objeto UUID: {objeto_existe['uuid']} / {a['value']}, evento #{evento_id}") misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"}) else: counter_validos = 0 for at in objeto_existe['Object']['Attribute']: if at['type'] not in IOC_TIPOS_OMITIR: counter_validos += 1 if counter_validos > 1: logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en objeto {objeto_existe['Object']['name']} , evento #{evento_id}") misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"}) else: logging.info(f"Eliminando objeto UUID: {objeto_existe['Object']['uuid']} / {a['value']}, evento #{evento_id}") misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"}) return True except (Exception, PyMISPError) as err: logging.error(str(err)) return False async def procesar_atributo(a, event_id, days_ago, now, db_factory): """ Verifica si un atributo es un FP usando BD asíncrona y KTIP sincrónico con to_thread. """ try: attr_timestamp = int(a['timestamp']) attr_datetime = datetime.fromtimestamp(attr_timestamp) if not (days_ago.date() <= attr_datetime.date() <= now.date()): return False valor_lower = str(a['value']).strip().lower() # 1) Lectura asíncrona en BD async with db_factory() as db_session: resultado_bd = await async_check_falso_positivo_en_bd(valor_lower, a['type'], config.RANGO_DIAS, db_session) # 2) Si ya está en BD if resultado_bd is not None: if resultado_bd is True: # Llamada a MISP sincrónica => to_thread await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id) return True else: return False # 3) Si no está en BD, llamar KTIP en un hilo => to_thread es_fp = await asyncio.to_thread(sync_verificacion_ktip, valor_lower, a['type']) # 4) Guardar en BD con lock de escritura await async_guardar_falso_positivo_en_bd( valor_lower, a['type'], es_fp, db_factory ) if es_fp: await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id) return True else: return False except Exception as e: logging.error(f"Error en procesar_atributo({a}): {e}") return False @app.post( "/webhook/misp_event_fixer", response_model=ResponseData, responses=event_data_responses, response_model_exclude_none=True, description="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white", summary="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white" ) async def webhook_misp_event_fixer( request: Request, event_data: disable_ids_data, authorization: str = Header(...), db: AsyncSession = Depends(get_db) ): try: # Validación token if not authorization.startswith("Bearer "): logging.error("Formato de Token Inválido") return JSONResponse(status_code=401, content={"detail": "Formato de token inválido."}) token = authorization.split("Bearer ")[1] if token != config.JWT_TOKEN_GEN: logging.error("Token Inválido") return JSONResponse(status_code=401, content={"detail": "Valor de token inválido."}) # Revisar si ya fue procesado stmt = select(ModificadosEv).where( ModificadosEv.evento_uuid == event_data.event_uuid, ModificadosEv.attribute_count >= int(event_data.event_attribute_count) ) existe = (await db.scalars(stmt)).first() if existe: logging.info(f"Evento {event_data.event_uuid} ya procesado. Omitido.") return {"event_id": event_data.event_id, "status": "Omitido"} num_attrs = int(event_data.event_attribute_count) if 0 < num_attrs <= config.MAX_ATTRS: # Llamada a MISP sincrónica => to_thread event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}") # Validación de org orgc_name = event_details["Event"]["Orgc"]["name"].lower() if orgc_name in {o.lower() for o in config.ORG_OMITIR}: logging.info(f"Evento {event_data.event_uuid} restringido para procesar. Omitido.") return {"event_id": event_data.event_id, "status": "Omitido"} logging.info(f"Procesando evento {event_data.event_uuid}, total atributos: {event_data.event_attribute_count}") event_info = event_details.get("Event", {}) if not event_info: logging.error("La respuesta de MISP no contiene 'Event'.") return JSONResponse( status_code=500, content={"detail": "Error en la respuesta de MISP: 'Event' no encontrado."} ) # Revisar tags => tlp:white => tlp:clear tags = [t.get("name", "") for t in event_info.get("Tag", [])] if "tlp:white" in tags and "tlp:clear" not in tags: try: await asyncio.to_thread(misp.tag, event_data.event_uuid, "tlp:clear", False) logging.info(f"Se agregó 'tlp:clear' al evento {event_data.event_id}.") except Exception as e: logging.error(f"Excepción al agregar 'tlp:clear' al evento {event_data.event_id}: {e}") attrs_event = event_info.get("Attribute", []) objs_event = event_info.get("Object", []) all_attrs = attrs_event[:] for o in objs_event: all_attrs.extend(o.get("Attribute", [])) now = datetime.now() days_ago = now - timedelta(days=config.RANGO_DIAS) # Procesamos cada atributo en paralelo (asyncio.gather) tasks = [] for a in all_attrs: tasks.append( procesar_atributo(a, event_data.event_id, days_ago, now, AsyncSessionLocal) ) results = await asyncio.gather(*tasks, return_exceptions=True) total_eliminados = 0 for r in results: if isinstance(r, Exception): logging.error(f"Error procesando atributo: {r}") elif r: total_eliminados += 1 logging.info(f"Total de atributos eliminados: {total_eliminados}") # Re-consultar MISP por si cambió attribute_count event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}") if int(event_details["Event"]["attribute_count"]) > 0: body = {"eventid": event_data.event_id} attr_resp = await asyncio.to_thread(misp.direct_call, "attributes/restSearch", body) if "Attribute" not in attr_resp: logging.error("La respuesta de MISP no contiene 'Attribute'") return JSONResponse( status_code=500, content={"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."} ) # Ajuste correlación e IDS for a in attr_resp["Attribute"]: if a["type"] in config.NO_CORRELACIONES: logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}") if not a["disable_correlation"]: await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", { "to_ids": "0", "disable_correlation": True }) logging.info(f"Correlación deshabilitada para atributo: {a['id']}") if config.NO_CORRELATIVOS_EXCLUSION: try: r_ex = await asyncio.to_thread(misp.direct_call, "correlation_exclusions/add", { "value": a["value"], "type": a["type"], "enabled": True }) if 'CorrelationExclusion' not in r_ex: err = r_ex.get("errors", []) if err and err[0] == 403: logging.error(f"API Error: valor '{a['value']}' ya existe en exclusiones. Se omite.") else: logging.error(f"API Error: {r_ex.get('errors')}") else: logging.info(f"Se agregó valor a excepciones: {a['value']}") except PyMISPError as e: logging.error(f"PyMISPError: {e}") except Exception as e: logging.error(f"Excepción no manejada: {e}") elif a["to_ids"]: await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {"to_ids": "0"}) logging.info(f"Se deshabilita Flag IDS para atributo: {a['id']}") elif a["type"] in config.IDS_CORRELACIONES: logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}") if not a["to_ids"]: await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", { "to_ids": "1", "disable_correlation": False }) logging.info(f"Flag IDS activado para atributo: {a['id']}") elif a["disable_correlation"]: await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", { "disable_correlation": False }) logging.info(f"Correlación habilitada para atributo: {a['id']}") else: logging.info("Evento sin atributos (luego de las eliminaciones).") # Guardar en ModificadosEv nuevo_registro = ModificadosEv( evento_uuid=event_data.event_uuid, publicado_fecha=datetime.now(), attribute_count=int(event_details["Event"]["attribute_count"]) ) try: db.add(nuevo_registro) await db.commit() logging.info(f"Registrado UUID de evento modificado: {event_data.event_uuid}") except IntegrityError: logging.warning(f"Registro duplicado: {event_data.event_uuid} ya existe.") await db.rollback() except Exception as e: logging.error(f"Error al guardar los datos en la base de datos: {e}") await db.rollback() else: logging.info(f"Evento {event_data.event_id} supera el límite de atributos a procesar o es 0. Se omite.") logging.info("Evento procesado") except Exception as err: logging.error(f"Excepción no manejada: {traceback.format_exc()}") return JSONResponse( status_code=500, content={"detail": f"Error al procesar datos: {err}"} ) return {"event_id": event_data.event_id, "status": "Procesado"}