diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/history.js | 79 | ||||
-rw-r--r-- | lib/realtime.js | 161 | ||||
-rw-r--r-- | lib/workers/historyUpdater.js | 66 | ||||
-rw-r--r-- | lib/workers/noteRevisionSaver.js | 19 | ||||
-rw-r--r-- | lib/workers/noteUpdater.js | 101 |
5 files changed, 136 insertions, 290 deletions
diff --git a/lib/history.js b/lib/history.js index bdc922d7..4a3bbe1e 100644 --- a/lib/history.js +++ b/lib/history.js @@ -2,7 +2,6 @@ //external modules var async = require('async'); var moment = require('moment'); -var childProcess = require('child_process'); //core var config = require("./config.js"); @@ -10,9 +9,6 @@ var logger = require("./logger.js"); var response = require("./response.js"); var models = require("./models"); -// workers -var historyUpdater = require("./workers/historyUpdater"); - //public var History = { historyGet: historyGet, @@ -24,50 +20,49 @@ var History = { var caches = {}; //update when the history is dirty -var updaterIsBusy = false; var updater = setInterval(function () { - if (updaterIsBusy) return; var deleted = []; - var _caches = {}; - Object.keys(caches).forEach(function (key) { + async.each(Object.keys(caches), function (key, callback) { var cache = caches[key]; if (cache.isDirty) { - _caches[key] = cache.history; - cache.isDirty = false; + if (config.debug) logger.info("history updater found dirty history: " + key); + var history = parseHistoryToArray(cache.history); + finishUpdateHistory(key, history, function (err, count) { + if (err) return callback(err, null); + if (!count) return callback(null, null); + cache.isDirty = false; + cache.updateAt = Date.now(); + return callback(null, null); + }); } else { if (moment().isAfter(moment(cache.updateAt).add(5, 'minutes'))) { deleted.push(key); } + return callback(null, null); } + }, function (err) { + if (err) return logger.error('history updater error', err); }); // delete specified caches for (var i = 0, l = deleted.length; i < l; i++) { caches[deleted[i]].history = {}; delete caches[deleted[i]]; } - if (Object.keys(_caches).length <= 0) return; - updaterIsBusy = true; - var worker = childProcess.fork("./lib/workers/historyUpdater.js"); - if (config.debug) logger.info('history updater worker process started'); - worker.send({ - msg: 'update history', - caches: _caches - }); - worker.on('message', function (data) { - if (!data || !data.msg || !data.userid) return; - var cache = caches[data.userid]; - if (!cache) return; - switch(data.msg) { - case 'check': - cache.updateAt = Date.now(); - break; +}, 1000); + +function finishUpdateHistory(userid, history, callback) { + models.User.update({ + history: JSON.stringify(history) + }, { + where: { + id: userid } + }).then(function (count) { + return callback(null, count); + }).catch(function (err) { + return callback(err, null); }); - worker.on('close', function (code) { - updaterIsBusy = false; - if (config.debug) logger.info('history updater worker process exited with code ' + code); - }); -}, 1000); +} function isReady() { var dirtyCount = 0; @@ -106,7 +101,7 @@ function getHistory(userid, callback) { } function setHistory(userid, history) { - if (Array.isArray(history)) history = historyUpdater.parseHistoryToObject(history); + if (Array.isArray(history)) history = parseHistoryToObject(history); if (!caches[userid]) { caches[userid] = { history: {}, @@ -135,13 +130,31 @@ function updateHistory(userid, noteId, document) { } } +function parseHistoryToArray(history) { + var _history = []; + Object.keys(history).forEach(function (key) { + var item = history[key]; + _history.push(item); + }); + return _history; +} + +function parseHistoryToObject(history) { + var _history = {}; + for (var i = 0, l = history.length; i < l; i++) { + var item = history[i]; + _history[item.id] = item; + } + return _history; +} + function historyGet(req, res) { if (req.isAuthenticated()) { getHistory(req.user.id, function (err, history) { if (err) return response.errorInternalError(res); if (!history) return response.errorNotFound(res); res.send({ - history: historyUpdater.parseHistoryToArray(history) + history: parseHistoryToArray(history) }); }); } else { diff --git a/lib/realtime.js b/lib/realtime.js index 5d769e7d..b50e05b2 100644 --- a/lib/realtime.js +++ b/lib/realtime.js @@ -9,7 +9,6 @@ var randomcolor = require("randomcolor"); var Chance = require('chance'), chance = new Chance(); var moment = require('moment'); -var childProcess = require('child_process'); //core var config = require("./config.js"); @@ -20,9 +19,6 @@ var models = require("./models"); //ot var ot = require("./ot/index.js"); -// workers -var noteUpdater = require("./workers/noteUpdater"); - //public var realtime = { io: null, @@ -83,62 +79,97 @@ function emitCheck(note) { var users = {}; var notes = {}; //update when the note is dirty -var updaterIsBusy = false; var updater = setInterval(function () { - if (updaterIsBusy) return; - var _notes = {}; - Object.keys(notes).forEach(function (key) { + async.each(Object.keys(notes), function (key, callback) { var note = notes[key]; - if (!note.server || !note.server.isDirty) return; - _notes[key] = { - id: note.id, - lastchangeuser: note.lastchangeuser, - authorship: note.authorship, - document: note.server.document - }; - note.server.isDirty = false; - }); - if (Object.keys(_notes).length <= 0) return; - updaterIsBusy = true; - var worker = childProcess.fork("./lib/workers/noteUpdater.js"); - if (config.debug) logger.info('note updater worker process started'); - worker.send({ - msg: 'update note', - notes: _notes - }); - worker.on('message', function (data) { - if (!data || !data.msg || !data.note) return; - var note = notes[data.note.id]; - if (!note) return; - switch(data.msg) { - case 'error': - for (var i = 0, l = note.socks.length; i < l; i++) { - var sock = note.socks[i]; - if (typeof sock !== 'undefined' && sock) { - setTimeout(function () { - sock.disconnect(true); - }, 0); + if (note.server.isDirty) { + if (config.debug) logger.info("updater found dirty note: " + key); + updateNote(note, function(err, _note) { + // handle when note already been clean up + if (!notes[key] || !notes[key].server) return callback(null, null); + if (!_note) { + realtime.io.to(note.id).emit('info', { + code: 404 + }); + logger.error('note not found: ', note.id); + } + if (err || !_note) { + for (var i = 0, l = note.socks.length; i < l; i++) { + var sock = note.socks[i]; + if (typeof sock !== 'undefined' && sock) { + setTimeout(function () { + sock.disconnect(true); + }, 0); + } } + return callback(err, null); } - break; - case 'note not found': - realtime.io.to(note.id).emit('info', { - code: 404 - }); - break; - case 'check': - note.lastchangeuserprofile = data.note.lastchangeuserprofile; - note.updatetime = data.note.updatetime; - saverSleep = false; + note.server.isDirty = false; + note.updatetime = moment(_note.lastchangeAt).valueOf(); emitCheck(note); - break; + return callback(null, null); + }); + } else { + return callback(null, null); } - }); - worker.on('close', function (code) { - updaterIsBusy = false; - if (config.debug) logger.info('note updater worker process exited with code ' + code); + }, function (err) { + if (err) return logger.error('updater error', err); }); }, 1000); +function updateNote(note, callback) { + models.Note.findOne({ + where: { + id: note.id + } + }).then(function (_note) { + if (!_note) return callback(null, null); + if (note.lastchangeuser) { + if (_note.lastchangeuserId != note.lastchangeuser) { + models.User.findOne({ + where: { + id: note.lastchangeuser + } + }).then(function (user) { + if (!user) return callback(null, null); + note.lastchangeuserprofile = models.User.parseProfile(user.profile); + return finishUpdateNote(note, _note, callback); + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); + } else { + return finishUpdateNote(note, _note, callback); + } + } else { + note.lastchangeuserprofile = null; + return finishUpdateNote(note, _note, callback); + } + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); +} +function finishUpdateNote(note, _note, callback) { + if (!note || !note.server) return callback(null, null); + var body = note.server.document; + var title = note.title = models.Note.parseNoteTitle(body); + title = LZString.compressToBase64(title); + body = LZString.compressToBase64(body); + var values = { + title: title, + content: body, + authorship: LZString.compressToBase64(JSON.stringify(note.authorship)), + lastchangeuserId: note.lastchangeuser, + lastchangeAt: Date.now() + }; + _note.update(values).then(function (_note) { + saverSleep = false; + return callback(null, _note); + }).catch(function (err) { + logger.error(err); + return callback(err, null); + }); +} //clean when user not in any rooms or user not in connected list var cleaner = setInterval(function () { async.each(Object.keys(users), function (key, callback) { @@ -161,28 +192,16 @@ var cleaner = setInterval(function () { }); }, 60000); var saverSleep = false; -var saverIsBusy = false; // save note revision in interval var saver = setInterval(function () { - if (saverSleep || saverIsBusy) return; - saverIsBusy = true; - var worker = childProcess.fork("./lib/workers/noteRevisionSaver.js"); - if (config.debug) logger.info('note revision saver worker process started'); - worker.send({ - msg: 'save note revision' - }); - worker.on('message', function (data) { - if (!data || !data.msg) return; - switch(data.msg) { - case 'empty': - saverSleep = true; - break; + if (saverSleep) return; + models.Revision.saveAllNotesRevision(function (err, notes) { + if (err) return logger.error('revision saver failed: ' + err); + if (notes && notes.length <= 0) { + saverSleep = true; + return; } }); - worker.on('close', function (code) { - saverIsBusy = false; - if (config.debug) logger.info('note revision saver worker process exited with code ' + code); - }); }, 60000 * 5); function getStatus(callback) { @@ -524,7 +543,7 @@ function disconnect(socket) { // remove note in notes if no user inside if (Object.keys(note.users).length <= 0) { if (note.server.isDirty) { - noteUpdater.updateNote(note, function (err, _note) { + updateNote(note, function (err, _note) { if (err) return logger.error('disconnect note failed: ' + err); // clear server before delete to avoid memory leaks note.server.document = ""; diff --git a/lib/workers/historyUpdater.js b/lib/workers/historyUpdater.js deleted file mode 100644 index df80e92d..00000000 --- a/lib/workers/historyUpdater.js +++ /dev/null @@ -1,66 +0,0 @@ -// external modules -var async = require('async'); - -// core -var config = require("../config.js"); -var logger = require("../logger.js"); -var models = require("../models"); - -process.on('message', function (data) { - if (!data || !data.msg || data.msg !== 'update history' || !data.caches) return process.exit(); - var caches = data.caches; - async.each(Object.keys(caches), function (key, callback) { - var cache = caches[key]; - if (config.debug) logger.info("history updater found dirty history: " + key); - var history = parseHistoryToArray(cache); - finishUpdateHistory(key, history, function (err, count) { - if (err) return callback(err, null); - if (!count) return callback(null, null); - process.send({ - msg: 'check', - userid: key - }); - return callback(null, null); - }); - }, function (err) { - if (err) logger.error('history updater error', err); - process.exit(); - }); -}); - -function finishUpdateHistory(userid, history, callback) { - models.User.update({ - history: JSON.stringify(history) - }, { - where: { - id: userid - } - }).then(function (count) { - return callback(null, count); - }).catch(function (err) { - return callback(err, null); - }); -} - -function parseHistoryToArray(history) { - var _history = []; - Object.keys(history).forEach(function (key) { - var item = history[key]; - _history.push(item); - }); - return _history; -} - -function parseHistoryToObject(history) { - var _history = {}; - for (var i = 0, l = history.length; i < l; i++) { - var item = history[i]; - _history[item.id] = item; - } - return _history; -} - -module.exports = { - parseHistoryToArray: parseHistoryToArray, - parseHistoryToObject: parseHistoryToObject -};
\ No newline at end of file diff --git a/lib/workers/noteRevisionSaver.js b/lib/workers/noteRevisionSaver.js deleted file mode 100644 index b6b117a3..00000000 --- a/lib/workers/noteRevisionSaver.js +++ /dev/null @@ -1,19 +0,0 @@ -// core -var logger = require("../logger.js"); -var models = require("../models"); - -process.on('message', function (data) { - if (!data || !data.msg || data.msg !== 'save note revision') return process.exit(); - models.Revision.saveAllNotesRevision(function (err, notes) { - if (err) { - logger.error('note revision saver failed: ' + err); - return process.exit(); - } - if (notes && notes.length <= 0) { - process.send({ - msg: 'empty' - }); - } - process.exit(); - }); -});
\ No newline at end of file diff --git a/lib/workers/noteUpdater.js b/lib/workers/noteUpdater.js deleted file mode 100644 index 3fc4b1eb..00000000 --- a/lib/workers/noteUpdater.js +++ /dev/null @@ -1,101 +0,0 @@ -// external modules -var async = require('async'); -var moment = require('moment'); -var LZString = require('lz-string'); - -// core -var config = require("../config.js"); -var logger = require("../logger.js"); -var models = require("../models"); - -process.on('message', function (data) { - if (!data || !data.msg || data.msg !== 'update note' || !data.notes) return process.exit(); - var notes = data.notes; - async.each(Object.keys(notes), function (key, callback) { - var note = notes[key]; - if (config.debug) logger.info("note updater found dirty note: " + key); - updateNote(note, function(err, _note) { - if (!_note) { - process.send({ - msg: 'note not found', - note: note - }); - logger.error('note not found: ', note.id); - } - if (err || !_note) { - process.send({ - msg: 'error', - note: note - }); - return callback(err, null); - } - note.updatetime = moment(_note.lastchangeAt).valueOf(); - process.send({ - msg: 'check', - note: note - }); - return callback(null, null); - }); - }, function (err) { - if (err) logger.error('note updater error', err); - process.exit(); - }); -}); - -function updateNote(note, callback) { - models.Note.findOne({ - where: { - id: note.id - } - }).then(function (_note) { - if (!_note) return callback(null, null); - if (note.lastchangeuser) { - if (_note.lastchangeuserId != note.lastchangeuser) { - models.User.findOne({ - where: { - id: note.lastchangeuser - } - }).then(function (user) { - if (!user) return callback(null, null); - note.lastchangeuserprofile = models.User.parseProfile(user.profile); - return finishUpdateNote(note, _note, callback); - }).catch(function (err) { - logger.error(err); - return callback(err, null); - }); - } else { - return finishUpdateNote(note, _note, callback); - } - } else { - note.lastchangeuserprofile = null; - return finishUpdateNote(note, _note, callback); - } - }).catch(function (err) { - logger.error(err); - return callback(err, null); - }); -} - -function finishUpdateNote(note, _note, callback) { - var body = note.document; - var title = note.title = models.Note.parseNoteTitle(body); - title = LZString.compressToBase64(title); - body = LZString.compressToBase64(body); - var values = { - title: title, - content: body, - authorship: LZString.compressToBase64(JSON.stringify(note.authorship)), - lastchangeuserId: note.lastchangeuser, - lastchangeAt: Date.now() - }; - _note.update(values).then(function (_note) { - return callback(null, _note); - }).catch(function (err) { - logger.error(err); - return callback(err, null); - }); -} - -module.exports = { - updateNote: updateNote -};
\ No newline at end of file |