Запутался, когда метод запуска boost :: asio :: io_service блокирует / разблокирует

88

Поскольку я новичок в Boost.Asio, я смущен io_service::run(). Я был бы признателен, если бы кто-нибудь мог объяснить мне, когда этот метод блокирует / разблокирует. В документации говорится:

Эти run()функциональные блоки , пока вся работа не будет закончена , и больше нет обработчиков быть посланным, или до тех пор , io_serviceпока не прекратилось.

Несколько потоков могут вызывать run()функцию для создания пула потоков, из которых они io_serviceмогут выполнять обработчики. Все потоки, ожидающие в пуле, эквивалентны, и они io_serviceмогут выбрать любой из них для вызова обработчика.

Нормальный выход из run()функции означает, что io_serviceобъект остановлен ( stopped()функция возвращает истину). Последующие вызовы run(), run_one(), poll()или poll_one()сразу же возвращаются , если нет предшествующего вызов reset().

Что означает следующее утверждение?

[...] больше не нужно отправлять обработчики [...]


Пытаясь понять поведение io_service::run(), я наткнулся на этот пример (пример 3а). В нем я наблюдаю, что io_service->run()блоки и ждут заказов на работу.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

Однако в следующем коде, над которым я работал, клиент подключается с помощью TCP / IP, а метод выполнения блокируется до тех пор, пока данные не будут получены асинхронно.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Любое объяснение того, run()что описывает его поведение в двух приведенных ниже примерах, будет оценено.

MistyD
источник

Ответы:

234

Фонд

Давайте начнем с упрощенного примера и рассмотрим соответствующие части Boost.Asio:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

Что такое хендлер ?

Обработчик не более чем на обратный вызов. В примере кода есть 3 обработчика:

  • printОбработчик (1).
  • handle_async_receiveОбработчик (3).
  • printОбработчик (4).

Несмотря на то print(), что одна и та же функция используется дважды, считается, что каждое использование создает свой собственный уникально идентифицируемый обработчик. Обработчики могут быть разных форм и размеров, от базовых функций, подобных приведенным выше, до более сложных конструкций, таких как функторы, генерируемые из boost::bind()лямбда-выражений и. Независимо от сложности, обработчик по-прежнему остается не более чем обратным вызовом.

Что такое работа ?

Работа - это некоторая обработка, которую Boost.Asio попросили выполнить от имени кода приложения. Иногда Boost.Asio может начать некоторую работу, как только ей об этом сообщили, а в других случаях может подождать, чтобы выполнить работу в более поздний момент времени. По завершении работы Boost.Asio проинформирует приложение, вызвав предоставленный обработчик .

Boost.Asio гарантирует , что обработчики будут работать только в потоке , который в настоящее время вызывающий run(), run_one(), poll()или poll_one(). Это потоки, которые будут работать и вызывать обработчики . Следовательно, в приведенном выше примере print()не вызывается, когда он размещается в io_service(1). Вместо этого он добавляется в io_serviceи будет вызываться позже. В данном случае это в пределах io_service.run()(5).

Что такое асинхронные операции?

Асинхронная операция создает работу и Boost.Asio будет вызывать обработчик информировать приложение , когда работа завершена. Асинхронные операции создаются путем вызова функции, имя которой имеет префикс async_. Эти функции также известны как функции запуска .

Асинхронные операции можно разделить на три уникальных этапа:

  • Инициирование или информирование связанных с io_serviceэтим работ. async_receiveОперации (3) сообщает , io_serviceчто она будет необходимо асинхронно считывать данные из гнезда, а затем async_receiveвозвращается немедленно.
  • Выполнение реальной работы. В этом случае при получении socketданных байты будут прочитаны и скопированы в buffer. Фактическая работа будет выполняться либо в:
    • Инициирующая функция (3), если Boost.Asio может определить, что она не будет блокироваться.
    • Когда приложение явно запускает io_service(5).
  • Вызов handle_async_receive ReadHandler . Еще раз, обработчики вызываются только в потоках, в которых запущен io_service. Таким образом, независимо от того, когда работа выполнена (3 или 5), гарантируется, что handle_async_receive()она будет вызвана только в пределах io_service.run()(5).

Разделение во времени и пространстве между этими тремя этапами известно как инверсия потока управления. Это одна из сложностей, затрудняющих асинхронное программирование. Однако есть методы, которые могут помочь смягчить это, например, с помощью сопрограмм .

Что делает io_service.run()?

Когда поток вызывает io_service.run(), работа и обработчики будут вызываться из этого потока. В приведенном выше примере io_service.run()(5) будет блокироваться до тех пор, пока:

  • Он был вызван и возвращен обоими printобработчиками, операция приема завершилась успешно или неудачно, а его handle_async_receiveобработчик был вызван и возвращен.
  • io_serviceЯвно остановлено через io_service::stop().
  • Исключение генерируется из обработчика.

Один потенциальный псевдо-поток можно описать следующим образом:

создать io_service
создать сокет
добавить обработчик печати в io_service (1)
дождитесь подключения сокета (2)
добавить запрос асинхронной работы чтения в io_service (3)
добавить обработчик печати в io_service (4)
запустите io_service (5)
  есть работа или обработчики?
    да, есть 1 работа и 2 обработчика
      у сокета есть данные? нет ничего не делай
      запустить обработчик печати (1)
  есть работа или обработчики?
    да, есть 1 работа и 1 обработчик
      у сокета есть данные? нет ничего не делай
      запустить обработчик печати (4)
  есть работа или обработчики?
    да, есть 1 работа
      у сокета есть данные? нет, продолжай ждать
  - сокет получает данные -
      сокет имеет данные, считываем их в буфер
      добавить обработчик handle_async_receive в io_service
  есть работа или обработчики?
    да, есть 1 обработчик
      запустить обработчик handle_async_receive (3)
  есть работа или обработчики?
    нет, установите io_service как остановленный и верните

