Я только начинаю использовать RabbitMQ и AMQP в целом.
- У меня очередь сообщений
- У меня несколько потребителей, которым я хотел бы делать разные вещи с одним и тем же сообщением .
Большая часть документации RabbitMQ, по-видимому, ориентирована на циклический перебор, то есть когда отдельное сообщение используется одним потребителем, а нагрузка распределяется между каждым потребителем. Я действительно наблюдаю такое поведение.
Пример: у производителя одна очередь, и он отправляет сообщения каждые 2 секунды:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;
connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}
setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)
})
А вот и потребитель:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})
Если я запускаю потребителя дважды, я вижу, что каждый потребитель использует альтернативные сообщения в циклическом режиме. Например, я увижу сообщения 1, 3, 5 в одном терминале, 2, 4, 6 в другом .
У меня вопрос:
Могу ли я сделать так, чтобы все потребители получали одни и те же сообщения? Т.е. оба потребителя получают сообщения 1, 2, 3, 4, 5, 6? Как это называется на языке AMQP / RabbitMQ? Как обычно настраивается?
Это обычно делается? Должен ли я просто передать сообщение в две отдельные очереди с одним потребителем?
Ответы:
Могу ли я сделать так, чтобы все потребители получали одни и те же сообщения? Т.е. оба потребителя получают сообщения 1, 2, 3, 4, 5, 6? Как это называется на языке AMQP / RabbitMQ? Как обычно настраивается?
Нет, если потребители находятся в одной очереди. Из руководства по концепциям AMQP RabbitMQ :
Похоже, это подразумевает, что циклическое поведение в очереди является заданным и не настраивается. Т.е. для того, чтобы один и тот же идентификатор сообщения обрабатывался несколькими потребителями, требуются отдельные очереди.
Это обычно делается? Должен ли я просто перенаправить сообщение в две отдельные очереди с одним потребителем?
Нет, это не так, одна очередь / несколько потребителей, каждый из которых обрабатывает один и тот же идентификатор сообщения, невозможен. Действительно, лучше, если обмен направит сообщение в две отдельные очереди.
Поскольку мне не требуется слишком сложная маршрутизация, с этим справится обмен по разветвителю. Раньше я не уделял слишком много внимания обменам, так как в node-amqp есть концепция «обмена по умолчанию», позволяющая публиковать сообщения в соединение напрямую, однако большинство сообщений AMQP публикуются для конкретного обмена.
Вот мой обмен фэнаутом, отправляющий и получающий:
var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) { var sendMessage = function(exchange, payload) { console.log('about to publish') var encoded_payload = JSON.stringify(payload); exchange.publish('', encoded_payload, {}) } // Recieve messages connection.queue("my_queue_name", function(queue){ console.log('Created queue') queue.bind(exchange, ''); queue.subscribe(function (message) { console.log('subscribed to queue') var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) setInterval( function() { var test_message = 'TEST '+count sendMessage(exchange, test_message) count += 1; }, 2000) }) })
источник
int prefetchCount = 1; channel.basicQos(prefetchCount);
Это позволит каждому потребителю получить сообщение, как только оно закончит с предыдущим. Вместо получения чередующихся сообщений. Опять же, не решает вашу проблему, но может быть полезно знать людям. пример здесь http://www.rabbitmq.com/tutorials/tutorial-two-java.html в разделе Fair DispatchПросто прочтите руководство rabbitmq . Вы публикуете сообщение для обмена, а не в очередь; затем он направляется в соответствующие очереди. В вашем случае вам следует привязать отдельную очередь для каждого потребителя. Таким образом, они могут получать сообщения совершенно независимо.
источник
Последние несколько ответов почти верны - у меня есть множество приложений, которые генерируют сообщения, которые должны поступать к разным потребителям, поэтому процесс очень прост.
Если вы хотите, чтобы несколько потребителей получали одно и то же сообщение, выполните следующую процедуру.
Создайте несколько очередей, по одной для каждого приложения, которое должно получить сообщение, в свойствах каждой очереди «привязать» тег маршрутизации к обмену amq.direct. Измените приложение публикации, чтобы отправить его в amq.direct и использовать тег маршрутизации (а не очередь). AMQP затем скопирует сообщение в каждую очередь с той же привязкой. Работает как шарм :)
Пример. Допустим, у меня есть генерируемая мной строка JSON, я публикую ее в обмене amq.direct с помощью тега маршрутизации «new-sales-order», у меня есть очередь для моего приложения order_printer, которое печатает заказ, у меня есть очередь для моей биллинговой системы, которая отправит копию заказа и выставит счет клиенту, и у меня есть система веб-архива, где я архивирую заказы по историческим причинам / причинам соответствия, и у меня есть клиентский веб-интерфейс, где заказы отслеживаются по мере поступления другой информации Заказ.
Итак, мои очереди: order_printer, order_billing, order_archive и order_tracking. Все они имеют привязанный тег «new-sales-order», все 4 получат данные JSON.
Это идеальный способ отправлять данные, не зная и не заботясь о принимающих приложениях.
источник
Да, каждый потребитель может получать одни и те же сообщения. загляните на http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http: //www.rabbitmq. com / tutorials / tutorial-five-python.html
для различных способов маршрутизации сообщений. Я знаю, что они предназначены для python и java, но полезно понять принципы, решить, что вы делаете, а затем найти, как это сделать в JS. Похоже, вы хотите сделать простое разветвление ( учебник 3 ), которое отправляет сообщения во все очереди, подключенные к обмену.
Разница в том, что вы делаете и что хотите делать, в основном в том, что вы собираетесь настроить и обменять или набрать разветвление. Обмены Fanout отправляют все сообщения во все подключенные очереди. У каждой очереди будет потребитель, который будет иметь доступ ко всем сообщениям отдельно.
Да, это обычно делается, это одна из особенностей AMPQ.
источник
Шаблон отправки - это отношение «один к одному». Если вы хотите «послать» более чем одному получателю, вы должны использовать шаблон pub / sub. См. Http://www.rabbitmq.com/tutorials/tutorial-three-python.html для получения дополнительных сведений.
источник
RabbitMQ / AMQP: одна очередь, несколько потребителей для одного и того же сообщения и обновления страницы.
rabbit.on('ready', function () { }); sockjs_chat.on('connection', function (conn) { conn.on('data', function (message) { try { var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, '')); if (obj.header == "register") { // Connect to RabbitMQ try { conn.exchange = rabbit.exchange(exchange, { type: 'topic', autoDelete: false, durable: false, exclusive: false, confirm: true }); conn.q = rabbit.queue('my-queue-'+obj.agentID, { durable: false, autoDelete: false, exclusive: false }, function () { conn.channel = 'my-queue-'+obj.agentID; conn.q.bind(conn.exchange, conn.channel); conn.q.subscribe(function (message) { console.log("[MSG] ---> " + JSON.stringify(message)); conn.write(JSON.stringify(message) + "\n"); }).addCallback(function(ok) { ctag[conn.channel] = ok.consumerTag; }); }); } catch (err) { console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack); } } else if (obj.header == "typing") { var reply = { type: 'chatMsg', msg: utils.escp(obj.msga), visitorNick: obj.channel, customField1: '', time: utils.getDateTime(), channel: obj.channel }; conn.exchange.publish('my-queue-'+obj.agentID, reply); } } catch (err) { console.log("ERROR ----> " + err.stack); } }); // When the visitor closes or reloads a page we need to unbind from RabbitMQ? conn.on('close', function () { try { // Close the socket conn.close(); // Close RabbitMQ conn.q.unsubscribe(ctag[conn.channel]); } catch (er) { console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack); } }); });
источник
Чтобы получить желаемое поведение, просто попросите каждого потребителя использовать его из собственной очереди. Вам нужно будет использовать тип непрямого обмена (тема, заголовок, разветвление), чтобы отправить сообщение сразу во все очереди.
источник
Насколько я понимаю, ваш случай:
У меня есть очередь сообщений (ваш источник получения сообщений, назовите его q111)
У меня несколько потребителей, которым я хотел бы делать разные вещи с одним и тем же сообщением.
Ваша проблема здесь в том, что пока эта очередь получает 3 сообщения, сообщение 1 используется потребителем A, другие потребители B и C потребляют сообщения 2 и 3. Если вам нужна настройка, в которой rabbitmq передает одни и те же копии все эти три сообщения (1,2,3) всем трем подключенным потребителям (A, B, C) одновременно.
Хотя для этого можно выполнить множество конфигураций, самым простым способом является использование следующей концепции из двух шагов:
Примечание: при использовании этой концепции не потребляйте напрямую из исходной очереди (q111), поскольку уже использованные сообщения не будут отправлены на ваш обмен Fanout.
Если вы считаете, что это не соответствует вашим требованиям ... не стесняйтесь размещать свои предложения :-)
источник
Я думаю, вам стоит проверить отправку сообщений через разветвительный обменник. Таким образом, вы получите одно и то же сообщение для разных потребителей, под таблицей RabbitMQ создает разные очереди для каждого из этих новых потребителей / подписчиков.
Это ссылка для просмотра примера учебника в javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
источник
Если вы, как и я, используете библиотеку amqplib , у них есть удобный пример реализации учебника Publish / Subscribe RabbitMQ, который может вам пригодиться.
источник
Развернуть все, что вы хотели.
fanout
прочтите руководство rabbitMQ: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
вот мой пример:
Publisher.js:
amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => { if (error0) { throw error0; } console.log('RabbitMQ connected') try { // Create exchange for queues channel = await connection.createChannel() await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false }); await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg')) } catch(error) { console.error(error) } })
Subscriber.js:
amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => { if (error0) { throw error0; } console.log('RabbitMQ connected') try { // Create/Bind a consumer queue for an exchange broker channel = await connection.createChannel() await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false }); const queue = await channel.assertQueue('', {exclusive: true}) channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '') console.log(" [*] Waiting for messages in %s. To exit press CTRL+C"); channel.consume('', consumeMessage, {noAck: true}); } catch(error) { console.error(error) } });
вот пример, который я нашел в Интернете. может также может помочь. https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange
источник
В этом сценарии есть один интересный вариант, который я не нашел здесь в ответах.
Вы можете накапливать сообщения с функцией "повторной постановки" в одном потребителе, чтобы обрабатывать их другим. Вообще говоря, это неправильный путь, но, может быть, для кого-то он подойдет.
https://www.rabbitmq.com/nack.html
И остерегайтесь петель (когда все потребители не отправляют сообщение + повторное сообщение)!
источник