python-日志分析

小丫2年前技术文章1399

1、概述


生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。


一般采集流程:


  1. 日志产出

  1. 日志采集(Logstash、Flume、Scribe)

  1. 日志存储(原始日志)

  1. 日志分析

  1. 日志存储(解析后的日志)

  1. 可视化展示


开源实时日志分析ELK平台。Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端。


2、数据提取


2.1半结构化数据


数据:结构化数据、半结构化数据、非结构化数据


  • 非结构化数据:不能以某种东西来理解它(二进制文件、视频、音频、图片...)

  • 结构化数据:放在数据库中的数据,有数据类型描述。

  • 半结构化数据:是有组织的,有格式的数据,没有数据类型描述。


日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。


2.2文本分析


日志是文本文件,需要依赖文件IO、字符串操作、正则表达式等技术。通过这些技术就能够把日志中需要的数据提取出来。


# test.log

183.60.212.153 - - [19/Feb/2013:10:23:23 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"


这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志,如何提取出数据?


2.3 提取数据代码实现


2.3.1正则匹配


import re

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)
if m:
    print(m.groupdict())    # 字典,存放内容等同于下面
    print(m.groupdict()['ip'])         # 140.205.201.44
    print(m.groupdict()['time'])       # 07/Apr/2017:08:11:06 +0800
    print(m.groupdict()['request'])    # GET
    print(m.groupdict()['url'])        # /
    print(m.groupdict()['agree'])      # HTTP/1.1
    print(m.groupdict()['status'])     # 200
    print(m.groupdict()['byte'])       # 8642
    print(m.groupdict()['useragent'])  # Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)


2.3.2问题提出


通过正则匹配我们可以发现:时间字符串需要转换成时间格式;状态码和字节数应该是int类型的数据。


时间格式转换测试(脱离主程序):


import datetime
timestr = '07/Apr/2017:08:11:06 +0800'
dt = datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')  # 不知道的可以查官方文档
print(dt)    # 2017-04-07 08:11:06+08:00


2.3.3字典封装需要转换格式的项


import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}

if m:    # 如果匹配到了
    # 方法一:
    d1 = {}
    for k, v in m.groupdict().items():
        if k in conversion:          # 判断是否在转换字典中
            d1[k] = conversion[k](v) # 需要转换则进行转换
        else:
            d1[k] = v                # 不需要转换则直接添加
    print(d1)

    # 方法二:字典解析式
    d2 = {k: conversion[k](v) if k in conversion else v for k, v in m.groupdict().items()}
    print(d2)

    # 方法三:需要转换直接转换,不需要转换直接返回
    d3 = {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    print(d3)

    
# 执行结果
{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}

{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}

{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}


2.3.4函数封装


import re
import datetime

# 测试行
logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

# 匹配规则
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

# 转换
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


# 提取
def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        # 方法一:打印
        # 方法二:抛异常
        return None


print(extract(logline))


2.3.5多行处理


import re
import datetime

# 匹配规则
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

# 转换
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


# 提取
def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 多行处理
with open('test.log', encoding='utf-8') as f:
    for line in f:
        fields = extract(line)
        if fields:
            print(fields)
        else:
            print('ERROR---{}'.format(line))


2.3.6多行处理函数再封装


import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 生成器函数
def loadfile(filename: str, encoding='utf-8'):
    with open(filename, encoding=encoding) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                print('ERROR---{}'.format(line))
                continue


for x in loadfile('test.log'):
    print(x)


2.3.7路径处理


from pathlib import Path
import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 迭代文件
def loadfile(filename: str, encoding='utf-8'):
    with open(filename, encoding=encoding) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                print('ERROR---{}'.format(line))
                continue


# 传入多个路径、编码、后缀、是否递归
def load(*paths, encoding='utf-8', ext='*.log*', recursive=False):
    for x in paths:
        p = Path(x)
        if p.is_dir():
            if isinstance(ext, str):
                ext = [ext]
            else:
                ext = list(ext)

            for e in ext:
                files = p.rglob(e) if recursive else p.glob(e)
                for file in files:
                    yield from loadfile(str(file.absolute()), encoding=encoding)

        elif p.is_file():
            yield from loadfile(str(p.absolute()), encoding=encoding)


for x in load('.'):
    print(x)


相关文章

MapReduce工作机制解析

MapReduce工作机制解析

一、MapTask工作机制主要可以分为Read阶段,Map阶段,Collect阶段,Spill阶段(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入In...

PG的锁(一)

一、表级锁1.1 表级锁模式常见锁模式以及应用场景:ACCESS SHARE :select操作获取该模式锁资源,通常情况下所有只读取不修改表的查询都会获取该模式锁资源ROW SHARE : sele...

Presto开发语句简介

Presto开发语句简介

根据presto中的结构配置,catalog表示连接,主要看presto中catalog文件夹下的配置,一般包含hive、mysql等,其中可以根据业务的不同设置多个配置文件。schema表示连接中的...

Greenplum数据库建立外部表加载HDFS文件实践指导

Greenplum数据库建立外部表加载HDFS文件实践指导

环境概述(1)     Greenplum数据库版本号Greenplum Database 4.3.0.0POC3 build 45206(基于PostgreS...

Flume使用案例之Flume与Flume之间数据传递(单Flume多Channel、Sink)

目标:使用flume1监控文件变动,flume1将变动内容传递给flume-2,flume-2负责存储到HDFS。同时flume1将变动内容传递给flume-3,flume-3负责输出到local分步...

bucket跨域问题处理

bucket跨域问题处理

问题描述OSS bucket 访问存在跨域问题问题处理查看oss 能否针对整个bucket设置no-cache吗核实目前阿里云后台只支持单个文件的HTTP头设置,不支持批量设置,如果有多个文件或者后续...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。