import os import logging from logging.handlers import RotatingFileHandler from datetime import datetime import psutil import multiprocessing import ipaddress import json from concurrent.futures import ThreadPoolExecutor, as_completed from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from pymisp import PyMISP import config import urllib3 import requests from models import Base, Registro, Usuario from datetime import datetime, timedelta import threading urllib3.disable_warnings() class MISPProcessorTop: def __init__(self): # Directorio actual self.dir_actual = os.getcwd() # Directorio para Logs self.dir_logs = self.dir_actual + '/logs' # Directorio para data en JSON / BD self.dir_data = self.dir_actual + '/data' # Cuentas self.creators_accounts = {} # Se crea directorio logs y data por si no existe... os.makedirs(self.dir_logs, exist_ok=True) os.makedirs(self.dir_data, exist_ok=True) # Logging... rotating_handler = RotatingFileHandler( os.path.join(self.dir_logs, "misp_ioc_top_app_" + datetime.now().strftime("%Y%m%d") + ".log"), maxBytes=262144000, backupCount=10 ) logging.basicConfig( level=logging.INFO, handlers=[rotating_handler], format='%(asctime)s - %(levelname)s - %(message)s' ) # Configuración MISP misp_url = config.MISP_CONFIG['URL'] misp_key = config.MISP_CONFIG['AUTHKEY'] self.misp = PyMISP(misp_url, misp_key, False) # Lista que almacena IoC procesados corresponden FP (FALSOPOSITIVO) self.procesados_ioc_fp = set() # Lista que almacena IoC procesados corresponden VP (VERDADEROPOSITIVO) self.procesados_ioc_vp = set() def calcular_numero_de_workers(self): # Obtener el número de CPUs num_cpus = multiprocessing.cpu_count() # Obtener la memoria disponible en GB mem = psutil.virtual_memory() available_memory_gb = mem.available / (1024 ** 3) # Ajustar el número de workers en función de CPUs y memoria workers_por_memoria = int(available_memory_gb // 1) num_workers = min(num_cpus, workers_por_memoria) return max(1, num_workers) # Asegurarse de que haya al menos un worker def calcular_fecha_7_dias_antes(self, fecha_str): # Convertir la cadena de texto en un objeto de fecha fecha = datetime.strptime(fecha_str, "%Y-%m-%d") # Calcular la fecha 7 días antes fecha_7_dias_antes = fecha - timedelta(days=7) # Devolver la fecha en formato YYYY-MM-DD return fecha_7_dias_antes.strftime("%Y-%m-%d") def eliminar_atributo_o_objeto(self, a, o, evento_id): try: logging.info("Falso Positivo encontrado : " + a['type'] + "->" + a['value'] + " en evento #" + evento_id) if o is None: # entonces se procesa atributo independiente de objeto logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}") self.misp.delete_attribute(a['uuid'], True) else: # Verificación de existencia del objeto en MISP objeto_existe = self.misp.get_object(o['uuid'], pythonify=True) if not objeto_existe: logging.warning(f"El objeto UUID {o['uuid']} no existe en MISP o ya fue eliminado. No se puede eliminar.") return False # Para que no se contemple en la cuenta de FP... # Se verifica para los casos de objetos que son con mas hash que sha256 if a['type'] == 'sha256': logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}") self.misp.delete_object(o['uuid'], True) else: counter_validos = 0 # Se recorre para ver tipos for at in o['Attribute']: if at['type'] not in config.IOC_TIPOS_OMITIR: counter_validos = counter_validos + 1 # Si es mayor a 1, no se puede borrar objeto completo if counter_validos > 1: logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en el objeto {o['name']} , evento #{evento_id}") self.misp.delete_attribute(a['uuid'], True) else: logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}") self.misp.delete_object(o['uuid'], True) return False except Exception as err: logging.error(str(err)) return True def verificar_atributo_en_warninglist(self, a, lista): """ Verifica si un atributo coincide con alguna entrada en las WarningLists, considerando también el tipo de atributo según 'WarninglistType'. """ for l in lista: for entry in l['Warninglist']['WarninglistEntry']: # Verificar si el tipo del atributo está soportado por la entrada de la WarningList if a['type'] in l['Warninglist']['WarninglistType']: if 'ip-' in a['type'] and 'port' not in a['type']: if self._comparar_ip(entry['value'], a['value'], a['type']): return False elif any(item in a['type'] for item in ['domain', 'hostname']): if self._comparar_dominio(entry['value'], a['value'], a['type']): return False return True def procesar_atributo(self, a, o, e, lista): if a['value'] in self.procesados_ioc_fp: return self.eliminar_atributo_o_objeto(a, o, e['Event']['id']) elif a['value'] in self.procesados_ioc_vp: return True # No se realiza ningún proceso de eliminación if self.verificar_atributo_en_warninglist(a, lista): # Se verifica atributo que coincide con Warninglist... if not self.verificar_y_procesar_atributo(a, o, e, lista, ['ip-', 'domain', 'hostname','sha256']): return False else: return self.eliminar_atributo_o_objeto(a, o, e['Event']['id']) return True def verificar_y_procesar_atributo(self, a, o, e, lista, tipos): """ Verifica y procesa un atributo basado en su tipo. """ if any(item in a['type'] for item in tipos): if '|ip' not in a['type'] and 'port' not in a['type']: # Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP if a['value'] not in self.procesados_ioc_fp and a['value'] not in self.procesados_ioc_vp: if self.verificacion_ktip(a['value'], a['type']): self.procesados_ioc_fp.add(a['value']) return self.eliminar_atributo_o_objeto(a, o, e['Event']['id']) else: self.procesados_ioc_vp.add(a['value']) return True def _comparar_ip(self, valor_warning, valor_ip, tipo_ioc): """ Compara una IP con una entrada en la WarningList. """ try: if '/' in valor_warning: ip_obj = ipaddress.ip_address(valor_ip) cidr_obj = ipaddress.ip_network(valor_warning) if ip_obj in cidr_obj: # Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp: if not self.verificacion_ktip(valor_ip, tipo_ioc): self.procesados_ioc_vp.add(valor_ip) return True elif valor_ip in valor_warning: if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp: if not self.verificacion_ktip(valor_ip, tipo_ioc): self.procesados_ioc_vp.add(valor_ip) return True except Exception as e: logging.error(str(e)) return False def _comparar_dominio(self, valor_warning, valor_dominio, tipo_ioc): """ Compara un dominio con una entrada en la WarningList. """ if valor_dominio.strip() == valor_warning: if valor_dominio not in self.procesados_ioc_fp and valor_dominio not in self.procesados_ioc_vp: if not self.verificacion_ktip(valor_dominio, tipo_ioc): self.procesados_ioc_vp.add(valor_dominio) return True return False def verificacion_ktip(self, ioc, tipo_ioc): # URL Base url = config.KTIP_CONFIG['url_base'] # Encabezados de la solicitud, incluyendo la API key para autenticación headers = { 'x-api-key': config.KTIP_CONFIG['api_key'] } # Mapeo según tipo tipos_ktip = { "ip": ['ip-src','ip-dst'], "domain":['domain','hostname'], "hash": ['sha256'] } if tipo_ioc in tipos_ktip['ip']: url = url + 'ip?request=' + ioc elif tipo_ioc in tipos_ktip['domain']: url = url + 'domain?request=' + ioc elif tipo_ioc in tipos_ktip['hash']: url = url + 'hash?request=' + ioc else: logging.warning(f"Tipo de IoC no reconocido para verificación KT: {tipo_ioc}") return False try: # Realiza la solicitud GET a la API response = requests.get(url, headers=headers, verify=False) # Verifica si la solicitud fue exitosa if response.status_code == 200: data = response.json() # Ahora verificar si IoC está limpio if data['Zone'] == 'Green': return True # por defecto return False except (Exception, requests.exceptions.HTTPError) as e: logging.error(str(e)) return False def procesar_evento(self, e, lista, prom): # Se saca publish_timestamp para que recorra desde 7 dias atras en cada de eventos antiguos date_back = datetime.fromtimestamp(int(e['Event']['publish_timestamp'])).date() date_back = self.calcular_fecha_7_dias_antes(date_back.strftime('%Y-%m-%d')) date_back = datetime.strptime(date_back,'%Y-%m-%d').date() resultados = {} ioc_valido = True if str(e['Event']['event_creator_email']).lower() in self.creators_accounts: if 0 < int(e['Event']['attribute_count']) <= prom: logging.info(f"Procesando Evento #{e['Event']['id']} con fecha {e['Event']['date']}") for a in e['Event']['Attribute']: if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back: if a['type'] not in config.IOC_TIPOS_OMITIR: if not self.procesar_atributo(a, None, e, lista): ioc_valido = False else: # IoC a contar resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1 if 'Object' in e['Event']: for o in e['Event']['Object']: for a in o['Attribute']: if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back: if a['type'] not in config.IOC_TIPOS_OMITIR: if not self.procesar_atributo(a, o, e, lista): ioc_valido = False else: # IoC a contar resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1 if not ioc_valido: try: # Se consulta evento y se verifica cantidad de atributos evento = self.misp.get_event(int(e['Event']['id'])) if evento: if int(evento['Event']['attribute_count']) > 0: self.misp.publish(int(e['Event']['id'])) logging.info("Publicando cambios de evento #" + e['Event']['id']) else: # Se elimina evento con cero atributo... self.misp.delete_event(int(e['Event']['id'])) logging.info("Eliminando evento #" +e['Event']['id']+" por carencia de atributos") except Exception as err: logging.error(str(err)) return resultados def calcula_calidad_iocs(self, desde: str, hasta: str, a_por_evento=None): try: # Para salida output = [] self.creators_accounts = self.obtener_cuentas_sync() if self.creators_accounts: puntos = {k: 0 for k in self.creators_accounts.keys()} # Para filtrar versión de Warninglist donde buscar, se toma desde fecha "desde" actual = datetime.now().year version = str(actual) # Fechas para sacar la mayor... fechas = [] # Filtros para seleccionar Warninglist filtros = config.CONFIG_WL['filtros_buscar'] # Para acumular warninglist lista = [] logging.info("Limpiador de IoC App v1.0 comenzando...") # Actualizar Warninglist por si acaso... self.misp.update_warninglists() # Warninglist completas wl = self.misp.warninglists() for l in wl: if str(l['Warninglist']['version']).startswith(str(actual)): fechas.append(l['Warninglist']['version']) if fechas: # Saca la versión más alta... version = max(fechas) for l in wl: if str(l['Warninglist']['version']) == version: if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros): if int(str(l['Warninglist']['warninglist_entry_count'])) <= config.CONFIG_WL['max_reg']: lista.append(self.misp.get_warninglist(int(l['Warninglist']['id']))) if lista: # Promedio por defecto prom = 0 logging.info("Warninglist de MISP Cargadas : " + str(len(lista))) # Rango completo de fechas.... logging.info("Buscando IoC Desde :" + desde + " Hasta :" + hasta) eventos_tmp = self.misp.search(publish_timestamp=desde) #eventos = self.misp.search(date_from=desde, date_to=hasta, published=True) # Si existen eventos, se realiza proceso... if eventos_tmp: eventos = [] # Se seleccionan eventos para establecer limite de fechas for e in eventos_tmp: if datetime.fromtimestamp(int(e['Event']['publish_timestamp'])).date() <= datetime.strptime(hasta, '%Y-%m-%d').date(): eventos.append(e) # Atributos por evento es None, se calcula promedio... if a_por_evento is None: cont_atrr = 0 cont_eventos = 0 for e in eventos: if str(e['Event']['event_creator_email']).lower() in self.creators_accounts: cont_atrr += int(e['Event']['attribute_count']) cont_eventos += 1 logging.info("Promedio de atributos a procesar listos.") prom = int(cont_atrr / cont_eventos) if cont_eventos > 0 else 0 else: prom = int(a_por_evento) logging.info("Eventos por procesar :" + str(len(eventos))) logging.info("Máximo de atributos a procesar por evento :" + str(prom)) num_workers = 4 logging.info(f"Usando {num_workers} workers") with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = [executor.submit(self.procesar_evento, e, lista, prom) for e in eventos] for future in as_completed(futures): result = future.result() for email, puntos_obtenidos in result.items(): puntos[email] += puntos_obtenidos puntos_ordenados = {k: v for k, v in sorted(puntos.items(), key=lambda item: item[1], reverse=True)} for k in puntos_ordenados.keys(): temp = {} temp['comunidad'] = self.creators_accounts[k] temp['cantidad_ioc'] = puntos_ordenados[k] output.append(temp) logging.info("Total Falsos positivos detectados y eliminados : " + str(len(self.procesados_ioc_fp))) logging.info("Proceso de conteo de IoC finalizado...") else: logging.error("No existe eventos para rango de fechas. Se detiene proceso") else: logging.error("No se encuentran Warninglist actualizadas para comparar. Se detiene proceso") else: logging.error("No se encuentran cuentas asociadas a MISP. Se detiene proceso") return output except Exception as err: logging.error(str(err)) def guarda_ioc_json(self, data: list, filename: str): try: salida = os.path.join(self.dir_data, filename) with open(salida, "w") as archivo_json: json.dump(data, archivo_json, indent=4) logging.info("Data volcada a ruta :" + salida) except Exception as e: logging.info("Error al escribir JSON :" + str(e)) def guardar_bd(self, data: list, fecha: str): try: ruta_base_datos = os.path.join(self.dir_data, "registros.db") engine = create_engine(f'sqlite:///{ruta_base_datos}') Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() # Fecha corte fecha_corte = fecha.split("-") ano_t = int(fecha_corte[0]) mes_t = int(fecha_corte[1]) dia_t = int(fecha_corte[2]) for item in data: organizacion = str(item['comunidad']).upper() cantidad_ioc = item['cantidad_ioc'] nuevo_registro = Registro( organizacion=organizacion, ano=ano_t, mes=mes_t, dia=dia_t, fecha_creado = datetime.strptime(fecha, '%Y-%m-%d').date(), cantidad_ioc=cantidad_ioc ) session.add(nuevo_registro) try: session.commit() logging.info("Datos guardados exitosamente en la base de datos.") except Exception as e: logging.error(f"Error al guardar los datos en la base de datos: {e}") session.rollback() finally: session.close() except Exception as err: logging.error(f"Error al guardar en la base de datos: {err}") def obtener_cuentas_sync(self): cuentas = {} try: ruta_base_datos = os.path.join(self.dir_data, "registros.db") engine = create_engine(f'sqlite:///{ruta_base_datos}') Base.metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() query_cuentas = session.query(Usuario).all() for item in query_cuentas: cuentas[item.usuario_sync] = item.organizacion return cuentas except Exception as err: logging.error(f"Error al obtener datos: {err}") return None finally: session.close()