/*
Copyright (C) 2016 PencilBlue, LLC
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';
//dependencies
var util = require('../../util.js');
module.exports = function JobRunnerModule(pb) {
/**
* A base prototype that system jobs can extend. The premise is that every
* job will have an ID and a name. The job is initialized by calling the
* "init" function and started by calling the "run" function. The specific
* implementation is also provided with functions to report the start
* ("onStart"), update ("onUpdate"), and end ("onCompleted") of the job run.
* The advantage to extending this prototype is that the provided functions
* allow for creating a persisted record of the job. In addition, log
* statements generated by the job are also persisted (as long as the provided
* "log" function is called).
*
* The constructor only declares default property values. The "dao" and "id"
* properties receive their working values in "init", which also assigns the
* "name" property (it is not declared here).
* @class JobRunner
* @constructor
*/
function JobRunner(){
/**
* An instance of DAO to provide direct access to the DB if it is needed.
* Null until "init" has been called.
* @property dao
* @type {DAO}
*/
this.dao = null;
/**
* Holds the unique identifier for the job. Null until "init" has been
* called.
* @property id
* @type {String}
*/
this.id = null;
/**
* The percentage of the overall work that this job accounts for. If this
* job is run by itself then the value should be 1. This means that 100%
* of the job is completed by this job. If, for example, the value is .333
* then it is assumed that this job accounts for 33% or one third of the
* overall work necessary to complete the job. This is handy when a large
* job is made up of smaller jobs. This value will assist in allowing the
* jobs to calculate their update increments. The number must be a value
* between 0 (exclusive) & 1 (inclusive).
* @property chunkOfWorkPercentage
* @type {Number}
*/
this.chunkOfWorkPercentage = 1;
}
/**
* The name of the persistence entity that contains the log statements for the
* job. It is written as the "object_type" of every statement saved by "log".
* @private
* @static
* @property JOB_LOG_STORE_NAME
* @type {String}
*/
var JOB_LOG_STORE_NAME = 'job_log';
/**
* The name of the persistence entity that contains the job descriptor. It is
* the "object_type" of the record saved by "onStart" and the entity targeted
* by the field updates in "onUpdate" and "onCompleted".
* @private
* @static
* @property JOB_STORE_NAME
* @type {String}
*/
var JOB_STORE_NAME = 'job_run';
/**
* The status code for a job that is in progress. Used by "onStart" when the
* caller does not supply a status.
* @private
* @static
* @property DEFAULT_START_STATUS
* @type {String}
*/
var DEFAULT_START_STATUS = 'RUNNING';
/**
* The status code for a job that has completed successfully. Used by
* "onCompleted" when the caller does not supply a status.
* @private
* @static
* @property DEFAULT_DONE_STATUS
* @type {String}
*/
var DEFAULT_DONE_STATUS = 'COMPLETED';
/**
* The status code for a job that has generated a fatal error. Used by
* "onCompleted" when an Error is reported.
* @private
* @static
* @property DEFAULT_ERROR_STATUS
* @type {String}
*/
var DEFAULT_ERROR_STATUS = 'ERRORED';
/**
 * Prepares the job for execution: creates the DAO instance and assigns the
 * job's identifier and name.
 * @method init
 * @param {String} [name] The job's name. Falls back to the job ID when omitted
 * or empty.
 * @param {String} [jobId] The job's unique identifier. A new unique ID is
 * generated when omitted or empty.
 * @return {JobRunner} This instance, to allow chaining
 */
JobRunner.prototype.init = function(name, jobId) {
    this.dao = new pb.DAO();

    //only generate an identifier when the caller did not provide one
    if (jobId) {
        this.id = jobId;
    }
    else {
        this.id = util.uniqueId();
    }

    //an unnamed job is known by its identifier
    this.name = name ? name : this.id;
    return this;
};
/**
* Retrieves the unique identifier for the job. The value is null until "init"
* has been called.
* @method getId
* @return {String} The job ID
*/
JobRunner.prototype.getId = function() {
return this.id;
};
/**
 * Sets the share of the overarching job that this job instance accounts for
 * once it completes.
 * @method setChunkOfWorkPercentage
 * @param {Number} chunkOfWorkPercentage A value greater than 0 and no larger
 * than 1
 * @return {JobRunner} This instance, to allow chaining
 * @throws {Error} When the value is not a number or falls outside of the
 * range (0, 1]
 */
JobRunner.prototype.setChunkOfWorkPercentage = function(chunkOfWorkPercentage) {
    var isUsable = !isNaN(chunkOfWorkPercentage) &&
        chunkOfWorkPercentage > 0 &&
        chunkOfWorkPercentage <= 1;

    if (isUsable) {
        this.chunkOfWorkPercentage = chunkOfWorkPercentage;
        return this;
    }
    throw new Error('The chunkOfWorkPercentage must be a value between 0 (exclusive) and 1 (inclusive)');
};
/**
* Retrieves the chunk of work percentage: the portion of the overall work that
* this job accounts for. The value is 1 unless it was changed through
* "setChunkOfWorkPercentage".
* @method getChunkOfWorkPercentage
* @return {Number}
*/
JobRunner.prototype.getChunkOfWorkPercentage = function() {
return this.chunkOfWorkPercentage;
};
/**
 * Starts the job. Extending prototypes must replace this function; the base
 * version always throws. Implementations are expected to execute the callback
 * upon completion.
 * @method run
 * @param {Function} cb A callback that provides two parameters: The first is
 * any error that was generated and the second is the implementation specific
 * result of the job.
 * @throws {Error} Always, when the function has not been overridden
 */
