kafka日志数据清理策略
1.关于Kafka的日志
在Kafka中,日志分为两种:
1、数据日志是指Kafka的topic中存储的数据,这种日志的路径是$KAFKA_HOME/config/server.properties文件中配置,配置项为log.dirs。如果此项没有被配置,默认会使用配置项 log.dir。log.dir的默认路径为/tmp/kafka-logs,/tmp路径下的文件在计算机重启的时候是会被删除的,因此,应将文件目录设置在其他可以永久保存的路径。
2、另一种日志是操作日志,类似于我们在自己开发的程序中输出的log日志(log4j),这种日志的路径是在启动Kafka的路径下。比如一般我们在KAFKA_HOME路径下启动Kafka服务,那么操作日志的路径为KAFKA_HOME/logs。
2.关于Kafka日志清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
log.retention.hours,最低优先级小时,默认 7 天。
log.retention.minutes,分钟。
log.retention.ms,最高优先级毫秒。
log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours,那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。
日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。
2.1、delete 日志删除:将过期数据删除
当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除
log.cleanup.policy = delete 所有数据启用删除策略
(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)
Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
log.retention.bytes,默认等于-1,表示无穷大。
如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?
2.2、compact 日志压缩,对于相同key的不同value值,只保留最后一个版本。
log.cleanup.policy = compact 所有数据启用压缩策略
Log Compaction执行后,offset将不再连续,但依然可以查询Segment
Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态
图1:
图2:
压缩后的offset可能是不连续的,比如上图2中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。