Как начать работу с Akka Streams? [закрыто]

222

Библиотека Akka Streams уже поставляется с большим количеством документации . Однако главная проблема для меня заключается в том, что он дает слишком много материала - я чувствую себя совершенно ошеломленным количеством понятий, которые мне нужно выучить. Многие примеры, показанные там, кажутся очень тяжеловесными и их нелегко перевести на реальные случаи использования, и поэтому они довольно эзотеричны. Я думаю, что это дает слишком много деталей, не объясняя, как собрать все строительные блоки вместе и как именно это помогает решать конкретные проблемы.

Есть источники, приемники, потоки, этапы графа, частичные графы, материализация, граф DSL и многое другое, и я просто не знаю, с чего начать. Краткое руководство предназначено , чтобы быть стартовое место , но я не понимаю. Это просто добавляет концепции, упомянутые выше, не объясняя их. Кроме того, примеры кода не могут быть выполнены - есть недостающие части, которые делают более или менее невозможным для меня следовать тексту.

Может ли кто-нибудь объяснить источники понятий, приемники, потоки, этапы графов, частичные графы, материализацию и, возможно, некоторые другие вещи, которые я пропустил простыми словами и простыми примерами, которые не объясняют каждую деталь (и которые, вероятно, вообще не нужны в начало)?

kiritsuku
источник
2
Для получения информации, это обсуждается на мета
DavidG
10
Как первый человек, который проголосует за это (следуя мета-ветке), позвольте мне сначала сказать, что ваш ответ здесь великолепен . Это действительно всесторонний и, безусловно, очень полезный ресурс. Однако, к сожалению, вопрос, который вы задали, слишком широк для переполнения стека. Если каким-то образом ваш ответ может быть опубликован на вопрос, сформулированный по-другому, то это здорово, но я не думаю, что это может быть. Я настоятельно рекомендую повторно представить это как пост в блоге или что-то подобное, что вы и другие можете использовать в качестве справочного ресурса в будущих ответах.
Джеймс Доннелли
2
Я думаю, что написание этого вопроса в блоге не будет эффективным. Да, это широкий вопрос - и это действительно хороший вопрос. Сужение его объема не улучшит его. Ответ предоставлен потрясающий. Я уверен, что Quora будет рада забрать бизнес у SO для больших вопросов.
Майк Слинн
11
@MikeSlinn не пытайтесь обсуждать с ТАКИМИ людьми подходящие вопросы, они слепо следуют правилам. Пока вопрос не снимается, я счастлив и не чувствую необходимости переходить на другую платформу.
Кирицуку
2
@sschaef Как педантично. Да, конечно, правила ничего не стоят, ваше великое я знает намного лучше, и все, кто пытается применить правила, просто слепо следуют обману. / декламация. более серьезно, это было бы отличным дополнением к документации бета, если вы в ней. Вы все еще можете подать заявку и поставить ее там, но вы должны по крайней мере увидеть, что она не очень подходит для основного сайта.
Феликс Ганьон-Гренье

Ответы:

506

Этот ответ основан на akka-streamверсии 2.4.2. API может немного отличаться в других версиях. Зависимость может быть использована sbt :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

Хорошо, давайте начнем. API Akka Streams состоит из трех основных типов. В отличие от Reactive Streams , эти типы намного более мощные и, следовательно, более сложные. Предполагается, что для всех примеров кода уже существуют следующие определения:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

Операторы importнеобходимы для объявлений типов. systemпредставляет систему акторов Akka и materializerпредставляет контекст оценки потока. В нашем случае мы используем a ActorMaterializer, что означает, что потоки оцениваются поверх актеров. Оба значения помечены как implicit, что дает компилятору Scala возможность вставлять эти две зависимости автоматически, когда они необходимы. Мы также импортируем system.dispatcher, что является контекстом выполнения для Futures.

Новый API

Akka Streams имеет следующие ключевые свойства:

  • Они реализуют спецификацию Reactive Streams , чьи три основные цели: обратное давление, асинхронные и неблокирующие границы и функциональная совместимость между различными реализациями, полностью применимы и к Akka Streams.
  • Они предоставляют абстракцию для механизма оценки для потоков, который называется Materializer.
  • Программы сформулированы как многократно используемые строительные блоки, которые представлены как три основных типа Source, Sinkи Flow. Строительные блоки образуют график, оценка которого основана на Materializerи должна быть явно запущена.

Далее будет дано более глубокое введение в то, как использовать три основных типа.

Источник

