k8s operator 入门

The operator design pattern defines how to manage application and infrastructure resources using domain-specific knowledge and declarative state. The goal of the pattern is to reduce the amount of manual imperative work (how to backup, scale, upgrade...) which is required to keep an application in a healthy and well-maintained state, by capturing that domain specific knowledge in code and exposing it using a declarative API.

Kubernetes Operator 是一种用于打包、部署和管理 Kubernetes 应用的方法。它是一个应用特定的控制器,扩展了 Kubernetes 的功能,使复杂的、有状态的应用能够像简单的无状态应用一样易于部署和管理。

Operator 模式由 CoreOS 公司在 2016 年首次提出。这个概念源于以下需求:

  • 需要一种方法来扩展 Kubernetes 的能力,以管理复杂的有状态应用。
  • 希望将人类操作员的知识自动化,减少人为错误和提高效率。
  • 寻求一种标准化的方式来部署和管理复杂的分布式系统。

CoreOS 最初创建 Operator 是为了管理 etcd 集群,后来这个概念被广泛采用,应用于各种复杂系统的管理。

Operator 的主要应用包括:

1、Install an Application / Take Ownership of an Application。

2、Upgrade an Application。

3、Backup

4、Revovery from backup

5、Auto-Remediation

6、Monitoring/Metrics - Observability

7、Scaling

8、Auto-Scaling

9、Auto-Configuation tuning

10、Uninstalling / Disconnect

核心概念

Controller

负责调整和维护资源状态,使得和期望的状态一直。

Technically, there is no difference between a typical controller and an operator. Often the difference referred to is the operational knowledge that is included in the operator. Therefore, a controller is the implementation, and the operator is the pattern of using custom controllers with CRDs and automation is what is looking to be achieved with this.

Operator 是一种模式,controller 是实现,差异是知识,也就是 CRD。

It’s a controller’s job to ensure that, for any given object, the actual state of the world (both the cluster state, and potentially external state like running containers for Kubelet or loadbalancers for a cloud provider) matches the desired state in the object. Each controller focuses on one root Kind, but may interact with other Kinds.

控制器的工作是确保,对于任何给定对象,世界的实际状态(集群状态,以及潜在的外部状态,如 Kubelet 的运行容器或云提供商的负载均衡器)与对象中的所需状态匹配。每个控制器都专注于一个根种类,但可以与其他种类交互。

Groups and Versions

An API Group in Kubernetes is simply a collection of related functionality. Each group has one or more versions, which, as the name suggests, allow us to change how an API works over time.

API 组是是相关功能的江南,每个组有一个或者多个版本。

Kind

每个 API 组版本都包含一个或多个 API 类型,称之为 Kind。

Resources

A resource is simply a use of a Kind in the API. Often, there’s a one-to-one mapping between Kinds and resources. For instance, the pods resource corresponds to the Pod Kind. However, sometimes, the same Kind may be returned by multiple resources. For instance, the Scale Kind is returned by all scale subresources, like deployments/scale or replicasets/scale. This is what allows the Kubernetes HorizontalPodAutoscaler to interact with different resources. With CRDs, however, each Kind will correspond to a single resource.

资源只是在 Kind 的使用。资源和 Kind 通常是一对一的。例如,pods 资源对应 Pod Kind。有时,一个 Kind 对应多个资源,如 Scale Kind 对应的资源有 deployments/scalereplicasets/scale 。对于 CRD ,每个 Kind 对应一个资源。

GVK and GVR

When we refer to a kind in a particular group-version, we’ll call it a GroupVersionKind, or GVK for short. Same with resources and GVR. As we’ll see shortly, each GVK corresponds to a given root Go type in a package.

特定组版本中的一个类型或资源,每个 GVK 对应一个 Go type。

工作原理

Operator 的工作原理基于 Kubernetes 的核心概念和扩展机制。主要包括以下几个关键组件和过程:

自定义资源 (Custom Resources, CRs)

  • 定义: 自定义资源是 Kubernetes API 的扩展,允许用户定义特定于应用的对象。
  • 作用: CRs 代表了应用的期望状态,包含了配置、规格等信息。
  • 示例: 对于一个数据库 Operator,CR 可能包含数据库的版本、副本数、存储配置等。

自定义资源定义 (Custom Resource Definitions, CRDs)

  • 定义: CRD 是自定义资源的模式定义,描述了 CR 的结构和验证规则。
  • 作用: 告诉 Kubernetes 如何解释和处理自定义资源。
  • 示例: 数据库 CRD 可能定义字段如 spec.versionspec.replicasspec.storage 等。

