python-日志分析
1、概述
生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。
一般采集流程:
日志产出
日志采集(Logstash、Flume、Scribe)
日志存储(原始日志)
日志分析
日志存储(解析后的日志)
可视化展示
开源实时日志分析ELK平台。Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端。
2、数据提取
2.1半结构化数据
数据:结构化数据、半结构化数据、非结构化数据
非结构化数据:没有固定的结构或格式,无法用统一的模式去描述和解析(二进制文件、视频、音频、图片...)
结构化数据:放在数据库中的数据,有数据类型描述。
半结构化数据:是有组织的,有格式的数据,没有数据类型描述。
日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。
2.2文本分析
日志是文本文件,需要依赖文件IO、字符串操作、正则表达式等技术。通过这些技术就能够把日志中需要的数据提取出来。
# test.log
183.60.212.153 - - [19/Feb/2013:10:23:23 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"
这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志,如何提取出数据?
2.3 提取数据代码实现
2.3.1正则匹配
import re
logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')
m = regex.match(logline)
if m:
print(m.groupdict()) # 字典,存放内容等同于下面
print(m.groupdict()['ip']) # 140.205.201.44
print(m.groupdict()['time']) # 07/Apr/2017:08:11:06 +0800
print(m.groupdict()['request']) # GET
print(m.groupdict()['url']) # /
print(m.groupdict()['agree']) # HTTP/1.1
print(m.groupdict()['status']) # 200
print(m.groupdict()['byte']) # 8642
print(m.groupdict()['useragent']) # Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)2.3.2问题提出
通过正则匹配我们可以发现:时间字符串需要转换成时间格式;状态码和字节数应该是int类型的数据。
时间格式转换测试(脱离主程序):
import datetime

# Timestamp exactly as it appears between the brackets of a log line
timestr = '07/Apr/2017:08:11:06 +0800'
# Format directives are listed in the official datetime documentation
time_format = '%d/%b/%Y:%H:%M:%S %z'
dt = datetime.datetime.strptime(timestr, time_format)
print(dt)  # 2017-04-07 08:11:06+08:00
2.3.3字典封装需要转换格式的项
import re
import datetime
# Sample access-log line, written as two adjacent literals that join into one string
logline = (
    '140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 '
    '"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'
)
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')
m = regex.match(logline)
# Converters keyed by group name; groups without an entry stay as strings
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int,
}
if m:  # only when the line matched
    groups = m.groupdict()

    # Approach 1: explicit loop
    d1 = {}
    for k, v in groups.items():
        if k not in conversion:
            d1[k] = v  # no converter registered: keep the raw string
            continue
        d1[k] = conversion[k](v)  # converter registered: apply it
    print(d1)

    # Approach 2: dict comprehension with a conditional expression
    d2 = {k: (v if k not in conversion else conversion[k](v)) for k, v in groups.items()}
    print(d2)

    # Approach 3: always call a converter, falling back to an identity function
    d3 = {k: conversion.get(k, lambda x: x)(groups[k]) for k in groups}
    print(d3)
# 执行结果
{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}
{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}
{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}
2.3.4函数封装
import re
import datetime
# Sample line used to exercise the parser
logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''
# Matching rule: one named group per extracted field
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')
# Conversions: group name -> callable; groups not listed stay as strings
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int,
}


# Extraction
def extract(line: str):
    """Parse one access-log line into a dict of converted fields.

    The line is matched from its start against ``regex``. Every named group
    becomes a key; values of groups listed in ``conversion`` are passed
    through the matching converter, all others stay as the matched strings.

    Args:
        line: One raw log line.

    Returns:
        The field dict, or ``None`` when the line does not match the
        pattern or a converter rejects a matched value.
    """
    m = regex.match(line)
    if m is None:
        # How to react to bad lines is a policy decision: one or two may be
        # ignorable, while many (e.g. more than 2000) could trigger an alert.
        # Printing or raising here are alternatives to returning None.
        return None
    try:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    except ValueError:
        # The 'time' group accepts any text between the brackets, so strptime
        # can still fail on a matched line; report it like a non-matching line.
        return None
print(extract(logline))
2.3.5多行处理
import re
import datetime
# Matching rule: one named group per extracted field
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')
# Conversions: group name -> callable; groups not listed stay as strings
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int,
}


# Extraction
def extract(line: str):
    """Parse one access-log line into a dict of converted fields.

    The line is matched from its start against ``regex``. Every named group
    becomes a key; values of groups listed in ``conversion`` are passed
    through the matching converter, all others stay as the matched strings.

    Args:
        line: One raw log line.

    Returns:
        The field dict, or ``None`` when the line does not match the
        pattern or a converter rejects a matched value.
    """
    m = regex.match(line)
    if m is None:
        # How to react to bad lines is a policy decision: one or two may be
        # ignorable, while many (e.g. more than 2000) could trigger an alert.
        # Printing or raising here are alternatives to returning None.
        return None
    try:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    except ValueError:
        # The 'time' group accepts any text between the brackets, so strptime
        # can still fail on a matched line; report it like a non-matching line.
        return None
