flink kafka的enableCommitOnCheckpoints 和 enable.auto.commit 参数

背景

每次使用flink消费kafka消息的时候我就被这两个参数enableCommitOnCheckpoints 和 enable.auto.commit困扰,本文就来从源码看看这两个参数的作用

enableCommitOnCheckpoints 和 enable.auto.commit参数

1.FlinkKafkaConsumerBase的open方法,查看offsetCommitMode的赋值

public void open(Configuration configuration) throws Exception {
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

}

2.OffsetCommitModes.fromConfiguration方法

public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {

if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}

从这个代码可知,enableCommitOnCheckpoint 和 enableAutoCommit是不会同时存在的,也就是flink如果在checkpoint的时候提交偏移,他就肯定不会设置enableAutoCommit自动提交,反之亦然

enableCommitOnCheckpoint 提交偏移的关键代码

1.FlinkKafkaConsumerBase.snapshotState方法

public final void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
unionOffsetStates.clear();

final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
//  这里如果是checkpoint模式会在checkpoint的时候保存offset到状态中
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}

}

2.FlinkKafkaConsumerBase.notifyCheckpointComplete方法

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);

fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);

enable.auto.commit参数

1.KafkaConsumerThread.run线程

if (records == null) {
try {
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}

2.KafkaConsumer的poll方法

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

// poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();
//  updateAssignmentMetadataIfNeeded方法是关键
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}

return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());

return ConsumerRecords.empty();
} finally {
release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}

3.KafkaConsumer.updateAssignmentMetadataIfNeeded方法

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (coordinator != null && !coordinator.poll(timer)) {
return false;
}

return updateFetchPositions(timer);
}

4.ConsumerCoordinator.poll方法

public boolean poll(Timer timer) {
maybeUpdateSubscriptionMetadata();

invokeCompletedOffsetCommitCallbacks();

if (subscriptions.partitionsAutoAssigned()) {
if (protocol == null) {
throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
" to empty while trying to subscribe for group protocol to auto assign partitions");
}
// Always update the heartbeat last poll time so that the heartbeat thread does not leave the
// group proactively due to application inactivity even if (say) the coordinator cannot be found.
pollHeartbeat(timer.currentTimeMs());
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}

if (rejoinNeededOrPending()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription()) {
// For consumer group that uses pattern-based subscription, after a topic is created,
// any consumer that discovers the topic after metadata refresh can trigger rebalance
// across the entire consumer group. Multiple rebalances can be triggered after one topic
// creation if consumers refresh metadata at vastly different times. We can significantly
// reduce the number of rebalances caused by single topic creation by asking consumer to
// refresh metadata before re-joining the group as long as the refresh backoff time has
// passed.
if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
this.metadata.requestUpdate();
}

if (!client.ensureFreshMetadata(timer)) {
return false;
}

maybeUpdateSubscriptionMetadata();
}

if (!ensureActiveGroup(timer)) {
return false;
}
}
} else {
// For manually assigned partitions, if there are no ready nodes, await metadata.
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
// requests result in polls returning immediately, causing a tight loop of polls. Without
// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
// When group management is used, metadata wait is already performed for this scenario as
// coordinator is unknown, hence this check is not required.
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
}
//  这里是重点
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}

5.ConsumerCoordinatormaybeAutoCommitOffsetsAsync方法

public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}

看到没,这里就是判断autoCommitEnabled的地方,这里如果打开了自动提交功能的话,就会进行offset的提交

特别重要的两点

1.kafkaconsumer当开始进行消费时,即使不提交任何偏移量,也不影响它消费消息,他还是能正常消费kafka主题的消息,这里提交偏移的主要作用在于当kafkaconsumer断线然后需要重连kafka broker进行消费时,此时它一般会从它最后提交的offset位置开始消费(此时还依赖于没有设置startFromLatest,startFromEarliest,startFromTimeStamp的情况下),这才是consumer提交offset偏移的最大意义

