import os
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
import psutil
import multiprocessing
import ipaddress
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from pymisp import PyMISP
import config
import urllib3
import requests
from models import Base, Registro, Usuario
from datetime import datetime, timedelta
import threading
class MISPProcessorTop:
def __init__(self):
# Directorio actual
self.dir_actual = os.getcwd()
# Directorio para Logs
self.dir_logs = self.dir_actual + '/logs'
# Directorio para data en JSON / BD
self.dir_data = self.dir_actual + '/data'
# Cuentas
self.creators_accounts = {}
# Se crea directorio logs y data por si no existe...
os.makedirs(self.dir_logs, exist_ok=True)
os.makedirs(self.dir_data, exist_ok=True)
# Logging...
rotating_handler = RotatingFileHandler(
os.path.join(self.dir_logs, "misp_ioc_top_app_" + datetime.now().strftime("%Y%m%d") + ".log"),
format='%(asctime)s - %(levelname)s - %(message)s'
# Configuración MISP
misp_url = config.MISP_CONFIG['URL']
misp_key = config.MISP_CONFIG['AUTHKEY']
self.misp = PyMISP(misp_url, misp_key, False)
# Lista que almacena IoC procesados corresponden FP (FALSOPOSITIVO)
self.procesados_ioc_fp = set()
# Lista que almacena IoC procesados corresponden VP (VERDADEROPOSITIVO)
self.procesados_ioc_vp = set()
def calcular_numero_de_workers(self):
# Obtener el número de CPUs
num_cpus = multiprocessing.cpu_count()
# Obtener la memoria disponible en GB
mem = psutil.virtual_memory()
available_memory_gb = mem.available / (1024 ** 3)
# Ajustar el número de workers en función de CPUs y memoria
workers_por_memoria = int(available_memory_gb // 1)
num_workers = min(num_cpus, workers_por_memoria)
return max(1, num_workers) # Asegurarse de que haya al menos un worker
def calcular_fecha_7_dias_antes(self, fecha_str):
# Convertir la cadena de texto en un objeto de fecha
fecha = datetime.strptime(fecha_str, "%Y-%m-%d")
# Calcular la fecha 7 días antes
fecha_7_dias_antes = fecha - timedelta(days=7)
# Devolver la fecha en formato YYYY-MM-DD
return fecha_7_dias_antes.strftime("%Y-%m-%d")
def eliminar_atributo_o_objeto(self, a, o, evento_id):
logging.info("Falso Positivo encontrado : " + a['type'] + "->" + a['value'] + " en evento #" + evento_id)
if o is None:
# entonces se procesa atributo independiente de objeto
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} , evento #{evento_id}")
self.misp.delete_attribute(a['uuid'], True)
# Verificación de existencia del objeto en MISP
objeto_existe = self.misp.get_object(o['uuid'], pythonify=True)
if not objeto_existe:
logging.warning(f"El objeto UUID {o['uuid']} no existe en MISP o ya fue eliminado. No se puede eliminar.")
return False # Para que no se contemple en la cuenta de FP...
# Se verifica para los casos de objetos que son con mas hash que sha256
if a['type'] == 'sha256':
logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}")
self.misp.delete_object(o['uuid'], True)
counter_validos = 0
# Se recorre para ver tipos
for at in o['Attribute']:
if at['type'] not in config.IOC_TIPOS_OMITIR:
counter_validos = counter_validos + 1
# Si es mayor a 1, no se puede borrar objeto completo
if counter_validos > 1:
logging.info(f"Eliminando atributo UUID: {a['uuid']} / {a['value']} en el objeto {o['name']} , evento #{evento_id}")
self.misp.delete_attribute(a['uuid'], True)
logging.info(f"Eliminando objeto UUID: {o['uuid']} / {a['value']}, evento #{evento_id}")
self.misp.delete_object(o['uuid'], True)
return False
except Exception as err:
return True
def verificar_atributo_en_warninglist(self, a, lista):
Verifica si un atributo coincide con alguna entrada en las WarningLists,
considerando también el tipo de atributo según 'WarninglistType'.
for l in lista:
for entry in l['Warninglist']['WarninglistEntry']:
# Verificar si el tipo del atributo está soportado por la entrada de la WarningList
if a['type'] in l['Warninglist']['WarninglistType']:
if 'ip-' in a['type'] and 'port' not in a['type']:
if self._comparar_ip(entry['value'], a['value'], a['type']):
return False
elif any(item in a['type'] for item in ['domain', 'hostname']):
if self._comparar_dominio(entry['value'], a['value'], a['type']):
return False
return True
def procesar_atributo(self, a, o, e, lista):
if a['value'] in self.procesados_ioc_fp:
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
elif a['value'] in self.procesados_ioc_vp:
return True # No se realiza ningún proceso de eliminación
if self.verificar_atributo_en_warninglist(a, lista):
# Se verifica atributo que coincide con Warninglist...
if not self.verificar_y_procesar_atributo(a, o, e, lista, ['ip-', 'domain', 'hostname','sha256']):
return False
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
return True
def verificar_y_procesar_atributo(self, a, o, e, lista, tipos):
Verifica y procesa un atributo basado en su tipo.
if any(item in a['type'] for item in tipos):
if '|ip' not in a['type'] and 'port' not in a['type']:
# Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP
if a['value'] not in self.procesados_ioc_fp and a['value'] not in self.procesados_ioc_vp:
if self.verificacion_ktip(a['value'], a['type']):
return self.eliminar_atributo_o_objeto(a, o, e['Event']['id'])
return True
def _comparar_ip(self, valor_warning, valor_ip, tipo_ioc):
Compara una IP con una entrada en la WarningList.
if '/' in valor_warning:
ip_obj = ipaddress.ip_address(valor_ip)
cidr_obj = ipaddress.ip_network(valor_warning)
if ip_obj in cidr_obj:
# Solo llamar a verificacion_ktip si el valor no está en las listas de FP o VP
if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_ip, tipo_ioc):
return True
elif valor_ip in valor_warning:
if valor_ip not in self.procesados_ioc_fp and valor_ip not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_ip, tipo_ioc):
return True
except Exception as e:
return False
def _comparar_dominio(self, valor_warning, valor_dominio, tipo_ioc):
Compara un dominio con una entrada en la WarningList.
if valor_dominio.strip() == valor_warning:
if valor_dominio not in self.procesados_ioc_fp and valor_dominio not in self.procesados_ioc_vp:
if not self.verificacion_ktip(valor_dominio, tipo_ioc):
return True
return False
def verificacion_ktip(self, ioc, tipo_ioc):
# URL Base
url = config.KTIP_CONFIG['url_base']
# Encabezados de la solicitud, incluyendo la API key para autenticación
headers = {
'x-api-key': config.KTIP_CONFIG['api_key']
# Mapeo según tipo
tipos_ktip = {
"ip": ['ip-src','ip-dst'],
"hash": ['sha256']
if tipo_ioc in tipos_ktip['ip']:
url = url + 'ip?request=' + ioc
elif tipo_ioc in tipos_ktip['domain']:
url = url + 'domain?request=' + ioc
elif tipo_ioc in tipos_ktip['hash']:
url = url + 'hash?request=' + ioc
logging.warning(f"Tipo de IoC no reconocido para verificación KT: {tipo_ioc}")
return False
# Realiza la solicitud GET a la API
response = requests.get(url, headers=headers, verify=False)
# Verifica si la solicitud fue exitosa
if response.status_code == 200:
data = response.json()
# Ahora verificar si IoC está limpio
if data['Zone'] == 'Green':
return True
# por defecto
return False
except (Exception, requests.exceptions.HTTPError) as e:
return False
def procesar_evento(self, e, lista, prom):
# Se saca publish_timestamp para que recorra desde 7 dias atras en cada de eventos antiguos
date_back = datetime.fromtimestamp(int(e['Event']['publish_timestamp'])).date()
date_back = self.calcular_fecha_7_dias_antes(date_back.strftime('%Y-%m-%d'))
date_back = datetime.strptime(date_back,'%Y-%m-%d').date()
resultados = {}
ioc_valido = True
if str(e['Event']['event_creator_email']).lower() in self.creators_accounts:
if 0 < int(e['Event']['attribute_count']) <= prom:
logging.info(f"Procesando Evento #{e['Event']['id']} con fecha {e['Event']['date']}")
for a in e['Event']['Attribute']:
if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back:
if a['type'] not in config.IOC_TIPOS_OMITIR:
if not self.procesar_atributo(a, None, e, lista):
ioc_valido = False
# IoC a contar
resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1
if 'Object' in e['Event']:
for o in e['Event']['Object']:
for a in o['Attribute']:
if datetime.fromtimestamp(int(a['timestamp'])).date() >= date_back:
if a['type'] not in config.IOC_TIPOS_OMITIR:
if not self.procesar_atributo(a, o, e, lista):
ioc_valido = False
# IoC a contar
resultados[e['Event']['event_creator_email']] = resultados.get(e['Event']['event_creator_email'], 0) + 1
if not ioc_valido:
# Se consulta evento y se verifica cantidad de atributos
evento = self.misp.get_event(int(e['Event']['id']))
if evento:
if int(evento['Event']['attribute_count']) > 0:
logging.info("Publicando cambios de evento #" + e['Event']['id'])
# Se elimina evento con cero atributo...
logging.info("Eliminando evento #" +e['Event']['id']+" por carencia de atributos")
except Exception as err:
return resultados
def calcula_calidad_iocs(self, desde: str, hasta: str, a_por_evento=None):
# Para salida
output = []
self.creators_accounts = self.obtener_cuentas_sync()
if self.creators_accounts:
puntos = {k: 0 for k in self.creators_accounts.keys()}
# Para filtrar versión de Warninglist donde buscar, se toma desde fecha "desde"
actual = datetime.now().year
version = str(actual)
# Fechas para sacar la mayor...
fechas = []
# Filtros para seleccionar Warninglist
filtros = config.CONFIG_WL['filtros_buscar']
# Para acumular warninglist
lista = []
logging.info("Limpiador de IoC App v1.0 comenzando...")
# Actualizar Warninglist por si acaso...
# Warninglist completas
wl = self.misp.warninglists()
for l in wl:
if str(l['Warninglist']['version']).startswith(str(actual)):
if fechas:
# Saca la versión más alta...
version = max(fechas)
for l in wl:
if str(l['Warninglist']['version']) == version:
if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros):
if int(str(l['Warninglist']['warninglist_entry_count'])) <= config.CONFIG_WL['max_reg']:
if lista:
# Promedio por defecto
prom = 0
logging.info("Warninglist de MISP Cargadas : " + str(len(lista)))
# Rango completo de fechas....
logging.info("Buscando IoC Desde :" + desde + " Hasta :" + hasta)
eventos_tmp = self.misp.search(publish_timestamp=desde)
#eventos = self.misp.search(date_from=desde, date_to=hasta, published=True)
# Si existen eventos, se realiza proceso...
if eventos_tmp:
eventos = []
# Se seleccionan eventos para establecer limite de fechas
for e in eventos_tmp:
if datetime.fromtimestamp(int(e['Event']['publish_timestamp'])).date() <= datetime.strptime(hasta, '%Y-%m-%d').date():
# Atributos por evento es None, se calcula promedio...
if a_por_evento is None:
cont_atrr = 0
cont_eventos = 0
for e in eventos:
if str(e['Event']['event_creator_email']).lower() in self.creators_accounts:
cont_atrr += int(e['Event']['attribute_count'])
cont_eventos += 1
logging.info("Promedio de atributos a procesar listos.")
prom = int(cont_atrr / cont_eventos) if cont_eventos > 0 else 0
prom = int(a_por_evento)
logging.info("Eventos por procesar :" + str(len(eventos)))
logging.info("Máximo de atributos a procesar por evento :" + str(prom))
num_workers = 4
logging.info(f"Usando {num_workers} workers")
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(self.procesar_evento, e, lista, prom) for e in eventos]
for future in as_completed(futures):
result = future.result()
for email, puntos_obtenidos in result.items():
puntos[email] += puntos_obtenidos
puntos_ordenados = {k: v for k, v in sorted(puntos.items(), key=lambda item: item[1], reverse=True)}
for k in puntos_ordenados.keys():
temp = {}
temp['comunidad'] = self.creators_accounts[k]
temp['cantidad_ioc'] = puntos_ordenados[k]
logging.info("Total Falsos positivos detectados y eliminados : " + str(len(self.procesados_ioc_fp)))
logging.info("Proceso de conteo de IoC finalizado...")
logging.error("No existe eventos para rango de fechas. Se detiene proceso")
logging.error("No se encuentran Warninglist actualizadas para comparar. Se detiene proceso")
logging.error("No se encuentran cuentas asociadas a MISP. Se detiene proceso")
return output
except Exception as err:
def guarda_ioc_json(self, data: list, filename: str):
salida = os.path.join(self.dir_data, filename)
with open(salida, "w") as archivo_json:
json.dump(data, archivo_json, indent=4)
logging.info("Data volcada a ruta :" + salida)
except Exception as e:
logging.info("Error al escribir JSON :" + str(e))
def guardar_bd(self, data: list, fecha: str):
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Session = sessionmaker(bind=engine)
session = Session()
# Fecha corte
fecha_corte = fecha.split("-")
ano_t = int(fecha_corte[0])
mes_t = int(fecha_corte[1])
dia_t = int(fecha_corte[2])
for item in data:
organizacion = str(item['comunidad']).upper()
cantidad_ioc = item['cantidad_ioc']
nuevo_registro = Registro(
fecha_creado = datetime.strptime(fecha, '%Y-%m-%d').date(),
logging.info("Datos guardados exitosamente en la base de datos.")
except Exception as e:
logging.error(f"Error al guardar los datos en la base de datos: {e}")
except Exception as err:
logging.error(f"Error al guardar en la base de datos: {err}")
def obtener_cuentas_sync(self):
cuentas = {}
ruta_base_datos = os.path.join(self.dir_data, "registros.db")
engine = create_engine(f'sqlite:///{ruta_base_datos}')
Session = sessionmaker(bind=engine)
session = Session()
query_cuentas = session.query(Usuario).all()
for item in query_cuentas:
cuentas[item.usuario_sync] = item.organizacion
return cuentas
except Exception as err:
logging.error(f"Error al obtener datos: {err}")
return None