- /*
- Copyright (C) 2016 PencilBlue, LLC
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- 'use strict';
-
- //dependencies
- var util = require('../../util.js');
-
- module.exports = function JobRunnerModule(pb) {
-
/**
 * Base prototype for system jobs. A job is identified by an ID and a name,
 * both of which are assigned by "init"; the work itself is kicked off through
 * "run", which extending prototypes must override. Extending prototypes
 * report their lifecycle through "onStart", "onUpdate", and "onCompleted",
 * which persist a record of the job run. Statements passed to the provided
 * "log" function are persisted too.
 * @class JobRunner
 * @constructor
 */
function JobRunner() {

    /**
     * DAO instance that gives the job direct access to the DB. Stays null
     * until "init" has been called.
     * @property dao
     * @type {DAO}
     */
    this.dao = null;

    /**
     * Unique identifier of the job. Stays null until "init" has been called.
     * @property id
     * @type {String}
     */
    this.id = null;

    /**
     * Fraction of the overall work that this job represents. A job that runs
     * on its own keeps the default of 1 (it is 100% of the work). A job that
     * is one of several sub-jobs uses a smaller value, e.g. .333 when it is
     * one third of the whole, so that it can scale its progress increments
     * accordingly. Must be greater than 0 and no larger than 1.
     * @property chunkOfWorkPercentage
     * @type {Number}
     */
    this.chunkOfWorkPercentage = 1;
}
-
/**
 * Persistence entity (object type) under which the job's log statements
 * are stored.
 * @private
 * @static
 * @property JOB_LOG_STORE_NAME
 * @type {String}
 */
var JOB_LOG_STORE_NAME = 'job_log';

/**
 * Persistence entity (object type) under which the job descriptor itself is
 * stored.
 * @private
 * @static
 * @property JOB_STORE_NAME
 * @type {String}
 */
var JOB_STORE_NAME = 'job_run';

/**
 * Status recorded for a job that is currently in progress.
 * @private
 * @static
 * @property DEFAULT_START_STATUS
 * @type {String}
 */
var DEFAULT_START_STATUS = 'RUNNING';

/**
 * Status recorded for a job that finished successfully.
 * @private
 * @static
 * @property DEFAULT_DONE_STATUS
 * @type {String}
 */
var DEFAULT_DONE_STATUS = 'COMPLETED';

/**
 * Status recorded for a job that ended with a fatal error.
 * @private
 * @static
 * @property DEFAULT_ERROR_STATUS
 * @type {String}
 */
var DEFAULT_ERROR_STATUS = 'ERRORED';
-
/**
 * Prepares the job for use: creates the DAO instance and assigns the job's
 * ID and name.
 * @method init
 * @param {String} [name] The job's name. Falls back to the job's ID when
 * omitted.
 * @param {String} [jobId] The job's unique identifier. A new one is generated
 * through util.uniqueId when omitted.
 * @return {JobRunner} This instance, to allow chaining
 */
JobRunner.prototype.init = function(name, jobId) {

    this.dao = new pb.DAO();

    //only generate an identifier when the caller did not supply one
    if (jobId) {
        this.id = jobId;
    }
    else {
        this.id = util.uniqueId();
    }

    this.name = name ? name : this.id;
    return this;
};
-
/**
 * Provides the job's unique identifier
 * @method getId
 * @return {String} The job ID, or null when "init" has not been called yet
 */
JobRunner.prototype.getId = function() {
    var identifier = this.id;
    return identifier;
};
-
/**
 * Sets the portion of the over arching job that this job instance will
 * contribute once complete.
 * @method setChunkOfWorkPercentage
 * @param {Number} chunkOfWorkPercentage A number greater than 0 and less than
 * or equal to 1
 * @return {JobRunner} This instance, to allow chaining
 * @throws {Error} When the value is not a number or falls outside of the
 * range (0, 1]
 */
JobRunner.prototype.setChunkOfWorkPercentage = function(chunkOfWorkPercentage) {

    //The explicit type check matters: the relational operators coerce their
    //operands, so values such as true, '0.5', or [0.5] would otherwise pass
    //the range check and be stored as a non-number. NaN fails both
    //comparisons and is therefore rejected without a separate check.
    var isValid = typeof chunkOfWorkPercentage === 'number' &&
        chunkOfWorkPercentage > 0 &&
        chunkOfWorkPercentage <= 1;
    if (!isValid) {
        throw new Error('The chunkOfWorkPercentage must be a value between 0 (exclusive) and 1 (inclusive)');
    }

    this.chunkOfWorkPercentage = chunkOfWorkPercentage;
    return this;
};
-
/**
 * Provides the fraction of the overall work that this job accounts for
 * @method getChunkOfWorkPercentage
 * @return {Number} The chunk of work percentage (1 unless it was changed
 * through setChunkOfWorkPercentage)
 */
JobRunner.prototype.getChunkOfWorkPercentage = function() {
    var portion = this.chunkOfWorkPercentage;
    return portion;
};
-
/**
 * Starts the job. This base implementation is a placeholder that always
 * throws; extending prototypes must replace it with the actual work and
 * execute the callback once that work has finished.
 * @method run
 * @param {Function} cb A callback that provides two parameters: The first is
 * any error that was generated and the second is the implementation specific
 * result of the job.
 * @throws {Error} Always, unless overridden
 */