2.对于flink来说,由于每次重启的时候,flink的consumer都会从checkpoint中把偏移取出来并设置,所以flink的consumer在消息消费过程中无论通过enableCommitOnCheckpoint 还是enableAutoCommit提交的偏移并没有意义,因为并没有使用到,它的意义只在于flink没有从checkpoint中启动时,此时flink的consumer才会从enableCommitOnCheckpoint 、enableAutoCommit提交的偏移开始消费消息(此时还依赖于没有设置startFromLatest,startFromEarliest,startFromTimeStamp的情况下)

参考文章:https://blog.csdn.net/qq_42009500/article/details/119875158

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/607866.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电商核心技术揭秘53:社群营销的策略与实施

相关系列文章 电商技术揭秘相关系列文章合集&#xff08;1&#xff09; 电商技术揭秘相关系列文章合集&#xff08;2&#xff09; 电商技术揭秘相关系列文章合集&#xff08;3&#xff09; 电商技术揭秘四十一&#xff1a;电商平台的营销系统浅析 电商技术揭秘四十二&#…

Python轻量级Web框架Flask(13)—— Flask个人博客项目

0、前言: ★这部分内容是基于之前Flask学习内容的一个实战项目梳理内容,没有可以直接抄下来跑的代码,是学习了之前Flask基础知识之后,再来看这部分内容,就会对Flask项目开发流程有更清楚的认知,对一些开发细节可以进一步的学习。项目功能,通过Flask制作个人博客。项目架…

YOLOv9中模块总结补充|RepNCSPELAN4详图

专栏地址&#xff1a;目前售价售价69.9&#xff0c;改进点70 专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;助力高效涨点&#xff01;&#xff01;&#xff01; 1. RepNCSPELAN4详图 RepNCSPELAN4是YOLOv9中的特征提取-融合模块&#xff0c;类似前几…

制造业的智慧进化:机器学习与人工智能的全方位渗透

&#x1f9d1; 作者简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

libcity 笔记:添加自定义dataset

假设我们把libcity/data/dataset/trajectory_dataset.py复制一份到libcity/data/dataset/dataset_subclass/GeolifeDM_dataset.py&#xff0c;里面内容不变&#xff0c;只是把class的名字换了 那其他需要修改哪些内容&#xff0c;使得这个dataset生效呢 libcity/data/dataset/d…

国内唯一!阿里云荣膺MongoDB“2024年度DBaaS认证合作伙伴奖”

近日&#xff0c;在MongoDB用户大会纽约站上&#xff0c;阿里云荣膺MongoDB“2024年度DBaaS认证合作伙伴奖”。这是阿里云连续第五年斩获MongoDB合作伙伴奖项&#xff0c;也是唯一获此殊荣的中国云厂商。 MongoDB是当今全球最受欢迎的非关系型数据库之一。凭借灵活的模式和丰富…

Web3探索加密世界:如何避免限制并增加空投成功的几率

今天分享空投如何避免限制以提高效率&#xff0c;增加成功几率&#xff0c;首先我们来了解什么是空投加密&#xff0c;有哪些空投类型。 一、什么是空投加密&#xff1f; 加密货币空投是一种营销策略&#xff0c;包括向用户的钱包地址发送免费的硬币或代币。 加密货币项目使用…

【Linux】深浅睡眠状态超详解!!!

1.浅度睡眠状态【S】&#xff08;挂起&#xff09; ——S (sleeping)可中断睡眠状态 进程因等待某个条件&#xff08;如 I/O 完成、互斥锁释放或某个事件发生&#xff09;而无法继续执行。在这种情况下&#xff0c;进程会进入阻塞状态&#xff0c;在阻塞状态下&#xff0c;进程…

Python批量修改图片文件名中的指定名称

批量处理图像时&#xff0c;图片名有时需要统一&#xff0c;本教程仅针对图片中名如&#xff1a;0001x4.png&#xff0c;批量将图片名中的x4去除&#xff0c;只留下0001.png的情况。 如果想要按照原图片顺序批量修改图片名&#xff0c;参考其它博文&#xff1a;按照原顺序批量…

Joplin:自由、安全、多功能的笔记应用

什么是 Joplin&#xff1f; Joplin是一款免费、开源的笔记和待办事项应用程序&#xff0c;可以处理整理到笔记本中的大量笔记。这些笔记是可搜索的&#xff0c;可以直接从应用程序或从您自己的文本编辑器中复制、标记和修改。笔记采用Markdown 格式 功能亮点 功能丰富&#x…

