From 4ccfdfa538d2fd7e9ee63e937435a7f6fc622c12 Mon Sep 17 00:00:00 2001 From: Wu Cheng-Han Date: Mon, 7 Nov 2016 21:30:40 +0800 Subject: Add workers for notes to leverage CPU intensive work loading --- lib/realtime.js | 161 +++++++++++++++++---------------------- lib/workers/noteRevisionSaver.js | 19 +++++ lib/workers/noteUpdater.js | 101 ++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 90 deletions(-) create mode 100644 lib/workers/noteRevisionSaver.js create mode 100644 lib/workers/noteUpdater.js (limited to 'lib') diff --git a/lib/realtime.js b/lib/realtime.js index b50e05b2..5d769e7d 100644 --- a/lib/realtime.js +++ b/lib/realtime.js @@ -9,6 +9,7 @@ var randomcolor = require("randomcolor"); var Chance = require('chance'), chance = new Chance(); var moment = require('moment'); +var childProcess = require('child_process'); //core var config = require("./config.js"); @@ -19,6 +20,9 @@ var models = require("./models"); //ot var ot = require("./ot/index.js"); +// workers +var noteUpdater = require("./workers/noteUpdater"); + //public var realtime = { io: null, @@ -79,97 +83,62 @@ function emitCheck(note) { var users = {}; var notes = {}; //update when the note is dirty +var updaterIsBusy = false; var updater = setInterval(function () { - async.each(Object.keys(notes), function (key, callback) { + if (updaterIsBusy) return; + var _notes = {}; + Object.keys(notes).forEach(function (key) { var note = notes[key]; - if (note.server.isDirty) { - if (config.debug) logger.info("updater found dirty note: " + key); - updateNote(note, function(err, _note) { - // handle when note already been clean up - if (!notes[key] || !notes[key].server) return callback(null, null); - if (!_note) { - realtime.io.to(note.id).emit('info', { - code: 404 - }); - logger.error('note not found: ', note.id); - } - if (err || !_note) { - for (var i = 0, l = note.socks.length; i < l; i++) { - var sock = note.socks[i]; - if (typeof sock !== 'undefined' && sock) { - setTimeout(function () { - sock.disconnect(true); - }, 0); - } - } - return callback(err, null); - } - note.server.isDirty = false; - note.updatetime = moment(_note.lastchangeAt).valueOf(); - emitCheck(note); - return callback(null, null); - }); - } else { - return callback(null, null); - } - }, function (err) { - if (err) return logger.error('updater error', err); + if (!note.server || !note.server.isDirty) return; + _notes[key] = { + id: note.id, + lastchangeuser: note.lastchangeuser, + authorship: note.authorship, + document: note.server.document + }; + note.server.isDirty = false; }); -}, 1000); -function updateNote(note, callback) { - models.Note.findOne({ - where: { - id: note.id - } - }).then(function (_note) { - if (!_note) return callback(null, null); - if (note.lastchangeuser) { - if (_note.lastchangeuserId != note.lastchangeuser) { - models.User.findOne({ - where: { - id: note.lastchangeuser + if (Object.keys(_notes).length <= 0) return; + updaterIsBusy = true; + var worker = childProcess.fork("./lib/workers/noteUpdater.js"); + if (config.debug) logger.info('note updater worker process started'); + worker.send({ + msg: 'update note', + notes: _notes + }); + worker.on('message', function (data) { + if (!data || !data.msg || !data.note) return; + var note = notes[data.note.id]; + if (!note) return; + switch(data.msg) { + case 'error': + for (var i = 0, l = note.socks.length; i < l; i++) { + var sock = note.socks[i]; + if (typeof sock !== 'undefined' && sock) { + setTimeout(function () { + sock.disconnect(true); + }, 0); } - }).then(function (user) { - if (!user) return callback(null, null); - note.lastchangeuserprofile = models.User.parseProfile(user.profile); - return finishUpdateNote(note, _note, callback); - }).catch(function (err) { - logger.error(err); - return callback(err, null); + } + break; + case 'note not found': + realtime.io.to(note.id).emit('info', { + code: 404 }); - } else { - return finishUpdateNote(note, _note, callback); - } - } else { - note.lastchangeuserprofile = null; - return finishUpdateNote(note, _note, callback); + break; + case 'check': + note.lastchangeuserprofile = data.note.lastchangeuserprofile; + note.updatetime = data.note.updatetime; + saverSleep = false; + emitCheck(note); + break; } - }).catch(function (err) { - logger.error(err); - return callback(err, null); }); -} -function finishUpdateNote(note, _note, callback) { - if (!note || !note.server) return callback(null, null); - var body = note.server.document; - var title = note.title = models.Note.parseNoteTitle(body); - title = LZString.compressToBase64(title); - body = LZString.compressToBase64(body); - var values = { - title: title, - content: body, - authorship: LZString.compressToBase64(JSON.stringify(note.authorship)), - lastchangeuserId: note.lastchangeuser, - lastchangeAt: Date.now() - }; - _note.update(values).then(function (_note) { - saverSleep = false; - return callback(null, _note); - }).catch(function (err) { - logger.error(err); - return callback(err, null); + worker.on('close', function (code) { + updaterIsBusy = false; + if (config.debug) logger.info('note updater worker process exited with code ' + code); }); -} +}, 1000); //clean when user not in any rooms or user not in connected list var cleaner = setInterval(function () { async.each(Object.keys(users), function (key, callback) { @@ -192,16 +161,28 @@ var cleaner = setInterval(function () { }); }, 60000); var saverSleep = false; +var saverIsBusy = false; // save note revision in interval var saver = setInterval(function () { - if (saverSleep) return; - models.Revision.saveAllNotesRevision(function (err, notes) { - if (err) return logger.error('revision saver failed: ' + err); - if (notes && notes.length <= 0) { - saverSleep = true; - return; + if (saverSleep || saverIsBusy) return; + saverIsBusy = true; + var worker = childProcess.fork("./lib/workers/noteRevisionSaver.js"); + if (config.debug) logger.info('note revision saver worker process started'); + worker.send({ + msg: 'save note revision' + }); + worker.on('message', function (data) { + if (!data || !data.msg) return; + switch(data.msg) { + case 'empty': + saverSleep = true; + break; } }); + worker.on('close', function (code) { + saverIsBusy = false; + if (config.debug) logger.info('note revision saver worker process exited with code ' + code); + }); }, 60000 * 5); function getStatus(callback) { @@ -543,7 +524,7 @@ function disconnect(socket) { // remove note in notes if no user inside if (Object.keys(note.users).length <= 0) { if (note.server.isDirty) { - updateNote(note, function (err, _note) { + noteUpdater.updateNote(note, function (err, _note) { if (err) return logger.error('disconnect note failed: ' + err); // clear server before delete to avoid memory leaks note.server.document = ""; diff --git a/lib/workers/noteRevisionSaver.js b/lib/workers/noteRevisionSaver.js new file mode 100644 index 00000000..b6b117a3 --- /dev/null +++ b/lib/workers/noteRevisionSaver.js @@ -0,0 +1,19 @@ +// core +var logger = require("../logger.js"); +var models = require("../models"); + +process.on('message', function (data) { + if (!data || !data.msg || data.msg !== 'save note revision') return process.exit(); + models.Revision.saveAllNotesRevision(function (err, notes) { + if (err) { + logger.error('note revision saver failed: ' + err); + return process.exit(); + } + if (notes && notes.length <= 0) { + process.send({ + msg: 'empty' + }); + } + process.exit(); + }); +}); \ No newline at end of file diff --git a/lib/workers/noteUpdater.js b/lib/workers/noteUpdater.js new file mode 100644 index 00000000..3fc4b1eb --- /dev/null +++ b/lib/workers/noteUpdater.js @@ -0,0 +1,101 @@ +// external modules +var async = require('async'); +var moment = require('moment'); +var LZString = require('lz-string'); + +// core +var config = require("../config.js"); +var logger = require("../logger.js"); +var models = require("../models"); + +process.on('message', function (data) { + if (!data || !data.msg || data.msg !== 'update note' || !data.notes) return process.exit(); + var notes = data.notes; + async.each(Object.keys(notes), function (key, callback) { + var note = notes[key]; + if (config.debug) logger.info("note updater found dirty note: " + key); + updateNote(note, function(err, _note) { + if (!_note) { + process.send({ + msg: 'note not found', + note: note + }); + logger.error('note not found: ', note.id); + } + if (err || !_note) { + process.send({ + msg: 'error', + note: note + }); + return callback(err, null); + } + note.updatetime = moment(_note.lastchangeAt).valueOf(); + process.send({ + msg: 'check', + note: note + }); + return callback(null, null); + }); + }, function (err) { + if (err) logger.error('note updater error', err); + process.exit(); + }); +}); + +function updateNote(note, callback) { + models.Note.findOne({ + where: { + id: note.id + } + }).then(function (_note) { + if (!_note) return callback(null, null); + if (note.lastchangeuser) { + if (_note.lastchangeuserId != note.lastchangeuser) { + models.User.findOne({ + where: { + id: note.lastchangeuser + } + }).then(function (user) { + if (!user) return callback(null, null); + note.lastchangeuserprofile = models.User.parseProfile(user.profile); + return finishUpdateNote(note, _note, callback); + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); + } else { + return finishUpdateNote(note, _note, callback); + } + } else { + note.lastchangeuserprofile = null; + return finishUpdateNote(note, _note, callback); + } + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); +} + +function finishUpdateNote(note, _note, callback) { + var body = note.document; + var title = note.title = models.Note.parseNoteTitle(body); + title = LZString.compressToBase64(title); + body = LZString.compressToBase64(body); + var values = { + title: title, + content: body, + authorship: LZString.compressToBase64(JSON.stringify(note.authorship)), + lastchangeuserId: note.lastchangeuser, + lastchangeAt: Date.now() + }; + _note.update(values).then(function (_note) { + return callback(null, _note); + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); +} + +module.exports = { + updateNote: updateNote +}; \ No newline at end of file -- cgit v1.2.3