kafka日志数据清理策略

浩客2年前技术文章1739


1.关于Kafka的日志

在Kafka中,日志分为两种:

1、数据日志是指Kafka的topic中存储的数据,这种日志的路径是$KAFKA_HOME/config/server.properties文件中配置,配置项为log.dirs。如果此项没有被配置,默认会使用配置项 log.dir。log.dir的默认路径为/tmp/kafka-logs,/tmp路径下的文件在计算机重启的时候是会被删除的,因此,应将文件目录设置在其他可以永久保存的路径。

2、另一种日志是操作日志,类似于我们在自己开发的程序中输出的log日志(log4j),这种日志的路径是在启动Kafka的路径下。比如一般我们在KAFKA_HOME路径下启动Kafka服务,那么操作日志的路径为KAFKA_HOME/logs。


2.关于Kafka日志清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认 7 天。

  • log.retention.minutes,分钟。

  • log.retention.ms,最高优先级毫秒。

  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours,那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略delete 和 compact 两种。

  • 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。

  • 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。


2.1、delete 日志删除:将过期数据删除

     当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除

  • log.cleanup.policy = delete 所有数据启用删除策略

1基于时间:默认打开segment 中所有记录中的最大时间戳作为该文件时间戳。

  • 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)

  • Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

log.retention.bytes,默认等于-1,表示无穷大。

如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?


2.2、compact 日志压缩,对于相同key的不同value值,只保留最后一个版本。

  • log.cleanup.policy = compact 所有数据启用压缩策略

  • Log Compaction执行后,offset将不再连续,但依然可以查询Segment

  • Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件

  • Log Compaction是针对key的,在使用的时候注意每个消息的key不为空

  • 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态


图1:

22222222222.PNG

图2:

333333333333.PNG

       压缩后的offset可能是不连续的,比如上图2中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

      这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。



相关文章

Elasticsearch8.5及Kibana8.5安装部署

Elasticsearch8.5及Kibana8.5安装部署

一、环境准备1、Centos7系统2、切换英文系统[root@master02 ~]# tail -n2 /etc/profile export LANG="en_US.UTF-8"3、下载、安...

CDP实操--集群扩容

CDP实操--集群扩容

一、前提准备工作1.确保OS的yum源可以正常使用,通过yum repolist命令可以查看到匹配的OS的所有包2.确保Cloudera Manager的yum源运行正常3.hosts文件配置,需要将...

 oracle11g打补丁31718723报错Operation not permitted

oracle11g打补丁31718723报错Operation not permitted

oracle11g 在打gi补丁的时候报错如下:原因:参考mos(Doc ID 2668094.1)可知因为Linux/Unix平台上,安装后一些Oracle可执行文件的权限需要修改成root。这是以...

数据湖Iceberg

数据湖Iceberg

1、概述         Iceberg 是一个面向海量数据分析场景的开放表格式(Table Format)。表格式(Table  Format)可以理解为元数据以及数据文件的一种组织方式,处于计算框...

大数据组件--Impala概述

Apache Impala是一个适用于实时交互的sql软件,是基于hive的大数据分析查询引擎,hive和impala有不同的侧重面,通常是两个互相配合工作,可以先用hive进行一个数据的预处理清洗转...

SpringBootWeb 篇-深入了解 SpringBoot + Vue 的前后端分离项目部署上线与 Nginx 配置文件结构(3)

SpringBootWeb 篇-深入了解 SpringBoot + Vue 的前后端分离项目部署上线与 Nginx 配置文件结构(3)

4.0 SpringBoot 后端代码进行打包上传服务器        将 SpringBoot 项目进行打包:      &nb...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。