1. Log Cleanup Policy

  • Kafka将消息存储一段时间,并清除超过保留期的消息;

  • 此过期由log.cleanup.policy控制,有两种策略;

  • log.cleanup.policy=delete

  • 所有用户主题(user topic)的默认设置,默认保留期为一周;

  • Kafka会删除早于配置的保留时间的事件;

  • log.cleanup.policy=compact

  • Kafka的 __consumer_offsets 的主题默认策略;

  • kafka只存储主题中每个键的最新值,将策略设置为compact,
    仅在应用程序为其生成同时包含键和值的事件的主题上才有意义;

1.1. Log Cleanup Purpose

  • Kafka最初并不打算永远保留数据,尽管有人正使用large disk或Tiered Storage;
    也不需等待所有消费者阅读消息后再删除,通过为每个主题设置保留策略,它允许管理员:

    • 控制磁盘上数据的大小并删除过时的数据;

    • 限制Kafka集群的维护工作;

    • 限制消费者可能需消耗的历史数据量以catch up主题;

1.2. Log Cleanup Frequency

  • 日志清理发生在partition segment,分段将分区数据
    划分为更小的文件,此类文件可更好的管理并独立清理;

  • 清理应经常进行,以确保文件被删除,但不要经常影响broker
    和磁盘性能,较小的日志保留大小可能需更频繁的检查;

2. Log Compaction Theory

  • 正如Kafka日志清理策略提到,Kafka将存储消息一段时间,并清除超过保留期的消息;

  • 但如Kafka用于存储公司所有员工工资信息,那此场景下,只存储
    每个员工的最新薪资,并不是有限时间内的历史数据是有意义的;

  • Kafka支持此用例,允许将主题的保留策略设置为compact,且属性至少保留
    分区中每个键的最新值,如只需SNAPSHOT,而非完整的历史记录,这很有用;

2.1. Log Compaction Case

  • 想保留员工的最新工资,为此创建"employee-salary"的主题,我们不想知道员工的历史薪资;

LogCompaction
  • 生产producing这些事件的应用应同时包含一个键和一个值,
    此情况下,关键是员工Id,值将是薪资,当数据进入(come in);

  • 它会被追加到append into 分区的分段segment,在compaction后,
    将创建新的segment,只保留某个键的最新事件,键的旧事件被删除,消息的偏移量保持不变intact;

2.2. Log Compaction Guarantee

  • Kafka在log-compacted topic上生成produce的
    消息提供provide重要保证important guarantee;

  • 任何正在读取日志尾部tail(即最新数据)的消费者仍会看到发送到该主题的所有消息;

  • 主题是否经过log-compacted并不重要,
    订阅该主题的消费者将在该主题上produce消息时看到消息;

  • 保持Key级别和分区级别消息排序,log compaction只删除(remove)一些消息,
    但不会重新排序,偏移量将保留,只有一些消息被删除delete;

  • 消息的偏移量不可变immutable,永远不变,若消息丢失missing,则跳过skip偏移量;

  • 在log.cleaner.delete.retension.ms(默认24小时)的一段时间内,
    消费者仍可看到已删除delete的记录,这为消费者提供一些提醒时间
    heads-up time来catch up on msg将被删除的消息;

2.3. Log Compaction Myth Busting

  • log compaction misconception(误解) and Clear:

    • A:它不会组织我们将重复的数据推送给Kafka,prevent pushing duplicate;

    • 重复数据消除deduplication是在提交commit数据段segment后完成的,
      一旦数据到达,消费者仍将从日志段log segment的尾部tail读取,
      这不是执行消息重复数据消除msg deduplication的方法;

    • B:它不会阻止我们从Kafka中读取重复的数据,prevent reading duplicate;

    • 若消费者重启,可能会看到重复数据,
      这是基于at-least-once reading semantic;;

2.4. Log Compaction Work

  • 若kafka启动时启用compaction,每个broker将启动一个compaction manager
    thread和多个compaction thread,这些线程负责compaction task;

LogCompactionCleanerThread
  • cleaner thread从最旧的段oldest segment开始,并检查其内容,
    活动分段active segment不受(left untouch) cleaner thread的影响;

  • 若它刚刚读取的消息仍是Key的最新消息,它会将消息复制到替换段copy
    over msg to replacement segment,否则,它会omit该消息,
    因分区中稍后会有一个具有identical key但具有较新值的消息;

一旦干净的线程,我们就将替换段替换为原始段,然后继续下一个段。
在该过程结束时,每个键只剩下一条消息,即具有最新值的消息

  • 一旦cleaner thread复制(copy over)所有仍包含其Key的最新值的消息,
    我们将替换段替换为原始段,然后继续下一段,那每个键只剩下一条消息,即具有最新值的消息;

  • swap replacement segment for original and move on to next segment;

  • Ordering Intact:在cleaner thread完成工作后,
    将从旧的段创建/组合新的段,偏移量保持不变,也保持基于Key的排序;

  • new segment create/combine from old segment after cleaner thread,
    offset left untouch and key-based ordering as well;

