Как я могу убедиться, что работа в Bull не выполняется дважды?

11

У меня есть две функции, scheduleScan()и scan().

scan()звонки, scheduleScan() когда больше ничего не нужно делать, кроме планирования нового сканирования , поэтому scheduleScan()можно запланировать scan(). Но есть проблема, некоторые задания выполняются дважды.

Я хочу убедиться, что в данный момент обрабатывается только одна работа. Как я могу этого достичь? Я полагаю, что это как-то связано done()(это было в scan (), сейчас удалено), но я не смог найти решение.

Бык версия: 3.12.1

Важное позднее редактирование: scan() вызывает другие функции, и они могут или не могут вызывать другие функции, но все они являются функциями синхронизации, поэтому они вызывают функцию только тогда, когда их собственные задания завершены, есть только один путь вперед. В конце «дерева», я его называю, последняя функция вызывает scheduleScan (), но не может быть запущено два одновременных задания. Между scan()прочим, каждая работа начинается сscheduleScan(stock, period, milliseconds, 'called by file.js')

export function update(job) {
  // does some calculations, then it may call scheduleScan() or
  // it may call another function, and that could be the one calling
  // scheduleScan() function.
  // For instance, a function like finalize()
}

export function scan(job) {
  update(job)
}


import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)

queue.process(1, (job) => {
  job.progress(100).then(() => {
    scan(job)
  })
})

export function scheduleScan (stock, period, milliseconds, triggeredBy) {
  let uniqueId = stringHash(stock + ':' + period)

  queue.getJob(uniqueId).then(job => {
    if (!job) {
      if (milliseconds) {
        queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
          // console.log('Added with ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      } else {
        queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
          // console.log('Added without ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      }
    } else {
      job.getState().then(state => {
        if (state === 'completed') {
          job.remove().then(() => {
            if (milliseconds) {
              queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
                // console.log('Added with ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            } else {
              queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
                // console.log('Added without ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            }
          }).catch(err => {
            if (err) {
              // console.log(err)
            }
          })
        }
      }).catch(err => {
        // console.log(err)
      })
    }
  })
}
салепа
источник
Я не могу найти scanфункцию, вы можете помочь?
Мухаммед Зеешан
@MuhammadZeeshan Я добавил это, моя ошибка.
Салеп

Ответы:

6

Проблема, я считаю, в том, что ваша scanфункция асинхронна. Таким образом, ваша job.progressфункция вызывает, scanа затем немедленно вызывает, doneпозволяя очереди обрабатывать другое задание.

Решением может быть передача doneобратного вызова в качестве параметра вашим функциям scanи scheduleScanфункциям и вызов его после завершения работы (или по ошибке).

Другим (лучшим) решением может быть обеспечение того, чтобы вы всегда возвращали Promiseот scanи scheduleScan, а затем ожидали обещания решить и затем вызывали done. Если вы делаете это, убедитесь, что вы включили все свои обещания в свою scheduleScanфункцию.

queue.process(1, (job, done) => {
  job.progress(100).then(() => {
    scan(job)
        .then(done)
        .catch(done)
  })
})

export function scan() {
   // business logic
   return scheduleScan()
}

// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
    return queue.getJob(..).then(() => {
        ....
        return queue.add()...
        ....
        return queue.add(...)
            .catch(e => {
                 console.log(e);
                 // propogate errors!
                 throw e;
             })

}
Дживс
источник
Я отредактировал свой вопрос, пожалуйста, проверьте его еще раз, особенно в части "Важное позднее редактирование"? Ваш ответ все еще применим в этой ситуации? Спасибо.
Салеп
1
Да, это все еще в силе. Из ваших правок я думаю, что вы говорите, scheduledScanчто всегда вызывается после всех других функций синхронизации в scan. Если это так, то да, мой ответ остается в силе. Просто всегда возвращает обещание , которое будет возвращено из scheduleScanв scanфункции
Дживс
Опять моя ошибка. Первая функция, update (), сканируется, но update () может вызывать другую функцию, такую ​​как finalize (), а finalize () может вызывать scheduleScan (). Пожалуйста, имейте в виду, что это происходит по порядку, поэтому многократных звонков нет, я делаю это для поддержания модульности моего приложения. - Спасибо
Salep
1
Да, тот же ответ. Если updateзвонки scheduledScanили любое количество функций между ними. Ключевым моментом является то, что вам нужно вернуть цепочку обещаний на scheduleScanвсем пути назад к scanфункции. Так что если scanвызовы, updateвызывающие finalise..... Какие вызовы scheduleScanцепочки обещаний, должны быть возвращены через все вызовы функций, т.е. просто убедитесь, что вы возвращаете обещание от каждой из этих функций.
Дживс
Так что, чтобы уточнить мой последний комментарий. Например, если внутри сканирования вы вызываете обновление. Вам необходимо вернуть результат обновления (обещание) из функции сканирования.
Дживс
4

Функция сканирования является асинхронной функцией. В вашей queue.process()функции вы должны дождаться функции сканирования и затем вызвать done()обратный вызов.

export async function scan(job) {
  // it does some calculations, then it creates a new schedule.
  return scheduleScan(stock, period, milliseconds, "scan.js");
}

queue.process(1, (job, done) => {
  job.progress(100).then(async() => {
    await scan(job);
    done();
  });
});

export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
    let uniqueId = stringHash(stock + ":" + period);
    try {
      const existingJob = await queue.getJob(uniqueId);
      if (!existingJob) {
        const job = await addJob({
          queue,
          stock,
          period,
          uniqueId,
          milliseconds,
          triggeredBy
        });
        return job;
      } else {
        const jobState = await existingJob.getState();
        if (jobState === "completed") {
          await existingJob.remove();
          const newJob = await addJob({
            queue,
            stock,
            period,
            uniqueId,
            milliseconds,
            triggeredBy
          });
          return newJob;
        }
      }
    } catch (err) {
      throw new Error(err);
    }
}

export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
  if (milliseconds) {
    return queue.add(
      { stock, period, triggeredBy },
      { delay: milliseconds, jobId: uniqueId }
    );
  } else {
    return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
  }
}

Попробуй это! Я попытался немного изменить код, используя async-await.

Адитья Срейдж
источник
Я отредактировал свой вопрос, пожалуйста, проверьте его еще раз, особенно в части "Важное позднее редактирование"? Ваш ответ все еще применим в этой ситуации? Спасибо.
Салеп