misp-fixevent-webhook/main.py
Felipe Luis Quezada Valenzuela a248231bb1 [fix_newlogic_ok]
2025-03-03 17:11:49 -03:00

599 lines
24 KiB
Python

# main.py
import os
import re
import logging
import traceback
import urllib3
import asyncio
import time
from datetime import datetime, timedelta
from typing import Optional, List, Annotated
import requests
from fastapi import FastAPI, Request, Body, Header, Depends
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ConfigDict
from logging.handlers import RotatingFileHandler
from sqlalchemy import select, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
from contextlib import asynccontextmanager
from pymisp import PyMISP, PyMISPError
import config
from models import Base, ModificadosEv, VerificacionFalsoPositivo
urllib3.disable_warnings()
directorio_actual = os.getcwd()
dir_logs = os.path.join(directorio_actual, "logs")
os.makedirs(dir_logs, exist_ok=True)
rotating_handler = RotatingFileHandler(
os.path.join(dir_logs, "misp_webhooks.log"),
maxBytes=262144000, # 250 MB
backupCount=10
)
logging.basicConfig(
level=logging.INFO,
handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s'
)
ruta_base_datos = os.path.join(directorio_actual, "data", "procesados.db")
os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True)
DATABASE_URL = f"sqlite+aiosqlite:///{ruta_base_datos}"
async_engine = create_async_engine(DATABASE_URL, echo=False, future=True)
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
# WAL y busy_timeout
cursor.execute('PRAGMA journal_mode=WAL;')
cursor.execute('PRAGMA busy_timeout = 5000;')
cursor.close()
event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma)
AsyncSessionLocal = sessionmaker(
bind=async_engine,
expire_on_commit=False,
class_=AsyncSession
)
# Crear tablas al iniciar
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# Definimos el ciclo de vida como un generador
@asynccontextmanager
async def lifespan(app: FastAPI):
try:
await init_db()
logging.info("Base de datos inicializada (async).")
except (OperationalError, Exception) as e:
logging.error("No se logró establecer conexión con DB. Se utilizará archivo local. Error: " + str(e))
# Yield indica el punto de inicio de la app
yield
# Aquí puedes agregar limpieza si es necesario
app = FastAPI(lifespan=lifespan,
version="1.0",
title="MISP-WEBHOOK",
description="Webhooks for MISP",
swagger_ui_parameters={"supportedSubmitMethods": []}
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class InputModel(BaseModel):
model_config = ConfigDict(extra="forbid")
class EventData(InputModel):
event_id: str
event_uuid: str
event_attribute_count: str
class ResponseData(BaseModel):
event_id: Optional[str] = None
status: Optional[str] = None
detail: Optional[str] = None
async def get_db():
async with AsyncSessionLocal() as session:
yield session
disable_ids_data = Annotated[
EventData,
Body(
openapi_examples={
"fix_event": {
"summary": "Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.",
"description": "<p>Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.</p>",
"value": {
"event_id": "string",
"event_uuid": "string",
"event_attribute_count": "0"
},
},
},
),
]
event_data_responses = {
200: {
"description": "Success",
"content": {
"application/json": {
"examples": {
"Procesado": {
"summary": "Respuesta Procesado",
"value": {
"event_id": "1231445",
"status": "Procesado"
}
},
"Omitido": {
"summary": "Respuesta Omitido",
"value": {
"event_id": "1231445",
"status": "Omitido"
}
},
},
}
}
},
401: {
"description": "Invalid Credentials",
"content": {
"application/json": {
"examples": {
"Formato de token inválido.": {
"summary": "Formato de token inválido.",
"value": {
"detail": "Formato de token inválido."
}
},
"Valor de token inválido.": {
"summary": "Valor de token inválido.",
"value": {
"detail": "Valor de token inválido."
}
},
},
}
}
},
500: {
"description": "Internal Error",
"content": {
"application/json": {
"examples": {
"Error al procesar datos": {
"summary": "Error al procesar datos",
"value": {
"detail": "Error al procesar datos: <detalle_error>"
}
},
"La respuesta de MISP no contiene 'Attribute'": {
"summary": "La respuesta de MISP no contiene 'Attribute'",
"value": {
"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."
}
},
},
}
}
},
}
misp = PyMISP(
url=config.MISP_CONFIG['misp_url'],
key=config.MISP_CONFIG['misp_authkey'],
ssl=False # Cambia a True si tienes SSL válido
)
tipos_ktip = {
"ip": ['ip-src','ip-dst'],
"domain": ['domain','hostname'],
"hash": ['sha256'],
"url": ['url']
}
# Concurrencia de 2-3 no suele requerir un single-writer, pero si quieres
# evitar colisiones de escritura, lo agregamos.
writer_lock = asyncio.Lock()
async def async_check_falso_positivo_en_bd(ioc_value: str, ioc_type: str, days_valid: int = 1, db: AsyncSession = None) -> Optional[bool]:
"""
Lectura asíncrona en la BD, con reintentos si hay 'database is locked'.
"""
if not db:
return None
intento = 0
max_retries = 5
while True:
try:
limite_tiempo = datetime.now() - timedelta(days=days_valid)
stmt = (
select(VerificacionFalsoPositivo)
.where(
VerificacionFalsoPositivo.ioc_value == ioc_value,
VerificacionFalsoPositivo.ioc_type == ioc_type,
VerificacionFalsoPositivo.fecha_verificacion >= limite_tiempo
)
.order_by(VerificacionFalsoPositivo.fecha_verificacion.desc())
)
result = await db.scalars(stmt)
existe = result.first()
if existe:
return existe.es_falso_positivo
return None
except OperationalError as e:
if "database is locked" in str(e).lower():
intento += 1
if intento > max_retries:
raise
logging.warning(f"BD bloqueada en check_falso_positivo_en_bd, reintento #{intento}")
await asyncio.sleep(0.5)
else:
raise
async def async_guardar_falso_positivo_en_bd(ioc_value: str, ioc_type: str, es_fp: bool, db_factory):
"""
Escritura asíncrona con reintentos, y lock asíncrono para single-writer (opcional).
"""
intento = 0
max_retries = 5
while True:
try:
async with writer_lock: # single-writer, evita colisiones si 2-3 llamadas hacen writes a la vez
async with db_factory() as db_session:
nuevo = VerificacionFalsoPositivo(
ioc_value=ioc_value,
ioc_type=ioc_type,
fecha_verificacion=datetime.now(),
es_falso_positivo=es_fp
)
db_session.add(nuevo)
await db_session.commit()
break
except OperationalError as e:
if "database is locked" in str(e).lower():
intento += 1
if intento > max_retries:
raise
logging.warning(f"BD bloqueada en guardar_falso_positivo_en_bd, reintento #{intento}")
await asyncio.sleep(0.5)
else:
raise
def sync_verificacion_ktip(ioc: str, tipo_ioc: str) -> bool:
"""
Llamada sincrónica (requests) a KTIP. Llamaremos a esta función con to_thread.
"""
url = config.KTIP_CONFIG['url_base']
headers = {'x-api-key': config.KTIP_CONFIG['api_key']}
if str(ioc).lower().startswith("https://www.virustotal.com/gui"):
return False
if str(ioc).lower() in config.FP_COMUNES or any(str(ioc).lower().endswith(item) for item in config.FP_COMUNES):
return True
if tipo_ioc in tipos_ktip['ip']:
url += f"ip?request={ioc}"
elif tipo_ioc in tipos_ktip['domain']:
url += f"domain?request={ioc}"
elif tipo_ioc in tipos_ktip['hash']:
url += f"hash?request={ioc}"
elif tipo_ioc in tipos_ktip['url']:
patron = re.compile(r'^https?://[^/]+/?$')
if bool(patron.match(ioc)):
url += f"domain?request={ioc}"
else:
url += f"url?request={ioc}"
else:
return False
try:
resp = requests.get(url, headers=headers, verify=False)
logging.info(f"Respondió KTIP con IoC: {ioc}")
if resp.status_code == 200:
data = resp.json()
if data.get('Zone') == 'Green':
if tipo_ioc in tipos_ktip['url']:
resp_temp = requests.get(ioc, verify=False)
if resp_temp.status_code == 200 or resp_temp.status_code == 403:
content = resp_temp.text.lower()
if 'html' in content:
return False
return True
elif resp.status_code == 403:
logging.info(f"No proceso KTIP - IoC: {ioc}")
return False
except Exception as e:
logging.error(f"Error verificacion_ktip: {e}")
return False
def sync_eliminar_atributo_o_objeto(a, object_id, evento_id):
"""
Llamada sincrónica a PyMISP. La usaremos con to_thread.
"""
IOC_TIPOS_OMITIR = [
'comment','text','other','datetime','attachment','port','size-in-bytes',
'counter','integer','cpe','float','hex','phone-number','boolean',
'anonymised','pgp-public-key','pgp-private-key'
]
try:
logging.info(f"Falso Positivo encontrado : {a['type']}->{a['value']} en evento #{evento_id}")
if object_id is None:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}")
misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"})
else:
objeto_existe = misp.direct_call(f"objects/view/{object_id}")
if 'Object' not in objeto_existe:
logging.warning(f"El objeto ID {object_id} no existe o ya fue eliminado.")
return False
if a['type'] == 'sha256':
logging.info(f"Eliminando objeto UUID: {objeto_existe['uuid']} / {a['value']}, evento #{evento_id}")
misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"})
else:
counter_validos = 0
for at in objeto_existe['Object']['Attribute']:
if at['type'] not in IOC_TIPOS_OMITIR:
counter_validos += 1
if counter_validos > 1:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en objeto {objeto_existe['Object']['name']} , evento #{evento_id}")
misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"})
else:
logging.info(f"Eliminando objeto UUID: {objeto_existe['Object']['uuid']} / {a['value']}, evento #{evento_id}")
misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"})
return True
except (Exception, PyMISPError) as err:
logging.error(str(err))
return False
async def procesar_atributo(a, event_id, days_ago, now, db_factory):
"""
Verifica si un atributo es un FP usando BD asíncrona y KTIP sincrónico con to_thread.
"""
try:
attr_timestamp = int(a['timestamp'])
attr_datetime = datetime.fromtimestamp(attr_timestamp)
if not (days_ago.date() <= attr_datetime.date() <= now.date()):
return False
valor_lower = str(a['value']).strip().lower()
# 1) Lectura asíncrona en BD
async with db_factory() as db_session:
resultado_bd = await async_check_falso_positivo_en_bd(valor_lower, a['type'], config.RANGO_DIAS, db_session)
# 2) Si ya está en BD
if resultado_bd is not None:
if resultado_bd is True:
# Llamada a MISP sincrónica => to_thread
await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id)
return True
else:
return False
# 3) Si no está en BD, llamar KTIP en un hilo => to_thread
es_fp = await asyncio.to_thread(sync_verificacion_ktip, valor_lower, a['type'])
# 4) Guardar en BD con lock de escritura
await async_guardar_falso_positivo_en_bd(
valor_lower,
a['type'],
es_fp,
db_factory
)
if es_fp:
await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id)
return True
else:
return False
except Exception as e:
logging.error(f"Error en procesar_atributo({a}): {e}")
return False
@app.post(
"/webhook/misp_event_fixer",
response_model=ResponseData,
responses=event_data_responses,
response_model_exclude_none=True,
description="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white",
summary="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white"
)
async def webhook_misp_event_fixer(
request: Request,
event_data: disable_ids_data,
authorization: str = Header(...),
db: AsyncSession = Depends(get_db)
):
try:
# Validación token
if not authorization.startswith("Bearer "):
logging.error("Formato de Token Inválido")
return JSONResponse(status_code=401, content={"detail": "Formato de token inválido."})
token = authorization.split("Bearer ")[1]
if token != config.JWT_TOKEN_GEN:
logging.error("Token Inválido")
return JSONResponse(status_code=401, content={"detail": "Valor de token inválido."})
# Revisar si ya fue procesado
stmt = select(ModificadosEv).where(
ModificadosEv.evento_uuid == event_data.event_uuid,
ModificadosEv.attribute_count >= int(event_data.event_attribute_count)
)
existe = (await db.scalars(stmt)).first()
if existe:
logging.info(f"Evento {event_data.event_uuid} ya procesado. Omitido.")
return {"event_id": event_data.event_id, "status": "Omitido"}
num_attrs = int(event_data.event_attribute_count)
if 0 < num_attrs <= config.MAX_ATTRS:
# Llamada a MISP sincrónica => to_thread
event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}")
# Validación de org
orgc_name = event_details["Event"]["Orgc"]["name"].lower()
if orgc_name in {o.lower() for o in config.ORG_OMITIR}:
logging.info(f"Evento {event_data.event_uuid} restringido para procesar. Omitido.")
return {"event_id": event_data.event_id, "status": "Omitido"}
logging.info(f"Procesando evento {event_data.event_uuid}, total atributos: {event_data.event_attribute_count}")
event_info = event_details.get("Event", {})
if not event_info:
logging.error("La respuesta de MISP no contiene 'Event'.")
return JSONResponse(
status_code=500,
content={"detail": "Error en la respuesta de MISP: 'Event' no encontrado."}
)
# Revisar tags => tlp:white => tlp:clear
tags = [t.get("name", "") for t in event_info.get("Tag", [])]
if "tlp:white" in tags and "tlp:clear" not in tags:
try:
await asyncio.to_thread(misp.tag, event_data.event_uuid, "tlp:clear", False)
logging.info(f"Se agregó 'tlp:clear' al evento {event_data.event_id}.")
except Exception as e:
logging.error(f"Excepción al agregar 'tlp:clear' al evento {event_data.event_id}: {e}")
attrs_event = event_info.get("Attribute", [])
objs_event = event_info.get("Object", [])
all_attrs = attrs_event[:]
for o in objs_event:
all_attrs.extend(o.get("Attribute", []))
now = datetime.now()
days_ago = now - timedelta(days=config.RANGO_DIAS)
# Procesamos cada atributo en paralelo (asyncio.gather)
tasks = []
for a in all_attrs:
tasks.append(
procesar_atributo(a, event_data.event_id, days_ago, now, AsyncSessionLocal)
)
results = await asyncio.gather(*tasks, return_exceptions=True)
total_eliminados = 0
for r in results:
if isinstance(r, Exception):
logging.error(f"Error procesando atributo: {r}")
elif r:
total_eliminados += 1
logging.info(f"Total de atributos eliminados: {total_eliminados}")
# Re-consultar MISP por si cambió attribute_count
event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}")
if int(event_details["Event"]["attribute_count"]) > 0:
body = {"eventid": event_data.event_id}
attr_resp = await asyncio.to_thread(misp.direct_call, "attributes/restSearch", body)
if "Attribute" not in attr_resp:
logging.error("La respuesta de MISP no contiene 'Attribute'")
return JSONResponse(
status_code=500,
content={"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."}
)
# Ajuste correlación e IDS
for a in attr_resp["Attribute"]:
if a["type"] in config.NO_CORRELACIONES:
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
if not a["disable_correlation"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
"to_ids": "0", "disable_correlation": True
})
logging.info(f"Correlación deshabilitada para atributo: {a['id']}")
if config.NO_CORRELATIVOS_EXCLUSION:
try:
r_ex = await asyncio.to_thread(misp.direct_call, "correlation_exclusions/add", {
"value": a["value"],
"type": a["type"],
"enabled": True
})
if 'CorrelationExclusion' not in r_ex:
err = r_ex.get("errors", [])
if err and err[0] == 403:
logging.error(f"API Error: valor '{a['value']}' ya existe en exclusiones. Se omite.")
else:
logging.error(f"API Error: {r_ex.get('errors')}")
else:
logging.info(f"Se agregó valor a excepciones: {a['value']}")
except PyMISPError as e:
logging.error(f"PyMISPError: {e}")
except Exception as e:
logging.error(f"Excepción no manejada: {e}")
elif a["to_ids"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {"to_ids": "0"})
logging.info(f"Se deshabilita Flag IDS para atributo: {a['id']}")
elif a["type"] in config.IDS_CORRELACIONES:
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
if not a["to_ids"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
"to_ids": "1", "disable_correlation": False
})
logging.info(f"Flag IDS activado para atributo: {a['id']}")
elif a["disable_correlation"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
"disable_correlation": False
})
logging.info(f"Correlación habilitada para atributo: {a['id']}")
else:
logging.info("Evento sin atributos (luego de las eliminaciones).")
# Guardar en ModificadosEv
nuevo_registro = ModificadosEv(
evento_uuid=event_data.event_uuid,
publicado_fecha=datetime.now(),
attribute_count=int(event_details["Event"]["attribute_count"])
)
try:
db.add(nuevo_registro)
await db.commit()
logging.info(f"Registrado UUID de evento modificado: {event_data.event_uuid}")
except IntegrityError:
logging.warning(f"Registro duplicado: {event_data.event_uuid} ya existe.")
await db.rollback()
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
await db.rollback()
else:
logging.info(f"Evento {event_data.event_id} supera el límite de atributos a procesar o es 0. Se omite.")
logging.info("Evento procesado")
except Exception as err:
logging.error(f"Excepción no manejada: {traceback.format_exc()}")
return JSONResponse(
status_code=500,
content={"detail": f"Error al procesar datos: {err}"}
)
return {"event_id": event_data.event_id, "status": "Procesado"}