# main.py
import os
import re
import logging
import traceback
import urllib3
import asyncio
from playwright.async_api import async_playwright, TimeoutError
import time
from datetime import datetime, timedelta
from typing import Optional, List, Annotated
import requests
from fastapi import FastAPI, Request, Body, Header, Depends
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, ConfigDict
from logging.handlers import RotatingFileHandler
from sqlalchemy import select, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
from contextlib import asynccontextmanager
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from pymisp import PyMISP, PyMISPError
import config
from models import Base, ModificadosEv, VerificacionFalsoPositivo
urllib3.disable_warnings()
################################################################################
# CONFIGURACIONES DE LOGGING
################################################################################
directorio_actual = os.getcwd()
dir_logs = os.path.join(directorio_actual, "logs")
os.makedirs(dir_logs, exist_ok=True)
rotating_handler = RotatingFileHandler(
os.path.join(dir_logs, "misp_webhooks.log"),
maxBytes=262144000, # 250 MB
backupCount=10
)
logging.basicConfig(
level=logging.INFO,
handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s'
)
################################################################################
# BASE DE DATOS ASÍNCRONA (sqlite+aiosqlite)
################################################################################
ruta_base_datos = os.path.join(directorio_actual, "data", "procesados.db")
os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True)
DATABASE_URL = f"sqlite+aiosqlite:///{ruta_base_datos}"
async_engine = create_async_engine(DATABASE_URL, echo=False, future=True)
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
# WAL y busy_timeout
cursor.execute('PRAGMA journal_mode=WAL;')
cursor.execute('PRAGMA busy_timeout=5000;')
cursor.close()
event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma)
AsyncSessionLocal = sessionmaker(
bind=async_engine,
expire_on_commit=False,
class_=AsyncSession
)
# Crear tablas al iniciar
async def init_db():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
################################################################################
# PyMISP (bloqueante) => lo usamos con to_thread
################################################################################
misp = PyMISP(
url=config.MISP_CONFIG['misp_url'],
key=config.MISP_CONFIG['misp_authkey'],
ssl=False # Cambia a True si tienes SSL válido
)
# Set de FP
lista_fp = set()
# Para llenar lista de FP
def guarda_fp():
try:
global lista_fp
# Filtros para seleccionar Warninglist
filtros = config.CONFIG_WL['filtros_buscar']
max_reg_pw = config.CONFIG_WL['max_reg']
# limpia set
lista_fp.clear()
# Actualizar Warninglist por si acaso...
misp.update_warninglists()
# Warninglist completas
wl = misp.warninglists()
fechas = []
for l in wl:
fechas.append(str(l['Warninglist']['version']))
# Saca la versión más alta...
version = find_max(fechas)
types = ['domain','ip-src','ip-dst','hostname']
# Verifica que Warning sea del año actual
if str(version).startswith(datetime.now().strftime("%Y")):
for l in wl:
if str(l['Warninglist']['version']) == version:
if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros):
if int(str(l['Warninglist']['warninglist_entry_count'])) <= max_reg_pw:
valid_atributes = str(l['Warninglist']['valid_attributes']).split(",")
if any(tipo in valid_atributes for tipo in types):
wl = misp.get_warninglist(int(l['Warninglist']['id']))
for entry in wl['Warninglist']['WarninglistEntry']:
if '.' in entry['value'] and ':' not in entry['value'] and '/' not in entry['value']:
lista_fp.add(entry['value'])
# Se une FP_COMUNES + WL
lista_fp.update(config.FP_COMUNES)
else:
lista_fp.update(config.FP_COMUNES)
logging.info("WarningList no son del año actual. Se cargan Falsos positivos genericos")
logging.info("Falsos positivos cargados :"+str(len(lista_fp)))
except (Exception, PyMISPError) as e:
logging.error("Error al cargar FP en main :"+str(e))
async def run_comprobar_fp_periodicamente():
while True:
# Ejecuta la función en un thread para no bloquear el event loop
await asyncio.to_thread(guarda_fp)
# Espera 24 horas (86400 segundos)
await asyncio.sleep(86400)
@staticmethod
def convert_value(value):
try:
# Intentar convertir a fecha y retornar un número representativo
return int(datetime.strptime(value, "%Y%m%d").strftime("%Y%m%d"))
except ValueError:
# Si falla, convertir a entero directamente
return int(value)
def find_max(data):
max_value = max(data, key=convert_value)
return max_value
################################################################################
# STARTUP: INICIALIZAR LA BD
################################################################################
# Definimos el ciclo de vida como un generador
@asynccontextmanager
async def lifespan(app: FastAPI):
try:
await init_db()
logging.info("Base de datos inicializada (async).")
asyncio.create_task(run_comprobar_fp_periodicamente())
logging.info("Cargando Falsos Positivos")
except (OperationalError, Exception) as e:
logging.error("Error: " + str(e))
# Yield indica el punto de inicio de la app
yield
# Aquí puedes agregar limpieza si es necesario
################################################################################
# FASTAPI APP
################################################################################
app = FastAPI(lifespan=lifespan,
version="1.2",
title="MISP-WEBHOOK",
description="Webhooks for MISP",
swagger_ui_parameters={"supportedSubmitMethods": []}
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
################################################################################
# PYDANTIC MODELS
################################################################################
class InputModel(BaseModel):
model_config = ConfigDict(extra="forbid")
class EventData(InputModel):
event_id: str
event_uuid: str
event_attribute_count: str
class ResponseData(BaseModel):
event_id: Optional[str] = None
status: Optional[str] = None
detail: Optional[str] = None
################################################################################
# DEPENDENCIA PARA OBTENER AsyncSession
################################################################################
async def get_db():
async with AsyncSessionLocal() as session:
yield session
disable_ids_data = Annotated[
EventData,
Body(
openapi_examples={
"fix_event": {
"summary": "Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.",
"description": "
Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.
",
"value": {
"event_id": "string",
"event_uuid": "string",
"event_attribute_count": "0"
},
},
},
),
]
event_data_responses = {
200: {
"description": "Success",
"content": {
"application/json": {
"examples": {
"Procesado": {
"summary": "Respuesta Procesado",
"value": {
"event_id": "1231445",
"status": "Procesado"
}
},
"Omitido": {
"summary": "Respuesta Omitido",
"value": {
"event_id": "1231445",
"status": "Omitido"
}
},
},
}
}
},
401: {
"description": "Invalid Credentials",
"content": {
"application/json": {
"examples": {
"Formato de token inválido.": {
"summary": "Formato de token inválido.",
"value": {
"detail": "Formato de token inválido."
}
},
"Valor de token inválido.": {
"summary": "Valor de token inválido.",
"value": {
"detail": "Valor de token inválido."
}
},
},
}
}
},
500: {
"description": "Internal Error",
"content": {
"application/json": {
"examples": {
"Error al procesar datos": {
"summary": "Error al procesar datos",
"value": {
"detail": "Error al procesar datos: "
}
},
"La respuesta de MISP no contiene 'Attribute'": {
"summary": "La respuesta de MISP no contiene 'Attribute'",
"value": {
"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."
}
},
},
}
}
},
}
################################################################################
# DICCIONARIO DE TIPOS
################################################################################
tipos_ktip = {
"ip": ['ip-src','ip-dst'],
"domain": ['domain','hostname'],
"hash": ['sha256'],
"url": ['url']
}
################################################################################
# LOCK ASÍNCRONO SI QUIERES SECUNDARIZAR ESCRITURAS
################################################################################
# Concurrencia de 2-3 no suele requerir un single-writer, pero si quieres
# evitar colisiones de escritura, lo agregamos.
writer_lock = asyncio.Lock()
################################################################################
# FUNCIONES DE BD (LECTURA Y ESCRITURA) CON REINTENTOS
################################################################################
async def async_check_falso_positivo_en_bd(ioc_value: str, ioc_type: str, days_valid: int = 1, db: AsyncSession = None) -> Optional[bool]:
"""
Lectura asíncrona en la BD, con reintentos si hay 'database is locked'.
"""
if not db:
return None
intento = 0
max_retries = 5
while True:
try:
limite_tiempo = datetime.now() - timedelta(days=days_valid)
stmt = (
select(VerificacionFalsoPositivo)
.where(
VerificacionFalsoPositivo.ioc_value == ioc_value,
VerificacionFalsoPositivo.ioc_type == ioc_type,
VerificacionFalsoPositivo.fecha_verificacion >= limite_tiempo
)
.order_by(VerificacionFalsoPositivo.fecha_verificacion.desc())
)
result = await db.scalars(stmt)
existe = result.first()
if existe:
return existe.es_falso_positivo
return None
except OperationalError as e:
if "database is locked" in str(e).lower():
intento += 1
if intento > max_retries:
raise
logging.warning(f"BD bloqueada en check_falso_positivo_en_bd, reintento #{intento}")
await asyncio.sleep(0.5)
else:
raise
async def async_guardar_falso_positivo_en_bd(ioc_value: str, ioc_type: str, es_fp: bool, db_factory):
"""
Escritura asíncrona con reintentos, y lock asíncrono para single-writer (opcional).
"""
intento = 0
max_retries = 5
while True:
try:
async with writer_lock: # single-writer, evita colisiones si 2-3 llamadas hacen writes a la vez
async with db_factory() as db_session:
nuevo = VerificacionFalsoPositivo(
ioc_value=ioc_value,
ioc_type=ioc_type,
fecha_verificacion=datetime.now(),
es_falso_positivo=es_fp
)
db_session.add(nuevo)
await db_session.commit()
break
except OperationalError as e:
if "database is locked" in str(e).lower():
intento += 1
if intento > max_retries:
raise
logging.warning(f"BD bloqueada en guardar_falso_positivo_en_bd, reintento #{intento}")
await asyncio.sleep(0.5)
else:
raise
################################################################################
# FUNCION AUXILIAR KTIP
################################################################################
async def scan_ioc(ioc):
"""
Retorna True si es Falso Positivo (clean/good),
o False si es positivo (malicioso) o hubo un fallo.
"""
url = f"https://opentip.kaspersky.com/{ioc}/?tab=lookup"
try:
# Iniciamos Playwright
async with async_playwright() as p:
# Lanzamos Chromium en modo headless
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
try:
# Intentamos navegar a la URL
await page.goto(url, timeout=30000)
logging.info("Ingresando a KTIP por navegador")
# Esperamos a que aparezca el div
await page.wait_for_selector("div.ReputationBlock_repBlock_HhC4CEbx", timeout=30000)
except TimeoutError:
logging.error(f"[ERROR] Timeout al procesar {ioc}")
await browser.close()
return False # O return None, según lo que prefieras indicar
except Exception as e:
# Cualquier otro error (por ejemplo, fallo de conexión)
logging.error(f"[ERROR] Al procesar {ioc}: {e}")
await browser.close()
return False
# Si llegamos aquí, se cargó la página y el div se encontró
div_inner = await page.inner_html("div.ReputationBlock_repBlock_HhC4CEbx")
await browser.close()
# Verificamos si contiene 'good' o 'clean'
div_lower = div_inner.lower()
if '>good<' in div_lower or '>clean<' in div_lower:
return True # IoC Falso Positivo
else:
return False # IoC Positivo (malicioso o no "clean")
except Exception as e:
# Captura cualquier error *fuera* del bloque de conexión con Playwright
logging.error(f"[ERROR] Fallo inesperado con {ioc}: {e}")
return False
def get_session_with_retries(max_retries=1):
session = requests.Session()
retries = Retry(
total=max_retries, # Número total de reintentos
backoff_factor=0, # Tiempo de espera entre reintentos (0 = sin espera adicional)
status_forcelist=[500, 502, 503, 504], # Códigos HTTP que van a forzar reintento
allowed_methods=["HEAD", "GET", "OPTIONS"] # Métodos que permiten reintentos
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
################################################################################
# FUNCIONES BLOQUEANTES => las llamamos con asyncio.to_thread
################################################################################
def sync_verificacion_ktip(ioc: str, tipo_ioc: str) -> bool:
"""
Llamada sincrónica (requests) a KTIP. Llamaremos a esta función con to_thread.
"""
global lista_fp
url = config.KTIP_CONFIG['url_base']
headers = {'x-api-key': config.KTIP_CONFIG['api_key']}
# Comprobaciones iniciales
if str(ioc).lower().startswith("https://www.virustotal.com/gui"):
return False
if str(ioc).lower() in lista_fp or any(str(ioc).lower().endswith(item) for item in lista_fp):
return True
# Construir la URL de KTIP según el tipo de IoC
if tipo_ioc in tipos_ktip['ip']:
url += f"ip?request={ioc}"
elif tipo_ioc in tipos_ktip['domain']:
url += f"domain?request={ioc.replace("www.","")}"
elif tipo_ioc in tipos_ktip['hash']:
url += f"hash?request={ioc}"
elif tipo_ioc in tipos_ktip['url']:
patron = re.compile(r'^https?://[^/]+/?$')
if bool(patron.match(ioc)):
url += f"domain?request={ioc}"
else:
url += f"url?request={ioc}"
else:
return False
# Crea la sesión con reintentos=1
session_ktip = get_session_with_retries(max_retries=1)
try:
# PRIMERA PETICIÓN con la sesión configurada
resp = session_ktip.get(url, headers=headers, verify=False)
try:
logging.info(f"Respondió KTIP con IoC: {ioc}")
if resp.status_code == 200:
data = resp.json() # Descarga el contenido en memoria
if data.get('Zone') == 'Green':
# Si el tipo es URL, se hace una segunda petición para obtener el contenido del IoC
if tipo_ioc in tipos_ktip['url']:
# Se usa requests normal sin reintentos y se habilita stream=True
resp_temp = requests.get(ioc, verify=False, stream=True)
try:
codes = [200, 403, 404]
if resp_temp.status_code in codes:
# Obtenemos el header "Content-Type" para determinar el tipo de contenido
content_type = resp_temp.headers.get("Content-Type", "").lower()
keywords = ['html','abuse', 'report', '{', '}']
if "text/html" in content_type:
# Si es HTML, usamos .text y verificamos las keywords
content = resp_temp.text.lower()
for w in keywords:
if w in content:
return False
else:
# Si no es HTML (p.ej. es un archivo binario como un .zip)
return False
finally:
# Cerramos la segunda respuesta
resp_temp.close()
return True
# Si 'Zone' no es 'Green'
return False
elif resp.status_code == 403:
# Se verifica que no sea url para procesar
if tipo_ioc not in tipos_ktip['url']:
# Se limpia por si domain tiene wwww
ioc_fix = ioc.replace("wwww.","")
is_false_positive = asyncio.run(scan_ioc(ioc_fix))
return is_false_positive
else:
logging.info(f"No proceso KTIP - IoC: {ioc}")
# Otros status code
return False
finally:
# Cerramos la primera respuesta
resp.close()
except Exception as e:
logging.error(f"Error verificacion_ktip: {e}")
return False
finally:
# Cerramos la sesión para liberar recursos
session_ktip.close()
def sync_eliminar_atributo_o_objeto(a, object_id, evento_id):
"""
Llamada sincrónica a PyMISP. La usaremos con to_thread.
"""
IOC_TIPOS_OMITIR = [
'comment','text','other','datetime','attachment','port','size-in-bytes',
'counter','integer','cpe','float','hex','phone-number','boolean',
'anonymised','pgp-public-key','pgp-private-key'
]
try:
logging.info(f"Falso Positivo encontrado : {a['type']}->{a['value']} en evento #{evento_id}")
if object_id is None:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}")
misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"})
else:
objeto_existe = misp.direct_call(f"objects/view/{object_id}")
if 'Object' not in objeto_existe:
logging.warning(f"El objeto ID {object_id} no existe o ya fue eliminado.")
return False
if a['type'] == 'sha256':
logging.info(f"Eliminando objeto UUID: {objeto_existe['uuid']} / {a['value']}, evento #{evento_id}")
misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"})
else:
counter_validos = 0
for at in objeto_existe['Object']['Attribute']:
if at['type'] not in IOC_TIPOS_OMITIR:
counter_validos += 1
if counter_validos > 1:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en objeto {objeto_existe['Object']['name']} , evento #{evento_id}")
misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"})
else:
logging.info(f"Eliminando objeto UUID: {objeto_existe['Object']['uuid']} / {a['value']}, evento #{evento_id}")
misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"})
return True
except (Exception, PyMISPError) as err:
logging.error(str(err))
return False
################################################################################
# COROUTINE PARA PROCESAR UN ATRIBUTO
################################################################################
async def procesar_atributo(a, event_id, days_ago, now, db_factory):
"""
Verifica si un atributo es un FP usando BD asíncrona y KTIP sincrónico con to_thread.
"""
try:
if any(a['type'] in lista for lista in tipos_ktip.values()):
attr_timestamp = int(a['timestamp'])
attr_datetime = datetime.fromtimestamp(attr_timestamp)
if not (days_ago.date() <= attr_datetime.date() <= now.date()):
return False
# Para URL valor debe dejarse intacto
if a['type'] in tipos_ktip['url']:
valor_lower = str(a['value']).strip()
else:
valor_lower = str(a['value']).strip().lower()
# 1) Lectura asíncrona en BD
async with db_factory() as db_session:
resultado_bd = await async_check_falso_positivo_en_bd(valor_lower, a['type'], config.RANGO_DIAS, db_session)
# 2) Si ya está en BD
if resultado_bd is not None:
if resultado_bd is True:
# Llamada a MISP sincrónica => to_thread
await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id)
return True
else:
return False
# 3) Si no está en BD, llamar KTIP en un hilo => to_thread
es_fp = await asyncio.to_thread(sync_verificacion_ktip, valor_lower, a['type'])
# 4) Guardar en BD con lock de escritura
await async_guardar_falso_positivo_en_bd(
valor_lower,
a['type'],
es_fp,
db_factory
)
if es_fp:
await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id)
return True
else:
return False
return False
except Exception as e:
logging.error(f"Error en procesar_atributo({a}): {e}")
return False
################################################################################
# ENDPOINT ASÍNCRONO
################################################################################
@app.post(
"/webhook/misp_event_fixer",
response_model=ResponseData,
responses=event_data_responses,
response_model_exclude_none=True,
description="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white",
summary="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white"
)
async def webhook_misp_event_fixer(
request: Request,
event_data: disable_ids_data,
authorization: str = Header(...),
db: AsyncSession = Depends(get_db)
):
try:
# Validación token
if not authorization.startswith("Bearer "):
logging.error("Formato de Token Inválido")
return JSONResponse(status_code=401, content={"detail": "Formato de token inválido."})
token = authorization.split("Bearer ")[1]
if token != config.JWT_TOKEN_GEN:
logging.error("Token Inválido")
return JSONResponse(status_code=401, content={"detail": "Valor de token inválido."})
# Revisar si ya fue procesado
stmt = select(ModificadosEv).where(
ModificadosEv.evento_uuid == event_data.event_uuid,
ModificadosEv.attribute_count >= int(event_data.event_attribute_count)
)
existe = (await db.scalars(stmt)).first()
if existe:
logging.info(f"Evento {event_data.event_uuid} ya procesado. Omitido.")
return {"event_id": event_data.event_id, "status": "Omitido"}
num_attrs = int(event_data.event_attribute_count)
if 0 < num_attrs <= config.MAX_ATTRS:
# Llamada a MISP sincrónica => to_thread
event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}")
# Validación de org
orgc_name = event_details["Event"]["Orgc"]["name"].lower()
if orgc_name in {o.lower() for o in config.ORG_OMITIR}:
logging.info(f"Evento {event_data.event_uuid} restringido para procesar. Omitido.")
return {"event_id": event_data.event_id, "status": "Omitido"}
logging.info(f"Procesando evento {event_data.event_uuid}, total atributos: {event_data.event_attribute_count}")
event_info = event_details.get("Event", {})
if not event_info:
logging.error("La respuesta de MISP no contiene 'Event'.")
return JSONResponse(
status_code=500,
content={"detail": "Error en la respuesta de MISP: 'Event' no encontrado."}
)
# Revisar tags => tlp:white => tlp:clear
tags = [t.get("name", "") for t in event_info.get("Tag", [])]
if "tlp:white" in tags and "tlp:clear" not in tags:
try:
await asyncio.to_thread(misp.tag, event_data.event_uuid, "tlp:clear", False)
logging.info(f"Se agregó 'tlp:clear' al evento {event_data.event_id}.")
except Exception as e:
logging.error(f"Excepción al agregar 'tlp:clear' al evento {event_data.event_id}: {e}")
attrs_event = event_info.get("Attribute", [])
objs_event = event_info.get("Object", [])
all_attrs = attrs_event[:]
for o in objs_event:
all_attrs.extend(o.get("Attribute", []))
now = datetime.now()
days_ago = now - timedelta(days=config.RANGO_DIAS)
# Procesamos cada atributo en paralelo (asyncio.gather)
tasks = []
for a in all_attrs:
tasks.append(
procesar_atributo(a, event_data.event_id, days_ago, now, AsyncSessionLocal)
)
results = await asyncio.gather(*tasks, return_exceptions=True)
total_eliminados = 0
for r in results:
if isinstance(r, Exception):
logging.error(f"Error procesando atributo: {r}")
elif r:
total_eliminados += 1
logging.info(f"Total de atributos eliminados: {total_eliminados}")
# Re-consultar MISP por si cambió attribute_count
event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}")
if int(event_details["Event"]["attribute_count"]) > 0:
body = {"eventid": event_data.event_id}
attr_resp = await asyncio.to_thread(misp.direct_call, "attributes/restSearch", body)
if "Attribute" not in attr_resp:
logging.error("La respuesta de MISP no contiene 'Attribute'")
return JSONResponse(
status_code=500,
content={"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."}
)
# Ajuste correlación e IDS
for a in attr_resp["Attribute"]:
if a["type"] in config.NO_CORRELACIONES:
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
if not a["disable_correlation"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
"to_ids": "0", "disable_correlation": True
})
logging.info(f"Correlación deshabilitada para atributo: {a['id']}")
if config.NO_CORRELATIVOS_EXCLUSION:
try:
r_ex = await asyncio.to_thread(misp.direct_call, "correlation_exclusions/add", {
"value": a["value"],
"type": a["type"],
"enabled": True
})
if 'CorrelationExclusion' not in r_ex:
err = r_ex.get("errors", [])
if err and err[0] == 403:
logging.error(f"API Error: valor '{a['value']}' ya existe en exclusiones. Se omite.")
else:
logging.error(f"API Error: {r_ex.get('errors')}")
else:
logging.info(f"Se agregó valor a excepciones: {a['value']}")
except PyMISPError as e:
logging.error(f"PyMISPError: {e}")
except Exception as e:
logging.error(f"Excepción no manejada: {e}")
elif a["to_ids"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {"to_ids": "0"})
logging.info(f"Se deshabilita Flag IDS para atributo: {a['id']}")
elif a["type"] in config.IDS_CORRELACIONES:
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
if not a["to_ids"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
"to_ids": "1", "disable_correlation": False
})
logging.info(f"Flag IDS activado para atributo: {a['id']}")
elif a["disable_correlation"]:
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
"disable_correlation": False
})
logging.info(f"Correlación habilitada para atributo: {a['id']}")
else:
logging.info("Evento sin atributos (luego de las eliminaciones).")
# Guardar en ModificadosEv
nuevo_registro = ModificadosEv(
evento_uuid=event_data.event_uuid,
publicado_fecha=datetime.now(),
attribute_count=int(event_details["Event"]["attribute_count"])
)
try:
db.add(nuevo_registro)
await db.commit()
logging.info(f"Registrado UUID de evento modificado: {event_data.event_uuid}")
except IntegrityError:
logging.warning(f"Registro duplicado: {event_data.event_uuid} ya existe.")
await db.rollback()
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
await db.rollback()
else:
logging.info(f"Evento {event_data.event_id} supera el límite de atributos a procesar o es 0. Se omite.")
logging.info("Evento procesado")
except Exception as err:
logging.error(f"Excepción no manejada: {traceback.format_exc()}")
return JSONResponse(
status_code=500,
content={"detail": f"Error al procesar datos: {err}"}
)
return {"event_id": event_data.event_id, "status": "Procesado"}