控制器 (Controller)

  • 定义: 控制器是 Operator 的核心,是一个持续运行的循环程序。
  • 作用: 观察集群的当前状态,将其与 CR 中定义的期望状态进行比较,并采取行动使实际状态与期望状态一致。
  • 工作流程:
    1. 观察 (Watch): 监听与其负责的 CR 相关的事件。
    2. 分析 (Analyze): 比较当前状态与期望状态的差异。
    3. 调谐 (Reconcile): 执行必要的操作以达成期望状态。

调谐循环 (Reconciliation Loop)

  • 定义: 调谐循环是控制器的核心逻辑,持续执行以确保系统状态与期望状态一致。
  • 过程:
    1. 获取 CR 实例的最新状态。
    2. 观察相关资源的实际状态。
    3. 比较实际状态与期望状态。
    4. 执行必要的操作以消除差异。
    5. 更新 CR 的状态。

Operator SDK 和 Kubebuilder

  • 作用: 这些工具简化了 Operator 的开发过程,提供了脚手架和常用功能。
  • 功能: 生成代码框架、处理常见的 Kubernetes 交互、提供测试工具等。

工作流程示例

以数据库 Operator 为例:

  1. 用户创建一个描述所需数据库配置的 CR。
  2. Operator 的控制器检测到新的 CR 被创建。
  3. 控制器分析 CR 并开始调谐过程:
    • 创建必要的 Kubernetes 资源(如 Pods, Services, ConfigMaps)。
    • 配置数据库软件。
    • 设置监控和备份。4. 控制器持续监控数据库状态,执行自动化操作(如扩缩容、备份、升级)。
  4. 当 CR 被更新时,控制器检测变化并进行相应调整。

状态报告

  • Operator 通过更新 CR 的 status 字段来报告应用的当前状态。
  • 这允许用户和其他系统组件了解应用的运行情况。

错误处理和重试机制

  • Operator 实现了错误处理和重试逻辑,以处理临时故障和异常情况。
  • 复杂的 Operator 还可能实现自我修复和故障转移机制。

扩展 Kubernetes API

  • Operator 通过 CRD 和自定义控制器扩展了 Kubernetes API。
  • 这使得复杂应用可以像内置资源一样使用 kubectl 和其他 Kubernetes 工具进行管理。

通过这些机制,Operator 能够自动化复杂应用的管理,将人类操作员的知识编码到软件中。这不仅提高了效率和可靠性,还标准化了复杂应用的部署和管理流程。

Reconsile 机制

调谐循环的具体实现过程:

触发机制

调谐循环可以通过以下方式触发:

  • 资源变更: 当相关的自定义资源(CR)被创建、更新或删除时。
  • 周期性调度: 控制器定期触发调谐,即使没有明显的变化。
  • 外部事件: 如集群状态变化、定时任务等。

获取资源状态

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 获取 CR 实例
    var cr myapp.MyCustomResource
    if err := r.Get(ctx, req.NamespacedName, &cr); err != nil {
        // 处理错误,如资源不存在
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // ... 后续步骤
}

状态比对

比较 CR 中定义的期望状态与实际观察到的集群状态:

// 检查所需的 Deployment 是否存在
deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: cr.Name, Namespace: cr.Namespace}, deployment)
if err != nil && errors.IsNotFound(err) {
    // Deployment 不存在,需要创建
    return r.createDeployment(ctx, cr)
} else if err != nil {
    // 其他错误
    return ctrl.Result{}, err
}

// 比较 Deployment 规格
if !reflect.DeepEqual(deployment.Spec, desiredDeploymentSpec(cr)) {
    // 需要更新 Deployment
    return r.updateDeployment(ctx, cr, deployment)
}

执行调谐操作

根据状态比对结果,执行必要的操作:

func (r *Reconciler) createDeployment(ctx context.Context, cr *myapp.MyCustomResource) (ctrl.Result, error) {
    deployment := &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      cr.Name,
            Namespace: cr.Namespace,
        },
        Spec: desiredDeploymentSpec(cr),
    }
    if err := r.Create(ctx, deployment); err != nil {
        return ctrl.Result{}, err
    }
    return ctrl.Result{Requeue: true}, nil
}

