Я использую rxjava в своем приложении для Android для асинхронной обработки сетевых запросов. Теперь я хотел бы повторить неудачный сетевой запрос только по прошествии определенного времени.
Есть ли способ использовать retry () для Observable, но повторить попытку только после определенной задержки?
Есть ли способ сообщить Observable, что в настоящее время выполняется повторная попытка (а не в первый раз)?
Я посмотрел на debounce () / throttleWithTimeout (), но они, похоже, делают что-то другое.
Редактировать:
Я думаю, что нашел один способ сделать это, но мне было бы интересно либо подтверждение того, что это правильный способ сделать это, либо другие, лучшие способы.
Я делаю следующее: в методе call () моего Observable.OnSubscribe, прежде чем я вызываю метод Subscribers onError (), я просто позволяю Thread спать на желаемое время. Итак, чтобы повторить попытку каждые 1000 миллисекунд, я делаю что-то вроде этого:
@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
try {
Log.d(TAG, "trying to load all products with pid: " + pid);
subscriber.onNext(productClient.getProductNodesForParentId(pid));
subscriber.onCompleted();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e.printStackTrace();
}
subscriber.onError(e);
}
}
Поскольку этот метод в любом случае выполняется в потоке ввода-вывода, он не блокирует пользовательский интерфейс. Единственная проблема, которую я вижу, это то, что даже первая ошибка сообщается с задержкой, поэтому задержка есть, даже если нет retry (). Я бы хотел, чтобы задержка применялась не после ошибки, а перед повторной попыткой (но, очевидно, не до первой попытки).
Error:(73, 20) error: incompatible types: RetryWithDelay cannot be converted to Func1<? super Observable<? extends Throwable>,? extends Observable<?>>
RetryWithDelay
на это: pastebin.com/6SiZeKnCВдохновленный ответом Пола , и если вас не беспокоят
retryWhen
проблемы, указанные Абхиджитом Саркаром , самый простой способ отложить повторную подписку с помощью rxJava2 без условий:source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
Возможно, вы захотите увидеть больше примеров и объяснений по retryWhen и repeatWhen .
источник
Этот пример работает с jxjava 2.2.2:
Повторите попытку без промедления:
Single.just(somePaylodData) .map(data -> someConnection.send(data)) .retry(5) .doOnSuccess(status -> log.info("Yay! {}", status);
Повторить с задержкой:
Single.just(somePaylodData) .map(data -> someConnection.send(data)) .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS)) .doOnSuccess(status -> log.info("Yay! {}", status) .doOnError((Throwable error) -> log.error("I tried five times with a 300ms break" + " delay in between. But it was in vain."));
Наш исходный сингл не работает, если не удается someConnection.send (). Когда это происходит, наблюдаемые отказы внутри retryWhen выдают ошибку. Мы задерживаем это излучение на 300 мс и отправляем его обратно, чтобы сигнализировать о повторной попытке. take (5) гарантирует, что наша наблюдаемая сигнализация завершится после получения пяти ошибок. retryWhen видит завершение и не повторяет попытку после пятой ошибки.
источник
Это решение, основанное на фрагментах Бена Кристенсена, которые я видел, RetryWhen Example и RetryWhenTestsConditional (мне пришлось изменить
n.getThrowable()
на,n
чтобы он работал). Я использовал evant / gradle-retrolambda, чтобы лямбда-нотация работала на Android, но вам не обязательно использовать лямбды (хотя это настоятельно рекомендуется). Для задержки я реализовал экспоненциальный откат, но вы можете подключить туда любую логику отсрочки, которая вам нужна. Для полноты я добавилsubscribeOn
иobserveOn
оператор. Я использую ReactiveX / RxAndroid дляAndroidSchedulers.mainThread()
.int ATTEMPT_COUNT = 10; public class Tuple<X, Y> { public final X x; public final Y y; public Tuple(X x, Y y) { this.x = x; this.y = y; } } observable .subscribeOn(Schedulers.io()) .retryWhen( attempts -> { return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i)) .flatMap( ni -> { if (ni.y > ATTEMPT_COUNT) return Observable.error(ni.x); return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS); }); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
источник
Observable
объектов?kjones
решение, и оно работает идеально для меня, спасибовместо MyRequestObservable.retry я использую функцию-оболочку retryObservable (MyRequestObservable, retrycount, seconds), которая возвращает новый Observable, обрабатывающий косвенное обращение к задержке, поэтому я могу сделать
retryObservable(restApi.getObservableStuff(), 3, 30) .subscribe(new Action1<BonusIndividualList>(){ @Override public void call(BonusIndividualList arg0) { //success! } }, new Action1<Throwable>(){ @Override public void call(Throwable arg0) { // failed after the 3 retries ! }}); // wrapper code private static <T> Observable<T> retryObservable( final Observable<T> requestObservable, final int nbRetry, final long seconds) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(final Subscriber<? super T> subscriber) { requestObservable.subscribe(new Action1<T>() { @Override public void call(T arg0) { subscriber.onNext(arg0); subscriber.onCompleted(); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { if (nbRetry > 0) { Observable.just(requestObservable) .delay(seconds, TimeUnit.SECONDS) .observeOn(mainThread()) .subscribe(new Action1<Observable<T>>(){ @Override public void call(Observable<T> observable){ retryObservable(observable, nbRetry - 1, seconds) .subscribe(subscriber); } }); } else { // still fail after retries subscriber.onError(error); } } }); } }); }
источник
retryWhen
- сложный, возможно, даже глючный оператор. Официальный документ и хотя бы один ответ здесь используютrange
оператор, который завершится ошибкой, если не будет предпринято никаких повторных попыток. Смотрите мое обсуждение с участником ReactiveX Дэвидом Карноком.Я улучшил ответ kjones' путем изменения
flatMap
кconcatMap
и путем добавленияRetryDelayStrategy
класса.flatMap
при этом не сохраняет порядок эмиссииconcatMap
, что важно для задержек с откатом. КакRetryDelayStrategy
видно из названия, позволяет пользователю выбирать из различных режимов генерации задержек повтора, включая откат. Код доступен на моем GitHub вместе со следующими тестовыми примерами:См.
setRandomJokes
Метод.источник
Основываясь на ответе kjones, вот версия RxJava 2.x для Kotlin с задержкой в качестве расширения. Заменить,
Observable
чтобы создать такое же расширение дляFlowable
.fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> { var retryCount = 0 return retryWhen { thObservable -> thObservable.flatMap { throwable -> if (++retryCount < maxRetries) { Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS) } else { Observable.error(throwable) } } } }
Тогда просто используйте его на наблюдаемых
observable.retryWithDelay(3, 1000)
источник
Single
на?flatMap
тут придется использоватьFlowable.timer
иFlowable.error
хотя функция естьSingle<T>.retryWithDelay
.Теперь с RxJava версии 1.0+ вы можете использовать zipWith для повторной попытки с задержкой.
Добавление модификаций в ответ kjones .
Изменено
public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int MAX_RETRIES; private final int DELAY_DURATION; private final int START_RETRY; /** * Provide number of retries and seconds to be delayed between retry. * * @param maxRetries Number of retries. * @param delayDurationInSeconds Seconds to be delays in each retry. */ public RetryWithDelay(int maxRetries, int delayDurationInSeconds) { MAX_RETRIES = maxRetries; DELAY_DURATION = delayDurationInSeconds; START_RETRY = 1; } @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable .delay(DELAY_DURATION, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer attempt) { return attempt; } }); } }
источник
Тот же ответ, что и от kjones, но обновлен до последней версии. Для версии RxJava 2.x : ('io.reactivex.rxjava2: rxjava: 2.1.3')
public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> { private final int maxRetries; private final long retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() { @Override public Publisher<?> apply(Throwable throwable) throws Exception { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (i.e. re-subscribed). return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Flowable.error(throwable); } }); } }
Применение:
// Добавляем логику повтора к существующему наблюдаемому. // Повторить максимум 3 раза с задержкой в 2 секунды.
observable .retryWhen(new RetryWithDelay(3, 2000));
источник
Вы можете добавить задержку в Observable, возвращаемом в retryWhen Operator
/** * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated */ @Test public void observableOnErrorResumeNext() { Subscription subscription = Observable.just(null) .map(Object::toString) .doOnError(failure -> System.out.println("Error:" + failure.getCause())) .retryWhen(errors -> errors.doOnNext(o -> count++) .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), Schedulers.newThread()) .onErrorResumeNext(t -> { System.out.println("Error after all retries:" + t.getCause()); return Observable.just("I save the world for extinction!"); }) .subscribe(s -> System.out.println(s)); new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); }
Вы можете увидеть больше примеров здесь. https://github.com/politrons/reactive
источник
Просто сделайте это так:
Observable.just("") .delay(2, TimeUnit.SECONDS) //delay .flatMap(new Func1<String, Observable<File>>() { @Override public Observable<File> call(String s) { L.from(TAG).d("postAvatar="); File file = PhotoPickUtil.getTempFile(); if (file.length() <= 0) { throw new NullPointerException(); } return Observable.just(file); } }) .retry(6) .subscribe(new Action1<File>() { @Override public void call(File file) { postAvatar(file); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } });
источник
Для версии Kotlin и RxJava1
class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long) : Function1<Observable<out Throwable>, Observable<*>> { private val START_RETRY: Int = 1 override fun invoke(observable: Observable<out Throwable>): Observable<*> { return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), object : Function2<Throwable, Int, Int> { override fun invoke(throwable: Throwable, attempt: Int): Int { return attempt } }) } }
источник
(Kotlin) Я немного улучшил код с экспоненциальным откатом и применил защиту от Observable.range ():
fun testOnRetryWithDelayExponentialBackoff() { val interval = 1 val maxCount = 3 val ai = AtomicInteger(1); val source = Observable.create<Unit> { emitter -> val attempt = ai.getAndIncrement() println("Subscribe ${attempt}") if (attempt >= maxCount) { emitter.onNext(Unit) emitter.onComplete() } emitter.onError(RuntimeException("Test $attempt")) } // Below implementation of "retryWhen" function, remove all "println()" for real code. val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx -> throwableRx.doOnNext({ println("Error: $it") }) .zipWith(Observable.range(1, maxCount) .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) }, BiFunction { t1: Throwable, t2: Int -> t1 to t2 } ) .flatMap { pair -> if (pair.second >= maxCount) { Observable.error(pair.first) } else { val delay = interval * 2F.pow(pair.second) println("retry delay: $delay") Observable.timer(delay.toLong(), TimeUnit.SECONDS) } } } //Code to print the result in terminal. sourceWithRetry .doOnComplete { println("Complete") } .doOnError({ println("Final Error: $it") }) .blockingForEach { println("$it") } }
источник
в случае, когда вам нужно распечатать счетчик повторов, вы можете использовать пример, представленный на вики-странице Rxjava https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
observable.retryWhen(errors -> // Count and increment the number of errors. errors.map(error -> 1).scan((i, j) -> i + j) .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount)) // Limit the maximum number of retries. .takeWhile(errorCount -> errorCount < retryCounts) // Signal resubscribe event after some delay. .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
источник