A Source- создатель данных, он служит источником входных данных для потока. У каждого Sourceесть один выходной канал и нет входного канала. Все данные проходят через выходной канал к тому, что подключено к Source.

Источник

Изображение взято с boldradius.com .

A Sourceможет быть создан несколькими способами:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

В вышеупомянутых случаях мы снабжали Sourceконечные данные, что означает, что они в конечном итоге прекратятся. Не следует забывать, что реактивные потоки по умолчанию ленивы и асинхронны. Это означает, что нужно явно запросить оценку потока. В Akka Streams это можно сделать с помощью run*методов. Эта функция runForeachне будет отличаться от хорошо известной foreachфункции - благодаря runдобавлению она делает явным то, что мы запрашиваем оценку потока. Поскольку конечные данные скучны, мы продолжим с бесконечными:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

С помощью этого takeметода мы можем создать искусственную точку остановки, которая мешает нам оценивать неопределенно долго. Поскольку поддержка акторов встроена, мы также можем легко передать поток сообщениями, которые отправляются актеру:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Мы видим, что Futuresони выполняются асинхронно в разных потоках, что объясняет результат. В приведенном выше примере буфер для входящих элементов не является необходимым, и, следовательно, OverflowStrategy.failмы можем настроить, что поток должен потерпеть неудачу при переполнении буфера. Особенно через этот интерфейс актера, мы можем передавать поток через любой источник данных. Не имеет значения, создаются ли данные одним потоком, другим потоком, другим процессом или они поступают из удаленной системы через Интернет.

тонуть

А Sinkв основном противоположность Source. Это конечная точка потока и, следовательно, потребляет данные. А Sinkимеет один входной канал и не имеет выходного канала. Sinksособенно необходимы, когда мы хотим указать поведение сборщика данных многократно и без оценки потока. Уже известные run*методы не позволяют нам эти свойства, поэтому предпочтительнее использовать Sinkвместо них.

тонуть

Изображение взято с boldradius.com .

Краткий пример Sinkдействия:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

Подключение Sourceк Sinkможет быть сделано с помощью toметода. Он возвращает так называемый RunnableFlow, который, как мы увидим позже, представляет собой особую форму Flowпотока, который может быть выполнен простым вызовом его run()метода.

Runnable Flow

Изображение взято с boldradius.com .

Конечно, можно передать все значения, поступающие в приемник, актеру:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

поток

Источники и приемники данных хороши, если вам нужно соединение между потоками Akka и существующей системой, но с ними ничего не поделаешь. Потоки являются последним отсутствующим элементом в базовой абстракции Akka Streams. Они действуют как соединитель между различными потоками и могут использоваться для преобразования его элементов.

поток

Изображение взято с boldradius.com .

Если a Flowсвязан с Sourceновым, Sourceэто результат. Аналогично, Flowсвязанный с собой Sinkсоздает новое Sink. И Flowсвязанный как с, так Sourceи с Sinkрезультатом в RunnableFlow. Поэтому они находятся между входным и выходным каналами, но сами по себе не соответствуют одному из вариантов, если они не подключены ни к a, Sourceни к a Sink.

Полный поток

Изображение взято с boldradius.com .

Чтобы лучше понять Flows, мы рассмотрим несколько примеров:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

С помощью viaметода мы можем соединить Sourceс Flow. Нам нужно указать тип ввода, потому что компилятор не может определить его для нас. Как мы уже видим в этом простом примере, потоки invertи doubleполностью независимы от любых производителей и потребителей данных. Они только преобразуют данные и направляют их в выходной канал. Это означает, что мы можем повторно использовать поток среди нескольких потоков:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1и s2представляют совершенно новые потоки - они не делятся никакими данными через свои строительные блоки.

Неограниченные потоки данных

Прежде чем двигаться дальше, мы должны сначала вернуться к некоторым ключевым аспектам Reactive Streams. Неограниченное количество элементов может прибыть в любую точку и поместить поток в разные состояния. Помимо работающего потока, который является обычным состоянием, поток может быть остановлен либо из-за ошибки, либо из-за сигнала, который означает, что дальнейшие данные не поступят. Поток можно смоделировать графическим способом, пометив события на временной шкале, как здесь:

Показывает, что поток представляет собой последовательность текущих событий, упорядоченных по времени

Изображение взято из введения в реактивное программирование, которое вы пропустили .

Мы уже видели работающие потоки в примерах предыдущего раздела. Мы получаем RunnableGraphвсякий раз, когда поток может быть материализован, а это означает, что a Sinkсвязан с a Source. До сих пор мы всегда материализовались в значение Unit, которое можно увидеть в типах:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

