Фонд
Давайте начнем с упрощенного примера и рассмотрим соответствующие части Boost.Asio:
void handle_async_receive(...) { ... }
void print() { ... }
...
boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);
...
io_service.post(&print);
socket.connect(endpoint);
socket.async_receive(buffer, &handle_async_receive);
io_service.post(&print);
io_service.run();
Что такое хендлер ?
Обработчик не более чем на обратный вызов. В примере кода есть 3 обработчика:
print
Обработчик (1).
handle_async_receive
Обработчик (3).
print
Обработчик (4).
Несмотря на то print()
, что одна и та же функция используется дважды, считается, что каждое использование создает свой собственный уникально идентифицируемый обработчик. Обработчики могут быть разных форм и размеров, от базовых функций, подобных приведенным выше, до более сложных конструкций, таких как функторы, генерируемые из boost::bind()
лямбда-выражений и. Независимо от сложности, обработчик по-прежнему остается не более чем обратным вызовом.
Что такое работа ?
Работа - это некоторая обработка, которую Boost.Asio попросили выполнить от имени кода приложения. Иногда Boost.Asio может начать некоторую работу, как только ей об этом сообщили, а в других случаях может подождать, чтобы выполнить работу в более поздний момент времени. По завершении работы Boost.Asio проинформирует приложение, вызвав предоставленный обработчик .
Boost.Asio гарантирует , что обработчики будут работать только в потоке , который в настоящее время вызывающий run()
, run_one()
, poll()
или poll_one()
. Это потоки, которые будут работать и вызывать обработчики . Следовательно, в приведенном выше примере print()
не вызывается, когда он размещается в io_service
(1). Вместо этого он добавляется в io_service
и будет вызываться позже. В данном случае это в пределах io_service.run()
(5).
Что такое асинхронные операции?
Асинхронная операция создает работу и Boost.Asio будет вызывать обработчик информировать приложение , когда работа завершена. Асинхронные операции создаются путем вызова функции, имя которой имеет префикс async_
. Эти функции также известны как функции запуска .
Асинхронные операции можно разделить на три уникальных этапа:
- Инициирование или информирование связанных с
io_service
этим работ. async_receive
Операции (3) сообщает , io_service
что она будет необходимо асинхронно считывать данные из гнезда, а затем async_receive
возвращается немедленно.
- Выполнение реальной работы. В этом случае при получении
socket
данных байты будут прочитаны и скопированы в buffer
. Фактическая работа будет выполняться либо в:
- Инициирующая функция (3), если Boost.Asio может определить, что она не будет блокироваться.
- Когда приложение явно запускает
io_service
(5).
- Вызов
handle_async_receive
ReadHandler . Еще раз, обработчики вызываются только в потоках, в которых запущен io_service
. Таким образом, независимо от того, когда работа выполнена (3 или 5), гарантируется, что handle_async_receive()
она будет вызвана только в пределах io_service.run()
(5).
Разделение во времени и пространстве между этими тремя этапами известно как инверсия потока управления. Это одна из сложностей, затрудняющих асинхронное программирование. Однако есть методы, которые могут помочь смягчить это, например, с помощью сопрограмм .
Что делает io_service.run()
?
Когда поток вызывает io_service.run()
, работа и обработчики будут вызываться из этого потока. В приведенном выше примере io_service.run()
(5) будет блокироваться до тех пор, пока:
- Он был вызван и возвращен обоими
print
обработчиками, операция приема завершилась успешно или неудачно, а его handle_async_receive
обработчик был вызван и возвращен.
io_service
Явно остановлено через io_service::stop()
.
- Исключение генерируется из обработчика.
Один потенциальный псевдо-поток можно описать следующим образом:
создать io_service
создать сокет
добавить обработчик печати в io_service (1)
дождитесь подключения сокета (2)
добавить запрос асинхронной работы чтения в io_service (3)
добавить обработчик печати в io_service (4)
запустите io_service (5)
есть работа или обработчики?
да, есть 1 работа и 2 обработчика
у сокета есть данные? нет ничего не делай
запустить обработчик печати (1)
есть работа или обработчики?
да, есть 1 работа и 1 обработчик
у сокета есть данные? нет ничего не делай
запустить обработчик печати (4)
есть работа или обработчики?
да, есть 1 работа
у сокета есть данные? нет, продолжай ждать
- сокет получает данные -
сокет имеет данные, считываем их в буфер
добавить обработчик handle_async_receive в io_service
есть работа или обработчики?
да, есть 1 обработчик
запустить обработчик handle_async_receive (3)
есть работа или обработчики?
нет, установите io_service как остановленный и верните
Обратите внимание, как когда чтение закончилось, он добавил еще один обработчик в файлio_service
. Эта тонкая деталь - важная особенность асинхронного программирования. Это позволяет связывать обработчики вместе. Например, если handle_async_receive
не были получены все ожидаемые данные, то его реализация может отправить еще одну операцию асинхронного чтения, в результате чего io_service
потребуется больше работы и, следовательно, не будет возврата из нее io_service.run()
.
Обратите внимание, что, когда работа io_service
закончилась, приложение должно быть reset()
выполнено io_service
перед повторным запуском.
Пример вопроса и код примера 3a
Теперь давайте рассмотрим два фрагмента кода, упомянутые в вопросе.
Код вопроса
socket->async_receive
добавляет работу в io_service
. Таким образом, io_service->run()
будет блокироваться до тех пор, пока операция чтения не завершится успешно или с ошибкой и ClientReceiveEvent
либо не завершится, либо не выдаст исключение.
В надежде облегчить понимание, вот небольшой аннотированный Пример 3a:
void CalculateFib(std::size_t n);
int main()
{
boost::asio::io_service io_service;
boost::optional<boost::asio::io_service::work> work =
boost::in_place(boost::ref(io_service));
boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
worker_threads.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service)
);
}
io_service.post(boost::bind(CalculateFib, 3));
io_service.post(boost::bind(CalculateFib, 4));
io_service.post(boost::bind(CalculateFib, 5));
work = boost::none;
worker_threads.join_all();
}
На высоком уровне программа создаст 2 потока, которые будут обрабатывать io_service
цикл событий (2). В результате получается простой пул потоков, который будет вычислять числа Фибоначчи (3).
Одно из основных различий между кодом вопроса и этим кодом заключается в том, что этот код вызывает io_service::run()
(2) перед фактической работой, а обработчики добавляются в io_service
(3). Чтобы предотвратить io_service::run()
немедленный возврат, создается io_service::work
объект (1). Этот объект предотвращает io_service
нехватку работы; следовательно, io_service::run()
не вернется в результате отсутствия работы.
Общий поток выглядит следующим образом:
- Создайте и добавьте
io_service::work
объект, добавленный в io_service
.
- Создан пул потоков, который вызывает
io_service::run()
. Эти рабочие потоки не вернутся из- io_service
за io_service::work
объекта.
- Добавьте 3 обработчика, которые вычисляют числа Фибоначчи
io_service
, и немедленно верните результат. Рабочие потоки, а не основной поток, могут немедленно запустить эти обработчики.
- Удалите
io_service::work
объект.
- Подождите, пока рабочие потоки закончат работу. Это произойдет только после того, как все 3 обработчика завершат выполнение, поскольку
io_service
ни у них нет ни обработчиков, ни работы.
Код может быть написан иначе, таким же образом, как и исходный код, где обработчики добавляются к io_service
, а затем io_service
обрабатывается цикл событий. Это устраняет необходимость использования io_service::work
и приводит к следующему коду:
int main()
{
boost::asio::io_service io_service;
io_service.post(boost::bind(CalculateFib, 3));
io_service.post(boost::bind(CalculateFib, 4));
io_service.post(boost::bind(CalculateFib, 5));
boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
worker_threads.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service)
);
}
worker_threads.join_all();
}
Синхронный против асинхронного
Хотя код в вопросе использует асинхронную операцию, он эффективно работает синхронно, поскольку ожидает завершения асинхронной операции:
socket.async_receive(buffer, handler)
io_service.run();
эквивалентно:
boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);
Как правило, старайтесь избегать смешивания синхронных и асинхронных операций. Часто это может превратить сложную систему в сложную систему. Этот ответ подчеркивает преимущества асинхронного программирования, некоторые из которых также описаны в документации Boost.Asio .
io_service.poll()
для обработки цикла событий без блокировки невыполненных операций. Основная рекомендация избегать смешивания синхронных и асинхронных операций - избегать добавления ненужной сложности и предотвращения плохой реакции, когда обработчикам требуется много времени для завершения. В некоторых случаях это безопасно, например, когда известно, что синхронная операция не заблокируется.run()
....» ? Если есть N потоков (которые вызвалиrun()
), то какой из них является «текущим»? Может быть много? Или вы имеете в виду, что поток, который завершил выполнениеasync_*()
(скажемasync_read
), также гарантированно вызовет свои обработчики?Чтобы упростить
run
процесс, представьте, что это сотрудник, который должен обрабатывать стопку бумаги; он берет один лист, делает то, что он говорит, выбрасывает лист и берет следующий; когда у него заканчиваются простыни, он уходит из офиса. На каждом листе может быть любая инструкция, даже добавление нового листа в стопку. Вернемся к asio: вы можете датьio_service
работе двумя способами, по существу: используяpost
ее, как в образце, который вы связали, или используя другие объекты, которые внутренне вызываютpost
методio_service
, например методыsocket
и егоasync_*
.источник