[Source Code Analysis] Deep learning distributed training framework Horovod (20) --- Elastic Training Operator

0x00 Abstract

Horovod is a distributed training framework based on AllReduce. Thanks to its support for mainstream deep learning frameworks such as TensorFlow and PyTorch, and to features such as communication optimization, Horovod is widely used in data-parallel training.

This is the last article of the horovod-on-k8s line. It looks at how MPI-Operator could be improved, mainly by studying the source code behind the blog post from the Elastic Training Operator team. The article is therefore heavy on source code.

Links to the other articles in this series:

[Source Code Analysis] Deep learning distributed training framework Horovod (1) --- Basic knowledge

[Source Code Analysis] Deep learning distributed training framework Horovod (2) --- From the user's perspective

[Source Code Analysis] Deep learning distributed training framework Horovod (3) --- What happens behind horovodrun

[Source Code Analysis] Deep learning distributed training framework Horovod (4) --- Network foundation & Driver

[Source Code Analysis] Deep learning distributed training framework Horovod (5) --- Fusion framework

[Source Code Analysis] Deep learning distributed training framework Horovod (6) --- Background thread architecture

[Source Code Analysis] Deep learning distributed training framework Horovod (7) --- DistributedOptimizer

[Source Code Analysis] Deep learning distributed training framework Horovod (8) --- on Spark

[Source Code Analysis] Deep learning distributed training framework Horovod (9) --- Starting on Spark

[Source Code Analysis] Deep learning distributed training framework Horovod (10) --- Run on Spark

[Source Code Analysis] Deep learning distributed training framework Horovod (11) --- on Spark --- the GLOO scheme

[Source Code Analysis] Deep learning distributed training framework Horovod (12) --- Overall architecture of elastic training

[Source Code Analysis] Deep learning distributed training framework Horovod (13) --- Elastic training: Driver

[Source Code Analysis] Deep learning distributed training framework Horovod (14) --- Elastic training: node discovery & State

[Source Code Analysis] Deep learning distributed training framework Horovod (15) --- Broadcast & notification

[Source Code Analysis] Deep learning distributed training framework Horovod (16) --- Elastic training: worker lifecycle

[Source Code Analysis] Deep learning distributed training framework Horovod (17) --- Elastic training: fault tolerance

[Source Code Analysis] Deep learning distributed training framework Horovod (18) --- kubeflow tf-operator

[Source Code Analysis] Deep learning distributed training framework Horovod (19) --- kubeflow MPI-operator

0x01 Background knowledge

Sections 0x01 and 0x02 are both drawn from the Elastic Training Operator team's blog post, which is truly excellent.

1.1 Existing elastic capabilities

Kubernetes and cloud computing offer agility and scalability. With components such as cluster-autoscaler we can set elastic policies for training jobs and use Kubernetes' elasticity to create resources on demand, reducing GPU idle time.

But this scaling model still falls short for offline workloads such as training:

  • No fault tolerance: when some workers fail due to device problems, the whole job has to stop and start over.
  • Training jobs usually run for a long time and occupy a lot of compute, yet they lack elasticity: when resources are tight, they cannot free resources for other workloads on demand unless the job is terminated.
  • Because jobs run long and workers cannot be reconfigured dynamically, preemptible (spot) instances cannot be used safely, so the best price-performance of the cloud cannot be realized.

Giving training jobs elasticity is therefore the key path to better cost-effectiveness. Recently, distributed frameworks such as Horovod have added Elastic Training: a training job can dynamically add or remove training workers during execution without interrupting the job. A small amount of code adaptation is required; see https://horovod.readthedocs.io/en/stable/elastic_include.html.

1.2 Shortcomings of mpi-operator

In mpi-operator, the workers that participate in training are designed and maintained as static resources. Supporting the elastic training mode makes jobs more flexible, but it also brings challenges to the operations layer, for example:

  • The entry point must be horovodrun, as provided by Horovod. In Horovod the launcher logs into the workers via SSH, so an SSH login tunnel between the launcher and the workers has to be set up.
  • The Elastic Driver module, which computes the elastic topology, obtains the latest worker topology from the configured discover_host script and starts or stops worker instances accordingly. When the workers change, the return value of the discover_host script must be updated first.
  • In scenarios such as preemption or spot pricing, we sometimes need to scale in specific workers. The native Kubernetes orchestration primitives, Deployment and StatefulSet, cannot scale in designated pods.

To address these problems, we designed and developed et-operator. It provides a TrainingJob CRD to describe a training job, plus ScaleOut and ScaleIn CRDs to describe scale-out and scale-in operations; combined, they make our training jobs truly elastic. The solution is open source, and requirements, discussion and criticism are all welcome.

Open-source repository: https://github.com/AliyunContainerService/et-operator

0x02 Overall architecture

The TrainingJob controller has the following main functions:

  • Maintain the TrainingJob create/delete lifecycle and manage its child resources.
  • Execute scale-out and scale-in operations.
  • Fault tolerance: when a worker is evicted, create a new worker and add it back into training.

2.1 Resource creation

TrainingJob child resources are created in the following order:

  • Create the key pair needed for SSH and store it in a Secret.
  • Create the workers, each consisting of a Service and a Pod, mounting the Secret's public key.
  • Create a ConfigMap containing the discover_host script and the hostfile.
  • Create the launcher, mounting the ConfigMap. Since the hostfile will later be modified as the topology changes, the hostfile is copied from the ConfigMap into a separate directory by an init container.

(Figure: the resources related to a TrainingJob.)

2.2 Roles

The TrainingJob CR configuration is split into a Launcher part and a Worker part. The Launcher specifies the job's image and entry command. By default, et-operator generates a hostfile and a discover_host script from the current worker allocation; the discover_host script is mounted into the Launcher at /etc/edl/discover_hosts.sh and is referenced by the --host-discovery-script argument of the horovodrun entry command. The Worker part specifies the worker image and GPU usage, and maxReplicas / minReplicas bound the allowed range of worker replicas.
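To make the two roles concrete, below is a rough sketch of a TrainingJob CR. This is an abridged, hedged reconstruction based on the et-operator examples: the container names, image and training script are placeholders, and exact field spellings should be verified against the repository.

apiVersion: kai.alibabacloud.com/v1alpha1
kind: TrainingJob
metadata:
  name: elastic-training
  namespace: default
spec:
  cleanPodPolicy: Running
  etReplicaSpecs:
    launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: elastic-training            # placeholder
            image: horovod/horovod:latest     # placeholder
            command:
            - sh
            - -c
            - horovodrun -np 2 --min-np 1 --max-np 9
              --host-discovery-script /etc/edl/discover_hosts.sh
              python /examples/elastic/tensorflow2_mnist_elastic.py
    worker:
      replicas: 2
      minReplicas: 1                          # lower bound for ScaleIn
      maxReplicas: 9                          # upper bound for ScaleOut
      template:
        spec:
          containers:
          - name: elastic-training            # placeholder
            image: horovod/horovod:latest     # placeholder
            resources:
              limits:
                nvidia.com/gpu: "1"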

2.3 Main program flow

(Figure: the program's main flow chart.)

0x03 Entry point

In fact, studying the ET-Operator mostly comes down to studying how it scales out and in. But to get there, we first need to walk through the program logic.

Readers unfamiliar with K8S can take this as a chance to see how its CRDs are used.

3.1 Creation

The entry point is the main function in main.go, from which we can see that:

  • A controller Manager is created.
  • Three reconcilers are built on top of this Manager: TrainingJobReconciler, ScaleInReconciler and ScaleOutReconciler.
  • The Manager is then started.
func main() {
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:             scheme,
        MetricsBindAddress: metricsAddr,
        LeaderElection:     enableLeaderElection,
        Port:               9443,
    })

    const jobPollInterval = "5s"

    if err = controllers.NewReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
        os.Exit(1)
    }
    if err = controllers.NewScaleOutReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
        os.Exit(1)
    }
    if err = controllers.NewScaleInReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
        os.Exit(1)
    }

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        os.Exit(1)
    }
}

3.2 Setup

The setup here registers the event handlers, i.e., which CRs each controller responds to. Note the Owns() calls: the TrainingJob controller also watches child resources whose OwnerReference points at a TrainingJob, so changes to owned Pods, Services, ConfigMaps, Secrets and Scaler CRs trigger the TrainingJob's Reconcile as well.

  • Besides TrainingJob, et-operator also supports the ScaleOut and ScaleIn CRDs, which issue scale-out and scale-in operations for a training job.

  • When a ScaleOut CR is submitted, the ScaleOutController triggers a Reconcile. The work here is simple: using the Selector field in the ScaleOut CR, find the TrainingJob this Scaler targets and set it on the CR's OwnerReferences.

  • The TrainingJobController notices that a ScaleOut CR belonging to the TrainingJob has been updated, which triggers the TrainingJob's Reconcile. It walks through the ScaleIn and ScaleOut CRs whose OwnerReference points at the TrainingJob, and decides which scale operation to execute based on creation time and status time.

  • When scaling in, the workers to remove can be designated via the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR. If only a count is configured, workers are removed by index, from the highest down.

func (r *ScaleInReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&kaiv1alpha1.ScaleIn{}).
        Complete(r)
}

func (r *ScaleOutReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&kaiv1alpha1.ScaleOut{}).
        Complete(r)
}

func (r *TrainingJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&kaiv1alpha1.TrainingJob{}).
        Owns(&kaiv1alpha1.ScaleIn{}).
        Owns(&kaiv1alpha1.ScaleOut{}).
        Owns(&corev1.Pod{}).
        Owns(&corev1.Service{}).
        Owns(&corev1.ConfigMap{}).
        Owns(&corev1.Secret{}).
        // Ignore status-only and metadata-only updates
        //WithEventFilter(predicate.GenerationChangedPredicate{}).
        Complete(r)
}

0x04 TrainingJobReconciler

Let's follow the code and look for the finer points of its design.

4.1 Reconcile

In a Kubernetes operator, the reconcile method is driven by continuous watching: whenever a watched resource changes, reconcile is triggered, so in principle it runs once per change.

When an event arrives, the Reconcile method is invoked.

func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    // Fetch latest training job instance.
    sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
    err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)
    if err != nil {
        // No need to do reconcile or job has been deleted.
        return NoRequeue()
    }
    trainingJob := sharedTrainingJob.DeepCopy()
    r.Scheme.Default(trainingJob)
    return r.ReconcileJobs(trainingJob)
}

4.2 ReconcileJobs

Because the status carried in the event is the empty string, initializeJob runs first, followed by reconcileResource.

func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
    oldJobStatus := job.Status.DeepCopy()

    defer func() {
        latestJob := &kaiv1alpha1.TrainingJob{}
        err := r.Get(context.Background(), types.NamespacedName{
            Name:      job.Name,
            Namespace: job.Namespace,
        }, latestJob)
        if err == nil {
            if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
                latestJob.Status = job.Status
                job = latestJob
            }
        }
        r.updateObjectStatus(job, oldJobStatus)
    }()

    switch job.Status.Phase {
    case commonv1.JobSucceeded, commonv1.JobFailed:
        err = r.cleanup(job)
    case "", commonv1.JobCreated: // empty status or JobCreated: initialize
        r.initializeJob(job)
        err = r.reconcileResource(job)
    case commonv1.JobRunning:
        err = r.reconcileJobRunning(job)
    case commonv1.Scaling:
        err = r.executeScaling(job)
    }

    if err != nil {
        if IsRequeueError(err) {
            return RequeueAfterInterval(r.PollInterval, nil)
        }
        return RequeueAfterInterval(r.PollInterval, err)
    }
    return NoRequeue()
}

4.3 reconcileResource

reconcileResource simply calls doSteps, a small state machine that continues the initialization.

func (r *TrainingJobReconciler) reconcileResource(job *kaiv1alpha1.TrainingJob) error {
    steps := r.newSteps()
    err := r.doSteps(job, steps)
    return err
}

4.4 doSteps

newSteps builds a simple state machine: an initialization sequence executed in order, while doSteps branches according to the job's current condition.

A few points to note:

  • The conditions after Created are: WorkersCreated ---> WorkersReady ---> LauncherCreated ---> JobRunning.
  • Each is a post-state, i.e., the state the job should reach once the corresponding action has completed.
  • In the for loop, any condition the job has already reached is skipped, up to the first unfinished one, whose action is then executed. So in principle the job advances step by step from WorkersCreated to JobRunning.
  • When an action completes, it marks the job with that step's completion condition.

The code is as follows (a hedged sketch of hasCondition is given after it):

func (r *TrainingJobReconciler) newSteps() []Step {
    return []Step{
        Step{
            JobCondition: commonv1.WorkersCreated,
            Action:       r.createTrainingJobWorkers,
        },
        Step{
            JobCondition: commonv1.WorkersReady,
            Action:       r.waitWorkersRunning,
        },
        Step{
            JobCondition: commonv1.LauncherCreated,
            Action:       r.createLauncher,
        },
        Step{
            JobCondition: commonv1.JobRunning,
            Action:       r.syncLauncherState,
        },
    }
}

func (r *TrainingJobReconciler) doSteps(job *kaiv1alpha1.TrainingJob, steps []Step) error {
    for _, step := range steps {
        if hasCondition(*job.GetJobStatus(), step.JobCondition) {
            continue
        }
        if err := step.Action(job); err != nil {
            return err
        }
        break
    }
    return nil
}
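hasCondition itself is not quoted in the post. A minimal reconstruction, assuming the kubeflow-common style JobStatus used above, could look like this: a step counts as done once its condition is recorded as true in the status.

// Hedged sketch: a step is finished once its condition is true in the status.
func hasCondition(status commonv1.JobStatus, condType commonv1.JobConditionType) bool {
    for _, condition := range status.Conditions {
        if condition.Type == condType && condition.Status == corev1.ConditionTrue {
            return true
        }
    }
    return false
}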

Concretely, the flow is:

           Request("")
K8S +--------------------> Reconcile
+
|
|
v
+----------------------+---------------------+
| ReconcileJobs |
| + |
| | |
| +------------------------------+ |
| | | | |
| v v v |
| "", JobCreated JobRunning Scaling |
+--------+-----------------------------------+
|
|
v
reconcileResource
+
|
|
v
+---------+---------------+
| doSteps |
| |
| |
| WorkersCreated +---------> createTrainingJobWorkers
| |
| |
| WorkersReady +----------> waitWorkersRunning
| |
| |
| LauncherCreated +--------> createLauncher
| |
| |
| JobRunning +------------> syncLauncherState
| |
+-------------------------+

4.5 createTrainingJobWorkers

Within doSteps, the first action reached is createTrainingJobWorkers. It sets the job condition to WorkersCreated.

func (r *TrainingJobReconciler) createTrainingJobWorkers(job *kaiv1alpha1.TrainingJob) error {
    if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
        if cm, err := r.GetOrCreateSecret(job); cm == nil || err != nil {
            updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
            return nil
        }
    }

    workers := getJobReplicasWorkers(job)
    job.Status.TargetWorkers = workers

    // Create the workers
    if err := r.CreateWorkers(job, workers); err != nil {
        updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
        return nil
    }

    // Record the new condition
    updateJobConditions(job.GetJobStatus(), common.WorkersCreated, "", msg)
    return nil
}

4.5.1 CreateWorkers

CreateWorkers creates the workers. As introduced earlier, a worker consists of a Service and a Pod, so creation proceeds as follows:

  • It calls the lower-case function of the same name, createWorkers, which indirectly creates the worker Services.

  • It passes newWorker in as the generator that builds each Pod.

func (r *TrainingJobReconciler) CreateWorkers(job *kaiv1alpha1.TrainingJob, workers []string) error {
    return r.createWorkers(job, workers, func(name string, index string) *corev1.Pod {
        worker := newWorker(job, name, index)
        return worker
    })
}
4.5.1.1 createWorkers

This loops over the configured worker names and calls createWorker for each one (sketches of the index helpers follow the code).

func (r *TrainingJobReconciler) createWorkers(job *kaiv1alpha1.TrainingJob, workers []string, newPod PodTplGenerator) error {
    // Iterate over the worker names and create each one
    for _, podName := range workers {
        index, err := getWorkerIndex(job.Name, podName)
        if err != nil {
            return err
        }
        _, err = r.createWorker(job, int32(index), newPod)
        if err != nil {
            return err
        }
    }
    return nil
}
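getWorkerIndex and getWorkerName are not quoted in the post either. Judging by the pod name in the ScaleIn example later (elastic-training-worker-1), workers follow a "<jobName>-worker-<index>" convention, so plausible sketches are:

// Hypothetical reconstructions: worker pods are assumed to be named
// "<jobName>-worker-<index>", which makes the index recoverable from the name.
func getWorkerName(jobName string, index int) string {
    return fmt.Sprintf("%s-worker-%d", jobName, index)
}

func getWorkerIndex(jobName string, podName string) (int, error) {
    return strconv.Atoi(strings.TrimPrefix(podName, jobName+"-worker-"))
}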
4.5.1.2 createWorker

Based on the parameters, this checks whether the worker Pod already exists, and creates the worker if it does not.

func (r *TrainingJobReconciler) createWorker(job *kaiv1alpha1.TrainingJob, index int32, workerPodTempl PodTplGenerator) (*corev1.Pod, error) {
    name := getWorkerName(job.Name, int(index))
    indexStr := strconv.Itoa(int(index))
    pod := &corev1.Pod{}
    nsn := types.NamespacedName{
        Name:      name,
        Namespace: job.Namespace,
    }

    err := r.Get(context.Background(), nsn, pod)
    if err != nil {
        // If the worker Pod doesn't exist, we'll create it.
        if errors.IsNotFound(err) {
            worker := workerPodTempl(name, indexStr)
            if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
                util.MountRsaKey(worker, job.Name)
            }
            if err = r.Create(context.Background(), worker); err != nil {
                return nil, err
            }
        }
    }

    service := &corev1.Service{}
    err = r.Get(context.Background(), nsn, service)
    if errors.IsNotFound(err) {
        // Call newService to actually create the worker's Service
        err = r.Create(context.Background(), newService(job, name, indexStr))
    }
    return nil, nil
}
4.5.1.3 newService

Here, at last, the Service is actually created; it took quite a winding road to get here. Note ClusterIP: "None": each worker gets a headless Service, so it is reachable at a stable DNS name, which is what the hostfile entries and SSH access rely on.

func newService(obj interface{}, name string, index string) *corev1.Service {
    job, _ := obj.(*kaiv1alpha1.TrainingJob)
    labels := GenLabels(job.Name)
    labels[labelTrainingRoleType] = worker
    labels[replicaIndexLabel] = index

    // Build the (headless) Service
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: job.Namespace,
            Labels:    labels,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
            },
        },
        Spec: corev1.ServiceSpec{
            ClusterIP: "None",
            Selector:  labels,
            Ports: []corev1.ServicePort{
                {
                    Name: "ssh-port",
                    Port: 22,
                },
            },
        },
    }
}

4.5.2 newWorker

newWorker builds the Pod, following the usual pattern. (A hedged sketch of setRestartPolicy follows the code.)

func newWorker(obj interface{}, name string, index string) *corev1.Pod {
    job, _ := obj.(*kaiv1alpha1.TrainingJob)
    labels := GenLabels(job.Name)
    labels[labelTrainingRoleType] = worker
    labels[replicaIndexLabel] = index

    podSpec := job.Spec.ETReplicaSpecs.Worker.Template.DeepCopy()

    // keep the labels which are set in PodTemplate
    if len(podSpec.Labels) == 0 {
        podSpec.Labels = make(map[string]string)
    }
    for key, value := range labels {
        podSpec.Labels[key] = value
    }

    // RestartPolicy=Never
    setRestartPolicy(podSpec)

    container := podSpec.Spec.Containers[0]

    // if we want to use ssh, will start sshd service firstly.
    if len(container.Command) == 0 {
        if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
            container.Command = []string{"sh", "-c", "/usr/sbin/sshd && sleep 365d"}
        } else {
            container.Command = []string{"sh", "-c", "sleep 365d"}
        }
    }
    podSpec.Spec.Containers[0] = container

    // Build the Pod
    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:        name,
            Namespace:   job.Namespace,
            Labels:      podSpec.Labels,
            Annotations: podSpec.Annotations,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
            },
        },
        Spec: podSpec.Spec,
    }
}
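setRestartPolicy is only hinted at by the "RestartPolicy=Never" comment above. A minimal sketch consistent with that comment:

// Sketch: workers and launcher run as bare pods, so restarts are owned by the
// controller rather than by kubelet.
func setRestartPolicy(podTemplate *corev1.PodTemplateSpec) {
    podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever
}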

The logic is as follows:

           Request("")
K8S +--------------------> Reconcile
+
|
|
v
+----------------------+---------------------+
| ReconcileJobs |
| + |
| | |
| +------------------------------+ |
| | | | |
| v v v |
| "", JobCreated JobRunning Scaling |
+--------+-----------------------------------+
|
|
v
reconcileResource
+
|
|
v
+---------+---------------+
| doSteps | +----> createWorkers +----> createWorker +----> newService
| | |
| | +
| WorkersCreated +---------> createTrainingJobWorkers +-----> CreateWorkers +-------> newWorker +------> WorkersCreated
| |
| |
| WorkersReady +----------> waitWorkersRunning
| |
| |
| LauncherCreated +--------> createLauncher
| |
| |
| JobRunning +------------> syncLauncherState
| |
+-------------------------+


4.8 createLauncher

With the workers created, it is time to create the Launcher, so createLauncher executes next.

func (r *TrainingJobReconciler) createLauncher(job *kaiv1alpha1.TrainingJob) error {
    if _, err := r.GetOrCreateLauncherServiceAccount(job); err != nil {
        updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
        return nil
    }
    if _, err := r.GetOrCreateLauncherRole(job, 0); err != nil {
        updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
        return nil
    }
    if _, err := r.GetLauncherRoleBinding(job); err != nil {
        updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
        return nil
    }

    if cm, err := r.CreateHostConfigMap(job); cm == nil || err != nil {
        updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
        return nil
    }

    launcher, err := r.GetLauncherJob(job)
    if launcher == nil {
        if _, err := r.CreateLauncher(job); err != nil {
            updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
            return nil
        }
    }

    updateJobConditions(job.GetJobStatus(), commonv1.LauncherCreated, "", msg)
    return nil
}

Let's pick out two key steps.

4.8.1 CreateHostConfigMap

This builds the host-related configuration. (A hedged sketch of newHostfileConfigMap follows the code.)

func (r *TrainingJobReconciler) CreateHostConfigMap(job *kaiv1alpha1.TrainingJob) (*corev1.ConfigMap, error) {
    return r.createConfigMap(job, newHostfileConfigMap)
}

func (r *TrainingJobReconciler) createConfigMap(job *kaiv1alpha1.TrainingJob, newCm func(job *kaiv1alpha1.TrainingJob) *corev1.ConfigMap) (*corev1.ConfigMap, error) {
    cm := &corev1.ConfigMap{}
    name := ctrl.Request{}
    name.NamespacedName.Namespace = job.GetNamespace()
    name.NamespacedName.Name = job.GetName() + configSuffix
    err := r.Get(context.Background(), name.NamespacedName, cm)
    if errors.IsNotFound(err) {
        if err = r.Create(context.Background(), newCm(job)); err != nil {
            return cm, err
        }
    }
    return cm, nil
}
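newHostfileConfigMap itself is not quoted in the post. Judging by the launcher volume definition in newLauncher below, the ConfigMap must carry three keys: the hostfile, the discover_hosts.sh script and the kubexec helper. The following is only a hedged sketch; in particular, the discover script body and the kubexecScript constant are assumptions (horovodrun merely requires that the discovery script print one host:slots line per live worker):

func newHostfileConfigMap(job *kaiv1alpha1.TrainingJob) *corev1.ConfigMap {
    return &corev1.ConfigMap{
        ObjectMeta: metav1.ObjectMeta{
            Name:      job.GetName() + configSuffix,
            Namespace: job.GetNamespace(),
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
            },
        },
        Data: map[string]string{
            // Initial topology: one "host:slots" line per target worker.
            hostfileName: getHostfileContent(job.Status.TargetWorkers, getSlots(job)),
            // Assumption: discovery simply re-reads the hostfile that the
            // controller rewrites on every scale event.
            discoverHostName: "#!/bin/sh\ncat " + hostfileMountPath + "/" + hostfileName + "\n",
            // Assumption: a packaged kubectl-exec helper used in Kubexec mode.
            kubexeclFileName: kubexecScript,
        },
    }
}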

4.8.2 Creating the Pod

4.8.2.1 CreateLauncher

This creates the launcher Pod.

func (r *TrainingJobReconciler) CreateLauncher(obj interface{}) (*corev1.Pod, error) {
    job, _ := obj.(*kaiv1alpha1.TrainingJob)

    // Build the launcher Pod
    launcher := newLauncher(job)
    if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
        util.MountRsaKey(launcher, job.Name)
    }
    err := r.Create(context.Background(), launcher)
    return launcher, err
}
4.8.2.2 newLauncher

And this is where the Pod is concretely constructed.

func newLauncher(obj interface{}) *corev1.Pod {
    job, _ := obj.(*kaiv1alpha1.TrainingJob)
    launcherName := job.Name + launcherSuffix
    labels := GenLabels(job.Name)
    labels[labelTrainingRoleType] = launcher

    podSpec := job.Spec.ETReplicaSpecs.Launcher.Template.DeepCopy()
    // copy the labels and annotations to pod from PodTemplate
    if len(podSpec.Labels) == 0 {
        podSpec.Labels = make(map[string]string)
    }
    for key, value := range labels {
        podSpec.Labels[key] = value
    }
    podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, initContainer(job))
    //podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, kubedeliveryContainer())

    container := podSpec.Spec.Containers[0]
    container.VolumeMounts = append(container.VolumeMounts,
        corev1.VolumeMount{
            Name:      hostfileVolumeName,
            MountPath: hostfileMountPath,
        },
        corev1.VolumeMount{
            Name:      configVolumeName,
            MountPath: configMountPath,
        },
        corev1.VolumeMount{
            Name:      kubectlVolumeName,
            MountPath: kubectlMountPath,
        })

    if job.GetAttachMode() == kaiv1alpha1.AttachModeKubexec {
        container.Env = append(container.Env, corev1.EnvVar{
            Name:  "OMPI_MCA_plm_rsh_agent",
            Value: getKubexecPath(),
        })
    }
    podSpec.Spec.Containers[0] = container
    podSpec.Spec.ServiceAccountName = launcherName

    setRestartPolicy(podSpec)

    hostfileMode := int32(0444)
    scriptMode := int32(0555)

    podSpec.Spec.Volumes = append(podSpec.Spec.Volumes,
        corev1.Volume{
            Name: hostfileVolumeName,
            VolumeSource: corev1.VolumeSource{
                EmptyDir: &corev1.EmptyDirVolumeSource{},
            },
        },
        corev1.Volume{
            Name: kubectlVolumeName,
            VolumeSource: corev1.VolumeSource{
                EmptyDir: &corev1.EmptyDirVolumeSource{},
            },
        },
        corev1.Volume{
            Name: configVolumeName,
            VolumeSource: corev1.VolumeSource{
                ConfigMap: &corev1.ConfigMapVolumeSource{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: job.Name + configSuffix,
                    },
                    Items: []corev1.KeyToPath{
                        {
                            Key:  hostfileName,
                            Path: hostfileName,
                            Mode: &hostfileMode,
                        },
                        {
                            Key:  discoverHostName,
                            Path: discoverHostName,
                            Mode: &hostfileMode,
                        },
                        {
                            Key:  kubexeclFileName,
                            Path: kubexeclFileName,
                            Mode: &scriptMode,
                        },
                    },
                },
            },
        })

    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:        launcherName,
            Namespace:   job.Namespace,
            Labels:      podSpec.Labels,
            Annotations: podSpec.Annotations,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
            },
        },
        Spec: podSpec.Spec,
    }
}

At this point a new training job is up and running. The expanded logic:

           Request("")
K8S ---------------------> Reconcile
+
|
|
v
+----------------------+---------------------+
| ReconcileJobs |
| + |
| | |
| +------------------------------+ |
| | | | |
| v v v |
| "", JobCreated JobRunning Scaling |
+--------+-----------------------------------+
|
|
v
reconcileResource
+
|
|
v
+---------+---------------+
| doSteps | +----> createWorkers +----> createWorker +----> newService
| | |
| | |
| WorkersCreated +---------> createTrainingJobWorkers +-----> CreateWorkers +-------> newWorker +------> WorkersCreated
| |
| |
| WorkersReady +----------> waitWorkersRunning
| |
| |
| LauncherCreated +--------> createLauncher+----> CreateHostConfigMap +-----> CreateLauncher +------> newLauncher
| |
| |
| JobRunning +------------> syncLauncherState
| |
+-------------------------+


That covers creating a new job. Now for the key techniques of this article: ScaleOut and ScaleIn.

0x05 ScaleOut

5.1 Approach

When a ScaleOut CR is submitted, the ScaleOutController triggers a Reconcile. The work is simple: using the Selector field in the ScaleOut CR, find the TrainingJob this Scaler targets and set it on the CR's OwnerReferences. Take the following ScaleOut CR as an example:

- apiVersion: kai.alibabacloud.com/v1alpha1
  kind: ScaleOut
  metadata:
    creationTimestamp: "2020-11-04T13:54:26Z"
    name: scaleout-ptfnk
    namespace: default
    ownerReferences:
    - apiVersion: kai.alibabacloud.com/v1alpha1
      blockOwnerDeletion: true
      controller: true
      kind: TrainingJob
      name: elastic-training   # points to the TrainingJob to scale out
      uid: 075b9c4a-22f9-40ce-83c7-656b329a2b9e
  spec:
    selector:
      name: elastic-training
    toAdd:
      count: 2

5.2 Reconcile

When a ScaleOut CR is submitted, the ScaleOutController triggers Reconcile, whose main job is to call setScalingOwner.

func (r *ScaleOutReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    scaleOut, err := getScaleOut(req.NamespacedName, r.Client)
    if err != nil {
        // Error reading the object - requeue the request.
        return RequeueImmediately()
    }
    if scaleOut == nil || scaleOut.DeletionTimestamp != nil {
        return NoRequeue()
    }

    if isScaleFinished(*scaleOut.GetJobStatus()) {
        return NoRequeue()
    }

    return setScalingOwner(r, scaleOut, r.PollInterval)
}

5.3 setScalingOwner

setScalingOwner is one of the keys. It handles the case where the ScaleOut CR has no OwnerReferences set yet: using the Selector field of the ScaleOut CR, it finds the TrainingJob this Scaler targets and records it in the CR's OwnerReferences. (Plausible sketches of the NoRequeue and RequeueImmediately helpers used throughout follow the code.)

func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
    ownerRefs := scaler.GetOwnerReferences()
    if len(ownerRefs) == 0 {
        trainingJob := &kaiv1alpha1.TrainingJob{}
        nsn := types.NamespacedName{}
        nsn.Namespace = scaler.GetNamespace()
        nsn.Name = scaler.GetSelector().Name
        err := r.Get(context.Background(), nsn, trainingJob)
        if err != nil {
            return RequeueAfterInterval(pollInterval, err)
        }

        gvk := kaiv1alpha1.SchemeGroupVersionKind
        ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob,
            schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
        scaler.SetOwnerReferences(ownerRefs)

        initializeJobStatus(scaler.GetJobStatus())
        updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
        err = r.Status().Update(context.Background(), scaler)
        err = r.Update(context.Background(), scaler)
    }
    return NoRequeue()
}

// RequeueAfterInterval requeues after a duration when duration > 0 is specified.
func RequeueAfterInterval(interval time.Duration, err error) (ctrl.Result, error) {
    return ctrl.Result{RequeueAfter: interval}, err
}
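RequeueAfterInterval is shown above; its two companions NoRequeue and RequeueImmediately are not. Plausible definitions, inferred from controller-runtime conventions:

// NoRequeue ends the reconcile without scheduling another run.
func NoRequeue() (ctrl.Result, error) {
    return ctrl.Result{}, nil
}

// RequeueImmediately asks controller-runtime to run Reconcile again right away.
func RequeueImmediately() (ctrl.Result, error) {
    return ctrl.Result{Requeue: true}, nil
}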

5.4 TrainingJobController

The TrainingJobController notices that a ScaleOut CR belonging to the TrainingJob has been updated, which triggers the TrainingJob's Reconcile. It walks through the ScaleIn and ScaleOut CRs whose OwnerReference points at the TrainingJob, and decides which scale operation to execute based on creation time and status time.

5.4.1 Reconcile

func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    //rlog := r.Log.WithValues("trainingjob", req.NamespacedName)

    // Fetch latest training job instance.
    sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
    err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)
    if err != nil {
        // No need to do reconcile or job has been deleted.
        return NoRequeue()
    }
    trainingJob := sharedTrainingJob.DeepCopy()
    r.Scheme.Default(trainingJob)
    return r.ReconcileJobs(trainingJob)
}

5.4.2 ReconcileJobs

func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
    oldJobStatus := job.Status.DeepCopy()

    logger.Infof("jobName: %v, phase %s", job.Name, job.Status.Phase)

    defer func() {
        latestJob := &kaiv1alpha1.TrainingJob{}
        err := r.Get(context.Background(), types.NamespacedName{
            Name:      job.Name,
            Namespace: job.Namespace,
        }, latestJob)
        if err == nil {
            if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
                latestJob.Status = job.Status
                job = latestJob
            }
        }
        r.updateObjectStatus(job, oldJobStatus)
    }()

    switch job.Status.Phase {
    case commonv1.JobSucceeded, commonv1.JobFailed:
        err = r.cleanup(job)
    case "", commonv1.JobCreated:
        r.initializeJob(job)
        err = r.reconcileResource(job)
    case commonv1.JobRunning:
        err = r.reconcileJobRunning(job)
    case commonv1.Scaling:
        err = r.executeScaling(job)
    default:
        logger.Warnf("job %s unknown status %s", job.Name, job.Status.Phase)
    }

    if err != nil {
        if IsRequeueError(err) {
            return RequeueAfterInterval(r.PollInterval, nil)
        }
        return RequeueAfterInterval(r.PollInterval, err)
    }
    return NoRequeue()
}

From here, depending on the job's current state, there are two successive paths: first JobRunning, then Scaling, and finally back to JobRunning. Let's look at them one by one.

5.5 JobRunning

First we arrive in the JobRunning state; let's walk through how it is handled.

5.5.1 reconcileJobRunning

func (r *TrainingJobReconciler) reconcileJobRunning(job *kaiv1alpha1.TrainingJob) error {
    if err := r.syncLauncherState(job); err != nil {
        return err
    }
    if err := r.syncWorkersState(job); err != nil {
        return err
    }

    if job.Status.Phase == commonv1.JobRunning {
        // Since the job is running, we can start setting up a scaler
        return r.setTrainingJobScaler(job)
    }

    return nil
}

5.5.2 setTrainingJobScaler

First the candidate scalers are collected via availableScaleOutList and availableScaleInList, and then the job is updated with the chosen one.

func (r *TrainingJobReconciler) setTrainingJobScaler(job *kaiv1alpha1.TrainingJob) error {
    scaleOut, err := r.availableScaleOutList(job) // find the ScaleOut list
    if err != nil {
        return err
    }
    scaleIn, err := r.availableScaleInList(job) // find the ScaleIn list
    if err != nil {
        return err
    }
    scalerList := append(scaleOut, scaleIn...) // merge the two

    // Select the latest scaling job
    r.updateLatestScaler(job, scalerList)
    return nil
}

5.5.3 updateLatestScaler

Pick the most recent Scaler. (The prose description mentions creation time and status time; this excerpt compares creation timestamps only.)

func (r *TrainingJobReconciler) updateLatestScaler(job *kaiv1alpha1.TrainingJob, scalers []Scaler) error {
    var latestScaler Scaler
    if len(scalers) == 0 {
        return nil
    }
    for i := range scalers {
        scalerItem := scalers[i]
        // Keep the Scaler with the latest creation timestamp
        if latestScaler == nil || latestScaler.GetCreationTimestamp().Time.Before(scalerItem.GetCreationTimestamp().Time) {
            latestScaler = scalerItem
        }
    }
    return r.updateCurrentScaler(job, latestScaler)
}

5.5.4 updateCurrentScaler

Configure the job with the Scaler that was selected.

func (r *TrainingJobReconciler) updateCurrentScaler(job *kaiv1alpha1.TrainingJob, scaleItem Scaler) error {
    job.Status.CurrentScaler = scaleItem.GetFullName()
    msg := fmt.Sprintf("trainingJobob(%s/%s) execute %s", job.Namespace, job.Name, scaleItem.GetFullName())

    // Record the Scaling condition
    r.updateScalerState(scaleItem, job, newCondition(common.Scaling, scalingStartReason, msg))

    if err := r.updateObjectStatus(scaleItem, nil); err != nil {
        return err
    }
    return nil
}

5.5.5 updateScalerState

This sets the phase to common.Scaling, so the next Reconcile run takes the Scaling branch.

func (r *TrainingJobReconciler) updateScalerState(scaleObj Scaler, trainingJob *kaiv1alpha1.TrainingJob, condition common.JobCondition) error {
    // Set common.Scaling, so the next run takes the Scaling branch
    jobPhase := common.Scaling
    currentJob := scaleObj.GetFullName()
    if condition.Type == common.ScaleSucceeded || condition.Type == common.ScaleFailed {
        jobPhase = common.JobRunning
        currentJob = ""
    }

    setCondition(trainingJob.GetJobStatus(), condition)
    updateStatusPhase(trainingJob.GetJobStatus(), jobPhase)
    updateTrainingJobCurrentScaler(trainingJob.GetJobStatus(), currentJob)

    setCondition(scaleObj.GetJobStatus(), condition)
    updateStatusPhase(scaleObj.GetJobStatus(), condition.Type)
    return nil
}

The logic is as follows:

          1 Request("")
K8S +------------------>  Reconcile <-------------------------+
          2 ScaleOut CR       +                               |
K8S +------------------>      |                               |
                              v                               |
     +------------------------+---------------------+        |
     |                  ReconcileJobs               |        |
     |                        |                     |        |
     |        +---------------+-----------+         |        |
     |      1 |             2 |         3 |         |        |
     |        v               v           v         |        |
     |  "", JobCreated    JobRunning    Scaling     |        |
     +--------+---------------+---------------------+        |
              |               |                               |
            1 |             2 |                               |
              v               v                               |
      reconcileResource   reconcileJobRunning                 |
              |               |                               |
            1 |             2 |                               |
              v               v                               |
     +--------+-----------+  setTrainingJobScaler             |
     |      doSteps       |   |                               |
     |                    | 2 |                               |
     |  WorkersCreated    |   v                               |
     |                    |  updateScalerState                |
     |  WorkersReady      |   |                               |
     |                    | 2 |                               |
     |  LauncherCreated   |   v                               |
     |                    |  common.Scaling                   |
     |  JobRunning        |   |                               |
     |                    | 2 |                               |
     +--------------------+   +-------------------------------+

5.6 Scaling

5.6.1 executeScaling

Depending on the Scaler's type, a different scale operation is executed.

func (r *TrainingJobReconciler) executeScaling(job *kaiv1alpha1.TrainingJob) error {
    if err := r.syncLauncherState(job); err != nil {
        return err
    }

    if job.Status.CurrentScaler == "" {
        updateStatusPhase(job.GetJobStatus(), common.JobRunning)
        return nil
    }

    if isFinished(*job.GetJobStatus()) {
        return nil
    }

    scalerType, scalerName := getScalerName(job.Status.CurrentScaler)

    // Dispatch on scale-in vs. scale-out
    if scalerType == "ScaleIn" {
        scaleIn, err := getScaleIn(scalerName, r)
        if scaleIn == nil || isScaleFinished(*scaleIn.GetJobStatus()) {
            finishTrainingScaler(job.GetJobStatus())
            return nil
        }

        oldStatus := scaleIn.Status.DeepCopy()
        defer r.updateObjectStatus(scaleIn, oldStatus)

        // Perform the actual scale-in
        if err = r.executeScaleIn(job, scaleIn); err != nil {
            return err
        }
    } else if scalerType == "ScaleOut" {
        scaleOut, err := getScaleOut(scalerName, r)
        if scaleOut == nil || isScaleFinished(*scaleOut.GetJobStatus()) {
            finishTrainingScaler(job.GetJobStatus())
            return nil
        }

        oldStatus := scaleOut.Status.DeepCopy()
        defer r.updateObjectStatus(scaleOut, oldStatus)

        // Perform the actual scale-out
        if err = r.executeScaleOut(job, scaleOut); err != nil {
        }
    }
    return nil
}

5.6.2 executeScaleOut

This performs the scale-out:

  • setScaleOutWorkers appends the new pods to scaleOut.Status.AddPods.
  • workersAfterScaler computes the resulting worker set.
  • executeScaleScript carries out the scale operation.
func (r *TrainingJobReconciler) executeScaleOut(job *kaiv1alpha1.TrainingJob, scaleOut *kaiv1alpha1.ScaleOut) error {
    initializeJobStatus(scaleOut.GetJobStatus())

    if err := r.validateScaleOut(scaleOut); err != nil {
        r.updateScalerFailed(scaleOut, job, err.Error())
        return err
    }

    if err := r.setScaleOutWorkers(job, scaleOut); err != nil {
        return err
    }

    err := r.ScaleOutWorkers(job, scaleOut)
    if err != nil {
        msg := fmt.Sprintf("%s create scaleout workers failed, error: %v", scaleOut.GetFullName(), err)
        r.ScaleOutFailed(job, scaleOut, msg)
        return err
    }

    scaleOutWorkers, err := r.getScalerOutWorkers(job, scaleOut)
    workerStatuses, _ := r.workerReplicasStatus(scaleOut.GetJobStatus(), scaleOutWorkers)
    if workerStatuses.Active < *scaleOut.Spec.ToAdd.Count {
        if IsScaleOutTimeout(scaleOut) {
            msg := fmt.Sprintf("scaleout job %s execution timeout", scaleOut.GetFullName())
            r.ScaleOutFailed(job, scaleOut, msg)
        }
        return NewRequeueError(fmt.Errorf("wait for workers running"))
    }

    hostWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleOut)

    // Run the scale script on the launcher
    if err := r.executeScaleScript(job, scaleOut, hostWorkers); err != nil {
        msg := fmt.Sprintf("%s execute script failed, error: %v", scaleOut.GetFullName(), err)
        r.ScaleOutFailed(job, scaleOut, msg)
        return err
    } else {
        job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleOut)
        r.updateScalerSuccessd(scaleOut, job)
    }

    return nil
}

5.6.3 executeScaleScript

This calls hostfileUpdateScript to build the hostfile-update script, and finally executeOnLauncher to run that script on the launcher.

func (r *TrainingJobReconciler) executeScaleScript(trainingJob *kaiv1alpha1.TrainingJob, scaler Scaler, workers []string) error {
    if isScriptExecuted(*scaler.GetJobStatus()) {
        return nil
    }
    msg := fmt.Sprintf("trainingjob(%s/%s): execute script on launcher for %s", trainingJob.Namespace, trainingJob.Name, scaler.GetFullName())

    slots := getSlots(trainingJob)
    scriptSpec := scaler.GetScriptSpec()

    // Build the script
    var script string
    if scriptSpec.Script != "" {
        script = scalerScript(scriptSpec.GetTimeout(), scriptSpec.Env, scriptSpec.Script, scaler.GetPodNames(), slots)
    } else {
        hostfilePath := getHostfilePath(trainingJob)
        script = hostfileUpdateScript(hostfilePath, workers, slots)
    }

    // Run the script
    _, _, err := r.executeOnLauncher(trainingJob, script)
    if err != nil {
        return err
    }

    updateJobConditions(scaler.GetJobStatus(), common.ScriptExecuted, "", msg)
    return nil
}
5.6.3.1 hostfileUpdateScript

This yields the final script string.

func hostfileUpdateScript(hostfile string, workers []string, slot int) string {
    return fmt.Sprintf(
        `echo '%s' > %s`, getHostfileContent(workers, slot), hostfile)
}
5.6.3.2 getHostfileContent

This builds the hostfile content (a worked example follows the function).

func getHostfileContent(workers []string, slot int) string {
    var buffer bytes.Buffer
    for _, worker := range workers {
        buffer.WriteString(fmt.Sprintf("%s:%d\n", worker, slot))
    }
    return buffer.String()
}
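To see what the two helpers actually emit, here is a self-contained example. The worker names and the /etc/edl/hostfile path are illustrative assumptions (the post only confirms /etc/edl/discover_hosts.sh):

package main

import (
    "bytes"
    "fmt"
)

// Local copies of the two helpers above, so the example runs standalone.
func getHostfileContent(workers []string, slot int) string {
    var buffer bytes.Buffer
    for _, worker := range workers {
        buffer.WriteString(fmt.Sprintf("%s:%d\n", worker, slot))
    }
    return buffer.String()
}

func hostfileUpdateScript(hostfile string, workers []string, slot int) string {
    return fmt.Sprintf(`echo '%s' > %s`, getHostfileContent(workers, slot), hostfile)
}

func main() {
    workers := []string{"elastic-training-worker-0", "elastic-training-worker-1"}
    fmt.Println(hostfileUpdateScript("/etc/edl/hostfile", workers, 1))
    // Prints:
    // echo 'elastic-training-worker-0:1
    // elastic-training-worker-1:1
    // ' > /etc/edl/hostfile
}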
5.6.3.3 executeOnLauncher

Run the script on the launcher pod.

func (r *TrainingJobReconciler) executeOnLauncher(trainingJob *kaiv1alpha1.TrainingJob, script string) (string, string, error) {
    var err error
    var launcherPod *corev1.Pod
    if launcherPod, err = r.GetLauncherJob(trainingJob); err != nil {
    }

    if launcherPod != nil {
        stdOut, stdErr, _ := kubectlOnPod(launcherPod, script)
        return stdOut, stdErr, nil
    }
    return "", "", nil
}
5.6.3.4 kubectlOnPod

Execute the command inside the launcher pod; this is what nudges the workers. (A hedged sketch of the underlying exec helper follows the code.)

func kubectlOnPod(pod *corev1.Pod, cmd string) (string, string, error) {
    cmds := []string{
        "/bin/sh",
        "-c",
        cmd,
    }
    stdout, stderr, err := util.ExecCommandInContainerWithFullOutput(pod.Name, pod.Spec.Containers[0].Name, pod.Namespace, cmds)
    if err != nil {
        return stdout, stderr, err
    }
    return stdout, stderr, nil
}
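util.ExecCommandInContainerWithFullOutput is not quoted in the post. What it plausibly does is drive the pod "exec" subresource over SPDY, the same mechanism kubectl exec uses. A hedged client-go sketch (the function name and signature here are assumptions):

import (
    "bytes"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    restclient "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/remotecommand"
)

// execInPod runs cmds in the given container and captures stdout/stderr.
func execInPod(clientset kubernetes.Interface, config *restclient.Config,
    namespace, pod, container string, cmds []string) (string, string, error) {

    req := clientset.CoreV1().RESTClient().Post().
        Resource("pods").Name(pod).Namespace(namespace).SubResource("exec").
        VersionedParams(&corev1.PodExecOptions{
            Container: container,
            Command:   cmds,
            Stdout:    true,
            Stderr:    true,
        }, scheme.ParameterCodec)

    exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
    if err != nil {
        return "", "", err
    }
    var stdout, stderr bytes.Buffer
    err = exec.Stream(remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr})
    return stdout.String(), stderr.String(), err
}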

The logic is as follows:

          1 Request("")
K8S +------------------>  Reconcile <------------------+
          2 ScaleOut CR       +                        |
K8S +------------------>      |                        |
                              v                        |
     +------------------------+---------------------+  |
     |                  ReconcileJobs               |  |
     |                        |                     |  |
     |        +---------------+-----------+         |  |
     |      1 |             2 |         3 |         |  |
     |        v               v           v         |  |  3
     |  "", JobCreated    JobRunning    Scaling +-----------> executeScaling
     +--------+---------------+---------------------+  |          |
              |               |                        |          | 3
            1 |             2 |                        |          v
              v               v                        |    executeScaleOut
      reconcileResource   reconcileJobRunning          |          |
              |               |                        |          | 3
            1 |             2 |                        |          v
              v               v                        |    executeScaleScript
     +--------+-----------+  setTrainingJobScaler      |          |
     |      doSteps       |   |                        |          | 3
     |                    | 2 |                        |          v
     |  WorkersCreated    |   v                        |    hostfileUpdateScript
     |                    |  updateScalerState         |          |
     |  WorkersReady      |   |                        |          | 3
     |                    | 2 |                        |          v
     |  LauncherCreated   |   v                        |    executeOnLauncher
     |                    |  common.Scaling            |          |
     |  JobRunning        |   |                        |          | 3
     |                    | 2 |                        |          v
     +--------------------+   +------------------------+    kubectlOnPod

0x06 ScaleIn

6.1 Approach

When scaling in, the workers to remove can be designated via the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR. With count, workers are removed by index from the highest down. A count-based ScaleIn CR looks like this:

apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleIn
metadata:
  name: scalein-workers
spec:
  selector:
    name: elastic-training
  toDelete:
    count: 1

To scale in specific workers, configure podNames instead:

apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleIn
metadata:
  name: scalein-workers
spec:
  selector:
    name: elastic-training
  toDelete:
    podNames:
    - elastic-training-worker-1

Run a scale-in example that removes one worker by count:

kubectl create -f examples/scale_in_count.yaml

6.2 Reconcile

When a ScaleIn CR is submitted, the controller triggers Reconcile, which mainly calls setScalingOwner.

func (r *ScaleInReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    //silog := r.Log.WithValues("scalein", req.NamespacedName)
    scaleIn, err := getScaleIn(req.NamespacedName, r.Client)
    if err != nil {
        return RequeueImmediately()
    }

    if isScaleFinished(*scaleIn.GetJobStatus()) {
        return NoRequeue()
    }

    // Everything above is essentially validation
    return setScalingOwner(r, scaleIn, r.PollInterval)
}

6.3 setScalingOwner

setScalingOwner is again one of the keys. It handles a ScaleIn CR whose OwnerReferences are not yet set: using the CR's Selector field, it finds the TrainingJob this Scaler targets and records it on the CR's OwnerReferences. The error-handling code is elided below.

func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
    ownerRefs := scaler.GetOwnerReferences()
    if len(ownerRefs) == 0 {
        trainingJob := &kaiv1alpha1.TrainingJob{}
        nsn := types.NamespacedName{}
        nsn.Namespace = scaler.GetNamespace()
        nsn.Name = scaler.GetSelector().Name
        err := r.Get(context.Background(), nsn, trainingJob)

        gvk := kaiv1alpha1.SchemeGroupVersionKind
        ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob,
            schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
        scaler.SetOwnerReferences(ownerRefs)

        initializeJobStatus(scaler.GetJobStatus())
        updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
        err = r.Status().Update(context.Background(), scaler)
        err = r.Update(context.Background(), scaler)
    }
    return NoRequeue()
}

6.4 executeScaleIn

The JobRunning state is handled just as for ScaleOut, so we skip straight to executeScaleIn. The workers to remove are designated via spec.toDelete.count or spec.toDelete.podNames in the ScaleIn CR; with count, workers are removed by index from the highest down. Mapped onto the code:

  • setsSaleInToDelete designates which workers to delete;
  • executeScaleScript executes the script;
  • DeleteWorkers deletes the workers.

func (r *TrainingJobReconciler) executeScaleIn(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
    if scaleIn.DeletionTimestamp != nil || isScaleFinished(*scaleIn.GetJobStatus()) {
        logger.Info("reconcile cancelled, scalein does not need to do reconcile or has been deleted")
        return nil
    }

    initializeJobStatus(scaleIn.GetJobStatus())

    //TODO: Validate the scalein count for minSize
    if err := r.setsSaleInToDelete(job, scaleIn); err != nil {
        return err
    }

    currentWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleIn)

    // execute scalein script
    if err := r.executeScaleScript(job, scaleIn, currentWorkers); err != nil {
        msg := fmt.Sprintf("%s execute script failed, error: %v", scaleIn.GetFullName(), err)
        r.updateScalerFailed(scaleIn, job, msg)
        return nil
    }

    toDeleteWorkers := scaleIn.GetPodNames()
    remainWorkers := false
    if scaleIn.Spec.Script == "" {
        if shutdownWorkers, err := r.checkWorkerShutdown(job, toDeleteWorkers); err != nil {
            return err
        } else {
            if len(toDeleteWorkers) != len(shutdownWorkers) {
                remainWorkers = true
                toDeleteWorkers = shutdownWorkers
            }
        }
    }
    if err := r.DeleteWorkers(job, toDeleteWorkers); err != nil {
        msg := fmt.Sprintf("%s delete resource failed, error: %v", scaleIn.GetFullName(), err)
        r.updateScalerFailed(scaleIn, job, msg)
        return nil
    }

    // wait pods deleted
    deleted, _ := r.isWorkersDeleted(job.Namespace, scaleIn.GetPodNames())
    if deleted {
        job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleIn)
        job.Status.CurrentWorkers = currentWorkers
        r.updateScalerSuccessd(scaleIn, job)
        return nil
    }

    if remainWorkers {
        msg := "wait for workers process shutdown"
        logger.Info(msg)
        return NewRequeueError(fmt.Errorf(msg))
    }

    return nil
}

6.5 setsSaleInToDelete

The workers to remove are designated via the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR. (A worked example of the count-based selection follows the code.)

func (r *TrainingJobReconciler) setsSaleInToDelete(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
    podNames := scaleIn.Status.ToDeletePods
    if len(podNames) != 0 {
        return /*filterPodNames(workers, podNames, false), */ nil
    }

    workers, err := r.GetWorkerPods(job)
    if err != nil {
        return err
    }

    toDelete := scaleIn.Spec.ToDelete
    if toDelete.PodNames != nil {
        workers = filterPodNames(workers, toDelete.PodNames, false)
    } else if toDelete.Count > 0 {
        if toDelete.Count < len(workers) {
            allPodNames := getSortPodNames(job.Name, workers)
            deletePodNames := allPodNames[len(workers)-toDelete.Count:]
            workers = filterPodNames(workers, deletePodNames, false)
        }
    }

    for _, worker := range workers {
        scaleIn.Status.ToDeletePods = append(scaleIn.Status.ToDeletePods, worker.Name)
    }

    return nil
}
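The count-based branch above removes the highest-indexed workers first. A self-contained illustration of that slice logic (worker names again follow the assumed "<job>-worker-<index>" convention):

package main

import "fmt"

func main() {
    // Sorted pod names, as getSortPodNames would produce them.
    allPodNames := []string{
        "elastic-training-worker-0",
        "elastic-training-worker-1",
        "elastic-training-worker-2",
        "elastic-training-worker-3",
    }
    count := 1 // spec.toDelete.count

    // Take the tail of the sorted list: the highest indices go first.
    deletePodNames := allPodNames[len(allPodNames)-count:]
    fmt.Println(deletePodNames) // [elastic-training-worker-3]
}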

6.6 DeleteWorkers

This deletes the worker Services and Pods.

func (r *TrainingJobReconciler) DeleteWorkers(trainingJob *kaiv1alpha1.TrainingJob, workers []string) error {
    if err := r.DeleteWorkerServices(trainingJob, workers); err != nil {
        return fmt.Errorf("delete services failed: %++v", err)
    }

    if err := r.DeleteWorkerPods(trainingJob, workers); err != nil {
        return fmt.Errorf("delete pods failed: %++v", err)
    }
    return nil
}

6.7 DeleteWorkerPods

Delete the Pods. Note the GracePeriodSeconds of 0 in the delete options: the removed workers are deleted immediately rather than drained gracefully.

func (r *TrainingJobReconciler) DeleteWorkerPods(job *kaiv1alpha1.TrainingJob, pods []string) error {
    workerPods, err := r.GetWorkerPods(job)
    if err != nil {
        return err
    }
    if pods != nil {
        workerPods = filterPodNames(workerPods, pods, false)
    }
    for _, pod := range workerPods {
        deleteOptions := &client.DeleteOptions{GracePeriodSeconds: utilpointer.Int64Ptr(0)}
        if err := r.Delete(context.Background(), &pod, deleteOptions); err != nil && !errors.IsNotFound(err) {
            r.recorder.Eventf(job, corev1.EventTypeWarning, trainingJobFailedReason, "Error deleting worker %s: %v", pod.Name, err)
            //return err
        }
        r.recorder.Eventf(job, corev1.EventTypeNormal, trainingJobSucceededReason, "Deleted pod %s", pod.Name)
    }
    return nil
}

The complete logic is as follows:

          1 Request("")
K8S +------------------>  Reconcile <------------------+
          2 ScaleOut CR       +                        |
K8S +------------------>      |                        |
                              v                        |
     +------------------------+---------------------+  |
     |                  ReconcileJobs               |  |
     |                        |                     |  |
     |        +---------------+-----------+         |  |
     |      1 |             2 |         3 |         |  |
     |        v               v           v         |  |  3
     |  "", JobCreated    JobRunning    Scaling +-----------> executeScaling ------+
     +--------+---------------+---------------------+  |          |                |
              |               |                        |          | 3              | 4
            1 |             2 |                        |          v                v
              v               v                        |    executeScaleOut   executeScaleIn
      reconcileResource   reconcileJobRunning          |          |                |
              |               |                        |          | 3              | 4
            1 |             2 |                        |          v                v
              v               v                        |    executeScaleScript  executeScaleScript
     +--------+-----------+  setTrainingJobScaler      |          |                |
     |      doSteps       |   |                        |          | 3              | 4
     |                    | 2 |                        |          v                v
     |  WorkersCreated    |   v                        |    hostfileUpdateScript  DeleteWorkers
     |                    |  updateScalerState         |          |                |
     |  WorkersReady      |   |                        |          | 3              | 4
     |                    | 2 |                        |          v                v
     |  LauncherCreated   |   v                        |    executeOnLauncher   DeleteWorkerPods
     |                    |  common.Scaling            |          |                |
     |  JobRunning        |   |                        |          | 3              | 4
     |                    | 2 |                        |          v                v
     +--------------------+   +------------------------+    kubectlOnPod        Delete

This concludes the Horovod series. Starting with the next article we will analyze parameter servers; stay tuned.

0xEE Personal information

★★★★★★ Thoughts on life and technology ★★★★★★

WeChat official account: 罗西的思考

Follow it if you would like timely notifications of new articles, or to see the technical material I recommend.

0xFF References

ElasticDL 分析

在 Kubernetes 上弹性深度学习训练利器 -- Elastic Training Operator
