最近项目上有一个需求,要对存储在MongoDB中的用户行为数据定期进行统计分析。
先使用PHP实现原型,发现因为数据量很大,大量时间都花在MongoDB服务器和Web服务器之间的数据交换上。考虑到这一点,必须在MongoDB服务器上进行本地计算,将结果保存起来,再使用PHP访问并展示给用户。
查阅文档得知,MongoDB可以执行JS脚本,这样思路就清楚了,用JS脚本实现统计的功能,再用crontab定期执行。
现在和大家分享一下在完成这个任务的过程中,遇到的一些问题和解决思路。(必须要说的是,MongoDB的官方文档对服务器端JS编程的文档极度缺乏,很多命令都是通过Google才找到的。)
在命令行输出信息
输出字符串:
print( 'Hello World' );
输出对象:
var obj = { 'key' : 'value' }; printjson( obj );
切换数据库
在Mongo JS Shell中,切换当前数据库的命令是:use xxx,这是一个magic helper。遗憾的是,在JS脚本中没有use可用。
如果运行mongo时没有加上–nodb参数,那么会自动connect到指定的服务器和数据库,并把句柄保存为db。如果运行时没有指定服务器会默认连接本机127.0.0.1,如果没有指定数据库会连接到test数据库。
你可以用conncet命令重新发起连接:
var userDB = connect( '123.123.123.123/user' ); userDB.user.find();
需要注意的是,connect命令是从mongo客户端发起连接,效果相当于将参数放在mongo命令后执行,会在终端显示“connecting to : database”。 如果你运行mongod和mongo的是同一台机器,可以省略ip地址。
你也可以保持连接的前提下切换数据库:
var userDB = db.getSiblingDB( 'user' ); var videoDB = db.getSisterDB( 'video' ); userDB.user.find();
db.getSiblingDB()或db.getSisterDB()这两个方法的实现完全一样,实现了男女平等……
加载外部JS文件
因为需求比较复杂,很快代码就臃肿不堪,需要把各种Helper函数等提取到单独的文件。想要实现这一点,就需要可以在JS脚本中加载其他文件。可以使用load命令:
load( 'helpers/time.js' );
需要注意的是,在load命令中使用相对路径时,当前路径是执行mongo命令时shell所在路径。这就需要在shell中执行mongo之前,先cd到脚本所在的路径,否则会找不到需要加载的脚本。
时间戳转换
MongoDB中每条记录的时间戳是标准UNIX时间戳,而JS中没有操作时间戳的原生函数,需要自己实现:
function date2timestamp ( date ) { return Math.floor( date.getTime() / 1000 ); } function timestamp2date ( timestamp ) { return new Date( timestamp * 1000 ); } function now2timestamp ( ) { return date2timestamp( new Date() ); }
在本次需求中,需要按日生成统计记录。这时需要一个对阅读友好的格式来表示日期,同时还要能方便的在JS中还原成Date对象。由于不想自己实现一个date parse(担心性能),而JS原生的Date.parse()只支持mm/dd/yyyy或mm-dd-yyyy的格式,所以选择了前者:
function date2day ( date ) { return ( '0' + ( date.getMonth() + 1 ) ).substr( -2 ) + '/' + ( '0' + date.getDate() ).substr( -2 ) + '/' + date.getFullYear(); } function day2date ( day ) { return new Date( Date.parse( day ) ); }
显示进度
在执行每个步骤前先输出信息,这是常识:
print( 'scan [ user ] for information...' ); userDB.user.find().forEach(function( doc ){ .... })
在数据量大或者计算量大的时候,单个步骤可能执行很长时间。这时我们想监控计算的进度,以评价脚本的运行效率。于是写了一个progress:
var progress_update_counts = 100; var progress_update_seconds = 3; function createProgress ( total ) { return { start : now2timestamp(), timer : now2timestamp(), total : total, count : 0, pass : function () { if ( ( ++this.count % progress_update_counts == 0 ) && ( now2timestamp() - this.timer > progress_update_seconds ) ) { print( "\t" + this.count + ' / ' + this.total + "\t" + ( this.count / this.total * 100 ).toFixed( 2 ) + '%' ); this.timer = now2timestamp(); } }, done : function () { var seconds = now2timestamp() - this.start; print( "\t" + this.total + ' / ' + this.total + "\t100.00%\tCost " + seconds + " seconds.\tSpeed rate: " + ( this.total / seconds ).toFixed( 2 ) + ' record/s' ); } }; }
简单解释一下逻辑:
1、初始化progress的时候记录下当前的时间和需要计算的总量;
2、每计算完一项时调用progress.pass(),此时progress内部的计数器自增,然后每当计数增加一个指定的量级(太大失去统计意义,太小影响性能,目前设为100)时,如果上一次输出的时间已经超过指定的周期(太大失去统计意义,太小影响性能,目前设为3秒)则输出当前进度。
3、 计算完成后调用progress.done(),输出完成的进度、消耗时间及速率。
使用示例如下:
print( 'scan [ user ] for information...' ); var progress = createProgress( userDB.user.count() ); userDb.user.find().forEach(function( doc ){ .... progress.pass(); }); progress.done();
输出示例如下:
scan [ user ] for information... 3200 / 10000 32.00% 6400 / 10000 64.00% 9600 / 10000 96.00% 10000 / 10000 100.00% Cost 10 seconds. Speed rate: 1000 record/s.
改写缓存
在扫描用户行为数据时需要不断更新统计结果记录,仔细分析会发现同一条记录被多次改写,而实际保留的只有最终值,之前的记录写入操作完全是浪费。应该先将改写的动作缓存在内存里,等扫描操作结束后在批量写入到数据库中。于是写了一个buffer:
function createIncreaseBuffer ( collection ) { return { buffer : {}, collection : collection, push : function ( id, key, value ) { var buffer = this.buffer; if ( !buffer[ id ] ) buffer[ id ] = {}; if ( !buffer[ id ][ key ] ) buffer[ id ][ key ] = 0; buffer[ id ][ key ] += value; }, flush : function () { var collection = this.collection; for ( var id in this.buffer ) { collection.update( { _id : ObjectId( id ) }, { $inc : this.buffer[ id ] } ); } this.buffer = {}; } } }
使用示例:
print( 'scan [ user ] for information...' ); var progress = createProgress( userDB.user.count() ); var buffer = createIncreaseBuffer( reportDB.user_report ); userDb.user.find().forEach(function( doc ){ .... buffer.push( doc._id, 'count', 1 ); progress.pass(); }); buffer.flush(); progress.done();
经过缓存优化后,脚本执行效率提升了10倍!
好文,收藏。 另,您有没有遇到过mongodb replset 延迟严重的问题?
缺乏运维经验,没遇到这问题。。
给力。好文