API Docs for: 0.8.0
Show:

File: include/system/command/command_service.js

/*
 Copyright (C) 2016  PencilBlue, LLC

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
'use strict';

//dependencies
var async = require('async');
var domain = require('domain');
var util = require('../../util.js');

module.exports = function CommandServiceModule(pb) {

    /**
     * Provides a mechanism to send commands to all members of the cluster or a
     * specific member.
     * @class CommandService
     * @constructor
     * @param {CommandBroker} broker
     */
    function CommandService(broker) {
        if (!broker) {
            throw new Error('The broker parameter is required');
        }
        this.broker = broker;
        this.registrants = {};
        this.awaitingResponse = {};
    }

    /**
     * The singleton instance
     * @private
     * @static
     * @property INSTANCE
     * @type {CommandService}
     */
    var INSTANCE = null;

    /**
     * The singleton instance
     * @private
     * @static
     * @property INITIALIZED
     * @type {boolean}
     */
    var INITIALIZED = false;

    /**
     * The default timeout in milliseconds (2 seconds)
     * @private
     * @static
     * @readonly
     * @property DEFAULT_TIMEOUT
     * @type {Integer}
     */
    var DEFAULT_TIMEOUT = 2000;

    /**
     * A hash of the brokers that are available out of the box
     * @private
     * @static
     * @property AWAITING_RESPONSE
     * @type {Object}
     */
    var DEFAULT_BROKERS = {
        redis: pb.RedisCommandBroker,
        mongo: pb.MongoCommandBroker
    };

    /**
     * Indicates that an error occurrred while attempting to check if a handler was registered
     * @static
     * @readonly
     * @property ERROR_INDEX
     * @type {number}
     */
    CommandService.ERROR_INDEX = -2;

    /**
     * Indicates that the handler was not found
     * @static
     * @readonly
     * @property NOT_FOUND
     * @type {number}
     */
    CommandService.NOT_FOUND = -1;

    /**
     * Initializes the service and the broker implementation.  The broker is
     * determined by the configuration value of "command.broker".  This value can
     * be "redis" for the out of the box implementation for Redis or an absolute
     * path to another implementation.
     * @method init
     * @param {Function} cb A callback that takes two parameters: cb(Error, TRUE/FALSE)
     */
    CommandService.prototype.init = function (cb) {
        if (INITIALIZED) {
            return cb(null, true);
        }
        pb.log.debug('CommandService: Initializing...');

        //instantiate the command broker
        var self = this;
        this.broker.init(function (err) {
            if (util.isError(err)) {
                return cb(err);
            }

            self.broker.subscribe(pb.ServerRegistration.generateKey(), CommandService.onCommandReceived(self), function(error, result) {
                INITIALIZED = !util.isError(error);
                cb(err, result);
            });
        });

        //register for events
        pb.system.registerShutdownHook('CommandService', util.wrapTask(this, this.shutdown));
    };

    /**
     * Shuts down the command service and the broker if initialized
     * @method shutdown
     * @param {Function} cb A callback that takes two parameters: cb(Error, TRUE/FALSE)
     */
    CommandService.prototype.shutdown = function (cb) {
        pb.log.debug('CommandService: Shutting down...');

        this.registrants = {};
        if (!this.broker) {
            return cb(null, true);
        }
        this.broker.shutdown(cb);
    };

    /**
     * Registers a handler for incoming commands of the specified type.
     * @method registerForType
     * @param {String} type The name/type of the command to handle
     * @param {Function} handler A function that takes two parameters:
     * handler(channel, command). where channel is a string and command is an
     * object.
     * @return {Boolean} TRUE if the the handler was registered, FALSE if not.
     */
    CommandService.prototype.registerForType = function (type, handler) {
        var result = this.isRegistered(type, handler);
        if (result !== CommandService.NOT_FOUND && !!this.registrants[type]) {
            return false;
        }

        //ensure there is a holder for the type
        if (!this.registrants[type]) {
            this.registrants[type] = [];
        }

        this.registrants[type].push(handler);
        return true;
    };

    /**
     * Unregisters a handler for the specified type.
     * @method unregisterForType
     * @param {String} type The name/type of the command that the handler is
     * registered for
     * @param {Function} handler The handler function to unregister
     * @return {Boolean} TRUE if the handler was unregistered, FALSE if not.
     */
    CommandService.prototype.unregisterForType = function (type, handler) {
        var index = this.isRegistered(type, handler);
        var result = index > CommandService.NOT_FOUND;
        if (result) {
            this.registrants[type].splice(index, 1);
        }
        return result;
    };

    /**
     * Determines if the provided handler is registered for the given type.
     * @method isRegistered
     * @param type
     * @param handler
     * @return {number} The index in the storage array where the handle to
     * the function is kept.  CommandService.NOT_FOUND if the handle is not
     * registered for the type.  CommandService.ERROR_INDEX when the
     * parameters are invalid.
     */
    CommandService.prototype.isRegistered = function (type, handler) {
        if (!pb.ValidationService.isNonEmptyStr(type, true) || !util.isFunction(handler) || !util.isArray(this.registrants[type])) {
            return CommandService.ERROR_INDEX;
        }

        for (var i = 0; i < this.registrants[type].length; i++) {
            if (handler === this.registrants[type][i]) {
                return i;
            }
        }
        return CommandService.NOT_FOUND;
    };

    /**
     * Responsible for delegating out the received command to the registered
     * handlers.  The command parameter must be an object, must have a type
     * property that is a string, and must have a registered handler for the
     * specified type.
     * @method notifyOfCommand
     * @param {Object} command The command to delegate
     */
    CommandService.prototype.notifyOfCommand = function (command) {
        if (!util.isObject(command)) {
            return;
        }

        var type = command.type;
        if (!pb.ValidationService.isNonEmptyStr(type, true)) {
            return;
        }

        if (!util.isArray(this.registrants[type])) {
            pb.log.warn('CommandService: Command of type [%s] was received but there are no registered handlers.', type);
            return;
        }

        //check if this is a response to a message that was sent
        if (command.replyTo) {
            if (this.awaitingResponse[command.replyTo]) {
                this.awaitingResponse[command.replyTo](command);
            }
            else {
                pb.log.warn('CommandService: The command was in reply to [%s] but no callback was registered. Skipping.', command.replyTo);
            }
            return;
        }

        //emit command to each handler
        var self = this;
        var emitFunction = function (type, i, command) {
            return function () {
                self.registrants[type][i](command);
            };
        };
        for (var i = 0; i < this.registrants[type].length; i++) {
            process.nextTick(emitFunction(type, i, command));
        }
    };

    /**
     * Sends a command to all processes iin the cluster and waits for a response
     * from all before calling back.
     * @method sendCommandToAllGetResponses
     * @param {String} type The name/type of the command being sent
     * @param {Object} [options] The options for the command.  The options object
     * becomes the command object.  Custom properties to be part of the command can
     * be added.  However, certain properties do have special meaning such as "id",
     * "to", "from", "timeout", "includeme".  These special properties may be
     * overriden by this function or one it calls in order for the commands to
     * function properly.
     * @param {String} [options.id]
     * @param {Boolean} [options.ignoreme]
     * @param {Integer} [options.timeout=2000] Timeout in milliseconds for each
     * process to respond.
     * @param {Function} [options.progress] A function called right before each
     * command is sent.  The function should take two parameters.  The first is the
     * index of the task being executed and the second is the total number of tasks.
     * @param {Function} cb A callback that provides two parameters: cb(Error, Array)
     */
    CommandService.prototype.sendCommandToAllGetResponses = function (type, options, cb) {
        if (util.isFunction(options)) {
            cb = options;
            options = {};
        }
        else if (!options) {
            options = {};
        }
        else if (!util.isObject(options)) {
            cb(new Error('The options parameter must be an object'));
            return;
        }

        //set the progress callback
        var progressCb = null;
        if (util.isFunction(options.progress)) {
            progressCb = options.progress;
        }
        delete options.progress;

        //get all proceses in the cluster
        var self = this;
        var serverResigration = pb.ServerRegistration.getInstance();
        serverResigration.getClusterStatus(function (err, statuses) {
            if (util.isError(err)) {
                cb(err);
                return;
            }

            var me = null;
            var myKey = pb.ServerRegistration.generateKey();
            var tasks = util.getTasks(statuses, function (statuses, i) {
                if (myKey === statuses[i]._id) {
                    me = i;
                }

                //create the task function
                return function (callback) {

                    //make progress callback
                    if (progressCb) {
                        progressCb(i, statuses.length);
                    }

                    //create command
                    var opts = util.clone(options);
                    opts.to = statuses[i].id;

                    //execute command against the cluster
                    self.sendCommandGetResponse(type, opts, function (err, command) {
                        var result = {
                            err: err ? err.stack : undefined,
                            command: command
                        };
                        callback(null, result);
                    });
                };
            });

            //remove me if specified
            if (options.ignoreme && me !== null) {
                tasks.splice(me, 1);
            }

            //send commands for each process
            async.series(tasks, cb);
        });
    };

    /**
     * Sends a command to a single process in the cluster expecting a response.
     * @method sendCommandGetResponse
     * @param {String} type
     * @param {Object} options
     * @param {String} options.to
     * @param {Integer} [options.timeout]
     * @param{Function} onResponse
     */
    CommandService.prototype.sendCommandGetResponse = function (type, options, onResponse) {
        if (!util.isObject(options) || !pb.validation.isNonEmptyStr(options.to, true)) {
            return onResponse(new Error('A to field must be provided when expecting a response to a message.'));
        }

        var self = this;
        var doSend = function (type, options, onResponse) {

            var timeout = options.timeout || pb.config.command.timeout || DEFAULT_TIMEOUT;
            var d = domain.create();
            d.on('error', onResponse);
            d.run(function () {
                self.sendCommand(type, options, function (err, commandId) {
                    if (util.isError(err)) {
                        onResponse(err);
                        return;
                    }
                    else if (!pb.validation.isNonEmptyStr(commandId, true)) {
                        onResponse(new Error('Failed to publish the command to the cluster'));
                        return;
                    }

                    var handle = setTimeout(function () {
                        delete self.awaitingResponse[commandId];
                        onResponse(new Error('Timed out waiting for response to command [' + commandId + ']'));
                        handle = null;
                    }, timeout);
                    self.awaitingResponse[commandId] = function (command) {
                        if (handle) {
                            clearTimeout(handle);
                            handle = null;
                            onResponse(null, command);
                        }
                        else {
                            pb.log.silly('CommandService: A response to command [%s] came back but it was after the timeout. %s', commandId, JSON.stringify(command));
                        }
                        delete self.awaitingResponse[commandId];
                    };
                });
            });
        };
        doSend(type, options, onResponse);
    };

    /**
     * Provides a mechanism to respond to a the entity that sent the command.
     * @method sendInResponseTo
     * @param {Object} command The command that was sent to ths process
     * @param {Object} responseCommand The command to send back to the entity that sent the first command.
     * @param {Function} [cb] A callback that provides two parameters: cb(Error, Command ID)
     */
    CommandService.prototype.sendInResponseTo = function (command, responseCommand, cb) {
        if (!util.isObject(command)) {
            return cb(new Error('The command parameter must be an object'));
        }

        if (!util.isObject(responseCommand)) {
            responseCommand = {};
        }

        //ensure a callback
        cb = cb || util.cb;

        //complete the response
        responseCommand.id = util.uniqueId();
        responseCommand.to = command.from;
        responseCommand.replyTo = command.id;

        //send the response
        var type = responseCommand.type || command.type;
        this.sendCommand(type, responseCommand, cb);
    };

    /**
     * Sends a command to the cluster
     * @method sendCommand
     * @param {String} type The command name/type
     * @param {Object} [options] The options that will be serialized and sent to the other processes in the cluster
     * @param {String} [options.to] The cluster process that should handle the message
     * @param {Function} [cb] A callback that provides two parameters: cb(Error, Command ID)
     */
    CommandService.prototype.sendCommand = function (type, options, cb) {
        if (!pb.ValidationService.isNonEmptyStr(type, true)) {
            return cb(new Error("The command type is required"));
        }
        else if (util.isFunction(options)) {
            cb = options;
            options = {};
        }
        else if (!pb.ValidationService.isObj(options, false)) {
            return cb(new Error('When provided the options parameter must be an object'));
        }

        //ensure a callback is provided
        cb = cb || util.cb;

        //ensure an options object
        if (!options) {
            options = {};
        }

        //set who its from
        options.from = pb.ServerRegistration.generateKey();
        options.type = type;
        options.date = new Date();

        //ensure each command is sent with an ID.  Allow the ID to already be set
        //in the event that we are sending a response.
        if (!options.id) {
            options.id = util.uniqueId();
        }

        //instruct the broker to broadcast the command
        var tasks = [
            util.wrapTask(this, this.init),
            util.wrapTask(this.broker, this.broker.publish, [options.to, options])
        ];
        async.series(tasks, function (err, results) {
            cb(err, results[tasks.length - 1] ? options.id : null);
        });
    };

    /**
     * The global handler for incoming commands.  It registers itself with the
     * broker and then when messages are received it verifies that the message is
     * meant for this member of the cluster (or all members) then proceeds to
     * handoff to the function that will delegate out to the handlers.
     * @static
     * @method onCommandReceived
     * @param {CommandService} commandService
     * @return {Function} (string, object)
     */
    CommandService.onCommandReceived = function (commandService) {
        return function (channel, command) {

            var uid = pb.ServerRegistration.generateKey();
            if (command.to === uid || command.to === null || command.to === undefined) {
                commandService.notifyOfCommand(command);
            }
            else {
                //skip because it isn't addressed to us
                pb.log.silly('CommandService: The command was not addressed to me [%s] but to [%s]. Skipping.', uid, command.to);
            }
        };
    };

    /**
     * Retrieves the singleton instance of CommandService.
     * @static
     * @method getInstance
     * @return {CommandService}
     */
    CommandService.getInstance = function () {
        if (INSTANCE) {
            return INSTANCE;
        }

        //figure out which broker to use
        var CommandBroker = null;
        if (DEFAULT_BROKERS[pb.config.command.broker]) {
            CommandBroker = DEFAULT_BROKERS[pb.config.command.broker];
        }
        else {
            CommandBroker = require(pb.config.command.broker)(pb);
        }

        var broker = new CommandBroker();
        INSTANCE = new CommandService(broker);
        return INSTANCE;
    };

    return CommandService;
};