python-日志分析

小丫11个月前技术文章481

1、概述


生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。


一般采集流程:


  1. 日志产出

  1. 日志采集(Logstash、Flume、Scribe)

  1. 日志存储(原始日志)

  1. 日志分析

  1. 日志存储(解析后的日志)

  1. 可视化展示


开源实时日志分析ELK平台。Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端。


2、数据提取


2.1半结构化数据


数据:结构化数据、半结构化数据、非结构化数据


  • 非结构化数据:不能以某种东西来理解它(二进制文件、视频、音频、图片...)

  • 结构化数据:放在数据库中的数据,有数据类型描述。

  • 半结构化数据:是有组织的,有格式的数据,没有数据类型描述。


日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。


2.2文本分析


日志是文本文件,需要依赖文件IO、字符串操作、正则表达式等技术。通过这些技术就能够把日志中需要的数据提取出来。


# test.log

183.60.212.153 - - [19/Feb/2013:10:23:23 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"


这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志,如何提取出数据?


2.3 提取数据代码实现


2.3.1正则匹配


import re

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)
if m:
    print(m.groupdict())    # 字典,存放内容等同于下面
    print(m.groupdict()['ip'])         # 140.205.201.44
    print(m.groupdict()['time'])       # 07/Apr/2017:08:11:06 +0800
    print(m.groupdict()['request'])    # GET
    print(m.groupdict()['url'])        # /
    print(m.groupdict()['agree'])      # HTTP/1.1
    print(m.groupdict()['status'])     # 200
    print(m.groupdict()['byte'])       # 8642
    print(m.groupdict()['useragent'])  # Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)


2.3.2问题提出


通过正则匹配我们可以发现:时间字符串需要转换成时间格式;状态码和字节数应该是int类型的数据。


时间格式转换测试(脱离主程序):


import datetime
timestr = '07/Apr/2017:08:11:06 +0800'
dt = datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')  # 不知道的可以查官方文档
print(dt)    # 2017-04-07 08:11:06+08:00


2.3.3字典封装需要转换格式的项


import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}

if m:    # 如果匹配到了
    # 方法一:
    d1 = {}
    for k, v in m.groupdict().items():
        if k in conversion:          # 判断是否在转换字典中
            d1[k] = conversion[k](v) # 需要转换则进行转换
        else:
            d1[k] = v                # 不需要转换则直接添加
    print(d1)

    # 方法二:字典解析式
    d2 = {k: conversion[k](v) if k in conversion else v for k, v in m.groupdict().items()}
    print(d2)

    # 方法三:需要转换直接转换,不需要转换直接返回
    d3 = {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    print(d3)

    
# 执行结果
{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}

{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}

{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}


2.3.4函数封装


import re
import datetime

# 测试行
logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

# 匹配规则
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

# 转换
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


# 提取
def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        # 方法一:打印
        # 方法二:抛异常
        return None


print(extract(logline))


2.3.5多行处理


import re
import datetime

# 匹配规则
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

# 转换
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


# 提取
def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 多行处理
with open('test.log', encoding='utf-8') as f:
    for line in f:
        fields = extract(line)
        if fields:
            print(fields)
        else:
            print('ERROR---{}'.format(line))


2.3.6多行处理函数再封装


import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 生成器函数
def loadfile(filename: str, encoding='utf-8'):
    with open(filename, encoding=encoding) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                print('ERROR---{}'.format(line))
                continue


for x in loadfile('test.log'):
    print(x)


2.3.7路径处理


from pathlib import Path
import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 迭代文件
def loadfile(filename: str, encoding='utf-8'):
    with open(filename, encoding=encoding) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                print('ERROR---{}'.format(line))
                continue


# 传入多个路径、编码、后缀、是否递归
def load(*paths, encoding='utf-8', ext='*.log*', recursive=False):
    for x in paths:
        p = Path(x)
        if p.is_dir():
            if isinstance(ext, str):
                ext = [ext]
            else:
                ext = list(ext)

            for e in ext:
                files = p.rglob(e) if recursive else p.glob(e)
                for file in files:
                    yield from loadfile(str(file.absolute()), encoding=encoding)

        elif p.is_file():
            yield from loadfile(str(p.absolute()), encoding=encoding)


for x in load('.'):
    print(x)


相关文章

Redis 持久化机制 AOF

Redis 持久化机制 AOF

前言Redis 有两种持久化机制,分别是 RDB 与 AOF 本篇文章将介绍 AOF 的执行过程与应用。1. AOF 简介AOF (Append only file) 持久化是以独立日志的方式记录每次...

PostgreSQL 源码部署

PostgreSQL 源码部署

说明本篇文章介绍 PostgreSQL 单机源码编译部署的详细步骤。1. 准备工作1.1 源码包下载进入 PostgreSQL 官网下载页面  选择 Source 栏目: 接着就进入源码版本目录,选择...

helm安装部署trino对接hive(一)

helm安装部署trino对接hive(一)

前提:本文前提是基于hive组件已经提前安装的情况下,安装部署好trino容器之后进行对hive组件的对接。helm trino地址:https://artifacthub.io/packages/h...

副本集同步原理

一、Initial Sync大体来说,MongoDB副本集同步主要包含两个步骤:1. Initial Sync,全量同步2. Replication,即sync oplog先通过init sync同步...

CentOS6.x下的ntp服务

CentOS6.x下的ntp服务配置192.168.1.1(node01) 负责与外网同步时间,同时作为内网的ntp服务192.168.1.2(node02) 和内网192.168.1.1去同步时间,...

数据湖技术之iceberg(十一)Flink与Iceberg整合-DataStream API

数据湖技术之iceberg(十一)Flink与Iceberg整合-DataStream API

1.实时写入Iceberg表DataStream Api方式操作Iceberg方式目前仅支持Java Api。使用DataStream API 实时写入Iceberg表具体操作如下:2、编写代码使用D...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。