95 lines
3.5 KiB
Python
95 lines
3.5 KiB
Python
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, select
|
|
from sqlalchemy.orm import sessionmaker, declarative_base
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from datetime import datetime, timedelta
|
|
import config
|
|
import os
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
|
|
DIR_ACTUAL = os.getcwd()
|
|
DIR_LOGS = os.path.join(DIR_ACTUAL,'logs')
|
|
|
|
os.makedirs(DIR_LOGS, exist_ok=True)
|
|
|
|
rotating_handler = RotatingFileHandler(os.path.join(DIR_LOGS,"misp_limpiajobs_"+datetime.now().strftime("%Y%m%d")+".log"), maxBytes=262144000, backupCount=10)
|
|
logging.basicConfig(level=logging.INFO, handlers=[rotating_handler],
|
|
format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
# Configuración de la conexión para MariaDB con pymysql
|
|
DATABASE_URL = "mariadb+pymysql://"+config.DB_DATA['user']+":"+config.DB_DATA['pass']+"@"+config.DB_DATA['host']+":"+str(config.DB_DATA['port'])+"/"+config.DB_DATA['dbname']
|
|
|
|
# Inicializa SQLAlchemy
|
|
Base = declarative_base()
|
|
engine = create_engine(DATABASE_URL)
|
|
Session = sessionmaker(bind=engine)
|
|
session = Session()
|
|
|
|
# Modelo de la tabla jobs
|
|
class Job(Base):
|
|
__tablename__ = 'jobs'
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
worker = Column(String(32), nullable=False)
|
|
job_type = Column(String(32), nullable=False)
|
|
job_input = Column(Text, nullable=False)
|
|
status = Column(Integer, nullable=False, default=0)
|
|
retries = Column(Integer, nullable=False, default=0)
|
|
message = Column(Text, nullable=False)
|
|
progress = Column(Integer, nullable=False, default=0)
|
|
org_id = Column(Integer, nullable=False, default=0)
|
|
process_id = Column(String(36), nullable=True)
|
|
date_created = Column(DateTime, nullable=False)
|
|
date_modified = Column(DateTime, nullable=False)
|
|
|
|
# Función para detener jobs no completados después de 24 horas
|
|
def kill_stale_jobs():
|
|
"""
|
|
Identifica y detiene jobs que no han sido completados
|
|
y llevan más de 24 horas sin cambios.
|
|
"""
|
|
try:
|
|
time_limit = datetime.now() - timedelta(hours=config.CHECK_JOBS['hours_limit'])
|
|
|
|
# Consulta usando la sintaxis de SQLAlchemy 2.x
|
|
stmt = select(Job).where(
|
|
Job.status == 0, # Status 0: Sigue en progreso
|
|
Job.progress < 100, # Menor a 100, sigue activo
|
|
Job.date_modified < time_limit
|
|
)
|
|
|
|
stale_jobs = session.execute(stmt).scalars().all()
|
|
|
|
if not stale_jobs:
|
|
logging.info("No hay jobs pendientes que cumplan con las condiciones.")
|
|
return
|
|
|
|
processed_count = 0
|
|
failed_count = 0
|
|
|
|
for job in stale_jobs:
|
|
try:
|
|
job.status = 3 # Killed
|
|
job.progress = 100
|
|
job.message = "Killed job manually."
|
|
session.commit()
|
|
processed_count += 1
|
|
logging.info(f"Job {job.id} detenido automáticamente.")
|
|
except SQLAlchemyError as sql_error:
|
|
session.rollback()
|
|
failed_count += 1
|
|
logging.error(f"Error al detener el Job {job.id}: {str(sql_error)}")
|
|
|
|
logging.info(f"Resumen: {processed_count} jobs detenidos exitosamente, {failed_count} errores.")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error al procesar los jobs: {str(e)}")
|
|
|
|
finally:
|
|
session.close()
|
|
logging.info("Sesión cerrada correctamente.")
|
|
# Ejecución del script
|
|
if __name__ == "__main__":
|
|
logging.info("CuraJobs v1.0 comenzando")
|
|
kill_stale_jobs()
|