[聚合文章] nodejs收集日志,rsyslog同步收集入es的实施
之前写过相关的2篇文章:
2,ElasticSearch和Gome-error-report的安装教程
其实中间还忽略了一个问题,在文章2中,我只是在GER-server的项目中增加了一个ES的create log的接口,这么做其实是一个不太好的方案,因为通过API的方式远程写日志,在大并发的情况下并不是最优的,会遇到一些瓶颈和写入失败的问题,如何能够避免呢?换了公司之后,自己在新公司做了一套新的日志收集方案,这里记录并且分享给大家。
如果没有读过前两篇文章的建议去阅读前两篇之后再来看这一篇,本文主要是说日志的落地和同步收集的实施。
首先来看一下这张图:
本文主要是说错误SDK上报后,我们如何来做负载和日志的同步,这里的技术栈包括了rsyslog,es,nodejs,crontab。
一,日志收集机的配置
首先,一开始设计的是sdk上报后,先推到nginx,拿nginx的access log做落地的,但是后来发现,当SDK上报一条信息,包含多条错误日志时,这种做法很蠢,因为在rsyslog中,都是一条一条去push到es的,中间需要处理的过程比较多,当然可以在nginx那一层拿lua来做一次处理,这里我选择使用nodejs。
处理逻辑比较简单,我贴一下完整代码:
let express = require('express');let app = express();let port = 80;let fs = require('fs');let fsExtra = require('fs-extra');let path = require('path');let moment = require('moment');let accessDir = '/var/log/jslogs/';let accessPath = accessDir + 'jserror.access.log';let logpath = path.resolve(__dirname, accessPath);let async = require('async');fsExtra.ensureDirSync(accessDir);var rotatingLogStream = require('file-stream-rotator').getStream({ filename: accessPath, frequency: "1h", verbose: false, max_logs: "5d", audit_file: "/var/log/jslogs/log-audit.json"});app.get('/read.gif', (req, res, next) => { let img = fs.createReadStream(path.resolve(__dirname, './images/read.gif')); var err_msg = req.query.err_msg; if (err_msg) { let logs = []; let writelogs = []; let errmsg = decodeURIComponent(err_msg); // | 会分割成多条 let errLogs = errmsg.split('|'); errLogs.forEach((msg) => { let params = { log_master: 'js', ext: '-' }; msg = msg.replace(/\^/g, '&'); msg = msg.split('&'); msg.forEach((item) => { item = item.split('='); params[item[0]] = item[1]; }); let timestamp = moment().format(); let request_time = moment().format('YYYY-MM-DD hh:mm:ss'); let log = { project_name: "JS", '@timestamp': timestamp, request_time: request_time, message: { log_master: params.log_master, msg: params.msg, projectType: params.projectType, currentUrl: params.currentUrl, flashVer: params.flashVer, level: params.level, referer: params.referer, screenSize: params.screenSize, timestamp: params.timestamp, userAgent: params.userAgent, title: params.title, host: params.host, colNum: params.colNum, rowNum: params.rowNum, targetUrl: params.targetUrl, ext: params.ext } } log = '@cee: ' + JSON.stringify(log) + '\n'; writelogs.push(function(cb) { rotatingLogStream.write(log + '\n',cb); }); }); //写入日志 async.parallel(writelogs); img.pipe(res); } else { img.pipe(res); }});app.use((req, res, next) => { res.status(404); res.send('404: File Not Found');});app.listen(port);console.log('the server is listen on %s', port);
上面的代码可以简单说一下作用,nodejs监听80端口,然后get请求有一个read.gif的1x1图片接口,别人用err_msg=xxx之后访问,解析出参数最后再落到es格式的日志,日志切分用的是file-stream-rotator,每1小时切一次,保留5天内的日志。
日志参数详见GER SDK的上报格式,这里存的是一致的日志格式。
然后我们看一下rsyslog的设置,rsyslog分为客户端和服务端,客户端为收集日志的配置,既把nodejs的落地日志推到服务端机器,服务端则做接收解析和推入es中。
下面说一下rsyslog的安装和配置,我这边做的系统是centos6.5,默认安装的是5.8,需要安装最新的rsyslog以支持更多的插件和template语法。
wget http://rpms.adiscon.com/v8-stable/rsyslog.repomv rsyslog.repo /etc/yum.repos.d/rsyslog.repoyum info rsyslog --skip-brokenyum install -y rsyslog rsyslog-elasticsearch rsyslog-mmjsonparsersyslogd -version
切记要装rsyslog的elasticsearch和mmjsonparse的扩展,当然这2个模块,在服务端的rsyslog上安装即可。
然后我们看一下客户端是如何配置的,在/etc/rsyslog.conf文件后添加:
module(load="imfile")ruleset(name="remote"){ action(type="omfwd" Protocol="tcp" Target="服务端ip" Port="514") stop}input(type="imfile" File="/var/log/jslogs/*.log.*" Facility="user" Severity="error" Tag="web_access" PersistStateInterval="1" Ruleset="remote")
这段配置指的是把/var/log/jslogs/下面的所有log,通过tcp推送到服务端的514端口。
然后我们再配置一下rsyslog的开机启动即可。
chkconfig rsyslog on
然后我们再配置一下nodejs的开机启动和pm2的日志,首先是安装pm2:
npm install pm2 -gpm2 install pm2-logrotatenpm install pm2-gui
以上分别为pm2,pm2日志切割模块,pm2的可视化server。
然后使用pm2-gui start
启动可视化的pm2管理界面,默认端口是8088,默认密码:AuTh.
然后pm2启动我们上面的nodejs日志记录。
pm2 start index.js --name errorNodeLog -o ./logs/access.out.log -e ./logs/error.out.log
然后我们设置pm2的日志切割参数:
pm2 set pm2-logrotate:compress true pm2 set pm2-logrotate:rotateInterval '* * 1 * * * *'
设置完毕后,我们设置pm2的开机启动:
pm2 savepm2 startup centos6
基本到这里,我们的客户端机器就搞定了。当然,安装pm2之前,如果你是新机器,还得安装nodejs,这里推荐用官方的方法,yum安装:
Installing Node.js via package manager | Node.js
启动好rsyslog和pm2的nodejs服务后,我们测试一下接口,是否会成功写入日志,然后我们开始配置服务端的机器。
二,日志服务端的配置:
日志服务的配置,我们要安装的软件如下:java,es,nodejs,GER-sever,以及rsyslog。
java比较好安装,yum list java*
装1.7或者1.8的都可以,这里不说了,nodejs安装同上。
然后是es和GER-server的安装,文章开头有讲,这里更新一下mapping的设置:
curl -XPUT esip:port/_template/template_1 -d '{"template":"logstash-web_access*","mappings":{"logs":{"properties":{"@timestamp":{"type":"date"},"message":{"properties":{"host":{"type":"string","index":"not_analyzed"}}}}}}}'
自行更换esip和port即可。
然后记得安装一下可视化的es插件:
elasticsearch/bin/plugin install mobz/elasticsearch-head
然后访问 http://{你的ip地址}:9200/_plugin/head/
看是否安装成功,默认是9200端口,如果你改了es的端口,就修改9200地址。
rsyslog的安装上面说过了,我们直接看服务端的配置,在/etc/rsyslog.conf下加入下面配置:
module(load="imtcp")module(load="mmjsonparse")module(load="omelasticsearch")input(type="imtcp" Port="514" Ruleset="apprule")template(name="DynFile" type="string" string="/var/log/applog/%$year%-%$month%-%$day%/%fromhost-ip%.log")template(name="logstash-web_access" type="list") { constant(value="logstash-") property(name="syslogtag" format="json") constant(value="-") property(name="timereported" dateFormat="rfc3339" position.from="1" position.to="4") constant(value=".") property(name="timereported" dateFormat="rfc3339" position.from="6" position.to="7") constant(value=".") property(name="timereported" dateFormat="rfc3339" position.from="9" position.to="10")}template(name="es-tpl" type="list"){ constant(value="{\"@timestamp\":\"") property(name="timereported" dateFormat="rfc3339") constant(value="\",") property(name="$!all-json" position.from="2")}template( name="nginx-log" type="string" string="%msg%\n" )ruleset(name="apprule") { action(type="mmjsonparse") action(type="omfile" DynaFile="DynFile") action(type="omelasticsearch" template="es-tpl" searchIndex="logstash-web_access" dynSearchIndex="on" bulkmode="on" searchType="logs" queue.type="LinkedList" queue.size="1000" queue.dequeuebatchsize="300" action.resumeretrycount="-1" server="es-ip" serverport="es-port" errorFile="/var/log/applog/es-error.log") stop}
请自行替换最后面omelasticsearch的配置,把server和serverport换成对应自己的服务和端口地址,简单说一下这个配置做了什么。
首先加载几个需要用到的module,然后下面定义了入es的template,模板方面的语法可以自己参考一下rsyslog官方的文档,比较简单,主要是字符替换和格式替换用。因为我们同步的日志只有我们nodejs收集的程序日志,所以订了一个apprule
的规则,最后配置这个apprule规则,先是把日志json进行解析,然后按照DynFile的template格式写入日志,最后再推送到es中。
三,测试:
配置完成后我们通过刷新客户端的接口来进行测试,可以tail一下2台日志的日志做对比,如果同步成功就算大功告成了,当然最后还要查看一下es里是否真正的推入了日志:
es中如果也推数据成功,根据上一篇文章中的GER-Server来配置可视化的错误平台就成了,之前的专栏文章都有说过。
最后,由于es也要做成开机启动,建议都拿chkconfig来进行维护,使用yum安装,然后同样给可视化的平台加上pm2相关的配置和开机启动。
下面贴一下,crontab下我们如何对es的索引进行管理的脚本:
const request = require('request');const moment = require('moment');const url = require('./config').url;const date15Ago = moment().subtract(16, 'd'); // 获取15天之前的日期const date15AgoStr = date15Ago.year() + '.' + (date15Ago.month() + 1) + '.' + date15Ago.date();module.exports = function () { // 删除15天前的日志日志 request.delete({ url: url + date15AgoStr, json: true }, function (error, response, body) { if(!error){ if(response.statusCode == 200){ console.info('success: delete logs of ' + date15AgoStr); } else if(response.statusCode == 404) { console.error('error: error in delete log:\nlog does not exist'); } else { console.error('error: error in delete log:\n'); console.error(body); } } });}
删除15天之前的索引脚本。
const request = require('request');const moment = require('moment');const config = require('./config');const date = moment().subtract(1, 'd'); // 获取前一天的日期const dateStr = date.year() + '.' + (date.month() + 1) + '.' + date.date();module.exports = function () { getLogInfo(function (data) { saveLogInfo(data); }); function getLogInfo(cb) { // 统计前一天的日志信息 request({ url: config.url + dateStr + '/logs/_search', json: true, body: { "size" : 0, "aggs" : { "projectType" : { "terms" : { "field" : "message.projectType" }, "aggs" : { "hosts" : { "terms" : { "field" : "message.host" } } } } } }}, function (error, response, body) { // console.log(body); // console.log(JSON.stringify(body.aggregations.projectType.buckets)); if(!error){ if(response.statusCode == 200){ console.info('success: get log info'); cb(body.aggregations.projectType.buckets); } else if(response.statusCode == 404){ console.error('error: error in get log info:\nlog does not exist'); } else { console.error('error: error in get log info:\n'); console.error(body); } } }); } // 存储前一天的日志信息 function saveLogInfo(data) { request.post({ url: config.baseUrl + 'log_count/' + dateStr, json: true, body: { info: data } }, function (error, response, body) { if(!error){ if(response.statusCode == 201){ console.info('success: save log info'); } else { console.error('error: error in saving log info:\n'); console.error(body); } } }); }}
统计前一天有多少域名,多少端(pc|mobile)错误count的脚本,然后再反存回es。
最后我们再通过crontab把2个脚本做一个定时执行即可,把入口的js文件开头加入:
#!/usr/bin/env node
这一行就可以啦。
ok,做一个记录,怕过2星期我自己也忘了,多谢大家观看。