Цель этого мини-урока

Ниже приведен краткий обзор настройки приложения, написанного на Kotlin + Spring Boot, которое развертывается на кластере в нескольких экземплярах и использует библиотеку. Кварц запускать задачи, запланированные cron, только на одном из экземпляров этой службы.

Пример: В OpenShift запущено несколько подов микросервисов, один из которых должен генерировать отчет раз в сутки. Если модуль по какой-либо причине выходит из строя во время работы, эта задача должна быть перехвачена и выполнена другим модулем. Если генерация отчета не удалась, следует попробовать запустить задачу генерации отчета несколько раз в течение следующих двух часов. После N неудачных попыток вы должны восстановить исходный cron для этой задачи. Конфигурация всех модулей микросервисов должна быть одинаковой.

Краткое введение

Прежде чем читать эту статью, я настоятельно рекомендую вам прочитать этот прекрасный исчерпывающий обзор библиотеки Quartz.

Альтернативы

Вам могут не понадобиться все функции, предоставляемые Quartz.
В таком случае, если вы используете Spring в проекте, советую посмотреть библиотеку ShedLocking. Ссылка на репозиторий в Гитхаб.
Краткий ShedLocking это простая библиотека, которая гарантирует, что любая задача будет выполнена только один раз.
Реализация основана на блокировках, хранящихся в базе данных — всего несколько таблиц. Очень удобно, что всю настройку можно сделать только с помощью аннотаций в стиле аннотаций Spring @Программа.
Однако основным недостатком этой библиотеки является
ShedLock не отслеживает жизненный цикл работы (нет возможности проверить выполнение задания, при необходимости перепланируйте задание).

Если вы рассматриваете альтернативные механизмы для синхронизации нескольких экземпляров вашего приложения, советую прочитать эта тема на StackOverflow
(вот моя публикация с содержанием этой статьи)

Выполнение

Описание задачи

Ниже приведен пример настройки приложения на Spring Boot, которое одновременно работает на нескольких серверах и исследует базу данных. У каждого экземпляра приложения есть bean — задача, которую выполняет cron. Это здание должно быть завершено только один раз (на одном из инстансов).
Если модуль, в котором выполнялось задание, дает сбой, задание необходимо перезапустить на любом другом работающем модуле. Если платформа не упала во время выполнения задания, а
задача не была выполнена (получили исключение при выполнении), задачу необходимо перезапустить еще 2 раза с задержкой в 5 часов * количество попыток.
Если вторая попытка перезапуска не удалась, вам необходимо установить cron по умолчанию
для нашей миссии:
0 0 4 L-1 * ? * — исполнение в 4 часа утра предпоследнего числа каждого месяца.

ЧИТАТЬ   Функция помощи водителю Ford уровня 3 в ближайшее время не сможет справиться с оживленными улицами города.

Теперь мы решили, что обязательно будем использовать Quartz и будем использовать его. в кластерном режиме

Подключить зависимость:

Gradle
implementation("org.springframework.boot:spring-boot-starter-quartz")

Maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

Заполнение базы:

Прежде чем мы начнем писать конфигурации, нам нужно заполнить нашу базу данных таблицами, с которыми будет работать Quartz. Я использовал liquibase и официальные скрипты — здесь отсюда.
Для заданий Quartz лучше иметь отдельную схему в базе данных.

Задача, которую должен выполнить cron:

Мы имитируем поведение, когда задание или одна из служб, которые использует задание, может возвращать ошибку в 50% случаев.

@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            if ((1..10).random() >= 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

Параметр настройки

(Больше информации Здесь):

@Configuration
@Profile("quartz")
class JobConfig {
    //JobDetail дла задания выше
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            // Устанавливаем данное значение в true, если хотим, чтобы джоба была перезапущена
            // в случае падения пода
            .requestRecovery(true)
            // не удаляем задание из базы даже в случае, если ни один из триггеров на задание не укаывает
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()

    }

    // Необходимо также при старте пересоздавать уже имеющиеся задания 
    // (нужно на случай, если вы заходите изменить cron выражение для какого-либо из ваших заданий,
    // которые уже были созданы ранее, в противном случае в базе сохранится старое cron выражение)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
    

Создание слушателя, который будет следить за выполнением нашей задачи:

Для работы litener необходимо зарегистрировать его в планировщике.

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

Основная логика обработки жизненного цикла задания:

Мы следим за статусом выполнения задачи с помощью слушателя, который мы зарегистрировали ранее в планировщике. Слушатель имеет 2 метода:
jobToBeExecuted(context: JobExecutionContext)
И
jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?),
которые выполняются до старта и после выполнения задачи (независимо от того, успешно завершилась задача или нет)

