nodejs实践录:按行处理文件数据的示例

背景:
有时候,我们需要对日志做事后分析,抽离出特定的数据进行处理。比如:
1、中间件模块发送json数据到后台,每次发送的json都会转换成字符串保存到日志文件,但因网络原因发送失败,需要事后补充。
2、日志文件混合了各个设备的信息,需要抽离出指定设备编号的信息,再进行分析。
等等。

注意,有时可能处理数据太快导致异常,比如频繁发送json到后台,但后台处理不及,会丢失数据,因此,本文添加延时处理。

本文将应用场景简化,假定如下:
日志文件中的json数据字符串保存成一行数据,也掺杂其它数据,每行日志前面均有时间戳。利用程序读取出json数据进行处理。

1、实现

  • 为验证延时,添加时间戳打印函数getTimestamp。
  • 利用readline库读取文件每一行数据,读取后,进行判定,json数据以“{”开头。如果要提取其它数据,根据特征字段即可。
  • 利用JSON.parse将字符串转换成json结构。
  • 利用setTimeout设置定时时间,并在匿名函数中再次调用之。
  • 通过mybuffer的push保存数据,shift取出数据。以便在定时函数中处理。

3、源码

完整源码如下,假设源码文件名称为test.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**

读取文件一行数据,转换,再处理。
如:
文件每一行的数据为json字符串,读取之,再转换成真实json结构,延时,发送到服务器
读取第一行数据,转换成其它格式,再发送(或保存其它文件)

核心要点:
文件读取、延时处理
*/

var fs = require('fs');
var readline = require('readline');

// 获取当前时间戳
function getTimestamp(){var now=new Date();var y=now.getFullYear();var m=now.getMonth()+1;var d=now.getDate();var hour=now.getHours();var min=now.getMinutes();var sec=now.getSeconds();var msec=now.getMilliseconds();return"["+y+"-"+(m<10?"0"+m:m)+"-"+(d<10?"0"+d:d)+" "+(hour<10?"0"+hour:hour)+":"+(min<10?"0"+min:min)+":"+(sec<10?"0"+sec:sec)+"."+(msec<100?(msec<10?"00"+msec:'0'+msec):msec)+"] "}

function print(fmt, ...extras)
{
console.log(getTimestamp(), fmt, ...extras);
}

// data是泛指
function handle_it(data)
{
// 这里处理data处理,只简单打印
print(data);
}

var g_cnt = 0;
// buffer为一组数据,可理解为队列,元素可为任何值
// timeout超时时间,或函数执行间隔时间
function start_timer(buffer, timeout)
{
function timer_task()
{
var item = buffer.shift();
if(item !== undefined)
{
g_cnt++;
console.log(`# ${g_cnt} will deal data`);

handle_it(item);
// 继续执行
setTimeout(timer_task, timeout);
}
}
setTimeout(timer_task, timeout);
}

function main(filename)
{
var fRead = fs.createReadStream(filename);
var objReadline = readline.createInterface({
input:fRead
});

var mybuffer = [];
start_timer(mybuffer, 3000);

// 读到一行
objReadline.on('line',function(line) {
//console.log(line);
if (line.indexOf('{') >= 0)
{
var org_msg = line.substr(line.indexOf("{"));
var json = JSON.parse(org_msg);
//console.log(json);

mybuffer.push(json);

}
});

objReadline.on('close',function() {
});
}

main("foo.txt");

foo.txt文件内容:

1
2
3
4
5
6
7
[2019-08-03 13:06:14.497] got data {"op":"got","devid":"12345678","time":"20190803110614","ID":" ", "status": "100", "result": "0"}
fdaf3adg
[2019-08-03 13:06:15.497] got data {"op":"set","devid":"222222222","time":"20190803110614","ID":" ", "status": "101", "result": "0"}
dffdaf
[2019-08-03 13:06:16.497] got data {"op":"got","devid":"3333333","time":"20190803110614","ID":" ", "status": "102", "result": "1"}
3654esxgf
[2019-08-03 13:06:17.497] got data {"op":"set","devid":"44444444","time":"20190803110614","ID":" ", "status": "103", "result": "0"}

4、测试

直接执行node test.js即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1 will deal data
[2019-08-03 13:44:17.630] { op: 'got',
devid: '12345678',
time: '20190803110614',
ID: ' ',
status: '100',
result: '0' }
# 2 will deal data
[2019-08-03 13:44:20.650] { op: 'set',
devid: '222222222',
time: '20190803110614',
ID: ' ',
status: '101',
result: '0' }

5、经验

nodejs是异步操作的,前面源码的objReadline.on就是处理lineclose事件的响应函数。异步机制有很多发挥作用的地方,特别是网页的响应。不过,还是有一些场合是需要进行同步的,或者说,需要按照时间进行处理。
比如使用fs.writeFile函数将数据保存到文件时,先保存的数据反而在文件的后面。这样对于过滤日志来说不方便,在没有找到好方法又不想使用C语言实现的情况下,可以通过一些小技巧来解决。
1、使用nodejs的console.log函数打印输出数据。
2、Windows下使用cmd终端执行node test.js | tee output.txt,这样能将console.log输出的信息保存到output.txt文件中。注意,不能使用git bash之类的终端,会提示stdout is not a tty
3、Linux下的未测试。

李迟 2019.8.3 周六