[聚合文章] nodejs收集日志,rsyslog同步收集入es的实施

JavaScript 2017-12-11 28 阅读

之前写过相关的2篇文章:

1,前端异常监控系统的落地

2,ElasticSearch和Gome-error-report的安装教程

其实中间还忽略了一个问题,在文章2中,我只是在GER-server的项目中增加了一个ES的create log的接口,这么做其实是一个不太好的方案,因为通过API的方式远程写日志,在大并发的情况下并不是最优的,会遇到一些瓶颈和写入失败的问题,如何能够避免呢?换了公司之后,自己在新公司做了一套新的日志收集方案,这里记录并且分享给大家。

如果没有读过前两篇文章的建议去阅读前两篇之后再来看这一篇,本文主要是说日志的落地和同步收集的实施。

首先来看一下这张图:

本文主要是说错误SDK上报后,我们如何来做负载和日志的同步,这里的技术栈包括了rsyslog,es,nodejs,crontab。

一,日志收集机的配置

首先,一开始设计的是sdk上报后,先推到nginx,拿nginx的access log做落地的,但是后来发现,当SDK上报一条信息,包含多条错误日志时,这种做法很蠢,因为在rsyslog中,都是一条一条去push到es的,中间需要处理的过程比较多,当然可以在nginx那一层拿lua来做一次处理,这里我选择使用nodejs。

处理逻辑比较简单,我贴一下完整代码:

let express = require('express');let app = express();let port = 80;let fs = require('fs');let fsExtra = require('fs-extra');let path = require('path');let moment = require('moment');let accessDir = '/var/log/jslogs/';let accessPath = accessDir + 'jserror.access.log';let logpath = path.resolve(__dirname, accessPath);let async = require('async');fsExtra.ensureDirSync(accessDir);var rotatingLogStream = require('file-stream-rotator').getStream({  filename: accessPath,  frequency: "1h",  verbose: false,  max_logs: "5d",  audit_file: "/var/log/jslogs/log-audit.json"});app.get('/read.gif', (req, res, next) => {  let img = fs.createReadStream(path.resolve(__dirname, './images/read.gif'));  var err_msg = req.query.err_msg;  if (err_msg) {    let logs = [];    let writelogs = [];    let errmsg = decodeURIComponent(err_msg);    // | 会分割成多条    let errLogs = errmsg.split('|');    errLogs.forEach((msg) => {      let params = {        log_master: 'js',        ext: '-'      };      msg = msg.replace(/\^/g, '&');      msg = msg.split('&');      msg.forEach((item) => {        item = item.split('=');        params[item[0]] = item[1];      });      let timestamp = moment().format();      let request_time = moment().format('YYYY-MM-DD hh:mm:ss');      let log = {        project_name: "JS",        '@timestamp': timestamp,        request_time: request_time,        message: {          log_master: params.log_master,          msg: params.msg,          projectType: params.projectType,          currentUrl: params.currentUrl,          flashVer: params.flashVer,          level: params.level,          referer: params.referer,          screenSize: params.screenSize,          timestamp: params.timestamp,          userAgent: params.userAgent,          title: params.title,          host: params.host,          colNum: params.colNum,          rowNum: params.rowNum,          targetUrl: params.targetUrl,          ext: params.ext        }      }      log = '@cee: ' + JSON.stringify(log) + '\n';      writelogs.push(function(cb) {	rotatingLogStream.write(log + '\n',cb);      });    });    //写入日志    async.parallel(writelogs);    img.pipe(res);  } else {    img.pipe(res);  }});app.use((req, res, next) => {  res.status(404);  res.send('404: File Not Found');});app.listen(port);console.log('the server is listen on %s', port);

上面的代码可以简单说一下作用,nodejs监听80端口,然后get请求有一个read.gif的1x1图片接口,别人用err_msg=xxx之后访问,解析出参数最后再落到es格式的日志,日志切分用的是file-stream-rotator,每1小时切一次,保留5天内的日志。

日志参数详见GER SDK的上报格式,这里存的是一致的日志格式。

然后我们看一下rsyslog的设置,rsyslog分为客户端和服务端,客户端为收集日志的配置,既把nodejs的落地日志推到服务端机器,服务端则做接收解析和推入es中。

下面说一下rsyslog的安装和配置,我这边做的系统是centos6.5,默认安装的是5.8,需要安装最新的rsyslog以支持更多的插件和template语法。

wget http://rpms.adiscon.com/v8-stable/rsyslog.repomv rsyslog.repo /etc/yum.repos.d/rsyslog.repoyum info rsyslog --skip-brokenyum install -y rsyslog rsyslog-elasticsearch rsyslog-mmjsonparsersyslogd -version

切记要装rsyslog的elasticsearch和mmjsonparse的扩展,当然这2个模块,在服务端的rsyslog上安装即可。

然后我们看一下客户端是如何配置的,在/etc/rsyslog.conf文件后添加:

module(load="imfile")ruleset(name="remote"){  action(type="omfwd" Protocol="tcp" Target="服务端ip" Port="514") stop}input(type="imfile"    File="/var/log/jslogs/*.log.*"    Facility="user"    Severity="error"    Tag="web_access"    PersistStateInterval="1"    Ruleset="remote")

这段配置指的是把/var/log/jslogs/下面的所有log,通过tcp推送到服务端的514端口。

然后我们再配置一下rsyslog的开机启动即可。

chkconfig rsyslog on

然后我们再配置一下nodejs的开机启动和pm2的日志,首先是安装pm2:

npm install pm2 -gpm2 install pm2-logrotatenpm install pm2-gui

以上分别为pm2,pm2日志切割模块,pm2的可视化server。

然后使用pm2-gui start 启动可视化的pm2管理界面,默认端口是8088,默认密码:AuTh.

然后pm2启动我们上面的nodejs日志记录。

pm2 start index.js --name errorNodeLog -o ./logs/access.out.log -e ./logs/error.out.log

然后我们设置pm2的日志切割参数:

pm2 set pm2-logrotate:compress true pm2 set pm2-logrotate:rotateInterval '* * 1 * * * *'

设置完毕后,我们设置pm2的开机启动:

pm2 savepm2 startup centos6

基本到这里,我们的客户端机器就搞定了。当然,安装pm2之前,如果你是新机器,还得安装nodejs,这里推荐用官方的方法,yum安装:

Installing Node.js via package manager | Node.js

启动好rsyslog和pm2的nodejs服务后,我们测试一下接口,是否会成功写入日志,然后我们开始配置服务端的机器。

二,日志服务端的配置:

日志服务的配置,我们要安装的软件如下:java,es,nodejs,GER-sever,以及rsyslog。

java比较好安装,yum list java* 装1.7或者1.8的都可以,这里不说了,nodejs安装同上。

然后是es和GER-server的安装,文章开头有讲,这里更新一下mapping的设置:

curl -XPUT esip:port/_template/template_1 -d '{"template":"logstash-web_access*","mappings":{"logs":{"properties":{"@timestamp":{"type":"date"},"message":{"properties":{"host":{"type":"string","index":"not_analyzed"}}}}}}}' 

自行更换esip和port即可。

然后记得安装一下可视化的es插件:

elasticsearch/bin/plugin install mobz/elasticsearch-head 

然后访问 http://{你的ip地址}:9200/_plugin/head/看是否安装成功,默认是9200端口,如果你改了es的端口,就修改9200地址。

rsyslog的安装上面说过了,我们直接看服务端的配置,在/etc/rsyslog.conf下加入下面配置:

module(load="imtcp")module(load="mmjsonparse")module(load="omelasticsearch")input(type="imtcp" Port="514" Ruleset="apprule")template(name="DynFile" type="string" string="/var/log/applog/%$year%-%$month%-%$day%/%fromhost-ip%.log")template(name="logstash-web_access" type="list") {    constant(value="logstash-") property(name="syslogtag" format="json")    constant(value="-")    property(name="timereported" dateFormat="rfc3339" position.from="1" position.to="4")    constant(value=".")    property(name="timereported" dateFormat="rfc3339" position.from="6" position.to="7")    constant(value=".")    property(name="timereported" dateFormat="rfc3339" position.from="9" position.to="10")}template(name="es-tpl" type="list"){    constant(value="{\"@timestamp\":\"") property(name="timereported" dateFormat="rfc3339")    constant(value="\",") property(name="$!all-json" position.from="2")}template( name="nginx-log" type="string" string="%msg%\n" )ruleset(name="apprule") {    action(type="mmjsonparse")    action(type="omfile" DynaFile="DynFile")    action(type="omelasticsearch"        template="es-tpl"        searchIndex="logstash-web_access"        dynSearchIndex="on"        bulkmode="on"        searchType="logs"        queue.type="LinkedList"        queue.size="1000"        queue.dequeuebatchsize="300"        action.resumeretrycount="-1"        server="es-ip"        serverport="es-port"        errorFile="/var/log/applog/es-error.log") stop}

请自行替换最后面omelasticsearch的配置,把server和serverport换成对应自己的服务和端口地址,简单说一下这个配置做了什么。

首先加载几个需要用到的module,然后下面定义了入es的template,模板方面的语法可以自己参考一下rsyslog官方的文档,比较简单,主要是字符替换和格式替换用。因为我们同步的日志只有我们nodejs收集的程序日志,所以订了一个apprule的规则,最后配置这个apprule规则,先是把日志json进行解析,然后按照DynFile的template格式写入日志,最后再推送到es中。

三,测试:

配置完成后我们通过刷新客户端的接口来进行测试,可以tail一下2台日志的日志做对比,如果同步成功就算大功告成了,当然最后还要查看一下es里是否真正的推入了日志:

es中如果也推数据成功,根据上一篇文章中的GER-Server来配置可视化的错误平台就成了,之前的专栏文章都有说过。

最后,由于es也要做成开机启动,建议都拿chkconfig来进行维护,使用yum安装,然后同样给可视化的平台加上pm2相关的配置和开机启动。

下面贴一下,crontab下我们如何对es的索引进行管理的脚本:

const request = require('request');const moment = require('moment');const url = require('./config').url;const date15Ago = moment().subtract(16, 'd'); // 获取15天之前的日期const date15AgoStr = date15Ago.year() + '.' + (date15Ago.month() + 1) + '.' + date15Ago.date();module.exports = function () {    // 删除15天前的日志日志    request.delete({        url: url + date15AgoStr,        json: true    }, function (error, response, body) {        if(!error){            if(response.statusCode == 200){                console.info('success: delete logs of ' + date15AgoStr);            } else if(response.statusCode == 404) {                console.error('error: error in delete log:\nlog does not exist');            } else {                console.error('error: error in delete log:\n');                console.error(body);            }        }    });}

删除15天之前的索引脚本。

const request = require('request');const moment = require('moment');const config = require('./config');const date = moment().subtract(1, 'd'); // 获取前一天的日期const dateStr = date.year() + '.' + (date.month() + 1) + '.' + date.date();module.exports = function () {    getLogInfo(function (data) {        saveLogInfo(data);    });    function getLogInfo(cb) {        // 统计前一天的日志信息        request({            url: config.url + dateStr + '/logs/_search',            json: true,            body: {                "size" : 0,                "aggs" : {                    "projectType" : {                        "terms" : {                            "field" : "message.projectType"                        },                        "aggs" : {                            "hosts" : {                                "terms" : {                                    "field" : "message.host"                                }                            }                        }                    }                }            }}, function (error, response, body) {            // console.log(body);            // console.log(JSON.stringify(body.aggregations.projectType.buckets));            if(!error){                if(response.statusCode == 200){                    console.info('success: get log info');                    cb(body.aggregations.projectType.buckets);                } else if(response.statusCode == 404){                    console.error('error: error in get log info:\nlog does not exist');                } else {                    console.error('error: error in get log info:\n');                    console.error(body);                }            }        });    }    // 存储前一天的日志信息    function saveLogInfo(data) {        request.post({            url: config.baseUrl + 'log_count/' + dateStr,            json: true,            body: {                info: data            }        }, function (error, response, body) {            if(!error){                if(response.statusCode == 201){                    console.info('success: save log info');                } else {                    console.error('error: error in saving log info:\n');                    console.error(body);                }            }        });    }}

统计前一天有多少域名,多少端(pc|mobile)错误count的脚本,然后再反存回es。

最后我们再通过crontab把2个脚本做一个定时执行即可,把入口的js文件开头加入:

#!/usr/bin/env node

这一行就可以啦。

ok,做一个记录,怕过2星期我自己也忘了,多谢大家观看。

「真诚赞赏,手留余香」
还没有人赞赏,快来当第一个赞赏的人吧!