Обратите внимание, как когда чтение закончилось, он добавил еще один обработчик в файлio_service . Эта тонкая деталь - важная особенность асинхронного программирования. Это позволяет связывать обработчики вместе. Например, если handle_async_receiveне были получены все ожидаемые данные, то его реализация может отправить еще одну операцию асинхронного чтения, в результате чего io_serviceпотребуется больше работы и, следовательно, не будет возврата из нее io_service.run().

Обратите внимание, что, когда работа io_serviceзакончилась, приложение должно быть reset()выполнено io_serviceперед повторным запуском.


Пример вопроса и код примера 3a

Теперь давайте рассмотрим два фрагмента кода, упомянутые в вопросе.

Код вопроса

socket->async_receiveдобавляет работу в io_service. Таким образом, io_service->run()будет блокироваться до тех пор, пока операция чтения не завершится успешно или с ошибкой и ClientReceiveEventлибо не завершится, либо не выдаст исключение.

Пример 3a Код

В надежде облегчить понимание, вот небольшой аннотированный Пример 3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

На высоком уровне программа создаст 2 потока, которые будут обрабатывать io_serviceцикл событий (2). В результате получается простой пул потоков, который будет вычислять числа Фибоначчи (3).

Одно из основных различий между кодом вопроса и этим кодом заключается в том, что этот код вызывает io_service::run()(2) перед фактической работой, а обработчики добавляются в io_service(3). Чтобы предотвратить io_service::run()немедленный возврат, создается io_service::workобъект (1). Этот объект предотвращает io_serviceнехватку работы; следовательно, io_service::run()не вернется в результате отсутствия работы.

Общий поток выглядит следующим образом:

  1. Создайте и добавьте io_service::workобъект, добавленный в io_service.
  2. Создан пул потоков, который вызывает io_service::run(). Эти рабочие потоки не вернутся из- io_serviceза io_service::workобъекта.
  3. Добавьте 3 обработчика, которые вычисляют числа Фибоначчи io_service, и немедленно верните результат. Рабочие потоки, а не основной поток, могут немедленно запустить эти обработчики.
  4. Удалите io_service::workобъект.
  5. Подождите, пока рабочие потоки закончат работу. Это произойдет только после того, как все 3 обработчика завершат выполнение, поскольку io_serviceни у них нет ни обработчиков, ни работы.

Код может быть написан иначе, таким же образом, как и исходный код, где обработчики добавляются к io_service, а затем io_serviceобрабатывается цикл событий. Это устраняет необходимость использования io_service::workи приводит к следующему коду:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Синхронный против асинхронного

Хотя код в вопросе использует асинхронную операцию, он эффективно работает синхронно, поскольку ожидает завершения асинхронной операции:

socket.async_receive(buffer, handler)
io_service.run();

эквивалентно:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

Как правило, старайтесь избегать смешивания синхронных и асинхронных операций. Часто это может превратить сложную систему в сложную систему. Этот ответ подчеркивает преимущества асинхронного программирования, некоторые из которых также описаны в документации Boost.Asio .

Таннер Сэнсбери
источник
13
Классный пост. Я хотел бы добавить только одну вещь, потому что я чувствую, что этому не уделяется достаточно внимания: после возврата run () вам нужно вызвать reset () в вашем io_service, прежде чем вы сможете снова запустить () его. В противном случае он может вернуться мгновенно независимо от того, ожидают ли операции async_ или нет.
DeVadder
Откуда берется буфер? Что это такое?
ruipacheco
Я все еще в замешательстве. Если смешивание - это синхронизация, а асинхронный режим не рекомендуется, то что такое чистый асинхронный режим? вы можете привести пример кода без io_service.run () ;?
Splash
@Splash Можно использовать io_service.poll()для обработки цикла событий без блокировки невыполненных операций. Основная рекомендация избегать смешивания синхронных и асинхронных операций - избегать добавления ненужной сложности и предотвращения плохой реакции, когда обработчикам требуется много времени для завершения. В некоторых случаях это безопасно, например, когда известно, что синхронная операция не заблокируется.
Таннер Сэнсбери,
Что вы подразумеваете под «в настоящее время» в «Boost.Asio гарантирует, что обработчики будут выполняться только в потоке, который в данный момент вызываетrun() ....» ? Если есть N потоков (которые вызвали run()), то какой из них является «текущим»? Может быть много? Или вы имеете в виду, что поток, который завершил выполнение async_*()(скажем async_read), также гарантированно вызовет свои обработчики?
Nawaz
18

Чтобы упростить runпроцесс, представьте, что это сотрудник, который должен обрабатывать стопку бумаги; он берет один лист, делает то, что он говорит, выбрасывает лист и берет следующий; когда у него заканчиваются простыни, он уходит из офиса. На каждом листе может быть любая инструкция, даже добавление нового листа в стопку. Вернемся к asio: вы можете дать io_serviceработе двумя способами, по существу: используя postее, как в образце, который вы связали, или используя другие объекты, которые внутренне вызывают postметод io_service, например методы socketи его async_*.

Логхорн
источник