Допустим, я запрашиваю большой файл JSON, который содержит список многих объектов. Я не хочу, чтобы они все время оставались в памяти, но я бы предпочел прочитать и обработать их один за другим. Поэтому мне нужно превратить асинхронный System.IO.Stream
поток в IAsyncEnumerable<T>
. Как мне использовать новый System.Text.Json
API для этого?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
c#
.net-core
.net-core-3.0
c#-8.0
system.text.json
Рик де Уотер
источник
источник
Utf8JsonReader
, пожалуйста , посмотрите на некоторых GitHub образцов и при существующем потоке , а такжеGetAsync
сам по себе возвращает, когда весь ответ получен.SendAsync
Вместо этого вам нужно использовать с HttpCompletionOption.ResponseContentRead. Если у вас есть это, вы можете использовать JSON.NET JsonTextReader . ИспользоватьSystem.Text.Json
для этого не так просто, как показывает эта проблема . Функциональность недоступна, и реализовать ее при низком распределении с использованием структур нетривиальноОтветы:
Да, действительно потоковый JSON (de) сериализатор был бы хорошим улучшением производительности во многих местах.
К сожалению,
System.Text.Json
не делает этого в настоящее время. Я не уверен, будет ли это в будущем - я надеюсь на это! По-настоящему потоковая десериализация JSON оказывается довольно сложной задачей.Вы можете проверить, поддерживает ли чрезвычайно быстрый Utf8Json его, возможно.
Однако может быть индивидуальное решение для вашей конкретной ситуации, поскольку ваши требования, похоже, ограничивают сложность.
Идея состоит в том, чтобы вручную читать один элемент из массива за раз. Мы используем тот факт, что каждый элемент списка сам по себе является допустимым объектом JSON.
Вы можете вручную пропустить
[
(для первого элемента) или,
(для каждого следующего элемента). Тогда я думаю, что вам лучше всего использовать .NET Core,Utf8JsonReader
чтобы определить, где заканчивается текущий объект, и передать отсканированные байтыJsonDeserializer
.Таким образом, вы только слегка буферизуете по одному объекту за раз.
И так как мы говорим о производительности, вы можете получить информацию от a
PipeReader
, пока вы это делаете. :-)источник
TL; DR Это не тривиально
Похоже, кто-то уже опубликовал полный код для
Utf8JsonStreamReader
структуры, которая считывает буферы из потока и передает их в Utf8JsonRreader, что позволяет легко десериализовать с помощьюJsonSerializer.Deserialize<T>(ref newJsonReader, options);
. Код тоже не тривиален. Соответствующий вопрос здесь, а ответ здесь .Однако этого недостаточно -
HttpClient.GetAsync
он вернется только после получения полного ответа, по существу буферизуя все в памяти.Чтобы избежать этого, следует использовать HttpClient.GetAsync (string, HttpCompletionOption)
HttpCompletionOption.ResponseHeadersRead
.Цикл десериализации должен также проверять токен отмены, и либо выходить, либо выбрасывать, если он сигнализирован. В противном случае цикл будет продолжаться, пока весь поток не будет получен и обработан.
Этот код основан на примере соответствующего ответа и использует
HttpCompletionOption.ResponseHeadersRead
и проверяет токен отмены. Он может анализировать строки JSON, которые содержат правильный массив элементов, например:Первый вызов
jsonStreamReader.Read()
перемещается в начало массива, а второй - в начало первого объекта. Сам цикл завершается, когда обнаружен конец array (]
).JSON-фрагменты, AKA потокового JSON ака ... *
В сценариях потоковой передачи событий или журналирования достаточно часто добавлять отдельные объекты JSON в файл, по одному элементу в строке, например:
Это не действительный документ JSON, но отдельные фрагменты действительны. Это имеет несколько преимуществ для больших данных / сценариев с высокой степенью одновременности. Добавление нового события требует только добавления новой строки в файл, а не анализа и перекомпоновки всего файла. Обработка , особенно параллельная, проще по двум причинам:
Использование StreamReader
Чтобы сделать это, можно использовать TextReader, читать по одной строке за раз и анализировать его с помощью JsonSerializer.Deserialize :
Это намного проще, чем код, десериализующий правильный массив. Есть две проблемы:
ReadLineAsync
не принимает токен отменыЭтого может быть достаточно, хотя попытка создать
ReadOnlySpan<Byte>
буферы, необходимые для JsonSerializer.eserialize, не тривиальна.Трубопроводы и SequenceReader
Чтобы избежать размещения, нам нужно получить
ReadOnlySpan<byte>
поток. Для этого необходимо использовать каналы System.IO.Pipeline и структуру SequenceReader . В книге Стива Гордона « Введение в SequenceReader» объясняется, как этот класс можно использовать для чтения данных из потока с использованием разделителей.К сожалению,
SequenceReader
это структура ref, которая означает, что ее нельзя использовать в асинхронных или локальных методах. Вот почему Стив Гордон в своей статье создаетМетод чтения элементов формирует ReadOnlySequence и возвращает конечную позицию, поэтому PipeReader может возобновить ее. К сожалению, мы хотим вернуть IEnumerable или IAsyncEnumerable, а методы итератора не любят
in
илиout
параметры либо.Мы могли бы собрать десериализованные элементы в списке или очереди и вернуть их как единый результат, но это все равно выделило бы списки, буферы или узлы и пришлось бы ждать десериализации всех элементов в буфере перед возвратом:
Нам нужно что-то, что действует как перечислимое, не требуя метода итератора, работает с асинхронностью и не буферизирует все как есть.
Добавление каналов для создания IAsyncEnumerable
ChannelReader.ReadAllAsync возвращает IAsyncEnumerable. Мы можем вернуть ChannelReader из методов, которые не могли работать как итераторы и по-прежнему генерировать поток элементов без кэширования.
Адаптируя код Стива Гордона для использования каналов, мы получаем ReadItems (ChannelWriter ...) и
ReadLastItem
методы. Первый, читает по одному элементу за раз, вплоть до новой строкиReadOnlySpan<byte> itemBytes
. Это может быть использованоJsonSerializer.Deserialize
. ЕслиReadItems
не удается найти разделитель, он возвращает свою позицию, чтобы PipelineReader мог извлечь следующий фрагмент из потока.Когда мы достигаем последнего блока и другого разделителя нет, ReadLastItem` читает оставшиеся байты и десериализует их.
Код почти идентичен коду Стива Гордона. Вместо записи в консоль, мы пишем в ChannelWriter.
DeserializeToChannel<T>
Метод создает читатель трубопроводов на верхней части потока, создает канал и начинает задачу работника , который разбирает ломти и толкают их на канал:ChannelReader.ReceiveAllAsync()
может быть использован для потребления всех предметов черезIAsyncEnumerable<T>
:источник
Такое ощущение, что вам нужно реализовать свой собственный потоковый ридер. Вы должны прочитать байты один за другим и остановиться, как только определение объекта будет завершено. Это действительно довольно низкоуровневый. Таким образом, вы НЕ БУДЕТЕ загружать весь файл в ОЗУ, а просто возьмете на себя роль, с которой имеете дело. Кажется ли это ответом?
источник
Может быть, вы могли бы использовать
Newtonsoft.Json
сериализатор? https://www.newtonsoft.com/json/help/html/Performance.htmОсобенно смотри раздел:
редактировать
Вы можете попробовать десериализовать значения из JsonTextReader, например
источник
I don't want them to be in memory all at once, but I would rather read and process them one by one.
Соответствующим классом в JSON.NET является JsonTextReader.