API Docs for: 0.8.0
Show:

File: include/service/jobs/job_runner.js

  1. /*
  2. Copyright (C) 2016 PencilBlue, LLC
  3.  
  4. This program is free software: you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published by
  6. the Free Software Foundation, either version 3 of the License, or
  7. (at your option) any later version.
  8.  
  9. This program is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13.  
  14. You should have received a copy of the GNU General Public License
  15. along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. */
  17. 'use strict';
  18.  
  19. //dependencies
  20. var util = require('../../util.js');
  21.  
  22. module.exports = function JobRunnerModule(pb) {
  23.  
  /**
   * A base interface that system jobs can implement. The premise is that every
   * job will have an ID and a name. The job is initialized by calling the
   * "init" function and started by calling the "run" function. The specific
   * implementation is also provided with functions to report the start, update,
   * and end of the job run. The advantage to extending this prototype is that
   * the provided functions allow for creating a persisted record of the job. In
   * addition, log statements generated by the job are also persisted (as long as
   * the provided "log" function is called).
   * @class JobRunner
   * @constructor
   */
  function JobRunner() {

      /**
       * An instance of DAO to provide direct access to the DB if it is needed.
       * The value is null until "init" is called.
       * @property dao
       * @type {DAO}
       */
      this.dao = null;

      /**
       * Holds the unique identifier for the job. The value is null until
       * "init" is called.
       * @property id
       * @type {String}
       */
      this.id = null;

      /**
       * The name of the job. It is assigned by "init" and is read when log
       * statements and job records are built. It is declared here so that every
       * instance carries the property even before "init" is called.
       * @property name
       * @type {String}
       */
      this.name = null;

      /**
       * The percentage of the overall work that this job accounts for. If this
       * job is run by itself then the value should be 1. This means that 100%
       * of the job is completed by this job. If, for example, the value is .333
       * then it is assumed that this job accounts for 33% or one third of the
       * overall work necessary to complete the job. This is handy when a large
       * job is made up of smaller jobs. This value will assist in allowing the
       * jobs to calculate their update increments. The number must be a value
       * between 0 (exclusive) & 1 (inclusive).
       * @property chunkOfWorkPercentage
       * @type {Number}
       */
      this.chunkOfWorkPercentage = 1;
  }
  66.  
  /**
   * Value of the "object_type" field placed on each persisted log statement
   * produced by a job.
   * @private
   * @static
   * @property JOB_LOG_STORE_NAME
   * @type {String}
   */
  var JOB_LOG_STORE_NAME = 'job_log';

  /**
   * Name of the persistence entity that holds the descriptor of a job run. It
   * is used both as the "object_type" of the job record and as the collection
   * argument when the record's fields are updated.
   * @private
   * @static
   * @property JOB_STORE_NAME
   * @type {String}
   */
  var JOB_STORE_NAME = 'job_run';

  /**
   * Status applied to a job that has started when no explicit status is given
   * @private
   * @static
   * @property DEFAULT_START_STATUS
   * @type {String}
   */
  var DEFAULT_START_STATUS = 'RUNNING';

  /**
   * Status applied to a job that finished when no explicit status is given
   * @private
   * @static
   * @property DEFAULT_DONE_STATUS
   * @type {String}
   */
  var DEFAULT_DONE_STATUS = 'COMPLETED';

  /**
   * Status applied to a job that finished with an error
   * @private
   * @static
   * @property DEFAULT_ERROR_STATUS
   * @type {String}
   */
  var DEFAULT_ERROR_STATUS = 'ERRORED';
  112.  
  /**
   * Prepares the job for execution: creates the DAO instance and assigns the
   * job's identifier and name.
   * @method init
   * @param {String} [name] The job's name. Falls back to the job's ID when
   * omitted.
   * @param {String} [jobId] The job's unique identifier. When omitted a new one
   * is generated via util.uniqueId.
   * @return {JobRunner} This instance, to allow call chaining
   */
  JobRunner.prototype.init = function(name, jobId) {
      this.dao = new pb.DAO();

      //a falsy ID means the caller wants one generated
      this.id = jobId ? jobId : util.uniqueId();

      //an unnamed job is identified by its ID
      this.name = name ? name : this.id;
      return this;
  };
  127.  
  /**
   * Provides the job's unique identifier
   * @method getId
   * @return {String} The value of the "id" property (null until "init" has
   * been called)
   */
  JobRunner.prototype.getId = function() {
      var jobId = this.id;
      return jobId;
  };
  136.  
  /**
   * Sets the portion of the overarching job that this job instance will
   * contribute once complete.
   * @method setChunkOfWorkPercentage
   * @param {Number} chunkOfWorkPercentage A number greater than 0 and less than
   * or equal to 1
   * @return {JobRunner} This instance, to allow call chaining
   * @throws {Error} When the value is not a number or falls outside of the
   * range (0, 1]
   */
  JobRunner.prototype.setChunkOfWorkPercentage = function(chunkOfWorkPercentage) {

      //The type is checked explicitly because the coercing comparisons would
      //otherwise let values such as true, '0.5', or [0.5] through and store a
      //non-number. NaN fails both range comparisons and is rejected as well.
      var isValid = typeof chunkOfWorkPercentage === 'number' &&
          chunkOfWorkPercentage > 0 &&
          chunkOfWorkPercentage <= 1;
      if (!isValid) {
          throw new Error('The chunkOfWorkPercentage must be a value between 0 (exclusive) and 1 (inclusive)');
      }

      this.chunkOfWorkPercentage = chunkOfWorkPercentage;
      return this;
  };
  152.  
  /**
   * Provides the portion of the overall work that this job accounts for
   * @method getChunkOfWorkPercentage
   * @return {Number} The value of the "chunkOfWorkPercentage" property
   */
  JobRunner.prototype.getChunkOfWorkPercentage = function() {
      var portion = this.chunkOfWorkPercentage;
      return portion;
  };
  161.  
  /**
   * Starts the job. This base implementation always throws; extending
   * prototypes are expected to replace it with the actual work and to execute
   * the callback upon completion.
   * @method run
   * @param {Function} cb A callback that provides two parameters: The first is
   * any error that was generated and the second is the implementation specific
   * result of the job.
   * @throws {Error} Always, when the base implementation is invoked
   */
  JobRunner.prototype.run = function(/*cb*/) {
      var notImplemented = new Error('This function must be overriden by an extending prototype');
      throw notImplemented;
  };
  173.  
  /**
   * Logs a message to the system logger as well as to the persistence layer. The
   * function takes a variable number of arguments. A string message/pattern
   * followed by the variables to fill in with that data. See util.format or the
   * implementation for Winston loggers. The message is prefixed with the job's
   * name. When called without arguments the function does nothing.
   *
   * Persisting the statement is best effort: a failure to save it is reported
   * to the system logger and is not propagated to the caller.
   * @method log
   * @param {String} message The message or pattern to log
   */
  JobRunner.prototype.log = function() {

      //copy the arguments without modifying the arguments object itself
      var args = Array.prototype.slice.call(arguments);
      if (args.length === 0) {
          return;
      }

      //prefix the message/pattern with the job's name
      args[0] = this.name + ': ' + args[0];

      //only run the pattern through the formatter when there are values to
      //substitute
      var message = args.length > 1 ? util.format.apply(util, args) : args[0];

      var statement = {
          object_type: JOB_LOG_STORE_NAME,
          job_id: this.id,
          worker_id: pb.system.getWorkerId(),
          name: this.name,
          message: message,
          metadata: []
      };
      this.dao.save(statement, function(err/*, result*/) {

          //report directly to the system logger. Calling this.log here would
          //attempt to persist again.
          if (util.isError(err)) {
              pb.log.error('JobRunner: Failed to persist log statement - %s', err.stack);
          }
      });
      pb.log.debug.apply(pb.log, args);
  };
  205.  
  /**
   * To be called once by the extending implementation to mark the start of the
   * job. The function persists the job record, with a progress of 0, so that
   * later calls to onUpdate or onCompleted have a record to modify. A failure to
   * persist the record is logged and not propagated.
   * @method onStart
   * @param {String} [status='RUNNING'] The starting status of the job
   */
  JobRunner.prototype.onStart = function(status) {

      //the record is built on top of the object that identifies the job by ID
      var record = pb.DAO.getIdWhere(this.getId());
      record.object_type = JOB_STORE_NAME;
      record.name = this.name;
      record.status = status ? status : DEFAULT_START_STATUS;
      record.progress = 0;

      var onSaved = function(err/*, result*/) {
          if (!util.isError(err)) {
              return;
          }
          pb.log.error('JobRunner: Failed to mark job as started %s', err.stack);
      };
      this.dao.save(record, onSaved);
  };
  225.  
  /**
   * To be called by the extending implementation when progress has been made.
   * The incremental amount of progress should be provided keeping in mind that
   * the overall progress should not exceed 100. Optionally, the status
   * parameter may also be included. The update is always written to the job
   * log. The persisted job record is only touched when a valid, non-negative
   * increment and/or a non-empty status is provided. A failure to persist the
   * update is logged and not propagated.
   * @method onUpdate
   * @param {Integer} progressIncrement
   * @param {String} [status]
   */
  JobRunner.prototype.onUpdate = function(progressIncrement, status) {
      this.log('Updating job [%s:%s] by %s percent with status: %s', this.getId(), this.name, Math.floor(progressIncrement), status ? status : '');

      var query = pb.DAO.getIdWhere(this.getId());
      var updates = {};
      if (pb.validation.isNum(progressIncrement, true) && progressIncrement >= 0) {
          updates.$inc = {progress: progressIncrement};
      }
      if (pb.validation.isNonEmptyStr(status, true)) {
          updates.$set = {status: status};
      }

      //ensure we need to update. Comparing against a fresh object literal is
      //always unequal, so the keys are counted instead.
      if (Object.keys(updates).length === 0) {
          return;
      }

      this.dao.updateFields(JOB_STORE_NAME, query, updates, function(err/*, result*/) {
          if (util.isError(err)) {
              pb.log.error('JobRunner: Failed to update job progress - %s', err.stack);
          }
      });
  };
  257.  
  /**
   * Called once by the extending implementation when the job has completed
   * execution whether that be successful completion or by error. The outcome is
   * written to the job log and the persisted job record is updated with the
   * final status, a progress of 100, and the error's stack trace (when an error
   * is given). A failure to persist the update is logged and not propagated.
   *
   * The function may also be called with the error as the only argument.
   * @method onCompleted
   * @param {String} [status] The final status of the job. If not provided the
   * status will default to 'COMPLETED' or 'ERRORED' when an error is provided as
   * the second parameter.
   * @param {Error} [err] The error, if any, that was generated by the job's
   * execution
   */
  JobRunner.prototype.onCompleted = function(status, err) {
      if (util.isError(status)) {

          //the error was passed as the only argument
          err = status;
          status = DEFAULT_ERROR_STATUS;
      }
      else if (!status) {

          //no explicit status: derive it from whether an error was provided
          status = util.isError(err) ? DEFAULT_ERROR_STATUS : DEFAULT_DONE_STATUS;
      }

      //log result
      this.log('Setting job [%s:%s] as completed with status: %s', this.getId(), this.name, status);

      //persist result
      var query = pb.DAO.getIdWhere(this.getId());
      var sets = {
          $set: {
              status: status,
              progress: 100,
              error: err ? err.stack : undefined
          }
      };
      this.dao.updateFields(JOB_STORE_NAME, query, sets, function(updateErr/*, result*/) {
          if (util.isError(updateErr)) {
              pb.log.error('JobRunner: Failed to update job as completed - %s', updateErr.stack);
          }
      });
  };
  295.  
  296. //exports
  297. return JobRunner;
  298. };
  299.