Flink API 解析 Flink Job 依赖的checkpoint 路径

news/2025/2/25 12:06:20
引言

之前写一篇 Python 脚本解析 Flink _metadata 中依赖的 checkpoint 路径文章 Python解析 Flink Job 依赖的checkpoint 路径
,代码比较暴力,直接按照 checkpoint 路径前缀判断,最近发现网上有通过 Flink API 解析 Flink Checkpoint 元数据代码的例子,参考了网上代码,并调试运行成功。

实现代码

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.*;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;

import java.io.*;
import java.util.HashSet;
import java.util.Set;

/*
* @author: david.zhou
* @Flink Version: 1.14.4
* @Date: 2025/2/24 11:25
* @Description: Flink API 解析 Checkpoint 路径
* */

public class CheckpointMetadataParser {


    private static Set<String> ckPath = new HashSet<>();


    public static void main(String[] args) throws IOException {


        //  读取元数据文件
        File f=new File("/tmp/_metadata");
        FileInputStream fis=new FileInputStream(f);
        BufferedInputStream bis = new BufferedInputStream(fis);
        DataInputStream dis = new DataInputStream(bis);


        // 通过 Flink 的 Checkpoints 类解析元数据文件
        CheckpointMetadata savepoint = Checkpoints.loadCheckpointMetadata(dis,
                CheckpointMetadataParser.class.getClassLoader(), f.getAbsolutePath());
        // 打印当前的 CheckpointId
        System.out.println("CheckpointId:" + savepoint.getCheckpointId());

        // 遍历 OperatorState,这里的每个 OperatorState 对应一个 Flink 任务的 Operator 算子
        // 不要与 O
        // peratorState  和 KeyedState 混淆,不是一个层级的概念
        for(OperatorState operatorState :savepoint.getOperatorStates()) {
            //System.out.println(operatorState);
            // 当前算子的状态大小为 0 ,表示算子不带状态,直接退出
            if(operatorState.getStateSize() == 0){
                continue;
            }

            // 遍历当前算子的所有 subtask
            for(OperatorSubtaskState operatorSubtaskState: operatorState.getStates()) {
                // 解析 operatorSubtaskState 的 ManagedKeyedState
                parseManagedKeyedState(operatorSubtaskState);
                // 解析 operatorSubtaskState 的 ManagedOperatorState
                parseManagedOperatorState(operatorSubtaskState);
            }
        }

        for(String path: ckPath) {
            System.out.println("sstable 文件对应的 hdfs 位置:" + path);
        }
    }


    /**
     * 解析 operatorSubtaskState 的 ManagedKeyedState
     * @param operatorSubtaskState operatorSubtaskState
     */
    private static void parseManagedKeyedState(OperatorSubtaskState operatorSubtaskState) {
        // 遍历当前 subtask 的 KeyedState
        for(KeyedStateHandle keyedStateHandle:operatorSubtaskState.getManagedKeyedState()) {
            // 处理增量 Checkpoint
            if(keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
                IncrementalRemoteKeyedStateHandle incrementalStateHandle =
                        (IncrementalRemoteKeyedStateHandle) keyedStateHandle;

                // 获取 RocksDB 的 sharedState
                for (StateHandleID stateHandleID : incrementalStateHandle.getSharedStateHandleIDs()) {
                    StreamStateHandle stateHandle = incrementalStateHandle.getSharedState().get(stateHandleID);
                    //System.out.println("sstable 文件名:" + stateHandleID);
                    if (stateHandle instanceof FileStateHandle) {
                        Path filePath = ((FileStateHandle) stateHandle).getFilePath();
                        //System.out.println("filePath = " + filePath);
                        String ckSubPath = filePath.getPath().substring(0, filePath.getPath().indexOf("/shared"));
                        ckPath.add(ckSubPath);
                    }
                }
            }
        }
    }