Для получения Sourceи Sinkвторой параметр типа и для Flowтретьего параметра типа обозначает материализованное значение. В этом ответе полное значение материализации не объясняется. Тем не менее, более подробную информацию о материализации можно найти в официальной документации . Пока единственное, что нам нужно знать, это то, что материализованная ценность - это то, что мы получаем, когда запускаем поток. Поскольку до сих пор нас интересовали только побочные эффекты, мы получили Unitматериализованную ценность. Исключением из этого была материализация раковины, которая привела к Future. Это вернуло намFuture, так как это значение может обозначать, когда поток, который связан с приемником, был закончен. До сих пор предыдущие примеры кода были хорошими для объяснения концепции, но они также были скучными, потому что мы имели дело только с конечными потоками или с очень простыми бесконечными. Чтобы сделать его более интересным, далее будет объяснен полный асинхронный и неограниченный поток.

Пример ClickStream

В качестве примера мы хотим иметь поток, который фиксирует события щелчка. Чтобы сделать его более сложным, скажем, мы также хотим сгруппировать события кликов, которые происходят через короткое время друг за другом. Таким образом, мы можем легко обнаружить двойные, тройные или десятикратные клики. Кроме того, мы хотим отфильтровать все отдельные клики. Сделайте глубокий вдох и представьте, как вы решите эту проблему в обязательном порядке. Бьюсь об заклад, никто не сможет реализовать решение, которое работает правильно с первой попытки. Реагировать на эту проблему тривиально. На самом деле, решение настолько простое и понятное для реализации, что мы можем даже выразить его в диаграмме, которая напрямую описывает поведение кода:

Логика примера потока кликов

Изображение взято из введения в реактивное программирование, которое вы пропустили .

Серые прямоугольники - это функции, которые описывают, как один поток превращается в другой. С throttleфункцией мы накапливаем щелчки в течение 250 миллисекунд, то mapи filterфункции должны быть понятны. Цветные сферы представляют событие, а стрелки показывают, как они проходят через наши функции. Позже на этапах обработки мы получаем все меньше и больше элементов, которые проходят через наш поток, поскольку мы группируем их и отфильтровываем. Код для этого изображения будет выглядеть примерно так:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

Вся логика может быть представлена ​​только в четырех строках кода! В Scala мы могли бы написать это еще короче:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

Определение clickStreamявляется немного более сложным, но это только тот случай, потому что пример программы работает на JVM, где захват событий щелчка не легко возможен. Другая сложность заключается в том, что Akka по умолчанию не предоставляет эту throttleфункцию. Вместо этого мы должны были написать это сами. Поскольку эту функцию (как и в случае с функциями mapили filter) можно использовать повторно в разных случаях использования, я не считаю эти строки количеством строк, которое нам необходимо для реализации логики. В императивных языках, однако, это нормально, что логику нельзя использовать повторно так просто, и что различные логические шаги выполняются в одном месте, а не применяются последовательно, что означает, что мы, вероятно, могли бы неправильно сформировать наш код с логикой регулирования. Полный пример кода доступен в видесуть и не будет обсуждаться здесь дальше.

Пример SimpleWebServer

Вместо этого следует обсудить еще один пример. Хотя поток кликов является хорошим примером, позволяющим Akka Streams обрабатывать пример из реального мира, ему не хватает возможности продемонстрировать параллельное выполнение в действии. Следующий пример должен представлять небольшой веб-сервер, который может обрабатывать несколько запросов параллельно. Веб-сервер должен иметь возможность принимать входящие соединения и получать от них последовательности байтов, которые представляют печатные знаки ASCII. Эти последовательности байтов или строки должны быть разбиты на все символы новой строки на более мелкие части. После этого сервер должен ответить клиенту каждой из разделенных линий. В качестве альтернативы, он может делать что-то еще со строками и выдавать специальный токен ответа, но мы хотим сделать его простым в этом примере и поэтому не вводить какие-либо необычные функции. Помните, сервер должен иметь возможность обрабатывать несколько запросов одновременно, что в основном означает, что ни один запрос не может заблокировать любой другой запрос от дальнейшего выполнения. Решение всех этих требований может быть непростым делом - однако с Akka Streams нам не нужно больше нескольких строк для решения любого из них. Для начала давайте разберемся с самим сервером:

сервер

