avatar
@bangbang93

carrotmq中文文档

3/29/2016, 9:54:41 PM

CarrotMQ是一个给NodeJS使用的RabbitMQ客户端,基于官方的amqplib,能够简化amqplib的复杂的操作

Version npm
NPM Downloads
Dependencies
NPM

使用姿势

var carrotmq = require('carrotmq');
//var rabbitmqSchema = require('rabbitmq-schema'); 
var rabbitmqSchema = carrotmq.schema;
 
//see https://www.npmjs.com/package/rabbitmq-schema 
var schema = new rabbitmqSchema({
    exchange: 'exchange0',
    type: 'topic',
    bindings: [{
      routingPattern: 'foo.bar.#',
      destination: {
        queue: 'fooQueue',
        messageSchema: {}
      }
    }]
})
var mq = new carrotmq('amqp://localhost', schema);
 
mq.queue('fooQueue', function (data){
    console.log(data);
    this.ack();
    //this.nack(); 
    //this.reject(); 
    //this.cancel(); cancel this consumer; 
    this.reply({date: new Date}); //reply to message.properties.relyTo 
    this.carrotmq //carrotmq instrance 
});
 
mq.sendToQueue('queue', {msg: 'message'});
mq.publish('exchange', 'foo.bar.key', {msg: 'hello world!'});

RPC

mq.rpc('queue', {data: new Date}, function(data){  //same as queue consumer 
  this.ack();
  return data;
}).then((data)=>{
  //above return value 
});

RPC Over Exchange

//{ 
//    routingPattern: 'rpc.#', 
//   destination: { 
//      queue: 'rpcQueue', 
//      messageSchema: {} 
//    } 
//  } 
 
app.queue('rpcQueue', function (data) {
  this.reply(data);
  this.ack();
}, true);   /* 第三个参数true标记当前队列为RPCExchange队列。
由于rabbitMQ会忽略掉发布到Exchange中携带了replyTo设置的消息,carrotmq 将会在真正的消息外面包裹一层{replyTo: 'queue', content: {buffer}}用于记录回复队列。
如果服务端使用的不是carrotmq,则需要手工处理{replyTo: 'queue', content: {buffer}}格式的JSON*/
 
let time = new Date();
app.rpcExchange('exchange0', 'rpc.rpc', {time}, function (data){
//data: {time: time} 
this.ack();
return data;
}).then(function (data){
//data: {time: time} 
})

events

ready

当carrotmq连接到rabbitmq并且声明好所需的exchange和队列后触发
mq.on('ready', function(){});

error

我们遇到了一些问题
mq.on('error', function (err){});

message

当每条消息到达时都会触发该事件,建议不要在该事件内做操作,用于处理一些日志还是比较不错的
mq.on('message', function (data){
data.channel; //channel object
data.queue //queue name
data.message //message object
})

API

carritmq(uri, schema)

uri是rabbitmq连接uri,会直接传递给amqplib
schema是rabbitmqSchema的实例,详情参考https://www.npmjs.com/package/rabbitmq-schema


carrotmq.schema

rabbitmqSchema的构造函数,可以直接var schema = new carritmq.schema({/**/});
可以减少一个依赖


carrotmq.prototype.queue(queue, consumer, rpcQueue, opts)

carrotmq.prototype.queue(queue, consumer, opts)

carrotmq.prototype.queue(queue, consumer)

queue string 队列名

consumer function 订阅消息处理函数

rpcQueue boolean 是否为RPCExchange队列

opts object 若需要声明一个不属于exchange的队列,用这个opts传入队列参数,参数参照amqplib的assertQueue方法


carrotmq.prototype.sendToQueue(queue, content, options = {})

amqplib的sendToQueue方法的简单包装,content可传入Object,string,buffer,都会被自动转换为buffer


carrotmq.prototype.publish(exchange, routingKey, content, options = {})

amqplib的publish方法的简单包装,content同上


carrotmq.prototype.rpcExchange(exchange, routingKey, content, options, consumer)

carrotmq.prototype.rpcExchange(exchange, routingKey, content, consumer)

通过exchange发送rpc请求,若exchange将请求发给多个队列,可能会收到多个响应

options参数用于publish方法


carrotmq.prototype.rpc(queue, content, options, consumer)

carrotmq.prototype.rpc(queue, content, consumer)

options方法好像暂时还没用途,

以上两种rpc都是使用临时队列


carrotmq.prototype.createChannel()

返回一个amqplib的channel对象,用于扩展carrotmq所没有的功能

consumer函数说明

consumer函数拥有一个参数,data,内容是消息的本体,将会被JSON.parse()

consumer函数的this对象拥有以下属性

carrotmq carrotmq实例
channel 当前consumer使用的channel
reply() rpc请求的回复方法,this.reply(content),content的类型同上
ack() ack当前消息
nack() nack当前消息
reject() reject当前消息
cancel() cancel当前消息

rpc和RPCExchange

这两种方法返回的都是一个Bluebird的Promise对象,但是由于无法传递this对象,所以需要在consumer函数里完成ack()