Я хотел бы получить await
результат BlockingCollection<T>.Take()
асинхронно, чтобы я не блокировал поток. Ищу что-нибудь подобное:
var item = await blockingCollection.TakeAsync();
Я знаю, что могу это сделать:
var item = await Task.Run(() => blockingCollection.Take());
но это как бы убивает всю идею, потому что ThreadPool
вместо этого блокируется другой поток (из ).
Есть ли альтернатива?
await Task.Run(() => blockingCollection.Take())
задачу, она будет выполняться в другом потоке, и ваш поток пользовательского интерфейса не будет заблокирован.Task
основанная на экспорте API. Его можно использовать, например, из ASP.NET. Здесь рассматриваемый код не будет хорошо масштабироваться.ConfigureAwait
использовать послеRun()
? [изд. неважно, теперь я понимаю, о чем вы говорите]Ответы:
Я знаю четыре альтернативы.
Первый - Channels , который предоставляет потокобезопасную очередь, поддерживающую асинхронные операции
Read
иWrite
операции. Каналы сильно оптимизированы и, возможно, поддерживают удаление некоторых элементов при достижении порогового значения.Следующее -
BufferBlock<T>
от TPL Dataflow . Если у вас всего один потребитель, вы можете использоватьOutputAvailableAsync
илиReceiveAsync
или просто связать его с файломActionBlock<T>
. Для получения дополнительной информации см. Мой блог .Последние два - это созданные мной типы, доступные в моей библиотеке AsyncEx .
AsyncCollection<T>
являетсяasync
почти эквивалентомBlockingCollection<T>
, способным заключать в оболочку параллельную коллекцию производителей / потребителей, такую какConcurrentQueue<T>
илиConcurrentBag<T>
. Вы можете использоватьTakeAsync
для асинхронного использования элементов из коллекции. Для получения дополнительной информации см. Мой блог .AsyncProducerConsumerQueue<T>
- это более портативнаяasync
очередь производителей / потребителей. Вы можете использоватьDequeueAsync
для асинхронного использования элементов из очереди. Для получения дополнительной информации см. Мой блог .Последние три из этих альтернатив допускают синхронные и асинхронные операции ввода-вывода.
источник
AsyncCollection.TryTakeAsync
, но я не могу найти его в загруженнойNito.AsyncEx.Coordination.dll 5.0.0.0
(последней версии). Указанный Nito.AsyncEx.Concurrent.dll не существует в пакете . Что мне не хватает?while ((result = await collection.TryTakeAsync()).Success) { }
. Почему его удалили?... или вы можете сделать это:
using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class AsyncQueue<T> { private readonly SemaphoreSlim _sem; private readonly ConcurrentQueue<T> _que; public AsyncQueue() { _sem = new SemaphoreSlim(0); _que = new ConcurrentQueue<T>(); } public void Enqueue(T item) { _que.Enqueue(item); _sem.Release(); } public void EnqueueRange(IEnumerable<T> source) { var n = 0; foreach (var item in source) { _que.Enqueue(item); n++; } _sem.Release(n); } public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken)) { for (; ; ) { await _sem.WaitAsync(cancellationToken); T item; if (_que.TryDequeue(out item)) { return item; } } } }
Простая, полнофункциональная асинхронная очередь FIFO.
источник
for
? если семафор освобожден, в очереди есть по крайней мере один элемент для удаления, нет?TryDequeue
которой, возврат со значением или не возврат вообще. Технически, если у вас более 1 ридера, один и тот же ридер может использовать два (или более) предмета, прежде чем любой другой ридер полностью проснется. Успешный результатWaitAsync
- это просто сигнал о том, что в очереди могут быть элементы, которые нужно потребить, это не гарантия.If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.
с сайта docs.microsoft.com/en-us/dotnet/api/… Как при успешном выполненииWaitAsync
не иметь элементов в очереди? Если выброс N пробуждает более N потребителей, чемsemaphore
происходит сбой. Не так ли?Вот очень простая реализация
BlockingCollection
, поддерживающая ожидание, с множеством недостающих функций. Он используетAsyncEnumerable
библиотеку, которая делает возможным асинхронное перечисление для версий C # старше 8.0.public class AsyncBlockingCollection<T> { // Missing features: cancellation, boundedCapacity, TakeAsync private Queue<T> _queue = new Queue<T>(); private SemaphoreSlim _semaphore = new SemaphoreSlim(0); private int _consumersCount = 0; private bool _isAddingCompleted; public void Add(T item) { lock (_queue) { if (_isAddingCompleted) throw new InvalidOperationException(); _queue.Enqueue(item); } _semaphore.Release(); } public void CompleteAdding() { lock (_queue) { if (_isAddingCompleted) return; _isAddingCompleted = true; if (_consumersCount > 0) _semaphore.Release(_consumersCount); } } public IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; return new AsyncEnumerable<T>(async yield => { while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) await yield.ReturnAsync(item); } }); } }
Пример использования:
var abc = new AsyncBlockingCollection<int>(); var producer = Task.Run(async () => { for (int i = 1; i <= 10; i++) { await Task.Delay(100); abc.Add(i); } abc.CompleteAdding(); }); var consumer = Task.Run(async () => { await abc.GetConsumingEnumerable().ForEachAsync(async item => { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); }); }); await Task.WhenAll(producer, consumer);
Вывод:
Обновление: с выпуском C # 8 асинхронное перечисление стало встроенной функцией языка. Необходимые классы (
IAsyncEnumerable
,IAsyncEnumerator
) встроены в .NET Core 3.0 и предлагаются в виде пакета для .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ).Вот альтернативная
GetConsumingEnumerable
реализация с новым синтаксисом C # 8:public async IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) yield return item; } }
Обратите внимание на сосуществование
await
иyield
в одном методе.Пример использования (C # 8):
var consumer = Task.Run(async () => { await foreach (var item in abc.GetConsumingEnumerable()) { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); } });
Обратите внимание на
await
передforeach
.источник
AsyncBlockingCollection
бессмысленно. Что-то не может быть асинхронным и блокирующим одновременно, поскольку эти две концепции являются точными противоположностями!Если вы не против небольшого взлома, вы можете попробовать эти расширения.
public static async Task AddAsync<TEntity>( this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt) { while (true) { try { if (Bc.TryAdd(item, 0, abortCt)) return; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } } public static async Task<TEntity> TakeAsync<TEntity>( this BlockingCollection<TEntity> Bc, CancellationToken abortCt) { while (true) { try { TEntity item; if (Bc.TryTake(out item, 0, abortCt)) return item; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } }
источник