В основном, есть только три основных строительных блока. Первый должен принимать входящие соединения. Второй должен обрабатывать входящие запросы, а третий должен отправить ответ. Реализация всех этих трех стандартных блоков немного сложнее, чем реализация потока кликов:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingCnnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

Функция mkServerпринимает (помимо адреса и порта сервера) также систему акторов и материализатор в качестве неявных параметров. Поток управления сервером представлен binding, который принимает источник входящих соединений и перенаправляет их в приемник входящих соединений. Внутри connectionHandlerнашего приемника мы обрабатываем каждое соединение потоком serverLogic, который будет описан позже. bindingвозвращаетFuture, которая завершается, когда сервер был запущен или запуск не удался, что может быть в том случае, если порт уже занят другим процессом. Код, однако, не полностью отражает графику, поскольку мы не можем видеть строительный блок, который обрабатывает ответы. Причина этого в том, что соединение уже обеспечивает эту логику само по себе. Это двунаправленный поток, а не просто однонаправленный, как потоки, которые мы видели в предыдущих примерах. Как и в случае материализации, такие сложные потоки здесь не объясняются. Официальная документации есть много материала , чтобы покрыть более сложные графики потока. На данный момент достаточно знать, что Tcp.IncomingConnectionпредставляет собой соединение, которое знает, как получать запросы и как отправлять ответы. Часть, которая все еще отсутствует, этоserverLogicструктурный элемент. Это может выглядеть так:

логика сервера

Еще раз, мы можем разделить логику на несколько простых строительных блоков, которые все вместе формируют поток нашей программы. Сначала мы хотим разделить нашу последовательность байтов на строки, что мы должны делать всякий раз, когда находим символ новой строки. После этого байты каждой строки необходимо преобразовать в строку, поскольку работа с необработанными байтами является громоздкой. В целом мы можем получить двоичный поток сложного протокола, что сделает работу с исходными необработанными данными чрезвычайно сложной. Получив читаемую строку, мы можем создать ответ. По причинам простоты ответом может быть что угодно в нашем случае. В конце концов, мы должны преобразовать наш ответ в последовательность байтов, которую можно отправить по проводам. Код для всей логики может выглядеть так:

val serverLogic: Flow[ByteString, ByteString, Unit] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

Мы уже знаем, что serverLogicэто поток, который берет ByteStringи должен произвести ByteString. С помощью delimiterмы можем разделить на ByteStringболее мелкие части - в нашем случае это должно происходить всякий раз, когда встречается символ новой строки. receiverэто поток, который берет все последовательности разделенных байтов и преобразует их в строку. Это, конечно, опасное преобразование, поскольку в строку должны быть преобразованы только печатные символы ASCII, но для наших нужд это достаточно хорошо. responderявляется последним компонентом и отвечает за создание ответа и преобразование ответа обратно в последовательность байтов. В отличие от графики мы не разделили этот последний компонент на две части, поскольку логика тривиальна. В конце мы соединяем все потоки черезviaфункция. В этот момент можно спросить, позаботились ли мы о многопользовательском свойстве, о котором говорилось в начале. И действительно, мы сделали, хотя это может быть не очевидно сразу. Глядя на этот рисунок, он должен стать более ясным:

сервер и логика сервера объединены

serverLogicКомпонент не что иное, как поток , который содержит меньшие потоки. Этот компонент принимает ввод, который является запросом, и производит вывод, который является ответом. Поскольку потоки могут создаваться несколько раз, и все они работают независимо друг от друга, мы достигаем за счет этого вложения нашего многопользовательского свойства. Каждый запрос обрабатывается в своем собственном запросе, и поэтому короткий запущенный запрос может перекрыть ранее запущенный длительный запрос. В случае, если вам интересно, определение того, serverLogicчто было показано ранее, конечно, можно записать намного короче, указав большинство его внутренних определений:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

Тест веб-сервера может выглядеть так:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

Для того, чтобы приведенный выше пример кода функционировал правильно, нам сначала нужно запустить сервер, который изображен startServerскриптом:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

Полный пример кода этого простого TCP-сервера можно найти здесь . Мы можем написать не только сервер с Akka Streams, но и клиента. Это может выглядеть так:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

Полный код TCP-клиента можно найти здесь . Код выглядит очень похоже, но в отличие от сервера нам больше не нужно управлять входящими соединениями.

Сложные графики

