846 lines
34 KiB
Python
846 lines
34 KiB
Python
# main.py
|
|
import os
|
|
import re
|
|
import logging
|
|
import traceback
|
|
import urllib3
|
|
import asyncio
|
|
from playwright.async_api import async_playwright, TimeoutError
|
|
import time
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List, Annotated
|
|
|
|
import requests
|
|
from fastapi import FastAPI, Request, Body, Header, Depends
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel, ConfigDict
|
|
from logging.handlers import RotatingFileHandler
|
|
from sqlalchemy import select, event
|
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.exc import IntegrityError, OperationalError
|
|
from contextlib import asynccontextmanager
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
from pymisp import PyMISP, PyMISPError
|
|
|
|
import config
|
|
from models import Base, ModificadosEv, VerificacionFalsoPositivo
|
|
|
|
urllib3.disable_warnings()
|
|
|
|
################################################################################
|
|
# CONFIGURACIONES DE LOGGING
|
|
################################################################################
|
|
directorio_actual = os.getcwd()
|
|
dir_logs = os.path.join(directorio_actual, "logs")
|
|
os.makedirs(dir_logs, exist_ok=True)
|
|
|
|
rotating_handler = RotatingFileHandler(
|
|
os.path.join(dir_logs, "misp_webhooks.log"),
|
|
maxBytes=262144000, # 250 MB
|
|
backupCount=10
|
|
)
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
handlers=[rotating_handler],
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
################################################################################
|
|
# BASE DE DATOS ASÍNCRONA (sqlite+aiosqlite)
|
|
################################################################################
|
|
ruta_base_datos = os.path.join(directorio_actual, "data", "procesados.db")
|
|
os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True)
|
|
|
|
DATABASE_URL = f"sqlite+aiosqlite:///{ruta_base_datos}"
|
|
async_engine = create_async_engine(DATABASE_URL, echo=False, future=True)
|
|
|
|
def set_sqlite_pragma(dbapi_connection, connection_record):
|
|
cursor = dbapi_connection.cursor()
|
|
# WAL y busy_timeout
|
|
cursor.execute('PRAGMA journal_mode=WAL;')
|
|
cursor.execute('PRAGMA busy_timeout=5000;')
|
|
cursor.close()
|
|
|
|
event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma)
|
|
|
|
AsyncSessionLocal = sessionmaker(
|
|
bind=async_engine,
|
|
expire_on_commit=False,
|
|
class_=AsyncSession
|
|
)
|
|
|
|
# Crear tablas al iniciar
|
|
async def init_db():
|
|
async with async_engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
################################################################################
|
|
# PyMISP (bloqueante) => lo usamos con to_thread
|
|
################################################################################
|
|
misp = PyMISP(
|
|
url=config.MISP_CONFIG['misp_url'],
|
|
key=config.MISP_CONFIG['misp_authkey'],
|
|
ssl=False # Cambia a True si tienes SSL válido
|
|
)
|
|
|
|
# Set de FP
|
|
lista_fp = set()
|
|
|
|
# Para llenar lista de FP
|
|
def guarda_fp():
|
|
try:
|
|
global lista_fp
|
|
# Filtros para seleccionar Warninglist
|
|
filtros = config.CONFIG_WL['filtros_buscar']
|
|
|
|
max_reg_pw = config.CONFIG_WL['max_reg']
|
|
|
|
# limpia set
|
|
lista_fp.clear()
|
|
|
|
# Actualizar Warninglist por si acaso...
|
|
misp.update_warninglists()
|
|
|
|
# Warninglist completas
|
|
wl = misp.warninglists()
|
|
|
|
fechas = []
|
|
|
|
for l in wl:
|
|
fechas.append(str(l['Warninglist']['version']))
|
|
|
|
# Saca la versión más alta...
|
|
version = find_max(fechas)
|
|
|
|
types = ['domain','ip-src','ip-dst','hostname']
|
|
|
|
# Verifica que Warning sea del año actual
|
|
if str(version).startswith(datetime.now().strftime("%Y")):
|
|
for l in wl:
|
|
if str(l['Warninglist']['version']) == version:
|
|
if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros):
|
|
if int(str(l['Warninglist']['warninglist_entry_count'])) <= max_reg_pw:
|
|
valid_atributes = str(l['Warninglist']['valid_attributes']).split(",")
|
|
if any(tipo in valid_atributes for tipo in types):
|
|
wl = misp.get_warninglist(int(l['Warninglist']['id']))
|
|
for entry in wl['Warninglist']['WarninglistEntry']:
|
|
if '.' in entry['value'] and ':' not in entry['value'] and '/' not in entry['value']:
|
|
lista_fp.add(entry['value'])
|
|
|
|
# Se une FP_COMUNES + WL
|
|
lista_fp.update(config.FP_COMUNES)
|
|
else:
|
|
lista_fp.update(config.FP_COMUNES)
|
|
logging.info("WarningList no son del año actual. Se cargan Falsos positivos genericos")
|
|
logging.info("Falsos positivos cargados :"+str(len(lista_fp)))
|
|
except (Exception, PyMISPError) as e:
|
|
logging.error("Error al cargar FP en main :"+str(e))
|
|
|
|
|
|
async def run_comprobar_fp_periodicamente():
|
|
while True:
|
|
# Ejecuta la función en un thread para no bloquear el event loop
|
|
await asyncio.to_thread(guarda_fp)
|
|
# Espera 24 horas (86400 segundos)
|
|
await asyncio.sleep(86400)
|
|
|
|
|
|
@staticmethod
|
|
def convert_value(value):
|
|
try:
|
|
# Intentar convertir a fecha y retornar un número representativo
|
|
return int(datetime.strptime(value, "%Y%m%d").strftime("%Y%m%d"))
|
|
except ValueError:
|
|
# Si falla, convertir a entero directamente
|
|
return int(value)
|
|
|
|
def find_max(data):
|
|
max_value = max(data, key=convert_value)
|
|
return max_value
|
|
|
|
|
|
|
|
|
|
################################################################################
|
|
# STARTUP: INICIALIZAR LA BD
|
|
################################################################################
|
|
|
|
# Definimos el ciclo de vida como un generador
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
try:
|
|
await init_db()
|
|
logging.info("Base de datos inicializada (async).")
|
|
|
|
asyncio.create_task(run_comprobar_fp_periodicamente())
|
|
logging.info("Cargando Falsos Positivos")
|
|
|
|
except (OperationalError, Exception) as e:
|
|
logging.error("Error: " + str(e))
|
|
# Yield indica el punto de inicio de la app
|
|
yield
|
|
# Aquí puedes agregar limpieza si es necesario
|
|
|
|
################################################################################
|
|
# FASTAPI APP
|
|
################################################################################
|
|
app = FastAPI(lifespan=lifespan,
|
|
version="1.2",
|
|
title="MISP-WEBHOOK",
|
|
description="Webhooks for MISP",
|
|
swagger_ui_parameters={"supportedSubmitMethods": []}
|
|
)
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
################################################################################
|
|
# PYDANTIC MODELS
|
|
################################################################################
|
|
class InputModel(BaseModel):
|
|
model_config = ConfigDict(extra="forbid")
|
|
|
|
class EventData(InputModel):
|
|
event_id: str
|
|
event_uuid: str
|
|
event_attribute_count: str
|
|
|
|
class ResponseData(BaseModel):
|
|
event_id: Optional[str] = None
|
|
status: Optional[str] = None
|
|
detail: Optional[str] = None
|
|
|
|
################################################################################
|
|
# DEPENDENCIA PARA OBTENER AsyncSession
|
|
################################################################################
|
|
async def get_db():
|
|
async with AsyncSessionLocal() as session:
|
|
yield session
|
|
|
|
disable_ids_data = Annotated[
|
|
EventData,
|
|
Body(
|
|
openapi_examples={
|
|
"fix_event": {
|
|
"summary": "Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.",
|
|
"description": "<p>Deshabilita correlación, limpia falsos positivos y corrige flag ids en tipos de atributos definidos.</p>",
|
|
"value": {
|
|
"event_id": "string",
|
|
"event_uuid": "string",
|
|
"event_attribute_count": "0"
|
|
},
|
|
},
|
|
},
|
|
),
|
|
]
|
|
|
|
event_data_responses = {
|
|
200: {
|
|
"description": "Success",
|
|
"content": {
|
|
"application/json": {
|
|
"examples": {
|
|
"Procesado": {
|
|
"summary": "Respuesta Procesado",
|
|
"value": {
|
|
"event_id": "1231445",
|
|
"status": "Procesado"
|
|
}
|
|
},
|
|
"Omitido": {
|
|
"summary": "Respuesta Omitido",
|
|
"value": {
|
|
"event_id": "1231445",
|
|
"status": "Omitido"
|
|
}
|
|
},
|
|
},
|
|
}
|
|
}
|
|
},
|
|
401: {
|
|
"description": "Invalid Credentials",
|
|
"content": {
|
|
"application/json": {
|
|
"examples": {
|
|
"Formato de token inválido.": {
|
|
"summary": "Formato de token inválido.",
|
|
"value": {
|
|
"detail": "Formato de token inválido."
|
|
}
|
|
},
|
|
"Valor de token inválido.": {
|
|
"summary": "Valor de token inválido.",
|
|
"value": {
|
|
"detail": "Valor de token inválido."
|
|
}
|
|
},
|
|
},
|
|
}
|
|
}
|
|
},
|
|
500: {
|
|
"description": "Internal Error",
|
|
"content": {
|
|
"application/json": {
|
|
"examples": {
|
|
"Error al procesar datos": {
|
|
"summary": "Error al procesar datos",
|
|
"value": {
|
|
"detail": "Error al procesar datos: <detalle_error>"
|
|
}
|
|
},
|
|
"La respuesta de MISP no contiene 'Attribute'": {
|
|
"summary": "La respuesta de MISP no contiene 'Attribute'",
|
|
"value": {
|
|
"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."
|
|
}
|
|
},
|
|
},
|
|
}
|
|
}
|
|
},
|
|
}
|
|
|
|
|
|
################################################################################
|
|
# DICCIONARIO DE TIPOS
|
|
################################################################################
|
|
tipos_ktip = {
|
|
"ip": ['ip-src','ip-dst'],
|
|
"domain": ['domain','hostname'],
|
|
"hash": ['sha256'],
|
|
"url": ['url']
|
|
}
|
|
|
|
################################################################################
|
|
# LOCK ASÍNCRONO SI QUIERES SECUNDARIZAR ESCRITURAS
|
|
################################################################################
|
|
# Concurrencia de 2-3 no suele requerir un single-writer, pero si quieres
|
|
# evitar colisiones de escritura, lo agregamos.
|
|
writer_lock = asyncio.Lock()
|
|
|
|
################################################################################
|
|
# FUNCIONES DE BD (LECTURA Y ESCRITURA) CON REINTENTOS
|
|
################################################################################
|
|
async def async_check_falso_positivo_en_bd(ioc_value: str, ioc_type: str, days_valid: int = 1, db: AsyncSession = None) -> Optional[bool]:
|
|
"""
|
|
Lectura asíncrona en la BD, con reintentos si hay 'database is locked'.
|
|
"""
|
|
if not db:
|
|
return None
|
|
|
|
intento = 0
|
|
max_retries = 5
|
|
while True:
|
|
try:
|
|
limite_tiempo = datetime.now() - timedelta(days=days_valid)
|
|
stmt = (
|
|
select(VerificacionFalsoPositivo)
|
|
.where(
|
|
VerificacionFalsoPositivo.ioc_value == ioc_value,
|
|
VerificacionFalsoPositivo.ioc_type == ioc_type,
|
|
VerificacionFalsoPositivo.fecha_verificacion >= limite_tiempo
|
|
)
|
|
.order_by(VerificacionFalsoPositivo.fecha_verificacion.desc())
|
|
)
|
|
result = await db.scalars(stmt)
|
|
existe = result.first()
|
|
if existe:
|
|
return existe.es_falso_positivo
|
|
return None
|
|
except OperationalError as e:
|
|
if "database is locked" in str(e).lower():
|
|
intento += 1
|
|
if intento > max_retries:
|
|
raise
|
|
logging.warning(f"BD bloqueada en check_falso_positivo_en_bd, reintento #{intento}")
|
|
await asyncio.sleep(0.5)
|
|
else:
|
|
raise
|
|
|
|
async def async_guardar_falso_positivo_en_bd(ioc_value: str, ioc_type: str, es_fp: bool, db_factory):
|
|
"""
|
|
Escritura asíncrona con reintentos, y lock asíncrono para single-writer (opcional).
|
|
"""
|
|
intento = 0
|
|
max_retries = 5
|
|
while True:
|
|
try:
|
|
async with writer_lock: # single-writer, evita colisiones si 2-3 llamadas hacen writes a la vez
|
|
async with db_factory() as db_session:
|
|
nuevo = VerificacionFalsoPositivo(
|
|
ioc_value=ioc_value,
|
|
ioc_type=ioc_type,
|
|
fecha_verificacion=datetime.now(),
|
|
es_falso_positivo=es_fp
|
|
)
|
|
db_session.add(nuevo)
|
|
await db_session.commit()
|
|
|
|
break
|
|
except OperationalError as e:
|
|
if "database is locked" in str(e).lower():
|
|
intento += 1
|
|
if intento > max_retries:
|
|
raise
|
|
logging.warning(f"BD bloqueada en guardar_falso_positivo_en_bd, reintento #{intento}")
|
|
await asyncio.sleep(0.5)
|
|
else:
|
|
raise
|
|
|
|
|
|
################################################################################
|
|
# FUNCION AUXILIAR KTIP
|
|
################################################################################
|
|
async def scan_ioc(ioc):
|
|
"""
|
|
Retorna True si es Falso Positivo (clean/good),
|
|
o False si es positivo (malicioso) o hubo un fallo.
|
|
"""
|
|
url = f"https://opentip.kaspersky.com/{ioc}/?tab=lookup"
|
|
|
|
try:
|
|
# Iniciamos Playwright
|
|
async with async_playwright() as p:
|
|
# Lanzamos Chromium en modo headless
|
|
browser = await p.chromium.launch(headless=True)
|
|
page = await browser.new_page()
|
|
|
|
try:
|
|
# Intentamos navegar a la URL
|
|
await page.goto(url, timeout=30000)
|
|
logging.info("Ingresando a KTIP por navegador")
|
|
# Esperamos a que aparezca el div
|
|
await page.wait_for_selector("div.ReputationBlock_repBlock_HhC4CEbx", timeout=30000)
|
|
except TimeoutError:
|
|
logging.error(f"[ERROR] Timeout al procesar {ioc}")
|
|
await browser.close()
|
|
return False # O return None, según lo que prefieras indicar
|
|
except Exception as e:
|
|
# Cualquier otro error (por ejemplo, fallo de conexión)
|
|
logging.error(f"[ERROR] Al procesar {ioc}: {e}")
|
|
await browser.close()
|
|
return False
|
|
|
|
# Si llegamos aquí, se cargó la página y el div se encontró
|
|
div_inner = await page.inner_html("div.ReputationBlock_repBlock_HhC4CEbx")
|
|
|
|
await browser.close()
|
|
|
|
# Verificamos si contiene 'good' o 'clean'
|
|
div_lower = div_inner.lower()
|
|
if '>good<' in div_lower or '>clean<' in div_lower:
|
|
return True # IoC Falso Positivo
|
|
else:
|
|
return False # IoC Positivo (malicioso o no "clean")
|
|
|
|
except Exception as e:
|
|
# Captura cualquier error *fuera* del bloque de conexión con Playwright
|
|
logging.error(f"[ERROR] Fallo inesperado con {ioc}: {e}")
|
|
return False
|
|
|
|
|
|
def get_session_with_retries(max_retries=1):
|
|
session = requests.Session()
|
|
retries = Retry(
|
|
total=max_retries, # Número total de reintentos
|
|
backoff_factor=0, # Tiempo de espera entre reintentos (0 = sin espera adicional)
|
|
status_forcelist=[500, 502, 503, 504], # Códigos HTTP que van a forzar reintento
|
|
allowed_methods=["HEAD", "GET", "OPTIONS"] # Métodos que permiten reintentos
|
|
)
|
|
adapter = HTTPAdapter(max_retries=retries)
|
|
session.mount("http://", adapter)
|
|
session.mount("https://", adapter)
|
|
return session
|
|
|
|
################################################################################
|
|
# FUNCIONES BLOQUEANTES => las llamamos con asyncio.to_thread
|
|
################################################################################
|
|
def sync_verificacion_ktip(ioc: str, tipo_ioc: str) -> bool:
|
|
"""
|
|
Llamada sincrónica (requests) a KTIP. Llamaremos a esta función con to_thread.
|
|
"""
|
|
global lista_fp
|
|
|
|
url = config.KTIP_CONFIG['url_base']
|
|
headers = {'x-api-key': config.KTIP_CONFIG['api_key']}
|
|
|
|
# Comprobaciones iniciales
|
|
if str(ioc).lower().startswith("https://www.virustotal.com/gui"):
|
|
return False
|
|
|
|
if str(ioc).lower() in lista_fp or any(str(ioc).lower().endswith(item) for item in lista_fp):
|
|
return True
|
|
|
|
# Construir la URL de KTIP según el tipo de IoC
|
|
if tipo_ioc in tipos_ktip['ip']:
|
|
url += f"ip?request={ioc}"
|
|
elif tipo_ioc in tipos_ktip['domain']:
|
|
url += f"domain?request={ioc.replace("www.","")}"
|
|
elif tipo_ioc in tipos_ktip['hash']:
|
|
url += f"hash?request={ioc}"
|
|
elif tipo_ioc in tipos_ktip['url']:
|
|
patron = re.compile(r'^https?://[^/]+/?$')
|
|
if bool(patron.match(ioc)):
|
|
url += f"domain?request={ioc}"
|
|
else:
|
|
url += f"url?request={ioc}"
|
|
else:
|
|
return False
|
|
|
|
# Crea la sesión con reintentos=1
|
|
session_ktip = get_session_with_retries(max_retries=1)
|
|
|
|
try:
|
|
# PRIMERA PETICIÓN con la sesión configurada
|
|
resp = session_ktip.get(url, headers=headers, verify=False)
|
|
try:
|
|
logging.info(f"Respondió KTIP con IoC: {ioc}")
|
|
|
|
if resp.status_code == 200:
|
|
data = resp.json() # Descarga el contenido en memoria
|
|
if data.get('Zone') == 'Green':
|
|
# Si el tipo es URL, se hace una segunda petición para obtener el contenido del IoC
|
|
if tipo_ioc in tipos_ktip['url']:
|
|
# Se usa requests normal sin reintentos y se habilita stream=True
|
|
resp_temp = requests.get(ioc, verify=False, stream=True)
|
|
try:
|
|
codes = [200, 403, 404]
|
|
if resp_temp.status_code in codes:
|
|
# Obtenemos el header "Content-Type" para determinar el tipo de contenido
|
|
content_type = resp_temp.headers.get("Content-Type", "").lower()
|
|
keywords = ['html','abuse', 'report', '{', '}']
|
|
if "text/html" in content_type:
|
|
# Si es HTML, usamos .text y verificamos las keywords
|
|
content = resp_temp.text.lower()
|
|
for w in keywords:
|
|
if w in content:
|
|
return False
|
|
else:
|
|
# Si no es HTML (p.ej. es un archivo binario como un .zip)
|
|
return False
|
|
finally:
|
|
# Cerramos la segunda respuesta
|
|
resp_temp.close()
|
|
return True
|
|
# Si 'Zone' no es 'Green'
|
|
return False
|
|
|
|
elif resp.status_code == 403:
|
|
# Se verifica que no sea url para procesar
|
|
if tipo_ioc not in tipos_ktip['url']:
|
|
# Se limpia por si domain tiene wwww
|
|
ioc_fix = ioc.replace("wwww.","")
|
|
is_false_positive = asyncio.run(scan_ioc(ioc_fix))
|
|
return is_false_positive
|
|
else:
|
|
logging.info(f"No proceso KTIP - IoC: {ioc}")
|
|
# Otros status code
|
|
return False
|
|
|
|
finally:
|
|
# Cerramos la primera respuesta
|
|
resp.close()
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error verificacion_ktip: {e}")
|
|
return False
|
|
|
|
finally:
|
|
# Cerramos la sesión para liberar recursos
|
|
session_ktip.close()
|
|
|
|
def sync_eliminar_atributo_o_objeto(a, object_id, evento_id):
|
|
"""
|
|
Llamada sincrónica a PyMISP. La usaremos con to_thread.
|
|
"""
|
|
IOC_TIPOS_OMITIR = [
|
|
'comment','text','other','datetime','attachment','port','size-in-bytes',
|
|
'counter','integer','cpe','float','hex','phone-number','boolean',
|
|
'anonymised','pgp-public-key','pgp-private-key'
|
|
]
|
|
try:
|
|
logging.info(f"Falso Positivo encontrado : {a['type']}->{a['value']} en evento #{evento_id}")
|
|
if object_id is None:
|
|
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}")
|
|
misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"})
|
|
else:
|
|
objeto_existe = misp.direct_call(f"objects/view/{object_id}")
|
|
if 'Object' not in objeto_existe:
|
|
logging.warning(f"El objeto ID {object_id} no existe o ya fue eliminado.")
|
|
return False
|
|
|
|
if a['type'] == 'sha256':
|
|
logging.info(f"Eliminando objeto UUID: {objeto_existe['uuid']} / {a['value']}, evento #{evento_id}")
|
|
misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"})
|
|
else:
|
|
counter_validos = 0
|
|
for at in objeto_existe['Object']['Attribute']:
|
|
if at['type'] not in IOC_TIPOS_OMITIR:
|
|
counter_validos += 1
|
|
|
|
if counter_validos > 1:
|
|
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en objeto {objeto_existe['Object']['name']} , evento #{evento_id}")
|
|
misp.direct_call(f"attributes/delete/{a['uuid']}", {"hard": "1"})
|
|
else:
|
|
logging.info(f"Eliminando objeto UUID: {objeto_existe['Object']['uuid']} / {a['value']}, evento #{evento_id}")
|
|
misp.direct_call(f"objects/delete/{objeto_existe['Object']['uuid']}", {"hard": "1"})
|
|
|
|
return True
|
|
except (Exception, PyMISPError) as err:
|
|
logging.error(str(err))
|
|
return False
|
|
|
|
################################################################################
|
|
# COROUTINE PARA PROCESAR UN ATRIBUTO
|
|
################################################################################
|
|
async def procesar_atributo(a, event_id, days_ago, now, db_factory):
|
|
"""
|
|
Verifica si un atributo es un FP usando BD asíncrona y KTIP sincrónico con to_thread.
|
|
"""
|
|
try:
|
|
if any(a['type'] in lista for lista in tipos_ktip.values()):
|
|
attr_timestamp = int(a['timestamp'])
|
|
attr_datetime = datetime.fromtimestamp(attr_timestamp)
|
|
if not (days_ago.date() <= attr_datetime.date() <= now.date()):
|
|
return False
|
|
|
|
|
|
# Para URL valor debe dejarse intacto
|
|
if a['type'] in tipos_ktip['url']:
|
|
valor_lower = str(a['value']).strip()
|
|
else:
|
|
valor_lower = str(a['value']).strip().lower()
|
|
|
|
|
|
# 1) Lectura asíncrona en BD
|
|
async with db_factory() as db_session:
|
|
resultado_bd = await async_check_falso_positivo_en_bd(valor_lower, a['type'], config.RANGO_DIAS, db_session)
|
|
|
|
# 2) Si ya está en BD
|
|
if resultado_bd is not None:
|
|
if resultado_bd is True:
|
|
# Llamada a MISP sincrónica => to_thread
|
|
await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
|
|
# 3) Si no está en BD, llamar KTIP en un hilo => to_thread
|
|
es_fp = await asyncio.to_thread(sync_verificacion_ktip, valor_lower, a['type'])
|
|
|
|
# 4) Guardar en BD con lock de escritura
|
|
await async_guardar_falso_positivo_en_bd(
|
|
valor_lower,
|
|
a['type'],
|
|
es_fp,
|
|
db_factory
|
|
)
|
|
|
|
if es_fp:
|
|
await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id)
|
|
return True
|
|
else:
|
|
return False
|
|
return False
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error en procesar_atributo({a}): {e}")
|
|
return False
|
|
|
|
################################################################################
|
|
# ENDPOINT ASÍNCRONO
|
|
################################################################################
|
|
@app.post(
|
|
"/webhook/misp_event_fixer",
|
|
response_model=ResponseData,
|
|
responses=event_data_responses,
|
|
response_model_exclude_none=True,
|
|
description="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white",
|
|
summary="Deshabilita correlación, limpia falsos positivos, corrige flag IDS y asigna tlp:clear si existe tlp:white"
|
|
)
|
|
async def webhook_misp_event_fixer(
|
|
request: Request,
|
|
event_data: disable_ids_data,
|
|
authorization: str = Header(...),
|
|
db: AsyncSession = Depends(get_db)
|
|
):
|
|
try:
|
|
# Validación token
|
|
if not authorization.startswith("Bearer "):
|
|
logging.error("Formato de Token Inválido")
|
|
return JSONResponse(status_code=401, content={"detail": "Formato de token inválido."})
|
|
|
|
token = authorization.split("Bearer ")[1]
|
|
if token != config.JWT_TOKEN_GEN:
|
|
logging.error("Token Inválido")
|
|
return JSONResponse(status_code=401, content={"detail": "Valor de token inválido."})
|
|
|
|
# Revisar si ya fue procesado
|
|
stmt = select(ModificadosEv).where(
|
|
ModificadosEv.evento_uuid == event_data.event_uuid,
|
|
ModificadosEv.attribute_count >= int(event_data.event_attribute_count)
|
|
)
|
|
existe = (await db.scalars(stmt)).first()
|
|
if existe:
|
|
logging.info(f"Evento {event_data.event_uuid} ya procesado. Omitido.")
|
|
return {"event_id": event_data.event_id, "status": "Omitido"}
|
|
|
|
num_attrs = int(event_data.event_attribute_count)
|
|
if 0 < num_attrs <= config.MAX_ATTRS:
|
|
# Llamada a MISP sincrónica => to_thread
|
|
event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}")
|
|
|
|
# Validación de org
|
|
orgc_name = event_details["Event"]["Orgc"]["name"].lower()
|
|
if orgc_name in {o.lower() for o in config.ORG_OMITIR}:
|
|
logging.info(f"Evento {event_data.event_uuid} restringido para procesar. Omitido.")
|
|
return {"event_id": event_data.event_id, "status": "Omitido"}
|
|
|
|
logging.info(f"Procesando evento {event_data.event_uuid}, total atributos: {event_data.event_attribute_count}")
|
|
|
|
event_info = event_details.get("Event", {})
|
|
if not event_info:
|
|
logging.error("La respuesta de MISP no contiene 'Event'.")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={"detail": "Error en la respuesta de MISP: 'Event' no encontrado."}
|
|
)
|
|
|
|
# Revisar tags => tlp:white => tlp:clear
|
|
tags = [t.get("name", "") for t in event_info.get("Tag", [])]
|
|
if "tlp:white" in tags and "tlp:clear" not in tags:
|
|
try:
|
|
await asyncio.to_thread(misp.tag, event_data.event_uuid, "tlp:clear", False)
|
|
logging.info(f"Se agregó 'tlp:clear' al evento {event_data.event_id}.")
|
|
except Exception as e:
|
|
logging.error(f"Excepción al agregar 'tlp:clear' al evento {event_data.event_id}: {e}")
|
|
|
|
attrs_event = event_info.get("Attribute", [])
|
|
objs_event = event_info.get("Object", [])
|
|
all_attrs = attrs_event[:]
|
|
for o in objs_event:
|
|
all_attrs.extend(o.get("Attribute", []))
|
|
|
|
now = datetime.now()
|
|
days_ago = now - timedelta(days=config.RANGO_DIAS)
|
|
|
|
# Procesamos cada atributo en paralelo (asyncio.gather)
|
|
tasks = []
|
|
for a in all_attrs:
|
|
tasks.append(
|
|
procesar_atributo(a, event_data.event_id, days_ago, now, AsyncSessionLocal)
|
|
)
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
total_eliminados = 0
|
|
for r in results:
|
|
if isinstance(r, Exception):
|
|
logging.error(f"Error procesando atributo: {r}")
|
|
elif r:
|
|
total_eliminados += 1
|
|
|
|
logging.info(f"Total de atributos eliminados: {total_eliminados}")
|
|
|
|
# Re-consultar MISP por si cambió attribute_count
|
|
event_details = await asyncio.to_thread(misp.direct_call, f"events/view/{event_data.event_id}")
|
|
if int(event_details["Event"]["attribute_count"]) > 0:
|
|
body = {"eventid": event_data.event_id}
|
|
attr_resp = await asyncio.to_thread(misp.direct_call, "attributes/restSearch", body)
|
|
if "Attribute" not in attr_resp:
|
|
logging.error("La respuesta de MISP no contiene 'Attribute'")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={"detail": "Error en la respuesta de MISP: 'Attribute' no encontrado."}
|
|
)
|
|
|
|
# Ajuste correlación e IDS
|
|
for a in attr_resp["Attribute"]:
|
|
if a["type"] in config.NO_CORRELACIONES:
|
|
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
|
|
if not a["disable_correlation"]:
|
|
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
|
|
"to_ids": "0", "disable_correlation": True
|
|
})
|
|
logging.info(f"Correlación deshabilitada para atributo: {a['id']}")
|
|
|
|
if config.NO_CORRELATIVOS_EXCLUSION:
|
|
try:
|
|
r_ex = await asyncio.to_thread(misp.direct_call, "correlation_exclusions/add", {
|
|
"value": a["value"],
|
|
"type": a["type"],
|
|
"enabled": True
|
|
})
|
|
if 'CorrelationExclusion' not in r_ex:
|
|
err = r_ex.get("errors", [])
|
|
if err and err[0] == 403:
|
|
logging.error(f"API Error: valor '{a['value']}' ya existe en exclusiones. Se omite.")
|
|
else:
|
|
logging.error(f"API Error: {r_ex.get('errors')}")
|
|
else:
|
|
logging.info(f"Se agregó valor a excepciones: {a['value']}")
|
|
except PyMISPError as e:
|
|
logging.error(f"PyMISPError: {e}")
|
|
except Exception as e:
|
|
logging.error(f"Excepción no manejada: {e}")
|
|
elif a["to_ids"]:
|
|
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {"to_ids": "0"})
|
|
logging.info(f"Se deshabilita Flag IDS para atributo: {a['id']}")
|
|
elif a["type"] in config.IDS_CORRELACIONES:
|
|
logging.info(f"Procesando atributo {a['id']}: {a['type']} -> {a['value']}")
|
|
if not a["to_ids"]:
|
|
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
|
|
"to_ids": "1", "disable_correlation": False
|
|
})
|
|
logging.info(f"Flag IDS activado para atributo: {a['id']}")
|
|
elif a["disable_correlation"]:
|
|
await asyncio.to_thread(misp.direct_call, f"attributes/edit/{a['id']}", {
|
|
"disable_correlation": False
|
|
})
|
|
logging.info(f"Correlación habilitada para atributo: {a['id']}")
|
|
else:
|
|
logging.info("Evento sin atributos (luego de las eliminaciones).")
|
|
|
|
# Guardar en ModificadosEv
|
|
nuevo_registro = ModificadosEv(
|
|
evento_uuid=event_data.event_uuid,
|
|
publicado_fecha=datetime.now(),
|
|
attribute_count=int(event_details["Event"]["attribute_count"])
|
|
)
|
|
try:
|
|
db.add(nuevo_registro)
|
|
await db.commit()
|
|
logging.info(f"Registrado UUID de evento modificado: {event_data.event_uuid}")
|
|
except IntegrityError:
|
|
logging.warning(f"Registro duplicado: {event_data.event_uuid} ya existe.")
|
|
await db.rollback()
|
|
except Exception as e:
|
|
logging.error(f"Error al guardar los datos en la base de datos: {e}")
|
|
await db.rollback()
|
|
else:
|
|
logging.info(f"Evento {event_data.event_id} supera el límite de atributos a procesar o es 0. Se omite.")
|
|
|
|
logging.info("Evento procesado")
|
|
|
|
except Exception as err:
|
|
logging.error(f"Excepción no manejada: {traceback.format_exc()}")
|
|
return JSONResponse(
|
|
status_code=500,
|
|
content={"detail": f"Error al procesar datos: {err}"}
|
|
)
|
|
|
|
return {"event_id": event_data.event_id, "status": "Procesado"}
|
|
|