diff options
author | Wu Cheng-Han | 2016-11-07 21:30:53 +0800 |
---|---|---|
committer | Wu Cheng-Han | 2016-11-07 21:30:53 +0800 |
commit | b5920fbbd1ceb595456da18f7d458b63d1a960bf (patch) | |
tree | 6d2b97cce6503728db28a7acea23028c9c47edb4 /lib | |
parent | 4ccfdfa538d2fd7e9ee63e937435a7f6fc622c12 (diff) |
Add workers for history to leverage CPU intensive work loading
Diffstat (limited to '')
-rw-r--r-- | lib/history.js | 79 | ||||
-rw-r--r-- | lib/workers/historyUpdater.js | 66 |
2 files changed, 99 insertions, 46 deletions
diff --git a/lib/history.js b/lib/history.js index 4a3bbe1e..bdc922d7 100644 --- a/lib/history.js +++ b/lib/history.js @@ -2,6 +2,7 @@ //external modules var async = require('async'); var moment = require('moment'); +var childProcess = require('child_process'); //core var config = require("./config.js"); @@ -9,6 +10,9 @@ var logger = require("./logger.js"); var response = require("./response.js"); var models = require("./models"); +// workers +var historyUpdater = require("./workers/historyUpdater"); + //public var History = { historyGet: historyGet, @@ -20,49 +24,50 @@ var History = { var caches = {}; //update when the history is dirty +var updaterIsBusy = false; var updater = setInterval(function () { + if (updaterIsBusy) return; var deleted = []; - async.each(Object.keys(caches), function (key, callback) { + var _caches = {}; + Object.keys(caches).forEach(function (key) { var cache = caches[key]; if (cache.isDirty) { - if (config.debug) logger.info("history updater found dirty history: " + key); - var history = parseHistoryToArray(cache.history); - finishUpdateHistory(key, history, function (err, count) { - if (err) return callback(err, null); - if (!count) return callback(null, null); - cache.isDirty = false; - cache.updateAt = Date.now(); - return callback(null, null); - }); + _caches[key] = cache.history; + cache.isDirty = false; } else { if (moment().isAfter(moment(cache.updateAt).add(5, 'minutes'))) { deleted.push(key); } - return callback(null, null); } - }, function (err) { - if (err) return logger.error('history updater error', err); }); // delete specified caches for (var i = 0, l = deleted.length; i < l; i++) { caches[deleted[i]].history = {}; delete caches[deleted[i]]; } -}, 1000); - -function finishUpdateHistory(userid, history, callback) { - models.User.update({ - history: JSON.stringify(history) - }, { - where: { - id: userid + if (Object.keys(_caches).length <= 0) return; + updaterIsBusy = true; + var worker = childProcess.fork("./lib/workers/historyUpdater.js"); + if (config.debug) logger.info('history updater worker process started'); + worker.send({ + msg: 'update history', + caches: _caches + }); + worker.on('message', function (data) { + if (!data || !data.msg || !data.userid) return; + var cache = caches[data.userid]; + if (!cache) return; + switch(data.msg) { + case 'check': + cache.updateAt = Date.now(); + break; } - }).then(function (count) { - return callback(null, count); - }).catch(function (err) { - return callback(err, null); }); -} + worker.on('close', function (code) { + updaterIsBusy = false; + if (config.debug) logger.info('history updater worker process exited with code ' + code); + }); +}, 1000); function isReady() { var dirtyCount = 0; @@ -101,7 +106,7 @@ function getHistory(userid, callback) { } function setHistory(userid, history) { - if (Array.isArray(history)) history = parseHistoryToObject(history); + if (Array.isArray(history)) history = historyUpdater.parseHistoryToObject(history); if (!caches[userid]) { caches[userid] = { history: {}, @@ -130,31 +135,13 @@ function updateHistory(userid, noteId, document) { } } -function parseHistoryToArray(history) { - var _history = []; - Object.keys(history).forEach(function (key) { - var item = history[key]; - _history.push(item); - }); - return _history; -} - -function parseHistoryToObject(history) { - var _history = {}; - for (var i = 0, l = history.length; i < l; i++) { - var item = history[i]; - _history[item.id] = item; - } - return _history; -} - function historyGet(req, res) { if (req.isAuthenticated()) { getHistory(req.user.id, function (err, history) { if (err) return response.errorInternalError(res); if (!history) return response.errorNotFound(res); res.send({ - history: parseHistoryToArray(history) + history: historyUpdater.parseHistoryToArray(history) }); }); } else { diff --git a/lib/workers/historyUpdater.js b/lib/workers/historyUpdater.js new file mode 100644 index 00000000..df80e92d --- /dev/null +++ b/lib/workers/historyUpdater.js @@ -0,0 +1,66 @@ +// external modules +var async = require('async'); + +// core +var config = require("../config.js"); +var logger = require("../logger.js"); +var models = require("../models"); + +process.on('message', function (data) { + if (!data || !data.msg || data.msg !== 'update history' || !data.caches) return process.exit(); + var caches = data.caches; + async.each(Object.keys(caches), function (key, callback) { + var cache = caches[key]; + if (config.debug) logger.info("history updater found dirty history: " + key); + var history = parseHistoryToArray(cache); + finishUpdateHistory(key, history, function (err, count) { + if (err) return callback(err, null); + if (!count) return callback(null, null); + process.send({ + msg: 'check', + userid: key + }); + return callback(null, null); + }); + }, function (err) { + if (err) logger.error('history updater error', err); + process.exit(); + }); +}); + +function finishUpdateHistory(userid, history, callback) { + models.User.update({ + history: JSON.stringify(history) + }, { + where: { + id: userid + } + }).then(function (count) { + return callback(null, count); + }).catch(function (err) { + return callback(err, null); + }); +} + +function parseHistoryToArray(history) { + var _history = []; + Object.keys(history).forEach(function (key) { + var item = history[key]; + _history.push(item); + }); + return _history; +} + +function parseHistoryToObject(history) { + var _history = {}; + for (var i = 0, l = history.length; i < l; i++) { + var item = history[i]; + _history[item.id] = item; + } + return _history; +} + +module.exports = { + parseHistoryToArray: parseHistoryToArray, + parseHistoryToObject: parseHistoryToObject +};
\ No newline at end of file |