Должен ли я получить блокировку перед вызовом condition_variable.notify_one ()?

90

Я немного запутался в использовании std::condition_variable. Я понимаю , что я должен создать unique_lockна mutexперед вызовом condition_variable.wait(). Я не могу найти, нужно ли мне также получить уникальную блокировку перед вызовом notify_one()или notify_all().

Примеры на cppreference.com противоречивы. Например, страница notify_one дает следующий пример:

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>

std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;

void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cout << "Waiting... \n";
    cv.wait(lk, []{return i == 1;});
    std::cout << "...finished waiting. i == 1\n";
    done = true;
}

void signals()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Notifying...\n";
    cv.notify_one();

    std::unique_lock<std::mutex> lk(cv_m);
    i = 1;
    while (!done) {
        lk.unlock();
        std::this_thread::sleep_for(std::chrono::seconds(1));
        lk.lock();
        std::cerr << "Notifying again...\n";
        cv.notify_one();
    }
}

int main()
{
    std::thread t1(waits), t2(signals);
    t1.join(); t2.join();
}

Здесь блокировка приобретается не для первого notify_one(), а для второго notify_one(). Просматривая другие страницы с примерами, я вижу разные вещи, в основном не захватывающие.

  • Могу ли я выбрать блокировку мьютекса перед вызовом notify_one()и почему я должен его заблокировать?
  • В приведенном примере, почему нет блокировки для первого notify_one()вызова, но есть для последующих вызовов. Этот пример неверен или есть какое-то обоснование?
Питер Смит
источник

Ответы:

77

Вам не нужно удерживать блокировку при вызове condition_variable::notify_one(), но это не неправильно в том смысле, что это все еще четко определенное поведение, а не ошибка.

Однако это может быть «пессимизацией», поскольку любой ожидающий поток, который становится работоспособным (если таковой имеется), немедленно попытается получить блокировку, удерживаемую уведомляющим потоком. Я думаю, что это хорошее практическое правило - избегать удерживания блокировки, связанной с переменной условия, при вызове notify_one()или notify_all(). См. Pthread Mutex: pthread_mutex_unlock () отнимает много времени для примера, в котором снятие блокировки перед вызовом эквивалента pthread заметно notify_one()улучшает производительность.

Имейте в виду, что lock()вызов в whileцикле необходим в какой-то момент, потому что блокировку необходимо удерживать во время while (!done)проверки условий цикла. Но его не нужно удерживать для звонка notify_one().


2016-02-27 : Большое обновление для ответа на некоторые вопросы в комментариях о том, существует ли состояние гонки, если блокировка не помогает для notify_one()вызова. Я знаю, что это обновление задерживается, потому что вопрос был задан почти два года назад, но я хотел бы ответить на вопрос @Cookie о возможном состоянии гонки, если производитель ( signals()в этом примере) вызывает notify_one()непосредственно перед тем, как потребитель ( waits()в этом примере) возможность позвонить wait().

Ключ в том, что происходит i- это объект, который на самом деле указывает, есть ли у потребителя «работа». Это condition_variableвсего лишь механизм, позволяющий потребителю эффективно дождаться изменения i.

Производитель должен удерживать блокировку при обновлении i, а потребитель должен удерживать блокировку при проверке iи вызове condition_variable::wait()(если ему вообще нужно ждать). В этом случае ключ заключается в том, что это должен быть тот же самый экземпляр удержания блокировки (часто называемый критической секцией), когда потребитель выполняет эту проверку и ожидание. Поскольку критическая секция удерживается, когда производитель обновляет iи когда потребитель проверяет и ожидает i, нет возможности для iпереключения между тем, когда потребитель проверяет iи когда он звонит condition_variable::wait(). Это ключевой момент для правильного использования условных переменных.

Стандарт C ++ говорит, что condition_variable :: wait () при вызове с предикатом (как в этом случае) ведет себя следующим образом:

while (!pred())
    wait(lock);