func (r *Reconciler) updateDeployment(ctx context.Context, cr *myapp.MyCustomResource, deployment *appsv1.Deployment) (ctrl.Result, error) {
    deployment.Spec = desiredDeploymentSpec(cr)
    if err := r.Update(ctx, deployment); err != nil {
        return ctrl.Result{}, err
    }
    return ctrl.Result{Requeue: true}, nil
}

更新状态

操作完成后,更新 CR 的状态:

func (r *Reconciler) updateStatus(ctx context.Context, cr *myapp.MyCustomResource) error {
    cr.Status.Phase = "Reconciled"
    cr.Status.ObservedGeneration = cr.Generation
    return r.Status().Update(ctx, cr)
}

错误处理和重试

实现错误处理和重试逻辑:

if err != nil {
    // 记录错误
    r.Log.Error(err, "Reconciliation failed")
    // 重新入队以便稍后重试
    return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 30}, err
}

返回结果

调谐函数返回一个 Result 和一个错误:

  • Result.Requeue: 是否需要重新入队。
  • Result.RequeueAfter: 多长时间后重新调谐。
  • 返回的错误: 如果非nil,控制器会自动重新入队。
return ctrl.Result{}, nil // 成功,无需立即重新调谐
// 或
return ctrl.Result{Requeue: true}, nil // 需要立即重新调谐
// 或
return ctrl.Result{RequeueAfter: time.Minute * 5}, nil // 5分钟后重新调谐

并发和线程安全

确保调谐逻辑是幂等的和线程安全的,因为可能会并发执行多个调谐循环。

性能考虑

  • 使用索引和缓存来提高查询效率。
  • 实现合理的重试策略,避免不必要的调谐。

监控和日志

在调谐过程中添加适当的日志和监控指标:

r.Log.Info("Reconciling MyCustomResource", "name", cr.Name, "namespace", cr.Namespace)
// ... 调谐逻辑 ...
r.Metrics.ReconcileCount.Inc() // 增加调谐计数指标

并发处理

使用乐观锁(Optimistic Locking)

Kubernetes 使用 ResourceVersion 来实现乐观锁。每次更新资源时,都应该包含资源的当前版本:

func (r *Reconciler) updateResource(ctx context.Context, resource *myapp.MyResource) error {
    // 获取当前资源版本
    currentResource := &myapp.MyResource{}
    if err := r.Get(ctx, types.NamespacedName{Name: resource.Name, Namespace: resource.Namespace}, currentResource); err != nil {
        return err
    }

    // 更新资源
    resource.ResourceVersion = currentResource.ResourceVersion
    if err := r.Update(ctx, resource); err != nil {
        if errors.IsConflict(err) {
            // 处理冲突,可能需要重试
            return &RetryError{Err: err}
        }
        return err
    }
    return nil
}

实现幂等操作

确保调谐操作是幂等的,即多次执行相同的操作不会产生不同的结果:

func (r *Reconciler) ensureDeployment(ctx context.Context, cr *myapp.MyCustomResource) error {
    deployment := &appsv1.Deployment{}
    err := r.Get(ctx, types.NamespacedName{Name: cr.Name, Namespace: cr.Namespace}, deployment)
    if err != nil && errors.IsNotFound(err) {
        // 创建 Deployment
        newDeployment := r.newDeploymentForCR(cr)
        return r.Create(ctx, newDeployment)
    } else if err != nil {
        return err
    }

    // 检查是否需要更新
    if !reflect.DeepEqual(deployment.Spec, r.desiredDeploymentSpec(cr)) {
        deployment.Spec = r.desiredDeploymentSpec(cr)
        return r.Update(ctx, deployment)
    }

    return nil // 不需要任何操作

使用状态机

实现清晰的状态机来管理资源的生命周期,这有助于处理并发操作:


复制
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    cr := &myapp.MyCustomResource{}
    if err := r.Get(ctx, req.NamespacedName, cr); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    switch cr.Status.Phase {
    case "":
        return r.handleInitialPhase(ctx, cr)
    case "Pending":
        return r.handlePendingPhase(ctx, cr)
    case "Running":
        return r.handleRunningPhase(ctx, cr)
    // ... 其他阶段
    }

    return ctrl.Result{}, nil
}

使用 Finalizers

Finalizers 可以确保在资源被删除之前执行必要的清理操作:

