# definiciones from io import BytesIO import xlsxwriter import settings from pymisp import PyMISP import urllib3 from datetime import datetime from zoneinfo import ZoneInfo import os import logging from logging.handlers import RotatingFileHandler import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication from crontab import CronTab urllib3.disable_warnings() class MISPAlertManager: def __init__(self): self.dir_actual = os.getcwd() self.dir_logs = os.path.join(self.dir_actual, 'logs') self._setup_logging() self.misp = PyMISP(settings.MISP_CONFIG['URL_MISP'], settings.MISP_CONFIG['AUTHKEY'], False) def _setup_logging(self): os.makedirs(self.dir_logs, exist_ok=True) log_file = os.path.join(self.dir_logs, f"alertas_{datetime.now().strftime('%Y%m%d')}.log") rotating_handler = RotatingFileHandler(log_file, maxBytes=262144000, backupCount=10) logging.basicConfig( level=logging.INFO, handlers=[rotating_handler], format='%(asctime)s - %(levelname)s - %(message)s' ) def enviar_alerta(self): # realizado (flag) realizado = False # Servidores servidores = [] # Se obtienen datos de conexión de servidores servidores_temp = self.obtener_servidores() if servidores_temp: # Solo se quiere entregar desconectados, se filtra por los desconcetados if settings.SERVERS_OFF: for x in servidores_temp: if x['connection_status'] != 'Connected': servidores.append(x) else: # Entonces todos los servidores servidores = servidores_temp # Se verifica que servidores tenga datos if servidores: # Se arma estructura de correo try: # Version de Servidor de MISP misp_version = self.misp.misp_instance_version['version'] # Configuración de la cuenta de Office 365 y del servidor SMTP smtp_server = settings.EMAIL_CONFIG['server_smtp_host'] smtp_port = settings.EMAIL_CONFIG['server_smtp_port'] from_address = settings.EMAIL_CONFIG['smtp_username'] to_address = settings.EMAIL_CONFIG['email_recipient'] password = settings.EMAIL_CONFIG['smtp_password'] # Crear el mensaje msg = MIMEMultipart() msg["From"] = settings.EMAIL_CONFIG['smtp_username'] # Dirección 'from' msg["To"] = to_address msg["Subject"] = settings.EMAIL_CONFIG['email_subject'] # Cuerpo del correo en HTML inciala html_body_start = """

Local Version MISP :"""+misp_version+"""

""" # HTML para datos html_body_data = "" # Se arma HTML para servidores for serv in servidores: # Se verifica estado para aplicar color (Rojo / Verde) if serv['connection_status'] == 'Connected': html_body_data += """""" else: html_body_data += """""" # Final de HTML html_body_end = """

Instance Name

Connection Status

Error Status

Remote Version

Remote Org

Status Code

Last Check

"""+serv['instance_name']+"""

"""+serv['connection_status']+"""

"""+serv['error_status']+"""

"""+serv['misp_remote_version']+"""

"""+serv['remote_org']+"""

"""+str(serv['status_code'])+"""

"""+serv['last_check']+"""

"""+serv['instance_name']+"""

"""+serv['connection_status']+"""

"""+serv['error_status']+"""

"""+serv['misp_remote_version']+"""

"""+serv['remote_org']+"""

"""+str(serv['status_code'])+"""

"""+serv['last_check']+"""

