Есть ли что-нибудь вроде асинхронного BlockingCollection <T>?

87

Я хотел бы получить awaitрезультат BlockingCollection<T>.Take()асинхронно, чтобы я не блокировал поток. Ищу что-нибудь подобное:

var item = await blockingCollection.TakeAsync();

Я знаю, что могу это сделать:

var item = await Task.Run(() => blockingCollection.Take());

но это как бы убивает всю идею, потому что ThreadPoolвместо этого блокируется другой поток (из ).

Есть ли альтернатива?

избегать
источник
3
Я не понимаю, если вы используете await Task.Run(() => blockingCollection.Take())задачу, она будет выполняться в другом потоке, и ваш поток пользовательского интерфейса не будет заблокирован.
Selman Genç
8
@ Selman22, это не UI-приложение. Это библиотека, Taskоснованная на экспорте API. Его можно использовать, например, из ASP.NET. Здесь рассматриваемый код не будет хорошо масштабироваться.
избежать
Будет ли проблема, если ConfigureAwaitиспользовать после Run()? [изд. неважно, теперь я понимаю, о чем вы говорите]
MojoFilter

Ответы:

99

Я знаю четыре альтернативы.

Первый - Channels , который предоставляет потокобезопасную очередь, поддерживающую асинхронные операции Readи Writeоперации. Каналы сильно оптимизированы и, возможно, поддерживают удаление некоторых элементов при достижении порогового значения.

Следующее - BufferBlock<T>от TPL Dataflow . Если у вас всего один потребитель, вы можете использовать OutputAvailableAsyncили ReceiveAsyncили просто связать его с файлом ActionBlock<T>. Для получения дополнительной информации см. Мой блог .

Последние два - это созданные мной типы, доступные в моей библиотеке AsyncEx .

AsyncCollection<T>является asyncпочти эквивалентом BlockingCollection<T>, способным заключать в оболочку параллельную коллекцию производителей / потребителей, такую ​​как ConcurrentQueue<T>или ConcurrentBag<T>. Вы можете использовать TakeAsyncдля асинхронного использования элементов из коллекции. Для получения дополнительной информации см. Мой блог .

AsyncProducerConsumerQueue<T>- это более портативная asyncочередь производителей / потребителей. Вы можете использовать DequeueAsyncдля асинхронного использования элементов из очереди. Для получения дополнительной информации см. Мой блог .

Последние три из этих альтернатив допускают синхронные и асинхронные операции ввода-вывода.

Стивен Клири
источник
12
Ссылка на Git Hub, когда CodePlex окончательно закрывается: github.com/StephenCleary/AsyncEx
Пол
Документация по API содержит метод AsyncCollection.TryTakeAsync, но я не могу найти его в загруженной Nito.AsyncEx.Coordination.dll 5.0.0.0(последней версии). Указанный Nito.AsyncEx.Concurrent.dll не существует в пакете . Что мне не хватает?
Theodor Zoulias
@TheodorZoulias: Этот метод был удален в v5. Документация по API v5 находится здесь .
Стивен Клири
ООО Спасибо. Похоже, это был самый простой и безопасный способ перечислить коллекцию. while ((result = await collection.TryTakeAsync()).Success) { }. Почему его удалили?
Теодор Зулиас
1
@TheodorZoulias: Потому что «попробовать» для разных людей означает разное. Я думаю о том, чтобы снова добавить метод «Попробовать», но на самом деле он будет иметь другую семантику, чем исходный метод. Также рассматривается поддержка асинхронных потоков в будущей версии, которая определенно будет лучшим методом потребления при поддержке.
Стивен Клири
21

... или вы можете сделать это:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

Простая, полнофункциональная асинхронная очередь FIFO.

Примечание: до этого SemaphoreSlim.WaitAsyncбыл добавлен в .NET 4.5, это было не так просто.