const myFinalizerName = "myapp.example.com/finalizer"

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    cr := &myapp.MyCustomResource{}
    if err := r.Get(ctx, req.NamespacedName, cr); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    if cr.ObjectMeta.DeletionTimestamp.IsZero() {
        // 对象未被标记为删除,确保 finalizer 存在
        if !containsString(cr.ObjectMeta.Finalizers, myFinalizerName) {
            cr.ObjectMeta.Finalizers = append(cr.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, cr); err != nil {
                return ctrl.Result{}, err
            }
        }
    } else {
        // 对象被标记为删除
        if containsString(cr.ObjectMeta.Finalizers, myFinalizerName) {
            if err := r.finalizeResource(ctx, cr); err != nil {
                return ctrl.Result{}, err
            }
            cr.ObjectMeta.Finalizers = removeString(cr.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, cr); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }

    // ... 正常的调谐逻辑
}

使用 Owner References

Owner References 可以帮助管理资源之间的依赖关系,并确保子资源随父资源的删除而删除:

func (r *Reconciler) createChildResource(ctx context.Context, cr *myapp.MyCustomResource) error {
    child := &someapi.ChildResource{
        ObjectMeta: metav1.ObjectMeta{
            Name:      cr.Name + "-child",
            Namespace: cr.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(cr, myapp.GroupVersion.WithKind("MyCustomResource")),
            },
        },
        Spec: someapi.ChildResourceSpec{
            // ... 设置规格
        },
    }
    return r.Create(ctx, child)
}

实现合适的重试逻辑

对于可能因并发问题而失败的操作,实现适当的重试逻辑:

func (r *Reconciler) retryOperation(ctx context.Context, operation func() error) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        err := operation()
        if errors.IsConflict(err) {
            // 冲突时重试
            return err
        }
        return nil
    })
}

使用分布式锁

对于某些关键操作,可能需要使用分布式锁来确保在集群范围内的互斥:

import "github.com/kubernetes-sigs/controller-runtime/pkg/client/apiutil"

func (r *Reconciler) performCriticalOperation(ctx context.Context, cr *myapp.MyCustomResource) error {
    // 创建一个锁对象
    lock := &coordinationv1.Lease{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "my-critical-operation-lock",
            Namespace: cr.Namespace,
        },
    }

    // 尝试获取锁
    err := r.Create(ctx, lock)
    if err != nil && !errors.IsAlreadyExists(err) {
        return err
    }

    if err == nil {
        // 成功获取锁,执行操作
        defer r.Delete(ctx, lock) // 确保操作完成后释放锁
        return r.doCriticalOperation(ctx, cr)
    }

    // 锁已被其他实例持有,可以选择等待或跳过
    return nil
}

Informer 机制

+----------------+     List/Watch     +-------------+
|                | <----------------> |             |
|   API Server   |                    |  Reflector  |
|                | <----------------> |             |
+----------------+     Watch Events   +------+------+
                                             |
                                             | Add Deltas
                                             |
                                             v
                       +--------------------+
                       |                    |
                       |     DeltaFIFO      |
                       |                    |
                       +--------------------+
                                 ^
                                 |
                                 | Pop Deltas
                                 |
              +------------------+-----------------+
              |                                    |
              |         Shared Informer            |
              |                                    |
              +------------------+-----------------+
                                 |
                   Update Cache  |  Trigger Handlers
                                 |
    +--------------------+     +-v-------------+
    |                    |     |               |
    |      Indexer       |     | Event Handlers|
    |     (Cache)        |     |               |
    |                    |     |               |
    +--------------------+     +---------------+
              ^                        ^
              |                        |
              |                        |
    +---------+------------------------+----------------+
    |                                                   |
    |               Custom Controller                   |
    |                                                   |
    |  +-------------------+  +----------------------+  |
    |  |   Reconcile Loop  |  |  Resource Event      |  |
    |  |                   |  |  Handlers            |  |
    |  | +---------------+ |  |                      |  |
    |  | | Get from Cache| |  | +------------------+ |  |
    |  | +---------------+ |  | | OnAdd Handler    | |  |
    |  |                   |  | +------------------+ |  |
    |  | +---------------+ |  | +------------------+ |  |
    |  | | Update Status | |  | | OnUpdate Handler | |  |
    |  | +---------------+ |  | +------------------+ |  |
    |  |                   |  | +------------------+ |  |
    |  | +---------------+ |  | | OnDelete Handler | |  |
    |  | | Create/Update | |  | +------------------+ |  |
    |  | +---------------+ |  |                      |  |
    |  |                   |  |                      |  |
    |  +-------------------+  +----------------------+  |
    |                                                   |
    +---------------------------------------------------+

