Запланированная задача Spring выполняется в кластерной среде

98

Я пишу приложение, в котором есть задание cron, которое выполняется каждые 60 секунд. Приложение настроено для масштабирования при необходимости на несколько экземпляров. Я хочу выполнять задачу только на 1 экземпляре каждые 60 секунд (на любом узле). Я не могу найти решение этой проблемы прямо из коробки, и я удивлен, что об этом не спрашивали несколько раз раньше. Я использую Spring 4.1.6.

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
user3131879
источник
7
Я думаю, что Quartz - лучшее решение для вас: stackoverflow.com/questions/6663182/…
selalerer 08
Есть предложения по использованию CronJobв kubernetes?
ch271828n

Ответы:

97

Существует проект ShedLock, который служит именно этой цели. Вы просто комментируете задачи, которые должны быть заблокированы при выполнении

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
   // do something
}

Настроить Spring и LockProvider

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
       return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}
Лукас
источник
1
Я просто хочу сказать «Хорошая работа!». Но ... Приятная особенность будет, если библиотека сможет обнаружить имя базы данных, не указывая его явным образом в коде ... За исключением того, что она отлично работает!
Krzysiek
У меня работает с Oracle и Spring boot data jpa starter.
Mahendran Ayyarsamy Kandiar
Работает ли это решение для Spring 3.1.1.RELEASE и java 6? Пожалуйста скажи.
Викас Шарма
Я пробовал с MsSQL и Spring boot JPA, и я использовал сценарий Liquibase для части SQL .. работает хорошо .. Спасибо
Sheetal
Это действительно хорошо работает. Однако я встретил здесь немного сложный случай, не могли бы вы взглянуть. Благодарность!!! stackoverflow.com/questions/57691205/…
Дейтон Ван
15

Это еще один простой и надежный способ безопасного выполнения задания в кластере. Вы можете основываться на базе данных и выполнять задачу только в том случае, если узел является «лидером» в кластере.

Также, когда узел выходит из строя или выключается в кластере, другой узел становится лидером.

Все, что вам нужно, это создать механизм «выборов лидера» и каждый раз проверять, являетесь ли вы лидером:

@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

Следуйте этим шагам:

1. Определите объект и таблицу, которые содержат по одной записи для каждого узла в кластере:

@Entity(name = "SYS_NODE")
public class SystemNode {

/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;

/** The ip. */
@Column(name = "IP")
private String ip;

/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;

/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();

/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;

public Long getId() {
    return id;
}

public void setId(final Long id) {
    this.id = id;
}

public String getTimestamp() {
    return timestamp;
}

public void setTimestamp(final String timestamp) {
    this.timestamp = timestamp;
}

public String getIp() {
    return ip;
}

public void setIp(final String ip) {
    this.ip = ip;
}

public Date getLastPing() {
    return lastPing;
}

public void setLastPing(final Date lastPing) {
    this.lastPing = lastPing;
}

public Date getCreatedAt() {
    return createdAt;
}

public void setCreatedAt(final Date createdAt) {
    this.createdAt = createdAt;
}

public Boolean getIsLeader() {
    return isLeader;
}

public void setIsLeader(final Boolean isLeader) {
    this.isLeader = isLeader;
}

@Override
public String toString() {
    return "SystemNode{" +
            "id=" + id +
            ", timestamp='" + timestamp + '\'' +
            ", ip='" + ip + '\'' +
            ", lastPing=" + lastPing +
            ", createdAt=" + createdAt +
            ", isLeader=" + isLeader +
            '}';
}

}

2.Создайте сервис, который а) вставляет узел в базу данных, б) проверяет лидера

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {

/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);

/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";

/** The ip. */
private String ip;

/** The system service. */
private SystemService systemService;

/** The system node repository. */
private SystemNodeRepository systemNodeRepository;

@Autowired
public void setSystemService(final SystemService systemService) {
    this.systemService = systemService;
}

@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
    this.systemNodeRepository = systemNodeRepository;
}

@Override
public void pingNode() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    if (node == null) {
        createNode();
    } else {
        updateNode(node);
    }
}

@Override
public void checkLeaderShip() {
    final List<SystemNode> allList = systemNodeRepository.findAll();
    final List<SystemNode> aliveList = filterAliveNodes(allList);

    SystemNode leader = findLeader(allList);
    if (leader != null && aliveList.contains(leader)) {
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    } else {
        final SystemNode node = findMinNode(aliveList);

        setLeaderFlag(allList, Boolean.FALSE);
        node.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    }
}

