Flink Checkpoint: All Configuration Options Explained

Author: 杨林伟, published 2022-04-19 14:06:08

The options are defined in the class org.apache.flink.configuration.CheckpointingOptions.

Option reference:

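| Option | Type | Default | Description |
| --- | --- | --- | --- |
| state.backend | String | (none) | The state backend used to store operator state locally during execution. Recognized shortcut names are 'hashmap' and 'rocksdb'. Deprecated in this class in favor of StateBackendOptions#STATE_BACKEND. |
| state.checkpoint-storage | String | (none) | The checkpoint storage used to checkpoint state for recovery. Recognized shortcut names are 'jobmanager' and 'filesystem'. |
| state.backend.changelog.enabled | Boolean | false | Whether the state backend writes state changes to the state changelog. |
| state.checkpoints.num-retained | Integer | 1 | The maximum number of completed checkpoints to retain. |
| state.backend.async | Boolean | true | Deprecated. All state snapshots are asynchronous. |
| state.backend.incremental | Boolean | false | Whether the state backend should create incremental checkpoints, if possible. An incremental checkpoint stores only the diff from the previous checkpoint rather than the complete checkpoint state. |
| state.backend.local-recovery | Boolean | false | Configures local recovery for the state backend. Local recovery is deactivated by default. |
| taskmanager.state.local.root-dirs | String | (none) | The root directories for storing file-based state for local recovery. |
| state.savepoints.dir | String | (none) | The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend). |
| state.checkpoints.dir | String | (none) | The default directory for storing the data files and metadata of checkpoints in a Flink-supported filesystem. The path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers). |
| state.storage.fs.memory-threshold | MemorySize | 20kb | The minimum size of state data files. All state chunks smaller than this are stored inline in the root checkpoint metadata file. |
| state.storage.fs.write-buffer-size | Integer | 4 * 1024 | The default size of the write buffer for checkpoint streams that write to file systems. |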
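As a rough illustration of how several of these keys might be set together, the sketch below collects a few of them in a map and renders `key: value` lines in the style of flink-conf.yaml. Only the option keys come from the table above. The class name `CheckpointConfSketch`, its helper methods, the chosen values, and the HDFS paths are hypothetical and would need to be adapted to a real cluster.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: renders checkpoint-related options as flink-conf.yaml style lines. */
class CheckpointConfSketch {

    /** Sample options. Keys are the documented option keys; values are illustrative only. */
    static Map<String, String> sampleOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("state.backend", "rocksdb");
        options.put("state.checkpoint-storage", "filesystem");
        // Assumed paths: they must be reachable from all TaskManagers and JobManagers.
        options.put("state.checkpoints.dir", "hdfs:///flink/checkpoints");
        options.put("state.savepoints.dir", "hdfs:///flink/savepoints");
        options.put("state.checkpoints.num-retained", "3");
        options.put("state.backend.incremental", "true");
        return options;
    }

    /** Renders each entry as "key: value" on its own line, preserving insertion order. */
    static String render(Map<String, String> options) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(sampleOptions()));
    }
}
```

Keeping the keys as plain strings makes the sketch independent of the Flink classpath. In a real job, the typed constants of CheckpointingOptions would normally be preferable to string literals.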

The source of the configuration class is as follows:

package org.apache.flink.configuration;

import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

/** A collection of all configuration options that relate to checkpoints and savepoints. */
public class CheckpointingOptions {

    // ------------------------------------------------------------------------
    //  general checkpoint options
    // ------------------------------------------------------------------------

    /**
     * The checkpoint storage used to store operator state locally within the cluster during
     * execution.
     *
     * <p>The implementation can be specified either via their shortcut name, or via the class name
     * of a {@code StateBackendFactory}. If a StateBackendFactory class name is specified, the
     * factory is instantiated (via its zero-argument constructor) and its {@code
     * StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
     *
     * <p>Recognized shortcut names are 'hashmap' and 'rocksdb'.
     *
     * @deprecated Use {@link StateBackendOptions#STATE_BACKEND}.
     */
    @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
    @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
    @Deprecated
    public static final ConfigOption<String> STATE_BACKEND =
            ConfigOptions.key("state.backend")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text("The state backend to be used to store state.")
                                    .linebreak()
                                    .text(
                                            "The implementation can be specified either via their shortcut "
                                                    + " name, or via the class name of a %s. "
                                                    + "If a factory is specified it is instantiated via its "
                                                    + "zero argument constructor and its %s "
                                                    + "method is called.",
                                            TextElement.code("StateBackendFactory"),
                                            TextElement.code(
                                                    "StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)"))
                                    .linebreak()
                                    .text("Recognized shortcut names are 'hashmap' and 'rocksdb'.")
                                    .build());

    /**
     * The checkpoint storage used to checkpoint state for recovery.
     *
     * <p>The implementation can be specified either via their shortcut name, or via the class name
     * of a {@code CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,
     * the factory is instantiated (via its zero-argument constructor) and its {@code
     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
     *
     * <p>Recognized shortcut names are 'jobmanager' and 'filesystem'.
     */
    @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
    public static final ConfigOption<String> CHECKPOINT_STORAGE =
            ConfigOptions.key("state.checkpoint-storage")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "The checkpoint storage implementation to be used to checkpoint state.")
                                    .linebreak()
                                    .text(
                                            "The implementation can be specified either via their shortcut "
                                                    + " name, or via the class name of a %s. "
                                                    + "If a factory is specified it is instantiated via its "
                                                    + "zero argument constructor and its %s "
                                                    + " method is called.",
                                            TextElement.code("CheckpointStorageFactory"),
                                            TextElement.code(
                                                    "CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)"))
                                    .linebreak()
                                    .text(
                                            "Recognized shortcut names are 'jobmanager' and 'filesystem'.")
                                    .build());

    /** Whether to enable state change log. */
    @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
    @Documentation.ExcludeFromDocumentation("Hidden for now")
    public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
            ConfigOptions.key("state.backend.changelog.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to enable state backend to write state changes to StateChangelog.");

    /** The maximum number of completed checkpoints to retain. */
    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
    public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS =
            ConfigOptions.key("state.checkpoints.num-retained")
                    .defaultValue(1)
                    .withDescription("The maximum number of completed checkpoints to retain.");

    /** @deprecated Checkpoints are always asynchronous. */
    @Deprecated
    public static final ConfigOption<Boolean> ASYNC_SNAPSHOTS =
            ConfigOptions.key("state.backend.async")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Deprecated option. All state snapshots are asynchronous.");

    /**
     * Option whether the state backend should create incremental checkpoints, if possible. For an
     * incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the
     * complete checkpoint state.
     *
     * <p>Once enabled, the state size shown in web UI or fetched from rest API only represents the
     * delta checkpoint size instead of full checkpoint size.
     *
     * <p>Some state backends may not support incremental checkpoints and ignore this option.
     */
    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
    public static final ConfigOption<Boolean> INCREMENTAL_CHECKPOINTS =
            ConfigOptions.key("state.backend.incremental")
                    .defaultValue(false)
                    .withDescription(
                            "Option whether the state backend should create incremental checkpoints, if possible. For"
                                    + " an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the"
                                    + " complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API"
                                    + " only represents the delta checkpoint size instead of full checkpoint size."
                                    + " Some state backends may not support incremental checkpoints and ignore this option.");

    /**
     * This option configures local recovery for this state backend. By default, local recovery is
     * deactivated.
     *
     * <p>Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend
     * and HashMapStateBackend do not support local recovery and ignore this option.
     */
    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
    public static final ConfigOption<Boolean> LOCAL_RECOVERY =
            ConfigOptions.key("state.backend.local-recovery")
                    .defaultValue(false)
                    .withDescription(
                            "This option configures local recovery for this state backend. By default, local recovery is "
                                    + "deactivated. Local recovery currently only covers keyed state backends. Currently, the MemoryStateBackend "
                                    + "does not support local recovery and ignores this option.");

    /**
     * The config parameter defining the root directories for storing file-based state for local
     * recovery.
     *
     * <p>Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend
     * does not support local recovery and ignore this option.
     */
    @Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
    public static final ConfigOption<String> LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS =
            ConfigOptions.key("taskmanager.state.local.root-dirs")
                    .noDefaultValue()
                    .withDescription(
                            "The config parameter defining the root directories for storing file-based state for local "
                                    + "recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does "
                                    + "not support local recovery and ignore this option");

    // ------------------------------------------------------------------------
    //  Options specific to the file-system-based state backends
    // ------------------------------------------------------------------------

    /**
     * The default directory for savepoints. Used by the state backends that write savepoints to
     * file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).
     */
    @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 3)
    public static final ConfigOption<String> SAVEPOINT_DIRECTORY =
            ConfigOptions.key("state.savepoints.dir")
                    .noDefaultValue()
                    .withDeprecatedKeys("savepoints.state.backend.fs.dir")
                    .withDescription(
                            "The default directory for savepoints. Used by the state backends that write savepoints to"
                                    + " file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).");

    /**
     * The default directory used for storing the data files and meta data of checkpoints in a Flink
     * supported filesystem. The storage path must be accessible from all participating
     * processes/nodes(i.e. all TaskManagers and JobManagers).
     */
    @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
    public static final ConfigOption<String> CHECKPOINTS_DIRECTORY =
            ConfigOptions.key("state.checkpoints.dir")
                    .stringType()
                    .noDefaultValue()
                    .withDeprecatedKeys("state.backend.fs.checkpointdir")
                    .withDescription(
                            "The default directory used for storing the data files and meta data of checkpoints "
                                    + "in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes"
                                    + "(i.e. all TaskManagers and JobManagers).");

    /**
     * The minimum size of state data files. All state chunks smaller than that are stored inline in
     * the root checkpoint metadata file.
     */
    @Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
    public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD =
            ConfigOptions.key("state.storage.fs.memory-threshold")
                    .memoryType()
                    .defaultValue(MemorySize.parse("20kb"))
                    .withDescription(
                            "The minimum size of state data files. All state chunks smaller than that are stored"
                                    + " inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.")
                    .withDeprecatedKeys("state.backend.fs.memory-threshold");

    /**
     * The default size of the write buffer for the checkpoint streams that write to file systems.
     */
    @Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
    public static final ConfigOption<Integer> FS_WRITE_BUFFER_SIZE =
            ConfigOptions.key("state.storage.fs.write-buffer-size")
                    .intType()
                    .defaultValue(4 * 1024)
                    .withDescription(
                            String.format(
                                    "The default size of the write buffer for the checkpoint streams that write to file systems. "
                                            + "The actual write buffer size is determined to be the maximum of the value of this option and option '%s'.",
                                    FS_SMALL_FILE_THRESHOLD.key()))
                    .withDeprecatedKeys("state.backend.fs.write-buffer-size");
}
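The interplay of the last two options, state.storage.fs.memory-threshold and state.storage.fs.write-buffer-size, can be sketched in plain Java. This is a simplified model of the behavior described in the option texts, not Flink's actual implementation. The class `FsCheckpointSizingSketch` and its method names are hypothetical, and treating a chunk whose size exactly equals the threshold as a separate file is an assumption based on the wording "smaller than".

```java
/** Hypothetical model of how the two file-system sizing options are described to behave. */
class FsCheckpointSizingSketch {

    /** Default of state.storage.fs.memory-threshold ("20kb"), expressed in bytes. */
    static final int DEFAULT_MEMORY_THRESHOLD_BYTES = 20 * 1024;

    /** Default of state.storage.fs.write-buffer-size (4 * 1024), in bytes. */
    static final int DEFAULT_WRITE_BUFFER_BYTES = 4 * 1024;

    /** A state chunk smaller than the threshold is stored inline in the checkpoint metadata file. */
    static boolean storedInline(long chunkSizeBytes, long thresholdBytes) {
        return chunkSizeBytes < thresholdBytes;
    }

    /** The actual write buffer size is the maximum of the two option values. */
    static int effectiveWriteBufferSize(int writeBufferBytes, int thresholdBytes) {
        return Math.max(writeBufferBytes, thresholdBytes);
    }

    public static void main(String[] args) {
        long smallChunk = 1024;         // 1 KiB of state
        long largeChunk = 1024 * 1024;  // 1 MiB of state
        System.out.println("small chunk inline: "
                + storedInline(smallChunk, DEFAULT_MEMORY_THRESHOLD_BYTES));
        System.out.println("large chunk inline: "
                + storedInline(largeChunk, DEFAULT_MEMORY_THRESHOLD_BYTES));
        System.out.println("effective write buffer (bytes): "
                + effectiveWriteBufferSize(DEFAULT_WRITE_BUFFER_BYTES, DEFAULT_MEMORY_THRESHOLD_BYTES));
    }
}
```

With the defaults, the memory threshold is larger than the configured write buffer, so under this model the threshold determines the effective buffer size.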