# 多行处理
with open('test.log', encoding='utf-8') as f:
for line in f:
fields = extract(line)
if fields:
print(fields)
else:
print('ERROR---{}'.format(line))2.3.6多行处理函数再封装
import re
import datetime
# Sample line used to exercise the parser
logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''
# Matching rule: one named group per extracted field
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')
m = regex.match(logline)
# Conversions: group name -> callable; groups not listed stay as strings
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int,
}


def extract(line: str):
    """Parse one access-log line into a dict of converted fields.

    The line is matched from its start against ``regex``. Every named group
    becomes a key; values of groups listed in ``conversion`` are passed
    through the matching converter, all others stay as the matched strings.

    Args:
        line: One raw log line.

    Returns:
        The field dict, or ``None`` when the line does not match the
        pattern or a converter rejects a matched value.
    """
    m = regex.match(line)
    if m is None:
        # How to react to bad lines is a policy decision: one or two may be
        # ignorable, while many (e.g. more than 2000) could trigger an alert.
        # Printing or raising here are alternatives to returning None.
        return None
    try:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    except ValueError:
        # The 'time' group accepts any text between the brackets, so strptime
        # can still fail on a matched line; report it like a non-matching line.
        return None
# Generator function
def loadfile(filename: str, encoding='utf-8'):
    """Yield the extracted fields of each parseable line in a log file.

    Each line of the file is passed to ``extract``. Truthy results are
    yielded; for any other result the raw line is printed with an
    ``ERROR---`` prefix and skipped.

    Args:
        filename: Path of the file to read.
        encoding: Text encoding used to open the file.
    """
    with open(filename, encoding=encoding) as handle:
        for raw in handle:
            parsed = extract(raw)
            if not parsed:
                print(f'ERROR---{raw}')
                continue
            yield parsed
for x in loadfile('test.log'):
print(x)2.3.7路径处理
from pathlib import Path
import re
import datetime
# Sample line used to exercise the parser
logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''
# Matching rule: one named group per extracted field
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')
m = regex.match(logline)
# Conversions: group name -> callable; groups not listed stay as strings
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int,
}


def extract(line: str):
    """Parse one access-log line into a dict of converted fields.

    The line is matched from its start against ``regex``. Every named group
    becomes a key; values of groups listed in ``conversion`` are passed
    through the matching converter, all others stay as the matched strings.

    Args:
        line: One raw log line.

    Returns:
        The field dict, or ``None`` when the line does not match the
        pattern or a converter rejects a matched value.
    """
    m = regex.match(line)
    if m is None:
        # How to react to bad lines is a policy decision: one or two may be
        # ignorable, while many (e.g. more than 2000) could trigger an alert.
        # Printing or raising here are alternatives to returning None.
        return None
    try:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    except ValueError:
        # The 'time' group accepts any text between the brackets, so strptime
        # can still fail on a matched line; report it like a non-matching line.
        return None
# Iterate over one file
def loadfile(filename: str, encoding='utf-8'):
    """Yield the extracted fields of each parseable line in a log file.

    Each line of the file is passed to ``extract``. Truthy results are
    yielded; for any other result the raw line is printed with an
    ``ERROR---`` prefix and skipped.

    Args:
        filename: Path of the file to read.
        encoding: Text encoding used to open the file.
    """
    with open(filename, encoding=encoding) as handle:
        for raw in handle:
            parsed = extract(raw)
            if not parsed:
                print(f'ERROR---{raw}')
                continue
            yield parsed
# Accepts several paths plus encoding, filename pattern(s) and a recursion flag
def load(*paths, encoding='utf-8', ext='*.log*', recursive=False):
    """Yield parsed records from every log file found under the given paths.

    A path that is a regular file is read directly with ``loadfile``. A path
    that is a directory is searched with the glob pattern(s) in ``ext`` and
    each matching regular file is read. Paths that are neither are ignored.

    Args:
        *paths: Files and/or directories to read.
        encoding: Text encoding passed on to ``loadfile``.
        ext: One glob pattern, or an iterable of glob patterns, applied to
            directory arguments.
        recursive: When true, directories are searched with ``rglob``
            instead of ``glob``.
    """
    # Normalise the pattern argument once instead of once per directory
    patterns = [ext] if isinstance(ext, str) else list(ext)
    for x in paths:
        p = Path(x)
        if p.is_dir():
            seen = set()  # files already read from this directory scan
            for pattern in patterns:
                matches = p.rglob(pattern) if recursive else p.glob(pattern)
                for file in matches:
                    # A pattern can also match sub-directories, which cannot be
                    # opened as text; overlapping patterns can match a file twice.
                    if file in seen or not file.is_file():
                        continue
                    seen.add(file)
                    yield from loadfile(str(file.absolute()), encoding=encoding)
        elif p.is_file():
            yield from loadfile(str(p.absolute()), encoding=encoding)
# Lazily pull parsed records out of every matching log in the current directory
records = load('.')
for x in records:
    print(x)



