python-日志分析

小丫5个月前技术文章183

1、概述


生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。


一般采集流程:


  1. 日志产出

  1. 日志采集(Logstash、Flume、Scribe)

  1. 日志存储(原始日志)

  1. 日志分析

  1. 日志存储(解析后的日志)

  1. 可视化展示


开源实时日志分析ELK平台。Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端。


2、数据提取


2.1半结构化数据


数据:结构化数据、半结构化数据、非结构化数据


  • 非结构化数据:不能以某种东西来理解它(二进制文件、视频、音频、图片...)

  • 结构化数据:放在数据库中的数据,有数据类型描述。

  • 半结构化数据:是有组织的,有格式的数据,没有数据类型描述。


日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。


2.2文本分析


日志是文本文件,需要依赖文件IO、字符串操作、正则表达式等技术。通过这些技术就能够把日志中需要的数据提取出来。


# test.log

183.60.212.153 - - [19/Feb/2013:10:23:23 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 "http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"


这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志,如何提取出数据?


2.3 提取数据代码实现


2.3.1正则匹配


import re

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)
if m:
    print(m.groupdict())    # 字典,存放内容等同于下面
    print(m.groupdict()['ip'])         # 140.205.201.44
    print(m.groupdict()['time'])       # 07/Apr/2017:08:11:06 +0800
    print(m.groupdict()['request'])    # GET
    print(m.groupdict()['url'])        # /
    print(m.groupdict()['agree'])      # HTTP/1.1
    print(m.groupdict()['status'])     # 200
    print(m.groupdict()['byte'])       # 8642
    print(m.groupdict()['useragent'])  # Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)


2.3.2问题提出


通过正则匹配我们可以发现:时间字符串需要转换成时间格式;状态码和字节数应该是int类型的数据。


时间格式转换测试(脱离主程序):


import datetime
timestr = '07/Apr/2017:08:11:06 +0800'
dt = datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')  # 不知道的可以查官方文档
print(dt)    # 2017-04-07 08:11:06+08:00


2.3.3字典封装需要转换格式的项


import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}

if m:    # 如果匹配到了
    # 方法一:
    d1 = {}
    for k, v in m.groupdict().items():
        if k in conversion:          # 判断是否在转换字典中
            d1[k] = conversion[k](v) # 需要转换则进行转换
        else:
            d1[k] = v                # 不需要转换则直接添加
    print(d1)

    # 方法二:字典解析式
    d2 = {k: conversion[k](v) if k in conversion else v for k, v in m.groupdict().items()}
    print(d2)

    # 方法三:需要转换直接转换,不需要转换直接返回
    d3 = {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    print(d3)

    
# 执行结果
{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}

{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}

{'ip': '140.205.201.44', 'time': datetime.datetime(2017, 4, 7, 8, 11, 6, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': 'GET', 'url': '/', 'agree': 'HTTP/1.1', 'status': 200, 'byte': 8642, 'useragent': 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)'}


2.3.4函数封装


import re
import datetime

# 测试行
logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

# 匹配规则
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

# 转换
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


# 提取
def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        # 方法一:打印
        # 方法二:抛异常
        return None


print(extract(logline))


2.3.5多行处理


import re
import datetime

# 匹配规则
regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

# 转换
conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


# 提取
def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 多行处理
with open('test.log', encoding='utf-8') as f:
    for line in f:
        fields = extract(line)
        if fields:
            print(fields)
        else:
            print('ERROR---{}'.format(line))


2.3.6多行处理函数再封装


import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 生成器函数
def loadfile(filename: str, encoding='utf-8'):
    with open(filename, encoding=encoding) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                print('ERROR---{}'.format(line))
                continue


for x in loadfile('test.log'):
    print(x)


2.3.7路径处理


from pathlib import Path
import re
import datetime

logline = '''140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] "GET / HTTP/1.1" 200 8642 \
"http://job.magedu.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"'''

regex = re.compile(r'(?P<ip>[\d.]{7,}) - - \[(?P<time>.*)\] "(?P<request>.+) (?P<url>.+) (?P<agree>.+)" (?P<status>\d{3}) (?P<byte>\d+) ".+?" "(?P<useragent>.+?)"')

m = regex.match(logline)

conversion = {
    'time': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'byte': int
}


def extract(line: str):
    m = regex.match(line)
    if m:
        return {k: conversion.get(k, lambda x: x)(v) for k, v in m.groupdict().items()}
    else:    # 匹配不上: 错一两行? 错大部分? ——> 可以设置阈值: 超过2000行则告警
        pass
        # 打印
        # 抛异常
        return None


# 迭代文件
def loadfile(filename: str, encoding='utf-8'):
    with open(filename, encoding=encoding) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                print('ERROR---{}'.format(line))
                continue


# 传入多个路径、编码、后缀、是否递归
def load(*paths, encoding='utf-8', ext='*.log*', recursive=False):
    for x in paths:
        p = Path(x)
        if p.is_dir():
            if isinstance(ext, str):
                ext = [ext]
            else:
                ext = list(ext)

            for e in ext:
                files = p.rglob(e) if recursive else p.glob(e)
                for file in files:
                    yield from loadfile(str(file.absolute()), encoding=encoding)

        elif p.is_file():
            yield from loadfile(str(p.absolute()), encoding=encoding)


for x in load('.'):
    print(x)


相关文章

REPMGR-PG高可用搭建(一)

REPMGR-PG高可用搭建(一)

PG高可用对比数据库复制的术语和定义这些术语和定义应该有助于讨论复制。在与其他Postgres开发人员进行了大量讨论之后,我编译了它们,但是这些定义应该是普遍可用的,并且也应该适用于其他RDBMS。复...

Oozie安装web页面

Oozie安装web页面

1、查看web页面,页面显示页面显示无法打开oozie的web安装页面,如果需要安装页面需要安装Ext js的lib库2、安装ExtJS 2.2库进入/var/lib/oozie路径中cd  /var...

Python Web 自动化测试工具 — Selenium

Selenium 是一个 Web 自动化测试工具,Selenium 通过非常简洁方便的 API,使用 Selenium WebDrivers(Selenium web 驱动器)像使用 Firefox,...

MySQL 同步方式

同步方式一、分类同步大致为异步、半同步、增强版同步、全同步;二、详情1.异步复制MySQL 默认的复制策略,Master处理事务过程中,将其写入Binlog就会通知Dump thread线程处理,然后...

CDH配置HTTPS访问

CDH配置HTTPS访问

申请一台新的机器部署nginx,生成https/ssl证书的机器没有要求1.生成https/ssl证书[root@cdp01 ~]# mkdir -p /data/cert [root@cdp01 ...

MySQL 在线开启 GTID

MySQL 在线开启 GTID

描述生产环境上也会遇到需要开启 GTID ,有什么风险?如何在线开启?本篇 SOP 将介绍。GTID 限制由于基于 GTID 复制依赖于事务,所有开启 GTID 时,有些 MySQL 特性不支持:事务...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。