Informer 机制是 Kubernetes 客户端中用于高效监视和缓存资源对象的核心组件。它允许客户端(如控制器或 Operator)以一种高效、可扩展的方式跟踪 Kubernetes 资源的变化。

Shared Informer:

  • 这是 client-go 库提供的标准 Informer 实现。
  • 它管理 Reflector、DeltaFIFO、和 Indexer。
  • 多个 controller 可以共享同一个 Informer,以减少资源消耗。

Custom Controller:

  • 这是自定义 controller。
  • 它包含两个主要部分:Reconcile Loop 和 Resource Event Handlers。

Reconcile Loop:

  • 这是 controller 的核心逻辑,负责协调资源的实际状态和期望状态。
  • 从 Indexer(缓存)中获取资源信息,而不是直接查询 API Server。
  • 执行必要的创建、更新或删除操作。
  • 更新资源的状态。

Resource Event Handlers:

  • 这些是回调函数,用于响应资源的添加、更新和删除事件。
  • 通常,这些处理器会将资源的 key 加入到工作队列中,以便 Reconcile Loop 处理。

工作流程:

  • Shared Informer 监听 API Server 的变化。
  • 当资源发生变化时,Shared Informer 更新本地缓存(Indexer)。
  • 同时,Shared Informer 触发相应的 Event Handler。
  • Event Handler 将资源的 key 加入工作队列。
  • Reconcile Loop 从工作队列中获取 key,然后从缓存中读取资源信息。
  • Reconcile Loop 执行必要的操作,如创建、更新或删除资源。
  • 操作完成后,Reconcile Loop 更新资源状态。

优势:

  • 减少对 API Server 的直接访问,提高性能。
  • 提供一致的资源视图。
  • 允许多个 controller 共享同一个 Informer,节省资源。
  • 支持事件驱动的编程模型。

错误处理和重试

在 Reconcile Loop 中正确处理错误和实现有效的重试机制是确保 controller 稳定性和可靠性的关键。以下是一些处理错误和实现重试机制的策略和最佳实践:

错误分类

首先,将错误分为不同的类别:

a) 临时错误:可能会自行解决的错误,如网络抖动。
b) 永久错误:需要人工干预的错误,如配置错误。
c) 不可恢复错误:无法解决的错误,如资源不存在。

实现重试逻辑

对于临时错误,实现指数退避重试:

import (
    "time"
    "k8s.io/apimachinery/pkg/util/wait"
    ctrl "sigs.k8s.io/controller-runtime"
)

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // ... 其他逻辑 ...

    err := wait.ExponentialBackoff(wait.Backoff{
        Duration: time.Second,
        Factor:   2,
        Jitter:   0.1,
        Steps:    5,
    }, func() (bool, error) {
        err := r.doSomething(ctx)
        if err != nil {
            // 检查是否是临时错误
            if isTemporaryError(err) {
                return false, nil // 继续重试
            }
            return false, err // 永久错误,停止重试
        }
        return true, nil // 成功,停止重试
    })

    if err != nil {
        // 处理最终错误
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

func isTemporaryError(err error) bool {
    // 实现逻辑来判断是否为临时错误
    // 例如,检查错误类型或错误消息
}

使用 Result.RequeueAfter

对于需要延迟重试的情况,使用 Result.RequeueAfter

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // ... 其他逻辑 ...

    if someCondition {
        // 5分钟后重新入队
        return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
    }

    return ctrl.Result{}, nil
}

错误记录和度量

记录错误并收集度量信息,以便于监控和调试:

import (
    "github.com/go-logr/logr"
    "sigs.k8s.io/controller-runtime/pkg/metrics"
    "github.com/prometheus/client_golang/prometheus"
)

var (
    reconcileErrors = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "reconcile_errors_total",
            Help: "Total number of reconciliation errors",
        },
        []string{"error_type"},
    )
)