/**
 * Returns the leaded
 * @param list
 *          the list
 * @return  the leader
 */
private SystemNode findLeader(final List<SystemNode> list) {
    for (SystemNode systemNode : list) {
        if (systemNode.getIsLeader()) {
            return systemNode;
        }
    }
    return null;
}

@Override
public boolean isLeader() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    return node != null && node.getIsLeader();
}

@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
    try {
        ip = InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    if (applicationEvent instanceof ContextRefreshedEvent) {
        pingNode();
    }
}

/**
 * Creates the node
 */
private void createNode() {
    final SystemNode node = new SystemNode();
    node.setIp(ip);
    node.setTimestamp(String.valueOf(System.currentTimeMillis()));
    node.setCreatedAt(new Date());
    node.setLastPing(new Date());
    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
    systemNodeRepository.save(node);
}

/**
 * Updates the node
 */
private void updateNode(final SystemNode node) {
    node.setLastPing(new Date());
    systemNodeRepository.save(node);
}

/**
 * Returns the alive nodes.
 *
 * @param list
 *         the list
 * @return the alive nodes
 */
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
    final List<SystemNode> finalList = new LinkedList<>();
    for (SystemNode systemNode : list) {
        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
            finalList.add(systemNode);
        }
    }
    if (CollectionUtils.isEmpty(finalList)) {
        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
    }
    return finalList;
}

/**
 * Finds the min name node.
 *
 * @param list
 *         the list
 * @return the min node
 */
private SystemNode findMinNode(final List<SystemNode> list) {
    SystemNode min = list.get(0);
    for (SystemNode systemNode : list) {
        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
            min = systemNode;
        }
    }
    return min;
}

/**
 * Sets the leader flag.
 *
 * @param list
 *         the list
 * @param value
 *         the value
 */
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
    for (SystemNode systemNode : list) {
        systemNode.setIsLeader(value);
    }
}

}

3. отправка базы данных о том, что вы живы

@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4. вы готовы! Просто проверьте, являетесь ли вы лидером, прежде чем выполнять задание:

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}
mspapant
источник
В этом случае что такое SystemService и SettingEnum? Похоже, это очень просто и просто возвращает значение тайм-аута. В таком случае, почему бы просто не закодировать тайм-аут?
tlavarea
@mspapant, что такое SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT? Какое оптимальное значение я должен использовать здесь?
user525146
@tlavarea вы реализовали этот код, у меня вопрос по методу DateUtils.hasExpired? это собственный метод или это общие утилиты apache?
user525146
10

Пакетные и запланированные задания обычно выполняются на их собственных автономных серверах, вдали от приложений, предназначенных для клиентов, поэтому не является обычным требованием включать задание в приложение, которое, как ожидается, будет выполняться в кластере. Кроме того, заданиям в кластерных средах обычно не нужно беспокоиться о том, что другие экземпляры одного и того же задания выполняются параллельно, так что это еще одна причина, по которой изоляция экземпляров задания не является большим требованием.

Простым решением было бы настроить ваши задания внутри профиля Spring. Например, если ваша текущая конфигурация:

<beans>
  <bean id="someBean" .../>

  <task:scheduled-tasks>
    <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
  </task:scheduled-tasks>
</beans>

измените его на:

<beans>
  <beans profile="scheduled">
    <bean id="someBean" .../>

    <task:scheduled-tasks>
      <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
  </beans>
</beans>

Затем запустите приложение только на одном компьютере с scheduledактивированным профилем ( -Dspring.profiles.active=scheduled).

Если по какой-то причине основной сервер становится недоступным, просто запустите другой сервер с включенным профилем, и все будет продолжать работать нормально.


Ситуация изменится, если вы захотите также автоматическое переключение заданий при отказе. Затем вам нужно будет сохранить выполнение задания на всех серверах и проверить синхронизацию через общий ресурс, такой как таблица базы данных, кластерный кеш, переменная JMX и т. Д.

