1. 目标
本次使用NodeJs创建两个应用,一个应用负责发送消息,一个应用负责接收消息。 案例来源于官方文档 本例使用 NodeJS v12.17.0(x64)
2. 创建NodeJS项目
cd g:\
mkdir node_rabbitmq
cd node_rabbitmq
npm init -y
npm i amqplib -S
NodeJS 使用 amqplib库 完成与RabbitMQ服务之间的通信
3. 发送方(生产者)
在根目录下创建 send.js
- 引入 amqplib/callback_api 库
- 准备连接信息,包含 RabbitMQ服务器IP,端口,用户名,密码,virtual host
- 建立连接
- 连接成功后创建 channel
- channel 创建成功后,使用channel声明 队列,然后发送数据。
- 发送的数据是存放在NodeJS的 Buffer对象中的。
const amqp = require('amqplib/callback_api')
const serverConfig = {
protocol: 'amqp',
hostname: '192.168.1.201',
port: 5672,
username: 'rabbitmq',
password: 'rabbitmq',
vhost: '/electron'
}
// 与服务器连接成功后,执行回调
amqp.connect(serverConfig, (e, connection) => {
if (e) {
console.log('create connection error:', e)
throw e
}
// 创建channel,创建成功后,执行回调
connection.createChannel((e, channel) => {
if (e) {
console.log('create channel error:', e)
throw error1
}
// 队列名称
var queue = 'mq_hello'
// 要发送的消息
var msg = 'Hello World!'
// 声明队列
channel.assertQueue(queue, {
durable: false // 队列不进行持久化
})
// 发送消息
channel.sendToQueue(queue, Buffer.from(msg))
console.log(' [x] Sent %s,按 ctrl+c 退出程序', msg)
})
})
process.openStdin()
4. 接收方(消费者)
在根目录下创建 receive.js
-
引入 amqplib/callback_api 库
-
准备连接信息,包含 RabbitMQ服务器IP,端口,用户名,密码,virtual host
-
建立连接
-
连接成功后创建 channel
-
channel 创建成功后,使用channel声明 队列
-
调用channel 的consume 方法开始消费队列,这个方法会阻塞整个应用,因为它要持续监听服务器上的队列。
const amqp = require('amqplib/callback_api')
const serverConfig = {
protocol: 'amqp',
hostname: '192.168.1.201',
port: 5672,
username: 'rabbitmq',
password: 'rabbitmq',
vhost: '/electron'
}
// 与服务器连接成功后,执行回调
amqp.connect(serverConfig, (e, connection) => {
if (e) {
console.log('create connection error:', e)
throw e
}
// 创建channel,创建成功后,执行回调
connection.createChannel((e, channel) => {
if (e) {
console.log('create channel error:', e)
throw error1
}
// 队列名称
var queue = 'mq_hello'
// 要发送的消息
var msg = 'Hello World!'
// 声明队列
channel.assertQueue(queue, {
durable: false // 队列不进行持久化
})
console.log(' [*] Waiting for messages in %s. To exit press CTRL+C', queue)
// 开始消费队列,持续监听队列
channel.consume(
queue,
function (msg) {
console.log(' [x] Received %s', msg.content.toString())
},
{
noAck: true
}
)
})
})
5. 运行测试
# 打开一个控制台, 运行 receive.js 接收信息
cd g:\node_rabbitmq
node receive.js
# 打开另外一个控制台,运行 send.js 发送数据
cd g:\node_rabbitmq
node send.js
# 以下是输出
[x] Sent Hello World!,按 ctrl+c 退出程序
同时接收方控制台将会收到消息:
[x] Received Hello World!
6. 使用基于promise 的API
上面接收方和发送方所使用的API都是基于 callback方式的,amqplib 库也提供了基于 promise 的调用方式
- const amqp = require(‘amqplib/callback_api’) 这是基于 callback的api调用方式
- const amqp = require(‘amqplib’) 这是基于promise的api 调用方式
创建连接,创建channel,声明 queue 在消息发送和消息接收上都有相同的代码,这里将它们提取出来放在一个公共的文件中:
rabbitmq.js
// 采用 promise 形式的API
const amqp = require('amqplib')
const serverConfig = {
protocol: 'amqp',
hostname: '192.168.1.201',
port: 5672,
username: 'qinye',
password: 'qinye',
vhost: '/electron'
}
let queueName = 'mq_hello'
let connPromise = amqp.connect(serverConfig)
let channel = new Promise((resolve, reject) => {
connPromise.then(
conn => {
conn.createChannel().then(
ch => {
ch.assertQueue(queueName, {
durable: false // 队列不进行持久化
})
resolve(ch)
},
err => {
reject(err)
}
)
},
err => {
reject(err)
}
)
})
// 导出channel 和队列名称
module.exports = {
queueName,
channel
}
send.js
const { channel, queueName } = require('./rabbitmq')
channel.then(ch => {
var msg = 'Hello World!'
ch.sendToQueue(queueName, Buffer.from(msg))
console.log(' [x] Sent %s,按 ctrl+c 退出程序', msg)
})
process.openStdin()
receive.js
const { channel, queueName } = require('./rabbitmq')
channel.then(ch => {
ch.consume(
queueName,
function (msg) {
console.log(' [x] Received %s', msg.content.toString())
},
{
noAck: true
}
)
})