API Docs for: 0.8.0
Show:

File: include/dao/db_manager.js

/*
    Copyright (C) 2016  PencilBlue, LLC

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
'use strict';

//requirements
var async    = require('async');
var domain   = require('domain');
var mongo    = require('mongodb').MongoClient;
var ObjectID = require('mongodb').ObjectID;
var util     = require('../util.js');

module.exports = function DBManagerModule(pb) {

    /**
     * @private
     * @static
     * @readonly
     * @property FILES_NAMESPACE
     * @type {String}
     */
    var FILES_NAMESPACE = 'fs.files';

    /**
     * @private
     * @static
     * @readonly
     * @property CHUNKS_NAMESPACE
     * @type {String}
     */
    var CHUNKS_NAMESPACE = 'fs.chunks';

    /**
     * @private
     * @static
     * @readonly
     * @property SYSTEM_NAMESPACE_PREFIX
     * @type {String}
     */
    var SYSTEM_NAMESPACE_PREFIX = 'system.';

    /**
     * Wrapper that protects against direct access to the active connection pools
     * and DB references.
     *
     * @module Database
     * @class DBManager
     * @constructor
     */
    function DBManager() {

        /**
         * Reference to the system instance of System
         * @private
         * @property system
         * @type {System}
         */
        var system = pb.system;

        /**
         * Keeps track of all active DBs with active connection pools.
         * @private
         * @property dbs
         * @type {Object}
         */
        var dbs  = {};

        /**
         * Retrieves a handle to the specified database.
         * @method getDb
         * @param {String} name The database name
         * @return {Object}     A promise object
         */
        this.getDb = function(name, cb) {
            if (util.isFunction(name)) {
                cb = name;
                name = null;
            }
            if(!name){
                name = pb.config.db.name;
            }

            //when we already have a connection just pass back the handle
            if (this.hasConnected(name)) {
                return cb(null, dbs[name]);
            }

            //clone the config and set the name that is being asked for
            var config  = util.clone(pb.config);
            config.db.name = name;

            //build the connection string for the mongo cluster
            var dbURL   = DBManager.buildConnectionStr(pb.config);
            var options = config.db.options;

            pb.log.debug("Attempting connection to: %s with options: %s", dbURL, JSON.stringify(options));
            var self = this;
            mongo.connect(dbURL, options, function(err, db){
                if (err) {
                    var message = err.name + ': ' + err.message + ' - ' + dbURL + '\nIs your instance running?';
                    return cb(new Error(message));
                }

                self.authenticate(pb.config.db.authentication, db, function(err, didAuthenticate) {
                    if (util.isError(err)) {
                        var message = err.name + ': ' + err.message;
                        return cb(new Error(message));
                    }
                    else if (didAuthenticate !== true && didAuthenticate !== null) {
                        return cb(new Error("Failed to authenticate to db "+name+": "+util.inspect(didAuthenticate)));
                    }

                    //save reference to connection in global connection pool
                    dbs[db.databaseName]  = db;

                    //Indicate the promise was kept.
                    cb(null, db);
                });
            });
        };

        /**
         *
         * @method authenticate
         * @param {Object} auth
         * @param {Db} db
         * @param {Function} cb
         */
        this.authenticate = function(auth, db, cb) {
            if (!util.isObject(auth) || !util.isString(auth.un) || !util.isString(auth.pw)) {
                pb.log.debug('DBManager: An empty auth object was passed for DB [%s]. Skipping authentication.', db.databaseName);
                return cb(null, null);
            }

            db.authenticate(auth.un, auth.pw, auth.options ? auth.options : {}, cb);
        };

        /**
         * Indicates if a connection pool to the specified database has already been
         * initialized
         *
         * @method hasConnected
         * @param {String} name
         * @return {Boolean} Whether the pool has been connected
         */
        this.hasConnected = function(name){
            return typeof dbs[name] !== 'undefined';
        };

        /**
         * Takes an Array of indexing procedures and delegates them out to paralleled
         * tasks.
         * @method processIndices
         * @param {Array} procedures An array of objects that describe the index to
         * place upon a collection.  The object contains three properties.
         * "collection" a string that represents the name of the collection to build an
         * index for.  "specs" is an object that describes which fields to index.  The
         * keys are the field names and the value is -1 for descending order and 1 for
         * ascending.  "options" is an object that that provides specific index
         * properties such as unique or sparse.  See
         * http://mongodb.github.io/node-mongodb-native/api-generated/collection.html#ensureindex
         * for specific MongoDB implementation details for specs and options.
         * @param {Function} cb A callback that provides two parameters: The first, an
         * Error, if occurred.  Secondly, an object that contains two properties.
         * "result" an array of the results where each object in the array represents
         * the result of the request to ensure the index.  "errors" an array of errors
         * that occurred while indexing.  The function does not terminate after the
         * first error.  Instead it allows all indices to attempt to be created and
         * defer the reporting of an error until the end.
         */
        this.processIndices = function(procedures, cb) {
            var self = this;
            if (!util.isArray(procedures)) {
                return cb(new Error('The procedures parameter must be an array of Objects'));
            }

            this.dropUnconfiguredIndices(procedures, function(err) {
                if(util.isError(err)) {
                    return cb(new Error(util.format('DBManager: Error occurred during index check/deletion ERROR[%s]', err.stack)));
                }
                self.ensureIndices(procedures, cb);
            });
        };

        /**
         * Ensures all indices declared are defined on Mongo server
         * @method ensureIndices
         * @param {Array} procedures
         * @param {Function} cb
         */
        this.ensureIndices = function (procedures, cb) {
            //to prevent a cirular dependency we do the require for DAO here.
            var DAO = require('./dao.js')(pb);
            //create the task list for executing indices.
            var errors = [];
            var tasks = util.getTasks(procedures, function(procedures, i) {
                return function(callback) {
                    var dao = new DAO();
                    dao.ensureIndex(procedures[i], function(err, result) {
                        if (util.isError(err)) {
                            errors.push(err);
                            pb.log.error('DBManager: Failed to create INDEX=[%s] RESULT=[%s] ERROR[%s]', JSON.stringify(procedures[i]), util.inspect(result), err.stack);
                        }
                        else if (pb.log.isDebug()) {
                            pb.log.silly('DBManager: Attempted to create INDEX=[%s] RESULT=[%s]', JSON.stringify(procedures[i]), util.inspect(result));
                        }
                        callback(null, result);
                    });
                };
            });
            async.parallel(tasks, function(err, results){
                var result = {
                    errors: errors,
                    results: results
                };
                cb(err, result);
            });
        };


        /**
         * Sorts through all created indices and drops any index not declared in pb.config
         * @method dropUnconfiguredIndices
         * @param {Array} procedures
         * @param {Function} cb
         */
        this.dropUnconfiguredIndices = function(procedures, cb) {
            var self = this;
            //to prevent a cirular dependency we do the require for DAO here.
            var DAO = require('./dao.js')(pb);
            this.getStoredIndices(function(err, storedIndices) {
                if(util.isError(err)) {
                    cb(new Error('DBManager: Failed to get stored indices ERROR[%s]', err.stack));
                    return;
                }
                var dao = new DAO();

                var tasks = util.getTasks(storedIndices, function(indices, i) {
                    return function(callback) {
                        var index = indices[i];

                        //special condition: When mongo is used as the media
                        //storage provider two special collections are created:
                        //"fs.chunks" and "fs.files".  These indices should be
                        //left alone and ignored.
                        if (DBManager.isProtectedIndex(index.ns)) {
                            pb.log.silly("DBManager: Skipping protected index for %s", index.ns);
                            return callback();
                        }

                        var filteredIndex = procedures.filter(function(procedure) {
                            var ns = pb.config.db.name + '.' + procedure.collection;
                            var result = ns === index.ns && self.compareIndices(index, procedure);
                            return result;
                        });
                        var indexCollection = index.ns.split('.')[1];

                        //ignore any index relating to the "_id" field.
                        //ignore all indices of the "session" collection as it is managed elsewhere.
                        //use length and null/undefined check for if the index in question is defined in pb.config.indices.
                        if(index.name === '_id_' || indexCollection === 'session' || (filteredIndex.length !== 0 && !util.isNullOrUndefined(filteredIndex))) {
                            return callback();
                        }

                        dao.dropIndex(indexCollection, index.name, function (err, result) {
                            if (util.isError(err)) {
                                pb.log.error('DBManager: Failed to drop undeclared INDEX=[%s] RESULT=[%s] ERROR[%s]', JSON.stringify(index), util.inspect(result), err.stack);
                            }
                            callback(err, result);
                        });
                    };
                });
                async.parallel(tasks, cb);
            });
        };

        /**
         * Compares an index stored in Mongo to an index declared in pb.config
         * @method compareIndices
         * @param {Object} stored
         * @param {Object} defined
         * @return {boolean}
         */
        this.compareIndices = function(stored, defined) {
            var keys = Object.keys(stored.key);
            var specs = Object.keys(defined.spec);
            var result =  JSON.stringify(keys) === JSON.stringify(specs);
            return result;

        };

        /**
         * Yields all indices currently in the entire database
         * @method getStoredInidices
         * @param {Function} cb
         */
        this.getStoredIndices = function(cb) {
            dbs[pb.config.db.name].collections(function(err, collections) {
                var tasks = util.getTasks(collections, function(collections, i) {
                    return function(callback) {
                        collections[i].indexes(function(err, indexes) {
                            if(util.isError(err)) {
                                pb.log.error("Error retrieving indices from db: " + err);
                            }
                            callback(err, indexes);
                        });
                    };
                });
                async.parallel(tasks, function(err, results) {
                    cb(err, Array.prototype.concat.apply([], results));
                });
            });
        };

        this.processMigration = function(cb) {
            var DBMigrate = require('./db_migrate.js')(pb);
            new DBMigrate().run(cb);
        };

        /**
         * Iterates over all database handles and call's their shutdown function.
         *
         * @method shutdown
         * @param {Function} cb Callback function
         * @return {Array}      Array of promise objects, one for each shutdown call
         */
        this.shutdown = function(cb){
            cb = cb || util.cb;

            var tasks = util.getTasks(Object.keys(dbs), function(keys, i) {
                return function(callback) {
                    var d = domain.create();
                    d.run(function() {
                        dbs[keys[i]].close(true, function(err, result) {
                            if (util.isError(err)) {
                                throw err;
                            }
                            callback(null, result);
                        });
                    });
                    d.on('error', function(err) {
                        pb.log.error('DBManager: An error occurred while closing a DB connection. %s', err.stack);
                        callback(null, false);
                    });
                };
            });
            async.parallel(tasks, cb);
        };


        //constructor specific logic
        //register for shutdown
        system.registerShutdownHook('DBManager', this.shutdown);
    }

    /**
     * The protocol prefix for connecting to a mongo cluster
     * @private
     * @static
     * @readonly
     * @property PROTOCOL_PREFIX
     * @type {Object}
     */
    var PROTOCOL_PREFIX = 'mongodb://';

    /**
     *
     * @static
     * @method buildConnectionStr
     * @param {Object} config
     * @param {Array} config.servers
     * @param {String} config.name
     * @return {String}
     */
    DBManager.buildConnectionStr = function(config) {
        var str = PROTOCOL_PREFIX;
        var options = '?';
        for (var i = 0; i < config.db.servers.length; i++) {

            //check for prefix for backward compatibility
            var hostAndPort = config.db.servers[i];
            if (hostAndPort.indexOf(PROTOCOL_PREFIX) === 0) {
                hostAndPort = hostAndPort.substring(PROTOCOL_PREFIX.length);
            }

            //check for options
            var parts = hostAndPort.split('?');
            if (parts.length > 1) {
                options += (options.length > 1 ? '&' : '') + parts[1];
            }
            hostAndPort = parts[0];

            if (i > 0) {
                str += ',';
            }
            str += hostAndPort;
        }
        return pb.UrlService.urlJoin(str, config.db.name) + options;
    };

    /**
     *
     * @static
     * @method isProtectedIndex
     * @param {String} indexNamespace
     * @return {Boolean}
     */
    DBManager.isProtectedIndex = function(indexNamespace) {
        return indexNamespace.indexOf(FILES_NAMESPACE, indexNamespace.length - FILES_NAMESPACE.length) !== -1 ||
        indexNamespace.indexOf(CHUNKS_NAMESPACE, indexNamespace - CHUNKS_NAMESPACE.length) !== -1 ||
        indexNamespace.indexOf(SYSTEM_NAMESPACE_PREFIX) !== -1;
    };

    //exports
    return DBManager;
};