12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832 |
- /**
- * Module dependencies.
- */
- var uid2 = require('uid2');
- var redis = require('redis').createClient;
- var msgpack = require('msgpack-lite');
- var Adapter = require('socket.io-adapter');
- var debug = require('debug')('socket.io-redis');
- var async = require('async');
- /**
- * Module exports.
- */
- module.exports = adapter;
- /**
- * Request types, for messages between nodes.
- *
- * The numeric value travels as the `type` field of the JSON request
- * published on the request channel; `onrequest` and `onresponse` switch
- * on it to pick the matching handler.
- */
- var requestTypes = {
- clients: 0,
- clientRooms: 1,
- allRooms: 2,
- remoteJoin: 3,
- remoteLeave: 4,
- customRequest: 5,
- remoteDisconnect: 6
- };
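- /**
- * Returns a redis Adapter class.
- *
- * @param {String} uri optional redis uri; when the first argument is an object it is treated as `opts`
- * @param {Object} opts optional settings (`pubClient`, `subClient`, `key`, `subEvent`, `requestsTimeout`, `withChannelMultiplexing`), also handed to the redis client factory
- * @return {RedisAdapter} adapter
- * @api public
- */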
- function adapter(uri, opts) {
- opts = opts || {};
- // handle options only
- if ('object' == typeof uri) {
- opts = uri;
- uri = null;
- }
- // opts
- var pub = opts.pubClient;
- var sub = opts.subClient;
- var prefix = opts.key || 'socket.io';
- var subEvent = opts.subEvent || 'messageBuffer';
- var requestsTimeout = opts.requestsTimeout || 1000;
- var withChannelMultiplexing = false !== opts.withChannelMultiplexing;
- // init clients if needed
- function createClient() {
- if (uri) {
- // handle uri string
- return redis(uri, opts);
- } else {
- return redis(opts);
- }
- }
- if (!pub) pub = createClient();
- if (!sub) sub = createClient();
- // this server's key
- var uid = uid2(6);
- /**
- * Adapter constructor.
- *
- * @param {String} namespace name
- * @api public
- */
- function Redis(nsp){
- Adapter.call(this, nsp);
- this.uid = uid;
- this.prefix = prefix;
- this.requestsTimeout = requestsTimeout;
- this.withChannelMultiplexing = withChannelMultiplexing;
- this.channel = prefix + '#' + nsp.name + '#';
- this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
- this.responseChannel = prefix + '-response#' + this.nsp.name + '#';
- this.requests = {};
- this.customHook = function(data, cb){ cb(null); }
- if (String.prototype.startsWith) {
- this.channelMatches = function (messageChannel, subscribedChannel) {
- return messageChannel.startsWith(subscribedChannel);
- }
- } else { // Fallback to other impl for older Node.js
- this.channelMatches = function (messageChannel, subscribedChannel) {
- return messageChannel.substr(0, subscribedChannel.length) === subscribedChannel;
- }
- }
- this.pubClient = pub;
- this.subClient = sub;
- var self = this;
- sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){
- if (err) self.emit('error', err);
- });
- sub.on(subEvent, this.onmessage.bind(this));
- function onError(err) {
- self.emit('error', err);
- }
- pub.on('error', onError);
- sub.on('error', onError);
- }
- /**
- * Inherits from `Adapter`.
- *
- * Links the prototypes so that instances fall back to the `Adapter`
- * methods for anything not overridden below.
- */
- Redis.prototype.__proto__ = Adapter.prototype;
- /**
- * Called with a subscription message
- *
- * @api private
- */
- Redis.prototype.onmessage = function(channel, msg){
- channel = channel.toString();
- if (this.channelMatches(channel, this.requestChannel)) {
- return this.onrequest(channel, msg);
- } else if (this.channelMatches(channel, this.responseChannel)) {
- return this.onresponse(channel, msg);
- } else if (!this.channelMatches(channel, this.channel)) {
- return debug('ignore different channel');
- }
- var args = msgpack.decode(msg);
- var packet;
- if (uid == args.shift()) return debug('ignore same uid');
- packet = args[0];
- if (packet && packet.nsp === undefined) {
- packet.nsp = '/';
- }
- if (!packet || packet.nsp != this.nsp.name) {
- return debug('ignore different namespace');
- }
- args.push(true);
- this.broadcast.apply(this, args);
- };
- /**
- * Called on request from another node
- *
- * @api private
- */
- Redis.prototype.onrequest = function(channel, msg){
- var self = this;
- var request;
- try {
- request = JSON.parse(msg);
- } catch(err){
- self.emit('error', err);
- return;
- }
- debug('received request %j', request);
- switch (request.type) {
- case requestTypes.clients:
- Adapter.prototype.clients.call(self, request.rooms, function(err, clients){
- if(err){
- self.emit('error', err);
- return;
- }
- var response = JSON.stringify({
- requestid: request.requestid,
- clients: clients
- });
- pub.publish(self.responseChannel, response);
- });
- break;
- case requestTypes.clientRooms:
- Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){
- if(err){
- self.emit('error', err);
- return;
- }
- if (!rooms) { return; }
- var response = JSON.stringify({
- requestid: request.requestid,
- rooms: rooms
- });
- pub.publish(self.responseChannel, response);
- });
- break;
- case requestTypes.allRooms:
- var response = JSON.stringify({
- requestid: request.requestid,
- rooms: Object.keys(this.rooms)
- });
- pub.publish(self.responseChannel, response);
- break;
- case requestTypes.remoteJoin:
- var socket = this.nsp.connected[request.sid];
- if (!socket) { return; }
- socket.join(request.room, function(){
- var response = JSON.stringify({
- requestid: request.requestid
- });
- pub.publish(self.responseChannel, response);
- });
- break;
- case requestTypes.remoteLeave:
- var socket = this.nsp.connected[request.sid];
- if (!socket) { return; }
- socket.leave(request.room, function(){
- var response = JSON.stringify({
- requestid: request.requestid
- });
- pub.publish(self.responseChannel, response);
- });
- break;
- case requestTypes.remoteDisconnect:
- var socket = this.nsp.connected[request.sid];
- if (!socket) { return; }
- socket.disconnect(request.close);
- var response = JSON.stringify({
- requestid: request.requestid
- });
- pub.publish(self.responseChannel, response);
- break;
- case requestTypes.customRequest:
- this.customHook(request.data, function(data) {
- var response = JSON.stringify({
- requestid: request.requestid,
- data: data
- });
- pub.publish(self.responseChannel, response);
- });
- break;
- default:
- debug('ignoring unknown request type: %s', request.type);
- }
- };
- /**
- * Called on response from another node
- *
- * @api private
- */
- Redis.prototype.onresponse = function(channel, msg){
- var self = this;
- var response;
- try {
- response = JSON.parse(msg);
- } catch(err){
- self.emit('error', err);
- return;
- }
- if (!response.requestid || !self.requests[response.requestid]) {
- debug('ignoring unknown request');
- return;
- }
- debug('received response %j', response);
- var request = self.requests[response.requestid];
- switch (request.type) {
- case requestTypes.clients:
- request.msgCount++;
- // ignore if response does not contain 'clients' key
- if(!response.clients || !Array.isArray(response.clients)) return;
- for(var i = 0; i < response.clients.length; i++){
- request.clients[response.clients[i]] = true;
- }
- if (request.msgCount === request.numsub) {
- clearTimeout(request.timeout);
- if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.clients)));
- delete self.requests[request.requestid];
- }
- break;
- case requestTypes.clientRooms:
- clearTimeout(request.timeout);
- if (request.callback) process.nextTick(request.callback.bind(null, null, response.rooms));
- delete self.requests[request.requestid];
- break;
- case requestTypes.allRooms:
- request.msgCount++;
- // ignore if response does not contain 'rooms' key
- if(!response.rooms || !Array.isArray(response.rooms)) return;
- for(var i = 0; i < response.rooms.length; i++){
- request.rooms[response.rooms[i]] = true;
- }
- if (request.msgCount === request.numsub) {
- clearTimeout(request.timeout);
- if (request.callback) process.nextTick(request.callback.bind(null, null, Object.keys(request.rooms)));
- delete self.requests[request.requestid];
- }
- break;
- case requestTypes.remoteJoin:
- case requestTypes.remoteLeave:
- case requestTypes.remoteDisconnect:
- clearTimeout(request.timeout);
- if (request.callback) process.nextTick(request.callback.bind(null, null));
- delete self.requests[request.requestid];
- break;
- case requestTypes.customRequest:
- request.msgCount++;
- request.replies.push(response.data);
- if (request.msgCount === request.numsub) {
- clearTimeout(request.timeout);
- if (request.callback) process.nextTick(request.callback.bind(null, null, request.replies));
- delete self.requests[request.requestid];
- }
- break;
- default:
- debug('ignoring unknown request type: %s', request.type);
- }
- };
- /**
- * Broadcasts a packet.
- *
- * @param {Object} packet to emit
- * @param {Object} options
- * @param {Boolean} whether the packet came from another node
- * @api public
- */
- Redis.prototype.broadcast = function(packet, opts, remote){
- packet.nsp = this.nsp.name;
- if (!(remote || (opts && opts.flags && opts.flags.local))) {
- var msg = msgpack.encode([uid, packet, opts]);
- if (this.withChannelMultiplexing && opts.rooms && opts.rooms.length === 1) {
- pub.publish(this.channel + opts.rooms[0] + '#', msg);
- } else {
- pub.publish(this.channel, msg);
- }
- }
- Adapter.prototype.broadcast.call(this, packet, opts);
- };
- /**
- * Subscribe client to room messages.
- *
- * @param {String} client id
- * @param {String} room
- * @param {Function} callback (optional)
- * @api public
- */
- Redis.prototype.add = function(id, room, fn){
- debug('adding %s to %s ', id, room);
- var self = this;
- // subscribe only once per room
- var alreadyHasRoom = this.rooms.hasOwnProperty(room);
- Adapter.prototype.add.call(this, id, room);
- if (!this.withChannelMultiplexing || alreadyHasRoom) {
- if (fn) fn(null);
- return;
- }
- var channel = this.channel + room + '#';
- function onSubscribe(err) {
- if (err) {
- self.emit('error', err);
- if (fn) fn(err);
- return;
- }
- if (fn) fn(null);
- }
- sub.subscribe(channel, onSubscribe);
- };
- /**
- * Unsubscribe client from room messages.
- *
- * @param {String} session id
- * @param {String} room id
- * @param {Function} callback (optional)
- * @api public
- */
- Redis.prototype.del = function(id, room, fn){
- debug('removing %s from %s', id, room);
- var self = this;
- var hasRoom = this.rooms.hasOwnProperty(room);
- Adapter.prototype.del.call(this, id, room);
- if (this.withChannelMultiplexing && hasRoom && !this.rooms[room]) {
- var channel = this.channel + room + '#';
- function onUnsubscribe(err) {
- if (err) {
- self.emit('error', err);
- if (fn) fn(err);
- return;
- }
- if (fn) fn(null);
- }
- sub.unsubscribe(channel, onUnsubscribe);
- } else {
- if (fn) process.nextTick(fn.bind(null, null));
- }
- };
- /**
- * Unsubscribe client completely.
- *
- * @param {String} client id
- * @param {Function} callback (optional)
- * @api public
- */
- Redis.prototype.delAll = function(id, fn){
- debug('removing %s from all rooms', id);
- var self = this;
- var rooms = this.sids[id];
- if (!rooms) {
- if (fn) process.nextTick(fn.bind(null, null));
- return;
- }
- async.each(Object.keys(rooms), function(room, next){
- self.del(id, room, next);
- }, function(err){
- if (err) {
- self.emit('error', err);
- if (fn) fn(err);
- return;
- }
- delete self.sids[id];
- if (fn) fn(null);
- });
- };
- /**
- * Gets a list of clients by sid.
- *
- * @param {Array} explicit set of rooms to check.
- * @param {Function} callback
- * @api public
- */
- Redis.prototype.clients = function(rooms, fn){
- if ('function' == typeof rooms){
- fn = rooms;
- rooms = null;
- }
- rooms = rooms || [];
- var self = this;
- var requestid = uid2(6);
- pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
- if (err) {
- self.emit('error', err);
- if (fn) fn(err);
- return;
- }
- numsub = parseInt(numsub[1], 10);
- var request = JSON.stringify({
- requestid : requestid,
- type: requestTypes.clients,
- rooms : rooms
- });
- // if there is no response for x second, return result
- var timeout = setTimeout(function() {
- var request = self.requests[requestid];
- if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(request.clients)));
- delete self.requests[requestid];
- }, self.requestsTimeout);
- self.requests[requestid] = {
- type: requestTypes.clients,
- numsub: numsub,
- msgCount: 0,
- clients: {},
- callback: fn,
- timeout: timeout
- };
- pub.publish(self.requestChannel, request);
- });
- };
- /**
- * Gets the list of rooms a given client has joined.
- *
- * @param {String} client id
- * @param {Function} callback
- * @api public
- */
- Redis.prototype.clientRooms = function(id, fn){
- var self = this;
- var requestid = uid2(6);
- var rooms = this.sids[id];
- if (rooms) {
- if (fn) process.nextTick(fn.bind(null, null, Object.keys(rooms)));
- return;
- }
- var request = JSON.stringify({
- requestid : requestid,
- type: requestTypes.clientRooms,
- sid : id
- });
- // if there is no response for x second, return result
- var timeout = setTimeout(function() {
- if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response')));
- delete self.requests[requestid];
- }, self.requestsTimeout);
- self.requests[requestid] = {
- type: requestTypes.clientRooms,
- callback: fn,
- timeout: timeout
- };
- pub.publish(self.requestChannel, request);
- };
- /**
- * Gets the list of all rooms (accross every node)
- *
- * @param {Function} callback
- * @api public
- */
- Redis.prototype.allRooms = function(fn){
- var self = this;
- var requestid = uid2(6);
- pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
- if (err) {
- self.emit('error', err);
- if (fn) fn(err);
- return;
- }
- numsub = parseInt(numsub[1], 10);
- var request = JSON.stringify({
- requestid : requestid,
- type: requestTypes.allRooms
- });
- // if there is no response for x second, return result
- var timeout = setTimeout(function() {
- var request = self.requests[requestid];
- if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for allRooms response'), Object.keys(request.rooms)));
- delete self.requests[requestid];
- }, self.requestsTimeout);
- self.requests[requestid] = {
- type: requestTypes.allRooms,
- numsub: numsub,
- msgCount: 0,
- rooms: {},
- callback: fn,
- timeout: timeout
- };
- pub.publish(self.requestChannel, request);
- });
- };
- /**
- * Makes the socket with the given id join the room
- *
- * @param {String} socket id
- * @param {String} room name
- * @param {Function} callback
- * @api public
- */
- Redis.prototype.remoteJoin = function(id, room, fn){
- var self = this;
- var requestid = uid2(6);
- var socket = this.nsp.connected[id];
- if (socket) {
- socket.join(room, fn);
- return;
- }
- var request = JSON.stringify({
- requestid : requestid,
- type: requestTypes.remoteJoin,
- sid: id,
- room: room
- });
- // if there is no response for x second, return result
- var timeout = setTimeout(function() {
- if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteJoin response')));
- delete self.requests[requestid];
- }, self.requestsTimeout);
- self.requests[requestid] = {
- type: requestTypes.remoteJoin,
- callback: fn,
- timeout: timeout
- };
- pub.publish(self.requestChannel, request);
- };
- /**
- * Makes the socket with the given id leave the room
- *
- * @param {String} socket id
- * @param {String} room name
- * @param {Function} callback
- * @api public
- */
- Redis.prototype.remoteLeave = function(id, room, fn){
- var self = this;
- var requestid = uid2(6);
- var socket = this.nsp.connected[id];
- if (socket) {
- socket.leave(room, fn);
- return;
- }
- var request = JSON.stringify({
- requestid : requestid,
- type: requestTypes.remoteLeave,
- sid: id,
- room: room
- });
- // if there is no response for x second, return result
- var timeout = setTimeout(function() {
- if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteLeave response')));
- delete self.requests[requestid];
- }, self.requestsTimeout);
- self.requests[requestid] = {
- type: requestTypes.remoteLeave,
- callback: fn,
- timeout: timeout
- };
- pub.publish(self.requestChannel, request);
- };
- /**
- * Makes the socket with the given id to be disconnected forcefully
- * @param {String} socket id
- * @param {Boolean} close if `true`, closes the underlying connection
- * @param {Function} callback
- */
- Redis.prototype.remoteDisconnect = function(id, close, fn) {
- var self = this;
- var requestid = uid2(6);
- var socket = this.nsp.connected[id];
- if(socket) {
- socket.disconnect(close);
- if (fn) process.nextTick(fn.bind(null, null));
- return;
- }
- var request = JSON.stringify({
- requestid : requestid,
- type: requestTypes.remoteDisconnect,
- sid: id,
- close: close
- });
- // if there is no response for x second, return result
- var timeout = setTimeout(function() {
- if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteDisconnect response')));
- delete self.requests[requestid];
- }, self.requestsTimeout);
- self.requests[requestid] = {
- type: requestTypes.remoteDisconnect,
- callback: fn,
- timeout: timeout
- };
- pub.publish(self.requestChannel, request);
- };
- /**
- * Sends a new custom request to other nodes
- *
- * @param {Object} data (no binary)
- * @param {Function} callback
- * @api public
- */
- Redis.prototype.customRequest = function(data, fn){
- if (typeof data === 'function'){
- fn = data;
- data = null;
- }
- var self = this;
- var requestid = uid2(6);
- pub.send_command('pubsub', ['numsub', self.requestChannel], function(err, numsub){
- if (err) {
- self.emit('error', err);
- if (fn) fn(err);
- return;
- }
- numsub = parseInt(numsub[1], 10);
- var request = JSON.stringify({
- requestid : requestid,
- type: requestTypes.customRequest,
- data: data
- });
- // if there is no response for x second, return result
- var timeout = setTimeout(function() {
- var request = self.requests[requestid];
- if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for customRequest response'), request.replies));
- delete self.requests[requestid];
- }, self.requestsTimeout);
- self.requests[requestid] = {
- type: requestTypes.customRequest,
- numsub: numsub,
- msgCount: 0,
- replies: [],
- callback: fn,
- timeout: timeout
- };
- pub.publish(self.requestChannel, request);
- });
- };
- // expose this factory call's uid, redis clients and settings as static
- // properties of the constructor, then hand the constructor back
- Redis.uid = uid;
- Redis.pubClient = pub;
- Redis.subClient = sub;
- Redis.prefix = prefix;
- Redis.requestsTimeout = requestsTimeout;
- return Redis;
- }
|