python-日志分析
1、概述
生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。
一般采集流程:
日志产出
日志采集(Logstash、Flume、Scribe)
日志存储(原始日志)
日志分析
日志存储(解析后的日志)
可视化展示
开源实时日志分析ELK平台。Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端。
2、数据提取
2.1半结构化数据
数据:结构化数据、半结构化数据、非结构化数据
非结构化数据:不能以某种东西来理解它(二进制文件、视频、音频、图片...)
结构化数据:放在数据库中的数据,有数据类型描述。
半结构化数据:是有组织的,有格式的数据,没有数据类型描述。
日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。
2.2文本分析
日志是文本文件,需要依赖文件IO、字符串操作、正则表达式等技术。通过这些技术就能够把日志中需要的数据提取出来。
# test.log 183.60.212.153 - - [19/Feb/2013:10:23:23 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)" 140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"
这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志,如何提取出数据?
2.3 提取数据代码实现
2.3.1正则匹配
import re logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \ "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"''' regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"') m = regex.match(logline) if m: print(m.groupdict()) # 字典,存放内容等同于下面 print(m.groupdict()['ip']) # 140.205.201.44 print(m.groupdict()['time']) # 07/Apr/2017:08:11:06 +0800 print(m.groupdict()['request']) # GET print(m.groupdict()['url']) # / print(m.groupdict()['agree']) # HTTP/1.1 print(m.groupdict()['status']) # 200 print(m.groupdict()['byte']) # 8642 print(m.groupdict()['useragent']) # Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)
2.3.2问题提出
通过正则匹配我们可以发现:时间字符串需要转换成时间格式;状态码和字节数应该是int类型的数据。
时间格式转换测试(脱离主程序):
import datetime timestr = '07/Apr/2017:08:11:06 +0800' dt = datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z') # 不知道的可以查官方文档 print(dt) # 2017-04-07 08:11:06+08:00
2.3.3字典封装需要转换格式的项
import re import datetime logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \ "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"''' regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"') m = regex.match(logline) conversion = { 'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'byte': int } if m: # 如果匹配到了 # 方法一: d1 = {} for k, v in m.groupdict().items(): if k in conversion: # 判断是否在转换字典中 d1[k] = conversion[k](v) # 需要转换则进行转换 else: d1[k] = v # 不需要转换则直接添加 print(d1) # 方法二:字典解析式 d2 = {k: conversion[k](v) if k in conversion else v for k, v in m.groupdict().items()} print(d2) # 方法三:需要转换直接转换,不需要转换直接返回 d3 = {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()} print(d3) # 执行结果 {'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'} {'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'} {'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}
2.3.4函数封装
import re import datetime # 测试行 logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \ "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"''' # 匹配规则 regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"') # 转换 conversion = { 'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'byte': int } # 提取 def extract(line: str): m = regex.match(line) if m: return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()} else: # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警 # 方法一:打印 # 方法二:抛异常 return None print(extract(logline))
2.3.5多行处理
import re import datetime # 匹配规则 regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"') # 转换 conversion = { 'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'byte': int } # 提取 def extract(line: str): m = regex.match(line) if m: return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()} else: # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警 pass # 打印 # 抛异常 return None # 多行处理 with open('test.log', encoding='utf-8') as f: for line in f: fields = extract(line) if fields: print(fields) else: print('ERROR---{}'.format(line))
2.3.6多行处理函数再封装
import re import datetime logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \ "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"''' regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"') m = regex.match(logline) conversion = { 'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'byte': int } def extract(line: str): m = regex.match(line) if m: return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()} else: # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警 pass # 打印 # 抛异常 return None # 生成器函数 def loadfile(filename: str, encoding='utf-8'): with open(filename, encoding=encoding) as f: for line in f: fields = extract(line) if fields: yield fields else: print('ERROR---{}'.format(line)) continue for x in loadfile('test.log'): print(x)
2.3.7路径处理
from pathlib import Path import re import datetime logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \ "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"''' regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"') m = regex.match(logline) conversion = { 'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'byte': int } def extract(line: str): m = regex.match(line) if m: return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()} else: # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警 pass # 打印 # 抛异常 return None # 迭代文件 def loadfile(filename: str, encoding='utf-8'): with open(filename, encoding=encoding) as f: for line in f: fields = extract(line) if fields: yield fields else: print('ERROR---{}'.format(line)) continue # 传入多个路径、编码、后缀、是否递归 def load(*paths, encoding='utf-8', ext='*.log*', recursive=False): for x in paths: p = Path(x) if p.is_dir(): if isinstance(ext, str): ext = [ext] else: ext = list(ext) for e in ext: files = p.rglob(e) if recursive else p.glob(e) for file in files: yield from loadfile(str(file.absolute()), encoding=encoding) elif p.is_file(): yield from loadfile(str(p.absolute()), encoding=encoding) for x in load('.'): print(x)