JobRunner.prototype.run = function(/*cb*/) {
    var reason = 'This function must be overriden by an extending prototype';
    throw new Error(reason);
};
-
/**
 * Writes a message to the system logger (at debug level) and persists it as
 * a job log statement. Accepts a variable number of arguments: a message or
 * pattern followed by the values to substitute into it (see util.format or
 * the implementation for Winston loggers). The job's name is prepended to the
 * message. Calling the function without arguments does nothing.
 * @method log
 * @param {String} message The message or pattern to log
 */
JobRunner.prototype.log = function() {

    var params = Array.prototype.slice.call(arguments);
    if (params.length === 0) {
        return;
    }

    //prefix the pattern with the job name so that both the persisted
    //statement and the system log entry carry it
    params[0] = this.name+': '+params[0];

    //only run the formatter when there are values to substitute
    var text = params.length > 1 ? util.format.apply(util, params) : params[0];

    var entry = {
        object_type: JOB_LOG_STORE_NAME,
        job_id: this.id,
        worker_id: pb.system.getWorkerId(),
        name: this.name,
        message: text,
        metadata: []
    };

    //persistence is fire-and-forget: util.cb is handed in as the callback
    this.dao.save(entry, util.cb);
    pb.log.debug.apply(pb.log, params);
};
-
/**
 * Marks the start of the job. Meant to be called once by the extending
 * implementation. Persists the job record (name, status, and a progress of
 * 0) so that later calls to onUpdate or onCompleted have a record to update.
 * A failure to persist is logged and not propagated.
 * @method onStart
 * @param {String} [status='RUNNING'] The starting status of the job
 */
JobRunner.prototype.onStart = function(status) {

    //the ID clause doubles as the base of the record to persist
    //NOTE(review): presumably getIdWhere yields an object keyed by the
    //record's ID field - confirm against pb.DAO
    var record = pb.DAO.getIdWhere(this.getId());
    record.object_type = JOB_STORE_NAME;
    record.name = this.name;
    record.status = status ? status : DEFAULT_START_STATUS;
    record.progress = 0;

    var onSaved = function(err/*, result*/) {
        if (!util.isError(err)) {
            return;
        }
        pb.log.error('JobRunner: Failed to mark job as started %s', err.stack);
    };
    this.dao.save(record, onSaved);
};
-
/**
 * To be called by the extending implementation when progress has been made.
 * The incremental amount of progress should be provided keeping in mind that
 * the overall progress should not exceed 100. Optionally, the status
 * parameter may also be included. The call is always logged; the persisted
 * job record is only touched when there is a valid increment and/or status to
 * apply. A failure to persist is logged and not propagated.
 * @method onUpdate
 * @param {Integer} progressIncrement The non-negative amount to add to the
 * job's progress. Ignored when it is not a number or is negative.
 * @param {String} [status] The new status of the job. Ignored when it is not
 * a non-empty string.
 */
JobRunner.prototype.onUpdate = function(progressIncrement, status) {
    this.log('Updating job [%s:%s] by %s percent with status: %s', this.getId(), this.name, Math.floor(progressIncrement), status ? status : '');

    var query = pb.DAO.getIdWhere(this.getId());
    var updates = {};
    if (pb.validation.isNum(progressIncrement, true) && progressIncrement >= 0) {
        updates.$inc = {progress: progressIncrement};
    }
    if (pb.validation.isNonEmptyStr(status, true)) {
        updates.$set = {status: status};
    }

    //Ensure we need to update. Comparing against an object literal
    //(updates !== {}) is always true because objects are compared by
    //reference, so count the operators that were actually added instead.
    if (Object.keys(updates).length === 0) {
        return;
    }

    this.dao.updateFields(JOB_STORE_NAME, query, updates, function(err/*, result*/) {
        if (util.isError(err)) {
            pb.log.error('JobRunner: Failed to update job progress - %s', err.stack);
        }
    });
};
-
/**
 * Called once by the extending implementation when the job has completed
 * execution whether that be successful completion or by error. Logs the
 * outcome and persists the final status, a progress of 100, and the error's
 * stack trace (when there is an error). A failure to persist is logged and
 * not propagated.
 *
 * The function may also be called with the error as the only argument, in
 * which case the status becomes 'ERRORED'.
 * @method onCompleted
 * @param {String} [status] The final status of the job. If not provided the
 * status will default to 'COMPLETED' or 'ERRORED' when an error is provided as
 * the second parameter.
 * @param {Error} [err] The error, if any, that was generated by the job's
 * execution
 */
JobRunner.prototype.onCompleted = function(status, err) {
    if (util.isError(status)) {

        //error was passed as the first (and only) argument
        err = status;
        status = DEFAULT_ERROR_STATUS;
    }
    else if (!status) {

        //no explicit status: derive it from whether an error was supplied.
        //Without this check a call such as onCompleted(null, err) would be
        //recorded as 'COMPLETED' despite carrying an error.
        status = util.isError(err) ? DEFAULT_ERROR_STATUS : DEFAULT_DONE_STATUS;
    }

    //log result
    this.log('Setting job [%s:%s] as completed with status: %s', this.getId(), this.name, status);

    //persist result
    var query = pb.DAO.getIdWhere(this.getId());
    var sets = {
        $set: {
            status: status,
            progress: 100,
            error: err ? err.stack : undefined
        }
    };

    //the callback's error is named differently so that it does not shadow
    //the job's own error
    this.dao.updateFields(JOB_STORE_NAME, query, sets, function(updateErr/*, result*/) {
        if (util.isError(updateErr)) {
            pb.log.error('JobRunner: Failed to update job as completed - %s', updateErr.stack);
        }
    });
};
-
- //exports
- return JobRunner;
- };
-
-