Вся логика показана ниже. Я просто добавлю несколько комментариев:

  • Shuduler понимает, сколько раз триггер был перезапущен, используя информацию в jobDataMap. Кроме того, эти данные хранятся в базе данных, поэтому в случае перезапуска инстанса предыдущее значение количества неудачных срабатываний триггера будет вычтено.

  • Если приложение аварийно завершает работу во время выполнения задачи, в базе данных может остаться неисполненный триггер, который, в свою очередь, может преобразоваться в триггер восстановления при перезапуске приложения (его имя будет начинаться с recovery_ и у триггера будет группа RECOVERING_JOBS)

@Profile("quartz")
class JobListener(
    //можно вытащить из контекста выполнения, либо заинжектить напрямую из application контекста
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        //В лист триггеров будут помещены только самописные задания, recover триггеры (если 
        //они существуют на момент старта приложения в этот лист внедрены не будут)
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }


    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }


    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        //либо можно использовать context.mergedJobDataMap
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        //В этот блок if можно добавить следующее условие: && !context.trigger.key.name.startsWith("recover_")
        // в этом случае шедулер не будет будет перезапускать recover триггеры, которые могут образоваться
        // в случае падения приложения во время выполнения задания.
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} filed while execution. Restart attempts count: $count ")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                //на случай, если триггер с таким именем уже существует (остался в бд после падения инстанса)
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    //заменяем наш cron триггер simple триггером, который будет запущен 
                    // через 5 часов * количество попыток перезапуска задания
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.HOURS)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("The maximum number of restarts has been reached. Restart attempts: $count")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        //Recovery триггеры не должны быть перешедулены
        if (!triggerCronMap.contains(defaultTriggerName)) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not self-written trigger. It can be recovery trigger or whatever. This trigger must not be recheduled.")
            return
        }
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}

Стоит обратить внимание на несколько методов:
jobException.setRefireImmediately(true)который можно использовать вместе с context.refireCountесли вам не нужно перепланировать задание после получения ошибки времени выполнения. Работа будет перезапущена немедленно.
В одном из ответы на StackOverflow рекомендовалось использовать в работе
Thread.sleep(N-seconds) вместо того, чтобы переназначать задачу, если она вылетает — явно не лучшая идея ☺

ЧИТАТЬ   Самый вкусный суп с фрикадельками, который вы будете готовить не один раз. Простой суп с овощами и мясом.

Сбой приложения-quartz.yaml

И последнее, что нужно сделать, это написать конфигурацию yaml файл профиля quartzчто мы собираемся использовать. Оставлю комментарии в файле без перевода:

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #Default hostname and timestamp generate instance ID, which can be any string, but must be the only corresponding qrtz_scheduler_state INSTANCE_NAME field for all dispatchers
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below:  & 
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #We only make database-specific proxies for databases
#            useProperties: true #Indicates that JDBC JobStore stores all values in JobDataMaps as strings, so more complex objects can be stored as name-value pairs rather than serialized in BLOB columns.In the long run, this is safer because you avoid serializing non-String classes to BLOB class versions.
            tablePrefix: quartz_schema.QRTZ_  #Database Table Prefix
            misfireThreshold: 60000 #The number of milliseconds the dispatcher will "tolerate" a Trigger to pass its next startup time before being considered a "fire".The default value (if you do not enter this property in the configuration) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #Set the frequency (in milliseconds) of this instance'checkin'* with other instances of the cluster.Affects the speed of detecting failed instances.
            isClustered: true #Turn on Clustering
          threadPool: #Connection Pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true

Локальная отладка производилась следующим образом: я написал несколько docker compose файлы, в которых он поднял базу, и несколько экземпляров приложения, которые он «поместил» на поднятую базу. Если интересно, могу описать отдельно.

Дальнейшая информация:

Вот еще несколько интересных статей на эту тему, которые я рекомендую прочитать:
О кварце
Весенний старт с использованием кварца в кластерном режиме
Интересная статья от коллег OTUS
Эффективно кварцевый кластер

ЧИТАТЬ   «Носят уже не первый день»: жители пригорода Челябинска обнаружили в лесу незаконную свалку снега

PS Приветствую конструктивную критику предложенного выше решения и с удовольствием рассмотрю альтернативы.

Спасибо за ваше время!

Source

От admin