A look at the Flink TaskManager jvm-exit-on-oom setting
Published: 2019-06-25


This post looks at how the Flink TaskManager's jvm-exit-on-oom setting is defined, read, and acted upon in the Flink 1.7.2 source.

taskmanager.jvm-exit-on-oom

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
     */
    public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
            key("taskmanager.jvm-exit-on-oom")
            .defaultValue(false)
            .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.");

    //......
}
  • taskmanager.jvm-exit-on-oom defaults to false; it controls whether the TaskManager is killed when a task thread throws an OutOfMemoryError
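
The default-value behaviour can be illustrated with a minimal sketch. It does not use Flink's ConfigOption or Configuration classes: a plain Map stands in for the loaded configuration, and the class and method names (ExitOnOomOptionSketch, exitOnOom) are hypothetical. Only the key and the default of false come from the source above.

```java
import java.util.HashMap;
import java.util.Map;

final class ExitOnOomOptionSketch {

    // Key and default value as declared in TaskManagerOptions.KILL_ON_OUT_OF_MEMORY.
    static final String KEY = "taskmanager.jvm-exit-on-oom";
    static final boolean DEFAULT_VALUE = false;

    // Hypothetical lookup: use the configured value if the key is present,
    // otherwise fall back to the default.
    static boolean exitOnOom(Map<String, String> conf) {
        String raw = conf.get(KEY);
        return raw == null ? DEFAULT_VALUE : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println("unset      -> " + exitOnOom(conf));

        conf.put(KEY, "true");
        System.out.println("set 'true' -> " + exitOnOom(conf));
    }
}
```

With the key unset the lookup returns the default, so an OutOfMemoryError in a task thread does not by itself bring the TaskManager down unless the option is switched on.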

TaskManagerConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {

    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class);

    private final int numberSlots;

    private final String[] tmpDirectories;

    private final Time timeout;

    // null indicates an infinite duration
    @Nullable
    private final Time maxRegistrationDuration;

    private final Time initialRegistrationPause;
    private final Time maxRegistrationPause;
    private final Time refusedRegistrationPause;

    private final UnmodifiableConfiguration configuration;

    private final boolean exitJvmOnOutOfMemory;

    private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;

    private final String[] alwaysParentFirstLoaderPatterns;

    @Nullable
    private final String taskManagerLogPath;

    @Nullable
    private final String taskManagerStdoutPath;

    public TaskManagerConfiguration(
        int numberSlots,
        String[] tmpDirectories,
        Time timeout,
        @Nullable Time maxRegistrationDuration,
        Time initialRegistrationPause,
        Time maxRegistrationPause,
        Time refusedRegistrationPause,
        Configuration configuration,
        boolean exitJvmOnOutOfMemory,
        FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
        String[] alwaysParentFirstLoaderPatterns,
        @Nullable String taskManagerLogPath,
        @Nullable String taskManagerStdoutPath) {

        this.numberSlots = numberSlots;
        this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
        this.timeout = Preconditions.checkNotNull(timeout);
        this.maxRegistrationDuration = maxRegistrationDuration;
        this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
        this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
        this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
        this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
        this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
        this.classLoaderResolveOrder = classLoaderResolveOrder;
        this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
        this.taskManagerLogPath = taskManagerLogPath;
        this.taskManagerStdoutPath = taskManagerStdoutPath;
    }

    public int getNumberSlots() {
        return numberSlots;
    }

    public Time getTimeout() {
        return timeout;
    }

    @Nullable
    public Time getMaxRegistrationDuration() {
        return maxRegistrationDuration;
    }

    public Time getInitialRegistrationPause() {
        return initialRegistrationPause;
    }

    @Nullable
    public Time getMaxRegistrationPause() {
        return maxRegistrationPause;
    }

    public Time getRefusedRegistrationPause() {
        return refusedRegistrationPause;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public String[] getTmpDirectories() {
        return tmpDirectories;
    }

    @Override
    public boolean shouldExitJvmOnOutOfMemoryError() {
        return exitJvmOnOutOfMemory;
    }

    public FlinkUserCodeClassLoaders.ResolveOrder getClassLoaderResolveOrder() {
        return classLoaderResolveOrder;
    }

    public String[] getAlwaysParentFirstLoaderPatterns() {
        return alwaysParentFirstLoaderPatterns;
    }

    @Nullable
    public String getTaskManagerLogPath() {
        return taskManagerLogPath;
    }

    @Nullable
    public String getTaskManagerStdoutPath() {
        return taskManagerStdoutPath;
    }

    // --------------------------------------------------------------------------------------------
    //  Static factory methods
    // --------------------------------------------------------------------------------------------

    public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
        int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

        if (numberSlots == -1) {
            numberSlots = 1;
        }

        final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

        final Time timeout;

        try {
            timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
        } catch (Exception e) {
            throw new IllegalArgumentException(
                "Invalid format for '" + AkkaOptions.ASK_TIMEOUT.key() +
                    "'.Use formats like '50 s' or '1 min' to specify the timeout.");
        }

        LOG.info("Messages have a max timeout of " + timeout);

        final Time finiteRegistrationDuration;

        try {
            Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
            if (maxRegistrationDuration.isFinite()) {
                finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis());
            } else {
                finiteRegistrationDuration = null;
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.REGISTRATION_TIMEOUT.key(), e);
        }

        final Time initialRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
            if (pause.isFinite()) {
                initialRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The initial registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
        }

        final Time maxRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(
                TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
            if (pause.isFinite()) {
                maxRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
        }

        final Time refusedRegistrationPause;
        try {
            Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
            if (pause.isFinite()) {
                refusedRegistrationPause = Time.milliseconds(pause.toMillis());
            } else {
                throw new IllegalArgumentException("The refused registration pause must be finite: " + pause);
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid format for parameter " +
                TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
        }

        final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

        final String classLoaderResolveOrder =
            configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);

        final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration);

        final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
        final String taskManagerStdoutPath;

        if (taskManagerLogPath != null) {
            final int extension = taskManagerLogPath.lastIndexOf('.');

            if (extension > 0) {
                taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
            } else {
                taskManagerStdoutPath = null;
            }
        } else {
            taskManagerStdoutPath = null;
        }

        return new TaskManagerConfiguration(
            numberSlots,
            tmpDirPaths,
            timeout,
            finiteRegistrationDuration,
            initialRegistrationPause,
            maxRegistrationPause,
            refusedRegistrationPause,
            configuration,
            exitOnOom,
            FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
            alwaysParentFirstLoaderPatterns,
            taskManagerLogPath,
            taskManagerStdoutPath);
    }
}
  • The static factory method TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; the shouldExitJvmOnOutOfMemoryError method exposes that field

Task

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    @Override
    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        //......

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            //......

            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------

            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);

            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // ----------------------------------------------------------------
            //  finalization of a successful execution
            // ----------------------------------------------------------------

            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        }
        catch (Throwable t) {

            // unwrap wrapped exceptions to make stack traces more compact
            if (t instanceof WrappingRuntimeException) {
                t = ((WrappingRuntimeException) t).unwrap();
            }

            // ----------------------------------------------------------------
            // the execution failed. either the invokable code properly failed, or
            // an exception was thrown as a side effect of cancelling
            // ----------------------------------------------------------------

            try {
                // check if the exception is unrecoverable
                if (ExceptionUtils.isJvmFatalError(t) ||
                        (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

                    // terminate the JVM immediately
                    // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
                    try {
                        LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
                    } finally {
                        Runtime.getRuntime().halt(-1);
                    }
                }

                // transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
                // loop for multiple retries during concurrent state changes via calls to cancel() or
                // to failExternally()
                while (true) {
                    ExecutionState current = this.executionState;

                    if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                        if (t instanceof CancelTaskException) {
                            if (transitionState(current, ExecutionState.CANCELED)) {
                                cancelInvokable(invokable);
                                break;
                            }
                        }
                        else {
                            if (transitionState(current, ExecutionState.FAILED, t)) {
                                // proper failure of the task. record the exception as the root cause
                                failureCause = t;
                                cancelInvokable(invokable);
                                break;
                            }
                        }
                    }
                    else if (current == ExecutionState.CANCELING) {
                        if (transitionState(current, ExecutionState.CANCELED)) {
                            break;
                        }
                    }
                    else if (current == ExecutionState.FAILED) {
                        // in state failed already, no transition necessary any more
                        break;
                    }
                    // unexpected state, go to failed
                    else if (transitionState(current, ExecutionState.FAILED, t)) {
                        LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
                        break;
                    }
                    // else fall through the loop and
                }
            }
            catch (Throwable tt) {
                String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
                LOG.error(message, tt);
                notifyFatalError(message, tt);
            }
        }
        finally {
            //......
        }
    }

    //......
}
  • Task implements Runnable. Its run method wraps invokable.invoke() in a try/catch; in the catch block, if ExceptionUtils.isJvmFatalError(t) is true, or if (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()), it calls Runtime.getRuntime().halt(-1) to stop the JVM
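
The decision made in that catch block can be sketched in isolation. The code below is a simplified stand-in, not Flink's Task class: TaskOomHandlingSketch, mustHaltJvm and runTask are hypothetical names, the state handling is reduced to a returned string, and the halt action is injected as an IntConsumer so the example can run without actually terminating the JVM. The condition itself mirrors the one quoted above.

```java
import java.util.function.IntConsumer;

final class TaskOomHandlingSketch {

    // Stand-in for ExceptionUtils.isJvmFatalError as quoted in this article.
    @SuppressWarnings({"deprecation", "removal"}) // ThreadDeath is deprecated on recent JDKs
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    // Mirrors the condition checked in the catch block of Task.run().
    static boolean mustHaltJvm(Throwable t, boolean exitJvmOnOom) {
        return isJvmFatalError(t) || (t instanceof OutOfMemoryError && exitJvmOnOom);
    }

    // Runs the work; on failure either invokes the injected halt action or reports FAILED.
    static String runTask(Runnable work, boolean exitJvmOnOom, IntConsumer halt) {
        try {
            work.run();
            return "FINISHED";
        } catch (Throwable t) {
            if (mustHaltJvm(t, exitJvmOnOom)) {
                // Flink calls Runtime.getRuntime().halt(-1) at this point.
                halt.accept(-1);
                return "HALTED";
            }
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        Runnable oom = () -> { throw new OutOfMemoryError("simulated"); };
        IntConsumer recordOnly = status -> System.out.println("halt requested with status " + status);

        System.out.println("flag off: " + runTask(oom, false, recordOnly));
        System.out.println("flag on:  " + runTask(oom, true, recordOnly));
    }
}
```

With the flag off, the simulated OutOfMemoryError only fails the task; with the flag on, the same error triggers the halt action.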

ExceptionUtils.isJvmFatalError

flink-1.7.2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java

@Internal
public final class ExceptionUtils {
    //......

    /**
     * Checks whether the given exception indicates a situation that may leave the
     * JVM in a corrupted state, meaning a state where continued normal operation can only be
     * guaranteed via clean process restart.
     *
     * <p>Currently considered fatal exceptions are Virtual Machine errors indicating
     * that the JVM is corrupted, like {@link InternalError}, {@link UnknownError},
     * and {@link java.util.zip.ZipError} (a special case of InternalError).
     * The {@link ThreadDeath} exception is also treated as a fatal error, because when
     * a thread is forcefully stopped, there is a high chance that parts of the system
     * are in an inconsistent state.
     *
     * @param t The exception to check.
     * @return True, if the exception is considered fatal to the JVM, false otherwise.
     */
    public static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    //......
}

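  • isJvmFatalError returns true if the Throwable is an InternalError, an UnknownError, or a ThreadDeath. OutOfMemoryError is not in this list, which is why Task.run() checks it separately against the jvm-exit-on-oom setting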
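
To make the classification concrete, here is a small sketch that applies the same instanceof checks to a few throwables. FatalErrorCheckSketch is a hypothetical class written for this post; only the body of isJvmFatalError is copied from the source quoted above.

```java
final class FatalErrorCheckSketch {

    // Same checks as ExceptionUtils.isJvmFatalError in the quoted source.
    @SuppressWarnings({"deprecation", "removal"}) // ThreadDeath is deprecated on recent JDKs
    static boolean isJvmFatalError(Throwable t) {
        return (t instanceof InternalError) || (t instanceof UnknownError) || (t instanceof ThreadDeath);
    }

    public static void main(String[] args) {
        Throwable[] samples = {
            new InternalError("sample"),
            new UnknownError("sample"),
            new OutOfMemoryError("sample"),
            new StackOverflowError("sample"),
            new RuntimeException("sample")
        };
        for (Throwable t : samples) {
            System.out.println(t.getClass().getSimpleName() + " -> " + isJvmFatalError(t));
        }
    }
}
```

OutOfMemoryError and StackOverflowError are both VirtualMachineError subclasses, yet neither matches these checks, so an OutOfMemoryError reaches the halt branch only through the configuration flag.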

Runtime.getRuntime().halt

java.base/java/lang/Runtime.java

public class Runtime {
    //......

    private static final Runtime currentRuntime = new Runtime();

    /**
     * Returns the runtime object associated with the current Java application.
     * Most of the methods of class {@code Runtime} are instance
     * methods and must be invoked with respect to the current runtime object.
     *
     * @return  the {@code Runtime} object associated with the current
     *          Java application.
     */
    public static Runtime getRuntime() {
        return currentRuntime;
    }

    /**
     * Forcibly terminates the currently running Java virtual machine.  This
     * method never returns normally.
     *
     * <p> This method should be used with extreme caution.  Unlike the
     * {@link #exit exit} method, this method does not cause shutdown
     * hooks to be started.  If the shutdown sequence has already been
     * initiated then this method does not wait for any running
     * shutdown hooks to finish their work.
     *
     * @param  status
     *         Termination status. By convention, a nonzero status code
     *         indicates abnormal termination. If the {@link Runtime#exit exit}
     *         (equivalently, {@link System#exit(int) System.exit}) method
     *         has already been invoked then this status code
     *         will override the status code passed to that method.
     *
     * @throws SecurityException
     *         If a security manager is present and its
     *         {@link SecurityManager#checkExit checkExit} method
     *         does not permit an exit with the specified status
     *
     * @see #exit
     * @see #addShutdownHook
     * @see #removeShutdownHook
     * @since 1.3
     */
    public void halt(int status) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkExit(status);
        }
        Shutdown.beforeHalt();
        Shutdown.halt(status);
    }

    //......
}

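  • halt calls SecurityManager.checkExit when a SecurityManager is installed (non-null), then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM. As its Javadoc notes, unlike exit it does not start shutdown hooks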

Summary

  • taskmanager.jvm-exit-on-oom defaults to false; it controls whether the TaskManager is killed when a task thread throws an OutOfMemoryError
  • The static factory method TaskManagerConfiguration.fromConfiguration reads exitOnOom via configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY) and passes it to the constructor, which stores it in the exitJvmOnOutOfMemory field; the shouldExitJvmOnOutOfMemoryError method exposes that field
  • Task implements Runnable. Its run method wraps invokable.invoke() in a try/catch; in the catch block, if ExceptionUtils.isJvmFatalError(t) is true, or if (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()), it calls Runtime.getRuntime().halt(-1) to stop the JVM. isJvmFatalError returns true for InternalError, UnknownError, and ThreadDeath. halt calls SecurityManager.checkExit when a SecurityManager is installed, then calls Shutdown.beforeHalt() and Shutdown.halt(status) to stop the JVM


Reposted from: http://gjwlo.baihongyu.com/