маниакальный
источник
58
Это допустимый обходной путь, но это нарушит идею кластерной среды, где, если узел не работает, другой узел может обслуживать другие запросы. В этом обходном пути, если узел с «запланированным» профилем выйдет из строя, это фоновое задание не будет запущено
Ахмед Хашем
3
Я думаю, что мы могли бы использовать Redis с атомарным кодом getи setоперацией для этого.
Thanh Nguyen Van
Есть несколько проблем с вашим предложением: 1. Обычно вы хотите, чтобы каждый узел кластера имел точно такую ​​же конфигурацию, чтобы они были на 100% взаимозаменяемыми и требовали одинаковых ресурсов при одинаковой нагрузке. 2. Ваше решение потребует ручного вмешательства при выходе из строя узла «задача». 3. Это все еще не гарантирует, что задание действительно было выполнено успешно, потому что узел «задача» вышел из строя до того, как завершил обработку текущего выполнения, а новый «исполнитель задач» был создан после того, как первый из них отключился, не зная, кончено оно или нет.
Моше Биксеншпанер
1
это просто нарушает идею кластерных сред, не может быть никакого решения с подходом, который вы предложили. Вы не можете реплицировать даже серверы профилей, чтобы гарантировать доступность, потому что это приведет к дополнительным затратам и ненужной трате ресурсов. Решение, предложенное @Thanh, намного чище, чем это. Думайте об этом как о MUTEX. Любой сервер, на котором запущен сценарий, получит временную блокировку в некотором распределенном кеше, таком как redis, а затем продолжит работу с концепциями традиционной блокировки.
анудж прадхан
2

dlock предназначен для однократного выполнения задач с использованием индексов и ограничений базы данных. Вы можете просто сделать что-то вроде ниже.

@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {

}

См. Статью о его использовании.

Уилл Хьюз
источник
3
Если используется dlock. Предположим, мы используем БД для поддержки блокировки. И один из узлов в кластере неожиданно вышел из строя после взятия блокировки, что же произойдет в этом сценарии? Будет ли он в тупиковом состоянии?
Badman
1

Я использую таблицу базы данных для блокировки. Только одна задача за раз может выполнять вставку в таблицу. Другой получит исключение DuplicateKeyException. Логика вставки и удаления обрабатывается аспектом аннотации @Scheduled. Я использую Spring Boot 2.0

@Component
@Aspect
public class SchedulerLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;  

    @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
    public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {

        String jobSignature = joinPoint.getSignature().toString();
        try {
            jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});

            Object proceed = joinPoint.proceed();

            jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
            return proceed;

        }catch (DuplicateKeyException e) {
            LOGGER.warn("Job is currently locked: "+jobSignature);
            return null;
        }
    }
}


@Component
public class EveryTenSecondJob {

    @Scheduled(cron = "0/10 * * * * *")
    public void taskExecution() {
        System.out.println("Hello World");
    }
}


CREATE TABLE scheduler_lock(
    signature varchar(255) NOT NULL,
    date datetime DEFAULT NULL,
    PRIMARY KEY(signature)
);
RenRen
источник
3
Как вы думаете, он будет работать идеально? Потому что, если один из узлов выйдет из строя после взятия блокировки, другие не узнают, почему существует блокировка (в вашем случае запись строки, соответствующая заданию в таблице).
Badman
0

Вы можете использовать встраиваемый планировщик, например db-scheduler . Он имеет постоянное выполнение и использует простой оптимистичный механизм блокировки, чтобы гарантировать выполнение одним узлом.

Пример кода того, как можно реализовать вариант использования:

   RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
    .execute((taskInstance, executionContext) -> {
        System.out.println("Executing " + taskInstance.getTaskAndInstance());
    });

   final Scheduler scheduler = Scheduler
          .create(dataSource)
          .startTasks(recurring1)
          .build();

   scheduler.start();
Густав Карлссон
источник
-1

Контекст Spring не кластеризован, поэтому управлять задачей в распределенном приложении немного сложно, и вам нужно использовать системы, поддерживающие jgroup, чтобы синхронизировать состояние и позволить вашей задаче иметь приоритет для выполнения действия. Или вы можете использовать контекст ejb для управления кластеризованной одноэлементной службой ha, такой как среда jboss ha https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd Или вы можете использовать кластерный кеш и ресурс блокировки доступа между службой и первой службой блокировка будет формировать действие или реализовать вашу собственную jgroup для связи с вашей службой и выполнить действие на одном узле

Абдулгаффар аль-Лабади
источник