У меня есть сценарий, в котором у меня есть несколько потоков, добавляющих в очередь, и несколько потоков, читающих из одной и той же очереди. Если очередь достигает определенного размера, все потоки , заполняющие очередь, будут заблокированы при добавлении, пока элемент не будет удален из очереди.
Решение ниже - то, что я использую прямо сейчас, и мой вопрос: как это можно улучшить? Есть ли объект, который уже включает это поведение в BCL, который я должен использовать?
internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
//todo: might be worth changing this into a proper QUEUE
private AutoResetEvent _FullEvent = new AutoResetEvent(false);
internal T this[int i]
{
get { return (T) List[i]; }
}
private int _MaxSize;
internal int MaxSize
{
get { return _MaxSize; }
set
{
_MaxSize = value;
checkSize();
}
}
internal BlockingCollection(int maxSize)
{
MaxSize = maxSize;
}
internal void Add(T item)
{
Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.WaitOne();
List.Add(item);
Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));
checkSize();
}
internal void Remove(T item)
{
lock (List)
{
List.Remove(item);
}
Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
}
protected override void OnRemoveComplete(int index, object value)
{
checkSize();
base.OnRemoveComplete(index, value);
}
internal new IEnumerator GetEnumerator()
{
return List.GetEnumerator();
}
private void checkSize()
{
if (Count < MaxSize)
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Set();
}
else
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Reset();
}
}
}
c#
.net
multithreading
collections
queue
Эрик Шуновер
источник
источник
Ответы:
Это выглядит очень небезопасно (очень мало синхронизации); как насчет чего-то вроде:
(редактировать)
На самом деле вам нужен способ закрыть очередь, чтобы читатели начинали выходить чисто - возможно, что-то вроде флага bool - если установлен, пустая очередь просто возвращается (а не блокируется):
источник
Wait
, чтобы другие потоки могли ее получить. Он восстанавливает блокировки, когда он просыпается.Используйте .net 4 BlockingCollection, чтобы поставить в очередь, использовать Add (), чтобы убрать из очереди, используйте Take (). Он внутренне использует неблокирующий ConcurrentQueue. Более подробная информация здесь: Быстрая и лучшая техника очереди производителей / потребителей BlockingCollection vs одновременная очередь
источник
"Как это можно улучшить?"
Что ж, вам нужно посмотреть на каждый метод в вашем классе и подумать, что произойдет, если другой поток одновременно вызывает этот метод или любой другой метод. Например, вы устанавливаете блокировку в методе Remove, но не в методе Add. Что происходит, если один поток добавляет одновременно с удалением другого потока? Плохие вещи.
Также учтите, что метод может возвращать второй объект, который обеспечивает доступ к внутренним данным первого объекта, например, GetEnumerator. Представьте, что один поток проходит через этот перечислитель, другой поток одновременно изменяет список. Не хорошо.
Хорошее эмпирическое правило заключается в том, чтобы упростить задачу, сократив количество методов в классе до абсолютного минимума.
В частности, не наследуйте другой контейнерный класс, потому что вы будете выставлять все методы этого класса, предоставляя возможность вызывающей стороне повредить внутренние данные или увидеть частично завершенные изменения данных (столь же плохие, потому что данные кажется поврежденным в этот момент). Скройте все детали и будьте абсолютно безжалостны о том, как вы разрешаете доступ к ним.
Я настоятельно рекомендую вам использовать готовые решения - получить книгу о многопоточности или использовать стороннюю библиотеку. В противном случае, учитывая то, что вы пытаетесь, вы будете отлаживать свой код в течение длительного времени.
Кроме того, не имеет ли смысла для Remove возвращать элемент (скажем, тот, который был добавлен первым, поскольку это очередь), а не вызывающий объект, выбирающий конкретный элемент? И когда очередь пуста, возможно, Удалить также следует заблокировать.
Обновление: ответ Марка фактически реализует все эти предложения! :) Но я оставлю это здесь, так как может быть полезно понять, почему его версия является таким улучшением.
источник
Вы можете использовать BlockingCollection и ConcurrentQueue в пространстве имен System.Collections.Concurrent.
источник
Я просто поднял это с помощью Reactive Extensions и вспомнил этот вопрос:
Не обязательно полностью безопасно, но очень просто.
источник
Это то, что я пришел, чтобы создать потокобезопасную ограниченную очередь блокировки.
источник
Я не полностью изучил TPL, но у них могло бы быть что-то, что соответствует вашим потребностям, или, по крайней мере, какой-то корм для рефлекторов, от которого можно получить вдохновение.
Надеюсь, это поможет.
источник
Ну, вы можете посмотреть на
System.Threading.Semaphore
класс. Кроме этого - нет, вы должны сделать это самостоятельно. У AFAIK нет такой встроенной коллекции.источник
Если вам нужна максимальная пропускная способность, позволяющая читать нескольким читателям и писать только одному писателю, в BCL есть нечто, называемое ReaderWriterLockSlim, которое должно помочь уменьшить ваш код ...
источник