При проверке потребителя могут возникнуть две ситуации i:

  • если i0, то вызывается потребитель cv.wait(), тогда iвсе равно будет 0 при wait(lock)вызове части реализации - это гарантирует правильное использование блокировок. В этом случае у производителя нет возможности вызвать condition_variable::notify_one()в своем whileцикле до тех пор, пока потребитель не позвонит cv.wait(lk, []{return i == 1;})wait()вызов сделал все, что ему нужно сделать, чтобы правильно `` поймать '' уведомление - wait()не снимет блокировку, пока не сделает это. ). Так что в этом случае потребитель не может пропустить уведомление.

  • если iна момент вызова потребителя значение уже равно 1 cv.wait(), wait(lock)часть реализации никогда не будет вызвана, потому что while (!pred())тест вызовет завершение внутреннего цикла. В этой ситуации не имеет значения, когда происходит вызов notify_one () - потребитель не заблокируется.

Пример здесь имеет дополнительную сложность использования doneпеременного для сигнала обратно к потоку производителя , что потребитель признал , что i == 1, но я не думаю , что это меняет анализ на всех , потому что весь доступ к done(для чтения и модифицирования ) выполняются в тех же критических секциях, которые включают iи condition_variable.

Если посмотреть на вопрос , что @ EH9 указал, синхронизация ненадежна с помощью станд :: атомное и станд :: condition_variable , вы будете видеть состояние гонки. Однако код, опубликованный в этом вопросе, нарушает одно из основных правил использования условной переменной: он не содержит ни одного критического раздела при выполнении проверки и ожидания.

В этом примере код выглядит так:

if (--f->counter == 0)      // (1)
    // we have zeroed this fence's counter, wake up everyone that waits
    f->resume.notify_all(); // (2)
else
{
    unique_lock<mutex> lock(f->resume_mutex);
    f->resume.wait(lock);   // (3)
}

Вы заметите, что wait()at # 3 выполняется при удерживании f->resume_mutex. Но проверка того, wait()является ли это необходимым на шаге № 1, вообще не выполняется при удержании этой блокировки (тем более непрерывно для проверки и ожидания), что является требованием для правильного использования переменных состояния). Я считаю, что человек, у которого возникла проблема с этим фрагментом кода, думал, что, поскольку f->counterэто std::atomicтип, он будет соответствовать требованиям. Однако атомарность, обеспечиваемая std::atomicне распространяется на последующий вызов f->resume.wait(lock). В этом примере существует гонка между тем, когда f->counterпроверяется (шаг №1) и когда wait()вызывается (этап №3).

В примере с этим вопросом такой расы не существует.

Майкл Берр
источник
2
это имеет более глубокие последствия: domaigne.com/blog/computing/… Примечательно, что упомянутая вами проблема pthread должна быть решена либо более новой версией, либо версией, построенной с правильными флагами. (для включения wait morphingоптимизации) Эмпирическое правило, описанное в этой ссылке: notify WITH lock лучше использовать в ситуациях с более чем 2 потоками для более предсказуемых результатов.
v.oddou
6
@Michael: Насколько я понимаю, потребитель должен в конце концов позвонить the_condition_variable.wait(lock);. Если для синхронизации производителя и потребителя блокировка не требуется (скажем, базовая - это очередь spsc без блокировок), то эта блокировка не имеет смысла, если производитель ее не блокирует. Меня устраивает. Но разве нет риска для редкой расы? Если производитель не удерживает блокировку, не мог ли он вызвать notify_one, пока потребитель находится прямо перед ожиданием? Тогда потребитель ждет и не просыпается ...
Cookie
1
например, скажем, в приведенном выше коде находится потребитель, в std::cout << "Waiting... \n";то время как производитель делает cv.notify_one();, тогда сигнал пробуждения пропадает ... Или я что-то здесь упускаю?
Cookie
1
@Cookie. Да, там есть состояние гонки. См stackoverflow.com/questions/20982270/...
EH9
1
@ eh9: Черт, я только что нашел причину ошибки, из-за которой мой код время от времени зависал благодаря вашему комментарию. Это было из-за этого случая состояния гонки. Разблокировка мьютекса после уведомления полностью решила проблему ... Большое спасибо!
Galinette
10

Ситуация

Используя vc10 и Boost 1.56, я реализовал параллельную очередь примерно так, как предлагает это сообщение в блоге . Автор разблокирует мьютекс, чтобы минимизировать конфликт, т. notify_one()Е. Вызывается с разблокированным мьютексом:

void push(const T& item)
{
  std::unique_lock<std::mutex> mlock(mutex_);
  queue_.push(item);
  mlock.unlock();     // unlock before notificiation to minimize mutex contention
  cond_.notify_one(); // notify one waiting thread
}

Разблокировка мьютекса подтверждается примером в документации Boost :

