RabbitMQ / AMQP: singola coda, più utenti per lo stesso messaggio?

Sto solo iniziando a usare RabbitMQ e AMQP in generale.

  • Ho una coda di messaggi
  • Ho più consumatori, che mi piacerebbe fare cose diverse con lo stesso messaggio .

La maggior parte della documentazione di RabbitMQ sembra essere focalizzata su round-robin, ovvero dove un singolo messaggio viene consumato da un singolo consumatore, con il carico distribuito tra ogni consumatore. Questo è davvero il comportamento che io assisto.

Un esempio: il produttore ha una singola coda e invia messaggi ogni 2 secondi:

var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { var sendMessage = function(connection, queue_name, payload) { var encoded_payload = JSON.stringify(payload); connection.publish(queue_name, encoded_payload); } setInterval( function() { var test_message = 'TEST '+count sendMessage(connection, "my_queue_name", test_message) count += 1; }, 2000) }) 

Ed ecco un consumatore:

 var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); connection.on('ready', function () { connection.queue("my_queue_name", function(queue){ queue.bind('#'); queue.subscribe(function (message) { var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) }) 

Se avvio il cliente due volte, posso vedere che ogni consumatore sta consumando messaggi alternativi nel comportamento round-robin. Ad esempio, vedrò i messaggi 1, 3, 5 in un terminale, 2, 4, 6 nell’altro .

La mia domanda è:

  • Posso avere ogni consumatore ricevere gli stessi messaggi? Vale a dire, entrambi i consumatori ricevono il messaggio 1, 2, 3, 4, 5, 6? Come si chiama AMQP / RabbitMQ? Come è configurato normalmente?

  • Questo è fatto comunemente? Dovrei semplicemente fare in modo che lo scambio instradi il messaggio in due code separate, con un singolo consumatore, invece?

Posso avere ogni consumatore ricevere gli stessi messaggi? Vale a dire, entrambi i consumatori ricevono il messaggio 1, 2, 3, 4, 5, 6? Come si chiama AMQP / RabbitMQ? Come è configurato normalmente?

No, non se i consumatori sono sulla stessa coda. Dalla guida Concetti AMQP di RabbitMQ:

è importante capire che, in AMQP 0-9-1, i messaggi sono bilanciati tra i consumatori.

Ciò sembra implicare che il comportamento round-robin all’interno di una coda sia un dato e non configurabile. Ad esempio, sono necessarie code separate per poter gestire lo stesso ID messaggio da più utenti.

Questo è fatto comunemente? Dovrei semplicemente fare in modo che lo scambio instradi il messaggio in due code separate, con un singolo consumatore, invece?

No, non è, una singola coda / più utenti con ogni singolo utente che gestisce lo stesso ID messaggio non è ansible. Avere lo scambio instradare il messaggio in due code separate è davvero meglio.

Dato che non ho bisogno di un routing troppo complesso, lo scambio di fanout lo gestirà bene. Non mi sono concentrato troppo su Exchanges in precedenza poiché node-amqp ha il concetto di “scambio predefinito” che consente di pubblicare messaggi direttamente su una connessione, tuttavia la maggior parte dei messaggi AMQP viene pubblicata su uno specifico scambio.

Ecco il mio scambio fanout, sia di invio che di ricezione:

 var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) { var sendMessage = function(exchange, payload) { console.log('about to publish') var encoded_payload = JSON.stringify(payload); exchange.publish('', encoded_payload, {}) } // Recieve messages connection.queue("my_queue_name", function(queue){ console.log('Created queue') queue.bind(exchange, ''); queue.subscribe(function (message) { console.log('subscribed to queue') var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) setInterval( function() { var test_message = 'TEST '+count sendMessage(exchange, test_message) count += 1; }, 2000) }) }) 

Basta leggere il tutorial di rabbitmq . Pubblichi un messaggio per lo scambio, non per fare la coda; viene quindi instradato alle code appropriate. Nel tuo caso, dovresti associare una coda separata per ciascun consumatore. In questo modo, possono consumare messaggi in modo completamente indipendente.

L’ultimo paio di risposte sono quasi corrette: ho un sacco di app che generano messaggi che devono finire con diversi consumatori, quindi il processo è molto semplice.

Se desideri che più utenti condividano lo stesso messaggio, procedi come segue.

Crea più code, una per ogni app che riceverà il messaggio, in ogni proprietà della coda, “associa” un tag di routing con lo scambio amq.direct. Cambia l’app di pubblicazione da inviare a amq.direct e usa il tag di instradamento (non una coda). AMQP copierà il messaggio in ogni coda con la stessa associazione. Funziona con un fascino 🙂

Esempio: Diciamo che ho una stringa JSON che genero, la pubblico allo scambio “amq.direct” usando il tag di routing “new-sales-order”, ho una coda per la mia app order_printer che stampa l’ordine, ho un coda per il mio sistema di fatturazione che invierà una copia dell’ordine e fatturerò il cliente e ho un sistema di archiviazione web dove archivo gli ordini per ragioni storiche / di conformità e ho un’interfaccia web client in cui gli ordini vengono tracciati mentre altre informazioni arrivano a circa un ordine.

Quindi le mie code sono: order_printer, order_billing, order_archive e order_tracking Tutti hanno il tag di binding “new-sales-order” vincolato a loro, tutti i 4 otterranno i dati JSON.

Questo è un modo ideale per inviare dati senza che l’app per la pubblicazione sappia o si interessi delle app di ricezione.

Il modello di invio è una relazione uno a uno. Se si desidera “inviare” a più di un ricevitore, si dovrebbe usare il modello pub / sub. Vedi http://www.rabbitmq.com/tutorials/tutorial-three-python.html per maggiori dettagli.

Sì, ogni utente può ricevere gli stessi messaggi. dare un’occhiata a http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http: //www.rabbitmq. com / tutorial / esercitazione-cinque python.html

per diversi modi di instradare i messaggi. So che sono per Python e Java ma è bello capire i principi, decidere cosa stai facendo e poi trovare come farlo in JS. Sembra che tu voglia fare un semplice fanout ( tutorial 3 ), che invia i messaggi a tutte le code connesse allo scambio.

La differenza con quello che stai facendo e ciò che vuoi fare è fondamentalmente che devi configurare e scambiare o digitare fanout. Gli esci di fan-out inviano tutti i messaggi a tutte le code connesse. Ogni coda avrà un consumatore che avrà accesso a tutti i messaggi separatamente.

Sì, questo è comunemente fatto, è una delle caratteristiche di AMPQ.

RabbitMQ / AMQP: coda singola, più utenti per lo stesso messaggio e l’aggiornamento della pagina.

 rabbit.on('ready', function () { }); sockjs_chat.on('connection', function (conn) { conn.on('data', function (message) { try { var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, '')); if (obj.header == "register") { // Connect to RabbitMQ try { conn.exchange = rabbit.exchange(exchange, { type: 'topic', autoDelete: false, durable: false, exclusive: false, confirm: true }); conn.q = rabbit.queue('my-queue-'+obj.agentID, { durable: false, autoDelete: false, exclusive: false }, function () { conn.channel = 'my-queue-'+obj.agentID; conn.q.bind(conn.exchange, conn.channel); conn.q.subscribe(function (message) { console.log("[MSG] ---> " + JSON.stringify(message)); conn.write(JSON.stringify(message) + "\n"); }).addCallback(function(ok) { ctag[conn.channel] = ok.consumerTag; }); }); } catch (err) { console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack); } } else if (obj.header == "typing") { var reply = { type: 'chatMsg', msg: utils.escp(obj.msga), visitorNick: obj.channel, customField1: '', time: utils.getDateTime(), channel: obj.channel }; conn.exchange.publish('my-queue-'+obj.agentID, reply); } } catch (err) { console.log("ERROR ----> " + err.stack); } }); // When the visitor closes or reloads a page we need to unbind from RabbitMQ? conn.on('close', function () { try { // Close the socket conn.close(); // Close RabbitMQ conn.q.unsubscribe(ctag[conn.channel]); } catch (er) { console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack); } }); }); 

Per ottenere il comportamento desiderato, è sufficiente che ogni consumatore consumi dalla propria coda. Dovrai utilizzare un tipo di scambio non diretto (topic, header, fanout) per ottenere il messaggio a tutte le code contemporaneamente.

Come valuto il tuo caso è:

  • Ho una coda di messaggi (la tua fonte per ricevere messaggi, lascia che la chiami q111)

  • Ho più consumatori, che mi piacerebbe fare cose diverse con lo stesso messaggio.

Il tuo problema qui è che mentre 3 messaggi vengono ricevuti da questa coda, il messaggio 1 viene consumato da un consumatore A, altri consumatori B e C consumano i messaggi 2 e 3. Dove hai bisogno di una configurazione in cui rabbitmq passa le stesse copie di tutti e tre i messaggi (1,2,3) a tutti e tre i consumatori connessi (A, B, C) contemporaneamente.

Mentre molte configurazioni possono essere fatte per raggiungere questo objective, un modo semplice è utilizzare il seguente concetto in due fasi:

  • Utilizzare un rabbitmq dinamico-shovel per raccogliere i messaggi dalla coda desiderata (q111) e pubblicare su uno scambio fanout (scambio esclusivamente creato e dedicato per questo scopo).
  • Ora riconfigura i tuoi consumatori A, B & C (che stavano ascoltando la coda (q111)) per ascoltare da questo scambio Fanout usando direttamente una coda esclusiva e anonima per ciascun consumatore.

Nota: durante l’uso di questo concetto non consumare direttamente dalla coda di origine (q111), poiché i messaggi già consumati non verranno spalati allo scambio Fanout.

Se pensi che questo non soddisfi il tuo esatto requisito … sentiti libero di postare i tuoi suggerimenti 🙂

Se ti capita di utilizzare la libreria amqplib come sono, hanno un pratico esempio di implementazione del tutorial RabbitMQ di pubblicazione / sottoscrizione che potresti trovare utile.

Penso che dovresti controllare l’invio dei tuoi messaggi usando lo scambiatore fan-out . In questo modo riceverai lo stesso messaggio per i diversi consumatori, sotto la tabella RabbitMQ sta creando code diverse per ognuno di questi nuovi consumatori / abbonati.

Questo è il link per vedere l’esempio del tutorial in javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html