misp-alertas/defs.py

439 lines
22 KiB
Python
Raw Normal View History

2024-12-04 13:00:55 -03:00
# definiciones
from io import BytesIO
import xlsxwriter
import settings
from pymisp import PyMISP
import urllib3
from datetime import datetime
from zoneinfo import ZoneInfo
import os
import logging
from logging.handlers import RotatingFileHandler
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from crontab import CronTab
urllib3.disable_warnings()
class MISPAlertManager:
def __init__(self):
self.dir_actual = os.getcwd()
self.dir_logs = os.path.join(self.dir_actual, 'logs')
self._setup_logging()
self.misp = PyMISP(settings.MISP_CONFIG['URL_MISP'], settings.MISP_CONFIG['AUTHKEY'], False)
def _setup_logging(self):
os.makedirs(self.dir_logs, exist_ok=True)
log_file = os.path.join(self.dir_logs, f"alertas_{datetime.now().strftime('%Y%m%d')}.log")
rotating_handler = RotatingFileHandler(log_file, maxBytes=262144000, backupCount=10)
logging.basicConfig(
level=logging.INFO,
handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s'
)
def enviar_alerta(self):
# realizado (flag)
realizado = False
# Servidores
servidores = []
# Se obtienen datos de conexión de servidores
servidores_temp = self.obtener_servidores()
if servidores_temp:
# Solo se quiere entregar desconectados, se filtra por los desconcetados
if settings.SERVERS_OFF:
for x in servidores_temp:
if x['connection_status'] != 'Connected':
servidores.append(x)
else:
# Entonces todos los servidores
servidores = servidores_temp
# Se verifica que servidores tenga datos
if servidores:
# Se arma estructura de correo
try:
# Version de Servidor de MISP
misp_version = self.misp.misp_instance_version['version']
# Configuración de la cuenta de Office 365 y del servidor SMTP
smtp_server = settings.EMAIL_CONFIG['server_smtp_host']
smtp_port = settings.EMAIL_CONFIG['server_smtp_port']
2025-01-07 12:18:27 -03:00
smtp_user = settings.EMAIL_CONFIG['smtp_username']
from_address = settings.EMAIL_CONFIG['smtp_sender']
2024-12-04 13:00:55 -03:00
to_address = settings.EMAIL_CONFIG['email_recipient']
password = settings.EMAIL_CONFIG['smtp_password']
# Crear el mensaje
msg = MIMEMultipart()
2025-01-07 12:18:27 -03:00
msg["From"] = settings.EMAIL_CONFIG['smtp_sender'] # Dirección 'from'
2024-12-04 13:00:55 -03:00
msg["To"] = to_address
msg["Subject"] = settings.EMAIL_CONFIG['email_subject']
# Cuerpo del correo en HTML inciala
html_body_start = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body>
<br>
<p><h3>Local Version MISP :"""+misp_version+"""</h3></p>
<table style="border: 1px solid black; border-collapse: collapse;">
<tr>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: #21120e;">Instance Name</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: #21120e;">Connection Status</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: #21120e;">Error Status</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: #21120e;">Remote Version</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: #21120e;">Remote Org</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: #21120e;">Status Code</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: #21120e;">Last Check</strong>
</p>
</td>
</tr>"""
# HTML para datos
html_body_data = ""
# Se arma HTML para servidores
for serv in servidores:
# Se verifica estado para aplicar color (Rojo / Verde)
if serv['connection_status'] == 'Connected':
html_body_data += """<tr>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['instance_name']+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: """+settings.EMAIL_CONFIG['output_color_status']+"""; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: """+settings.EMAIL_CONFIG['output_color_status']+""";">"""+serv['connection_status']+"""</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: """+settings.EMAIL_CONFIG['output_color_status']+"""; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: """+settings.EMAIL_CONFIG['output_color_status']+""";">"""+serv['error_status']+"""</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['misp_remote_version']+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['remote_org']+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+str(serv['status_code'])+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['last_check']+"""
</p>
</td>
</tr>"""
else:
html_body_data += """<tr>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['instance_name']+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: """+settings.EMAIL_CONFIG['output_color_status_error']+"""; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: """+settings.EMAIL_CONFIG['output_color_status_error']+""";">"""+serv['connection_status']+"""</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: """+settings.EMAIL_CONFIG['output_color_status_error']+"""; font-size: 14px; line-height: 1.2; margin: 0;">
<strong style="color: """+settings.EMAIL_CONFIG['output_color_status_error']+""";">"""+serv['error_status']+"""</strong>
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['misp_remote_version']+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['remote_org']+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+str(serv['status_code'])+"""
</p>
</td>
<td style="padding-left:10px;">
<p style="font-family: Arial, sans-serif; color: #21120e; font-size: 14px; line-height: 1.2; margin: 0;">
"""+serv['last_check']+"""
</p>
</td>
</tr>"""
# Final de HTML
html_body_end = """
</table>
</body>
</html>"""
# Ahora se junta todo el HTML
html_fix = html_body_start+html_body_data+html_body_end
# Adjuntar el cuerpo HTML al mensaje
msg.attach(MIMEText(html_fix, "html"))
# Se verifica si se desea adjuntar archivo
if settings.DATA_ATTACH:
# Se crear instancia de Workbook de Excel en memoria
output = BytesIO()
workbook = xlsxwriter.Workbook(output, {'in_memory': True}) # Crear el workbook en memoria
worksheet = workbook.add_worksheet("Servers")
# Add a bold format to use to highlight cells.
bold = workbook.add_format({'bold': 1})
# se escriben headers
worksheet.write(0, 0, "Instance Name", bold)
worksheet.write(0, 1, "Connection Status", bold)
worksheet.write(0, 2, "Error Status", bold)
worksheet.write(0, 3, "Remote Version", bold)
worksheet.write(0, 4, "Remote Org", bold)
worksheet.write(0, 5, "Status Code", bold)
worksheet.write(0, 6, "Last Check", bold)
for row_index, servidor in enumerate(servidores, start=1):
worksheet.write(row_index, 0, servidor['instance_name'])
worksheet.write(row_index, 1, servidor['connection_status'])
worksheet.write(row_index, 2, servidor['error_status'])
worksheet.write(row_index, 3, servidor['misp_remote_version'])
worksheet.write(row_index, 4, servidor['remote_org'])
worksheet.write(row_index, 5, servidor['status_code'])
worksheet.write(row_index, 6, servidor['last_check'])
worksheet.autofit()
workbook.close()
output.seek(0)
# Adjuntar el archivo XLSX desde la memoria
attached_file = MIMEApplication(output.read(), _subtype="xlsx")
attached_file.add_header(
'content-disposition',
'attachment',
filename=f'Servers_{datetime.now().strftime("%Y%m%d")}.xlsx'
)
msg.attach(attached_file)
logging.info("Se creo Excel con datos para adjuntar a correo")
# Iniciar una conexión con el servidor SMTP de Office 365
server = smtplib.SMTP(smtp_server, smtp_port)
server.ehlo()
if settings.EMAIL_CONFIG['tls']:
server.starttls() # Habilitar cifrado TLS
# Iniciar sesión en el servidor
2025-01-07 12:18:27 -03:00
server.login(smtp_user, password)
2024-12-04 13:00:55 -03:00
# Enviar el correo
server.sendmail(from_address, to_address, msg.as_string())
logging.info("Se envia correo a :"+to_address)
realizado = True
except Exception as e:
logging.error(str(e))
finally:
# Cerrar la conexión con el servidor
server.quit()
else:
logging.info("No existen servidores para hacer envio de alertas.")
else:
logging.info("No existen servidores para hacer envio de alertas.")
# return
return realizado
# Se obtiene lista de servidores de MISP
def obtener_servidores(self):
# Get Severs
servers = self.misp.servers(True)
# var Servers
server_list = []
for s in servers:
try:
# Estructura de datos
# data: instance_name, connection_status, status_log, misp_remote_version, remote_org, last_check
temp = {}
# consultar remote_organisation
rem_org = self.misp.get_organisation(s.remote_org_id, pythonify=True)
# Valor llaves por defecto
temp['instance_name'] = s.name
temp['connection_status'] = ''
temp['error_status'] = ''
temp['misp_remote_version'] = ''
temp['remote_org'] = rem_org.name
temp['status_code'] = 0
# Se verifica conexion
server_temp = self.misp.test_server(s.id)
# timestamp
temp['last_check'] = datetime.now(ZoneInfo("America/Santiago")).strftime("%Y-%m-%d %H:%M:%S %Z (%z)")
# status code de caso
temp['status_code'] = server_temp['status']
# Se verifica estado de conexión
if server_temp['status'] == 1:
temp['connection_status'] = 'Connected'
temp['error_status'] = 'OK. No error'
temp['misp_remote_version'] = server_temp['version']
if server_temp['status'] == 2:
temp['connection_status'] = 'Not Connected'
temp['error_status'] = 'Server unrecheable'
if server_temp['status'] == 3:
temp['connection_status'] = 'Not Connected'
temp['error_status'] = 'Unexpected error'
if server_temp['status'] == 4:
temp['connection_status'] = 'Not Connected'
temp['error_status'] = 'Authentication failed'
if server_temp['status'] == 8:
# Es limitada la conexión, pero tiene acceso a datos
if 'status' in server_temp['post']:
if server_temp['post']['status'] == 1:
temp['connection_status'] = 'Connected'
temp['error_status'] = 'Limited connected'
if 'info' in server_temp:
temp['misp_remote_version'] = server_temp['info']['version']
else:
temp['connection_status'] = 'Not Connected'
temp['error_status'] = 'Unexpected error'
else:
temp['connection_status'] = 'Not Connected'
temp['error_status'] = 'Unexpected error'
# Se agregan datos a lista de servidores
server_list.append(temp)
# Se reordena para que los no conectados queden al principio del array
server_list.sort(key=lambda x: x['connection_status'] != 'Not Connected')
# Log
logging.info("Se guarda datos de conexión de servidor ID:"+str(s.id))
except Exception as e:
logging.error("Error al obtener servidor :"+str(e))
return server_list
def schedule_job(self):
try:
cron = CronTab(user=True)
# Comando que deseas agendar
command = settings.CRON_CONFIG["command"]
# Elimina cualquier trabajo previo con el mismo comando
cron.remove_all(command=command)
# Crear un nuevo job
job = cron.new(command=command)
# Configurar el job de acuerdo a los intervalos especificados
interval = settings.CRON_CONFIG["interval"]
if interval.get("minutes"):
minutes = interval["minutes"]
if minutes < 60:
job.minute.every(minutes)
else:
hours = minutes // 60
remaining_minutes = minutes % 60
job.minute.on(remaining_minutes)
job.hour.every(hours)
if interval.get("hours"):
hours = interval["hours"]
if hours <= 24:
job.minute.on(0)
job.hour.every(hours)
else:
days = hours // 24
remaining_hours = hours % 24
job.minute.on(0)
job.hour.on(remaining_hours)
job.day.every(days)
if interval.get("days"):
days = interval["days"]
if days <= 30:
job.setall(f'0 0 */{days} * *')
else:
# Para intervalos de más de 30 días, convierte en meses
months = days // 30
job.setall(f'0 0 1 */{months} *') # Ejecuta cada cierto número de meses el primer día del mes
if interval.get("months"):
job.setall(f'0 0 1 */{interval["months"]} *')
cron.write()
logging.info("Se configura Job exitosamente en crontab. Comando insertado :"+settings.CRON_CONFIG["command"])
return True
except Exception as e:
logging.error(str(e))
return False