void prepare_data_for_processing()
{
    retrieve_data();
    prepare_data();
    {
        boost::lock_guard<boost::mutex> lock(mut);
        data_ready=true;
    }
    cond.notify_one();
}

Проблема

Тем не менее это привело к следующему неустойчивому поведению:

  • пока notify_one()еще не был вызван, cond_.wait()может быть прерван черезboost::thread::interrupt()
  • когда-то впервые notify_one()был назван cond_.wait()тупик; ожидание не может быть закончено ни чем, boost::thread::interrupt()ни boost::condition_variable::notify_*()больше.

Решение

Удаление строки mlock.unlock()заставило код работать должным образом (уведомления и прерывания завершают ожидание). Обратите внимание, что notify_one()вызывается с заблокированным мьютексом, он разблокируется сразу после выхода из области видимости:

void push(const T& item)
{
  std::lock_guard<std::mutex> mlock(mutex_);
  queue_.push(item);
  cond_.notify_one(); // notify one waiting thread
}

Это означает, что, по крайней мере, с моей конкретной реализацией потока мьютекс не должен быть разблокирован перед вызовом boost::condition_variable::notify_one(), хотя оба способа кажутся правильными.

Маттеус Брандл
источник
Сообщали ли вы об этой проблеме Boost.Thread? Я не могу найти там похожую задачу svn.boost.org/trac/boost/…
magras
@magras К сожалению, я не знал, почему я не подумал об этом. И, к сожалению, мне не удается воспроизвести эту ошибку с помощью указанной очереди.
Маттеус Брандл,
Я не уверен, что понимаю, как раннее пробуждение может вызвать тупик. В частности, если вы вышли из cond_.wait () в pop () после того, как push () освободит мьютекс очереди, но до вызова notify_one () - Pop () должен увидеть, что очередь не пуста, и потреблять новую запись, а не ждать () ing. если вы вышли из cond_.wait (), пока push () обновляет очередь, блокировку следует удерживать с помощью push (), таким образом, pop () должен блокироваться в ожидании снятия блокировки. Любые другие ранние пробуждения будут удерживать блокировку, удерживая push () от изменения очереди до того, как pop () вызовет следующий wait (). Что я пропустил?
Кевин
5

Как указывали другие, вам не нужно удерживать блокировку при вызове notify_one()с точки зрения условий гонки и проблем, связанных с потоками. Однако в некоторых случаях может потребоваться удержание блокировки для предотвращения condition_variableразрушения перед notify_one()вызовом. Рассмотрим следующий пример:

thread t;

void foo() {
    std::mutex m;
    std::condition_variable cv;
    bool done = false;

    t = std::thread([&]() {
        {
            std::lock_guard<std::mutex> l(m);  // (1)
            done = true;  // (2)
        }  // (3)
        cv.notify_one();  // (4)
    });  // (5)

    std::unique_lock<std::mutex> lock(m);  // (6)
    cv.wait(lock, [&done]() { return done; });  // (7)
}

void main() {
    foo();  // (8)
    t.join();  // (9)
}

Предположим, что есть переключение контекста на вновь созданный поток tпосле того, как мы его создали, но до того, как мы начнем ждать переменной условия (где-то между (5) и (6)). Поток tполучает блокировку (1), устанавливает переменную предиката (2) и затем снимает блокировку (3). Предположим, что есть еще один переключатель контекста прямо в этот момент перед выполнением notify_one()(4). Основной поток получает блокировку (6) и выполняет строку (7), после чего предикат возвращается, trueи нет причин ждать, поэтому он снимает блокировку и продолжает работу. fooвозвращает (8), а переменные в его области видимости (включая cv) уничтожаются. Прежде чем поток tсможет присоединиться к основному потоку (9), он должен завершить свое выполнение, поэтому он продолжит выполнение с того места, где остановился.cv.notify_one()(4), после чего cvуже уничтожен!

Возможное исправление в этом случае - удерживать блокировку при вызове notify_one(т.е. удалить область действия, заканчивающуюся строкой (3)). Поступая таким образом, мы гарантируем, что tвызовы потока notify_oneдо этого cv.waitмогут проверить вновь установленную переменную предиката и продолжить, так как t для выполнения проверки потребуется блокировка, которая удерживается в данный момент. Итак, мы гарантируем, что cvпоток не получит доступа к нему tпосле fooвозврата.

