misp-jobfixer/main.py

96 lines
3.5 KiB
Python
Raw Normal View History

2025-01-10 10:17:35 -03:00
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, select
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timedelta
import config
import os
import logging
from logging.handlers import RotatingFileHandler
DIR_ACTUAL = os.getcwd()
DIR_LOGS = os.path.join(DIR_ACTUAL,'logs')
os.makedirs(DIR_LOGS, exist_ok=True)
rotating_handler = RotatingFileHandler(os.path.join(DIR_LOGS,"misp_limpiajobs_"+datetime.now().strftime("%Y%m%d")+".log"), maxBytes=262144000, backupCount=10)
logging.basicConfig(level=logging.INFO, handlers=[rotating_handler],
format='%(asctime)s - %(levelname)s - %(message)s')
# Configuración de la conexión para MariaDB con pymysql
DATABASE_URL = "mariadb+pymysql://"+config.DB_DATA['user']+":"+config.DB_DATA['pass']+"@"+config.DB_DATA['host']+":"+str(config.DB_DATA['port'])+"/"+config.DB_DATA['dbname']
# Inicializa SQLAlchemy
Base = declarative_base()
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()
# Modelo de la tabla jobs
class Job(Base):
__tablename__ = 'jobs'
id = Column(Integer, primary_key=True, autoincrement=True)
worker = Column(String(32), nullable=False)
job_type = Column(String(32), nullable=False)
job_input = Column(Text, nullable=False)
status = Column(Integer, nullable=False, default=0)
retries = Column(Integer, nullable=False, default=0)
message = Column(Text, nullable=False)
progress = Column(Integer, nullable=False, default=0)
org_id = Column(Integer, nullable=False, default=0)
process_id = Column(String(36), nullable=True)
date_created = Column(DateTime, nullable=False)
date_modified = Column(DateTime, nullable=False)
# Función para detener jobs no completados después de 24 horas
def kill_stale_jobs():
"""
Identifica y detiene jobs que no han sido completados
y llevan más de 24 horas sin cambios.
"""
try:
time_limit = datetime.now() - timedelta(hours=config.CHECK_JOBS['hours_limit'])
# Consulta usando la sintaxis de SQLAlchemy 2.x
stmt = select(Job).where(
Job.status == 0, # Status 0: Sigue en progreso
Job.progress < 100, # Menor a 100, sigue activo
Job.date_modified < time_limit
)
stale_jobs = session.execute(stmt).scalars().all()
if not stale_jobs:
logging.info("No hay jobs pendientes que cumplan con las condiciones.")
return
processed_count = 0
failed_count = 0
for job in stale_jobs:
try:
job.status = 3 # Killed
job.progress = 100
job.message = "Killed job manually."
session.commit()
processed_count += 1
logging.info(f"Job {job.id} detenido automáticamente.")
except SQLAlchemyError as sql_error:
session.rollback()
failed_count += 1
logging.error(f"Error al detener el Job {job.id}: {str(sql_error)}")
logging.info(f"Resumen: {processed_count} jobs detenidos exitosamente, {failed_count} errores.")
except Exception as e:
logging.error(f"Error al procesar los jobs: {str(e)}")
finally:
session.close()
logging.info("Sesión cerrada correctamente.")
# Ejecución del script
if __name__ == "__main__":
2025-01-17 09:41:11 -03:00
logging.info("JobFixer v1.0 comenzando")
2025-01-10 10:17:35 -03:00
kill_stale_jobs()