科技云报道:从亚运到奥运,大型国际赛事共赴“云端”

科技云报道原创。 “广播电视转播技术拯救了奥运会”前奥委会主席萨马兰奇这句话广为流传。 奥运会、世界杯、亚运会这样的全球大型体育赛事不仅是体育竞技的盛宴&#xff0c;也是商业盛宴&#xff0c;还是技术与人文的融合秀。随着科技的进步&#xff0c;技术在体育赛事中扮…

google地图js,添加标记,以及infowindow信息弹窗

&#xff08;谷歌地图版本V3&#xff09; var contentString "<div classdevinfo><P>设备ID: BJ-20240507</p> <P>设备状态: 正常</p> <P>通讯信号: 89% </p> <P>设备位置: 中国</p> <P>剂量率: 988</p&…

MATLAB 自定义实现点云随机抽稀方法(66)

MATLAB 自定义实现点云随机抽稀方法(66) 一、算法介绍二、算法实现1.代码2.结果三、数据链接一、算法介绍 MATLAB虽然提供了点云随机抽稀的内置函数,但是我们也可以自己实现这个功能,有助于理解,下面是具体的实现效果和代码(直接复制粘贴即可使用): 使用提供的数据直接…

Matlab如何导出高质量论文插图?科研效率UpUp第8期

当你用Matlab绘制了一张论文插图&#xff1a; 想要所见即所得&#xff0c;原封不动地将其保存下来&#xff0c;该怎么操作呢&#xff1f; 虽说以前总结过7种方法&#xff08;Matlab导出论文插图的7种方法&#xff09;&#xff0c;但要说哪一种可以满足上面的要求&#xff0c;想…

树(数据结构)

树的定义 一个根结点&#xff0c;其余结点分为 m 个不相交的集合&#xff0c; 其中每个集合本身又是一棵树&#xff0c;并且称为根的子树。 树的根结点没有前驱&#xff0c;其他结点有且仅有一个前驱。 所有结点可以有0个或多个后继。 基本术语 结点的度 树的度 &#xff1a; 树…

String类 StringBuffer 类 StringBuilder 类

String 类的理解和创建对象 1&#xff0c;String 对象用于保存字符串&#xff0c;也就是一组字符数列2&#xff0c;字符串常量对象是用双引号括起的字符序列。例如&#xff1a;“你好”、“12.97”、“boy”等3&#xff0c;字符串的字符使用Unicode字符编码&#xff0c;一个字…

与Apollo共创生态:助力自动驾驶迈向新台阶

引言Apollo七周年大会企业协同工具链携手伙伴共创生态未来展望与总结 引言 2024年4月19日&#xff0c;一场智能汽车未来的盛宴正朝我们走来——Apollo开放平台的七周年大会。 此次大会主题为“破晓•拥抱智变时刻”其中“破晓”象征着新时代的曙光&#xff0c;意味着智能汽车技…

phpstudy(MySQL启动又立马停止)问题的解决办法

方法一&#xff1a;查看本地安装的MySQL有没有启动 1.鼠标右击开始按钮选择计算机管理 2.点击服务和应用程序 3.找到服务双击 4.找到MySQL服务 5.双击查看是否启动&#xff0c;如启动则停止他&#xff0c;然后确定&#xff0c;重新打开phpstudy,启动Mysql. 方法二&#xff…

css的类名冲突--css module

css module {ignore} 通过命名规范来限制类名太过死板&#xff0c;而css in js虽然足够灵活&#xff0c;但是书写不便。 css module 开辟一种全新的思路来解决类名冲突的问题 思路 css module 遵循以下思路解决类名冲突问题&#xff1a; css的类名冲突往往发生在大型项目中大…

【16-Ⅱ】Head First Java 学习笔记

HeadFirst Java 本人有C语言基础&#xff0c;通过阅读Java廖雪峰网站&#xff0c;简单速成了java&#xff0c;但对其中一些入门概念有所疏漏&#xff0c;阅读本书以弥补。 第一章 Java入门 第二章 面向对象 第三章 变量 第四章 方法操作实例变量 第五章 程序实战 第六章 Java…
最新文章