func init() {
    metrics.Registry.MustRegister(reconcileErrors)
}

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := logr.FromContext(ctx)

    // ... 其他逻辑 ...

    if err != nil {
        log.Error(err, "Reconciliation error", "name", req.Name, "namespace", req.Namespace)
        reconcileErrors.WithLabelValues(categorizeError(err)).Inc()
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

func categorizeError(err error) string {
    // 实现错误分类逻辑
    // 返回 "temporary", "permanent", 或其他分类
}

状态更新和错误处理

在更新资源状态时处理错误:

func (r *MyReconciler) updateStatus(ctx context.Context, instance *myapiv1.MyResource, status string) error {
    instance.Status.State = status
    if err := r.Status().Update(ctx, instance); err != nil {
        return fmt.Errorf("failed to update status: %w", err)
    }
    return nil
}

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &myapiv1.MyResource{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // ... 主要逻辑 ...

    if err := r.updateStatus(ctx, instance, "Reconciled"); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

使用 finalizer 处理删除操作

确保在资源被删除前执行必要的清理操作:

const myFinalizerName = "myresource.finalizers.myapp.com"

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &myapiv1.MyResource{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    if instance.ObjectMeta.DeletionTimestamp.IsZero() {
        // 资源没有被标记为删除
        if !containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {
            instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }
        }
    } else {
        // 资源被标记为删除
        if containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {
            if err := r.deleteExternalResources(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }

            instance.ObjectMeta.Finalizers = removeString(instance.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }

    // ... 主要逻辑 ...

    return ctrl.Result{}, nil
}

func (r *MyReconciler) deleteExternalResources(ctx context.Context, instance *myapiv1.MyResource) error {
    // 实现删除外部资源的逻辑
    return nil
}

处理并发更新

使用 Update 或 Patch 操作时,处理可能的并发更新冲突:

import "k8s.io/client-go/util/retry"

func (r *MyReconciler) updateResource(ctx context.Context, instance *myapiv1.MyResource) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        // 获取最新版本的资源
        if err := r.Get(ctx, client.ObjectKeyFromObject(instance), instance); err != nil {
            return err
        }

        // 进行必要的更新
        instance.Spec.SomeField = "new value"

        // 尝试更新
        if err := r.Update(ctx, instance); err != nil {
            return err
        }
        return nil
    })
}

状态更新最佳实践

使用子资源状态更新

Kubernetes 提供了 /status 子资源,允许你独立于主资源更新状态。这样可以分离状态更新和规格更新的权限,并减少更新冲突。

func (r *MyReconciler) updateStatus(ctx context.Context, instance *myapiv1.MyResource) error {
    return r.Status().Update(ctx, instance)
}

实现乐观并发控制

使用 retry.RetryOnConflict 来处理可能的并发更新冲突:

import (
    "k8s.io/client-go/util/retry"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func (r *MyReconciler) updateStatus(ctx context.Context, instance *myapiv1.MyResource) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        // 获取最新版本的资源
        if err := r.Get(ctx, client.ObjectKeyFromObject(instance), instance); err != nil {
            return err
        }
        
        // 更新状态字段
        instance.Status.Phase = "Running"
        instance.Status.LastUpdateTime = metav1.Now()
        
        // 尝试更新状态
        return r.Status().Update(ctx, instance)
    })
}

使用部分更新(Patch)

对于大型资源或频繁更新的情况,考虑使用 Patch 操作而不是完整的 Update:

import "sigs.k8s.io/controller-runtime/pkg/client"

func (r *MyReconciler) patchStatus(ctx context.Context, instance *myapiv1.MyResource) error {
    patch := client.MergeFrom(instance.DeepCopy())
    instance.Status.Phase = "Running"
    instance.Status.LastUpdateTime = metav1.Now()
    return r.Status().Patch(ctx, instance, patch)
}

实现状态条件

使用条件(Conditions)来表示资源的详细状态,而不是简单的字符串状态:

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type MyResourceStatus struct {
    Conditions []metav1.Condition `json:"conditions,omitempty"`
}

func (r *MyReconciler) updateCondition(instance *myapiv1.MyResource, conditionType string, status metav1.ConditionStatus, reason, message string) {
    newCondition := metav1.Condition{
        Type:               conditionType,
        Status:             status,
        Reason:             reason,
        Message:            message,
        LastTransitionTime: metav1.Now(),
    }

    // 更新或添加条件
    for i, condition := range instance.Status.Conditions {
        if condition.Type == conditionType {
            if condition.Status != status {
                instance.Status.Conditions[i] = newCondition
            }
            return
        }
    }
    instance.Status.Conditions = append(instance.Status.Conditions, newCondition)
}

最小化状态更新

只在状态实际发生变化时才进行更新,以减少不必要的 API 调用:

func (r *MyReconciler) updateStatusIfChanged(ctx context.Context, instance *myapiv1.MyResource, newStatus myapiv1.MyResourceStatus) error {
    if !reflect.DeepEqual(instance.Status, newStatus) {
        instance.Status = newStatus
        return r.updateStatus(ctx, instance)
    }
    return nil
}

使用结构化日志记录状态变化

