misp-topcontrib/defs.py
Felipe Luis Quezada Valenzuela e1e538a3e2 #edit config
2025-01-09 10:17:54 -03:00

592 lines
26 KiB
Python

import os
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
import psutil
import multiprocessing
import ipaddress
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import sessionmaker
from pymisp import PyMISP
from pymisp.exceptions import PyMISPError
import config
import urllib3
import requests
from models import Base, Registro, Usuario, ModificadosEv
from datetime import datetime, timedelta
import threading
urllib3.disable_warnings()
class MISPProcessorTop:
def __init__(self):
# Directorio actual
self.dir_actual = os.getcwd()
# Directorio para Logs
self.dir_logs = self.dir_actual + '/logs'
# Directorio para data en JSON / BD
self.dir_data = self.dir_actual + '/data'
# Cuentas
self.creators_accounts = {}
# Se crea directorio logs y data por si no existe...
os.makedirs(self.dir_logs, exist_ok=True)
os.makedirs(self.dir_data, exist_ok=True)
# Logging...
rotating_handler = RotatingFileHandler(
os.path.join(self.dir_logs, "misp_ioc_top_app_" + datetime.now().strftime("%Y%m%d") + ".log"),
maxBytes=262144000,
backupCount=10
)
logging.basicConfig(
level=logging.INFO,
handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Configuración MISP
misp_url = config.MISP_CONFIG['URL']
misp_key = config.MISP_CONFIG['AUTHKEY']
self.misp = PyMISP(misp_url, misp_key, False)
# Lista que almacena IoC procesados corresponden FP (FALSOPOSITIVO)
self.procesados_ioc_fp = set()
# Lista que almacena IoC procesados corresponden VP (VERDADEROPOSITIVO)
self.procesados_ioc_vp = set()
def calcular_numero_de_workers(self):
# Obtener el número de CPUs
num_cpus = multiprocessing.cpu_count()
# Obtener la memoria disponible en GB
mem = psutil.virtual_memory()
available_memory_gb = mem.available / (1024 ** 3)
# Ajustar el número de workers en función de CPUs y memoria
workers_por_memoria = int(available_memory_gb // 1)
num_workers = min(num_cpus, workers_por_memoria)
return max(1, num_workers) # Asegurarse de que haya al menos un worker
def calcular_fecha_7_dias_antes(self, fecha_str):
# Convertir la cadena de texto en un objeto de fecha
fecha = datetime.strptime(fecha_str, "%Y-%m-%d")
# Calcular la fecha 7 días antes
fecha_7_dias_antes = fecha - timedelta(days=7)
# Devolver la fecha en formato YYYY-MM-DD
return fecha_7_dias_antes.strftime("%Y-%m-%d")
def eliminar_atributo_o_objeto(self, a, o, evento_id):
try:
logging.info("Falso Positivo encontrado : " + a['type'] + "->" + a['value'] + " en evento #" + evento_id)
if o is None:
# entonces se procesa atributo independiente de objeto
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}")
self.misp.delete_attribute(a['uuid'], True)
else:
# Verificación de existencia del objeto en MISP
objeto_existe = self.misp.get_object(o['uuid'], pythonify=True)
if not objeto_existe:
logging.warning(f"El objeto UUID {o['uuid']} no existe en MISP o ya fue eliminado. No se puede eliminar.")
return False # Para que no se contemple en la cuenta de FP...
# Se verifica para los casos de objetos que son con mas hash que sha256
if a['type'] == 'sha256':
logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}")
self.misp.delete_object(o['uuid'], True)
else:
counter_validos = 0
# Se recorre para ver tipos
for at in o['Attribute']:
if at['type'] not in config.IOC_TIPOS_OMITIR:
counter_validos = counter_validos + 1
# Si es mayor a 1, no se puede borrar objeto completo
if counter_validos > 1:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en el objeto {o['name']} , evento #{evento_id}")
self.misp.delete_attribute(a['uuid'], True)
else:
logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}")
self.misp.delete_object(o['uuid'], True)
return False
except (Exception, PyMISPError) as err:
logging.error(str(err))
return True
def verificar_atributo_en_warninglist(self, a, lista):
"""
Verifica si un atributo coincide con alguna entrada en las WarningLists,
considerando también el tipo de atributo según 'WarninglistType'.
"""
for l in lista:
for entry in l['Warninglist']['WarninglistEntry']:
# Verificar si el tipo del atributo está soportado por la entrada de la WarningList
if a['type'] in l['Warninglist']['WarninglistType']:
if 'ip-' in a['type'] and 'port' not in a['type']:
if self._comparar_ip(entry['value'], a['value'], a['type']):
return False
elif any(item in a['type'] for item in ['domain', 'hostname']):
if self._comparar_dominio(entry['value'], a['value'], a['type']):
return False
return True
def procesar_atributo(self, a, o, e, lista):
if a['value'] in self.procesados_ioc_fp:
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
elif a['value'] in self.procesados_ioc_vp:
return True # No se realiza ningún proceso de eliminación
if self.verificar_atributo_en_warninglist(a, lista):
# Se verifica atributo que coincide con Warninglist...
if not self.verificar_y_procesar_atributo(a, o, e, lista, ['ip-', 'domain', 'hostname','sha256']):
return False
else:
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
return True
def verificar_y_procesar_atributo(self, a, o, e, lista, tipos):
"""
Verifica y procesa un atributo basado en su tipo.
"""
if any(item in a['type'] for item in tipos):
if '|ip' not in a['type'] and 'port' not in a['type']:
# Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP
if a['value'] not in self.procesados_ioc_fp and a['value'] not in self.procesados_ioc_vp:
if self.verificacion_ktip(a['value'], a['type']):
self.procesados_ioc_fp.add(a['value'])
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
else:
self.procesados_ioc_vp.add(a['value'])
return True
def _comparar_ip(self, valor_warning, valor_ip, tipo_ioc):
"""
Compara una IP con una entrada en la WarningList.
"""
try:
if '/' in valor_warning:
ip_obj = ipaddress.ip_address(valor_ip)
cidr_obj = ipaddress.ip_network(valor_warning)
if ip_obj in cidr_obj:
# Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP
if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_ip, tipo_ioc):
self.procesados_ioc_vp.add(valor_ip)
return True
elif valor_ip in valor_warning:
if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_ip, tipo_ioc):
self.procesados_ioc_vp.add(valor_ip)
return True
except Exception as e:
logging.error(str(e))
return False
def _comparar_dominio(self, valor_warning, valor_dominio, tipo_ioc):
"""
Compara un dominio con una entrada en la WarningList.
"""
if valor_dominio.strip() == valor_warning:
if valor_dominio not in self.procesados_ioc_fp and valor_dominio not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_dominio, tipo_ioc):
self.procesados_ioc_vp.add(valor_dominio)
return True
return False
def verificacion_ktip(self, ioc, tipo_ioc):
# URL Base
url = config.KTIP_CONFIG['url_base']
# Encabezados de la solicitud, incluyendo la API key para autenticación
headers = {
'x-api-key': config.KTIP_CONFIG['api_key']
}
# Mapeo según tipo
tipos_ktip = {
"ip": ['ip-src','ip-dst'],
"domain":['domain','hostname'],
"hash": ['sha256']
}
if tipo_ioc in tipos_ktip['ip']:
url = url + 'ip?request=' + ioc
elif tipo_ioc in tipos_ktip['domain']:
url = url + 'domain?request=' + ioc
elif tipo_ioc in tipos_ktip['hash']:
url = url + 'hash?request=' + ioc
else:
logging.warning(f"Tipo de IoC no reconocido para verificación KT: {tipo_ioc}")
return False
try:
# Realiza la solicitud GET a la API
response = requests.get(url, headers=headers, verify=False)
# Verifica si la solicitud fue exitosa
if response.status_code == 200:
data = response.json()
# Ahora verificar si IoC está limpio
if data['Zone'] == 'Green':
return True
# por defecto
return False
except (Exception, requests.exceptions.HTTPError) as e:
logging.error(str(e))
return False
def procesar_evento(self, e, lista, prom):
# Se saca publish_timestamp para que recorra desde 7 dias atras en cada de eventos antiguos
date_back = datetime.fromtimestamp(int(e['Event']['publish_timestamp'])).date()
date_back = self.calcular_fecha_7_dias_antes(date_back.strftime('%Y-%m-%d'))
date_back = datetime.strptime(date_back,'%Y-%m-%d').date()
resultados = {}
ioc_valido = True
if str(e['Event']['event_creator_email']).lower() in self.creators_accounts:
if 0 < int(e['Event']['attribute_count']) <= prom:
logging.info(f"Procesando Evento #{e['Event']['id']} con fecha {e['Event']['date']}")
for a in e['Event']['Attribute']:
if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back:
if a['type'] not in config.IOC_TIPOS_OMITIR:
if not self.procesar_atributo(a, None, e, lista):
ioc_valido = False
else:
# IoC a contar
resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1
if 'Object' in e['Event']:
for o in e['Event']['Object']:
for a in o['Attribute']:
if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back:
if a['type'] not in config.IOC_TIPOS_OMITIR:
if not self.procesar_atributo(a, o, e, lista):
ioc_valido = False
else:
# IoC a contar
resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1
if not ioc_valido:
try:
# Se consulta evento y se verifica cantidad de atributos
evento = self.misp.get_event(int(e['Event']['id']))
if evento:
if int(evento['Event']['attribute_count']) > 0:
self.misp.publish(int(e['Event']['id']))
logging.info("Publicando cambios de evento #" + e['Event']['id'])
# Se guarda uuid de evento que fue modificado en BD, manteniendo el timestamp de publish
self.guardar_bd_procesados_mod(e['Event']['uuid'], int(e['Event']['publish_timestamp']), datetime.now())
else:
# Se elimina evento con cero atributo...
self.misp.delete_event(int(e['Event']['id']))
logging.info("Eliminando evento #" +e['Event']['id']+" por carencia de atributos")
except (Exception, PyMISPError) as err:
logging.error(str(err))
return resultados
@staticmethod
def convert_value(value):
try:
# Intentar convertir a fecha y retornar un número representativo
return int(datetime.strptime(value, "%Y%m%d").strftime("%Y%m%d"))
except ValueError:
# Si falla, convertir a entero directamente
return int(value)
def find_max(self, data):
# Llamar al método estático directamente
max_value = max(data, key=self.convert_value)
return max_value
def calcula_calidad_iocs(self, desde: str, hasta: str, a_por_evento=None):
try:
# Para salida
output = []
self.creators_accounts = self.obtener_cuentas_sync()
if self.creators_accounts:
puntos = {k: 0 for k in self.creators_accounts.keys()}
# Fechas para sacar la mayor...
fechas = []
# Filtros para seleccionar Warninglist
filtros = config.CONFIG_WL['filtros_buscar']
# Para acumular warninglist
lista = []
logging.info("Limpiador de IoC App v1.0 comenzando...")
# Actualizar Warninglist por si acaso...
self.misp.update_warninglists()
# Warninglist completas
wl = self.misp.warninglists()
for l in wl:
fechas.append(str(l['Warninglist']['version']))
# Saca la versión más alta...
#version = max(fechas, key=self.convert_value)
version = self.find_max(fechas)
for l in wl:
if str(l['Warninglist']['version']) == version:
if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros):
if int(str(l['Warninglist']['warninglist_entry_count'])) <= config.CONFIG_WL['max_reg']:
lista.append(self.misp.get_warninglist(int(l['Warninglist']['id'])))
if lista:
# Promedio por defecto
prom = 0
logging.info("Warninglist de MISP Cargadas : " + str(len(lista)))
logging.info("Versión de Warninglists a utilizar: " + str(version))
# Rango completo de fechas....
logging.info("Buscando IoC Desde :" + desde + " Hasta :" + hasta)
#eventos_tmp = self.misp.search(publish_timestamp=desde)
#eventos_tmp = self.misp.search(date_from=desde, published=True)
eventos_tmp = self.misp.search_index(publish_timestamp=desde, published=True)
# Si existen eventos, se realiza proceso...
if eventos_tmp:
logging.info("Recolectando eventos para procesar")
eventos = []
# Se seleccionan eventos para establecer limite de fechas
for e in eventos_tmp:
if datetime.fromtimestamp(int(e['publish_timestamp'])).date() <= datetime.strptime(hasta, '%Y-%m-%d').date():
try:
# se verifica que evento no haya sido procesado en periodo anterior
prev_evento = self.obtener_evento_mod_db(e['uuid'])
# Si no existe en modificados....
if not prev_evento:
# Event get
ev = self.misp.get_event(int(e['id']))
eventos.append(ev)
elif datetime.fromtimestamp(int(e['publish_timestamp'])) > prev_evento['pub_mod_fecha'] + timedelta(hours=1):
# Verificar fecha con desfase, si es mayor se agrega para procesar
# Event get
ev = self.misp.get_event(int(e['id']))
eventos.append(ev)
except Exception as err_c:
logging.error("Error al tratar de agregar evento +"+e['id']+" :"+str(err_c))
# Atributos por evento es None, se calcula promedio...
if a_por_evento is None:
cont_atrr = 0
cont_eventos = 0
for e in eventos:
if str(e['Event']['event_creator_email']).lower() in self.creators_accounts:
cont_atrr += int(e['Event']['attribute_count'])
cont_eventos += 1
logging.info("Promedio de atributos a procesar listos.")
prom = int(cont_atrr / cont_eventos) if cont_eventos > 0 else 0
else:
prom = int(a_por_evento)
logging.info("Eventos por procesar :" + str(len(eventos)))
logging.info("Máximo de atributos a procesar por evento :" + str(prom))
num_workers = config.WORKERS_THR
logging.info(f"Usando {num_workers} workers")
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(self.procesar_evento, e, lista, prom) for e in eventos]
for future in as_completed(futures):
result = future.result()
for email, puntos_obtenidos in result.items():
puntos[email] += puntos_obtenidos
puntos_ordenados = {k: v for k, v in sorted(puntos.items(), key=lambda item: item[1], reverse=True)}
for k in puntos_ordenados.keys():
temp = {}
temp['comunidad'] = self.creators_accounts[k]
temp['cantidad_ioc'] = puntos_ordenados[k]
output.append(temp)
logging.info("Total Falsos positivos detectados y eliminados : " + str(len(self.procesados_ioc_fp)))
logging.info("Proceso de conteo de IoC finalizado...")
else:
logging.error("No existe eventos para rango de fechas. Se detiene proceso")
else:
logging.error("No se encuentran Warninglist actualizadas para comparar. Se detiene proceso")
else:
logging.error("No se encuentran cuentas asociadas a MISP. Se detiene proceso")
return output
except (Exception, PyMISPError) as err:
logging.error(str(err))
def guarda_ioc_json(self, data: list, filename: str):
try:
salida = os.path.join(self.dir_data, filename)
with open(salida, "w") as archivo_json:
json.dump(data, archivo_json, indent=4)
logging.info("Data volcada a ruta :" + salida)
except Exception as e:
logging.info("Error al escribir JSON :" + str(e))
def obtener_evento_mod_db(self, e_uuid: str):
# Output inicial vacío
output = {}
try:
# Ruta de la base de datos
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
try:
# Consultar el último registro con el evento_uuid específico
ultimo_procesado = (
session.query(ModificadosEv)
.filter_by(evento_uuid=e_uuid)
.order_by(ModificadosEv.pub_mod_fecha.desc()) # Ordenar por fecha descendente
.first() # Obtener solo el primer registro
)
# Si se encuentra un registro, convertirlo a dict
if ultimo_procesado:
output = ultimo_procesado.to_dict()
except SQLAlchemyError as db_error:
logging.error(f"Error en la consulta a la base de datos: {db_error}")
session.rollback()
finally:
session.close()
except Exception as err:
logging.error(f"Error general al conectarse a la base de datos: {err}")
# Retornar el resultado
return output
def guardar_bd_procesados_mod(self, e_uuid: str, timestamp_fecha: int, mod_fecha: datetime):
try:
# Ruta de la base de datos
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# Crear el nuevo registro
nuevo_registro = ModificadosEv(
evento_uuid=e_uuid,
publicado_fecha=datetime.fromtimestamp(timestamp_fecha),
pub_mod_fecha=mod_fecha
)
try:
session.add(nuevo_registro)
session.commit()
logging.info(f"Registrado UUID de evento modificado: {e_uuid}")
except IntegrityError:
# Manejo específico para violación de restricciones
logging.warning(f"Registro duplicado: {e_uuid} ya existe con la misma fecha.")
session.rollback()
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
session.rollback()
finally:
session.close()
except Exception as err:
logging.error(f"Error general al guardar en la base de datos: {err}")
def guardar_bd(self, data: list, fecha: str):
try:
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# Fecha corte
fecha_corte = fecha.split("-")
ano_t = int(fecha_corte[0])
mes_t = int(fecha_corte[1])
dia_t = int(fecha_corte[2])
for item in data:
organizacion = str(item['comunidad']).upper()
cantidad_ioc = item['cantidad_ioc']
nuevo_registro = Registro(
organizacion=organizacion,
ano=ano_t,
mes=mes_t,
dia=dia_t,
fecha_creado = datetime.strptime(fecha, '%Y-%m-%d').date(),
cantidad_ioc=cantidad_ioc
)
session.add(nuevo_registro)
try:
session.commit()
logging.info("Datos guardados exitosamente en la base de datos.")
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
session.rollback()
finally:
session.close()
except Exception as err:
logging.error(f"Error al guardar en la base de datos: {err}")
def obtener_cuentas_sync(self):
cuentas = {}
try:
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
query_cuentas = session.query(Usuario).all()
for item in query_cuentas:
cuentas[item.usuario_sync] = item.organizacion
return cuentas
except Exception as err:
logging.error(f"Error al obtener datos: {err}")
return None
finally:
session.close()