import os import logging from logging.handlers import RotatingFileHandler from datetime import datetime import psutil import multiprocessing import ipaddress import json from concurrent.futures import ThreadPoolExecutor, as_completed from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from pymisp import PyMISP from pymisp.exceptions import PyMISPError import config import urllib3 import requests from models import Base, Registro, Usuario from datetime import datetime, timedelta import threading urllib3.disable_warnings() class MISPProcessorTop: def __init__(self): # Directorio actual self.dir_actual = os.getcwd() # Directorio para Logs self.dir_logs = self.dir_actual + '/logs' # Directorio para data en JSON / BD self.dir_data = self.dir_actual + '/data' # Cuentas self.creators_accounts = {} # Se crea directorio logs y data por si no existe... os.makedirs(self.dir_logs, exist_ok=True) os.makedirs(self.dir_data, exist_ok=True) # Logging... rotating_handler = RotatingFileHandler( os.path.join(self.dir_logs, "misp_ioc_top_app_" + datetime.now().strftime("%Y%m%d") + ".log"), maxBytes=262144000, backupCount=10 ) logging.basicConfig( level=logging.INFO, handlers=[rotating_handler], format='%(asctime)s - %(levelname)s - %(message)s' ) # Configuración MISP misp_url = config.MISP_CONFIG['URL'] misp_key = config.MISP_CONFIG['AUTHKEY'] self.misp = PyMISP(misp_url, misp_key, False) # Lista que almacena IoC procesados corresponden FP (FALSOPOSITIVO) self.procesados_ioc_fp = set() # Lista que almacena IoC procesados corresponden VP (VERDADEROPOSITIVO) self.procesados_ioc_vp = set() def calcular_numero_de_workers(self): # Obtener el número de CPUs num_cpus = multiprocessing.cpu_count() # Obtener la memoria disponible en GB mem = psutil.virtual_memory() available_memory_gb = mem.available / (1024 ** 3) # Ajustar el número de workers en función de CPUs y memoria workers_por_memoria = int(available_memory_gb // 1) num_workers = min(num_cpus, workers_por_memoria) return max(1, num_workers) # Asegurarse de que haya al menos un worker def calcular_fecha_7_dias_antes(self, fecha_str): # Convertir la cadena de texto en un objeto de fecha fecha = datetime.strptime(fecha_str, "%Y-%m-%d") # Calcular la fecha 7 días antes fecha_7_dias_antes = fecha - timedelta(days=7) # Devolver la fecha en formato YYYY-MM-DD return fecha_7_dias_antes.strftime("%Y-%m-%d") def eliminar_atributo_o_objeto(self, a, o, evento_id): try: logging.info("Falso Positivo encontrado : " + a['type'] + "->" + a['value'] + " en evento #" + evento_id) if o is None: # entonces se procesa atributo independiente de objeto logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}") self.misp.delete_attribute(a['uuid'], True) else: # Verificación de existencia del objeto en MISP objeto_existe = self.misp.get_object(o['uuid'], pythonify=True) if not objeto_existe: logging.warning(f"El objeto UUID {o['uuid']} no existe en MISP o ya fue eliminado. No se puede eliminar.") return False # Para que no se contemple en la cuenta de FP... # Se verifica para los casos de objetos que son con mas hash que sha256 if a['type'] == 'sha256': logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}") self.misp.delete_object(o['uuid'], True) else: counter_validos = 0 # Se recorre para ver tipos for at in o['Attribute']: if at['type'] not in config.IOC_TIPOS_OMITIR: counter_validos = counter_validos + 1 # Si es mayor a 1, no se puede borrar objeto completo if counter_validos > 1: logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en el objeto {o['name']} , evento #{evento_id}") self.misp.delete_attribute(a['uuid'], True) else: logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}") self.misp.delete_object(o['uuid'], True) return False except (Exception, PyMISPError) as err: logging.error(str(err)) return True def verificar_atributo_en_warninglist(self, a, lista): """ Verifica si un atributo coincide con alguna entrada en las WarningLists, considerando también el tipo de atributo según 'WarninglistType'. """ for l in lista: for entry in l['Warninglist']['WarninglistEntry']: # Verificar si el tipo del atributo está soportado por la entrada de la WarningList if a['type'] in l['Warninglist']['WarninglistType']: if 'ip-' in a['type'] and 'port' not in a['type']: if self._comparar_ip(entry['value'], a['value'], a['type']): return False elif any(item in a['type'] for item in ['domain', 'hostname']): if self._comparar_dominio(entry['value'], a['value'], a['type']): return False return True def procesar_atributo(self, a, o, e, lista): if a['value'] in self.procesados_ioc_fp: return self.eliminar_atributo_o_objeto(a, o, e['Event']['id']) elif a['value'] in self.procesados_ioc_vp: return True # No se realiza ningún proceso de eliminación if self.verificar_atributo_en_warninglist(a, lista): # Se verifica atributo que coincide con Warninglist... if not self.verificar_y_procesar_atributo(a, o, e, lista, ['ip-', 'domain', 'hostname','sha256']): return False else: return self.eliminar_atributo_o_objeto(a, o, e['Event']['id']) return True def verificar_y_procesar_atributo(self, a, o, e, lista, tipos): """ Verifica y procesa un atributo basado en su tipo. """ if any(item in a['type'] for item in tipos): if '|ip' not in a['type'] and 'port' not in a['type']: # Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP if a['value'] not in self.procesados_ioc_fp and a['value'] not in self.procesados_ioc_vp: if self.verificacion_ktip(a['value'], a['type']): self.procesados_ioc_fp.add(a['value']) return self.eliminar_atributo_o_objeto(a, o, e['Event']['id']) else: self.procesados_ioc_vp.add(a['value']) return True def _comparar_ip(self, valor_warning, valor_ip, tipo_ioc): """ Compara una IP con una entrada en la WarningList. """ try: if '/' in valor_warning: ip_obj = ipaddress.ip_address(valor_ip) cidr_obj = ipaddress.ip_network(valor_warning) if ip_obj in cidr_obj: # Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp: if not self.verificacion_ktip(valor_ip, tipo_ioc): self.procesados_ioc_vp.add(valor_ip) return True elif valor_ip in valor_warning: if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp: if not self.verificacion_ktip(valor_ip, tipo_ioc): self.procesados_ioc_vp.add(valor_ip) return True except Exception as e: logging.error(str(e)) return False def _comparar_dominio(self, valor_warning, valor_dominio, tipo_ioc): """ Compara un dominio con una entrada en la WarningList. """ if valor_dominio.strip() == valor_warning: if valor_dominio not in self.procesados_ioc_fp and valor_dominio not in self.procesados_ioc_vp: if not self.verificacion_ktip(valor_dominio, tipo_ioc): self.procesados_ioc_vp.add(valor_dominio) return True return False def verificacion_ktip(self, ioc, tipo_ioc): # URL Base url = config.KTIP_CONFIG['url_base'] # Encabezados de la solicitud, incluyendo la API key para autenticación headers = { 'x-api-key': config.KTIP_CONFIG['api_key'] } # Mapeo según tipo tipos_ktip = { "ip": ['ip-src','ip-dst'], "domain":['domain','hostname'], "hash": ['sha256'] } if tipo_ioc in tipos_ktip['ip']: url = url + 'ip?request=' + ioc elif tipo_ioc in tipos_ktip['domain']: url = url + 'domain?request=' + ioc elif tipo_ioc in tipos_ktip['hash']: url = url + 'hash?request=' + ioc else: logging.warning(f"Tipo de IoC no reconocido para verificación KT: {tipo_ioc}") return False try: # Realiza la solicitud GET a la API response = requests.get(url, headers=headers, verify=False) # Verifica si la solicitud fue exitosa if response.status_code == 200: data = response.json() # Ahora verificar si IoC está limpio if data['Zone'] == 'Green': return True # por defecto return False except (Exception, requests.exceptions.HTTPError) as e: logging.error(str(e)) return False def procesar_evento(self, e, lista, prom): # Se saca publish_timestamp para que recorra desde 7 dias atras en cada de eventos antiguos date_back = datetime.fromtimestamp(int(e['Event']['publish_timestamp'])).date() date_back = self.calcular_fecha_7_dias_antes(date_back.strftime('%Y-%m-%d')) date_back = datetime.strptime(date_back,'%Y-%m-%d').date() resultados = {} ioc_valido = True if str(e['Event']['event_creator_email']).lower() in self.creators_accounts: if 0 < int(e['Event']['attribute_count']) <= prom: logging.info(f"Procesando Evento #{e['Event']['id']} con fecha {e['Event']['date']}") for a in e['Event']['Attribute']: if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back: if a['type'] not in config.IOC_TIPOS_OMITIR: if not self.procesar_atributo(a, None, e, lista): ioc_valido = False else: # IoC a contar resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1 if 'Object' in e['Event']: for o in e['Event']['Object']: for a in o['Attribute']: if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back: if a['type'] not in config.IOC_TIPOS_OMITIR: if not self.procesar_atributo(a, o, e, lista): ioc_valido = False else: # IoC a contar resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1 if not ioc_valido: try: # Se consulta evento y se verifica cantidad de atributos evento = self.misp.get_event(int(e['Event']['id'])) if evento: if int(evento['Event']['attribute_count']) > 0: self.misp.publish(int(e['Event']['id'])) logging.info("Publicando cambios de evento #" + e['Event']['id']) else: # Se elimina evento con cero atributo... self.misp.delete_event(int(e['Event']['id'])) logging.info("Eliminando evento #" +e['Event']['id']+" por carencia de atributos") except (Exception, PyMISPError) as err: logging.error(str(err)) return resultados def calcula_calidad_iocs(self, desde: str, hasta: str, a_por_evento=None): try: # Para salida output = [] self.creators_accounts = self.obtener_cuentas_sync() if self.creators_accounts: puntos = {k: 0 for k in self.creators_accounts.keys()} # Para filtrar versión de Warninglist donde buscar, se toma desde fecha "desde" actual = datetime.now().year version = str(actual) # Fechas para sacar la mayor... fechas = [] # Filtros para seleccionar Warninglist filtros = config.CONFIG_WL['filtros_buscar'] # Para acumular warninglist lista = [] logging.info("Limpiador de IoC App v1.0 comenzando...") # Actualizar Warninglist por si acaso... self.misp.update_warninglists() # Warninglist completas wl = self.misp.warninglists() for l in wl: if str(l['Warninglist']['version']).startswith(str(actual)): fechas.append(l['Warninglist']['version']) if fechas: # Saca la versión más alta... version = max(fechas) for l in wl: if str(l['Warninglist']['version']) == version: if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros): if int(str(l['Warninglist']['warninglist_entry_count'])) <= config.CONFIG_WL['max_reg']: lista.append(self.misp.get_warninglist(int(l['Warninglist']['id']))) if lista: # Promedio por defecto prom = 0 logging.info("Warninglist de MISP Cargadas : " + str(len(lista))) # Rango completo de fechas.... logging.info("Buscando IoC Desde :" + desde + " Hasta :" + hasta) #eventos_tmp = self.misp.search(publish_timestamp=desde) #eventos_tmp = self.misp.search(date_from=desde, published=True) eventos_tmp = self.misp.search_index(publish_timestamp=desde, published=True) # Si existen eventos, se realiza proceso... if eventos_tmp: logging.info("Recolectando eventos para procesar") eventos = [] # Se seleccionan eventos para establecer limite de fechas for e in eventos_tmp: if datetime.fromtimestamp(int(e['publish_timestamp'])).date() <= datetime.strptime(hasta, '%Y-%m-%d').date(): # Event get ev = self.misp.get_event(int(e['id'])) eventos.append(ev) # Atributos por evento es None, se calcula promedio... if a_por_evento is None: cont_atrr = 0 cont_eventos = 0 for e in eventos: if str(e['Event']['event_creator_email']).lower() in self.creators_accounts: cont_atrr += int(e['Event']['attribute_count']) cont_eventos += 1 logging.info("Promedio de atributos a procesar listos.") prom = int(cont_atrr / cont_eventos) if cont_eventos > 0 else 0 else: prom = int(a_por_evento) logging.info("Eventos por procesar :" + str(len(eventos))) logging.info("Máximo de atributos a procesar por evento :" + str(prom)) num_workers = config.WORKERS_THR logging.info(f"Usando {num_workers} workers") with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = [executor.submit(self.procesar_evento, e, lista, prom) for e in eventos] for future in as_completed(futures): result = future.result() for email, puntos_obtenidos in result.items(): puntos[email] += puntos_obtenidos puntos_ordenados = {k: v for k, v in sorted(puntos.items(), key=lambda item: item[1], reverse=True)} for k in puntos_ordenados.keys(): temp = {} temp['comunidad'] = self.creators_accounts[k] temp['cantidad_ioc'] = puntos_ordenados[k] output.append(temp) logging.info("Total Falsos positivos detectados y eliminados : " + str(len(self.procesados_ioc_fp))) logging.info("Proceso de conteo de IoC finalizado...") else: logging.error("No existe eventos para rango de fechas. Se detiene proceso") else: logging.error("No se encuentran Warninglist actualizadas para comparar. Se detiene proceso") else: logging.error("No se encuentran cuentas asociadas a MISP. Se detiene proceso") return output except (Exception, PyMISPError) as err: logging.error(str(err)) def guarda_ioc_json(self, data: list, filename: str): try: salida = os.path.join(self.dir_data, filename) with open(salida, "w") as archivo_json: json.dump(data, archivo_json, indent=4) logging.info("Data volcada a ruta :" + salida) except Exception as e: logging.info("Error al escribir JSON :" + str(e)) def guardar_bd(self, data: list, fecha: str): try: ruta_base_datos = os.path.join(self.dir_data, "registros.db") engine = create_engine(f'sqlite:///{ruta_base_datos}') Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() # Fecha corte fecha_corte = fecha.split("-") ano_t = int(fecha_corte[0]) mes_t = int(fecha_corte[1]) dia_t = int(fecha_corte[2]) for item in data: organizacion = str(item['comunidad']).upper() cantidad_ioc = item['cantidad_ioc'] nuevo_registro = Registro( organizacion=organizacion, ano=ano_t, mes=mes_t, dia=dia_t, fecha_creado = datetime.strptime(fecha, '%Y-%m-%d').date(), cantidad_ioc=cantidad_ioc ) session.add(nuevo_registro) try: session.commit() logging.info("Datos guardados exitosamente en la base de datos.") except Exception as e: logging.error(f"Error al guardar los datos en la base de datos: {e}") session.rollback() finally: session.close() except Exception as err: logging.error(f"Error al guardar en la base de datos: {err}") def obtener_cuentas_sync(self): cuentas = {} try: ruta_base_datos = os.path.join(self.dir_data, "registros.db") engine = create_engine(f'sqlite:///{ruta_base_datos}') Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() query_cuentas = session.query(Usuario).all() for item in query_cuentas: cuentas[item.usuario_sync] = item.organizacion return cuentas except Exception as err: logging.error(f"Error al obtener datos: {err}") return None finally: session.close()