记录状态变化,以便于调试和监控:

import "github.com/go-logr/logr"

func (r *MyReconciler) logStatusChange(log logr.Logger, instance *myapiv1.MyResource, oldStatus, newStatus myapiv1.MyResourceStatus) {
    log.Info("Status changed",
        "name", instance.Name,
        "namespace", instance.Namespace,
        "oldPhase", oldStatus.Phase,
        "newPhase", newStatus.Phase,
    )
}

实现状态聚合

如果你的资源有子资源,考虑实现状态聚合:


复制
func (r *MyReconciler) aggregateStatus(ctx context.Context, instance *myapiv1.MyResource) error {
    // 获取子资源
    childList := &myapiv1.ChildResourceList{}
    if err := r.List(ctx, childList, client.InNamespace(instance.Namespace), client.MatchingFields{"parentName": instance.Name}); err != nil {
        return err
    }

    // 聚合子资源状态
    readyCount := 0
    for _, child := range childList.Items {
        if child.Status.Phase == "Ready" {
            readyCount++
        }
    }

    // 更新父资源状态
    instance.Status.ReadyChildren = readyCount
    instance.Status.TotalChildren = len(childList.Items)

    if readyCount == len(childList.Items) {
        instance.Status.Phase = "Ready"
    } else {
        instance.Status.Phase = "Progressing"
    }

    return r.updateStatus(ctx, instance)
}

实现状态校验

在更新状态之前,实现校验逻辑以确保状态的一致性:

func (r *MyReconciler) validateStatus(status *myapiv1.MyResourceStatus) error {
    if status.ReadyChildren > status.TotalChildren {
        return fmt.Errorf("ready children count cannot be greater than total children")
    }
    // 其他校验逻辑...
    return nil
}

使用 finalizer 确保状态更新

在资源被删除之前,使用 finalizer 确保最终状态被正确更新:

const myFinalizerName = "myresource.finalizers.example.com"

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &myapiv1.MyResource{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    if instance.ObjectMeta.DeletionTimestamp.IsZero() {
        // 资源未被标记为删除
        if !containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {
            instance.ObjectMeta.Finalizers = append(instance.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }
        }
    } else {
        // 资源已被标记为删除
        if containsString(instance.ObjectMeta.Finalizers, myFinalizerName) {
            // 执行清理操作
            if err := r.finalizeResource(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }

            // 更新最终状态
            instance.Status.Phase = "Terminating"
            if err := r.updateStatus(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }

            // 移除 finalizer
            instance.ObjectMeta.Finalizers = removeString(instance.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(ctx, instance); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }

    // ... 主要的 reconcile 逻辑 ...

    return ctrl.Result{}, nil
}

实现状态恢复机制

在 controller 重启或出现异常情况时,实现状态恢复机制:


func (r *MyReconciler) recoverStatus(ctx context.Context, instance *myapiv1.MyResource) error {
    // 检查实际状态并更新资源状态
    actualStatus, err := r.checkActualStatus(ctx, instance)
    if err != nil {
        return err
    }

    if !reflect.DeepEqual(instance.Status, actualStatus) {
        instance.Status = actualStatus
        return r.updateStatus(ctx, instance)
    }
    return nil
}

Status Conditions

状态条件(Status Conditions)是 Kubernetes 中表示资源详细状态的一种标准方式。它们提供了比简单的状态字符串更丰富的信息,使得资源的状态更易于理解和处理。以下是如何实现状态条件的详细说明:

定义状态条件

首先,在你的自定义资源(CRD)定义中添加条件字段:

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

type MyResourceStatus struct {
    // 其他状态字段...
    Conditions []metav1.Condition `json:"conditions,omitempty"`
}

定义条件类型

为你的资源定义一组标准的条件类型:

const (
    TypeAvailable   = "Available"
    TypeProgressing = "Progressing"
    TypeDegraded    = "Degraded"
    // 其他条件类型...
)

实现更新条件的函数

创建一个辅助函数来更新或添加条件:

func setCondition(conditions *[]metav1.Condition, conditionType string, status metav1.ConditionStatus, reason, message string) {
    now := metav1.Now()
    for i := range *conditions {
        if (*conditions)[i].Type == conditionType {
            if (*conditions)[i].Status != status {
                (*conditions)[i].Status = status
                (*conditions)[i].LastTransitionTime = now
            }
            (*conditions)[i].Reason = reason
            (*conditions)[i].Message = message
            return
        }
    }

    // 如果条件不存在,添加新条件
    *conditions = append(*conditions, metav1.Condition{
        Type:               conditionType,
        Status:             status,
        LastTransitionTime: now,
        Reason:             reason,
        Message:            message,
    })
}

在 Reconcile Loop 中使用条件

在你的 Reconcile 函数中,根据资源的实际状态更新条件:

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &myapiv1.MyResource{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // 检查资源是否可用
    isAvailable, err := r.checkAvailability(ctx, instance)
    if err != nil {
        return ctrl.Result{}, err
    }

    if isAvailable {
        setCondition(&instance.Status.Conditions, TypeAvailable, metav1.ConditionTrue, "ResourceAvailable", "The resource is available")
    } else {
        setCondition(&instance.Status.Conditions, TypeAvailable, metav1.ConditionFalse, "ResourceUnavailable", "The resource is not available")
    }

    // 检查资源是否正在进行中
    isProgressing, err := r.checkProgress(ctx, instance)
    if err != nil {
        return ctrl.Result{}, err
    }

    if isProgressing {
        setCondition(&instance.Status.Conditions, TypeProgressing, metav1.ConditionTrue, "InProgress", "The resource is being processed")
    } else {
        setCondition(&instance.Status.Conditions, TypeProgressing, metav1.ConditionFalse, "Completed", "The resource processing is complete")
    }

    // 更新状态
    if err := r.Status().Update(ctx, instance); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

实现获取条件的辅助函数

为了方便检查特定条件的状态,可以实现一个辅助函数:

func getCondition(conditions []metav1.Condition, conditionType string) *metav1.Condition {
    for i := range conditions {
        if conditions[i].Type == conditionType {
            return &conditions[i]
        }
    }
    return nil
}

使用条件来决定资源的整体状态

可以基于多个条件来决定资源的整体状态:

func (r *MyReconciler) determineOverallStatus(instance *myapiv1.MyResource) {
    availableCondition := getCondition(instance.Status.Conditions, TypeAvailable)
    progressingCondition := getCondition(instance.Status.Conditions, TypeProgressing)
    degradedCondition := getCondition(instance.Status.Conditions, TypeDegraded)

    if availableCondition != nil && availableCondition.Status == metav1.ConditionTrue {
        instance.Status.Phase = "Available"
    } else if degradedCondition != nil && degradedCondition.Status == metav1.ConditionTrue {
        instance.Status.Phase = "Degraded"
    } else if progressingCondition != nil && progressingCondition.Status == metav1.ConditionTrue {
        instance.Status.Phase = "Progressing"
    } else {
        instance.Status.Phase = "Unknown"
    }
}

实现条件的观察者模式

如果你的资源有子资源或依赖其他资源,可以实现一个观察者模式来传播条件:

func (r *MyReconciler) propagateConditions(ctx context.Context, instance *myapiv1.MyResource) error {
    // 获取依赖资源
    dep := &appsv1.Deployment{}
    err := r.Get(ctx, types.NamespacedName{Name: instance.Spec.DeploymentName, Namespace: instance.Namespace}, dep)
    if err != nil {
        return err
    }

    // 检查 Deployment 的条件并传播到自定义资源
    for _, cond := range dep.Status.Conditions {
        switch cond.Type {
        case appsv1.DeploymentAvailable:
            setCondition(&instance.Status.Conditions, TypeAvailable, cond.Status, cond.Reason, cond.Message)
        case appsv1.DeploymentProgressing:
            setCondition(&instance.Status.Conditions, TypeProgressing, cond.Status, cond.Reason, cond.Message)
        }
    }

    return nil
}

实现条件的 TTL(Time to Live)

对于某些条件,可能需要实现 TTL 机制,以确保旧的条件不会永久存在:

func (r *MyReconciler) cleanupOldConditions(instance *myapiv1.MyResource) {
    now := time.Now()
    for i := len(instance.Status.Conditions) - 1; i >= 0; i-- {
        cond := instance.Status.Conditions[i]
        if now.Sub(cond.LastTransitionTime.Time) > 24*time.Hour {
            // 移除超过24小时的条件
            instance.Status.Conditions = append(instance.Status.Conditions[:i], instance.Status.Conditions[i+1:]...)
        }
    }
}

参考

[1] Operator 白皮书

[2] Kubernetes-Operator:扩展Kubernetes API Resource与Custom Controller (上)

[3] Kubernetes Operator 开发教程

[4] Kubernetes Controller 机制详解(一)

[5] Kubernetes Operator