Подводя итог, можно сказать, что проблема в этом конкретном случае на самом деле не в потоковой передаче, а во времени жизни переменных, захваченных по ссылке. cvзахватывается по ссылке через поток t, поэтому вы должны убедиться, что он cvостается активным на время выполнения потока. Другие примеры , представленные здесь , не страдают от этой проблемы, так condition_variableи mutexобъекты определяются в глобальном масштабе, поэтому они гарантированно не будет храниться в живых до выхода из программы.

кантунка
источник
1

@Michael Burr прав. condition_variable::notify_oneне требует блокировки переменной. Однако ничто не мешает вам использовать блокировку в этой ситуации, как показывает пример.

В данном примере блокировка вызвана одновременным использованием переменной i. Поскольку signalsпоток изменяет переменную, он должен гарантировать, что в это время к ней не будет обращаться никакой другой поток.

Блокировки используются в любой ситуации, требующей синхронизации , я не думаю, что мы можем сформулировать это в более общем виде.

Didierc
источник
конечно, но помимо этого их также необходимо использовать в сочетании с условными переменными, чтобы на самом деле работал весь шаблон. в частности, waitфункция условной переменной освобождает блокировку внутри вызова и возвращается только после повторного получения блокировки. после этого вы можете безопасно проверить свое состояние, потому что, скажем, вы приобрели «права чтения». если это все еще не то, чего вы ждете, вы вернетесь к wait. это шаблон. кстати, этот пример НЕ уважает его.
v.oddou
1

В некоторых случаях, когда cv может быть занято (заблокировано) другими потоками. Вам необходимо получить блокировку и освободить ее перед notify _ * ().
В противном случае notify _ * () может вообще не выполняться.

Фань Цзин
источник
1

Просто добавляю этот ответ, потому что я думаю, что принятый ответ может вводить в заблуждение. Во всех случаях вам нужно будет заблокировать мьютекс перед вызовом notify_one () где-нибудь, чтобы ваш код был потокобезопасным, хотя вы можете разблокировать его снова, прежде чем фактически вызывать notify _ * ().

Чтобы уточнить, вы ДОЛЖНЫ взять блокировку перед вводом wait (lk), потому что wait () разблокирует lk, и это будет Undefined Behavior, если блокировка не заблокирована. Это не относится к notify_one (), но вам нужно убедиться, что вы не вызываете notify _ * () перед вводом wait () и разблокировкой мьютекса этим вызовом; что, очевидно, можно сделать только заблокировав тот же мьютекс перед вызовом notify _ * ().

Например, рассмотрим следующий случай:

std::atomic_int count;
std::mutex cancel_mutex;
std::condition_variable cancel_cv;

void stop()
{
  if (count.fetch_sub(1) == -999) // Reached -1000 ?
    cv.notify_one();
}

bool start()
{
  if (count.fetch_add(1) >= 0)
    return true;
  // Failure.
  stop();
  return false;
}

void cancel()
{
  if (count.fetch_sub(1000) == 0)  // Reached -1000?
    return;
  // Wait till count reached -1000.
  std::unique_lock<std::mutex> lk(cancel_mutex);
  cancel_cv.wait(lk);
}

Предупреждение : этот код содержит ошибку.

Идея заключается в следующем: потоки вызывают start () и stop () попарно, но только до тех пор, пока start () возвращает true. Например:

if (start())
{
  // Do stuff
  stop();
}

Один (другой) поток в какой-то момент вызовет cancel () и после возврата из cancel () уничтожит объекты, которые необходимы в «Делать что-нибудь». Однако предполагается, что cancel () не будет возвращаться, пока есть потоки между start () и stop (), и после того, как cancel () выполнил свою первую строку, start () всегда будет возвращать false, поэтому новые потоки не будут вводить 'Do зона вещей.

Работает правильно?

Рассуждения таковы:

1) Если какой-либо поток успешно выполняет первую строку start () (и, следовательно, вернет истину), тогда ни один поток еще не выполнил первую строку cancel () (мы предполагаем, что общее количество потоков намного меньше 1000 на путь).

2) Кроме того, хотя поток успешно выполнил первую строку start (), но еще не первую строку stop (), невозможно, чтобы какой-либо поток успешно выполнил первую строку cancel () (обратите внимание, что только один поток когда-либо вызывает cancel ()): значение, возвращаемое fetch_sub (1000), будет больше 0.

