kafka日志数据清理策略

浩客7个月前技术文章423


1.关于Kafka的日志

在Kafka中,日志分为两种:

1、数据日志是指Kafka的topic中存储的数据,这种日志的路径是$KAFKA_HOME/config/server.properties文件中配置,配置项为log.dirs。如果此项没有被配置,默认会使用配置项 log.dir。log.dir的默认路径为/tmp/kafka-logs,/tmp路径下的文件在计算机重启的时候是会被删除的,因此,应将文件目录设置在其他可以永久保存的路径。

2、另一种日志是操作日志,类似于我们在自己开发的程序中输出的log日志(log4j),这种日志的路径是在启动Kafka的路径下。比如一般我们在KAFKA_HOME路径下启动Kafka服务,那么操作日志的路径为KAFKA_HOME/logs。


2.关于Kafka日志清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认 7 天。

  • log.retention.minutes,分钟。

  • log.retention.ms,最高优先级毫秒。

  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours,那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略delete 和 compact 两种。

  • 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。

  • 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。


2.1、delete 日志删除:将过期数据删除

     当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除

  • log.cleanup.policy = delete 所有数据启用删除策略

1基于时间:默认打开segment 中所有记录中的最大时间戳作为该文件时间戳。

  • 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)

  • Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

log.retention.bytes,默认等于-1,表示无穷大。

如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?


2.2、compact 日志压缩,对于相同key的不同value值,只保留最后一个版本。

  • log.cleanup.policy = compact 所有数据启用压缩策略

  • Log Compaction执行后,offset将不再连续,但依然可以查询Segment

  • Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件

  • Log Compaction是针对key的,在使用的时候注意每个消息的key不为空

  • 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态


图1:

22222222222.PNG

图2:

333333333333.PNG

       压缩后的offset可能是不连续的,比如上图2中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

      这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。



相关文章

MySQL 8.0 新特性:Instant Add Column

MySQL 8.0 新特性:Instant Add Column

一、前言MySQL 8.0 支持 “快速加列” 功能,既添加字段时可以支持 “INSTANT” 快速完成。通过只修改数据字典的方法来实现大表快速加列,避免之前加列操作必须做的数据拷贝,从而大幅缩小大表...

rancher证书到期处理

rancher证书到期处理

问题描述:rancher证书到期,需要更新rancher证书问题处理:基础环境信息:rancher版本: rancher:v2.4.3官方关于独立容器Rancher Server证书更新的解决方案:1...

flink获取taskmanager的pstree信息

flink获取taskmanager的pstree信息

使用pstree –p 进程号 的方式能够获取taskmanager的pstree信息,这个地方提供一个收集脚本。内容如下:#!/bin/bashsearchPID() {   l...

lru_cache 缓存

Python 语法: @functools.lru_cache(maxsize=128, typed=False)Least-recently-used 装饰器。Iru 最近最少使用、cache 缓存...

数据湖技术之iceberg(二)iceberg的特点

1  Iceberg的概念及特点Apache Iceberg是一种用于大型数据分析场景的开放表格式(Table Format)。Iceberg使用一种类似于SQL表的高性能表格式,Icebe...

oracle PUS.SPU.CPU.BP.RU.RUR概念简介

PUS.SPU.CPU.BP.RU.RUR概念介绍PSU(Patch Set Updates):Oracle 选取在每个季度用户下载数量最多,并且得到验证具有较低风险的补丁放入到每个季度的PSU中,修...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。