misp-fixcor/defs.py
Felipe Luis Quezada Valenzuela d158cc260b #edit defs
2025-01-31 09:53:27 -03:00

92 lines
4.1 KiB
Python

import logging
from datetime import datetime, timezone
from pymisp import PyMISP, PyMISPError
from sqlalchemy.exc import IntegrityError
from sqlalchemy import select
import config
from db_setup import ModificadosEv
class MISPProcessor:
    """Normalise ``to_ids`` / ``disable_correlation`` flags on MISP attributes.

    Wraps a ``PyMISP`` client built from ``config.MISP_CONFIG`` and records
    every processed event in the ``ModificadosEv`` table so the same
    (uuid, attribute_count) pair is not processed twice.
    """

    def __init__(self):
        """Create the PyMISP client from ``config.MISP_CONFIG``."""
        # NOTE(review): ssl=False presumably turns off TLS certificate
        # verification in PyMISP -- confirm this is intended for production.
        self.misp = PyMISP(
            config.MISP_CONFIG["misp_url"],
            config.MISP_CONFIG["misp_authkey"],
            ssl=False,
        )

    def search_events(self, published=True):
        """Return a summary of the events found by ``search_index``.

        The search is restricted with ``publish_timestamp=config.RANGO_DIAS``
        and the given ``published`` flag.

        Returns:
            A list of dicts with the keys ``id``, ``uuid`` and
            ``attribute_count``; the count is returned as a string and
            defaults to ``"0"`` when the event carries no ``attribute_count``.
        """
        events_data = self.misp.search_index(
            publish_timestamp=config.RANGO_DIAS,
            published=published,
        )
        return [
            {
                "id": event.get("id"),
                "uuid": event.get("uuid"),
                "attribute_count": str(event.get("attribute_count", 0)),
            }
            for event in events_data
        ]

    def _edit_attribute(self, attribute_id, changes):
        """Send ``changes`` to the ``attributes/edit/<id>`` endpoint."""
        return self.misp.direct_call(f"attributes/edit/{attribute_id}", changes)

    def _add_correlation_exclusion(self, attribute):
        """Register the attribute's value as a correlation exclusion.

        Best effort: an error response or a ``PyMISPError`` is only logged.
        """
        try:
            ex_resp = self.misp.direct_call("correlation_exclusions/add", {
                "value": attribute["value"],
                "type": attribute["type"],
                "enabled": True
            })
            if "CorrelationExclusion" not in ex_resp and ex_resp.get("errors"):
                logging.warning(f"Exclusión error: {ex_resp.get('errors')}")
        except PyMISPError as e:
            logging.error(f"PyMISPError excl: {e}")

    def fix_correlation(self, session, event_id, event_uuid, event_attribute_count):
        """Fix the attribute flags of one event, record it, and re-publish.

        Per attribute type:

        * in ``config.NO_CORRELATIVOS``: correlation is disabled and
          ``to_ids`` cleared; if correlation was already disabled only
          ``to_ids`` is cleared. When ``config.NO_CORRELATIVOS_EXCLUSION`` is
          truthy, the value is also added as a correlation exclusion.
        * in ``config.IDS_CORRELATIVOS``: ``to_ids`` is set and correlation
          enabled when ``to_ids`` was off.
        * anything else: correlation is re-enabled when it was disabled.

        The event is skipped (nothing is edited or stored) when the count is
        not an integer, when a ``ModificadosEv`` row with the same uuid and
        count already exists, when the count exceeds ``config.MAX_ATTRS``, or
        when the attribute search response has no ``"Attribute"`` key.

        Args:
            session: SQLAlchemy session used to query and store ``ModificadosEv``.
            event_id: MISP event id, used for the attribute search and publish.
            event_uuid: MISP event uuid, stored in ``ModificadosEv``.
            event_attribute_count: Attribute count as an int or numeric string.

        Returns:
            None.
        """
        # Set as soon as any attribute is edited; the event is then re-published.
        publica = False

        # Validate the count before it is used anywhere (query, limit check,
        # DB record) so malformed input is logged instead of raising.
        try:
            count = int(event_attribute_count)
        except (TypeError, ValueError):
            logging.error(f"[ERROR] count inválido: {event_attribute_count}")
            return

        stmt = select(ModificadosEv).where(
            ModificadosEv.evento_uuid == event_uuid,
            ModificadosEv.attribute_count == count,
        )
        if session.scalars(stmt).first():
            logging.info(f"[OMITIDO] {event_uuid} ya en DB.")
            return

        if count > config.MAX_ATTRS:
            logging.info(f"[OMITIDO] {event_id} excede {config.MAX_ATTRS}.")
            return

        resp = self.misp.direct_call("attributes/restSearch", {"eventid": event_id})
        if "Attribute" not in resp:
            logging.error(f"[ERROR] Sin Attribute en {event_id}")
            return

        # NOTE(review): the branch nesting below was reconstructed from a copy
        # of the file that had lost its indentation -- verify against the
        # original that the exclusion step and the to_ids-only branch are
        # attached to the intended conditions.
        for a in resp["Attribute"]:
            t = a["type"]
            if t in config.NO_CORRELATIVOS:
                if not a["disable_correlation"]:
                    self._edit_attribute(
                        a["id"], {"to_ids": "0", "disable_correlation": True}
                    )
                    publica = True
                    if config.NO_CORRELATIVOS_EXCLUSION:
                        self._add_correlation_exclusion(a)
                elif a["to_ids"]:
                    # Correlation already disabled: only clear the IDS flag.
                    self._edit_attribute(a["id"], {"to_ids": "0"})
                    publica = True
            elif t in config.IDS_CORRELATIVOS:
                if not a["to_ids"]:
                    self._edit_attribute(
                        a["id"], {"to_ids": "1", "disable_correlation": False}
                    )
                    publica = True
            elif a["disable_correlation"]:
                self._edit_attribute(a["id"], {"disable_correlation": False})
                publica = True

        # Remember this (uuid, count) pair so the event is skipped next run.
        new_ev = ModificadosEv(
            evento_uuid=event_uuid,
            publicado_fecha=datetime.now(),
            attribute_count=count,
        )
        session.add(new_ev)
        try:
            session.commit()
            logging.info(f"[PROCESADO] {event_id} => {event_uuid}")
        except IntegrityError:
            session.rollback()
            logging.warning(f"[WARNING] Duplicado {event_uuid}")

        # Re-publish only when at least one attribute was changed.
        if publica:
            try:
                publish_resp = self.misp.publish(int(event_id), alert=False)
                logging.info(f"[PUBLICADO] Evento {event_id} => {publish_resp}")
            except Exception as ex:
                # Publishing is best effort; PyMISPError and any other failure
                # are logged with the same message and not re-raised.
                logging.error(f"[ERROR PUBLICAR] {event_id}: {ex}")