misp-fixevent-webhook/main.py
Felipe Luis Quezada Valenzuela e9e3987226 first commit
2025-02-13 16:41:25 -03:00

508 lines
20 KiB
Python

from fastapi import FastAPI, Request, Body, Header, Depends
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ConfigDict
from typing import List, Annotated, Optional
from pymisp import PyMISP, PyMISPError
from sqlalchemy import create_engine, select, event
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.exc import IntegrityError
import urllib3
import requests
import config
import os
import logging
from logging.handlers import RotatingFileHandler
from models import Base, ModificadosEv
from datetime import datetime, timedelta
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
urllib3.disable_warnings()
directorio_actual = os.getcwd()
tipos_ktip = {
"ip": ['ip-src','ip-dst'],
"domain": ['domain','hostname'],
"hash": ['sha256']
}
dir_logs = os.path.join(directorio_actual, 'logs')
os.makedirs(dir_logs, exist_ok=True)
rotating_handler = RotatingFileHandler(
os.path.join(dir_logs, "misp_webhooks.log"),
maxBytes=262144000,
backupCount=10
)
logging.basicConfig(
level=logging.INFO,
handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s'
)
ruta_base_datos = os.path.join(directorio_actual, "data", "procesados.db")
os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True)
sync_engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(bind=sync_engine)
async_engine = create_async_engine(f'sqlite+aiosqlite:///{ruta_base_datos}', echo=False, future=True)
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute('PRAGMA journal_mode=WAL;')
cursor.close()
event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma)
AsyncSessionLocal = sessionmaker(
bind=async_engine,
expire_on_commit=False,
class_=AsyncSession
)
app = FastAPI(
version="1.0",
title="MISP-WEBHOOK",
description="Webhooks for MISP",
swagger_ui_parameters={"supportedSubmitMethods": []}
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
misp = PyMISP(config.MISP_CONFIG['misp_url'], config.MISP_CONFIG['misp_authkey'], False)
class InputModel(BaseModel):
model_config = ConfigDict(extra="forbid")
class EventData(InputModel):
event_id: str
event_uuid: str
event_attribute_count: str
class ResponseData(InputModel):
event_id: Optional[str] = None
status: Optional[str] = None
detail: Optional[str] = None
async def get_db():
async with AsyncSessionLocal() as session:
yield session
disable_ids_data = Annotated[
EventData,
Body(
openapi_examples={
"fix_event": {
"summary": "Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.",
"description": "<p>Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.</p>",
"value": {
"event_id": "string",
"event_uuid": "string",
"event_attribute_count": "0"
},
},
},
),
]
event_data_responses = {
200: {
"description": "Success",
"content": {
"application/json": {
"examples": {
"Procesado": {
"summary": "Respuesta Procesado",
"value": {
"event_id": "1231445",
"status": "Procesado"
}
},
"Omitido": {
"summary": "Respuesta Omitido",
"value": {
"event_id": "1231445",
"status": "Omitido"
}
},
},
}
}
},
401: {
"description": "Invalid Credentials",
"content": {
"application/json": {
"examples": {
"Formato de token inválido.": {
"summary": "Formato de token inválido.",
"value": {
"detail": "Formato de token inválido."
}
},
"Valor de token inválido.": {
"summary": "Valor de token inválido.",
"value": {
"detail": "Valor de token inválido."
}
},
},
}
}
},
500: {
"description": "Internal Error",
"content": {
"application/json": {
"examples": {
"Error al procesar datos": {
"summary": "Error al procesar datos",
"value": {
"detail": "Error al procesar datos: <detalle_error>"
}
},
"La respuesta de MISP no contiene 'Attribute'": {
"summary": "La respuesta de MISP no contiene 'Attribute'",
"value": {
"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."
}
},
},
}
}
},
}
def verificacion_ktip(ioc, tipo_ioc):
"""Verifica en KTIP si un IoC se considera 'Green' (limpio)."""
url = config.KTIP_CONFIG['url_base']
headers = {
'x-api-key': config.KTIP_CONFIG['api_key']
}
if str(ioc).lower() in config.FP_COMUNES or any(str(ioc).lower().endswith(item) for item in config.FP_COMUNES):
return True
if tipo_ioc in tipos_ktip['ip']:
url = url + 'ip?request=' + ioc
elif tipo_ioc in tipos_ktip['domain']:
url = url + 'domain?request=' + ioc
elif tipo_ioc in tipos_ktip['hash']:
url = url + 'hash?request=' + ioc
else:
logging.warning(f"Tipo de IoC no reconocido para verificación KT: {tipo_ioc}")
return False
try:
response = requests.get(url, headers=headers, verify=False)
if response.status_code == 200:
logging.info("Respondió KTIP con IoC :" + str(ioc))
data = response.json()
if data.get('Zone') == 'Green':
return True
return False
except (Exception, requests.exceptions.HTTPError) as e:
logging.error(str(e))
return False
def eliminar_atributo_o_objeto(a, object_id, evento_id):
"""Elimina un atributo o un objeto completo de MISP si es un FP."""
IOC_TIPOS_OMITIR = [
'comment', 'text', 'other', 'datetime', 'attachment', 'port',
'size-in-bytes', 'counter', 'integer', 'cpe', 'float', 'hex',
'phone-number', 'boolean', 'anonymised', 'pgp-public-key', 'pgp-private-key'
]
try:
logging.info("Falso Positivo encontrado : " + a['type'] + "->" + a['value'] + f" en evento #{evento_id}")
if object_id is None:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}")
misp.direct_call("attributes/delete/" + a["uuid"], {"hard": "1"})
else:
objeto_existe = misp.direct_call("objects/view/" + object_id)
if 'Object' not in objeto_existe:
logging.warning(f"El objeto ID {object_id} no existe en MISP o ya fue eliminado. No se puede eliminar.")
return False
if a['type'] == 'sha256':
logging.info(f"Eliminando objeto UUID: {objeto_existe['uuid']} / {a['value']}, evento #{evento_id}")
misp.direct_call("objects/delete/" + objeto_existe['Object']['uuid'], {"hard": "1"})
else:
counter_validos = 0
for at in objeto_existe['Object']['Attribute']:
if at['type'] not in IOC_TIPOS_OMITIR:
counter_validos += 1
if counter_validos > 1:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en el objeto {objeto_existe['Object']['name']} , evento #{evento_id}")
misp.direct_call("attributes/delete/" + a["uuid"], {"hard": "1"})
else:
logging.info(f"Eliminando objeto UUID: {objeto_existe['Object']['uuid']} / {a['value']}, evento #{evento_id}")
misp.direct_call("objects/delete/" + objeto_existe['Object']["uuid"], {"hard": "1"})
return True
except (Exception, PyMISPError) as err:
logging.error(str(err))
return False
def procesar_atributo(a, event_id, days_ago, now, lock_data, fp_temp, nfp_temp):
"""
Verifica si un atributo es un falso positivo (usando verificacion_ktip).
Si corresponde, lo elimina de MISP.
Retorna True si se eliminó, False si no se hace nada.
"""
try:
attr_timestamp = int(a['timestamp'])
attr_datetime = datetime.fromtimestamp(attr_timestamp)
if not (days_ago.date() <= attr_datetime.date() <= now.date()):
return False
valor_lower = str(a['value']).strip().lower()
with lock_data:
if valor_lower in fp_temp:
if a['object_id'] == "0":
eliminar_atributo_o_objeto(a, None, event_id)
else:
eliminar_atributo_o_objeto(a, a['object_id'], event_id)
return True
elif valor_lower in nfp_temp:
return False
if any(a['type'] in lista for lista in tipos_ktip.values()):
es_falso_positivo = verificacion_ktip(a['value'], a['type'])
if es_falso_positivo:
if a['object_id'] == "0":
eliminar_atributo_o_objeto(a, None, event_id)
else:
eliminar_atributo_o_objeto(a, a['object_id'], event_id)
fp_temp.add(valor_lower)
return True
else:
nfp_temp.add(valor_lower)
return False
return False
except Exception as e:
logging.error(f"Error en procesar_atributo({a}): {e}")
return False
@app.post(
"/webhook/misp_event_fixer",
response_model=ResponseData,
responses=event_data_responses,
response_model_exclude_none=True,
description="Deshabilita correlación, limpia falsos positivos, corrige flag IDS en tipos de atributos definidos y asigna tlp:clear si existe tlp:white en un evento",
summary="Deshabilita correlación, limpia falsos positivos, corrige flag IDS en tipos de atributos definidos y asigna tlp:clear si existe tlp:white en un evento"
)
async def webhook_misp_event_fixer(
request: Request,
event_data: disable_ids_data,
authorization: str = Header(...),
db: AsyncSession = Depends(get_db)
):
try:
if not authorization.startswith("Bearer "):
logging.error("Formato de Token Inválido")
return JSONResponse(status_code=401, content={"detail": "Formato de token inválido."})
token = authorization.split("Bearer ")[1]
if token != config.JWT_TOKEN_GEN:
logging.error("Token Inválido")
return JSONResponse(status_code=401, content={"detail": "Valor de token inválido."})
stmt = select(ModificadosEv).where(
ModificadosEv.evento_uuid == event_data.event_uuid,
ModificadosEv.attribute_count >= int(event_data.event_attribute_count)
)
existe = (await db.scalars(stmt)).first()
if existe:
logging.info(f"Evento {event_data.event_uuid} ya procesado. Omitido.")
return {"event_id": event_data.event_id, "status": "Omitido"}
if 0 < int(event_data.event_attribute_count) <= config.MAX_ATTRS:
relative_path = 'events/view/' + str(event_data.event_id)
event_details = misp.direct_call(relative_path)
valores_minusculas = {item.lower() for item in config.ORG_OMITIR}
if str(event_details['Event']['Orgc']['name']).lower() in valores_minusculas:
logging.info(f"Evento {event_data.event_uuid} restringido para procesar. Omitido.")
return {"event_id": event_data.event_id, "status": "Omitido"}
logging.info(f"Procesando evento {event_data.event_uuid}, total atributos: {event_data.event_attribute_count}")
event_info = event_details["Event"]
tags = [tag["name"] for tag in event_info.get("Tag", [])]
if "tlp:white" in tags and "tlp:clear" not in tags:
try:
relative_path = "tags/attachTagToObject"
body = {
"uuid": event_data.event_uuid,
"tag": "tlp:clear",
"local": False
}
response = misp.direct_call(relative_path, body)
if "errors" in response:
logging.error(f"Error al agregar tlp:clear al evento {event_data.event_id}: {response['errors']}")
else:
logging.info(f"Se agregó tlp:clear al evento {event_data.event_id}.")
except Exception as e:
logging.error(f"Excepción al agregar tlp:clear al evento {event_data.event_id}: {e}")
attrs_event = event_details['Event'].get('Attribute', [])
objs_event = event_details['Event'].get('Object', [])
attrs_objetos = []
for o in objs_event:
attrs_objetos.extend(o.get('Attribute', []))
all_attrs = attrs_event + attrs_objetos
now = datetime.now()
days_ago = now - timedelta(days=config.RANGO_DIAS)
max_workers = getattr(config, 'NUM_WORKERS', 4)
total_eliminados = 0
# Aquí definimos los sets y el Lock para la ejecución actual
lock_data = threading.Lock()
fp_temp = set()
nfp_temp = set()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for a in all_attrs:
f = executor.submit(
procesar_atributo,
a,
event_data.event_id,
days_ago,
now,
lock_data,
fp_temp,
nfp_temp
)
futures.append(f)
for future in as_completed(futures):
try:
resultado = future.result()
if resultado:
total_eliminados += 1
except Exception as e:
logging.error(f"Error procesando atributo en hilo: {e}")
logging.info(f"Total de atributos eliminados: {total_eliminados}")
relative_path = 'events/view/' + str(event_data.event_id)
event_details = misp.direct_call(relative_path)
if int(event_details['Event']['attribute_count']) > 0:
relative_path_attr = 'attributes/restSearch'
body = {"eventid": event_data.event_id}
attr = misp.direct_call(relative_path_attr, body)
if "Attribute" not in attr:
logging.error("La respuesta de MISP no contiene 'Attribute'")
return JSONResponse(
status_code=500,
content={"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."}
)
for a in attr['Attribute']:
if a['type'] in config.NO_CORRELATIVOS:
if not a['disable_correlation']:
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
relative_path_det = 'attributes/edit/' + a['id']
body_det = {
"to_ids": "0",
"disable_correlation": True
}
misp.direct_call(relative_path_det, body_det)
logging.info(f"Correlación deshabilitada para atributo: {a['id']}")
if config.NO_CORRELATIVOS_EXCLUSION:
relative_path_corr = 'correlation_exclusions/add'
body_corr = {
"value": a['value'],
"type": a['type'],
"enabled": True
}
try:
response = misp.direct_call(relative_path_corr, body_corr)
if 'CorrelationExclusion' not in response:
err = response.get("errors", [])
if err and err[0] == 403:
logging.error(f"API Error: valor '{a['value']}' ya existe en exclusiones. Se omite.")
else:
logging.error(f"API Error: {response.get('errors')}")
else:
logging.info(f"Se agregó valor a excepciones: {a['value']}")
except PyMISPError as e:
logging.error(f"PyMISPError: {e}")
except UnicodeEncodeError as e:
logging.error(f"UnicodeEncodeError: {e}")
except Exception as e:
logging.error(f"Excepción no manejada: {e}")
elif a['to_ids']:
relative_path_det = 'attributes/edit/' + a['id']
body_det = {"to_ids": "0"}
misp.direct_call(relative_path_det, body_det)
logging.info(f"Se deshabilita Flag IDS para atributo: {a['id']}")
elif a['type'] in config.IDS_CORRELATIVOS:
if not a['to_ids']:
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
relative_temp = 'attributes/edit/' + a['id']
body_temp = {
"to_ids": "1",
"disable_correlation": False
}
misp.direct_call(relative_temp, body_temp)
logging.info(f"Flag IDS activado para atributo: {a['id']}")
elif a['disable_correlation']:
relative_path_det = 'attributes/edit/' + a['id']
body_det = {"disable_correlation": False}
misp.direct_call(relative_path_det, body_det)
logging.info(f"Correlación habilitada para atributo: {a['id']}")
else:
logging.info("Evento sin atributos.")
nuevo_registro = ModificadosEv(
evento_uuid=event_data.event_uuid,
publicado_fecha=datetime.now(),
attribute_count=int(event_details['Event']['attribute_count'])
)
try:
db.add(nuevo_registro)
await db.commit()
logging.info(f"Registrado UUID de evento modificado: {event_data.event_uuid}")
except IntegrityError:
logging.warning(f"Registro duplicado: {event_data.event_uuid} ya existe con la misma fecha.")
await db.rollback()
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
await db.rollback()
else:
logging.info(f"Evento {event_data.event_id} supera el límite de atributos a procesar. Se omite.")
logging.info("Evento procesado")
except Exception as err:
logging.error(f"Excepción no manejada: {traceback.format_exc()}")
return JSONResponse(status_code=500, content={"detail": f"Error al procesar datos: {err}"})
return {"event_id": event_data.event_id, "status": "Procesado"}