Мне нужно вызвать вышестоящую службу (службу BLOB-объектов Azure), чтобы отправить данные в OutputStream, который затем мне нужно развернуть и отправить обратно клиенту через akka. Без akka (и просто кода сервлета) я просто получил бы ServletOutputStream и передал его методу службы Azure.
Самое близкое, на что я могу попытаться наткнуться, и, очевидно, это неправильно, это что-то вроде этого
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
Идея в том, что я вызываю вышестоящий сервис для получения выходного потока, вызывая blobClient.download (os);
Кажется, что лямбда-функция вызывается и возвращается, но затем она перестает работать, потому что нет данных или чего-то еще. Как будто я не должен иметь эту лямбда-функцию, но возможно вернуть какой-то объект, который работает? Точно сказать не могу.
Как это сделать?
источник
download
? Потоковые данные передаютсяos
и возвращаются только после записи данных?Ответы:
Настоящая проблема здесь заключается в том, что API Azure не предназначен для противодействия давлению. Выходной поток не может сообщить в Azure о том, что он не готов для получения дополнительных данных. Другими словами: если Azure отправляет данные быстрее, чем вы можете их использовать, где-то должен произойти какой-то ужасный сбой переполнения буфера.
Принимая этот факт, мы можем сделать следующее:
Source.lazySource
чтобы начать загрузку данных только тогда, когда есть потребность в нисходящем потоке (он же источник запускается и данные запрашиваются).download
вызов в другой поток, чтобы он продолжал выполняться, не блокируя возвращение источника. Один из способов сделать это с помощьюFuture
(я не уверен, каковы лучшие практики Java, но в любом случае должен работать нормально). Хотя изначально это не имеет значения, вам может потребоваться выбрать контекст выполнения, отличный отsystem.dispatcher
- все зависит от тогоdownload
, блокирует он или нет.Я заранее прошу прощения, если этот код Java искажен - я использую Akka со Scala, так что это все из рассмотрения справочника по синтаксису Java API и Akka.
источник
В
OutputStream
этом случае это «материализованное значение»Source
и оно будет создано только после запуска потока (или «материализации» в текущий поток). Запуск его вне вашего контроля, так как вы передаете вSource
Akka HTTP, и это позже фактически запустит ваш источник..mapMaterializedValue(matval -> ...)
обычно используется для преобразования материализованного значения, но поскольку оно вызывается как часть материализации, вы можете использовать его для выполнения побочных эффектов, таких как отправка матвала в сообщении, как вы уже поняли, в этом нет ничего плохого что даже если это выглядит прикольно. Важно понимать, что поток не завершит свою материализацию и начнет работать, пока эта лямбда не завершится. Это означает проблемы, еслиdownload()
блокировать, а не завершать какую-то работу в другом потоке и немедленно возвращать.Однако существует другое решение:
Source.preMaterialize()
оно материализует источник и дает вамPair
материализованное значение и новое,Source
которое можно использовать для потребления уже запущенного источника:Обратите внимание, что в вашем коде есть еще несколько вещей, о которых следует подумать, особенно если
blobClient.download(os)
вызов блокируется до тех пор, пока он не будет завершен, и вы вызываете его от актера, в этом случае вы должны убедиться, что ваш актер не остановит диспетчер и не остановит его. другие действующие лица в вашем приложении от выполнения (см. документы Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).источник