Библиотека Akka Streams уже поставляется с большим количеством документации . Однако главная проблема для меня заключается в том, что он дает слишком много материала - я чувствую себя совершенно ошеломленным количеством понятий, которые мне нужно выучить. Многие примеры, показанные там, кажутся очень тяжеловесными и их нелегко перевести на реальные случаи использования, и поэтому они довольно эзотеричны. Я думаю, что это дает слишком много деталей, не объясняя, как собрать все строительные блоки вместе и как именно это помогает решать конкретные проблемы.
Есть источники, приемники, потоки, этапы графа, частичные графы, материализация, граф DSL и многое другое, и я просто не знаю, с чего начать. Краткое руководство предназначено , чтобы быть стартовое место , но я не понимаю. Это просто добавляет концепции, упомянутые выше, не объясняя их. Кроме того, примеры кода не могут быть выполнены - есть недостающие части, которые делают более или менее невозможным для меня следовать тексту.
Может ли кто-нибудь объяснить источники понятий, приемники, потоки, этапы графов, частичные графы, материализацию и, возможно, некоторые другие вещи, которые я пропустил простыми словами и простыми примерами, которые не объясняют каждую деталь (и которые, вероятно, вообще не нужны в начало)?
источник
Ответы:
Этот ответ основан на
akka-stream
версии2.4.2
. API может немного отличаться в других версиях. Зависимость может быть использована sbt :Хорошо, давайте начнем. API Akka Streams состоит из трех основных типов. В отличие от Reactive Streams , эти типы намного более мощные и, следовательно, более сложные. Предполагается, что для всех примеров кода уже существуют следующие определения:
Операторы
import
необходимы для объявлений типов.system
представляет систему акторов Akka иmaterializer
представляет контекст оценки потока. В нашем случае мы используем aActorMaterializer
, что означает, что потоки оцениваются поверх актеров. Оба значения помечены какimplicit
, что дает компилятору Scala возможность вставлять эти две зависимости автоматически, когда они необходимы. Мы также импортируемsystem.dispatcher
, что является контекстом выполнения дляFutures
.Новый API
Akka Streams имеет следующие ключевые свойства:
Materializer
.Source
,Sink
иFlow
. Строительные блоки образуют график, оценка которого основана наMaterializer
и должна быть явно запущена.Далее будет дано более глубокое введение в то, как использовать три основных типа.
Источник
A
Source
- создатель данных, он служит источником входных данных для потока. У каждогоSource
есть один выходной канал и нет входного канала. Все данные проходят через выходной канал к тому, что подключено кSource
.Изображение взято с boldradius.com .
A
Source
может быть создан несколькими способами:В вышеупомянутых случаях мы снабжали
Source
конечные данные, что означает, что они в конечном итоге прекратятся. Не следует забывать, что реактивные потоки по умолчанию ленивы и асинхронны. Это означает, что нужно явно запросить оценку потока. В Akka Streams это можно сделать с помощьюrun*
методов. Эта функцияrunForeach
не будет отличаться от хорошо известнойforeach
функции - благодаряrun
добавлению она делает явным то, что мы запрашиваем оценку потока. Поскольку конечные данные скучны, мы продолжим с бесконечными:С помощью этого
take
метода мы можем создать искусственную точку остановки, которая мешает нам оценивать неопределенно долго. Поскольку поддержка акторов встроена, мы также можем легко передать поток сообщениями, которые отправляются актеру:Мы видим, что
Futures
они выполняются асинхронно в разных потоках, что объясняет результат. В приведенном выше примере буфер для входящих элементов не является необходимым, и, следовательно,OverflowStrategy.fail
мы можем настроить, что поток должен потерпеть неудачу при переполнении буфера. Особенно через этот интерфейс актера, мы можем передавать поток через любой источник данных. Не имеет значения, создаются ли данные одним потоком, другим потоком, другим процессом или они поступают из удаленной системы через Интернет.тонуть
А
Sink
в основном противоположностьSource
. Это конечная точка потока и, следовательно, потребляет данные. АSink
имеет один входной канал и не имеет выходного канала.Sinks
особенно необходимы, когда мы хотим указать поведение сборщика данных многократно и без оценки потока. Уже известныеrun*
методы не позволяют нам эти свойства, поэтому предпочтительнее использоватьSink
вместо них.Изображение взято с boldradius.com .
Краткий пример
Sink
действия:Подключение
Source
кSink
может быть сделано с помощьюto
метода. Он возвращает так называемыйRunnableFlow
, который, как мы увидим позже, представляет собой особую формуFlow
потока, который может быть выполнен простым вызовом егоrun()
метода.Изображение взято с boldradius.com .
Конечно, можно передать все значения, поступающие в приемник, актеру:
поток
Источники и приемники данных хороши, если вам нужно соединение между потоками Akka и существующей системой, но с ними ничего не поделаешь. Потоки являются последним отсутствующим элементом в базовой абстракции Akka Streams. Они действуют как соединитель между различными потоками и могут использоваться для преобразования его элементов.
Изображение взято с boldradius.com .
Если a
Flow
связан сSource
новым,Source
это результат. Аналогично,Flow
связанный с собойSink
создает новоеSink
. ИFlow
связанный как с, такSource
и сSink
результатом вRunnableFlow
. Поэтому они находятся между входным и выходным каналами, но сами по себе не соответствуют одному из вариантов, если они не подключены ни к a,Source
ни к aSink
.Изображение взято с boldradius.com .
Чтобы лучше понять
Flows
, мы рассмотрим несколько примеров:С помощью
via
метода мы можем соединитьSource
сFlow
. Нам нужно указать тип ввода, потому что компилятор не может определить его для нас. Как мы уже видим в этом простом примере, потокиinvert
иdouble
полностью независимы от любых производителей и потребителей данных. Они только преобразуют данные и направляют их в выходной канал. Это означает, что мы можем повторно использовать поток среди нескольких потоков:s1
иs2
представляют совершенно новые потоки - они не делятся никакими данными через свои строительные блоки.Неограниченные потоки данных
Прежде чем двигаться дальше, мы должны сначала вернуться к некоторым ключевым аспектам Reactive Streams. Неограниченное количество элементов может прибыть в любую точку и поместить поток в разные состояния. Помимо работающего потока, который является обычным состоянием, поток может быть остановлен либо из-за ошибки, либо из-за сигнала, который означает, что дальнейшие данные не поступят. Поток можно смоделировать графическим способом, пометив события на временной шкале, как здесь:
Изображение взято из введения в реактивное программирование, которое вы пропустили .
Мы уже видели работающие потоки в примерах предыдущего раздела. Мы получаем
RunnableGraph
всякий раз, когда поток может быть материализован, а это означает, что aSink
связан с aSource
. До сих пор мы всегда материализовались в значениеUnit
, которое можно увидеть в типах:Для получения
Source
иSink
второй параметр типа и дляFlow
третьего параметра типа обозначает материализованное значение. В этом ответе полное значение материализации не объясняется. Тем не менее, более подробную информацию о материализации можно найти в официальной документации . Пока единственное, что нам нужно знать, это то, что материализованная ценность - это то, что мы получаем, когда запускаем поток. Поскольку до сих пор нас интересовали только побочные эффекты, мы получилиUnit
материализованную ценность. Исключением из этого была материализация раковины, которая привела кFuture
. Это вернуло намFuture
, так как это значение может обозначать, когда поток, который связан с приемником, был закончен. До сих пор предыдущие примеры кода были хорошими для объяснения концепции, но они также были скучными, потому что мы имели дело только с конечными потоками или с очень простыми бесконечными. Чтобы сделать его более интересным, далее будет объяснен полный асинхронный и неограниченный поток.Пример ClickStream
В качестве примера мы хотим иметь поток, который фиксирует события щелчка. Чтобы сделать его более сложным, скажем, мы также хотим сгруппировать события кликов, которые происходят через короткое время друг за другом. Таким образом, мы можем легко обнаружить двойные, тройные или десятикратные клики. Кроме того, мы хотим отфильтровать все отдельные клики. Сделайте глубокий вдох и представьте, как вы решите эту проблему в обязательном порядке. Бьюсь об заклад, никто не сможет реализовать решение, которое работает правильно с первой попытки. Реагировать на эту проблему тривиально. На самом деле, решение настолько простое и понятное для реализации, что мы можем даже выразить его в диаграмме, которая напрямую описывает поведение кода:
Изображение взято из введения в реактивное программирование, которое вы пропустили .
Серые прямоугольники - это функции, которые описывают, как один поток превращается в другой. С
throttle
функцией мы накапливаем щелчки в течение 250 миллисекунд, тоmap
иfilter
функции должны быть понятны. Цветные сферы представляют событие, а стрелки показывают, как они проходят через наши функции. Позже на этапах обработки мы получаем все меньше и больше элементов, которые проходят через наш поток, поскольку мы группируем их и отфильтровываем. Код для этого изображения будет выглядеть примерно так:Вся логика может быть представлена только в четырех строках кода! В Scala мы могли бы написать это еще короче:
Определение
clickStream
является немного более сложным, но это только тот случай, потому что пример программы работает на JVM, где захват событий щелчка не легко возможен. Другая сложность заключается в том, что Akka по умолчанию не предоставляет этуthrottle
функцию. Вместо этого мы должны были написать это сами. Поскольку эту функцию (как и в случае с функциямиmap
илиfilter
) можно использовать повторно в разных случаях использования, я не считаю эти строки количеством строк, которое нам необходимо для реализации логики. В императивных языках, однако, это нормально, что логику нельзя использовать повторно так просто, и что различные логические шаги выполняются в одном месте, а не применяются последовательно, что означает, что мы, вероятно, могли бы неправильно сформировать наш код с логикой регулирования. Полный пример кода доступен в видесуть и не будет обсуждаться здесь дальше.Пример SimpleWebServer
Вместо этого следует обсудить еще один пример. Хотя поток кликов является хорошим примером, позволяющим Akka Streams обрабатывать пример из реального мира, ему не хватает возможности продемонстрировать параллельное выполнение в действии. Следующий пример должен представлять небольшой веб-сервер, который может обрабатывать несколько запросов параллельно. Веб-сервер должен иметь возможность принимать входящие соединения и получать от них последовательности байтов, которые представляют печатные знаки ASCII. Эти последовательности байтов или строки должны быть разбиты на все символы новой строки на более мелкие части. После этого сервер должен ответить клиенту каждой из разделенных линий. В качестве альтернативы, он может делать что-то еще со строками и выдавать специальный токен ответа, но мы хотим сделать его простым в этом примере и поэтому не вводить какие-либо необычные функции. Помните, сервер должен иметь возможность обрабатывать несколько запросов одновременно, что в основном означает, что ни один запрос не может заблокировать любой другой запрос от дальнейшего выполнения. Решение всех этих требований может быть непростым делом - однако с Akka Streams нам не нужно больше нескольких строк для решения любого из них. Для начала давайте разберемся с самим сервером:
В основном, есть только три основных строительных блока. Первый должен принимать входящие соединения. Второй должен обрабатывать входящие запросы, а третий должен отправить ответ. Реализация всех этих трех стандартных блоков немного сложнее, чем реализация потока кликов:
Функция
mkServer
принимает (помимо адреса и порта сервера) также систему акторов и материализатор в качестве неявных параметров. Поток управления сервером представленbinding
, который принимает источник входящих соединений и перенаправляет их в приемник входящих соединений. ВнутриconnectionHandler
нашего приемника мы обрабатываем каждое соединение потокомserverLogic
, который будет описан позже.binding
возвращаетFuture
, которая завершается, когда сервер был запущен или запуск не удался, что может быть в том случае, если порт уже занят другим процессом. Код, однако, не полностью отражает графику, поскольку мы не можем видеть строительный блок, который обрабатывает ответы. Причина этого в том, что соединение уже обеспечивает эту логику само по себе. Это двунаправленный поток, а не просто однонаправленный, как потоки, которые мы видели в предыдущих примерах. Как и в случае материализации, такие сложные потоки здесь не объясняются. Официальная документации есть много материала , чтобы покрыть более сложные графики потока. На данный момент достаточно знать, чтоTcp.IncomingConnection
представляет собой соединение, которое знает, как получать запросы и как отправлять ответы. Часть, которая все еще отсутствует, этоserverLogic
структурный элемент. Это может выглядеть так:Еще раз, мы можем разделить логику на несколько простых строительных блоков, которые все вместе формируют поток нашей программы. Сначала мы хотим разделить нашу последовательность байтов на строки, что мы должны делать всякий раз, когда находим символ новой строки. После этого байты каждой строки необходимо преобразовать в строку, поскольку работа с необработанными байтами является громоздкой. В целом мы можем получить двоичный поток сложного протокола, что сделает работу с исходными необработанными данными чрезвычайно сложной. Получив читаемую строку, мы можем создать ответ. По причинам простоты ответом может быть что угодно в нашем случае. В конце концов, мы должны преобразовать наш ответ в последовательность байтов, которую можно отправить по проводам. Код для всей логики может выглядеть так:
Мы уже знаем, что
serverLogic
это поток, который беретByteString
и должен произвестиByteString
. С помощьюdelimiter
мы можем разделить наByteString
более мелкие части - в нашем случае это должно происходить всякий раз, когда встречается символ новой строки.receiver
это поток, который берет все последовательности разделенных байтов и преобразует их в строку. Это, конечно, опасное преобразование, поскольку в строку должны быть преобразованы только печатные символы ASCII, но для наших нужд это достаточно хорошо.responder
является последним компонентом и отвечает за создание ответа и преобразование ответа обратно в последовательность байтов. В отличие от графики мы не разделили этот последний компонент на две части, поскольку логика тривиальна. В конце мы соединяем все потоки черезvia
функция. В этот момент можно спросить, позаботились ли мы о многопользовательском свойстве, о котором говорилось в начале. И действительно, мы сделали, хотя это может быть не очевидно сразу. Глядя на этот рисунок, он должен стать более ясным:serverLogic
Компонент не что иное, как поток , который содержит меньшие потоки. Этот компонент принимает ввод, который является запросом, и производит вывод, который является ответом. Поскольку потоки могут создаваться несколько раз, и все они работают независимо друг от друга, мы достигаем за счет этого вложения нашего многопользовательского свойства. Каждый запрос обрабатывается в своем собственном запросе, и поэтому короткий запущенный запрос может перекрыть ранее запущенный длительный запрос. В случае, если вам интересно, определение того,serverLogic
что было показано ранее, конечно, можно записать намного короче, указав большинство его внутренних определений:Тест веб-сервера может выглядеть так:
Для того, чтобы приведенный выше пример кода функционировал правильно, нам сначала нужно запустить сервер, который изображен
startServer
скриптом:Полный пример кода этого простого TCP-сервера можно найти здесь . Мы можем написать не только сервер с Akka Streams, но и клиента. Это может выглядеть так:
Полный код TCP-клиента можно найти здесь . Код выглядит очень похоже, но в отличие от сервера нам больше не нужно управлять входящими соединениями.
Сложные графики
В предыдущих разделах мы видели, как мы можем создавать простые программы из потоков. Однако в действительности часто недостаточно просто полагаться на уже встроенные функции для создания более сложных потоков. Если мы хотим использовать Akka Streams для произвольных программ, нам нужно знать, как создавать собственные структуры управления и комбинируемые потоки, которые позволяют нам решать сложные задачи наших приложений. Хорошая новость заключается в том, что Akka Streams был разработан с учетом потребностей пользователей, и для того, чтобы дать вам краткое введение в более сложные части Akka Streams, мы добавили некоторые дополнительные функции в наш пример клиент / сервер.
Одна вещь, которую мы пока не можем сделать, это закрыть соединение. В этот момент все становится немного сложнее, потому что API потоков, который мы видели до сих пор, не позволяет нам останавливать поток в произвольной точке. Однако существует
GraphStage
абстракция, которую можно использовать для создания произвольных этапов обработки графа с любым количеством входных или выходных портов. Давайте сначала посмотрим на серверную часть, где мы представляем новый компонент под названиемcloseConnection
:Этот API выглядит намного более громоздким, чем API потока. Неудивительно, что мы должны сделать здесь много важных шагов. В обмен мы имеем больше контроля над поведением наших потоков. В приведенном выше примере мы указываем только один входной и один выходной порт и делаем их доступными для системы путем переопределения
shape
значения. Кроме того, мы определили так называемыеInHandler
и аOutHandler
, которые в этом порядке отвечают за получение и излучение элементов. Если вы внимательно посмотрели пример полного потока кликов, вы должны уже распознать эти компоненты. ВInHandler
мы берем элемент и если это строка с одним символом'q'
, мы хотим закрыть поток. Чтобы дать возможность клиенту узнать, что поток скоро закроется, мы отправляем строку"BYE"
и затем мы сразу же закрываем сцену.closeConnection
Компонент может быть объединен с потоком черезvia
метод, который был представлен в разделе о потоках.Помимо возможности закрывать соединения, было бы также хорошо, если бы мы могли показать приветственное сообщение для вновь созданного соединения. Чтобы сделать это, нам снова нужно пойти немного дальше:
Функция
serverLogic
теперь принимает входящее соединение в качестве параметра. Внутри его тела мы используем DSL, который позволяет нам описывать сложное поведение потока. С помощьюwelcome
мы создаем поток, который может излучать только один элемент - приветственное сообщение.logic
это то, что было описано, какserverLogic
в предыдущем разделе. Единственное заметное отличие состоит в том, что мы добавилиcloseConnection
к нему. Теперь на самом деле приходит интересная часть DSL.GraphDSL.create
Функция делает строительb
доступный, который используется , чтобы выразить поток в виде графика. С помощью~>
функции можно соединять входные и выходные порты друг с другом.Concat
Компонент , который используется в примере , может сцеплять элементы и здесь используется предварять сообщение приветствия перед другими элементами , которые выходят изinternalLogic
, В последней строке мы делаем доступными только входной порт логики сервера и выходной порт объединенного потока, поскольку все остальные порты должны оставаться деталями реализацииserverLogic
компонента. Для более подробного ознакомления с графиком DSL Akka Streams посетите соответствующий раздел официальной документации . Полный пример кода сложного TCP-сервера и клиента, который может связаться с ним, можно найти здесь . Всякий раз, когда вы открываете новое соединение от клиента, вы должны увидеть приветственное сообщение, и, набрав"q"
на клиенте, вы должны увидеть сообщение о том, что соединение было отменено.Есть еще некоторые темы, которые не были охвачены этим ответом. Особенно материализация может напугать одного читателя или другого, но я уверен, что с материалом, который освещен здесь, каждый должен быть в состоянии сделать следующие шаги самостоятельно. Как уже говорилось, официальная документация является хорошим местом для продолжения изучения Akka Streams.
источник