misp-topcontrib/defs.py

593 lines
26 KiB
Python
Raw Normal View History

2024-11-06 14:53:19 -03:00
import os
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
import psutil
import multiprocessing
import ipaddress
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import create_engine
2025-01-09 10:17:54 -03:00
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
2024-11-06 14:53:19 -03:00
from sqlalchemy.orm import sessionmaker
from pymisp import PyMISP
2024-12-04 11:09:10 -03:00
from pymisp.exceptions import PyMISPError
2024-11-06 14:53:19 -03:00
import config
import urllib3
import requests
2025-01-09 10:17:54 -03:00
from models import Base, Registro, Usuario, ModificadosEv
2024-11-06 14:53:19 -03:00
from datetime import datetime, timedelta
import threading
urllib3.disable_warnings()
class MISPProcessorTop:
def __init__(self):
# Directorio actual
self.dir_actual = os.getcwd()
# Directorio para Logs
self.dir_logs = self.dir_actual + '/logs'
# Directorio para data en JSON / BD
self.dir_data = self.dir_actual + '/data'
# Cuentas
self.creators_accounts = {}
# Se crea directorio logs y data por si no existe...
os.makedirs(self.dir_logs, exist_ok=True)
os.makedirs(self.dir_data, exist_ok=True)
# Logging...
rotating_handler = RotatingFileHandler(
os.path.join(self.dir_logs, "misp_ioc_top_app_" + datetime.now().strftime("%Y%m%d") + ".log"),
maxBytes=262144000,
backupCount=10
)
logging.basicConfig(
level=logging.INFO,
handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Configuración MISP
misp_url = config.MISP_CONFIG['URL']
misp_key = config.MISP_CONFIG['AUTHKEY']
self.misp = PyMISP(misp_url, misp_key, False)
# Lista que almacena IoC procesados corresponden FP (FALSOPOSITIVO)
self.procesados_ioc_fp = set()
# Lista que almacena IoC procesados corresponden VP (VERDADEROPOSITIVO)
self.procesados_ioc_vp = set()
def calcular_numero_de_workers(self):
# Obtener el número de CPUs
num_cpus = multiprocessing.cpu_count()
# Obtener la memoria disponible en GB
mem = psutil.virtual_memory()
available_memory_gb = mem.available / (1024 ** 3)
# Ajustar el número de workers en función de CPUs y memoria
workers_por_memoria = int(available_memory_gb // 1)
num_workers = min(num_cpus, workers_por_memoria)
return max(1, num_workers) # Asegurarse de que haya al menos un worker
def calcular_fecha_7_dias_antes(self, fecha_str):
# Convertir la cadena de texto en un objeto de fecha
fecha = datetime.strptime(fecha_str, "%Y-%m-%d")
# Calcular la fecha 7 días antes
fecha_7_dias_antes = fecha - timedelta(days=7)
# Devolver la fecha en formato YYYY-MM-DD
return fecha_7_dias_antes.strftime("%Y-%m-%d")
def eliminar_atributo_o_objeto(self, a, o, evento_id):
try:
logging.info("Falso Positivo encontrado : " + a['type'] + "->" + a['value'] + " en evento #" + evento_id)
if o is None:
# entonces se procesa atributo independiente de objeto
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}")
self.misp.delete_attribute(a['uuid'], True)
else:
# Verificación de existencia del objeto en MISP
objeto_existe = self.misp.get_object(o['uuid'], pythonify=True)
if not objeto_existe:
logging.warning(f"El objeto UUID {o['uuid']} no existe en MISP o ya fue eliminado. No se puede eliminar.")
return False # Para que no se contemple en la cuenta de FP...
# Se verifica para los casos de objetos que son con mas hash que sha256
if a['type'] == 'sha256':
logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}")
self.misp.delete_object(o['uuid'], True)
else:
counter_validos = 0
# Se recorre para ver tipos
for at in o['Attribute']:
if at['type'] not in config.IOC_TIPOS_OMITIR:
counter_validos = counter_validos + 1
# Si es mayor a 1, no se puede borrar objeto completo
if counter_validos > 1:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en el objeto {o['name']} , evento #{evento_id}")
self.misp.delete_attribute(a['uuid'], True)
else:
logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}")
self.misp.delete_object(o['uuid'], True)
return False
2024-12-04 11:09:10 -03:00
except (Exception, PyMISPError) as err:
2024-11-06 14:53:19 -03:00
logging.error(str(err))
return True
def verificar_atributo_en_warninglist(self, a, lista):
"""
Verifica si un atributo coincide con alguna entrada en las WarningLists,
considerando también el tipo de atributo según 'WarninglistType'.
"""
for l in lista:
for entry in l['Warninglist']['WarninglistEntry']:
# Verificar si el tipo del atributo está soportado por la entrada de la WarningList
if a['type'] in l['Warninglist']['WarninglistType']:
if 'ip-' in a['type'] and 'port' not in a['type']:
if self._comparar_ip(entry['value'], a['value'], a['type']):
return False
elif any(item in a['type'] for item in ['domain', 'hostname']):
if self._comparar_dominio(entry['value'], a['value'], a['type']):
return False
return True
def procesar_atributo(self, a, o, e, lista):
if a['value'] in self.procesados_ioc_fp:
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
elif a['value'] in self.procesados_ioc_vp:
return True # No se realiza ningún proceso de eliminación
if self.verificar_atributo_en_warninglist(a, lista):
# Se verifica atributo que coincide con Warninglist...
if not self.verificar_y_procesar_atributo(a, o, e, lista, ['ip-', 'domain', 'hostname','sha256']):
return False
else:
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
return True
def verificar_y_procesar_atributo(self, a, o, e, lista, tipos):
"""
Verifica y procesa un atributo basado en su tipo.
"""
if any(item in a['type'] for item in tipos):
if '|ip' not in a['type'] and 'port' not in a['type']:
# Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP
if a['value'] not in self.procesados_ioc_fp and a['value'] not in self.procesados_ioc_vp:
if self.verificacion_ktip(a['value'], a['type']):
self.procesados_ioc_fp.add(a['value'])
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
else:
self.procesados_ioc_vp.add(a['value'])
return True
def _comparar_ip(self, valor_warning, valor_ip, tipo_ioc):
"""
Compara una IP con una entrada en la WarningList.
"""
try:
if '/' in valor_warning:
ip_obj = ipaddress.ip_address(valor_ip)
cidr_obj = ipaddress.ip_network(valor_warning)
if ip_obj in cidr_obj:
# Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP
if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_ip, tipo_ioc):
self.procesados_ioc_vp.add(valor_ip)
return True
elif valor_ip in valor_warning:
if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_ip, tipo_ioc):
self.procesados_ioc_vp.add(valor_ip)
return True
except Exception as e:
logging.error(str(e))
return False
def _comparar_dominio(self, valor_warning, valor_dominio, tipo_ioc):
"""
Compara un dominio con una entrada en la WarningList.
"""
if valor_dominio.strip() == valor_warning:
if valor_dominio not in self.procesados_ioc_fp and valor_dominio not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_dominio, tipo_ioc):
self.procesados_ioc_vp.add(valor_dominio)
return True
return False
def verificacion_ktip(self, ioc, tipo_ioc):
# URL Base
url = config.KTIP_CONFIG['url_base']
# Encabezados de la solicitud, incluyendo la API key para autenticación
headers = {
'x-api-key': config.KTIP_CONFIG['api_key']
}
# Mapeo según tipo
tipos_ktip = {
"ip": ['ip-src','ip-dst'],
"domain":['domain','hostname'],
"hash": ['sha256']
}
if tipo_ioc in tipos_ktip['ip']:
url = url + 'ip?request=' + ioc
elif tipo_ioc in tipos_ktip['domain']:
url = url + 'domain?request=' + ioc
elif tipo_ioc in tipos_ktip['hash']:
url = url + 'hash?request=' + ioc
else:
logging.warning(f"Tipo de IoC no reconocido para verificación KT: {tipo_ioc}")
return False
try:
# Realiza la solicitud GET a la API
response = requests.get(url, headers=headers, verify=False)
# Verifica si la solicitud fue exitosa
if response.status_code == 200:
data = response.json()
# Ahora verificar si IoC está limpio
if data['Zone'] == 'Green':
return True
# por defecto
return False
except (Exception, requests.exceptions.HTTPError) as e:
logging.error(str(e))
return False
def procesar_evento(self, e, lista, prom):
# Se saca publish_timestamp para que recorra desde 7 dias atras en cada de eventos antiguos
date_back = datetime.fromtimestamp(int(e['Event']['publish_timestamp'])).date()
date_back = self.calcular_fecha_7_dias_antes(date_back.strftime('%Y-%m-%d'))
date_back = datetime.strptime(date_back,'%Y-%m-%d').date()
resultados = {}
ioc_valido = True
if str(e['Event']['event_creator_email']).lower() in self.creators_accounts:
if 0 < int(e['Event']['attribute_count']) <= prom:
logging.info(f"Procesando Evento #{e['Event']['id']} con fecha {e['Event']['date']}")
for a in e['Event']['Attribute']:
if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back:
if a['type'] not in config.IOC_TIPOS_OMITIR:
if not self.procesar_atributo(a, None, e, lista):
ioc_valido = False
else:
# IoC a contar
resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1
if 'Object' in e['Event']:
for o in e['Event']['Object']:
for a in o['Attribute']:
if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back:
if a['type'] not in config.IOC_TIPOS_OMITIR:
if not self.procesar_atributo(a, o, e, lista):
ioc_valido = False
else:
# IoC a contar
resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1
if not ioc_valido:
try:
# Se consulta evento y se verifica cantidad de atributos
evento = self.misp.get_event(int(e['Event']['id']))
if evento:
if int(evento['Event']['attribute_count']) > 0:
self.misp.publish(int(e['Event']['id']))
logging.info("Publicando cambios de evento #" + e['Event']['id'])
2025-01-09 10:17:54 -03:00
# Se guarda uuid de evento que fue modificado en BD, manteniendo el timestamp de publish
self.guardar_bd_procesados_mod(e['Event']['uuid'], int(e['Event']['publish_timestamp']), datetime.now())
2024-11-06 14:53:19 -03:00
else:
# Se elimina evento con cero atributo...
self.misp.delete_event(int(e['Event']['id']))
logging.info("Eliminando evento #" +e['Event']['id']+" por carencia de atributos")
2024-12-04 11:09:10 -03:00
except (Exception, PyMISPError) as err:
2024-11-06 14:53:19 -03:00
logging.error(str(err))
return resultados
2025-01-09 10:17:54 -03:00
@staticmethod
def convert_value(value):
try:
# Intentar convertir a fecha y retornar un número representativo
return int(datetime.strptime(value, "%Y%m%d").strftime("%Y%m%d"))
except ValueError:
# Si falla, convertir a entero directamente
return int(value)
def find_max(self, data):
# Llamar al método estático directamente
max_value = max(data, key=self.convert_value)
return max_value
2024-11-06 14:53:19 -03:00
def calcula_calidad_iocs(self, desde: str, hasta: str, a_por_evento=None):
try:
# Para salida
output = []
self.creators_accounts = self.obtener_cuentas_sync()
if self.creators_accounts:
puntos = {k: 0 for k in self.creators_accounts.keys()}
# Fechas para sacar la mayor...
fechas = []
# Filtros para seleccionar Warninglist
filtros = config.CONFIG_WL['filtros_buscar']
# Para acumular warninglist
lista = []
logging.info("Limpiador de IoC App v1.0 comenzando...")
# Actualizar Warninglist por si acaso...
self.misp.update_warninglists()
# Warninglist completas
wl = self.misp.warninglists()
for l in wl:
2025-01-09 10:17:54 -03:00
fechas.append(str(l['Warninglist']['version']))
2024-11-06 14:53:19 -03:00
2025-01-09 10:17:54 -03:00
# Saca la versión más alta...
#version = max(fechas, key=self.convert_value)
version = self.find_max(fechas)
2024-11-06 14:53:19 -03:00
for l in wl:
if str(l['Warninglist']['version']) == version:
if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros):
if int(str(l['Warninglist']['warninglist_entry_count'])) <= config.CONFIG_WL['max_reg']:
lista.append(self.misp.get_warninglist(int(l['Warninglist']['id'])))
if lista:
# Promedio por defecto
prom = 0
logging.info("Warninglist de MISP Cargadas : " + str(len(lista)))
2025-01-09 10:17:54 -03:00
logging.info("Versión de Warninglists a utilizar: " + str(version))
2024-11-06 14:53:19 -03:00
# Rango completo de fechas....
logging.info("Buscando IoC Desde :" + desde + " Hasta :" + hasta)
2024-12-04 11:09:10 -03:00
#eventos_tmp = self.misp.search(publish_timestamp=desde)
#eventos_tmp = self.misp.search(date_from=desde, published=True)
2024-12-04 12:30:08 -03:00
eventos_tmp = self.misp.search_index(publish_timestamp=desde, published=True)
2024-11-06 14:53:19 -03:00
# Si existen eventos, se realiza proceso...
if eventos_tmp:
2024-12-04 11:09:10 -03:00
logging.info("Recolectando eventos para procesar")
2024-11-06 14:53:19 -03:00
eventos = []
# Se seleccionan eventos para establecer limite de fechas
for e in eventos_tmp:
2024-12-04 11:09:10 -03:00
if datetime.fromtimestamp(int(e['publish_timestamp'])).date() <= datetime.strptime(hasta, '%Y-%m-%d').date():
2025-01-09 10:17:54 -03:00
try:
# se verifica que evento no haya sido procesado en periodo anterior
prev_evento = self.obtener_evento_mod_db(e['uuid'])
# Si no existe en modificados....
if not prev_evento:
# Event get
ev = self.misp.get_event(int(e['id']))
eventos.append(ev)
elif datetime.fromtimestamp(int(e['publish_timestamp'])) > prev_evento['pub_mod_fecha'] + timedelta(hours=1):
# Verificar fecha con desfase, si es mayor se agrega para procesar
# Event get
ev = self.misp.get_event(int(e['id']))
eventos.append(ev)
except Exception as err_c:
logging.error("Error al tratar de agregar evento +"+e['id']+" :"+str(err_c))
2024-11-06 14:53:19 -03:00
# Atributos por evento es None, se calcula promedio...
if a_por_evento is None:
cont_atrr = 0
cont_eventos = 0
for e in eventos:
if str(e['Event']['event_creator_email']).lower() in self.creators_accounts:
cont_atrr += int(e['Event']['attribute_count'])
cont_eventos += 1
logging.info("Promedio de atributos a procesar listos.")
prom = int(cont_atrr / cont_eventos) if cont_eventos > 0 else 0
else:
prom = int(a_por_evento)
logging.info("Eventos por procesar :" + str(len(eventos)))
logging.info("Máximo de atributos a procesar por evento :" + str(prom))
2024-12-04 11:09:10 -03:00
num_workers = config.WORKERS_THR
2024-11-06 14:53:19 -03:00
logging.info(f"Usando {num_workers} workers")
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(self.procesar_evento, e, lista, prom) for e in eventos]
for future in as_completed(futures):
result = future.result()
for email, puntos_obtenidos in result.items():
puntos[email] += puntos_obtenidos
puntos_ordenados = {k: v for k, v in sorted(puntos.items(), key=lambda item: item[1], reverse=True)}
for k in puntos_ordenados.keys():
temp = {}
temp['comunidad'] = self.creators_accounts[k]
temp['cantidad_ioc'] = puntos_ordenados[k]
output.append(temp)
logging.info("Total Falsos positivos detectados y eliminados : " + str(len(self.procesados_ioc_fp)))
logging.info("Proceso de conteo de IoC finalizado...")
else:
logging.error("No existe eventos para rango de fechas. Se detiene proceso")
else:
logging.error("No se encuentran Warninglist actualizadas para comparar. Se detiene proceso")
else:
logging.error("No se encuentran cuentas asociadas a MISP. Se detiene proceso")
return output
2024-12-04 11:09:10 -03:00
except (Exception, PyMISPError) as err:
2024-11-06 14:53:19 -03:00
logging.error(str(err))
def guarda_ioc_json(self, data: list, filename: str):
try:
salida = os.path.join(self.dir_data, filename)
with open(salida, "w") as archivo_json:
json.dump(data, archivo_json, indent=4)
logging.info("Data volcada a ruta :" + salida)
except Exception as e:
logging.info("Error al escribir JSON :" + str(e))
2025-01-09 10:17:54 -03:00
def obtener_evento_mod_db(self, e_uuid: str):
# Output inicial vacío
output = {}
try:
# Ruta de la base de datos
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
try:
# Consultar el último registro con el evento_uuid específico
ultimo_procesado = (
session.query(ModificadosEv)
.filter_by(evento_uuid=e_uuid)
.order_by(ModificadosEv.pub_mod_fecha.desc()) # Ordenar por fecha descendente
.first() # Obtener solo el primer registro
)
# Si se encuentra un registro, convertirlo a dict
if ultimo_procesado:
output = ultimo_procesado.to_dict()
except SQLAlchemyError as db_error:
logging.error(f"Error en la consulta a la base de datos: {db_error}")
session.rollback()
finally:
session.close()
except Exception as err:
logging.error(f"Error general al conectarse a la base de datos: {err}")
# Retornar el resultado
return output
def guardar_bd_procesados_mod(self, e_uuid: str, timestamp_fecha: int, mod_fecha: datetime):
try:
# Ruta de la base de datos
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# Crear el nuevo registro
nuevo_registro = ModificadosEv(
evento_uuid=e_uuid,
publicado_fecha=datetime.fromtimestamp(timestamp_fecha),
pub_mod_fecha=mod_fecha
)
try:
session.add(nuevo_registro)
session.commit()
logging.info(f"Registrado UUID de evento modificado: {e_uuid}")
except IntegrityError:
# Manejo específico para violación de restricciones
logging.warning(f"Registro duplicado: {e_uuid} ya existe con la misma fecha.")
session.rollback()
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
session.rollback()
finally:
session.close()
except Exception as err:
logging.error(f"Error general al guardar en la base de datos: {err}")
2024-11-06 14:53:19 -03:00
def guardar_bd(self, data: list, fecha: str):
try:
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# Fecha corte
fecha_corte = fecha.split("-")
ano_t = int(fecha_corte[0])
mes_t = int(fecha_corte[1])
dia_t = int(fecha_corte[2])
for item in data:
organizacion = str(item['comunidad']).upper()
cantidad_ioc = item['cantidad_ioc']
nuevo_registro = Registro(
organizacion=organizacion,
ano=ano_t,
mes=mes_t,
dia=dia_t,
fecha_creado = datetime.strptime(fecha, '%Y-%m-%d').date(),
cantidad_ioc=cantidad_ioc
)
session.add(nuevo_registro)
try:
session.commit()
logging.info("Datos guardados exitosamente en la base de datos.")
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
session.rollback()
finally:
session.close()
except Exception as err:
logging.error(f"Error al guardar en la base de datos: {err}")
def obtener_cuentas_sync(self):
cuentas = {}
try:
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
query_cuentas = session.query(Usuario).all()
for item in query_cuentas:
cuentas[item.usuario_sync] = item.organizacion
return cuentas
except Exception as err:
logging.error(f"Error al obtener datos: {err}")
return None
finally:
session.close()