Предположим , у меня есть несколько фьючерсных и нужно ждать , пока либо ни один из них не откажет или все из них успеха.
Например: Пусть есть 3 фьючерсов: f1
, f2
, f3
.
В случае
f1
успеха иf2
неудачи я не ждуf3
(и возвращаю ошибку клиенту).Если
f2
не удается покаf1
иf3
все еще работает , я не ждать их (и возвращения неудачи )Если
f1
получится, а потомf2
получится, я продолжаю ждатьf3
.
Как бы вы это реализовали?
scala
concurrency
future
Майкл
источник
источник
Ответы:
Вместо этого вы можете использовать для понимания следующим образом:
val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)
В этом примере фьючерсы 1, 2 и 3 запускаются параллельно. Затем, для понимания, мы ждем, пока не станут доступны результаты 1, затем 2 и 3. Если 1 или 2 выйдет из строя, мы больше не будем ждать 3. Если все 3 успешны, то
aggFut
val будет содержать кортеж с 3 слотами, соответствующий результатам 3 фьючерсов.Теперь, если вам нужно поведение, при котором вы хотите перестать ждать, если, скажем, сначала выйдет из строя fut2, все становится немного сложнее. В приведенном выше примере вам придется подождать завершения работы fut1, прежде чем вы поймете, что fut2 не удалось. Чтобы решить эту проблему, вы можете попробовать что-то вроде этого:
val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }
Теперь это работает правильно, но проблема возникает из-за того, что
Future
нужно знать, что удалить из того,Map
когда один был успешно завершен. Если у вас есть способ правильно соотнести результат с Будущим, породившим этот результат, что-то вроде этого работает. Он просто рекурсивно удаляет завершенные фьючерсы с карты, а затем вызываетFuture.firstCompletedOf
оставшиесяFutures
до тех пор, пока не останется ни одного, собирая результаты по пути. Это некрасиво, но если вам действительно нужно поведение, о котором вы говорите, то это или что-то подобное может сработать.источник
fut2
раньше не удалосьfut1
? Будем ли мы еще ждатьfut1
в таком случае? Если мы захотим, это не совсем то, что я хочу.onFailure
обработчик дляfut2
быстрого сбоя иonSuccess
onaggFut
для успешного выполнения. Успешное выполнениеaggFut
подразумеваетfut2
завершилось успешно, поэтому у вас вызван только один из обработчиков.Вы можете использовать обещание и отправить ему либо первый сбой, либо последний завершенный совокупный успех:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }
Затем вы можете получить
Await
результат,Future
если хотите заблокировать, или простоmap
сделать что-то еще.Разница с для понимания состоит в том, что здесь вы получаете ошибку первого сбоя, тогда как с для понимания вы получаете первую ошибку в порядке обхода входной коллекции (даже если сначала не удалось выполнить другую). Например:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order
А также:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the 'actual' first to fail (usually...) // and it returns early (it does not wait 1 sec)
источник
Вот решение без использования актеров.
import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }
источник
Вы можете сделать это только с фьючерсами. Вот одна реализация. Обратите внимание, что он не прерывает выполнение раньше срока! В этом случае вам нужно сделать что-то более сложное (и, возможно, реализовать прерывание самостоятельно). Но если вы просто не хотите продолжать ждать чего-то, что не сработает, ключ в том, чтобы продолжать ждать, пока закончится первое, и останавливаться, когда либо ничего не осталось, либо вы столкнетесь с исключением:
import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn't happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }
Вот пример того, как это работает, когда все работает нормально:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Но когда что-то идет не так:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!
источник
Для этой цели я бы использовал актера Akka. В отличие от for-computing, он терпит неудачу, как только одно из фьючерсов терпит неудачу, поэтому в этом смысле он более эффективен.
class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result
Затем создайте актера, отправьте ему сообщение (чтобы он знал, куда отправить свой ответ) и дождитесь ответа.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }
источник
На этот вопрос был дан ответ, но я публикую свое решение класса значений (классы значений были добавлены в 2.10), поскольку его здесь нет. Не стесняйтесь критиковать.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }
ConcurrentFuture - это оболочка Future без накладных расходов, которая меняет стандартную карту Future / flatMap с do-this-then-that на комбинацию-все-и-сбой-если-какой-нибудь-сбой. Применение:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }
В приведенном выше примере f1, f2 и f3 будут выполняться одновременно, и в случае сбоя в любом порядке будущее кортежа немедленно завершится ошибкой.
источник
Возможно, вы захотите проверить Twitter Future API. В частности, метод Future.collect. Он делает именно то, что вы хотите: https://twitter.github.io/scala_school/finagle.html
Исходный код Future.scala доступен здесь: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
источник
Вы можете использовать это:
val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }
источник