"""Housekeeping script that marks stale rows of the ``jobs`` table as killed.

A job is considered stale when it is still flagged as in progress
(``status == 0``), has not reached ``progress == 100`` and its
``date_modified`` is older than ``config.CHECK_JOBS['hours_limit']`` hours.
Activity is logged to a rotating file under ``./logs``.
"""
import logging
import os
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler

from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker, declarative_base

import config

# Log files live in a "logs" directory under the current working directory,
# so the location depends on where the script is launched from.
DIR_ACTUAL = os.getcwd()
DIR_LOGS = os.path.join(DIR_ACTUAL, 'logs')
os.makedirs(DIR_LOGS, exist_ok=True)

rotating_handler = RotatingFileHandler(
    os.path.join(DIR_LOGS, "misp_limpiajobs_" + datetime.now().strftime("%Y%m%d") + ".log"),
    maxBytes=262144000,  # 250 MiB per file before rotating
    backupCount=10,
)
logging.basicConfig(
    level=logging.INFO,
    handlers=[rotating_handler],
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Connection settings for MariaDB through the pymysql driver.
# NOTE(review): the credentials are concatenated verbatim, so a password
# containing characters such as '@' or '/' yields a malformed URL. Consider
# sqlalchemy.engine.URL.create(); first confirm whether the values stored in
# config are already URL-escaped.
DATABASE_URL = "mariadb+pymysql://"+config.DB_DATA['user']+":"+config.DB_DATA['pass']+"@"+config.DB_DATA['host']+":"+str(config.DB_DATA['port'])+"/"+config.DB_DATA['dbname']

# SQLAlchemy bootstrap: declarative base, engine and one module-level session.
Base = declarative_base()
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
session = Session()


# ORM model for the jobs table
class Job(Base):
    """Mapped row of the ``jobs`` table."""

    __tablename__ = 'jobs'

    id = Column(Integer, primary_key=True, autoincrement=True)
    worker = Column(String(32), nullable=False)
    job_type = Column(String(32), nullable=False)
    job_input = Column(Text, nullable=False)
    status = Column(Integer, nullable=False, default=0)
    retries = Column(Integer, nullable=False, default=0)
    message = Column(Text, nullable=False)
    progress = Column(Integer, nullable=False, default=0)
    org_id = Column(Integer, nullable=False, default=0)
    process_id = Column(String(36), nullable=True)
    date_created = Column(DateTime, nullable=False)
    date_modified = Column(DateTime, nullable=False)


# Stop jobs that were never completed within the configured time window
def kill_stale_jobs():
    """Mark unfinished jobs that have not changed recently as killed.

    Selects every job with ``status == 0``, ``progress < 100`` and a
    ``date_modified`` older than ``config.CHECK_JOBS['hours_limit']`` hours
    (measured against the local, naive ``datetime.now()``), then sets
    ``status = 3``, ``progress = 100`` and a fixed message on each one.
    Every job is committed on its own so that one failing update does not
    discard the others.

    Errors are logged rather than raised. The module-level ``session`` is
    always closed before returning, so the function is meant to be called
    once per process.
    """
    try:
        time_limit = datetime.now() - timedelta(hours=config.CHECK_JOBS['hours_limit'])

        # Query written with the SQLAlchemy 2.x select() syntax
        stmt = select(Job).where(
            Job.status == 0,       # status 0: still in progress
            Job.progress < 100,    # below 100: still active
            Job.date_modified < time_limit,
        )
        stale_jobs = session.execute(stmt).scalars().all()

        if not stale_jobs:
            logging.info("No hay jobs pendientes que cumplan con las condiciones.")
            return

        processed_count = 0
        failed_count = 0

        # Snapshot the ids before the first commit: commit() and rollback()
        # expire the loaded instances, so reading job.id afterwards would emit
        # a refresh SELECT per job and could itself fail inside the error
        # handler, aborting the whole run.
        jobs_with_ids = [(job.id, job) for job in stale_jobs]

        for job_id, job in jobs_with_ids:
            try:
                job.status = 3  # killed
                job.progress = 100
                job.message = "Killed job manually."
                session.commit()
                processed_count += 1
                logging.info(f"Job {job_id} detenido automáticamente.")
            except SQLAlchemyError as sql_error:
                # Discard the failed update and carry on with the next job.
                session.rollback()
                failed_count += 1
                logging.error(f"Error al detener el Job {job_id}: {str(sql_error)}")

        logging.info(f"Resumen: {processed_count} jobs detenidos exitosamente, {failed_count} errores.")
    except Exception as e:
        # Top-level boundary: keep the traceback in the log instead of
        # reducing the failure to its message.
        logging.exception(f"Error al procesar los jobs: {str(e)}")
    finally:
        session.close()
        logging.info("Sesión cerrada correctamente.")


# Script entry point
if __name__ == "__main__":
    logging.info("CuraJobs v1.0 comenzando")
    kill_stale_jobs()