    /**
     * 解析 operatorSubtaskState 的 ManagedOperatorState
     * @param operatorSubtaskState operatorSubtaskState
     */
    private static void parseManagedOperatorState(OperatorSubtaskState operatorSubtaskState) {
        // 遍历当前 subtask 的 OperatorState
        for(OperatorStateHandle operatorStateHandle:operatorSubtaskState.getManagedOperatorState()) {
            StreamStateHandle delegateStateHandle = operatorStateHandle.getDelegateStateHandle();
            if(delegateStateHandle instanceof FileStateHandle) {
                Path filePath = ((FileStateHandle) delegateStateHandle).getFilePath();
                //System.out.println("filePath: " + filePath.getPath());
            }
        }
    }


}
结果对比

对比了之前 Python 脚本demo,发现有结果不太一样,初步看 Python 中暴力解析部分元数据可能无效的。后续持续观察中,看看差异点。


http://www.niftyadmin.cn/n/5865477.html

相关文章

【JavaEE进阶】Spring Boot配置文件

欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗 如有错误&#xff0c;欢迎指出~ 目录 SpringBoot配置⽂件 举例: 通过配置文件修改端口号 配置⽂件的格式 properties基本语法 读取配置⽂件 properties配置文件的缺点 yml配置⽂件 yml基本语法 yml和proper…

【搭建SigNoz性能监控平台】在Ubuntu上快速搭建高效的SigNoz性能监控平台与远程使用技巧

文章目录 前言1.关于SigNoz2.本地部署SigNoz3.SigNoz简单使用4. 安装内网穿透5.配置SigNoz公网地址6. 配置固定公网地址 前言 本文介绍如何在Ubuntu系统上使用 Docker 快速部署一款强大的应用性能监控工具SigNoz&#xff0c;并结合cpolar内网穿透工具轻松实现异地远程使用。 …

插入排序:一种简单而直观的排序算法

大家好&#xff01;今天我们来聊聊一个简单却非常经典的排序算法——插入排序&#xff08;Insertion Sort&#xff09;。在所有的排序算法中&#xff0c;插入排序是最直观的一个。 一、插入排序的基本思想 插入排序的核心思想是&#xff1a;将一个待排序的元素&#xff0c;插…

独立开发者Product Hunt打榜教程

Product Hunt 是创业者和开发者展示新产品的地方&#xff0c;对于独立开发者来说&#xff0c;打榜不仅仅是展示产品的良机&#xff0c;更是提高品牌知名度和获取早期用户的重要途径。以下是独立开发者如何在Product Hunt上打榜的详细教程&#xff1a; 1. 产品准备阶段 确保产…

conda 基本命令

1、查询当前所有的环境 conda env list 2、创建虚拟环境 conda create -n 环境名 [pythonpython版本号] 其中[pythonpython版本号]可以不写 conda create -n test python3.12 我们输入conda env list看到我们的环境创建成功了&#xff0c;但是发现他是创建在我们默认的C盘的…

基础知识3

文章目录 MySQL的执行引擎有哪些&#xff1f;1. **InnoDB**2. **MyISAM**3. **Memory**4. **Archive**5. **CSV**6. **Blackhole**7. **Federated**8. **NDB Cluster**9. **其他存储引擎**总结 MySQL为什么使用B树来作索引1. **InnoDB**2. **MyISAM**3. **Memory**4. **Archive…

Threejs教程三【揭秘3D贴图魔法】

定义 贴图&#xff08;Texture&#xff09;是 Three.js 中用于为物体表面添加纹理的一种技术。它可以将图像、视频或其他类型的媒体映射到物体的表面&#xff0c;使其看起来更加真实和生动。 基本原理 贴图的基本原理是将图像或视频映射到物体的表面&#xff0c;使其看起来更…

[java基础-JVM篇]3_JVM类加载机制

摘要&#xff1a;JVM通过设立不同优先级和职责的加载器保证了类加载的安全性与灵活性&#xff0c;即双亲委派机制&#xff0c;但是实际生产中更复杂的需求又需要破坏双亲委派&#xff0c;即打破JVM约定过的类加载程序 目录 类的生命周期 类加载 加载 类加载器 双亲委派机制…