diff --git a/README.md b/README.md index 6ec1882..772cd90 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Como resultado este proyecto implementa una API en un servidor para recibir los ## Componentes -Este proyecto utiliza un BD local (SQLite en modo WALL) y FastAPI. +Este proyecto utiliza un BD local SQLite en modo WAL, FastAPI. ## Configuración inicial @@ -55,12 +55,25 @@ RANGO_DIAS = 7 # Se debe establecer Manual JWT_TOKEN_GEN = "BEARER_TOKEN" -# Máximo de atributos a procesar por evento (4000 es el recomendado) -MAX_ATTRS = 4000 +# Máximo de atributos a procesar por evento (2000 es el recomendado) +MAX_ATTRS = 2000 + +def cargar_fp_en_set(ruta_archivo: str) -> set: + """ + Lee un archivo de texto línea a línea y devuelve un set con cada línea limpia. + """ + conjunto = set() + with open(ruta_archivo, "r", encoding="utf-8") as archivo: + for linea in archivo: + linea_limpia = linea.strip() + if linea_limpia: # Evita líneas vacías + conjunto.add(linea_limpia) + return conjunto # Falsos positivos comunes # FP Comunes -FP_COMUNES = ['0.0.0.0','4.4.4.4','8.8.8.8','localhost','.local','google.com','amazon.com','microsoft.com','cloudflare.com'] +FP_COMUNES = cargar_fp_en_set("fp.txt") + # Organizaciones que se puede omitir la revisión de eventos ORG_OMITIR = [] @@ -128,6 +141,10 @@ IDS_CORRELACIONES = [ "payment-card-number" # Número de tarjeta de pago ] +CONFIG_WL = { + "filtros_buscar": ["osint", "google", "ipv4", "1000","domains","websites","microsoft","amazon","cloudflare","tranco","cisco","azure","office"], + "max_reg": 10000 +} ``` En config.py puedes realizar los ajustes: @@ -142,9 +159,9 @@ En config.py puedes realizar los ajustes: - JWT_TOKEN_GEN: Token personal generado. -- MAX_ATTRS: Cantidad de atributos a procesar por evento. Por defecto el máximo son 4000. No se recomienda aumentar este valor. +- MAX_ATTRS: Cantidad de atributos a procesar por evento. Por defecto el máximo son 2000. No se recomienda aumentar este valor mas alla de los 4000 Attrs. -- FP_COMUNES: Falsos positivos conocidos. +- FP_COMUNES: Falsos positivos conocidos que se obtienen de un archivo txt. - ORG_OMITIR: Organizaciones que se puede omitir la revisión de eventos. @@ -152,6 +169,8 @@ En config.py puedes realizar los ajustes: - IDS_CORRELACIONES: Lista de tipos de atributos que deberán tener el flag IDS activado dentro de MISP. Por defecto estos atributos son correlacionados dentro de MISP. +- CONFIG_WL: Configuración de filtros para obtener falsos positivos de WarningList de MISP. + # Configuración Inicial ## Instalación en entorno virtual @@ -168,7 +187,18 @@ source venv/bin/activate ``` shell pip install -r requirements.txt ``` -3. Se debe crear archivo .sh o editar el archivo existente (start_api.sh) y anexarlo como servicio para que inicie con el sistema operativo: + +3. Se debe desactivar entorno para configurar playwright inicialmente. +```shell +deactivate +# Instala dependencias en Ubuntu +sudo venv/bin/playwright install-deps + +# instala dependencias de playwright +sudo venv/bin/playwright install +``` + +4. Se debe crear archivo .sh o editar el archivo existente (start_api.sh) y anexarlo como servicio para que inicie con el sistema operativo: ``` shell #!/bin/bash diff --git a/config.py b/config.py index c6fc439..e7bc792 100644 --- a/config.py +++ b/config.py @@ -22,9 +22,21 @@ JWT_TOKEN_GEN = "JWT_TOKEN" # Maximo de atributos a procesar por evento (2000 es el recomendado) MAX_ATTRS = 2000 +def cargar_fp_en_set(ruta_archivo: str) -> set: + """ + Lee un archivo de texto línea a línea y devuelve un set con cada línea limpia. + """ + conjunto = set() + with open(ruta_archivo, "r", encoding="utf-8") as archivo: + for linea in archivo: + linea_limpia = linea.strip() + if linea_limpia: # Evita líneas vacías + conjunto.add(linea_limpia) + return conjunto + # Falsos positivos comunes # FP Comunes -FP_COMUNES = ['0.0.0.0','4.4.4.4','8.8.8.8','localhost','.local','google.com','amazon.com','microsoft.com','cloudflare.com'] +FP_COMUNES = cargar_fp_en_set("fp.txt") # Organizaciones que se puede omitir la revisión de eventos ORG_OMITIR = [] @@ -92,3 +104,7 @@ IDS_CORRELACIONES = [ "payment-card-number" # Número de tarjeta de pago ] +CONFIG_WL = { + "filtros_buscar": ["osint", "google", "ipv4", "1000","domains","websites","microsoft","amazon","cloudflare","tranco","cisco","azure","office"], + "max_reg": 10000 +} \ No newline at end of file diff --git a/fp.txt b/fp.txt new file mode 100644 index 0000000..2324d2e --- /dev/null +++ b/fp.txt @@ -0,0 +1,63 @@ +google.com +youtube.com +facebook.com +twitter.com +amazonaws.com +cloudflare.com +microsoft.com +office.com +apple.com +dropbox.com +blogspot.com +wordpress.com +cdn.jsdelivr.net +fastly.net +akamai.net +bbc.co.uk +nytimes.com +reddit.com +instagram.com +pinterest.com +linkedin.com +stackoverflow.com +github.com +slack.com +zoom.us +localhost +whatsapp.com +tiktok.com +cnn.com +espn.com +weather.com +booking.com +uber.com +airbnb.com +paypal.com +netflix.com +vimeo.com +tumblr.com +imdb.com +amazon.com +hulu.com +flickr.com +imgur.com +vk.com +baidu.com +sina.com.cn +aliexpress.com +shopify.com +etsy.com +8.8.8.8 +8.8.4.4 +1.1.1.1 +1.0.0.1 +208.67.222.222 +208.67.220.220 +9.9.9.9 +149.112.112.112 +114.114.114.114 +80.80.80.80 +8.26.56.26 +4.4.4.4 +0.0.0.0 + diff --git a/main.py b/main.py index d6496c8..7187395 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ import logging import traceback import urllib3 import asyncio +from playwright.async_api import async_playwright, TimeoutError import time from datetime import datetime, timedelta @@ -21,6 +22,8 @@ from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError, OperationalError from contextlib import asynccontextmanager +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry from pymisp import PyMISP, PyMISPError @@ -29,7 +32,9 @@ from models import Base, ModificadosEv, VerificacionFalsoPositivo urllib3.disable_warnings() - +################################################################################ +# CONFIGURACIONES DE LOGGING +################################################################################ directorio_actual = os.getcwd() dir_logs = os.path.join(directorio_actual, "logs") os.makedirs(dir_logs, exist_ok=True) @@ -45,6 +50,9 @@ logging.basicConfig( format='%(asctime)s - %(levelname)s - %(message)s' ) +################################################################################ +# BASE DE DATOS ASÍNCRONA (sqlite+aiosqlite) +################################################################################ ruta_base_datos = os.path.join(directorio_actual, "data", "procesados.db") os.makedirs(os.path.dirname(ruta_base_datos), exist_ok=True) @@ -55,7 +63,7 @@ def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() # WAL y busy_timeout cursor.execute('PRAGMA journal_mode=WAL;') - cursor.execute('PRAGMA busy_timeout = 5000;') + cursor.execute('PRAGMA busy_timeout=5000;') cursor.close() event.listen(async_engine.sync_engine, 'connect', set_sqlite_pragma) @@ -71,6 +79,96 @@ async def init_db(): async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) +################################################################################ +# PyMISP (bloqueante) => lo usamos con to_thread +################################################################################ +misp = PyMISP( + url=config.MISP_CONFIG['misp_url'], + key=config.MISP_CONFIG['misp_authkey'], + ssl=False # Cambia a True si tienes SSL válido +) + +# Set de FP +lista_fp = set() + +# Para llenar lista de FP +def guarda_fp(): + try: + global lista_fp + # Filtros para seleccionar Warninglist + filtros = config.CONFIG_WL['filtros_buscar'] + + max_reg_pw = config.CONFIG_WL['max_reg'] + + # limpia set + lista_fp.clear() + + # Actualizar Warninglist por si acaso... + misp.update_warninglists() + + # Warninglist completas + wl = misp.warninglists() + + fechas = [] + + for l in wl: + fechas.append(str(l['Warninglist']['version'])) + + # Saca la versión más alta... + version = find_max(fechas) + + types = ['domain','ip-src','ip-dst','hostname'] + + # Verifica que Warning sea del año actual + if str(version).startswith(datetime.now().strftime("%Y")): + for l in wl: + if str(l['Warninglist']['version']) == version: + if any(filtro in str(l['Warninglist']['name']).lower() for filtro in filtros): + if int(str(l['Warninglist']['warninglist_entry_count'])) <= max_reg_pw: + valid_atributes = str(l['Warninglist']['valid_attributes']).split(",") + if any(tipo in valid_atributes for tipo in types): + wl = misp.get_warninglist(int(l['Warninglist']['id'])) + for entry in wl['Warninglist']['WarninglistEntry']: + if '.' in entry['value'] and ':' not in entry['value'] and '/' not in entry['value']: + lista_fp.add(entry['value']) + + # Se une FP_COMUNES + WL + lista_fp.update(config.FP_COMUNES) + else: + lista_fp.update(config.FP_COMUNES) + logging.info("WarningList no son del año actual. Se cargan Falsos positivos genericos") + logging.info("Falsos positivos cargados :"+str(len(lista_fp))) + except (Exception, PyMISPError) as e: + logging.error("Error al cargar FP en main :"+str(e)) + + +async def run_comprobar_fp_periodicamente(): + while True: + # Ejecuta la función en un thread para no bloquear el event loop + await asyncio.to_thread(guarda_fp) + # Espera 24 horas (86400 segundos) + await asyncio.sleep(86400) + + +@staticmethod +def convert_value(value): + try: + # Intentar convertir a fecha y retornar un número representativo + return int(datetime.strptime(value, "%Y%m%d").strftime("%Y%m%d")) + except ValueError: + # Si falla, convertir a entero directamente + return int(value) + +def find_max(data): + max_value = max(data, key=convert_value) + return max_value + + + + +################################################################################ +# STARTUP: INICIALIZAR LA BD +################################################################################ # Definimos el ciclo de vida como un generador @asynccontextmanager @@ -78,14 +176,21 @@ async def lifespan(app: FastAPI): try: await init_db() logging.info("Base de datos inicializada (async).") + + asyncio.create_task(run_comprobar_fp_periodicamente()) + logging.info("Cargando Falsos Positivos") + except (OperationalError, Exception) as e: - logging.error("No se logró establecer conexión con DB. Se utilizará archivo local. Error: " + str(e)) + logging.error("Error: " + str(e)) # Yield indica el punto de inicio de la app yield # Aquí puedes agregar limpieza si es necesario +################################################################################ +# FASTAPI APP +################################################################################ app = FastAPI(lifespan=lifespan, - version="1.0", + version="1.2", title="MISP-WEBHOOK", description="Webhooks for MISP", swagger_ui_parameters={"supportedSubmitMethods": []} @@ -99,6 +204,9 @@ app.add_middleware( allow_headers=["*"], ) +################################################################################ +# PYDANTIC MODELS +################################################################################ class InputModel(BaseModel): model_config = ConfigDict(extra="forbid") @@ -112,6 +220,9 @@ class ResponseData(BaseModel): status: Optional[str] = None detail: Optional[str] = None +################################################################################ +# DEPENDENCIA PARA OBTENER AsyncSession +################################################################################ async def get_db(): async with AsyncSessionLocal() as session: yield session @@ -200,12 +311,11 @@ event_data_responses = { } }, } -misp = PyMISP( - url=config.MISP_CONFIG['misp_url'], - key=config.MISP_CONFIG['misp_authkey'], - ssl=False # Cambia a True si tienes SSL válido -) + +################################################################################ +# DICCIONARIO DE TIPOS +################################################################################ tipos_ktip = { "ip": ['ip-src','ip-dst'], "domain": ['domain','hostname'], @@ -213,10 +323,16 @@ tipos_ktip = { "url": ['url'] } +################################################################################ +# LOCK ASÍNCRONO SI QUIERES SECUNDARIZAR ESCRITURAS +################################################################################ # Concurrencia de 2-3 no suele requerir un single-writer, pero si quieres # evitar colisiones de escritura, lo agregamos. writer_lock = asyncio.Lock() +################################################################################ +# FUNCIONES DE BD (LECTURA Y ESCRITURA) CON REINTENTOS +################################################################################ async def async_check_falso_positivo_en_bd(ioc_value: str, ioc_type: str, days_valid: int = 1, db: AsyncSession = None) -> Optional[bool]: """ Lectura asíncrona en la BD, con reintentos si hay 'database is locked'. @@ -271,6 +387,7 @@ async def async_guardar_falso_positivo_en_bd(ioc_value: str, ioc_type: str, es_f ) db_session.add(nuevo) await db_session.commit() + break except OperationalError as e: if "database is locked" in str(e).lower(): @@ -282,23 +399,95 @@ async def async_guardar_falso_positivo_en_bd(ioc_value: str, ioc_type: str, es_f else: raise + +################################################################################ +# FUNCION AUXILIAR KTIP +################################################################################ +async def scan_ioc(ioc): + """ + Retorna True si es Falso Positivo (clean/good), + o False si es positivo (malicioso) o hubo un fallo. + """ + url = f"https://opentip.kaspersky.com/{ioc}/?tab=lookup" + + try: + # Iniciamos Playwright + async with async_playwright() as p: + # Lanzamos Chromium en modo headless + browser = await p.chromium.launch(headless=True) + page = await browser.new_page() + + try: + # Intentamos navegar a la URL + await page.goto(url, timeout=30000) + logging.info("Ingresando a KTIP por navegador") + # Esperamos a que aparezca el div + await page.wait_for_selector("div.ReputationBlock_repBlock_HhC4CEbx", timeout=30000) + except TimeoutError: + logging.error(f"[ERROR] Timeout al procesar {ioc}") + await browser.close() + return False # O return None, según lo que prefieras indicar + except Exception as e: + # Cualquier otro error (por ejemplo, fallo de conexión) + logging.error(f"[ERROR] Al procesar {ioc}: {e}") + await browser.close() + return False + + # Si llegamos aquí, se cargó la página y el div se encontró + div_inner = await page.inner_html("div.ReputationBlock_repBlock_HhC4CEbx") + + await browser.close() + + # Verificamos si contiene 'good' o 'clean' + div_lower = div_inner.lower() + if '>good<' in div_lower or '>clean<' in div_lower: + return True # IoC Falso Positivo + else: + return False # IoC Positivo (malicioso o no "clean") + + except Exception as e: + # Captura cualquier error *fuera* del bloque de conexión con Playwright + logging.error(f"[ERROR] Fallo inesperado con {ioc}: {e}") + return False + + +def get_session_with_retries(max_retries=1): + session = requests.Session() + retries = Retry( + total=max_retries, # Número total de reintentos + backoff_factor=0, # Tiempo de espera entre reintentos (0 = sin espera adicional) + status_forcelist=[500, 502, 503, 504], # Códigos HTTP que van a forzar reintento + allowed_methods=["HEAD", "GET", "OPTIONS"] # Métodos que permiten reintentos + ) + adapter = HTTPAdapter(max_retries=retries) + session.mount("http://", adapter) + session.mount("https://", adapter) + return session + +################################################################################ +# FUNCIONES BLOQUEANTES => las llamamos con asyncio.to_thread +################################################################################ def sync_verificacion_ktip(ioc: str, tipo_ioc: str) -> bool: """ Llamada sincrónica (requests) a KTIP. Llamaremos a esta función con to_thread. """ + global lista_fp + url = config.KTIP_CONFIG['url_base'] headers = {'x-api-key': config.KTIP_CONFIG['api_key']} + # Comprobaciones iniciales if str(ioc).lower().startswith("https://www.virustotal.com/gui"): return False - - if str(ioc).lower() in config.FP_COMUNES or any(str(ioc).lower().endswith(item) for item in config.FP_COMUNES): + + if str(ioc).lower() in lista_fp or any(str(ioc).lower().endswith(item) for item in lista_fp): return True + # Construir la URL de KTIP según el tipo de IoC if tipo_ioc in tipos_ktip['ip']: url += f"ip?request={ioc}" elif tipo_ioc in tipos_ktip['domain']: - url += f"domain?request={ioc}" + url += f"domain?request={ioc.replace("www.","")}" elif tipo_ioc in tipos_ktip['hash']: url += f"hash?request={ioc}" elif tipo_ioc in tipos_ktip['url']: @@ -310,26 +499,68 @@ def sync_verificacion_ktip(ioc: str, tipo_ioc: str) -> bool: else: return False + # Crea la sesión con reintentos=1 + session_ktip = get_session_with_retries(max_retries=1) + try: - resp = requests.get(url, headers=headers, verify=False) - logging.info(f"Respondió KTIP con IoC: {ioc}") - if resp.status_code == 200: - data = resp.json() - if data.get('Zone') == 'Green': - if tipo_ioc in tipos_ktip['url']: - resp_temp = requests.get(ioc, verify=False) - if resp_temp.status_code == 200 or resp_temp.status_code == 403: - content = resp_temp.text.lower() - if 'html' in content: - return False - return True - elif resp.status_code == 403: - logging.info(f"No proceso KTIP - IoC: {ioc}") - return False + # PRIMERA PETICIÓN con la sesión configurada + resp = session_ktip.get(url, headers=headers, verify=False) + try: + logging.info(f"Respondió KTIP con IoC: {ioc}") + + if resp.status_code == 200: + data = resp.json() # Descarga el contenido en memoria + if data.get('Zone') == 'Green': + # Si el tipo es URL, se hace una segunda petición para obtener el contenido del IoC + if tipo_ioc in tipos_ktip['url']: + # Se usa requests normal sin reintentos y se habilita stream=True + resp_temp = requests.get(ioc, verify=False, stream=True) + try: + codes = [200, 403, 404] + if resp_temp.status_code in codes: + # Obtenemos el header "Content-Type" para determinar el tipo de contenido + content_type = resp_temp.headers.get("Content-Type", "").lower() + keywords = ['html','abuse', 'report', '{', '}'] + if "text/html" in content_type: + # Si es HTML, usamos .text y verificamos las keywords + content = resp_temp.text.lower() + for w in keywords: + if w in content: + return False + else: + # Si no es HTML (p.ej. es un archivo binario como un .zip) + return False + finally: + # Cerramos la segunda respuesta + resp_temp.close() + return True + # Si 'Zone' no es 'Green' + return False + + elif resp.status_code == 403: + # Se verifica que no sea url para procesar + if tipo_ioc not in tipos_ktip['url']: + # Se limpia por si domain tiene wwww + ioc_fix = ioc.replace("wwww.","") + is_false_positive = asyncio.run(scan_ioc(ioc_fix)) + return is_false_positive + else: + logging.info(f"No proceso KTIP - IoC: {ioc}") + # Otros status code + return False + + finally: + # Cerramos la primera respuesta + resp.close() + except Exception as e: logging.error(f"Error verificacion_ktip: {e}") return False + finally: + # Cerramos la sesión para liberar recursos + session_ktip.close() + def sync_eliminar_atributo_o_objeto(a, object_id, evento_id): """ Llamada sincrónica a PyMISP. La usaremos con to_thread. @@ -371,52 +602,68 @@ def sync_eliminar_atributo_o_objeto(a, object_id, evento_id): logging.error(str(err)) return False +################################################################################ +# COROUTINE PARA PROCESAR UN ATRIBUTO +################################################################################ async def procesar_atributo(a, event_id, days_ago, now, db_factory): """ Verifica si un atributo es un FP usando BD asíncrona y KTIP sincrónico con to_thread. """ try: - attr_timestamp = int(a['timestamp']) - attr_datetime = datetime.fromtimestamp(attr_timestamp) - if not (days_ago.date() <= attr_datetime.date() <= now.date()): - return False + if any(a['type'] in lista for lista in tipos_ktip.values()): + attr_timestamp = int(a['timestamp']) + attr_datetime = datetime.fromtimestamp(attr_timestamp) + if not (days_ago.date() <= attr_datetime.date() <= now.date()): + return False + + + # Para URL valor debe dejarse intacto + if a['type'] in tipos_ktip['url']: + valor_lower = str(a['value']).strip() + else: + valor_lower = str(a['value']).strip().lower() - valor_lower = str(a['value']).strip().lower() + + # 1) Lectura asíncrona en BD + async with db_factory() as db_session: + resultado_bd = await async_check_falso_positivo_en_bd(valor_lower, a['type'], config.RANGO_DIAS, db_session) - # 1) Lectura asíncrona en BD - async with db_factory() as db_session: - resultado_bd = await async_check_falso_positivo_en_bd(valor_lower, a['type'], config.RANGO_DIAS, db_session) + # 2) Si ya está en BD + if resultado_bd is not None: + if resultado_bd is True: + # Llamada a MISP sincrónica => to_thread + await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id) + return True + else: + return False + - # 2) Si ya está en BD - if resultado_bd is not None: - if resultado_bd is True: - # Llamada a MISP sincrónica => to_thread + + # 3) Si no está en BD, llamar KTIP en un hilo => to_thread + es_fp = await asyncio.to_thread(sync_verificacion_ktip, valor_lower, a['type']) + + # 4) Guardar en BD con lock de escritura + await async_guardar_falso_positivo_en_bd( + valor_lower, + a['type'], + es_fp, + db_factory + ) + + if es_fp: await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id) return True else: return False - - # 3) Si no está en BD, llamar KTIP en un hilo => to_thread - es_fp = await asyncio.to_thread(sync_verificacion_ktip, valor_lower, a['type']) - - # 4) Guardar en BD con lock de escritura - await async_guardar_falso_positivo_en_bd( - valor_lower, - a['type'], - es_fp, - db_factory - ) - - if es_fp: - await asyncio.to_thread(sync_eliminar_atributo_o_objeto, a, None if a['object_id'] == "0" else a['object_id'], event_id) - return True - else: - return False + return False except Exception as e: logging.error(f"Error en procesar_atributo({a}): {e}") return False +################################################################################ +# ENDPOINT ASÍNCRONO +################################################################################ @app.post( "/webhook/misp_event_fixer", response_model=ResponseData, diff --git a/requirements.txt b/requirements.txt index 499c959..dab999c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ fastapi pymisp uvicorn SQLAlchemy -aiosqlite \ No newline at end of file +aiosqlite +playwright \ No newline at end of file