В предыдущих разделах мы видели, как мы можем создавать простые программы из потоков. Однако в действительности часто недостаточно просто полагаться на уже встроенные функции для создания более сложных потоков. Если мы хотим использовать Akka Streams для произвольных программ, нам нужно знать, как создавать собственные структуры управления и комбинируемые потоки, которые позволяют нам решать сложные задачи наших приложений. Хорошая новость заключается в том, что Akka Streams был разработан с учетом потребностей пользователей, и для того, чтобы дать вам краткое введение в более сложные части Akka Streams, мы добавили некоторые дополнительные функции в наш пример клиент / сервер.

Одна вещь, которую мы пока не можем сделать, это закрыть соединение. В этот момент все становится немного сложнее, потому что API потоков, который мы видели до сих пор, не позволяет нам останавливать поток в произвольной точке. Однако существует GraphStageабстракция, которую можно использовать для создания произвольных этапов обработки графа с любым количеством входных или выходных портов. Давайте сначала посмотрим на серверную часть, где мы представляем новый компонент под названием closeConnection:

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q"
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

Этот API выглядит намного более громоздким, чем API потока. Неудивительно, что мы должны сделать здесь много важных шагов. В обмен мы имеем больше контроля над поведением наших потоков. В приведенном выше примере мы указываем только один входной и один выходной порт и делаем их доступными для системы путем переопределения shapeзначения. Кроме того, мы определили так называемые InHandlerи а OutHandler, которые в этом порядке отвечают за получение и излучение элементов. Если вы внимательно посмотрели пример полного потока кликов, вы должны уже распознать эти компоненты. В InHandlerмы берем элемент и если это строка с одним символом 'q', мы хотим закрыть поток. Чтобы дать возможность клиенту узнать, что поток скоро закроется, мы отправляем строку"BYE"и затем мы сразу же закрываем сцену. closeConnectionКомпонент может быть объединен с потоком через viaметод, который был представлен в разделе о потоках.

Помимо возможности закрывать соединения, было бы также хорошо, если бы мы могли показать приветственное сообщение для вновь созданного соединения. Чтобы сделать это, нам снова нужно пойти немного дальше:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

Функция serverLogic теперь принимает входящее соединение в качестве параметра. Внутри его тела мы используем DSL, который позволяет нам описывать сложное поведение потока. С помощью welcomeмы создаем поток, который может излучать только один элемент - приветственное сообщение. logicэто то, что было описано, как serverLogicв предыдущем разделе. Единственное заметное отличие состоит в том, что мы добавили closeConnectionк нему. Теперь на самом деле приходит интересная часть DSL. GraphDSL.createФункция делает строитель bдоступный, который используется , чтобы выразить поток в виде графика. С помощью ~>функции можно соединять входные и выходные порты друг с другом. ConcatКомпонент , который используется в примере , может сцеплять элементы и здесь используется предварять сообщение приветствия перед другими элементами , которые выходят изinternalLogic, В последней строке мы делаем доступными только входной порт логики сервера и выходной порт объединенного потока, поскольку все остальные порты должны оставаться деталями реализации serverLogicкомпонента. Для более подробного ознакомления с графиком DSL Akka Streams посетите соответствующий раздел официальной документации . Полный пример кода сложного TCP-сервера и клиента, который может связаться с ним, можно найти здесь . Всякий раз, когда вы открываете новое соединение от клиента, вы должны увидеть приветственное сообщение, и, набрав "q"на клиенте, вы должны увидеть сообщение о том, что соединение было отменено.

Есть еще некоторые темы, которые не были охвачены этим ответом. Особенно материализация может напугать одного читателя или другого, но я уверен, что с материалом, который освещен здесь, каждый должен быть в состоянии сделать следующие шаги самостоятельно. Как уже говорилось, официальная документация является хорошим местом для продолжения изучения Akka Streams.

kiritsuku
источник
4
@monksy Я не планировал публиковать это где-либо еще. Не стесняйтесь переиздавать это в своем блоге, если хотите. API в настоящее время стабилен в большинстве случаев, а это значит, что вам, вероятно, даже не нужно заботиться об обслуживании (большинство статей в блогах об Akka Streams там устарели, поскольку показывают API, которого больше не существует).
kiritsuku
3
Это не исчезнет. Зачем это?
kiritsuku
2
@sschaef Это может исчезнуть, поскольку вопрос не по теме и закрыт как таковой.
DavidG
7
@Magisch Всегда помните: «Мы не удаляем хороший контент». Я не совсем уверен, но я предполагаю, что этот ответ мог бы действительно соответствовать, несмотря ни на что.
Дедупликатор
9
Этот пост может быть полезен для новой функции документирования в Stack Overflow, которая откроется для Scala.
SL Barth - Восстановить Монику