3) После того, как поток выполнил первую строку cancel (), первая строка start () всегда будет возвращать false, а поток, вызывающий start (), больше не будет входить в область «Что делать».

4) Количество вызовов start () и stop () всегда сбалансировано, поэтому после неудачного выполнения первой строки cancel () всегда будет момент, когда (последний) вызов stop () вызывает счет чтобы достичь -1000 и, следовательно, вызвать notify_one (). Обратите внимание, что это может произойти только тогда, когда первая строка отмены привела к провалу этого потока.

Помимо проблемы голодания, когда так много потоков вызывают start () / stop (), что count никогда не достигает -1000 и cancel () никогда не возвращает, что можно принять как «маловероятно и никогда не длится долго», есть еще одна ошибка:

Вполне возможно, что внутри области «Что делать» есть один поток, допустим, он просто вызывает stop (); в этот момент поток выполняет первую строку cancel (), считывая значение 1 с помощью fetch_sub (1000) и пропуская. Но прежде чем он возьмет мьютекс и / или вызовет ожидание (lk), первый поток выполняет первую строку stop (), считывает -999 и вызывает cv.notify_one ()!

Затем этот вызов notify_one () выполняется ДО того, как мы ждем () в переменной условия! И программа зашла бы в тупик на неопределенное время.

По этой причине мы не сможем вызвать notify_one (), пока не вызовем wait (). Обратите внимание, что сила условной переменной заключается в том, что она может атомарно разблокировать мьютекс, проверить, произошел ли вызов notify_one (), и перейти в режим сна. Вы не можете обмануть его, но делать нужно держать мьютекс заблокирован всякий раз , когда вы вносите изменения в переменных , которые могут изменить состояние от ложного к истине и держать его взаперти при вызове notify_one () из - за условий гонки , как описано здесь.

Однако в этом примере нет условия. Почему я не использовал в качестве условия «count == -1000»? Потому что здесь это совсем не интересно: как только будет достигнуто значение -1000, мы уверены, что ни один новый поток не войдет в область «Что делать». Более того, потоки по-прежнему могут вызывать start () и увеличивать счетчик (до -999 и -998 и т. Д.), Но нас это не волнует. Единственное, что имеет значение, это то, что было достигнуто значение -1000, чтобы мы точно знали, что в области «Что делать» больше нет потоков. Мы уверены, что это так, когда вызывается notify_one (), но как убедиться, что мы не вызываем notify_one () до того, как cancel () заблокирует его мьютекс? Простая блокировка cancel_mutex незадолго до notify_one (), конечно, не поможет.

Проблема заключается в том, что, несмотря на то что мы не ждем условия, там еще есть условие, и нам нужно заблокировать мьютекс

1) до достижения этого условия 2) до вызова notify_one.

Таким образом, правильный код становится:

void stop()
{
  if (count.fetch_sub(1) == -999) // Reached -1000 ?
  {
    cancel_mutex.lock();
    cancel_mutex.unlock();
    cv.notify_one();
  }
}

[... то же начало () ...]

void cancel()
{
  std::unique_lock<std::mutex> lk(cancel_mutex);
  if (count.fetch_sub(1000) == 0)
    return;
  cancel_cv.wait(lk);
}

Конечно, это только один пример, но другие случаи очень похожи; почти во всех случаях, когда вы используете условную переменную, вам нужно будет заблокировать этот мьютекс (вскоре) перед вызовом notify_one (), иначе вы можете вызвать его перед вызовом wait ().

Обратите внимание, что в этом случае я разблокировал мьютекс до вызова notify_one (), потому что в противном случае существует (небольшая) вероятность, что вызов notify_one () разбудит поток, ожидающий переменной условия, который затем попытается принять мьютекс и block, прежде чем снова освободить мьютекс. Это немного медленнее, чем нужно.

Этот пример был особенным в том смысле, что строка, изменяющая условие, выполняется тем же потоком, который вызывает wait ().

Более обычным является случай, когда один поток просто ожидает, пока условие станет истинным, а другой поток берет блокировку перед изменением переменных, участвующих в этом условии (в результате чего, возможно, оно станет истинным). В этом случае мьютекс будет немедленно заблокирован до того (и после) условия сбылось - так это совершенно нормально , чтобы просто разблокировать мьютекс перед вызовом уведомит _ * () в этом случае.

Карло Вуд
источник