API Docs for: 0.8.0
Show:

File: include/system/command/redis_command_broker.js

/*
    Copyright (C) 2016  PencilBlue, LLC

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';

//dependencies
var util = require('../../util.js');

module.exports = function RedisCommandBrokerModule(pb) {

    /**
     * Brokers messages using Redis as the medium.  The implementation follows a
     * publish/subscribe model that allows for listening for changes based on a a
     * specified channel.
     * @class RedisCommandBroker
     * @constructor
     */
    function RedisCommandBroker(){

        /**
         * Client used to publish commands to the cluster
         * @property publishClient
         * @type {Client}
         */
        this.publishClient = null;

        /**
         * Client used to subscribe to commands to the cluster
         * @property publishClient
         * @type {Client}
         */
        this.subscribeClient = null;
    }

    //statics
    /**
     * The hash of handlers for each channel subscribed to
     * @private
     * @static
     * @property SUBSCRIBERS
     * @type {Object}
     */
    var SUBSCRIBERS = {};

    /**
     * Initializes the broker by creating the connections to Redis and registering
     * for the message event.
     * @method init
     * @param {Function} cb A callback that provides two parameters: cb(Error, Boolean)
     */
    RedisCommandBroker.prototype.init = function(cb) {
        var self = this;

        this.publishClient   = pb.CacheFactory.getInstance();
        this.subscribeClient = pb.CacheFactory.createInstance();
        this.subscribeClient.on('message', function(channel, command) {
            self.onCommandReceived(channel, command);
        });

        pb.log.debug('RedisCommandBroker: Initialized.');
        cb(null, true);
    };

    /**
     * Shuts down the broker by closing the open connections to Redis.
     * @method shutdown
     * @param {Function} cb A callback that provides two parameters: cb(Error, Boolean)
     */
    RedisCommandBroker.prototype.shutdown = function(cb) {
        pb.log.debug('RedisCommandBroker: Shutting down...');

        var clients = [this.subscribeClient, this.publishClient, ];
        for (var i = 0; i < clients.length; i++) {
            try {
                clients[i].quit();
            }
            catch(e) {
                pb.log.silly('RedisCommandBroker: Error shutting down client: %s', e.stack);
            }
        }
        cb(null, true);
    };

    /**
     * Called when a member of the cluster has published a command.  The function
     * inspects that it has a handler for the channel then delegates the command
     * back to the handler.
     * @method onCommandReceived
     * @param {String} channel The channel the message was pushed to
     * @param {String} commandStr The message that was published
     */
    RedisCommandBroker.prototype.onCommandReceived = function(channel, commandStr) {
        pb.log.silly('RedisCommandBroker: Command received [%s]%s', channel, commandStr);

        if (SUBSCRIBERS[channel]) {
            try{
                SUBSCRIBERS[channel](channel, JSON.parse(commandStr));
            }
            catch(err){
                pb.log.error('RedisCommandHandler: An error occurred while attempting to handoff the command [%s]%s. %s', channel, commandStr, err.stack);
            }
        }
        else {
            pb.log.warn('RedisCommandBroker: A message was received for channel [%s] but no handler was available to accept it.', channel);
        }
    };

    /**
     * Sends a message to the specified channel
     * @method publish
     * @param {String} channel The channel to send the message to
     * @param {Object} command The command to send to the cluster
     * @param {Function} cb A callback that takes two parameters: cb(Error, 1 on success/FALSE on failure)
     */
    RedisCommandBroker.prototype.publish = function(channel, command, cb) {
        if (!util.isObject(command)) {
            throw new Error('The channel string and command object are required!');
        }

        //send the command
        var commandStr = JSON.stringify(command);

        pb.log.silly('RedisCommandBroker: Sending command [%s]%s', channel, commandStr);
        this.publishClient.publish(channel, commandStr, cb);
    };

    /**
     * Registers a handler for messages on the specified channel.
     * @method subscribe
     * @param {String} channel The channel to listen for messages on
     * @param {Function} onCommandReceived A handler function that takes two
     * arguments: onCommandReceived(channel, message) where channel is a string and
     * message is an object.
     * @param {Function} cb A callback that takes two parameters: cb(Error, [RESULT])
     */
    RedisCommandBroker.prototype.subscribe = function(channel, onCommandReceived, cb) {
        if (!pb.ValidationService.isNonEmptyStr(channel, true) || !util.isFunction(onCommandReceived)) {
            return cb(new Error('A valid channel string and handler function is required'));
        }

        //setup the one time subscribe callback
        pb.log.debug('RedisCommandBroker: Subcribing to channel [%s]', channel);
        this.subscribeClient.once('subscribe', function(subscribedChannel, count) {
            if (channel === subscribedChannel) {

                SUBSCRIBERS[channel] = onCommandReceived;
                cb(null, count);
            }
        });
        this.subscribeClient.subscribe(channel);
    };

    //exports
    return RedisCommandBroker;
};