""" # Ahora se junta todo el HTML html_fix = html_body_start+html_body_data+html_body_end # Adjuntar el cuerpo HTML al mensaje msg.attach(MIMEText(html_fix, "html")) # Se verifica si se desea adjuntar archivo if settings.DATA_ATTACH: # Se crear instancia de Workbook de Excel en memoria output = BytesIO() workbook = xlsxwriter.Workbook(output, {'in_memory': True}) # Crear el workbook en memoria worksheet = workbook.add_worksheet("Servers") # Add a bold format to use to highlight cells. bold = workbook.add_format({'bold': 1}) # se escriben headers worksheet.write(0, 0, "Instance Name", bold) worksheet.write(0, 1, "Connection Status", bold) worksheet.write(0, 2, "Error Status", bold) worksheet.write(0, 3, "Remote Version", bold) worksheet.write(0, 4, "Remote Org", bold) worksheet.write(0, 5, "Status Code", bold) worksheet.write(0, 6, "Last Check", bold) for row_index, servidor in enumerate(servidores, start=1): worksheet.write(row_index, 0, servidor['instance_name']) worksheet.write(row_index, 1, servidor['connection_status']) worksheet.write(row_index, 2, servidor['error_status']) worksheet.write(row_index, 3, servidor['misp_remote_version']) worksheet.write(row_index, 4, servidor['remote_org']) worksheet.write(row_index, 5, servidor['status_code']) worksheet.write(row_index, 6, servidor['last_check']) worksheet.autofit() workbook.close() output.seek(0) # Adjuntar el archivo XLSX desde la memoria attached_file = MIMEApplication(output.read(), _subtype="xlsx") attached_file.add_header( 'content-disposition', 'attachment', filename=f'Servers_{datetime.now().strftime("%Y%m%d")}.xlsx' ) msg.attach(attached_file) logging.info("Se creo Excel con datos para adjuntar a correo") # Iniciar una conexión con el servidor SMTP de Office 365 server = smtplib.SMTP(smtp_server, smtp_port) server.ehlo() if settings.EMAIL_CONFIG['tls']: server.starttls() # Habilitar cifrado TLS # Iniciar sesión en el servidor server.login(from_address, password) # Enviar el correo server.sendmail(from_address, to_address, msg.as_string()) logging.info("Se envia correo a :"+to_address) realizado = True except Exception as e: logging.error(str(e)) finally: # Cerrar la conexión con el servidor server.quit() else: logging.info("No existen servidores para hacer envio de alertas.") else: logging.info("No existen servidores para hacer envio de alertas.") # return return realizado # Se obtiene lista de servidores de MISP def obtener_servidores(self): # Get Severs servers = self.misp.servers(True) # var Servers server_list = [] for s in servers: try: # Estructura de datos # data: instance_name, connection_status, status_log, misp_remote_version, remote_org, last_check temp = {} # consultar remote_organisation rem_org = self.misp.get_organisation(s.remote_org_id, pythonify=True) # Valor llaves por defecto temp['instance_name'] = s.name temp['connection_status'] = '' temp['error_status'] = '' temp['misp_remote_version'] = '' temp['remote_org'] = rem_org.name temp['status_code'] = 0 # Se verifica conexion server_temp = self.misp.test_server(s.id) # timestamp temp['last_check'] = datetime.now(ZoneInfo("America/Santiago")).strftime("%Y-%m-%d %H:%M:%S %Z (%z)") # status code de caso temp['status_code'] = server_temp['status'] # Se verifica estado de conexión if server_temp['status'] == 1: temp['connection_status'] = 'Connected' temp['error_status'] = 'OK. No error' temp['misp_remote_version'] = server_temp['version'] if server_temp['status'] == 2: temp['connection_status'] = 'Not Connected' temp['error_status'] = 'Server unrecheable' if server_temp['status'] == 3: temp['connection_status'] = 'Not Connected' temp['error_status'] = 'Unexpected error' if server_temp['status'] == 4: temp['connection_status'] = 'Not Connected' temp['error_status'] = 'Authentication failed' if server_temp['status'] == 8: # Es limitada la conexión, pero tiene acceso a datos if 'status' in server_temp['post']: if server_temp['post']['status'] == 1: temp['connection_status'] = 'Connected' temp['error_status'] = 'Limited connected' if 'info' in server_temp: temp['misp_remote_version'] = server_temp['info']['version'] else: temp['connection_status'] = 'Not Connected' temp['error_status'] = 'Unexpected error' else: temp['connection_status'] = 'Not Connected' temp['error_status'] = 'Unexpected error' # Se agregan datos a lista de servidores server_list.append(temp) # Se reordena para que los no conectados queden al principio del array server_list.sort(key=lambda x: x['connection_status'] != 'Not Connected') # Log logging.info("Se guarda datos de conexión de servidor ID:"+str(s.id)) except Exception as e: logging.error("Error al obtener servidor :"+str(e)) return server_list def schedule_job(self): try: cron = CronTab(user=True) # Comando que deseas agendar command = settings.CRON_CONFIG["command"] # Elimina cualquier trabajo previo con el mismo comando cron.remove_all(command=command) # Crear un nuevo job job = cron.new(command=command) # Configurar el job de acuerdo a los intervalos especificados interval = settings.CRON_CONFIG["interval"] if interval.get("minutes"): minutes = interval["minutes"] if minutes < 60: job.minute.every(minutes) else: hours = minutes // 60 remaining_minutes = minutes % 60 job.minute.on(remaining_minutes) job.hour.every(hours) if interval.get("hours"): hours = interval["hours"] if hours <= 24: job.minute.on(0) job.hour.every(hours) else: days = hours // 24 remaining_hours = hours % 24 job.minute.on(0) job.hour.on(remaining_hours) job.day.every(days) if interval.get("days"): days = interval["days"] if days <= 30: job.setall(f'0 0 */{days} * *') else: # Para intervalos de más de 30 días, convierte en meses months = days // 30 job.setall(f'0 0 1 */{months} *') # Ejecuta cada cierto número de meses el primer día del mes if interval.get("months"): job.setall(f'0 0 1 */{interval["months"]} *') cron.write() logging.info("Se configura Job exitosamente en crontab. Comando insertado :"+settings.CRON_CONFIG["command"]) return True except Exception as e: logging.error(str(e)) return False