From c671d54d6755b8e164428214216e13351f92d09c Mon Sep 17 00:00:00 2001 From: Wu Cheng-Han Date: Fri, 18 Nov 2016 12:09:58 +0800 Subject: Add dmp worker to leverage CPU intensive calculation to child process --- lib/models/revision.js | 181 +++++++++++++++++++++---------------------------- 1 file changed, 79 insertions(+), 102 deletions(-) (limited to 'lib/models') diff --git a/lib/models/revision.js b/lib/models/revision.js index 33fdd73c..8b8eba94 100644 --- a/lib/models/revision.js +++ b/lib/models/revision.js @@ -5,13 +5,53 @@ var Sequelize = require("sequelize"); var LZString = require('lz-string'); var async = require('async'); var moment = require('moment'); -var DiffMatchPatch = require('diff-match-patch'); -var dmp = new DiffMatchPatch(); +var childProcess = require('child_process'); +var shortId = require('shortid'); // core var config = require("../config.js"); var logger = require("../logger.js"); +var dmpWorker = createDmpWorker(); +var dmpCallbackCache = {}; + +function createDmpWorker() { + var worker = childProcess.fork("./lib/workers/dmpWorker.js", { + stdio: 'ignore' + }); + if (config.debug) logger.info('dmp worker process started'); + worker.on('message', function (data) { + if (!data || !data.msg || !data.cacheKey) { + return logger.error('dmp worker error: not enough data on message'); + } + var cacheKey = data.cacheKey; + switch(data.msg) { + case 'error': + dmpCallbackCache[cacheKey](data.error, null); + break; + case 'check': + dmpCallbackCache[cacheKey](null, data.result); + break; + } + delete dmpCallbackCache[cacheKey]; + }); + worker.on('close', function (code) { + dmpWorker = null; + if (config.debug) logger.info('dmp worker process exited with code ' + code); + }); + return worker; +} + +function sendDmpWorker(data, callback) { + if (!dmpWorker) dmpWorker = createDmpWorker(); + var cacheKey = Date.now() + '_' + shortId.generate(); + dmpCallbackCache[cacheKey] = callback; + data = Object.assign(data, { + cacheKey: cacheKey + }); + dmpWorker.send(data); +} + module.exports = function (sequelize, DataTypes) { var Revision = sequelize.define("Revision", { id: { @@ -43,19 +83,6 @@ module.exports = function (sequelize, DataTypes) { constraints: false }); }, - createPatch: function (lastDoc, CurrDoc) { - var ms_start = (new Date()).getTime(); - var diff = dmp.diff_main(lastDoc, CurrDoc); - dmp.diff_cleanupSemantic(diff); - var patch = dmp.patch_make(lastDoc, diff); - patch = dmp.patch_toText(patch); - var ms_end = (new Date()).getTime(); - if (config.debug) { - logger.info(patch); - logger.info((ms_end - ms_start) + 'ms'); - } - return patch; - }, getNoteRevisions: function (note, callback) { Revision.findAll({ where: { @@ -96,67 +123,11 @@ module.exports = function (sequelize, DataTypes) { order: '"createdAt" DESC' }).then(function (count) { if (count <= 0) return callback(null, null); - var ms_start = (new Date()).getTime(); - var startContent = null; - var lastPatch = []; - var applyPatches = []; - var authorship = []; - if (count <= Math.round(revisions.length / 2)) { - // start from top to target - for (var i = 0; i < count; i++) { - var revision = revisions[i]; - if (i == 0) { - startContent = LZString.decompressFromBase64(revision.content || revision.lastContent); - } - if (i != count - 1) { - var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch)); - applyPatches = applyPatches.concat(patch); - } - lastPatch = revision.patch; - authorship = revision.authorship; - } - // swap DIFF_INSERT and DIFF_DELETE to achieve unpatching - for (var i = 0, l = applyPatches.length; i < l; i++) { - for (var j = 0, m = applyPatches[i].diffs.length; j < m; j++) { - var diff = applyPatches[i].diffs[j]; - if (diff[0] == DiffMatchPatch.DIFF_INSERT) - diff[0] = DiffMatchPatch.DIFF_DELETE; - else if (diff[0] == DiffMatchPatch.DIFF_DELETE) - diff[0] = DiffMatchPatch.DIFF_INSERT; - } - } - } else { - // start from bottom to target - var l = revisions.length - 1; - for (var i = l; i >= count - 1; i--) { - var revision = revisions[i]; - if (i == l) { - startContent = LZString.decompressFromBase64(revision.lastContent); - authorship = revision.authorship; - } - if (revision.patch) { - var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch)); - applyPatches = applyPatches.concat(patch); - } - lastPatch = revision.patch; - authorship = revision.authorship; - } - } - try { - var finalContent = dmp.patch_apply(applyPatches, startContent)[0]; - } catch (err) { - return callback(err, null); - } - var data = { - content: finalContent, - patch: dmp.patch_fromText(LZString.decompressFromBase64(lastPatch)), - authorship: authorship ? JSON.parse(LZString.decompressFromBase64(authorship)) : null - }; - var ms_end = (new Date()).getTime(); - if (config.debug) { - logger.info((ms_end - ms_start) + 'ms'); - } - return callback(null, data); + sendDmpWorker({ + msg: 'get revision', + revisions: revisions, + count: count + }, callback); }).catch(function (err) { return callback(err, null); }); @@ -254,37 +225,43 @@ module.exports = function (sequelize, DataTypes) { var latestRevision = revisions[0]; var lastContent = LZString.decompressFromBase64(latestRevision.content || latestRevision.lastContent); var content = LZString.decompressFromBase64(note.content); - var patch = Revision.createPatch(lastContent, content); - if (!patch) { - // if patch is empty (means no difference) then just update the latest revision updated time - latestRevision.changed('updatedAt', true); - latestRevision.update({ - updatedAt: Date.now() - }).then(function (revision) { - Revision.finishSaveNoteRevision(note, revision, callback); - }).catch(function (err) { - return callback(err, null); - }); - } else { - Revision.create({ - noteId: note.id, - patch: LZString.compressToBase64(patch), - content: note.content, - length: LZString.decompressFromBase64(note.content).length, - authorship: note.authorship - }).then(function (revision) { - // clear last revision content to reduce db size + sendDmpWorker({ + msg: 'create patch', + lastDoc: lastContent, + currDoc: content, + }, function (err, patch) { + if (err) logger.error('save note revision error', err); + if (!patch) { + // if patch is empty (means no difference) then just update the latest revision updated time + latestRevision.changed('updatedAt', true); latestRevision.update({ - content: null - }).then(function () { + updatedAt: Date.now() + }).then(function (revision) { Revision.finishSaveNoteRevision(note, revision, callback); }).catch(function (err) { return callback(err, null); }); - }).catch(function (err) { - return callback(err, null); - }); - } + } else { + Revision.create({ + noteId: note.id, + patch: LZString.compressToBase64(patch), + content: note.content, + length: LZString.decompressFromBase64(note.content).length, + authorship: note.authorship + }).then(function (revision) { + // clear last revision content to reduce db size + latestRevision.update({ + content: null + }).then(function () { + Revision.finishSaveNoteRevision(note, revision, callback); + }).catch(function (err) { + return callback(err, null); + }); + }).catch(function (err) { + return callback(err, null); + }); + } + }); } }).catch(function (err) { return callback(err, null); -- cgit v1.2.3