2.5. Log Compaction Config

  • log.cleanup.policy=compact:还有其他配置会影响log compaction的频率;

  • log.cleaner.enable:可禁用log compaction,
    不推荐,这会影响__consumer_offsets主题,若需要,可暂时禁用;

  • log.cleaner.threads:用于log cleaning的background thread;

  • log.segment.ms:默认7天,关闭活动段的最长等待时间,
    max amount of time to wait to close aactive segment;

  • log.segment.bytes:默认1G,单个元数据日志文件的最大大小,
    max size of single metadata log file;

  • log.cleaner.delete.retention.ms:默认24小时,
    控制删除记录和事务标记marker符合删除条件后保留的时间,
    用于确保同时读取日志的消费者有机会在删除这些记录前读取这些记录;

  • log.cleaner.backoff.ms:默认15秒,没有要清理的日志log时的sleep时间;

  • log.cleaner.min.com.paction.lag.ms:默认0,防止更新到最短消息期限的消息被
    compaction,若未设置,则除最后一个段(即当前正在写入的段)外,所有日志段都有资格compaction;

  • 即使active segment的所有消息都早于minimum compaction time lag,也不会compact活动段,
    log cleaner可配置为确保最大延迟,uncompact "head" log有资格eligible log compaction;

  • log.cleaner.max.compaction.lag.ms:默认infinite,防止low produce rate
    from remaining ineligible for compaction for unbound duration;

  • 若未设置,则不会压缩不超过min.cleanable.dirty.ratio的日志;

  • 注:此压缩截止日期并非硬性保证,因它仍受日志清理线程的可用性和实际压缩时间的影响,
    将需监控uncleanable-partitions-count、
    max-clean-time-secs和max-compaction-delay-secs指标;

  • min.cleanable.dirty.ratio:默认0.5,有资格进行清理的日志的脏日志dirty
    与总日志的最小比率;若还已指定log.cleaner.max.compaction.lag.ms或
    log.cleaner.min.compaction.lag.ms,则log compactor在以下情况认为日志符合压缩条件:

  • A:已达脏比率阈值threshold,且该日志至少在log.cleaner.min.com.Paction.lag.ms
    的持续时间内有脏dirty(uncompact))未压缩记录;

  • B:该日志最多在log.clean.mix.com.pacation.lag.mss的时间内有不脏(未处理)记录;

2.6. Log Compaction Tombstone

  • Compaction允许删除delete,带有Key和null payload的消息
    将被视为从日志中删除,这样的记录有时被称为tombstone;

  • 此删除标记将导致删除任何以前带有该Key的消息,就像删除任何新消息一样,
    但删除标记将在一段时间后从日志中清除clean out,以释放free up空间;

  • 从日志开始的任何消费者都将至少看到所有记录的最终状态(按写入顺序),另若消费者在短于
    less than主题的delete.retension.ms(默认24小时)的时间段内到达日志的顶端head;

  • 则会看到已删除记录的所有删除标记,换言之:因删除标记与读取同时发生,
    故若延迟lag超过delete.retension.ms,则用户可能会错过删除标记;

3. Log Compaction Practice

  • 我们想保留员工的最新工资,不想知道员工的旧工资,创建employee-salary主题;

LogCompaction
  • Win用户,若不能使用WSL2,则不能使用log compaction;

  • 令Win还由长期存在的Bug(long-standing bug (KAFKA-1194)),
    若使用日志清理log cleaning,就会导致Kafka奔溃,类删除主题;

  • 从奔溃crash中恢复recover的唯一方法是手动删除log.dirs目录中的文件夹;

  1. This enables log compaction for the topic.

    • 创建employee salary的log-compact主题,使用单分区,复制因子为1:

    • 分区数使用1,确保所有消息都进入同一分区;

    • cleanup.policy=compact,这将启用该主题的log compaction;

    • min.cleanable.dirty.ratio=0.001,为确保始终触发log cleanup;

    • segment.ms=5000,每5秒将创建一个新片段segment,
      log compaction仅发生在closed segment;

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--create --topic employee-salary \
    --partitions 1 --replication-factor 1 \
    --config cleanup.policy=compact \
    --config min.cleanable.dirty.ratio=0.001 \
    --config segment.ms=5000
  • 描述主题以确保正确应用配置:

./kafka-topics.sh --bootstrap-server 192.168.0.123:9092 \
	--describe --topic employee-salary
  • 启动kafka console consumer,显示键和值,并用逗号分隔:

./kafka-console-consumer.sh --bootstrap-server 192.168.0.123:9092 \
	--topic employee-salary \
    --from-beginning \
    --property print.key=true \
    --property key.separator=,
  • 启动另一个shell终端来创建kafka console producer,
    想发送消息的Key,键和值之间的分隔符是逗号:

./kafka-console-producer.sh --bootstrap-server 192.168.0.123:9092 \
    --topic employee-salary \
    --property parse.key=true \
    --property key.separator=,
  • 生成一些具有重复Key的消息,以下消息中,Key是elf,值是salary:1000;

elf,salary: 10000
pal,salary: 20000
bob,salary: 20000
elf,salary: 25000
pal,salary: 30000
elf,salary: 30000
  • 请稍等,然后再生成一些消息,可能是相同的消息:

fly,salary: 0
  • 停止kafka console consumer,并启动一个新的消费者,
    我们从一开始旧获取所有消息,将只看到具有相应最新值的唯一Key;

bob,salary: 20000
pal,salary: 30000
elf,salary: 30000
fly,salary: 0
  • log compaction将自动在后台进行,不能显示触发,
    但可使用log compaction properties来控制它的触发频率;