为MongoDB编写Js维护脚本

最近项目上有一个需求,要对存储在MongoDB中的用户行为数据定期进行统计分析。

先使用PHP实现原型,发现因为数据量很大,大量时间都花在MongoDB服务器和Web服务器之间的数据交换上。考虑到这一点,必须在MongoDB服务器上进行本地计算,将结果保存起来,再使用PHP访问并展示给用户。

查阅文档得知,MongoDB可以执行JS脚本,这样思路就清楚了,用JS脚本实现统计的功能,再用crontab定期执行。

现在和大家分享一下在完成这个任务的过程中,遇到的一些问题和解决思路。(必须要说的是,MongoDB的官方文档对服务器端JS编程的文档极度缺乏,很多命令都是通过Google才找到的。)

在命令行输出信息

输出字符串:

print( 'Hello World' );

输出对象:

var obj = { 'key' : 'value' };
printjson( obj );

切换数据库

在Mongo JS Shell中,切换当前数据库的命令是:use xxx,这是一个magic helper。遗憾的是,在JS脚本中没有use可用。

如果运行mongo时没有加上–nodb参数,那么会自动connect到指定的服务器和数据库,并把句柄保存为db。如果运行时没有指定服务器会默认连接本机127.0.0.1,如果没有指定数据库会连接到test数据库。

你可以用conncet命令重新发起连接:

var userDB = connect( '123.123.123.123/user' );
userDB.user.find();

需要注意的是,connect命令是从mongo客户端发起连接,效果相当于将参数放在mongo命令后执行,会在终端显示“connecting to : database”。 如果你运行mongod和mongo的是同一台机器,可以省略ip地址。

你也可以保持连接的前提下切换数据库:

var userDB = db.getSiblingDB( 'user' );
var videoDB = db.getSisterDB( 'video' );
userDB.user.find();

db.getSiblingDB()或db.getSisterDB()这两个方法的实现完全一样,实现了男女平等……

加载外部JS文件

因为需求比较复杂,很快代码就臃肿不堪,需要把各种Helper函数等提取到单独的文件。想要实现这一点,就需要可以在JS脚本中加载其他文件。可以使用load命令:

load( 'helpers/time.js' );

需要注意的是,在load命令中使用相对路径时,当前路径是执行mongo命令时shell所在路径。这就需要在shell中执行mongo之前,先cd到脚本所在的路径,否则会找不到需要加载的脚本。

时间戳转换

MongoDB中每条记录的时间戳是标准UNIX时间戳,而JS中没有操作时间戳的原生函数,需要自己实现:

function date2timestamp ( date ) {
  return Math.floor( date.getTime() / 1000 );
}
function timestamp2date ( timestamp ) {
  return new Date( timestamp * 1000 );
}
function now2timestamp ( ) {
  return date2timestamp( new Date() );
}

在本次需求中,需要按日生成统计记录。这时需要一个对阅读友好的格式来表示日期,同时还要能方便的在JS中还原成Date对象。由于不想自己实现一个date parse(担心性能),而JS原生的Date.parse()只支持mm/dd/yyyy或mm-dd-yyyy的格式,所以选择了前者:

function date2day ( date ) {
  return ( '0' + ( date.getMonth() + 1 ) ).substr( -2 ) + '/' + ( '0' + date.getDate() ).substr( -2 ) + '/' + date.getFullYear();
}
function day2date ( day ) {
  return new Date( Date.parse( day ) );
}

显示进度

在执行每个步骤前先输出信息,这是常识:

print( 'scan [ user ] for information...' );
userDB.user.find().forEach(function( doc ){
  ....
})

在数据量大或者计算量大的时候,单个步骤可能执行很长时间。这时我们想监控计算的进度,以评价脚本的运行效率。于是写了一个progress:

var progress_update_counts = 100;
var progress_update_seconds = 3;
function createProgress ( total ) {
  return {
    start : now2timestamp(),
    timer : now2timestamp(),
    total : total,
    count : 0,
    pass : function () {
      if ( ( ++this.count % progress_update_counts == 0 ) && ( now2timestamp() - this.timer > progress_update_seconds ) ) {
        print( "\t" + this.count + ' / ' + this.total + "\t" + ( this.count / this.total * 100 ).toFixed( 2 ) + '%' );
        this.timer = now2timestamp();
      }
    },
    done : function () {
      var seconds = now2timestamp() - this.start;
      print( "\t" + this.total + ' / ' + this.total + "\t100.00%\tCost " + seconds + " seconds.\tSpeed rate: " + ( this.total / seconds ).toFixed( 2 ) + ' record/s' );
    }
  };
}

简单解释一下逻辑:

1、初始化progress的时候记录下当前的时间和需要计算的总量;
2、每计算完一项时调用progress.pass(),此时progress内部的计数器自增,然后每当计数增加一个指定的量级(太大失去统计意义,太小影响性能,目前设为100)时,如果上一次输出的时间已经超过指定的周期(太大失去统计意义,太小影响性能,目前设为3秒)则输出当前进度。
3、 计算完成后调用progress.done(),输出完成的进度、消耗时间及速率。

使用示例如下:

print( 'scan [ user ] for information...' );
var progress = createProgress( userDB.user.count() );
userDb.user.find().forEach(function( doc ){
  ....
  progress.pass();
});
progress.done();

输出示例如下:

scan [ user ] for information...
    3200 / 10000    32.00%
    6400 / 10000    64.00%
    9600 / 10000    96.00%
    10000 / 10000   100.00%    Cost 10 seconds.   Speed rate: 1000 record/s.

改写缓存

在扫描用户行为数据时需要不断更新统计结果记录,仔细分析会发现同一条记录被多次改写,而实际保留的只有最终值,之前的记录写入操作完全是浪费。应该先将改写的动作缓存在内存里,等扫描操作结束后在批量写入到数据库中。于是写了一个buffer:

function createIncreaseBuffer ( collection ) {
  return {
    buffer : {},
    collection : collection,
    push : function ( id, key, value ) {
      var buffer = this.buffer;
      if ( !buffer[ id ] ) buffer[ id ] = {};
      if ( !buffer[ id ][ key ] ) buffer[ id ][ key ] = 0;
      buffer[ id ][ key ] += value;
    },
    flush : function () {
      var collection = this.collection;
      for ( var id in this.buffer ) {
        collection.update( { _id : ObjectId( id ) }, { $inc : this.buffer[ id ] } );
      }
      this.buffer = {};
    }
  }
}

使用示例:

print( 'scan [ user ] for information...' );
var progress = createProgress( userDB.user.count() );
var buffer = createIncreaseBuffer( reportDB.user_report );
userDb.user.find().forEach(function( doc ){
  ....
  buffer.push( doc._id, 'count', 1 );
  progress.pass();
});
buffer.flush();
progress.done();

经过缓存优化后,脚本执行效率提升了10倍!

此条目发表在 IT技术, Javascript, MongoDB 分类目录。将固定链接加入收藏夹。