Я пишу приложение, в котором есть задание cron, которое выполняется каждые 60 секунд. Приложение настроено для масштабирования при необходимости на несколько экземпляров. Я хочу выполнять задачу только на 1 экземпляре каждые 60 секунд (на любом узле). Я не могу найти решение этой проблемы прямо из коробки, и я удивлен, что об этом не спрашивали несколько раз раньше. Я использую Spring 4.1.6.
<task:scheduled-tasks>
<task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
spring
spring-scheduled
user3131879
источник
источник
CronJob
вkubernetes
?Ответы:
Существует проект ShedLock, который служит именно этой цели. Вы просто комментируете задачи, которые должны быть заблокированы при выполнении
@Scheduled( ... ) @SchedulerLock(name = "scheduledTaskName") public void scheduledTask() { // do something }
Настроить Spring и LockProvider
@Configuration @EnableScheduling @EnableSchedulerLock(defaultLockAtMostFor = "10m") class MySpringConfiguration { ... @Bean public LockProvider lockProvider(DataSource dataSource) { return new JdbcTemplateLockProvider(dataSource); } ... }
источник
Я думаю, вам нужно использовать Quartz Clustering с JDBC-JobStore для этой цели
источник
Это еще один простой и надежный способ безопасного выполнения задания в кластере. Вы можете основываться на базе данных и выполнять задачу только в том случае, если узел является «лидером» в кластере.
Также, когда узел выходит из строя или выключается в кластере, другой узел становится лидером.
Все, что вам нужно, это создать механизм «выборов лидера» и каждый раз проверять, являетесь ли вы лидером:
@Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
Следуйте этим шагам:
1. Определите объект и таблицу, которые содержат по одной записи для каждого узла в кластере:
@Entity(name = "SYS_NODE") public class SystemNode { /** The id. */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** The name. */ @Column(name = "TIMESTAMP") private String timestamp; /** The ip. */ @Column(name = "IP") private String ip; /** The last ping. */ @Column(name = "LAST_PING") private Date lastPing; /** The last ping. */ @Column(name = "CREATED_AT") private Date createdAt = new Date(); /** The last ping. */ @Column(name = "IS_LEADER") private Boolean isLeader = Boolean.FALSE; public Long getId() { return id; } public void setId(final Long id) { this.id = id; } public String getTimestamp() { return timestamp; } public void setTimestamp(final String timestamp) { this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(final String ip) { this.ip = ip; } public Date getLastPing() { return lastPing; } public void setLastPing(final Date lastPing) { this.lastPing = lastPing; } public Date getCreatedAt() { return createdAt; } public void setCreatedAt(final Date createdAt) { this.createdAt = createdAt; } public Boolean getIsLeader() { return isLeader; } public void setIsLeader(final Boolean isLeader) { this.isLeader = isLeader; } @Override public String toString() { return "SystemNode{" + "id=" + id + ", timestamp='" + timestamp + '\'' + ", ip='" + ip + '\'' + ", lastPing=" + lastPing + ", createdAt=" + createdAt + ", isLeader=" + isLeader + '}'; }
}
2.Создайте сервис, который а) вставляет узел в базу данных, б) проверяет лидера
@Service @Transactional public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener { /** The logger. */ private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class); /** The constant NO_ALIVE_NODES. */ private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}"; /** The ip. */ private String ip; /** The system service. */ private SystemService systemService; /** The system node repository. */ private SystemNodeRepository systemNodeRepository; @Autowired public void setSystemService(final SystemService systemService) { this.systemService = systemService; } @Autowired public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) { this.systemNodeRepository = systemNodeRepository; } @Override public void pingNode() { final SystemNode node = systemNodeRepository.findByIp(ip); if (node == null) { createNode(); } else { updateNode(node); } } @Override public void checkLeaderShip() { final List<SystemNode> allList = systemNodeRepository.findAll(); final List<SystemNode> aliveList = filterAliveNodes(allList); SystemNode leader = findLeader(allList); if (leader != null && aliveList.contains(leader)) { setLeaderFlag(allList, Boolean.FALSE); leader.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } else { final SystemNode node = findMinNode(aliveList); setLeaderFlag(allList, Boolean.FALSE); node.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } } /** * Returns the leaded * @param list * the list * @return the leader */ private SystemNode findLeader(final List<SystemNode> list) { for (SystemNode systemNode : list) { if (systemNode.getIsLeader()) { return systemNode; } } return null; } @Override public boolean isLeader() { final SystemNode node = systemNodeRepository.findByIp(ip); return node != null && node.getIsLeader(); } @Override public void onApplicationEvent(final ApplicationEvent applicationEvent) { try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { throw new RuntimeException(e); } if (applicationEvent instanceof ContextRefreshedEvent) { pingNode(); } } /** * Creates the node */ private void createNode() { final SystemNode node = new SystemNode(); node.setIp(ip); node.setTimestamp(String.valueOf(System.currentTimeMillis())); node.setCreatedAt(new Date()); node.setLastPing(new Date()); node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll())); systemNodeRepository.save(node); } /** * Updates the node */ private void updateNode(final SystemNode node) { node.setLastPing(new Date()); systemNodeRepository.save(node); } /** * Returns the alive nodes. * * @param list * the list * @return the alive nodes */ private List<SystemNode> filterAliveNodes(final List<SystemNode> list) { int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class); final List<SystemNode> finalList = new LinkedList<>(); for (SystemNode systemNode : list) { if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) { finalList.add(systemNode); } } if (CollectionUtils.isEmpty(finalList)) { LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list)); throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list)); } return finalList; } /** * Finds the min name node. * * @param list * the list * @return the min node */ private SystemNode findMinNode(final List<SystemNode> list) { SystemNode min = list.get(0); for (SystemNode systemNode : list) { if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) { min = systemNode; } } return min; } /** * Sets the leader flag. * * @param list * the list * @param value * the value */ private void setLeaderFlag(final List<SystemNode> list, final Boolean value) { for (SystemNode systemNode : list) { systemNode.setIsLeader(value); } }
}
3. отправка базы данных о том, что вы живы
@Override @Scheduled(cron = "0 0/5 * * * ?") public void executeSystemNodePing() { systemNodeService.pingNode(); } @Override @Scheduled(cron = "0 0/10 * * * ?") public void executeLeaderResolution() { systemNodeService.checkLeaderShip(); }
4. вы готовы! Просто проверьте, являетесь ли вы лидером, прежде чем выполнять задание:
@Override @Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
источник
Пакетные и запланированные задания обычно выполняются на их собственных автономных серверах, вдали от приложений, предназначенных для клиентов, поэтому не является обычным требованием включать задание в приложение, которое, как ожидается, будет выполняться в кластере. Кроме того, заданиям в кластерных средах обычно не нужно беспокоиться о том, что другие экземпляры одного и того же задания выполняются параллельно, так что это еще одна причина, по которой изоляция экземпляров задания не является большим требованием.
Простым решением было бы настроить ваши задания внутри профиля Spring. Например, если ваша текущая конфигурация:
<beans> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans>
измените его на:
<beans> <beans profile="scheduled"> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans> </beans>
Затем запустите приложение только на одном компьютере с
scheduled
активированным профилем (-Dspring.profiles.active=scheduled
).Если по какой-то причине основной сервер становится недоступным, просто запустите другой сервер с включенным профилем, и все будет продолжать работать нормально.
Ситуация изменится, если вы захотите также автоматическое переключение заданий при отказе. Затем вам нужно будет сохранить выполнение задания на всех серверах и проверить синхронизацию через общий ресурс, такой как таблица базы данных, кластерный кеш, переменная JMX и т. Д.
источник
get
иset
операцией для этого.dlock предназначен для однократного выполнения задач с использованием индексов и ограничений базы данных. Вы можете просто сделать что-то вроде ниже.
@Scheduled(cron = "30 30 3 * * *") @TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES) public void execute() { }
См. Статью о его использовании.
источник
Я использую таблицу базы данных для блокировки. Только одна задача за раз может выполнять вставку в таблицу. Другой получит исключение DuplicateKeyException. Логика вставки и удаления обрабатывается аспектом аннотации @Scheduled. Я использую Spring Boot 2.0
@Component @Aspect public class SchedulerLock { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class); @Autowired private JdbcTemplate jdbcTemplate; @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))") public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable { String jobSignature = joinPoint.getSignature().toString(); try { jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()}); Object proceed = joinPoint.proceed(); jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature}); return proceed; }catch (DuplicateKeyException e) { LOGGER.warn("Job is currently locked: "+jobSignature); return null; } } }
@Component public class EveryTenSecondJob { @Scheduled(cron = "0/10 * * * * *") public void taskExecution() { System.out.println("Hello World"); } }
CREATE TABLE scheduler_lock( signature varchar(255) NOT NULL, date datetime DEFAULT NULL, PRIMARY KEY(signature) );
источник
Вы можете использовать встраиваемый планировщик, например db-scheduler . Он имеет постоянное выполнение и использует простой оптимистичный механизм блокировки, чтобы гарантировать выполнение одним узлом.
Пример кода того, как можно реализовать вариант использования:
RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60))) .execute((taskInstance, executionContext) -> { System.out.println("Executing " + taskInstance.getTaskAndInstance()); }); final Scheduler scheduler = Scheduler .create(dataSource) .startTasks(recurring1) .build(); scheduler.start();
источник
Контекст Spring не кластеризован, поэтому управлять задачей в распределенном приложении немного сложно, и вам нужно использовать системы, поддерживающие jgroup, чтобы синхронизировать состояние и позволить вашей задаче иметь приоритет для выполнения действия. Или вы можете использовать контекст ejb для управления кластеризованной одноэлементной службой ha, такой как среда jboss ha https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd Или вы можете использовать кластерный кеш и ресурс блокировки доступа между службой и первой службой блокировка будет формировать действие или реализовать вашу собственную jgroup для связи с вашей службой и выполнить действие на одном узле
источник