JobRunner.prototype.run = function(/*cb*/) {
    var notImplemented = new Error('This function must be overriden by an extending prototype');
    throw notImplemented;
};
/**
 * Logs a message to the system logger (debug level) as well as to the
 * persistence layer. The function takes a variable number of arguments: a
 * string message/pattern followed by the values to fill the pattern with. See
 * util.format or the implementation for Winston loggers. The job's name is
 * prepended to the message. Calling the function without arguments does
 * nothing.
 *
 * Persisting the statement is best effort: a failure to save is reported to
 * the system logger and never propagated to the caller.
 * @method log
 * @param {String} message The message or pattern to log
 */
JobRunner.prototype.log = function() {
    //slice copies the arguments; splice would also strip them from "arguments"
    var args = Array.prototype.slice.call(arguments);
    if (args.length === 0) {
        return;
    }

    //prefix the statement so it can be traced back to the job
    args[0] = this.name+': '+args[0];

    //only run the formatter when there are values to substitute
    var message = args.length > 1 ? util.format.apply(util, args) : args[0];

    var statement = {
        object_type: JOB_LOG_STORE_NAME,
        job_id: this.id,
        worker_id: pb.system.getWorkerId(),
        name: this.name,
        message: message,
        metadata: []
    };
    this.dao.save(statement, function(err/*, result*/) {
        //report through the system logger only; calling this.log here would recurse
        if (util.isError(err)) {
            pb.log.error('JobRunner: Failed to persist job log statement - %s', err.stack);
        }
    });
    pb.log.debug.apply(pb.log, args);
};
/**
 * To be called once by the extending implementation to mark the start of the
 * job. The function persists the job record (ID, name, status, and a progress
 * of 0) so that later calls to onUpdate or onCompleted have a record to
 * modify. A failure to persist is logged and not propagated.
 * @method onStart
 * @param {String} [status='RUNNING'] The starting status of the job
 */
JobRunner.prototype.onStart = function(status) {
    //the ID clause doubles as the base of the record to persist
    var record = pb.DAO.getIdWhere(this.getId());
    record.object_type = JOB_STORE_NAME;
    record.name = this.name;
    record.status = status ? status : DEFAULT_START_STATUS;
    record.progress = 0;

    var reportFailure = function(err/*, result*/) {
        if (!util.isError(err)) {
            return;
        }
        pb.log.error('JobRunner: Failed to mark job as started %s', err.stack);
    };
    this.dao.save(record, reportFailure);
};
/**
 * To be called by the extending implementation when progress has been made.
 * The incremental amount of progress should be provided keeping in mind that
 * the overall progress should not exceed 100. Optionally, the status
 * parameter may also be included.
 *
 * The call is always logged. The persisted job record is only touched when
 * there is something to change: a non-negative numeric increment and/or a
 * non-empty status. A failure to persist is logged and not propagated.
 * @method onUpdate
 * @param {Integer} progressIncrement The amount to add to the job's progress
 * @param {String} [status] The new status of the job
 */
JobRunner.prototype.onUpdate = function(progressIncrement, status) {
    this.log('Updating job [%s:%s] by %s percent with status: %s', this.getId(), this.name, Math.floor(progressIncrement), status ? status : '');

    var updates = {};
    if (pb.validation.isNum(progressIncrement, true) && progressIncrement >= 0) {
        updates.$inc = {progress: progressIncrement};
    }
    if (pb.validation.isNonEmptyStr(status, true)) {
        updates.$set = {status: status};
    }

    //ensure we need to update. Comparing against a fresh {} literal is always
    //unequal, so the operators that were actually added are counted instead.
    if (Object.keys(updates).length === 0) {
        return;
    }

    var query = pb.DAO.getIdWhere(this.getId());
    this.dao.updateFields(JOB_STORE_NAME, query, updates, function(err/*, result*/) {
        if (util.isError(err)) {
            pb.log.error('JobRunner: Failed to update job progress - ', err.stack);
        }
    });
};
/**
 * Called once by the extending implementation when the job has completed
 * execution whether that be successful completion or by error. The persisted
 * job record is updated with the final status, a progress of 100, and the
 * stack trace of the error (if any). A failure to persist is logged and not
 * propagated.
 * @method onCompleted
 * @param {String|Error} [status] The final status of the job. If not provided
 * the status will default to 'COMPLETED', or to 'ERRORED' when an error is
 * provided as the second parameter. An Error may also be passed in this
 * position, in which case it is treated as the job's error and the status is
 * 'ERRORED'.
 * @param {Error} [err] The error, if any, that was generated by the job's
 * execution
 */
JobRunner.prototype.onCompleted = function(status, err) {
    var finalStatus = status;
    var jobError = err;
    if (util.isError(status)) {
        //the error was passed as the only argument
        jobError = status;
        finalStatus = DEFAULT_ERROR_STATUS;
    }
    else if (!status) {
        //no explicit status: derive it from whether an error was reported
        finalStatus = util.isError(err) ? DEFAULT_ERROR_STATUS : DEFAULT_DONE_STATUS;
    }

    //log result
    this.log('Setting job [%s:%s] as completed with status: %s', this.getId(), this.name, finalStatus);

    //persist result
    var query = pb.DAO.getIdWhere(this.getId());
    var sets = {
        $set: {
            status: finalStatus,
            progress: 100,
            error: jobError ? jobError.stack : undefined
        }
    };
    this.dao.updateFields(JOB_STORE_NAME, query, sets, function(updateErr/*, result*/) {
        if (util.isError(updateErr)) {
            pb.log.error('JobRunner: Failed to update job as completed - %s', updateErr.stack);
        }
    });
};
//exports
return JobRunner;
};