C ++ 0x не имеет семафоров? Как синхронизировать потоки?

135

Правда ли, что C ++ 0x придет без семафоров? Уже есть несколько вопросов о переполнении стека относительно использования семафоров. Я использую их (семафоры posix) все время, чтобы позволить потоку ожидать какого-то события в другом потоке:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

Если бы я сделал это с мьютексом:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

Проблема: Это уродливо и не гарантируется, что thread1 сначала блокирует мьютекс (учитывая, что тот же поток должен блокировать и разблокировать мьютекс, вы также не можете заблокировать event1 до запуска thread0 и thread1).

Итак, поскольку у boost тоже нет семафоров, каков самый простой способ достичь вышеуказанного?

Торан
источник
Может быть, использовать условия mutex и std :: обещание и std :: future?
Ив

Ответы:

179

Вы можете легко создать его из мьютекса и условной переменной:

#include <mutex>
#include <condition_variable>

class semaphore
{
private:
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void notify() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_wait() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
Максим Егорушкин
источник
96
кто-то должен представить предложение
7
комментарий, который озадачил меня изначально - это блокировка в ожидании, можно спросить, как поток может пройти мимо уведомления, если блокировка удерживается ожиданием? несколько плохо задокументированный ответ заключается в том, что condition_variable.wait запускает блокировку, позволяя другому потоку проходить уведомления атомарным способом, по крайней мере, я так понимаю
31
Он был преднамеренно исключен из Boost на том основании, что семафор - это слишком большая веревка для программистов. Условные переменные, предположительно, более управляемы. Я вижу их точку зрения, но чувствую себя немного покровительственно. Я предполагаю, что та же логика применима к C ++ 11 - программисты должны писать свои программы таким образом, чтобы «естественно» использовать condvars или другие одобренные методы синхронизации. Поставка семафора будет работать против этого независимо от того, реализован он поверх condvar или изначально.
Стив Джессоп
5
Примечание. См. En.wikipedia.org/wiki/Spurious_wakeup для обоснования while(!count_)цикла.
Дан Ниссенбаум
3
@ Максим, прости, я не думаю, что ты прав. sem_wait и sem_post используют только системные вызовы на предмет конкуренции (см. sourceware.org/git/?p=glibc.git;a=blob;f=nptl/sem_wait.c ), поэтому код здесь дублирует реализацию libc с потенциальными ошибками. Если вы планируете переносимость в любой системе, это может быть решением, но если вам нужна только совместимость с Posix, используйте семафор Posix.
xryl669
107

Исходя из ответа Максима Егорушкина , я попытался сделать пример в стиле C ++ 11.

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
Цунео Йошиока
источник
34
Вы можете сделать wait () также трехслойным:cv.wait(lck, [this]() { return count > 0; });
Domi
2
Добавление другого класса в духе lock_guard также полезно. В режиме RAII конструктор, который принимает семафор в качестве ссылки, вызывает вызов wait () семафора, а деструктор вызывает его вызов notify (). Это предотвращает сбой освобождения семафора исключениями.
Джим Хунзикер
нет мертвой блокировки, если, скажем, N потоков вызвало wait () и count == 0, то cv.notify_one (); никогда не вызывается, так как MTX не выпустил?
Марчелло
1
@Marcello Ожидающие потоки не удерживают блокировку. Весь смысл условных переменных состоит в том, чтобы обеспечить элементарную операцию «разблокировки и ожидания».
Дэвид Шварц
3
Вы должны снять блокировку перед вызовом notify_one (), чтобы избежать немедленной блокировки пробуждения ... см. Здесь: en.cppreference.com/w/cpp/thread/condition_variable/notify_all
kylefinn
38

Я решил написать самый надежный / универсальный семафор C ++ 11, какой только мог, в стиле стандарта, насколько я мог (заметьте using semaphore = ..., вы обычно просто используете имя, semaphoreпохожее на обычное использование stringnot basic_string):

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
Дэвид
источник
Это работает, с незначительным редактированием. В wait_forи wait_untilвызовы метода предиката возвращают логическое значение ( а не `станд :: cv_status).
JDKNIGHT
Извините, что так поздно в игре. std::size_tбез знака, поэтому уменьшение его ниже нуля равно UB, и всегда будет >= 0. ИМХО countдолжно быть int.
Ричард Ходжес
3
@RichardHodges нет способа уменьшить на ноль ниже нуля, так что нет проблем, и что будет означать отрицательный счет для семафора? Это даже не имеет смысла ИМО.
Дэвид
1
@David Что делать, если поток должен был ждать других, чтобы инициализировать вещи? например, 1 поток чтения, ожидающий 4 потока, я бы вызвал конструктор семафора с -3, чтобы заставить поток чтения ждать, пока все другие темы не создали сообщение. Я думаю, есть другие способы сделать это, но разве это не разумно? Я думаю, что на самом деле это вопрос, который задает ОП, но с большим количеством "thread1".
Jmmut
2
@RichardHodges быть очень педантичным, уменьшение целого типа без знака ниже 0 не является UB.
jcai
15

в соответствии с семафорами posix, я бы добавил

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

И я предпочитаю использовать механизм синхронизации на удобном уровне абстракции, а не всегда копировать, вставляя сшитую версию, используя более простые операторы.

Михаил Зиллич
источник
9

Вы также можете проверить cpp11-on-multicore - он обладает переносимой и оптимальной реализацией семафора.

Хранилище также содержит другие полезные свойства потоков, которые дополняют потоки c ++ 11.

onqtam
источник
8

Вы можете работать с мьютексом и условными переменными. Вы получаете эксклюзивный доступ с мьютексом, проверьте, хотите ли вы продолжить или нужно ждать другого конца. Если вам нужно ждать, вы ждете в состоянии. Когда другой поток определяет, что вы можете продолжить, он сигнализирует об этом условии.

В библиотеке boost :: thread есть короткий пример, который вы, скорее всего, можете просто скопировать (библиотеки C ++ 0x и Boost очень похожи).

Дэвид Родригес - дрибеи
источник
Состояние сигнализирует только ожидающим потокам или нет? Так что, если thread0 там не ждет, когда thread1 сигнализирует, это будет заблокировано позже? Плюс: мне не нужна дополнительная блокировка, которая идет с условием - это накладные расходы.
Тауран
Да, условие только сигнализирует ожидающие потоки. Распространенным шаблоном является наличие переменной с состоянием и условием на случай ожидания. Подумайте о производителе / ​​потребителе, будет подсчет элементов в буфере, производитель блокирует, добавляет элемент, увеличивает счет и сигналы. Потребитель блокирует, проверяет счетчик и, если не ноль потребляет, тогда как если ноль ждет в условии.
Дэвид Родригес - dribeas
2
Вы можете смоделировать семафор следующим образом: Инициализируйте переменную значением, которое вы дадите семафору, затем wait()преобразуйте в «блокировку, проверьте счетчик, если ненулевое уменьшение, и продолжите; если нулевое ожидание при условии», в то время как postбудет «блокировка, счетчик приращений, сигнализируйте, если это было 0 "
Дэвид Родригес - dribeas
Да, звучит хорошо. Интересно, реализованы ли семафоры posix таким же образом?
Тауран
@tauran: я точно не знаю (и это может зависеть от того, какая ОС Posix), но я думаю, что это маловероятно. Семафоры традиционно являются низкоуровневым примитивом синхронизации по сравнению с мьютексами и условными переменными, и в принципе их можно сделать более эффективными, чем если бы они были реализованы поверх condvar. Таким образом, более вероятно, что в данной ОС все примитивы синхронизации на уровне пользователя построены поверх некоторых общих инструментов, которые взаимодействуют с планировщиком.
Стив Джессоп
3

Также может быть полезна обертка семафора RAII в потоках:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Пример использования в многопоточном приложении:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
slasla
источник
3

C ++ 20 наконец-то будет иметь семафоры - std::counting_semaphore<max_count> .

Они будут иметь (как минимум) следующие методы:

  • acquire() (Блокировка)
  • try_acquire() (неблокирует, немедленно возвращает)
  • try_acquire_for() (неблокирование, длительность)
  • try_acquire_until() (неблокирование, требуется время, чтобы прекратить попытки)
  • release()

Это еще не указано в cppreference, но вы можете прочитать эти слайды презентации CppCon 2019 или посмотреть видео . Также есть официальное предложение P0514R4 , но я не уверен, что это самая актуальная версия.

einpoklum
источник
2

Я нашел, что shared_ptr и weak_ptr, длинный со списком, выполнил работу, в которой я нуждался. Моя проблема заключалась в том, что у меня было несколько клиентов, желающих взаимодействовать с внутренними данными хоста. Обычно хост обновляет данные самостоятельно, однако, если клиент запрашивает его, хост должен прекратить обновление, пока клиенты не получат доступ к данным хоста. В то же время клиент может запросить монопольный доступ, чтобы ни другие клиенты, ни хост не могли изменить эти данные хоста.

Как я это сделал, я создал структуру:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

У каждого клиента будет такой член:

UpdateLock::ptr m_myLock;

Тогда у хоста будет элемент weak_ptr для исключительности и список слабых_приятий для неисключительных блокировок:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

Существует функция для включения блокировки и другая функция для проверки, заблокирован ли хост:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

Я проверяю блокировки в LockUpdate, IsUpdateLocked и периодически в процедуре обновления хоста. Тестирование на блокировку так же просто, как проверка, истек ли срок действия weak_ptr, и удаление всех просроченных из списка m_locks (я делаю это только во время обновления хоста), я могу проверить, пуст ли список; в то же время, я получаю автоматическую разблокировку, когда клиент сбрасывает shared_ptr, за который он висит, что также происходит, когда клиент автоматически уничтожается.

Общий эффект заключается в том, что клиенты редко нуждаются в эксклюзивности (обычно зарезервированы только для добавления и удаления), большую часть времени запрос к LockUpdate (false), то есть неисключительный, выполняется до тех пор, пока (! M_exclusiveLock). И LockUpdate (true), запрос на эксклюзивность, успешно выполняется только тогда, когда оба (! M_exclusiveLock) и (m_locks.empty ()).

Можно было бы добавить очередь для смягчения исключительных и неисключительных блокировок, однако у меня до сих пор не было коллизий, поэтому я собираюсь подождать, пока это произойдет, чтобы добавить решение (в основном, поэтому у меня есть условия тестирования в реальном мире).

Пока это хорошо работает для моих нужд; Я могу представить необходимость его расширения и некоторые проблемы, которые могут возникнуть при расширенном использовании, однако, это было быстро реализовано и требовало очень мало пользовательского кода.

Kit10
источник
-4

В случае, если кто-то заинтересован в атомарной версии, вот реализация. Производительность ожидается лучше, чем версия переменной mutex & condition.

class semaphore_atomic
{
public:
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        while (true) {
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};
Джефри
источник
4
Я ожидаю, что производительность будет намного хуже. Этот код делает практически буквально каждую возможную ошибку. В качестве наиболее очевидного примера, предположим, что waitкод должен повторяться несколько раз. Когда он, наконец, разблокируется, он возьмет на себя мать всех непредсказуемых ветвей, так как предсказание цикла ЦП, безусловно, предскажет, что оно будет повторяться снова. Я мог бы перечислить еще много проблем с этим кодом.
Дэвид Шварц
1
Вот еще один очевидный waitфактор снижения производительности: цикл будет потреблять ресурсы микропроцессора ЦП по мере его вращения. Предположим, что он находится в том же физическом ядре, что и предполагаемый notifyему поток - он ужасно замедлит этот поток.
Дэвид Шварц
1
И вот еще один: на x86-процессорах (самых популярных на сегодняшний день процессорах) операция compare_exchange_weak всегда является операцией записи, даже если она завершается неудачно (она записывает то же значение, которое читала при сбое сравнения). Итак, предположим, что два ядра находятся в waitцикле для одного и того же семафора. Они оба пишут на полной скорости в одну и ту же строку кэша, что может замедлить сканирование других ядер путем насыщения межъядерных шин.
Дэвид Шварц
@DavidSchwartz Рад видеть ваши комментарии. Не уверен, что понимаю часть "... предсказание цикла процессора ...". Договорились 2-го. Очевидно, что ваш третий случай может произойти, но если сравнить мьютекс, который вызывает переключение режима пользователя с переключателем режима ядра и системным вызовом, синхронизация между ядрами не хуже.
Джеффри
1
Не существует такого понятия, как семафор без блокировки. Вся идея быть свободной от блокировок состоит не в том, чтобы писать код без использования мьютексов, а в том, чтобы писать код, когда поток вообще никогда не блокируется. В этом случае суть семафора заключается в блокировке потоков, которые вызывают функцию wait ()!
Карло Вуд