«Универсальная конструкция» - это класс-оболочка для последовательного объекта, который позволяет его линеаризовать (условие строгой согласованности для параллельных объектов). Например, вот адаптированная конструкция без ожидания в Java из [1], которая предполагает существование очереди без ожидания, которая удовлетворяет интерфейсу WFQ
(который требует единовременного согласования между потоками) и предполагает Sequential
интерфейс:
public interface WFQ<T> // "FIFO" iteration
{
int enqueue(T t); // returns the sequence number of t
Iterable<T> iterateUntil(int max); // iterates until sequence max
}
public interface Sequential
{
// Apply an invocation (method + arguments)
// and get a response (return value + state)
Response apply(Invocation i);
}
public interface Factory<T> { T generate(); } // generate new default object
public interface Universal extends Sequential {}
public class SlowUniversal implements Universal
{
Factory<? extends Sequential> generator;
WFQ<Invocation> wfq = new WFQ<Invocation>();
Universal(Factory<? extends Sequential> g) { generator = g; }
public Response apply(Invocation i)
{
int max = wfq.enqueue(i);
Sequential s = generator.generate();
for(Invocation invoc : wfq.iterateUntil(max))
s.apply(invoc);
return s.apply(i);
}
}
Эта реализация не очень удовлетворительна, так как она действительно медленная (вы помните каждый вызов и должны воспроизводить его при каждом применении - у нас есть линейное время выполнения в размере истории). Есть ли способ , что мы могли бы расширить WFQ
и Sequential
интерфейсы (в разумных путях) , чтобы мы могли сохранить некоторые шаги при применении нового вызова?
Можем ли мы сделать это более эффективным (не линейное время выполнения в размере истории, предпочтительно использование памяти также уменьшается) без потери свойства без ожидания?
осветление
«Универсальная конструкция» - это термин, который, я уверен, был составлен [1], который принимает небезопасный, но совместимый с потоками объект, который обобщается Sequential
интерфейсом. Используя очередь без ожидания, первая конструкция предлагает поточно-ориентированную, линеаризуемую версию объекта, которая также не требует ожидания (это предполагает детерминизм и apply
операции остановки ).
Это неэффективно, поскольку этот метод эффективно запускает каждый локальный поток с чистого листа и применяет все операции, когда-либо записанные в него. В любом случае, это работает, потому что это обеспечивает эффективную синхронизацию с помощью WFQ
определения порядка, в котором должны применяться все операции: каждый вызывающий поток apply
увидит один и тот же локальный Sequential
объект с одинаковой последовательностью Invocation
s, примененной к нему.
Мой вопрос заключается в том, можем ли мы (например) внедрить процесс фоновой очистки, который обновляет «начальное состояние», чтобы нам не пришлось перезапускать с нуля. Это не так просто, как иметь атомарный указатель с начальным указателем - такие подходы легко теряют гарантию без ожидания. Я подозреваю, что здесь может работать другой подход, основанный на очереди.
Жаргон:
- без ожидания - независимо от количества потоков или принятия решения планировщиком,
apply
будет заканчиваться ограниченным количеством команд, выполняемых для этого потока. - без блокировки - то же, что и выше, но допускает возможность неограниченного времени выполнения, только в том случае, если
apply
в других потоках выполняется неограниченное количество операций. Как правило, оптимистические схемы синхронизации попадают в эту категорию. - блокировка - эффективность во власти планировщика.
Рабочий пример по запросу (теперь на странице, срок действия которой не истекает)
[1] Херлихи и Шавит, Искусство многопроцессорного программирования .
CopyableSequential
были действительными - линеаризация должна следовать из факта, что это такSequential
.Ответы:
Вот объяснение и пример того, как это достигается. Дайте мне знать, если есть части, которые не ясны.
Гист с источником
универсальный
Инициализация:
Индексы потоков применяются атомарно увеличенным способом. Это управляется с использованием
AtomicInteger
именованныхnextIndex
. Эти индексы присваиваются потокам черезThreadLocal
экземпляр, который инициализирует себя, получая следующий индексnextIndex
и увеличивая его. Это происходит при первом получении индекса каждого потока. AThreadLocal
создан для отслеживания последней последовательности, созданной этим потоком. Он инициализирован 0. Последовательная фабричная ссылка на объект передается и сохраняется. ДваAtomicReferenceArray
экземпляра созданы размеромn
. Конечный объект присваивается каждой ссылке, будучи инициализированной с исходным состоянием, предоставленнымSequential
фабрикой.n
максимально допустимое количество потоков. Каждый элемент в этих массивах «принадлежит» соответствующему индексу потока.Применить метод:
Это метод, который делает интересную работу. Это делает следующее:
Затем начинается цикл секвенирования. Это будет продолжаться, пока текущий вызов не будет упорядочен:
decideNext()
Ключом к описанному выше вложенному циклу является
decideNext()
метод. Чтобы понять это, нам нужно взглянуть на класс Node.Класс узла
Этот класс определяет узлы в двусвязном списке. Там не много действий в этом классе. Большинство методов - это простые методы поиска, которые должны быть достаточно понятны.
метод хвоста
это возвращает специальный экземпляр узла с последовательностью 0. Он просто действует как заполнитель, пока вызов не заменит его.
Свойства и инициализация
seq
: порядковый номер, инициализированный -1 (что означает неупорядоченный)invocation
: значение вызоваapply()
. Установить на строительство.next
:AtomicReference
для прямой ссылки. после назначения это никогда не изменитсяprevious
:AtomicReference
для обратной ссылки, назначенной при секвенировании и очищеннойtruncate()
Решить дальше
Этот метод является единственным в Node с нетривиальной логикой. В двух словах, узел предлагается в качестве кандидата на следующий узел в связанном списке.
compareAndSet()
Метод будет проверять , если это ссылка является недействительным , и если да, то установите ссылку на кандидата. Если ссылка уже установлена, она ничего не делает. Эта операция является атомарной, поэтому, если два кандидата предлагаются одновременно, будет выбран только один. Это гарантирует, что в качестве следующего будет выбран только один узел. Если выбран узел-кандидат, его последовательность устанавливается на следующее значение, а его предыдущая ссылка устанавливается на этот узел.Возвращаясь к методу применения класса Universal ...
Вызвав
decideNext()
последний секвенированный узел (если проверено) с нашим узлом или узлом изannounce
массива, возможны два случая: 1. Узел был успешно секвенирован. 2. Некоторые другие потоки опередили этот поток.Следующим шагом является проверка, создан ли узел для этого вызова. Это могло произойти, потому что этот поток успешно упорядочил его или какой-то другой поток извлек его из
announce
массива и упорядочил его для нас. Если он не был упорядочен, процесс повторяется. В противном случае вызов завершается очисткой массива анонса для индекса этого потока и возвращением результирующего значения вызова. Массив аннулирования очищается, чтобы гарантировать, что нет никаких ссылок на оставленный узел, который предотвратил бы сборку мусора для узла и, следовательно, сохранил все узлы в связанном списке с этой точки живым в куче.Оценить метод
Теперь, когда узел вызова был успешно упорядочен, вызов должен быть оценен. Чтобы сделать это, первый шаг должен гарантировать, что вызовы, предшествующие этому, были оценены. Если у них нет этого потока, он не будет ждать, но сделает эту работу немедленно.
Метод EnsurePrior
ensurePrior()
Метод делает эту работу, проверяя предыдущий узел в связанном списке. Если это состояние не установлено, предыдущий узел будет оценен. Узел, что это рекурсивно. Если узел, предшествующий предыдущему узлу, не был оценен, он вызовет метод оценки для этого узла и так далее, и так далее.Теперь, когда известно, что предыдущий узел имеет состояние, мы можем оценить этот узел. Последний узел извлекается и присваивается локальной переменной. Если эта ссылка нулевая, это означает, что какой-то другой поток опередил этот и уже оценил этот узел; установив это состояние. В противном случае состояние предыдущего узла передается
Sequential
методу apply объекта вместе с вызовом этого узла. Возвращаемое состояние устанавливается на узле, иtruncate()
вызывается метод, очищающий обратную ссылку от узла, поскольку он больше не нужен.Метод MoveForward
Метод move forward попытается переместить все ссылки на головы на этот узел, если они еще не указывают на что-то еще. Это сделано для того, чтобы, если поток прекратил вызывать, его голова не будет сохранять ссылку на узел, который больше не нужен.
compareAndSet()
Метод будет убедиться , что мы обновляем только узел , если какой - то другой поток не изменил его , так как он был получен.Объявить массив и помощь
Ключом к тому, чтобы сделать этот подход без ожидания, а не просто без блокировки, является то, что мы не можем предполагать, что планировщик потока будет давать приоритет каждому потоку, когда он ему нужен. Если каждый поток просто пытается упорядочить свои собственные узлы, возможно, поток будет постоянно загружаться под нагрузкой. Чтобы учесть эту возможность, каждый поток сначала попытается «помочь» другим потокам, которые могут быть не в состоянии упорядочиться.
Основная идея состоит в том, что, поскольку каждый поток успешно создает узлы, назначенные последовательности монотонно увеличиваются. Если поток или потоки непрерывно вытесняют другой поток, индекс, используемый для поиска неупорядоченных узлов в
announce
массиве, будет двигаться вперед. Даже если каждый поток, который в данный момент пытается упорядочить заданный узел, постоянно вытесняется другим потоком, в конечном итоге все потоки будут пытаться упорядочить этот узел. Чтобы проиллюстрировать это, мы построим пример с тремя потоками.В начальной точке все три элемента заголовка и объявления потоков направлены на
tail
узел. ДляlastSequence
каждого потока - 0.На этом этапе поток 1 выполняется с вызовом. Он проверяет массив анонсов на предмет его последней последовательности (ноль), которая является узлом, который он в настоящее время планирует индексировать. Это последовательность узла, и он
lastSequence
установлен в 1.Поток 2 теперь выполняется с вызовом, он проверяет массив анонсов в его последней последовательности (ноль) и видит, что ему не нужна помощь, и поэтому пытается упорядочить свой вызов. Это успешно, и теперь он
lastSequence
установлен на 2.Поток 3 теперь выполняется, и он также видит, что узел в
announce[0]
уже секвенирован и выполняет собственный вызов. ЭтоlastSequence
теперь установлено значение 3.Теперь поток 1 вызывается снова. Он проверяет массив анонсов по индексу 1 и находит, что он уже упорядочен. Параллельно вызывается поток 2 . Он проверяет массив анонсов в индексе 2 и находит, что он уже упорядочен. И Поток 1, и Поток 2 теперь пытаются упорядочить свои собственные узлы. Поток 2 побеждает, и это вызывает его вызов. Это
lastSequence
установлено в 4. Между тем, поток три был вызван. Он проверяет индекс itlastSequence
(mod 3) и находит, что узелannounce[0]
не был упорядочен. Поток 2 снова вызывается в то время, когда Поток 1 находится на второй попытке. Тема 1находит неупорядоченный вызов, вannounce[1]
котором находится узел, только что созданный потоком 2 . Он пытается упорядочить вызов потока 2 и завершается успешно. Поток 2 находит свой собственный узел в,announce[1]
и он был упорядочен. Он устанавливаетlastSequence
значение 5. Поток 3 затем вызывается и обнаруживает, что узел, в котором размещен поток 1announce[0]
, все еще не секвенирован, и пытается это сделать. Между тем поток 2 также был вызван и опережает поток 3. Он упорядочивает свой узел и устанавливает его равнымlastSequence
6.Плохая тема 1 . Несмотря на то, что Поток 3 пытается упорядочить его, планировщик постоянно мешал обоим потокам. Но на данный момент. Тема 2 также теперь указывает на
announce[0]
(6 мод 3). Все три потока настроены на последовательность одного и того же вызова. Независимо от того, какой поток завершится успешно, следующий подлежащий последовательности узел будет ожидающим вызовом потока 1, то есть узла, на который ссылаетсяannounce[0]
.Это неизбежно. Для того чтобы потоки имели приоритет, другие потоки должны быть последовательными узлами, и при этом они будут постоянно двигаться
lastSequence
вперед. Если узел данного потока постоянно не упорядочен, в конечном итоге все потоки будут указывать на его индекс в массиве анонсов. Никакой поток не будет делать что-либо еще, пока узел, которому он пытается помочь, не будет упорядочен, в худшем случае все потоки будут указывать на один и тот же непоследовательный узел. Следовательно, время, необходимое для последовательности любого вызова, зависит от количества потоков, а не от размера ввода.источник
previous
иnext
указатель. Поддержание и создание действительной истории без ожидания кажется трудным.Мой предыдущий ответ на самом деле не отвечает на вопрос должным образом, но, поскольку ФП считает его полезным, я оставлю его как есть. Основываясь на коде в ссылке в вопросе, вот моя попытка. Я провел только базовое тестирование по этому вопросу, но, похоже, он правильно рассчитывает средние значения. Приветствуется обратная связь относительно того, действительно ли это без ожидания.
ПРИМЕЧАНИЕ : я удалил универсальный интерфейс и сделал его классом. Наличие Universal, состоящего из последовательностей, а также единое целое кажется ненужным осложнением, но я мог бы что-то упустить. В среднем классе я пометил переменную состояния как
volatile
. Это не обязательно, чтобы заставить код работать. Быть консервативным (хорошая идея с потоками) и не допускать, чтобы каждый поток делал все вычисления (один раз).Последовательный и завод
универсальный
Средний
Демо-код
Я внес некоторые изменения в код, так как размещал его здесь. Это должно быть хорошо, но дайте мне знать, если у вас есть проблемы с этим.
источник
wfq
, так что вам все равно придется просматривать всю историю - время выполнения не улучшилось, за исключением постоянного фактора.