// index.js
  1. /**
  2. * Module dependencies.
  3. */
  4. var uid2 = require('uid2');
  5. var redis = require('redis').createClient;
  6. var msgpack = require('msgpack-lite');
  7. var Adapter = require('socket.io-adapter');
  8. var debug = require('debug')('socket.io-redis');
  9. var async = require('async');
  10. /**
  11. * Module exports.
  12. */
  13. module.exports = adapter;
  /**
   * Request types, for messages between nodes.
   *
   * The numeric code is what gets written into the `type` field of a
   * request, so codes that are already in use should not be renumbered.
   */
  var requestTypes = {};
  requestTypes.clients = 0;
  requestTypes.clientRooms = 1;
  requestTypes.allRooms = 2;
  requestTypes.remoteJoin = 3;
  requestTypes.remoteLeave = 4;
  requestTypes.customRequest = 5;
  requestTypes.remoteDisconnect = 6;
  26. /**
  27. * Returns a redis Adapter class.
  28. *
  29. * @param {String} optional, redis uri
  30. * @return {RedisAdapter} adapter
  31. * @api public
  32. */
  33. function adapter(uri, opts) {
  // `opts` is optional in every call form
  opts = opts || {};

  // Options-only call form: adapter({ ... }).
  // `typeof null` is 'object' as well, so the uri must also be truthy here;
  // otherwise adapter(null, opts) would replace opts with null and the
  // option reads below would throw.
  if (uri && 'object' === typeof uri) {
    opts = uri;
    uri = null;
  }

  // opts
  var pub = opts.pubClient;
  var sub = opts.subClient;
  var prefix = opts.key || 'socket.io';
  var subEvent = opts.subEvent || 'messageBuffer';
  var requestsTimeout = opts.requestsTimeout || 1000;
  // channel multiplexing stays enabled unless it is explicitly set to `false`
  var withChannelMultiplexing = false !== opts.withChannelMultiplexing;

  /**
   * Creates a redis client, from the uri when one was given and from the
   * options alone otherwise.
   *
   * @return {Object} redis client
   * @api private
   */
  function createClient() {
    if (uri) {
      // handle uri string
      return redis(uri, opts);
    }
    return redis(opts);
  }

  // init clients if needed
  if (!pub) pub = createClient();
  if (!sub) sub = createClient();

  // this server's key
  var uid = uid2(6);
  /**
   * Adapter constructor.
   *
   * Derives the channel names for the namespace, subscribes the `sub` client
   * to them and re-emits redis client errors as `error` events on the
   * adapter instance.
   *
   * @param {Object} nsp namespace; its `name` becomes part of the channel names
   * @api public
   */

  function Redis(nsp){
    Adapter.call(this, nsp);

    var self = this;

    // values captured from the enclosing adapter() call
    this.uid = uid;
    this.prefix = prefix;
    this.requestsTimeout = requestsTimeout;
    this.withChannelMultiplexing = withChannelMultiplexing;

    // channel names: <prefix>#<nsp>#, <prefix>-request#<nsp>#, <prefix>-response#<nsp>#
    this.channel = prefix + '#' + nsp.name + '#';
    this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
    this.responseChannel = prefix + '-response#' + this.nsp.name + '#';

    // requests awaiting a response, keyed by request id
    this.requests = {};

    // default hook for custom requests: answers with `null`
    this.customHook = function(data, cb){
      cb(null);
    };

    // prefix test used to route incoming messages; the second form is the
    // fallback for runtimes without String.prototype.startsWith
    this.channelMatches = String.prototype.startsWith
      ? function (messageChannel, subscribedChannel) {
          return messageChannel.startsWith(subscribedChannel);
        }
      : function (messageChannel, subscribedChannel) {
          return messageChannel.substr(0, subscribedChannel.length) === subscribedChannel;
        };

    this.pubClient = pub;
    this.subClient = sub;

    function emitError(err) {
      self.emit('error', err);
    }

    var channels = [this.channel, this.requestChannel, this.responseChannel];
    sub.subscribe(channels, function(err){
      if (err) emitError(err);
    });

    sub.on(subEvent, this.onmessage.bind(this));

    pub.on('error', emitError);
    sub.on('error', emitError);
  }

  /**
   * Inherits from `Adapter`.
   */

  Redis.prototype.__proto__ = Adapter.prototype;
  /**
   * Called with a subscription message.
   *
   * Messages on the request and response channels are handed to `onrequest`
   * and `onresponse`. Anything else must start with this adapter's broadcast
   * channel; it is decoded and, unless it was sent by this node or targets
   * another namespace, broadcast locally.
   *
   * @param {Buffer|String} channel the message arrived on
   * @param {Buffer} msg raw message payload
   * @api private
   */

  Redis.prototype.onmessage = function(channel, msg){
    channel = channel.toString();

    if (this.channelMatches(channel, this.requestChannel)) {
      return this.onrequest(channel, msg);
    }

    if (this.channelMatches(channel, this.responseChannel)) {
      return this.onresponse(channel, msg);
    }

    if (!this.channelMatches(channel, this.channel)) {
      return debug('ignore different channel');
    }

    // broadcast payload layout: [sender uid, packet, opts]
    var decoded = msgpack.decode(msg);
    var senderUid = decoded.shift();

    if (uid == senderUid) return debug('ignore same uid');

    var packet = decoded[0];

    // a packet without a namespace is treated as belonging to '/'
    if (packet && packet.nsp === undefined) packet.nsp = '/';

    if (!packet || packet.nsp != this.nsp.name) {
      return debug('ignore different namespace');
    }

    // the trailing `true` is the `remote` argument of broadcast(), which
    // stops the packet from being published again
    decoded.push(true);
    this.broadcast.apply(this, decoded);
  };
  /**
   * Called on request from another node.
   *
   * The payload is parsed as JSON and dispatched on its `type`. Where an
   * answer is produced, it is published on the response channel and carries
   * the `requestid` of the request.
   *
   * @param {String} channel the request arrived on
   * @param {Buffer|String} msg JSON encoded request
   * @api private
   */

  Redis.prototype.onrequest = function(channel, msg){
    var self = this;
    var request;
    var socket;

    try {
      request = JSON.parse(msg);
    } catch(err){
      self.emit('error', err);
      return;
    }

    debug('received request %j', request);

    // serializes `body` and publishes it on the response channel
    function respond(body) {
      pub.publish(self.responseChannel, JSON.stringify(body));
    }

    switch (request.type) {

      case requestTypes.clients:
        Adapter.prototype.clients.call(self, request.rooms, function(err, clients){
          if (err) {
            self.emit('error', err);
            return;
          }
          respond({
            requestid: request.requestid,
            clients: clients
          });
        });
        break;

      case requestTypes.clientRooms:
        Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){
          if (err) {
            self.emit('error', err);
            return;
          }
          // no rooms for this sid here: stay silent
          if (!rooms) { return; }
          respond({
            requestid: request.requestid,
            rooms: rooms
          });
        });
        break;

      case requestTypes.allRooms:
        respond({
          requestid: request.requestid,
          rooms: Object.keys(this.rooms)
        });
        break;

      case requestTypes.remoteJoin:
      case requestTypes.remoteLeave:
        socket = this.nsp.connected[request.sid];
        // only the node the socket is connected to answers
        if (!socket) { return; }
        var action = request.type === requestTypes.remoteJoin ? 'join' : 'leave';
        socket[action](request.room, function(){
          respond({
            requestid: request.requestid
          });
        });
        break;

      case requestTypes.remoteDisconnect:
        socket = this.nsp.connected[request.sid];
        if (!socket) { return; }
        socket.disconnect(request.close);
        respond({
          requestid: request.requestid
        });
        break;

      case requestTypes.customRequest:
        this.customHook(request.data, function(data) {
          respond({
            requestid: request.requestid,
            data: data
          });
        });
        break;

      default:
        debug('ignoring unknown request type: %s', request.type);
    }
  };
  /**
   * Called on response from another node.
   *
   * Looks up the pending request named by the response's `requestid` and
   * folds the response into it. Once a request is complete, its timeout is
   * cleared, its entry is removed from `this.requests` and its callback is
   * invoked on the next tick with `null` as the error argument.
   *
   * Requests of type clients, allRooms and customRequest are complete when
   * `numsub` responses have been counted; the other types are complete on
   * the first response.
   *
   * @param {String} channel the response arrived on
   * @param {Buffer|String} msg JSON encoded response
   * @api private
   */

  Redis.prototype.onresponse = function(channel, msg){
    var self = this;
    var response;
    var i;

    try {
      response = JSON.parse(msg);
    } catch(err){
      self.emit('error', err);
      return;
    }

    // a payload such as `null` parses fine but has no properties to read
    var requestid = response && response.requestid;

    if (!requestid || !self.requests[requestid]) {
      debug('ignoring unknown request');
      return;
    }

    debug('received response %j', response);

    var request = self.requests[requestid];

    // Finishes the pending request. The entry is removed using the id taken
    // from the response: the stored record has no `requestid` field, so
    // deleting by `request.requestid` would remove nothing and every answered
    // request would stay in `self.requests` forever.
    function settle(args) {
      clearTimeout(request.timeout);
      delete self.requests[requestid];
      if (request.callback) {
        process.nextTick(function(){
          request.callback.apply(null, args);
        });
      }
    }

    switch (request.type) {

      case requestTypes.clients:
        request.msgCount++;

        // ignore if response does not contain 'clients' key
        if (!response.clients || !Array.isArray(response.clients)) return;

        // object keys are used to de-duplicate ids
        for (i = 0; i < response.clients.length; i++){
          request.clients[response.clients[i]] = true;
        }

        if (request.msgCount === request.numsub) {
          settle([null, Object.keys(request.clients)]);
        }
        break;

      case requestTypes.clientRooms:
        settle([null, response.rooms]);
        break;

      case requestTypes.allRooms:
        request.msgCount++;

        // ignore if response does not contain 'rooms' key
        if (!response.rooms || !Array.isArray(response.rooms)) return;

        for (i = 0; i < response.rooms.length; i++){
          request.rooms[response.rooms[i]] = true;
        }

        if (request.msgCount === request.numsub) {
          settle([null, Object.keys(request.rooms)]);
        }
        break;

      case requestTypes.remoteJoin:
      case requestTypes.remoteLeave:
      case requestTypes.remoteDisconnect:
        settle([null]);
        break;

      case requestTypes.customRequest:
        request.msgCount++;
        request.replies.push(response.data);

        if (request.msgCount === request.numsub) {
          settle([null, request.replies]);
        }
        break;

      default:
        debug('ignoring unknown request type: %s', request.type);
    }
  };
  /**
   * Broadcasts a packet.
   *
   * The packet is published to redis first, unless it came from another node
   * or carries the `local` flag, and is then broadcast through the base
   * adapter.
   *
   * @param {Object} packet to emit
   * @param {Object} options
   * @param {Boolean} whether the packet came from another node
   * @api public
   */

  Redis.prototype.broadcast = function(packet, opts, remote){
    packet.nsp = this.nsp.name;

    var shouldPublish = !remote && !(opts && opts.flags && opts.flags.local);

    if (shouldPublish) {
      var encoded = msgpack.encode([uid, packet, opts]);

      // with multiplexing, a single-room broadcast goes to that room's own channel
      var singleRoom = this.withChannelMultiplexing && opts.rooms && opts.rooms.length === 1;
      var target = singleRoom
        ? this.channel + opts.rooms[0] + '#'
        : this.channel;

      pub.publish(target, encoded);
    }

    Adapter.prototype.broadcast.call(this, packet, opts);
  };
  /**
   * Subscribe client to room messages.
   *
   * The client is added through the base adapter. With channel multiplexing
   * enabled, the room's own channel is subscribed the first time the room
   * appears on this node.
   *
   * @param {String} client id
   * @param {String} room
   * @param {Function} callback (optional)
   * @api public
   */

  Redis.prototype.add = function(id, room, fn){
    debug('adding %s to %s ', id, room);

    var self = this;

    // must be read before the base adapter registers the room
    var roomKnown = this.rooms.hasOwnProperty(room);

    Adapter.prototype.add.call(this, id, room);

    // subscribe only once per room
    var needsSubscription = this.withChannelMultiplexing && !roomKnown;

    if (!needsSubscription) {
      if (fn) fn(null);
      return;
    }

    sub.subscribe(this.channel + room + '#', function(err){
      if (err) self.emit('error', err);
      if (fn) fn(err || null);
    });
  };
  /**
   * Unsubscribe client from room messages.
   *
   * The client is removed through the base adapter. With channel
   * multiplexing enabled, the room's own channel is unsubscribed once the
   * room no longer exists on this node.
   *
   * @param {String} session id
   * @param {String} room id
   * @param {Function} callback (optional)
   * @api public
   */

  Redis.prototype.del = function(id, room, fn){
    debug('removing %s from %s', id, room);

    var self = this;
    var hadRoom = this.rooms.hasOwnProperty(room);

    Adapter.prototype.del.call(this, id, room);

    // the room existed before the removal and is gone afterwards
    var roomVanished = this.withChannelMultiplexing && hadRoom && !this.rooms[room];

    if (!roomVanished) {
      if (fn) process.nextTick(fn.bind(null, null));
      return;
    }

    sub.unsubscribe(this.channel + room + '#', function(err){
      if (err) self.emit('error', err);
      if (fn) fn(err || null);
    });
  };
  /**
   * Unsubscribe client completely.
   *
   * Calls `del` for every room recorded for the client and drops the
   * client's entry in `sids` once all of them have completed without error.
   *
   * @param {String} client id
   * @param {Function} callback (optional)
   * @api public
   */

  Redis.prototype.delAll = function(id, fn){
    debug('removing %s from all rooms', id);

    var self = this;
    var joined = this.sids[id];

    // nothing recorded for this client
    if (!joined) {
      if (fn) process.nextTick(fn.bind(null, null));
      return;
    }

    function leaveRoom(room, next) {
      self.del(id, room, next);
    }

    function onAllLeft(err) {
      if (err) {
        self.emit('error', err);
        if (fn) fn(err);
        return;
      }
      delete self.sids[id];
      if (fn) fn(null);
    }

    async.each(Object.keys(joined), leaveRoom, onAllLeft);
  };
  /**
   * Gets a list of clients by sid.
   *
   * Publishes a `clients` request and registers it as pending. The callback
   * receives an error together with whatever was collected so far when the
   * request is still pending after `requestsTimeout` ms.
   *
   * @param {Array} explicit set of rooms to check.
   * @param {Function} callback
   * @api public
   */

  Redis.prototype.clients = function(rooms, fn){
    // rooms may be left out: clients(fn)
    if (typeof rooms === 'function'){
      fn = rooms;
      rooms = null;
    }

    rooms = rooms || [];

    var self = this;
    var requestid = uid2(6);

    // if there is no response for x second, return result
    function onTimeout() {
      var pending = self.requests[requestid];
      if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for clients response'), Object.keys(pending.clients)));
      delete self.requests[requestid];
    }

    function onNumsub(err, numsub){
      if (err) {
        self.emit('error', err);
        if (fn) fn(err);
        return;
      }

      // the second element of the reply is the number of responses to wait for
      var expected = parseInt(numsub[1], 10);

      var payload = JSON.stringify({
        requestid : requestid,
        type: requestTypes.clients,
        rooms : rooms
      });

      self.requests[requestid] = {
        type: requestTypes.clients,
        numsub: expected,
        msgCount: 0,
        clients: {},
        callback: fn,
        timeout: setTimeout(onTimeout, self.requestsTimeout)
      };

      pub.publish(self.requestChannel, payload);
    }

    pub.send_command('pubsub', ['numsub', self.requestChannel], onNumsub);
  };
  /**
   * Gets the list of rooms a given client has joined.
   *
   * A client recorded in this node's `sids` is answered from local state on
   * the next tick. Otherwise a `clientRooms` request is published, and the
   * callback receives an error if it is still pending after
   * `requestsTimeout` ms.
   *
   * @param {String} client id
   * @param {Function} callback
   * @api public
   */

  Redis.prototype.clientRooms = function(id, fn){
    var self = this;
    var requestid = uid2(6);

    var localRooms = this.sids[id];

    if (localRooms) {
      if (fn) process.nextTick(fn.bind(null, null, Object.keys(localRooms)));
      return;
    }

    var payload = JSON.stringify({
      requestid : requestid,
      type: requestTypes.clientRooms,
      sid : id
    });

    // if there is no response for x second, return result
    function onTimeout() {
      if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for rooms response')));
      delete self.requests[requestid];
    }

    self.requests[requestid] = {
      type: requestTypes.clientRooms,
      callback: fn,
      timeout: setTimeout(onTimeout, self.requestsTimeout)
    };

    pub.publish(self.requestChannel, payload);
  };
  /**
   * Gets the list of all rooms (across every node).
   *
   * Publishes an `allRooms` request and registers it as pending. The
   * callback receives an error together with the rooms collected so far
   * when the request is still pending after `requestsTimeout` ms.
   *
   * @param {Function} callback
   * @api public
   */

  Redis.prototype.allRooms = function(fn){
    var self = this;
    var requestid = uid2(6);

    // if there is no response for x second, return result
    function onTimeout() {
      var pending = self.requests[requestid];
      if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for allRooms response'), Object.keys(pending.rooms)));
      delete self.requests[requestid];
    }

    function onNumsub(err, numsub){
      if (err) {
        self.emit('error', err);
        if (fn) fn(err);
        return;
      }

      // the second element of the reply is the number of responses to wait for
      var expected = parseInt(numsub[1], 10);

      var payload = JSON.stringify({
        requestid : requestid,
        type: requestTypes.allRooms
      });

      self.requests[requestid] = {
        type: requestTypes.allRooms,
        numsub: expected,
        msgCount: 0,
        rooms: {},
        callback: fn,
        timeout: setTimeout(onTimeout, self.requestsTimeout)
      };

      pub.publish(self.requestChannel, payload);
    }

    pub.send_command('pubsub', ['numsub', self.requestChannel], onNumsub);
  };
  /**
   * Makes the socket with the given id join the room.
   *
   * A socket connected to this node joins directly. Otherwise a `remoteJoin`
   * request is published, and the callback receives an error if it is still
   * pending after `requestsTimeout` ms.
   *
   * @param {String} socket id
   * @param {String} room name
   * @param {Function} callback
   * @api public
   */

  Redis.prototype.remoteJoin = function(id, room, fn){
    var self = this;
    var requestid = uid2(6);

    var localSocket = this.nsp.connected[id];

    if (localSocket) {
      localSocket.join(room, fn);
      return;
    }

    var payload = JSON.stringify({
      requestid : requestid,
      type: requestTypes.remoteJoin,
      sid: id,
      room: room
    });

    // if there is no response for x second, return result
    function onTimeout() {
      if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteJoin response')));
      delete self.requests[requestid];
    }

    self.requests[requestid] = {
      type: requestTypes.remoteJoin,
      callback: fn,
      timeout: setTimeout(onTimeout, self.requestsTimeout)
    };

    pub.publish(self.requestChannel, payload);
  };
  /**
   * Makes the socket with the given id leave the room.
   *
   * A socket connected to this node leaves directly. Otherwise a
   * `remoteLeave` request is published, and the callback receives an error
   * if it is still pending after `requestsTimeout` ms.
   *
   * @param {String} socket id
   * @param {String} room name
   * @param {Function} callback
   * @api public
   */

  Redis.prototype.remoteLeave = function(id, room, fn){
    var self = this;
    var requestid = uid2(6);

    var localSocket = this.nsp.connected[id];

    if (localSocket) {
      localSocket.leave(room, fn);
      return;
    }

    var payload = JSON.stringify({
      requestid : requestid,
      type: requestTypes.remoteLeave,
      sid: id,
      room: room
    });

    // if there is no response for x second, return result
    function onTimeout() {
      if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteLeave response')));
      delete self.requests[requestid];
    }

    self.requests[requestid] = {
      type: requestTypes.remoteLeave,
      callback: fn,
      timeout: setTimeout(onTimeout, self.requestsTimeout)
    };

    pub.publish(self.requestChannel, payload);
  };
  /**
   * Makes the socket with the given id to be disconnected forcefully.
   *
   * A socket connected to this node is disconnected directly and the
   * callback runs on the next tick. Otherwise a `remoteDisconnect` request
   * is published, and the callback receives an error if it is still pending
   * after `requestsTimeout` ms.
   *
   * @param {String} socket id
   * @param {Boolean} close if `true`, closes the underlying connection
   * @param {Function} callback
   */

  Redis.prototype.remoteDisconnect = function(id, close, fn) {
    var self = this;
    var requestid = uid2(6);

    var localSocket = this.nsp.connected[id];

    if (localSocket) {
      localSocket.disconnect(close);
      if (fn) process.nextTick(fn.bind(null, null));
      return;
    }

    var payload = JSON.stringify({
      requestid : requestid,
      type: requestTypes.remoteDisconnect,
      sid: id,
      close: close
    });

    // if there is no response for x second, return result
    function onTimeout() {
      if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for remoteDisconnect response')));
      delete self.requests[requestid];
    }

    self.requests[requestid] = {
      type: requestTypes.remoteDisconnect,
      callback: fn,
      timeout: setTimeout(onTimeout, self.requestsTimeout)
    };

    pub.publish(self.requestChannel, payload);
  };
  /**
   * Sends a new custom request to other nodes.
   *
   * Publishes a `customRequest` request and registers it as pending. The
   * callback receives an error together with the replies collected so far
   * when the request is still pending after `requestsTimeout` ms.
   *
   * @param {Object} data (no binary)
   * @param {Function} callback
   * @api public
   */

  Redis.prototype.customRequest = function(data, fn){
    // data may be left out: customRequest(fn)
    if (typeof data === 'function'){
      fn = data;
      data = null;
    }

    var self = this;
    var requestid = uid2(6);

    // if there is no response for x second, return result
    function onTimeout() {
      var pending = self.requests[requestid];
      if (fn) process.nextTick(fn.bind(null, new Error('timeout reached while waiting for customRequest response'), pending.replies));
      delete self.requests[requestid];
    }

    function onNumsub(err, numsub){
      if (err) {
        self.emit('error', err);
        if (fn) fn(err);
        return;
      }

      // the second element of the reply is the number of responses to wait for
      var expected = parseInt(numsub[1], 10);

      var payload = JSON.stringify({
        requestid : requestid,
        type: requestTypes.customRequest,
        data: data
      });

      self.requests[requestid] = {
        type: requestTypes.customRequest,
        numsub: expected,
        msgCount: 0,
        replies: [],
        callback: fn,
        timeout: setTimeout(onTimeout, self.requestsTimeout)
      };

      pub.publish(self.requestChannel, payload);
    }

    pub.send_command('pubsub', ['numsub', self.requestChannel], onNumsub);
  };
  655. Redis.uid = uid;
  656. Redis.pubClient = pub;
  657. Redis.subClient = sub;
  658. Redis.prefix = prefix;
  659. Redis.requestsTimeout = requestsTimeout;
  660. return Redis;
  661. }