kafka日志数据清理策略

浩客1年前技术文章1187


1.关于Kafka的日志

在Kafka中,日志分为两种:

1、数据日志是指Kafka的topic中存储的数据,这种日志的路径是$KAFKA_HOME/config/server.properties文件中配置,配置项为log.dirs。如果此项没有被配置,默认会使用配置项 log.dir。log.dir的默认路径为/tmp/kafka-logs,/tmp路径下的文件在计算机重启的时候是会被删除的,因此,应将文件目录设置在其他可以永久保存的路径。

2、另一种日志是操作日志,类似于我们在自己开发的程序中输出的log日志(log4j),这种日志的路径是在启动Kafka的路径下。比如一般我们在KAFKA_HOME路径下启动Kafka服务,那么操作日志的路径为KAFKA_HOME/logs。


2.关于Kafka日志清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

  • log.retention.hours,最低优先级小时,默认 7 天。

  • log.retention.minutes,分钟。

  • log.retention.ms,最高优先级毫秒。

  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours,那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略delete 和 compact 两种。

  • 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。

  • 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。


2.1、delete 日志删除:将过期数据删除

     当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除

  • log.cleanup.policy = delete 所有数据启用删除策略

1基于时间:默认打开segment 中所有记录中的最大时间戳作为该文件时间戳。

  • 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)

  • Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

log.retention.bytes,默认等于-1,表示无穷大。

如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?


2.2、compact 日志压缩,对于相同key的不同value值,只保留最后一个版本。

  • log.cleanup.policy = compact 所有数据启用压缩策略

  • Log Compaction执行后,offset将不再连续,但依然可以查询Segment

  • Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件

  • Log Compaction是针对key的,在使用的时候注意每个消息的key不为空

  • 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态


图1:

22222222222.PNG

图2:

333333333333.PNG

       压缩后的offset可能是不连续的,比如上图2中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

      这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。



相关文章

HAProxy

HAProxy

1、HAProxy简介  HAProxy 是一款基于 TCP(第四层)和 HTTP(第七层)应用的代理软件,它也可作为负载均衡器,而且完全免费。 借助 HAProxy,可以快速并且可靠地提供基于 TC...

PG的统计信息(三)

1.3 数据分布类统计信息1.3.1 pg_stats通过对pg_stats的查询,可以查看每个字段的数据分析统计信息,类似SQL Server的直方图,为优化器选择最佳执行计划提供依据,pg_sta...

bucket跨域问题处理

bucket跨域问题处理

问题描述OSS bucket 访问存在跨域问题问题处理查看oss 能否针对整个bucket设置no-cache吗核实目前阿里云后台只支持单个文件的HTTP头设置,不支持批量设置,如果有多个文件或者后续...

Yarn调度器和调度算法详解

Yarn调度器和调度算法详解

目前,Hadoop作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair Scheduler)。Apache Hadoop3.1.3默认的资源调度器是C...

MySQL 创建索引报错

创建索引报错添加索引发现报错,具体报错如下:create unique index sm_sample_clothing_skc_SkcUniqueKey_uindex on sm_sample_cl...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。