Джон Лейдегрен
источник
2
Что толку от бесконечности for? если семафор освобожден, в очереди есть по крайней мере один элемент для удаления, нет?
Blendester
2
@Blendester может возникнуть состояние гонки, если несколько потребителей заблокированы. Мы не можем знать наверняка, что нет по крайней мере двух конкурирующих потребителей, и мы не знаем, удается ли им обоим проснуться до того, как они успеют забрать товар. В случае гонки, если одному из участников не удалось удалить очередь, он вернется в режим сна и будет ждать другого сигнала.
Джон Лейдегрен
Если два или более потребителя проходят через WaitAsync (), то в очереди имеется эквивалентное количество элементов, и поэтому они всегда будут удаляться из очереди успешно. Я что-то упускаю?
mindcruzer
2
Это блокирующая коллекция, семантика TryDequeueкоторой, возврат со значением или не возврат вообще. Технически, если у вас более 1 ридера, один и тот же ридер может использовать два (или более) предмета, прежде чем любой другой ридер полностью проснется. Успешный результат WaitAsync- это просто сигнал о том, что в очереди могут быть элементы, которые нужно потребить, это не гарантия.
Джон Лейдегрен
@JohnLeidegren If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.с сайта docs.microsoft.com/en-us/dotnet/api/… Как при успешном выполнении WaitAsyncне иметь элементов в очереди? Если выброс N пробуждает более N потребителей, чем semaphoreпроисходит сбой. Не так ли?
Ашиш Неги
4

Вот очень простая реализация BlockingCollection, поддерживающая ожидание, с множеством недостающих функций. Он использует AsyncEnumerableбиблиотеку, которая делает возможным асинхронное перечисление для версий C # старше 8.0.

public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
    private Queue<T> _queue = new Queue<T>();
    private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private int _consumersCount = 0;
    private bool _isAddingCompleted;

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_isAddingCompleted) throw new InvalidOperationException();
            _queue.Enqueue(item);
        }
        _semaphore.Release();
    }

    public void CompleteAdding()
    {
        lock (_queue)
        {
            if (_isAddingCompleted) return;
            _isAddingCompleted = true;
            if (_consumersCount > 0) _semaphore.Release(_consumersCount);
        }
    }

    public IAsyncEnumerable<T> GetConsumingEnumerable()
    {
        lock (_queue) _consumersCount++;
        return new AsyncEnumerable<T>(async yield =>
        {
            while (true)
            {
                lock (_queue)
                {
                    if (_queue.Count == 0 && _isAddingCompleted) break;
                }
                await _semaphore.WaitAsync();
                bool hasItem;
                T item = default;
                lock (_queue)
                {
                    hasItem = _queue.Count > 0;
                    if (hasItem) item = _queue.Dequeue();
                }
                if (hasItem) await yield.ReturnAsync(item);
            }
        });
    }
}

Пример использования:

var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        abc.Add(i);
    }
    abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
    await abc.GetConsumingEnumerable().ForEachAsync(async item =>
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    });
});
await Task.WhenAll(producer, consumer);

Вывод:

1 2 3 4 5 6 7 8 9 10


Обновление: с выпуском C # 8 асинхронное перечисление стало встроенной функцией языка. Необходимые классы ( IAsyncEnumerable, IAsyncEnumerator) встроены в .NET Core 3.0 и предлагаются в виде пакета для .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ).

Вот альтернативная GetConsumingEnumerableреализация с новым синтаксисом C # 8:

public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
    lock (_queue) _consumersCount++;
    while (true)
    {
        lock (_queue)
        {
            if (_queue.Count == 0 && _isAddingCompleted) break;
        }
        await _semaphore.WaitAsync();
        bool hasItem;
        T item = default;
        lock (_queue)
        {
            hasItem = _queue.Count > 0;
            if (hasItem) item = _queue.Dequeue();
        }
        if (hasItem) yield return item;
    }
}

Обратите внимание на сосуществование awaitи yieldв одном методе.

Пример использования (C # 8):

var consumer = Task.Run(async () =>
{
    await foreach (var item in abc.GetConsumingEnumerable())
    {
        await Task.Delay(200);
        await Console.Out.WriteAsync(item + " ");
    }
});

Обратите внимание на awaitперед foreach.

Теодор Зулиас
источник
1
Вспомнив, я теперь думаю, что имя класса AsyncBlockingCollectionбессмысленно. Что-то не может быть асинхронным и блокирующим одновременно, поскольку эти две концепции являются точными противоположностями!
Theodor Zoulias
-2

Если вы не против небольшого взлома, вы можете попробовать эти расширения.

public static async Task AddAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            if (Bc.TryAdd(item, 0, abortCt))
                return;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}

public static async Task<TEntity> TakeAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            TEntity item;

            if (Bc.TryTake(out item, 0, abortCt))
                return item;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}
Dejisys
источник
1
Значит, вы вводите искусственную задержку, чтобы сделать его асинхронным? Его все еще блокирует, верно?
nawfal