diff --git a/config/crds/v1/all-crds.yaml b/config/crds/v1/all-crds.yaml index 34a0c1fffb..d36a9c1e37 100644 --- a/config/crds/v1/all-crds.yaml +++ b/config/crds/v1/all-crds.yaml @@ -4661,9 +4661,17 @@ spec: type: array podDisruptionBudget: description: |- - PodDisruptionBudget provides access to the default Pod disruption budget for the Elasticsearch cluster. - The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. - In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + PodDisruptionBudget provides access to the default Pod disruption budget(s) for the Elasticsearch cluster. + The behavior depends on the license level. + With a Basic license: + The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. + In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + With an Enterprise license: + The default budget is split into multiple budgets, each targeting a specific node role type allowing additional disruptions + for certain roles according to the health status of the cluster. + Example: + All data roles (excluding frozen): allows disruptions only when the cluster is green. + All other roles: allows disruptions only when the cluster is yellow or green. To disable, set `PodDisruptionBudget` to the empty value (`{}` in YAML). properties: metadata: diff --git a/config/crds/v1/resources/elasticsearch.k8s.elastic.co_elasticsearches.yaml b/config/crds/v1/resources/elasticsearch.k8s.elastic.co_elasticsearches.yaml index cf14063fd0..43f86d67f2 100644 --- a/config/crds/v1/resources/elasticsearch.k8s.elastic.co_elasticsearches.yaml +++ b/config/crds/v1/resources/elasticsearch.k8s.elastic.co_elasticsearches.yaml @@ -9239,9 +9239,17 @@ spec: type: array podDisruptionBudget: description: |- - PodDisruptionBudget provides access to the default Pod disruption budget for the Elasticsearch cluster. - The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. - In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + PodDisruptionBudget provides access to the default Pod disruption budget(s) for the Elasticsearch cluster. + The behavior depends on the license level. + With a Basic license: + The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. + In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + With an Enterprise license: + The default budget is split into multiple budgets, each targeting a specific node role type allowing additional disruptions + for certain roles according to the health status of the cluster. + Example: + All data roles (excluding frozen): allows disruptions only when the cluster is green. + All other roles: allows disruptions only when the cluster is yellow or green. To disable, set `PodDisruptionBudget` to the empty value (`{}` in YAML). 
properties: metadata: diff --git a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml index f394c61306..9a923864bd 100644 --- a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml +++ b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml @@ -4703,9 +4703,17 @@ spec: type: array podDisruptionBudget: description: |- - PodDisruptionBudget provides access to the default Pod disruption budget for the Elasticsearch cluster. - The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. - In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + PodDisruptionBudget provides access to the default Pod disruption budget(s) for the Elasticsearch cluster. + The behavior depends on the license level. + With a Basic license: + The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. + In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + With an Enterprise license: + The default budget is split into multiple budgets, each targeting a specific node role type allowing additional disruptions + for certain roles according to the health status of the cluster. + Example: + All data roles (excluding frozen): allows disruptions only when the cluster is green. + All other roles: allows disruptions only when the cluster is yellow or green. To disable, set `PodDisruptionBudget` to the empty value (`{}` in YAML). properties: metadata: diff --git a/docs/reference/api-docs.md b/docs/reference/api-docs.md index 433645cfd3..0070b4ae3f 100644 --- a/docs/reference/api-docs.md +++ b/docs/reference/api-docs.md @@ -10,4 +10,4 @@ applies_to: # {{eck}} API Reference (moved) -This page has moved [here](./api-reference/index.md). \ No newline at end of file +This page has moved [here](./api-reference/index.md). diff --git a/docs/reference/api-reference/main.md b/docs/reference/api-reference/main.md index 4fa23902f7..73aa4e6cd2 100644 --- a/docs/reference/api-reference/main.md +++ b/docs/reference/api-reference/main.md @@ -1093,7 +1093,7 @@ ElasticsearchSpec holds the specification of an Elasticsearch cluster. | *`transport`* __[TransportConfig](#transportconfig)__ | Transport holds transport layer settings for Elasticsearch. | | *`nodeSets`* __[NodeSet](#nodeset) array__ | NodeSets allow specifying groups of Elasticsearch nodes sharing the same configuration and Pod templates. | | *`updateStrategy`* __[UpdateStrategy](#updatestrategy)__ | UpdateStrategy specifies how updates to the cluster should be performed. | -| *`podDisruptionBudget`* __[PodDisruptionBudgetTemplate](#poddisruptionbudgettemplate)__ | PodDisruptionBudget provides access to the default Pod disruption budget for the Elasticsearch cluster.
The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`.
In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1.
To disable, set `PodDisruptionBudget` to the empty value (`{}` in YAML). | +| *`podDisruptionBudget`* __[PodDisruptionBudgetTemplate](#poddisruptionbudgettemplate)__ | PodDisruptionBudget provides access to the default Pod disruption budget(s) for the Elasticsearch cluster.
The behavior depends on the license level.
With a Basic license:
The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`.
In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1.
With an Enterprise license:
The default budget is split into multiple budgets, each targeting a specific node role type, allowing additional disruptions<br>
for certain roles according to the health status of the cluster.
Example:
All data roles (excluding frozen): allows disruptions only when the cluster is green.
All other roles: allows disruptions only when the cluster is yellow or green.
To disable, set `PodDisruptionBudget` to the empty value (`{}` in YAML). | | *`auth`* __[Auth](#auth)__ | Auth contains user authentication and authorization security settings for Elasticsearch. | | *`secureSettings`* __[SecretSource](#secretsource) array__ | SecureSettings is a list of references to Kubernetes secrets containing sensitive configuration options for Elasticsearch. | | *`serviceAccountName`* __string__ | ServiceAccountName is used to check access from the current resource to a resource (for ex. a remote Elasticsearch cluster) in a different namespace.
Can only be used if ECK is enforcing RBAC on references. | diff --git a/pkg/apis/elasticsearch/v1/elasticsearch_config.go b/pkg/apis/elasticsearch/v1/elasticsearch_config.go index 52a39dbe6e..be7c0f3561 100644 --- a/pkg/apis/elasticsearch/v1/elasticsearch_config.go +++ b/pkg/apis/elasticsearch/v1/elasticsearch_config.go @@ -16,6 +16,7 @@ import ( type NodeRole string const ( + CoordinatingRole NodeRole = "" DataColdRole NodeRole = "data_cold" DataContentRole NodeRole = "data_content" DataFrozenRole NodeRole = "data_frozen" @@ -129,6 +130,8 @@ func (n *Node) IsConfiguredWithRole(role NodeRole) bool { return ptr.Deref(n.Transform, n.IsConfiguredWithRole(DataRole)) case VotingOnlyRole: return ptr.Deref(n.VotingOnly, false) + case CoordinatingRole: + return n.Roles != nil && len(n.Roles) == 0 } // This point should never be reached. The default is to assume that a node has all roles except voting_only. diff --git a/pkg/apis/elasticsearch/v1/elasticsearch_types.go b/pkg/apis/elasticsearch/v1/elasticsearch_types.go index 7744c589b8..a6af657dc9 100644 --- a/pkg/apis/elasticsearch/v1/elasticsearch_types.go +++ b/pkg/apis/elasticsearch/v1/elasticsearch_types.go @@ -103,9 +103,17 @@ type ElasticsearchSpec struct { // +kubebuilder:validation:Optional UpdateStrategy UpdateStrategy `json:"updateStrategy,omitempty"` - // PodDisruptionBudget provides access to the default Pod disruption budget for the Elasticsearch cluster. - // The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. - // In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + // PodDisruptionBudget provides access to the default Pod disruption budget(s) for the Elasticsearch cluster. + // The behavior depends on the license level. + // With a Basic license: + // The default budget doesn't allow any Pod to be removed in case the cluster is not green or if there is only one node of type `data` or `master`. + // In all other cases the default PodDisruptionBudget sets `minUnavailable` equal to the total number of nodes minus 1. + // With an Enterprise license: + // The default budget is split into multiple budgets, each targeting a specific node role type allowing additional disruptions + // for certain roles according to the health status of the cluster. + // Example: + // All data roles (excluding frozen): allows disruptions only when the cluster is green. + // All other roles: allows disruptions only when the cluster is yellow or green. // To disable, set `PodDisruptionBudget` to the empty value (`{}` in YAML). // +kubebuilder:validation:Optional PodDisruptionBudget *commonv1.PodDisruptionBudgetTemplate `json:"podDisruptionBudget,omitempty"` diff --git a/pkg/apis/elasticsearch/v1/name.go b/pkg/apis/elasticsearch/v1/name.go index c7f7290976..bcd747934c 100644 --- a/pkg/apis/elasticsearch/v1/name.go +++ b/pkg/apis/elasticsearch/v1/name.go @@ -200,3 +200,13 @@ func StackConfigAdditionalSecretName(esName string, secretName string) string { secretNameHash := hash.HashObject(secretName) return ESNamer.Suffix(esName, "scp", secretNameHash) } + +// PodDisruptionBudgetNameForRole returns the name of the PodDisruptionBudget for a given Elasticsearch cluster name and role. 
+func PodDisruptionBudgetNameForRole(esName string, role string) string { + name := DefaultPodDisruptionBudget(esName) + "-" + role + // For coordinating nodes (no roles), append "coordinating" to the name + if role == "" { + name += "coordinating" + } + return name +} diff --git a/pkg/controller/common/statefulset/fixtures.go b/pkg/controller/common/statefulset/fixtures.go index 9fb85d1dcb..987232aa81 100644 --- a/pkg/controller/common/statefulset/fixtures.go +++ b/pkg/controller/common/statefulset/fixtures.go @@ -16,16 +16,24 @@ import ( ) type TestSset struct { - Namespace string - Name string - ClusterName string - Version string - Replicas int32 - Master bool - Data bool - Ingest bool - Status appsv1.StatefulSetStatus - ResourceVersion string + Namespace string + Name string + ClusterName string + Version string + Replicas int32 + Master bool + Data bool + Ingest bool + ML bool + Transform bool + RemoteClusterClient bool + DataHot bool + DataWarm bool + DataCold bool + DataContent bool + DataFrozen bool + Status appsv1.StatefulSetStatus + ResourceVersion string } func (t TestSset) Pods() []client.Object { @@ -54,6 +62,14 @@ func (t TestSset) Build() appsv1.StatefulSet { label.NodeTypesMasterLabelName.Set(t.Master, labels) label.NodeTypesDataLabelName.Set(t.Data, labels) label.NodeTypesIngestLabelName.Set(t.Ingest, labels) + label.NodeTypesMLLabelName.Set(t.ML, labels) + label.NodeTypesTransformLabelName.Set(t.Transform, labels) + label.NodeTypesRemoteClusterClientLabelName.Set(t.RemoteClusterClient, labels) + label.NodeTypesDataHotLabelName.Set(t.DataHot, labels) + label.NodeTypesDataWarmLabelName.Set(t.DataWarm, labels) + label.NodeTypesDataColdLabelName.Set(t.DataCold, labels) + label.NodeTypesDataContentLabelName.Set(t.DataContent, labels) + label.NodeTypesDataFrozenLabelName.Set(t.DataFrozen, labels) statefulSet := appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: t.Name, @@ -86,19 +102,27 @@ func (t TestSset) BuildPtr() *appsv1.StatefulSet { } type TestPod struct { - Namespace string - Name string - ClusterName string - StatefulSetName string - Version string - Revision string - Master bool - Data bool - Ingest bool - Ready bool - RestartCount int32 - Phase corev1.PodPhase - ResourceVersion string + Namespace string + Name string + ClusterName string + StatefulSetName string + Version string + Revision string + Master bool + Data bool + Ingest bool + ML bool + Transform bool + RemoteClusterClient bool + DataHot bool + DataWarm bool + DataCold bool + DataContent bool + DataFrozen bool + Ready bool + RestartCount int32 + Phase corev1.PodPhase + ResourceVersion string } func (t TestPod) Build() corev1.Pod { @@ -111,6 +135,14 @@ func (t TestPod) Build() corev1.Pod { label.NodeTypesMasterLabelName.Set(t.Master, labels) label.NodeTypesDataLabelName.Set(t.Data, labels) label.NodeTypesIngestLabelName.Set(t.Ingest, labels) + label.NodeTypesMLLabelName.Set(t.ML, labels) + label.NodeTypesTransformLabelName.Set(t.Transform, labels) + label.NodeTypesRemoteClusterClientLabelName.Set(t.RemoteClusterClient, labels) + label.NodeTypesDataHotLabelName.Set(t.DataHot, labels) + label.NodeTypesDataWarmLabelName.Set(t.DataWarm, labels) + label.NodeTypesDataColdLabelName.Set(t.DataCold, labels) + label.NodeTypesDataContentLabelName.Set(t.DataContent, labels) + label.NodeTypesDataFrozenLabelName.Set(t.DataFrozen, labels) status := corev1.PodStatus{ // assume Running by default diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 
4252bee69d..765cae3ba9 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -133,7 +133,7 @@ func (d *defaultDriver) reconcileNodeSpecs( } // Update PDB to account for new replicas. - if err := pdb.Reconcile(ctx, d.Client, d.ES, actualStatefulSets, meta); err != nil { + if err := pdb.Reconcile(ctx, d.Client, d.ES, actualStatefulSets, expectedResources, meta); err != nil { return results.WithError(err) } diff --git a/pkg/controller/elasticsearch/elasticsearch_controller.go b/pkg/controller/elasticsearch/elasticsearch_controller.go index 33ccffa7a5..2cb0d1e813 100644 --- a/pkg/controller/elasticsearch/elasticsearch_controller.go +++ b/pkg/controller/elasticsearch/elasticsearch_controller.go @@ -13,6 +13,7 @@ import ( "go.elastic.co/apm/v2" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -108,6 +109,12 @@ func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileElasti return err } + // Watch PodDisruptionBudgets + if err := c.Watch( + source.Kind(mgr.GetCache(), &policyv1.PodDisruptionBudget{}, handler.TypedEnqueueRequestForOwner[*policyv1.PodDisruptionBudget](mgr.GetScheme(), mgr.GetRESTMapper(), &esv1.Elasticsearch{}, handler.OnlyControllerOwner()))); err != nil { + return err + } + // Watch owned and soft-owned secrets if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}, r.dynamicWatches.Secrets)); err != nil { return err diff --git a/pkg/controller/elasticsearch/label/label.go b/pkg/controller/elasticsearch/label/label.go index 25912449fc..5d49118864 100644 --- a/pkg/controller/elasticsearch/label/label.go +++ b/pkg/controller/elasticsearch/label/label.go @@ -85,7 +85,7 @@ func IsMasterNodeSet(statefulSet appsv1.StatefulSet) bool { // IsDataNodeSet returns true if the given StatefulSet specifies data nodes. func IsDataNodeSet(statefulSet appsv1.StatefulSet) bool { - return NodeTypesDataLabelName.HasValue(true, statefulSet.Spec.Template.Labels) + return NodeTypesDataLabelName.HasValue(true, statefulSet.Spec.Template.Labels) || NodeTypesDataHotLabelName.HasValue(true, statefulSet.Spec.Template.Labels) || NodeTypesDataColdLabelName.HasValue(true, statefulSet.Spec.Template.Labels) || NodeTypesDataContentLabelName.HasValue(true, statefulSet.Spec.Template.Labels) || NodeTypesDataWarmLabelName.HasValue(true, statefulSet.Spec.Template.Labels) } // IsIngestNodeSet returns true if the given StatefulSet specifies ingest nodes. diff --git a/pkg/controller/elasticsearch/pdb/fixtures.go b/pkg/controller/elasticsearch/pdb/fixtures.go new file mode 100644 index 0000000000..7d8a6eaae0 --- /dev/null +++ b/pkg/controller/elasticsearch/pdb/fixtures.go @@ -0,0 +1,183 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package pdb + +import ( + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings" +) + +// Builder helps create test fixtures for the Elasticsearch PDB tests. +type Builder struct { + Elasticsearch esv1.Elasticsearch + StatefulSets []appsv1.StatefulSet +} + +// NewBuilder creates a new Builder with default values. +func NewBuilder(name string) Builder { + return Builder{ + Elasticsearch: esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: esv1.ElasticsearchSpec{ + Version: "9.0.1", + NodeSets: []esv1.NodeSet{}, + }, + }, + StatefulSets: []appsv1.StatefulSet{}, + } +} + +// WithNamespace sets the namespace for the Elasticsearch resource. +func (b Builder) WithNamespace(namespace string) Builder { + b.Elasticsearch.Namespace = namespace + return b +} + +// WithVersion sets the version for the Elasticsearch resource. +func (b Builder) WithVersion(version string) Builder { + b.Elasticsearch.Spec.Version = version + return b +} + +// WithNodeSet adds a NodeSet to the Elasticsearch spec. +func (b Builder) WithNodeSet(name string, count int32, nodeTypes ...esv1.NodeRole) Builder { + config := map[string]interface{}{} + + // Only set node.Roles if the first role is not "all_roles" + // to properly handle no roles set to equal having all roles assigned. + if !(len(nodeTypes) == 1 && nodeTypes[0] == "all_roles") { + // This handles the 'coordinating' role properly. + config["node.roles"] = []esv1.NodeRole{} + for _, nodeType := range nodeTypes { + if string(nodeType) != "" { + config["node.roles"] = append(config["node.roles"].([]esv1.NodeRole), nodeType) //nolint:forcetypeassert + } + } + } + + nodeset := esv1.NodeSet{ + Name: name, + Count: count, + Config: &v1.Config{ + Data: config, + }, + } + + b.Elasticsearch.Spec.NodeSets = append(b.Elasticsearch.Spec.NodeSets, nodeset) + + // Create a corresponding StatefulSet + sset := b.buildStatefulSet(name, count, nodeTypes) + b.StatefulSets = append(b.StatefulSets, sset) + + return b +} + +// buildStatefulSet creates a StatefulSet based on the given parameters. 
+func (b Builder) buildStatefulSet(name string, replicas int32, nodeRoles []esv1.NodeRole) appsv1.StatefulSet { + sset := statefulset.TestSset{ + Namespace: b.Elasticsearch.Namespace, + Name: name, + ClusterName: b.Elasticsearch.Name, + Version: b.Elasticsearch.Spec.Version, + Replicas: replicas, + } + + // Set node roles based on nodeRoles + for _, nodeRole := range nodeRoles { + switch nodeRole { + case esv1.MasterRole: + sset.Master = true + case esv1.DataRole: + sset.Data = true + case esv1.IngestRole: + sset.Ingest = true + case esv1.MLRole: + sset.ML = true + case esv1.TransformRole: + sset.Transform = true + case esv1.RemoteClusterClientRole: + sset.RemoteClusterClient = true + case esv1.DataHotRole: + sset.DataHot = true + case esv1.DataWarmRole: + sset.DataWarm = true + case esv1.DataColdRole: + sset.DataCold = true + case esv1.DataContentRole: + sset.DataContent = true + case esv1.DataFrozenRole: + sset.DataFrozen = true + case esv1.CoordinatingRole: + continue + case esv1.VotingOnlyRole: + continue + } + } + + return sset.Build() +} + +// WithStatefulSet adds a custom StatefulSet to the builder. +func (b Builder) WithStatefulSet(sset appsv1.StatefulSet) Builder { + b.StatefulSets = append(b.StatefulSets, sset) + return b +} + +// BuildResourcesList generates a nodespec.ResourcesList from the builder data. +// This allows the tests to properly unpack the Config object for a nodeSet +// and use the Node.Roles directly. +func (b Builder) BuildResourcesList() (nodespec.ResourcesList, error) { + v, err := version.Parse(b.Elasticsearch.Spec.Version) + if err != nil { + return nil, err + } + + resourcesList := make(nodespec.ResourcesList, 0, len(b.StatefulSets)) + + for i, sset := range b.StatefulSets { + // Create config based on the nodeset if available + config := &v1.Config{Data: map[string]interface{}{}} + if i < len(b.Elasticsearch.Spec.NodeSets) { + config = b.Elasticsearch.Spec.NodeSets[i].Config + } + + cfg, err := settings.NewMergedESConfig( + b.Elasticsearch.Name, + v, + corev1.IPv4Protocol, + b.Elasticsearch.Spec.HTTP, + *config, + nil, + false, + false, + ) + if err != nil { + return nil, err + } + + resourcesList = append(resourcesList, nodespec.Resources{ + NodeSet: sset.Name, + StatefulSet: sset, + Config: cfg, + }) + } + + return resourcesList, nil +} + +func (b Builder) GetStatefulSets() []appsv1.StatefulSet { + return b.StatefulSets +} diff --git a/pkg/controller/elasticsearch/pdb/reconcile.go b/pkg/controller/elasticsearch/pdb/reconcile_default.go similarity index 66% rename from pkg/controller/elasticsearch/pdb/reconcile.go rename to pkg/controller/elasticsearch/pdb/reconcile_default.go index b335af0e10..0af820cd53 100644 --- a/pkg/controller/elasticsearch/pdb/reconcile.go +++ b/pkg/controller/elasticsearch/pdb/reconcile_default.go @@ -6,30 +6,56 @@ package pdb import ( "context" + "fmt" policyv1 "k8s.io/api/policy/v1" - policyv1beta1 "k8s.io/api/policy/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/hash" + lic "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata" 
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ) -// Reconcile ensures that a PodDisruptionBudget exists for this cluster, inheriting the spec content. -// The default PDB we setup dynamically adapts MinAvailable to the number of nodes in the cluster. +// Reconcile ensures that PodDisruptionBudget(s) exists for this cluster, inheriting the spec content. +// 1. Without an enterprise license: The default PDB we setup dynamically adapts MinAvailable to the number of nodes in the cluster. +// 2. With an enterprise license: We optimize the PDBs that we setup to speed up Kubernetes cluster operations such as upgrades as much +// as safely possible by grouping statefulSets by associated Elasticsearch node roles into the same PDB, and then dynamically setting +// maxUnavailable according to whatever cluster health is optimal for the set of roles. +// 3. In the case of an expired enterprise license, the PDBs will revert back to a single PDB that covers the whole cluster. +// // If the spec has disabled the default PDB, it will ensure none exist. -func Reconcile(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, statefulSets sset.StatefulSetList, meta metadata.Metadata) error { +func Reconcile(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, statefulSets sset.StatefulSetList, resources nodespec.ResourcesList, meta metadata.Metadata) error { + licenseChecker := lic.NewLicenseChecker(k8sClient, es.Namespace) + enterpriseEnabled, err := licenseChecker.EnterpriseFeaturesEnabled(ctx) + if err != nil { + return fmt.Errorf("while checking license during pdb reconciliation: %w", err) + } + if enterpriseEnabled { + return reconcileRoleSpecificPDBs(ctx, k8sClient, es, statefulSets, resources, meta) + } + + return reconcileDefaultPDB(ctx, k8sClient, es, statefulSets, meta) +} + +// reconcileDefaultPDB reconciles the default PDB for non-enterprise users. +func reconcileDefaultPDB( + ctx context.Context, + k8sClient k8s.Client, + es esv1.Elasticsearch, + statefulSets sset.StatefulSetList, + meta metadata.Metadata, +) error { expected, err := expectedPDB(es, statefulSets, meta) if err != nil { return err @@ -38,48 +64,32 @@ func Reconcile(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, return deleteDefaultPDB(ctx, k8sClient, es) } + return reconcilePDB(ctx, k8sClient, es, expected) +} + +// reconcilePDB reconciles a single PDB, handling both v1 and v1beta1 versions. 
+func reconcilePDB( + ctx context.Context, + k8sClient k8s.Client, + es esv1.Elasticsearch, + expected *policyv1.PodDisruptionBudget, +) error { // label the PDB with a hash of its content, for comparison purposes expected.Labels = hash.SetTemplateHashLabel(expected.Labels, expected) - v1Available, err := isPDBV1Available(k8sClient) - if err != nil { - return err - } - - if v1Available { - reconciled := &policyv1.PodDisruptionBudget{} - return reconciler.ReconcileResource( - reconciler.Params{ - Context: ctx, - Client: k8sClient, - Owner: &es, - Expected: expected, - Reconciled: reconciled, - NeedsUpdate: func() bool { - return hash.GetTemplateHashLabel(expected.Labels) != hash.GetTemplateHashLabel(reconciled.Labels) - }, - UpdateReconciled: func() { - expected.DeepCopyInto(reconciled) - }, - }, - ) - } - - // Fall back to v1beta1 - reconciled := &policyv1beta1.PodDisruptionBudget{} - converted := convert(expected) + reconciled := &policyv1.PodDisruptionBudget{} return reconciler.ReconcileResource( reconciler.Params{ Context: ctx, Client: k8sClient, Owner: &es, - Expected: converted, + Expected: expected, Reconciled: reconciled, NeedsUpdate: func() bool { - return hash.GetTemplateHashLabel(converted.Labels) != hash.GetTemplateHashLabel(reconciled.Labels) + return hash.GetTemplateHashLabel(expected.Labels) != hash.GetTemplateHashLabel(reconciled.Labels) }, UpdateReconciled: func() { - converted.DeepCopyInto(reconciled) + expected.DeepCopyInto(reconciled) }, }, ) @@ -87,30 +97,16 @@ func Reconcile(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, // deleteDefaultPDB deletes the default pdb if it exists. func deleteDefaultPDB(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch) error { - // we do this by getting first because that is a local cache read, - // versus a Delete call, which would hit the API. - - v1Available, err := isPDBV1Available(k8sClient) - if err != nil { - return err - } - var pdb client.Object - if v1Available { - pdb = &policyv1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: es.Namespace, - Name: esv1.DefaultPodDisruptionBudget(es.Name), - }, - } - } else { - pdb = &policyv1beta1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: es.Namespace, - Name: esv1.DefaultPodDisruptionBudget(es.Name), - }, - } + pdb := &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: es.Namespace, + Name: esv1.DefaultPodDisruptionBudget(es.Name), + }, } + return deletePDB(ctx, k8sClient, pdb) +} +func deletePDB(ctx context.Context, k8sClient k8s.Client, pdb *policyv1.PodDisruptionBudget) error { if err := k8sClient.Get(ctx, k8s.ExtractNamespacedName(pdb), pdb); err != nil && !apierrors.IsNotFound(err) { return err } else if apierrors.IsNotFound(err) { @@ -170,7 +166,7 @@ func buildPDBSpec(es esv1.Elasticsearch, statefulSets sset.StatefulSetList) poli // compute MinAvailable based on the maximum number of Pods we're supposed to have nodeCount := statefulSets.ExpectedNodeCount() // maybe allow some Pods to be disrupted - minAvailable := nodeCount - allowedDisruptions(es, statefulSets) + minAvailable := nodeCount - allowedDisruptionsForSinglePDB(es, statefulSets) minAvailableIntStr := intstr.IntOrString{Type: intstr.Int, IntVal: minAvailable} @@ -188,8 +184,9 @@ func buildPDBSpec(es esv1.Elasticsearch, statefulSets sset.StatefulSetList) poli } } -// allowedDisruptions returns the number of Pods that we allow to be disrupted while keeping the cluster healthy. 
-func allowedDisruptions(es esv1.Elasticsearch, actualSsets sset.StatefulSetList) int32 { +// allowedDisruptionsForSinglePDB returns the number of Pods that we allow to be disrupted while keeping the cluster healthy +// when there is a single PodDisruptionBudget that encompasses a whole Elasticsearch cluster. +func allowedDisruptionsForSinglePDB(es esv1.Elasticsearch, actualSsets sset.StatefulSetList) int32 { if actualSsets.ExpectedNodeCount() == 1 { // single node cluster (not highly-available) // allow the node to be disrupted to ensure K8s nodes operations can be performed diff --git a/pkg/controller/elasticsearch/pdb/reconcile_test.go b/pkg/controller/elasticsearch/pdb/reconcile_default_test.go similarity index 76% rename from pkg/controller/elasticsearch/pdb/reconcile_test.go rename to pkg/controller/elasticsearch/pdb/reconcile_default_test.go index 426d0ffb7b..dc880babb7 100644 --- a/pkg/controller/elasticsearch/pdb/reconcile_test.go +++ b/pkg/controller/elasticsearch/pdb/reconcile_default_test.go @@ -32,30 +32,31 @@ import ( es_sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset" ) -func TestReconcile(t *testing.T) { - defaultPDB := func() *policyv1.PodDisruptionBudget { - return &policyv1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Name: esv1.DefaultPodDisruptionBudget("cluster"), - Namespace: "ns", - Labels: map[string]string{label.ClusterNameLabelName: "cluster", commonv1.TypeLabelName: label.Type}, - }, - Spec: policyv1.PodDisruptionBudgetSpec{ - MinAvailable: intStrPtr(intstr.FromInt(3)), - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - label.ClusterNameLabelName: "cluster", - }, +func defaultPDB() *policyv1.PodDisruptionBudget { + return &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: esv1.DefaultPodDisruptionBudget("cluster"), + Namespace: "ns", + Labels: map[string]string{label.ClusterNameLabelName: "cluster", commonv1.TypeLabelName: label.Type}, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: intStrPtr(intstr.FromInt(3)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + label.ClusterNameLabelName: "cluster", }, - MaxUnavailable: nil, }, - } + MaxUnavailable: nil, + }, } - defaultEs := esv1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster", Namespace: "ns"}} +} + +func TestReconcile(t *testing.T) { + defaultEs := esv1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster", Namespace: "ns"}, Spec: esv1.ElasticsearchSpec{Version: "9.0.1"}} type args struct { - initObjs []client.Object - es esv1.Elasticsearch - statefulSets es_sset.StatefulSetList + initObjs []client.Object + es esv1.Elasticsearch + builder Builder } tests := []struct { name string @@ -65,26 +66,35 @@ func TestReconcile(t *testing.T) { { name: "no existing pdb: should create one", args: args{ - es: defaultEs, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + es: defaultEs, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). 
+ WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), }, wantPDB: defaultPDB(), }, { name: "pdb already exists: should remain unmodified", args: args{ - initObjs: []client.Object{withHashLabel(withOwnerRef(defaultPDB(), defaultEs))}, - es: defaultEs, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + initObjs: []client.Object{withHashLabel(withOwnerRef(defaultPDB(), defaultEs))}, + es: defaultEs, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). + WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), }, wantPDB: defaultPDB(), }, { name: "pdb needs a MinAvailable update", args: args{ - initObjs: []client.Object{defaultPDB()}, - es: defaultEs, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 5, Master: true, Data: true}.Build()}, + initObjs: []client.Object{defaultPDB()}, + es: defaultEs, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). + WithNodeSet("master-data", 5, esv1.MasterRole, esv1.DataRole), }, wantPDB: &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ @@ -111,7 +121,10 @@ func TestReconcile(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "cluster", Namespace: "ns"}, Spec: esv1.ElasticsearchSpec{PodDisruptionBudget: &commonv1.PodDisruptionBudgetTemplate{}}, }, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). + WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), }, wantPDB: nil, }, @@ -133,7 +146,12 @@ func TestReconcile(t *testing.T) { WithRESTMapper(restMapper). WithObjects(tt.args.initObjs...).Build() - err := Reconcile(context.Background(), k8sClient, tt.args.es, tt.args.statefulSets, metadata.Propagate(&tt.args.es, metadata.Metadata{Labels: tt.args.es.GetIdentityLabels()})) + resourcesList, err := tt.args.builder.BuildResourcesList() + require.NoError(t, err) + + statefulSets := tt.args.builder.GetStatefulSets() + + err = Reconcile(context.Background(), k8sClient, tt.args.es, statefulSets, resourcesList, metadata.Propagate(&tt.args.es, metadata.Metadata{Labels: tt.args.es.GetIdentityLabels()})) require.NoError(t, err) pdbNsn := types.NamespacedName{Namespace: tt.args.es.Namespace, Name: esv1.DefaultPodDisruptionBudget(tt.args.es.Name)} var retrieved policyv1.PodDisruptionBudget @@ -168,8 +186,8 @@ func intStrPtr(intStr intstr.IntOrString) *intstr.IntOrString { func Test_expectedPDB(t *testing.T) { type args struct { - es esv1.Elasticsearch - statefulSets es_sset.StatefulSetList + es esv1.Elasticsearch + builder Builder } tests := []struct { name string @@ -179,16 +197,22 @@ func Test_expectedPDB(t *testing.T) { { name: "PDB disabled in the spec", args: args{ - es: esv1.Elasticsearch{Spec: esv1.ElasticsearchSpec{PodDisruptionBudget: &commonv1.PodDisruptionBudgetTemplate{}}}, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + es: esv1.Elasticsearch{Spec: esv1.ElasticsearchSpec{PodDisruptionBudget: &commonv1.PodDisruptionBudgetTemplate{}}}, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). 
+ WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), }, want: nil, }, { name: "Build default PDB", args: args{ - es: esv1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster", Namespace: "ns"}}, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + es: esv1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "cluster", Namespace: "ns"}}, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). + WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), }, want: &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ @@ -219,7 +243,10 @@ func Test_expectedPDB(t *testing.T) { }}, }, }, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). + WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), }, want: &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ @@ -248,7 +275,10 @@ func Test_expectedPDB(t *testing.T) { Spec: policyv1.PodDisruptionBudgetSpec{MinAvailable: intStrPtr(intstr.FromInt(42))}}, }, }, - statefulSets: es_sset.StatefulSetList{sset.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithVersion("9.0.1"). + WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), }, want: &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ @@ -268,7 +298,8 @@ func Test_expectedPDB(t *testing.T) { // set owner ref tt.want = withOwnerRef(tt.want, tt.args.es) } - got, err := expectedPDB(tt.args.es, tt.args.statefulSets, metadata.Propagate(&tt.args.es, metadata.Metadata{Labels: tt.args.es.GetIdentityLabels()})) + statefulSets := tt.args.builder.GetStatefulSets() + got, err := expectedPDB(tt.args.es, statefulSets, metadata.Propagate(&tt.args.es, metadata.Metadata{Labels: tt.args.es.GetIdentityLabels()})) require.NoError(t, err) if !reflect.DeepEqual(got, tt.want) { t.Errorf("expectedPDB() got = %v, want %v", got, tt.want) @@ -277,7 +308,7 @@ func Test_expectedPDB(t *testing.T) { } } -func Test_allowedDisruptions(t *testing.T) { +func Test_allowedDisruptionsForSinglePDB(t *testing.T) { type args struct { es esv1.Elasticsearch actualSsets es_sset.StatefulSetList @@ -336,7 +367,7 @@ func Test_allowedDisruptions(t *testing.T) { want: 1, }, { - name: "green health but only 1 master: 0 disruption allowed", + name: "green health but only 1 master: no disruption allowed", args: args{ es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, actualSsets: es_sset.StatefulSetList{ @@ -347,7 +378,7 @@ func Test_allowedDisruptions(t *testing.T) { want: 0, }, { - name: "green health but only 1 data node: 0 disruption allowed", + name: "green health but only 1 data node: no disruption allowed", args: args{ es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, actualSsets: es_sset.StatefulSetList{ @@ -371,7 +402,7 @@ func Test_allowedDisruptions(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := allowedDisruptions(tt.args.es, tt.args.actualSsets); got != tt.want { + if got := allowedDisruptionsForSinglePDB(tt.args.es, tt.args.actualSsets); got != tt.want { t.Errorf("allowedDisruptions() = %v, want %v", got, tt.want) } }) diff --git a/pkg/controller/elasticsearch/pdb/reconcile_with_roles.go b/pkg/controller/elasticsearch/pdb/reconcile_with_roles.go new file mode 100644 
index 0000000000..0d9984c351 --- /dev/null +++ b/pkg/controller/elasticsearch/pdb/reconcile_with_roles.go @@ -0,0 +1,523 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package pdb + +import ( + "context" + "fmt" + "slices" + "sort" + + appsv1 "k8s.io/api/apps/v1" + policyv1 "k8s.io/api/policy/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata" + commonsts "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset" + + "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/v3/pkg/utils/set" +) + +var ( + // group the statefulsets by the priority of their roles. + // master, data_*, ingest, ml, transform, coordinating, and we ignore remote_cluster_client as it has no impact on availability + priority = []esv1.NodeRole{esv1.DataRole, esv1.MasterRole, esv1.DataFrozenRole, esv1.IngestRole, esv1.MLRole, esv1.TransformRole, esv1.CoordinatingRole} + // All data role variants should be treated as a generic data role for PDB purposes + dataRoles = []esv1.NodeRole{ + esv1.DataRole, + esv1.DataHotRole, + esv1.DataWarmRole, + esv1.DataColdRole, + esv1.DataContentRole, + // Note: DataFrozenRole is excluded as it has different disruption rules (yellow+ health) + } +) + +// toGenericDataRole returns the normalized form of a role where any data role +// is normalized to the same data role. +func toGenericDataRole(role esv1.NodeRole) esv1.NodeRole { + if slices.Contains(dataRoles, role) { + return esv1.DataRole + } + return role +} + +// reconcileRoleSpecificPDBs creates and reconciles PodDisruptionBudgets per nodeSet roles for enterprise-licensed clusters. +func reconcileRoleSpecificPDBs( + ctx context.Context, + k8sClient k8s.Client, + es esv1.Elasticsearch, + statefulSets sset.StatefulSetList, + resources nodespec.ResourcesList, + meta metadata.Metadata, +) error { + // Check if PDB is disabled in the ES spec, and if so delete all existing PDBs (both default and role-specific) + // that have a proper owner reference. + if es.Spec.PodDisruptionBudget != nil && es.Spec.PodDisruptionBudget.IsDisabled() { + if err := deleteDefaultPDB(ctx, k8sClient, es); err != nil { + return err + } + return deleteAllRoleSpecificPDBs(ctx, k8sClient, es) + } + + // Retrieve the expected list of PDBs. + pdbs, err := expectedRolePDBs(es, statefulSets, resources, meta) + if err != nil { + return fmt.Errorf("while retrieving expected role-specific PDBs: %w", err) + } + + // Reconcile and delete unnecessary role-specific PDBs that could have been created + // by a previous reconciliation with a different set of StatefulSets. 
+ if err := reconcileAndDeleteUnnecessaryPDBs(ctx, k8sClient, es, pdbs); err != nil { + return err + } + + // Always ensure any existing default PDB is removed. + if err := deleteDefaultPDB(ctx, k8sClient, es); err != nil { + return fmt.Errorf("while deleting the default PDB: %w", err) + } + + return nil +} + +// expectedRolePDBs returns a slice of PDBs to reconcile based on statefulSet roles. +func expectedRolePDBs( + es esv1.Elasticsearch, + statefulSets sset.StatefulSetList, + resources nodespec.ResourcesList, + meta metadata.Metadata, +) ([]*policyv1.PodDisruptionBudget, error) { + pdbs := make([]*policyv1.PodDisruptionBudget, 0, len(statefulSets)) + + v, err := version.Parse(es.Spec.Version) + if err != nil { + return nil, fmt.Errorf("while parsing Elasticsearch version: %w", err) + } + + // Group StatefulSets by their connected roles. + groups, ssetNamesToRoles, err := groupBySharedRoles(statefulSets, resources, v) + if err != nil { + return nil, fmt.Errorf("while grouping StatefulSets by roles: %w", err) + } + + // Create one PDB per group + // Maps order isn't guaranteed so process in order of defined priority. + for _, roleName := range priority { + group, ok := groups[roleName] + if !ok { + continue + } + if len(group) == 0 { + continue + } + + // Determine the roles for this group + groupRoles := sets.New[esv1.NodeRole]() + for _, sset := range group { + roles := ssetNamesToRoles[sset.Name] + for _, role := range roles.AsSlice() { + groupRoles.Insert(esv1.NodeRole(role)) + } + } + + // Determine the most conservative role to use when determining the maxUnavailable setting. + // If group has no roles, it's a coordinating ES role. + primaryRole := getPrimaryRoleForPDB(groupRoles) + + pdb, err := createPDBForStatefulSets(es, primaryRole, string(roleName), group, statefulSets, meta) + if err != nil { + return nil, err + } + if pdb != nil { + pdbs = append(pdbs, pdb) + } + } + + return pdbs, nil +} + +func groupBySharedRoles(statefulSets sset.StatefulSetList, resources nodespec.ResourcesList, v version.Version) (map[esv1.NodeRole][]appsv1.StatefulSet, map[string]set.StringSet, error) { + n := len(statefulSets) + if n == 0 { + return map[esv1.NodeRole][]appsv1.StatefulSet{}, nil, nil + } + + rolesToIndices := make(map[esv1.NodeRole][]int) + indicesToRoles := make(map[int]set.StringSet) + ssetNamesToRoles := make(map[string]set.StringSet) + for i, sset := range statefulSets { + roles, err := getRolesForStatefulSet(sset, resources, v) + if err != nil { + return nil, nil, err + } + if len(roles) == 0 { + // StatefulSets with no roles are coordinating nodes - group them together + rolesToIndices[esv1.CoordinatingRole] = append(rolesToIndices[esv1.CoordinatingRole], i) + indicesToRoles[i] = set.Make(string(esv1.CoordinatingRole)) + ssetNamesToRoles[sset.Name] = set.Make(string(esv1.CoordinatingRole)) + continue + } + for _, role := range roles { + // Ensure that the data* roles are grouped together. + normalizedRole := toGenericDataRole(role) + rolesToIndices[normalizedRole] = append(rolesToIndices[normalizedRole], i) + if _, ok := indicesToRoles[i]; !ok { + indicesToRoles[i] = set.Make() + } + indicesToRoles[i].Add(string(normalizedRole)) + ssetNamesToRoles[sset.Name] = indicesToRoles[i] + } + } + + // This keeps track of which roles have been assigned to a PDB to avoid assigning the same role to multiple PDBs. 
+ roleToTargetPDB := map[esv1.NodeRole]esv1.NodeRole{} + grouped := map[esv1.NodeRole][]int{} + visited := make([]bool, n) + for _, role := range priority { + indices, ok := rolesToIndices[role] + if !ok { + continue + } + for _, idx := range indices { + if visited[idx] { + continue + } + targetPDBRole := role + // if we already assigned a PDB for this role, use that instead + if target, ok := roleToTargetPDB[role]; ok { + targetPDBRole = target + } + grouped[targetPDBRole] = append(grouped[targetPDBRole], idx) + for _, r := range indicesToRoles[idx].AsSlice() { + roleToTargetPDB[esv1.NodeRole(r)] = targetPDBRole + } + visited[idx] = true + } + } + // transform into the expected format + res := make(map[esv1.NodeRole][]appsv1.StatefulSet) + for role, indices := range grouped { + group := make([]appsv1.StatefulSet, 0, len(indices)) + for _, idx := range indices { + group = append(group, statefulSets[idx]) + } + res[role] = group + } + return res, ssetNamesToRoles, nil +} + +// getPrimaryRoleForPDB returns the primary role from a set of roles for PDB naming and grouping. +// Data roles are most restrictive (require green health), so they take priority. +// All other roles have similar disruption rules (require yellow+ health). +func getPrimaryRoleForPDB(roles sets.Set[esv1.NodeRole]) esv1.NodeRole { + if len(roles) == 0 { + return "" // coordinating role + } + + // Data roles are most restrictive (require green health), so they take priority. + // Check if any data role variant is present (excluding data_frozen) + if slices.ContainsFunc(dataRoles, func(dataRole esv1.NodeRole) bool { + return roles.Has(dataRole) + }) { + // Return generic data role for all data role variants + return esv1.DataRole + } + + // Master role comes next in priority + if _, ok := roles[esv1.MasterRole]; ok { + return esv1.MasterRole + } + + // Data frozen role (has different disruption rules than other data roles) + if _, ok := roles[esv1.DataFrozenRole]; ok { + return esv1.DataFrozenRole + } + + // Return the first role we encounter in a deterministic order + // Define a priority order for non-data roles + nonDataRoles := []esv1.NodeRole{ + esv1.IngestRole, + esv1.MLRole, + esv1.TransformRole, + esv1.RemoteClusterClientRole, + } + + // Check non-data roles in priority order + for _, role := range nonDataRoles { + if _, ok := roles[role]; ok { + return role + } + } + + // If no known role found, return any role from the map + for role := range roles { + return role + } + + // Should never reach here if roles is not empty + return "" +} + +// getRolesForStatefulSet gets the roles from a StatefulSet's expected configuration. +func getRolesForStatefulSet( + statefulSet appsv1.StatefulSet, + expectedResources nodespec.ResourcesList, + v version.Version, +) ([]esv1.NodeRole, error) { + forStatefulSet, err := expectedResources.ForStatefulSet(statefulSet.Name) + if err != nil { + return nil, err + } + cfg, err := forStatefulSet.Config.Unpack(v) + if err != nil { + return nil, err + } + var nodeRoles []esv1.NodeRole + // Special case of no roles specified, which results in all roles being valid for this sts. + if cfg.Node.Roles == nil { + // since the priority slice contains all the roles that we are interested in + // when creating a pdb for a sts, we can use the priority slice as the roles. + nodeRoles = priority + // remove Coordinating role from the end of the slice. 
+ nodeRoles = nodeRoles[:len(nodeRoles)-1] + return nodeRoles, nil + } + // Special case of empty roles being specified, which indicates the coordinating role for this sts. + if len(cfg.Node.Roles) == 0 { + nodeRoles = append(nodeRoles, esv1.CoordinatingRole) + return nodeRoles, nil + } + nodeRoles = make([]esv1.NodeRole, len(cfg.Node.Roles)) + // Otherwise, use the list of roles from the configuration. + for i, role := range cfg.Node.Roles { + nodeRoles[i] = esv1.NodeRole(role) + } + return nodeRoles, nil +} + +// createPDBForStatefulSets creates a PDB for a group of StatefulSets with shared roles. +func createPDBForStatefulSets( + es esv1.Elasticsearch, + // role is the role used to determine the maxUnavailable value. + role esv1.NodeRole, + // roleName is used to determine the name of the PDB. + roleName string, + // statefulSets are the statefulSets grouped into this pdb. + statefulSets []appsv1.StatefulSet, + // allStatefulSets are all statefulsets in the whole ES cluster. + allStatefulSets sset.StatefulSetList, + meta metadata.Metadata, +) (*policyv1.PodDisruptionBudget, error) { + if len(statefulSets) == 0 { + return nil, nil + } + + pdb := &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: esv1.PodDisruptionBudgetNameForRole(es.Name, roleName), + Namespace: es.Namespace, + }, + Spec: buildRoleSpecificPDBSpec(es, role, statefulSets, allStatefulSets), + } + + mergedMeta := meta.Merge(metadata.Metadata{ + Labels: pdb.Labels, + Annotations: pdb.Annotations, + }) + pdb.Labels = mergedMeta.Labels + pdb.Annotations = mergedMeta.Annotations + + // Set owner reference + if err := controllerutil.SetControllerReference(&es, pdb, scheme.Scheme); err != nil { + return nil, err + } + + return pdb, nil +} + +// buildRoleSpecificPDBSpec returns a PDBSpec for a specific node role. +func buildRoleSpecificPDBSpec( + es esv1.Elasticsearch, + role esv1.NodeRole, + // statefulSets are the statefulSets grouped into this pdb. + statefulSets sset.StatefulSetList, + // allStatefulSets are all statefulSets in the whole ES cluster. + allStatefulSets sset.StatefulSetList, +) policyv1.PodDisruptionBudgetSpec { + // Get the allowed disruptions for this role based on cluster health and role type + allowedDisruptions := allowedDisruptionsForRole(es, role, statefulSets, allStatefulSets) + + spec := policyv1.PodDisruptionBudgetSpec{ + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: allowedDisruptions}, + } + + // Get StatefulSet names for the selector + ssetNames := make([]string, 0, len(statefulSets)) + for _, sset := range statefulSets { + ssetNames = append(ssetNames, sset.Name) + } + + // Sort for consistency + sort.Strings(ssetNames) + + spec.Selector = selectorForStatefulSets(es, ssetNames) + return spec +} + +// allowedDisruptionsForRole returns the maximum number of pods that can be disrupted for a given role. +func allowedDisruptionsForRole( + es esv1.Elasticsearch, + role esv1.NodeRole, + // statefulSets are the statefulSets grouped into this pdb. + statefulSets sset.StatefulSetList, + // allStatefulSets are all statefulSets in the whole ES cluster. + allStatefulSets sset.StatefulSetList, +) int32 { + // If the Elasticsearch cluster's health is unknown or not healthy, don't allow any disruptions. + if es.Status.Health == esv1.ElasticsearchUnknownHealth || es.Status.Health == esv1.ElasticsearchHealth("") { + return 0 + } + + // In a single node cluster (not highly-available) always allow 1 disruption + // to ensure K8s nodes operations can be performed. 
+ if allStatefulSets.ExpectedNodeCount() == 1 { + return 1 + } + + // If the statefulSets that are contained within this PDB include the master, ingest, or data role and + // there's a risk the single master, ingest, or data node of the cluster gets removed, don't allow it. + for _, sts := range statefulSets { + if isSensitiveToDisruptions(sts) && commonsts.GetReplicas(sts) == 1 { + return 0 + } + } + + // For data roles, only allow disruption if cluster is green + if role == esv1.DataRole && es.Status.Health != esv1.ElasticsearchGreenHealth { + return 0 + } + + // If we end up here, we are one of the remaining roles where we can allow disruptions if the cluster is at least yellow. + if es.Status.Health != esv1.ElasticsearchGreenHealth && es.Status.Health != esv1.ElasticsearchYellowHealth { + return 0 + } + + // Allow one pod to be disrupted for all other cases + return 1 +} + +// selectorForStatefulSets returns a label selector that matches pods from specific StatefulSets. +func selectorForStatefulSets(es esv1.Elasticsearch, ssetNames []string) *metav1.LabelSelector { + // For simplicity both single and multi-statefulsets use matchExpressions with In operator + return &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{es.Name}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: ssetNames, + }, + }, + } +} + +func isSensitiveToDisruptions(sts appsv1.StatefulSet) bool { + return label.IsMasterNodeSet(sts) || label.IsIngestNodeSet(sts) || label.IsDataNodeSet(sts) +} + +// reconcileAndDeleteUnnecessaryPDBs reconciles the PDBs that are expected to exist and deletes any that exist but are not expected. +func reconcileAndDeleteUnnecessaryPDBs(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch, expectedPDBs []*policyv1.PodDisruptionBudget) error { + existingPDBs, err := listAllRoleSpecificPDBs(ctx, k8sClient, es) + if err != nil { + return fmt.Errorf("while listing existing role-specific PDBs: %w", err) + } + + toDelete := make(map[string]policyv1.PodDisruptionBudget) + + // Populate the toDelete map with existing PDBs + for _, pdb := range existingPDBs { + toDelete[pdb.GetName()] = pdb + } + + // Remove expected PDBs from the toDelete map + for _, pdb := range expectedPDBs { + delete(toDelete, pdb.Name) + // Ensure that the expected PDB is reconciled. + if err := reconcilePDB(ctx, k8sClient, es, pdb); err != nil { + return fmt.Errorf("while reconciling role-specific PDB %s: %w", pdb.Name, err) + } + } + + // Delete unnecessary PDBs + for name, pdb := range toDelete { + if err := deletePDB(ctx, k8sClient, &pdb); err != nil { + return fmt.Errorf("while deleting role-specific PDB %s: %w", name, err) + } + } + + return nil +} + +// listAllRoleSpecificPDBs lists all role-specific PDBs for the cluster by retrieving +// all PDBs in the namespace with the cluster label and verifying the owner reference. 
+func listAllRoleSpecificPDBs(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch) ([]policyv1.PodDisruptionBudget, error) { + // List all PDBs in the namespace with the cluster label + pdbList := &policyv1.PodDisruptionBudgetList{} + + if err := k8sClient.List(ctx, pdbList, client.InNamespace(es.Namespace), client.MatchingLabels{ + label.ClusterNameLabelName: es.Name, + }); err != nil { + return nil, err + } + + // Filter only PDBs that are owned by this Elasticsearch controller + var roleSpecificPDBs []policyv1.PodDisruptionBudget + for _, pdb := range pdbList.Items { + // Check if this PDB is owned by the Elasticsearch resource + if k8s.HasOwner(&pdb, &es) { + roleSpecificPDBs = append(roleSpecificPDBs, pdb) + } + } + return roleSpecificPDBs, nil +} + +// deleteAllRoleSpecificPDBs deletes all existing role-specific PDBs for the cluster by retrieving +// all PDBs in the namespace with the cluster label and verifying the owner reference. +func deleteAllRoleSpecificPDBs(ctx context.Context, k8sClient k8s.Client, es esv1.Elasticsearch) error { + // List all PDBs in the namespace with the cluster label + var pdbList policyv1.PodDisruptionBudgetList + if err := k8sClient.List(ctx, &pdbList, client.InNamespace(es.Namespace), client.MatchingLabels{ + label.ClusterNameLabelName: es.Name, + }); err != nil { + return err + } + + // Delete PDBs owned by this Elasticsearch resource + for _, pdb := range pdbList.Items { + if k8s.HasOwner(&pdb, &es) { + if err := k8sClient.Delete(ctx, &pdb); err != nil && !apierrors.IsNotFound(err) { + return err + } + } + } + + return nil +} diff --git a/pkg/controller/elasticsearch/pdb/reconcile_with_roles_test.go b/pkg/controller/elasticsearch/pdb/reconcile_with_roles_test.go new file mode 100644 index 0000000000..ce7d884b8e --- /dev/null +++ b/pkg/controller/elasticsearch/pdb/reconcile_with_roles_test.go @@ -0,0 +1,1487 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package pdb + +import ( + "context" + "reflect" + "slices" + "sort" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + _ "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata" + ssetfixtures "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" + _ "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/v3/pkg/utils/set" +) + +func TestGetPrimaryRoleForPDB(t *testing.T) { + tests := []struct { + name string + roles func() sets.Set[esv1.NodeRole] + expected esv1.NodeRole + }{ + { + name: "empty roles map", + roles: func() sets.Set[esv1.NodeRole] { return sets.New[esv1.NodeRole]() }, + expected: "", + }, + { + name: "data role should be highest priority (most restrictive)", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.DataRole, esv1.IngestRole, esv1.MLRole) + }, + expected: esv1.DataRole, + }, + { + name: "master role should be second priority when no data roles", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.MasterRole, esv1.IngestRole, esv1.MLRole) + }, + expected: esv1.MasterRole, + }, + { + name: "data_hot role should match data role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.DataHotRole, esv1.IngestRole, esv1.MLRole) + }, + expected: esv1.DataRole, + }, + { + name: "data_warm role should match data role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.DataWarmRole, esv1.IngestRole, esv1.MLRole) + }, + expected: esv1.DataRole, + }, + { + name: "data_cold role should match data role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.DataColdRole, esv1.IngestRole, esv1.MLRole) + }, + expected: esv1.DataRole, + }, + { + name: "data_content role should match data role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.DataContentRole, esv1.IngestRole, esv1.MLRole) + }, + expected: esv1.DataRole, + }, + { + name: "data_frozen role should return data_frozen (has different disruption rules)", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.DataFrozenRole, esv1.IngestRole, esv1.MLRole) + }, + expected: esv1.DataFrozenRole, + }, + { + name: "multiple data roles should match data role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.DataHotRole, esv1.DataWarmRole, esv1.DataColdRole, esv1.IngestRole) + }, + expected: esv1.DataRole, + }, + { + name: "master and data roles should return data role (data has higher priority)", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.MasterRole, esv1.DataRole, 
esv1.DataHotRole, esv1.IngestRole, esv1.MLRole, esv1.TransformRole) + }, + expected: esv1.DataRole, + }, + { + name: "only non-data roles should return first found", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.IngestRole, esv1.MLRole, esv1.TransformRole) + }, + expected: esv1.IngestRole, + }, + { + name: "single ingest role should return ingest role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.IngestRole) + }, + expected: esv1.IngestRole, + }, + { + name: "single ml role should return ml role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.MLRole) + }, + expected: esv1.MLRole, + }, + { + name: "single transform role should return transform role", + roles: func() sets.Set[esv1.NodeRole] { + return sets.New(esv1.TransformRole) + }, + expected: esv1.TransformRole, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getPrimaryRoleForPDB(tt.roles()) + + if !cmp.Equal(tt.expected, result) { + t.Errorf("Expected %s, got %s", tt.expected, result) + } + }) + } +} + +func TestReconcileRoleSpecificPDBs(t *testing.T) { + rolePDB := func(esName, namespace string, role esv1.NodeRole, statefulSetNames []string, maxUnavailable int32) *policyv1.PodDisruptionBudget { + pdb := &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: esv1.PodDisruptionBudgetNameForRole(esName, string(role)), + Namespace: namespace, + Labels: map[string]string{label.ClusterNameLabelName: esName}, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: maxUnavailable}, + }, + } + + // Sort for consistent test comparison + sorted := make([]string, len(statefulSetNames)) + copy(sorted, statefulSetNames) + slices.Sort(sorted) + + pdb.Spec.Selector = &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{esName}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: sorted, + }, + }, + } + + return pdb + } + + defaultEs := esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster", Namespace: "ns"}, + Spec: esv1.ElasticsearchSpec{ + Version: "9.0.1", + }, + } + + defaultHealthyES := defaultEs.DeepCopy() + defaultHealthyES.Status.Health = esv1.ElasticsearchGreenHealth + + type args struct { + initObjs []client.Object + es esv1.Elasticsearch + builder Builder + } + tests := []struct { + name string + args args + wantedPDBs []*policyv1.PodDisruptionBudget + }{ + { + name: "no existing PDBs: should create role-specific PDBs", + args: args{ + es: defaultEs, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithNodeSet("master1", 1, esv1.MasterRole). + WithNodeSet("data1", 1, esv1.DataRole), + }, + wantedPDBs: []*policyv1.PodDisruptionBudget{ + // Unhealthy es cluster; 0 disruptions allowed + rolePDB("cluster", "ns", esv1.MasterRole, []string{"master1"}, 0), + rolePDB("cluster", "ns", esv1.DataRole, []string{"data1"}, 0), + }, + }, + { + name: "no existing PDBs: should create role-specific PDBs with data roles grouped", + args: args{ + es: *defaultHealthyES, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithNodeSet("master-data1", 2, esv1.MasterRole, esv1.DataRole). 
+ WithNodeSet("data2", 2, esv1.DataHotRole), + }, + wantedPDBs: []*policyv1.PodDisruptionBudget{ + rolePDB("cluster", "ns", esv1.DataRole, []string{"data2", "master-data1"}, 1), + }, + }, + { + name: "no existing PDBs: should create role-specific PDBs with data roles grouped, but no disruptions allowed because single master node", + args: args{ + es: *defaultHealthyES, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithNodeSet("master-data1", 1, esv1.MasterRole, esv1.DataRole). + WithNodeSet("data2", 2, esv1.DataHotRole), + }, + wantedPDBs: []*policyv1.PodDisruptionBudget{ + rolePDB("cluster", "ns", esv1.DataRole, []string{"data2", "master-data1"}, 0), + }, + }, + { + name: "existing default PDB: should delete it and create role-specific PDBs", + args: args{ + initObjs: []client.Object{ + defaultPDB(), + }, + es: *defaultHealthyES, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithNodeSet("master1", 1, esv1.MasterRole), + }, + wantedPDBs: []*policyv1.PodDisruptionBudget{ + // single node cluster should allow 1 pod to be unavailable when cluster is healthy. + rolePDB("cluster", "ns", esv1.MasterRole, []string{"master1"}, 1), + }, + }, + { + name: "create pdb with coordinating nodes: no existing PDBs", + args: args{ + es: defaultEs, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithNodeSet("coord1", 1, ""). + WithNodeSet("coord2", 1, ""). + WithNodeSet("master1", 1, esv1.MasterRole), + }, + wantedPDBs: []*policyv1.PodDisruptionBudget{ + // Unhealthy es cluster; 0 disruptions allowed + rolePDB("cluster", "ns", "", []string{"coord1", "coord2"}, 0), + rolePDB("cluster", "ns", esv1.MasterRole, []string{"master1"}, 0), + }, + }, + { + name: "mixed roles: should group StatefulSets sharing roles", + args: args{ + es: defaultEs, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithNodeSet("master-data1", 1, esv1.MasterRole, esv1.DataRole). + WithNodeSet("data-ingest1", 1, esv1.DataRole, esv1.IngestRole). + WithNodeSet("ml1", 1, esv1.MLRole), + }, + wantedPDBs: []*policyv1.PodDisruptionBudget{ + // Unhealthy es cluster; 0 disruptions allowed + rolePDB("cluster", "ns", esv1.DataRole, []string{"master-data1", "data-ingest1"}, 0), + rolePDB("cluster", "ns", esv1.MLRole, []string{"ml1"}, 0), + }, + }, + { + name: "PDB disabled in ES spec: should delete existing PDBs and not create new ones", + args: func() args { + es := esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster", Namespace: "ns"}, + Spec: esv1.ElasticsearchSpec{ + PodDisruptionBudget: &commonv1.PodDisruptionBudgetTemplate{}, + }, + } + return args{ + initObjs: []client.Object{ + withOwnerRef(defaultPDB(), es), + withOwnerRef(rolePDB("cluster", "ns", esv1.MasterRole, []string{"master1"}, 0), es), + }, + es: es, + builder: NewBuilder("cluster"). + WithNamespace("ns"). 
+ WithNodeSet("master1", 1, esv1.MasterRole), + } + }(), + wantedPDBs: []*policyv1.PodDisruptionBudget{}, + }, + { + name: "update existing role-specific PDBs", + args: args{ + initObjs: []client.Object{ + // Existing PDB with different configuration + &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: esv1.PodDisruptionBudgetNameForRole("cluster", string(esv1.MasterRole)), + Namespace: "ns", + Labels: map[string]string{label.ClusterNameLabelName: "cluster"}, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 2}, // Wrong value + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + label.ClusterNameLabelName: "cluster", + label.StatefulSetNameLabelName: "old-master", // Wrong StatefulSet + }, + }, + }, + }, + }, + es: defaultEs, + builder: NewBuilder("cluster"). + WithNamespace("ns"). + WithNodeSet("master1", 1, esv1.MasterRole), + }, + wantedPDBs: []*policyv1.PodDisruptionBudget{ + // Unhealthy es cluster; 0 disruptions allowed + rolePDB("cluster", "ns", esv1.MasterRole, []string{"master1"}, 0), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{{ + Group: "policy", + Version: "v1", + }}) + restMapper.Add( + schema.GroupVersionKind{ + Group: "policy", + Version: "v1", + Kind: "PodDisruptionBudget", + }, meta.RESTScopeNamespace) + c := fake.NewClientBuilder(). + WithScheme(clientgoscheme.Scheme). + WithRESTMapper(restMapper). + WithObjects(tt.args.initObjs...). + Build() + + // Create metadata + meta := metadata.Propagate(&tt.args.es, metadata.Metadata{Labels: tt.args.es.GetIdentityLabels()}) + + resourcesList, err := tt.args.builder.BuildResourcesList() + require.NoError(t, err) + + statefulSets := tt.args.builder.GetStatefulSets() + + err = reconcileRoleSpecificPDBs(context.Background(), c, tt.args.es, statefulSets, resourcesList, meta) + require.NoError(t, err) + + var retrievedPDBs policyv1.PodDisruptionBudgetList + err = c.List(context.Background(), &retrievedPDBs, client.InNamespace(tt.args.es.Namespace)) + require.NoError(t, err) + + require.Equal(t, len(tt.wantedPDBs), len(retrievedPDBs.Items), "Expected %d PDBs, got %d", len(tt.wantedPDBs), len(retrievedPDBs.Items)) + + for _, expectedPDB := range tt.wantedPDBs { + // Find the matching PDB in the retrieved list + idx := slices.IndexFunc(retrievedPDBs.Items, func(pdb policyv1.PodDisruptionBudget) bool { + return pdb.Name == expectedPDB.Name + }) + require.NotEqual(t, -1, idx, "Expected PDB %s should exist, found: %+v", expectedPDB.Name, retrievedPDBs.Items) + actualPDB := &retrievedPDBs.Items[idx] + + // Verify key fields match (ignore metadata like resourceVersion, etc.) 
+ require.Equal(t, expectedPDB.Spec.MaxUnavailable, actualPDB.Spec.MaxUnavailable, "MaxUnavailable should match for PDB %s", expectedPDB.Name) + require.Equal(t, expectedPDB.Spec.Selector, actualPDB.Spec.Selector, "Selector should match for PDB %s", expectedPDB.Name) + require.Equal(t, expectedPDB.Labels[label.ClusterNameLabelName], actualPDB.Labels[label.ClusterNameLabelName], "Cluster label should match for PDB %s", expectedPDB.Name) + } + }) + } +} + +func TestExpectedRolePDBs(t *testing.T) { + defaultUnhealthyES := esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es", + Namespace: "ns", + }, + Spec: esv1.ElasticsearchSpec{ + Version: "8.0.0", + }, + Status: esv1.ElasticsearchStatus{ + Health: esv1.ElasticsearchUnknownHealth, + }, + } + + defaultHealthyES := defaultUnhealthyES.DeepCopy() + defaultHealthyES.Status.Health = esv1.ElasticsearchGreenHealth + + tests := []struct { + name string + es esv1.Elasticsearch + builder Builder + expected []*policyv1.PodDisruptionBudget + }{ + { + name: "empty input", + es: *defaultHealthyES, + builder: NewBuilder("test-es").WithNamespace("ns").WithVersion("8.0.0"), + expected: []*policyv1.PodDisruptionBudget{}, + }, + { + name: "single node cluster; role doesn't matter; 1 disruption", + es: *defaultHealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("master1", 1, esv1.MasterRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-master", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"master1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + { + name: "multiple coordinating nodes; healthy es; 1 disruption allowed", + es: *defaultHealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). 
+ WithNodeSet("coord1", 2, esv1.CoordinatingRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-coordinating", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"coord1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + { + name: "separate roles - no shared roles", + es: defaultUnhealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("master1", 1, esv1.MasterRole). + WithNodeSet("data1", 1, esv1.DataRole). + WithNodeSet("ingest1", 1, esv1.IngestRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-data", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"data1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-master", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"master1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-ingest", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: 
label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"ingest1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + }, + }, + { + name: "existing PDB with different selector: should be updated", + es: defaultUnhealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("master1", 1, esv1.MasterRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-master", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"master1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + }, + }, + { + name: "multiple coordinating nodeSets", + es: defaultUnhealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("coord1", 1, esv1.CoordinatingRole). + WithNodeSet("coord2", 1, esv1.CoordinatingRole). + WithNodeSet("coord3", 1, esv1.CoordinatingRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-coordinating", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"coord1", "coord2", "coord3"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + }, + }, + { + name: "shared roles - should be grouped", + es: defaultUnhealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("master-data1", 1, esv1.MasterRole, esv1.DataRole). + WithNodeSet("data-ingest1", 1, esv1.DataRole, esv1.IngestRole). 
+ WithNodeSet("ml1", 1, esv1.MLRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-data", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"data-ingest1", "master-data1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-ml", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"ml1"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + }, + }, + { + name: "multiple coordinating nodeSets", + es: defaultUnhealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("coord1", 1, esv1.CoordinatingRole). + WithNodeSet("coord2", 1, esv1.CoordinatingRole). + WithNodeSet("coord3", 1, esv1.CoordinatingRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-coordinating", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"coord1", "coord2", "coord3"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + }, + }, + { + name: "multiple coordinating nodeSets", + es: defaultUnhealthyES, + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("coord1", 1, esv1.CoordinatingRole). + WithNodeSet("coord2", 1, esv1.CoordinatingRole). 
+ WithNodeSet("coord3", 1, esv1.CoordinatingRole), + expected: []*policyv1.PodDisruptionBudget{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-es-es-default-coordinating", + Namespace: "ns", + Labels: map[string]string{ + label.ClusterNameLabelName: "test-es", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "elasticsearch.k8s.elastic.co/v1", + Kind: "Elasticsearch", + Name: "test-es", + Controller: ptr.To[bool](true), + BlockOwnerDeletion: ptr.To[bool](true), + }, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: label.ClusterNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-es"}, + }, + { + Key: label.StatefulSetNameLabelName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{"coord1", "coord2", "coord3"}, + }, + }, + }, + MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: 0}, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + meta := metadata.Metadata{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/cluster-name": "test-es", + }, + } + + resourcesList, err := tt.builder.BuildResourcesList() + require.NoError(t, err) + + statefulSetList := tt.builder.GetStatefulSets() + + pdbs, err := expectedRolePDBs(tt.es, statefulSetList, resourcesList, meta) + if err != nil { + t.Fatalf("expectedRolePDBs: %v", err) + } + + if !cmp.Equal(tt.expected, pdbs) { + t.Errorf("expectedRolePDBs: PDBs do not match expected:\n%s", cmp.Diff(tt.expected, pdbs)) + } + }) + } +} + +func Test_allowedDisruptionsForRole(t *testing.T) { + type args struct { + es esv1.Elasticsearch + role []esv1.NodeRole + statefulSetsInPDB sset.StatefulSetList + allStatefulSets sset.StatefulSetList + } + tests := []struct { + name string + args args + want int32 + }{ + { + name: "no health reported: 0 disruptions allowed for any role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{}}, + role: []esv1.NodeRole{esv1.MasterRole, esv1.IngestRole, esv1.TransformRole, esv1.MLRole, esv1.DataFrozenRole}, + allStatefulSets: sset.StatefulSetList{ssetfixtures.TestSset{Replicas: 3}.Build()}, + }, + want: 0, + }, + { + name: "Unknown health reported: 0 disruptions allowed for any role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchUnknownHealth}}, + role: []esv1.NodeRole{esv1.MasterRole, esv1.IngestRole, esv1.TransformRole, esv1.MLRole, esv1.DataFrozenRole}, + allStatefulSets: sset.StatefulSetList{ssetfixtures.TestSset{Replicas: 3}.Build()}, + }, + want: 0, + }, + { + name: "yellow health: 0 disruptions allowed for data nodes", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchYellowHealth}}, + role: []esv1.NodeRole{esv1.DataRole}, + allStatefulSets: sset.StatefulSetList{ssetfixtures.TestSset{Replicas: 3}.Build()}, + }, + want: 0, + }, + { + name: "yellow health: 1 disruption allowed for master/ingest/transform/ml/data_frozen", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchYellowHealth}}, + role: []esv1.NodeRole{esv1.MasterRole, esv1.IngestRole, esv1.TransformRole, esv1.MLRole, esv1.DataFrozenRole}, + allStatefulSets: sset.StatefulSetList{ssetfixtures.TestSset{Replicas: 3}.Build()}, + }, + want: 1, + }, + { + name: "red health: 0 disruptions allowed for any role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: 
esv1.ElasticsearchRedHealth}}, + role: []esv1.NodeRole{esv1.MasterRole, esv1.IngestRole, esv1.TransformRole, esv1.MLRole, esv1.DataFrozenRole, esv1.DataRole}, + allStatefulSets: sset.StatefulSetList{ssetfixtures.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + }, + want: 0, + }, + { + name: "green health: 1 disruption allowed for any role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, + role: []esv1.NodeRole{esv1.MasterRole, esv1.IngestRole, esv1.TransformRole, esv1.MLRole, esv1.DataFrozenRole, esv1.DataRole}, + allStatefulSets: sset.StatefulSetList{ssetfixtures.TestSset{Replicas: 3, Master: true, Data: true}.Build()}, + }, + want: 1, + }, + { + name: "single-node cluster (not high-available): 1 disruption allowed", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, + role: []esv1.NodeRole{esv1.MasterRole}, + allStatefulSets: sset.StatefulSetList{ssetfixtures.TestSset{Replicas: 1, Master: true, Data: true}.Build()}, + }, + want: 1, + }, + { + name: "green health but only 1 master: 0 disruptions allowed for master role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, + role: []esv1.NodeRole{esv1.MasterRole}, + statefulSetsInPDB: sset.StatefulSetList{ + ssetfixtures.TestSset{Replicas: 1, Master: true, Data: false}.Build(), + ssetfixtures.TestSset{Replicas: 3, Master: false, Data: true}.Build(), + }, + allStatefulSets: sset.StatefulSetList{ + ssetfixtures.TestSset{Replicas: 1, Master: true, Data: false}.Build(), + ssetfixtures.TestSset{Replicas: 3, Master: false, Data: true}.Build(), + ssetfixtures.TestSset{Replicas: 2, Ingest: true}.Build(), + }, + }, + want: 0, + }, + { + name: "green health but only 1 master: 1 disruption allowed for data role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, + role: []esv1.NodeRole{esv1.DataRole}, + allStatefulSets: sset.StatefulSetList{ + ssetfixtures.TestSset{Replicas: 1, Master: true, Data: false}.Build(), + ssetfixtures.TestSset{Replicas: 3, Master: false, Data: true}.Build(), + }, + }, + want: 1, + }, + { + name: "green health but only 1 data node: 0 disruptions allowed for data role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, + role: []esv1.NodeRole{esv1.DataRole}, + statefulSetsInPDB: sset.StatefulSetList{ + ssetfixtures.TestSset{Replicas: 1, Master: false, Data: true}.Build(), + }, + allStatefulSets: sset.StatefulSetList{ + ssetfixtures.TestSset{Replicas: 3, Master: true, Data: false}.Build(), + ssetfixtures.TestSset{Replicas: 1, Master: false, Data: true}.Build(), + }, + }, + want: 0, + }, + { + name: "green health but only 1 ingest node: 0 disruptions allowed for ingest role", + args: args{ + es: esv1.Elasticsearch{Status: esv1.ElasticsearchStatus{Health: esv1.ElasticsearchGreenHealth}}, + role: []esv1.NodeRole{esv1.IngestRole}, + statefulSetsInPDB: sset.StatefulSetList{ + ssetfixtures.TestSset{Replicas: 3, Master: true, Data: true, Ingest: false}.Build(), + ssetfixtures.TestSset{Replicas: 1, Ingest: true, Data: true}.Build(), + }, + allStatefulSets: sset.StatefulSetList{ + ssetfixtures.TestSset{Replicas: 3, Master: true, Data: true, Ingest: false}.Build(), + ssetfixtures.TestSset{Replicas: 1, Ingest: true, Data: true}.Build(), + ssetfixtures.TestSset{Replicas: 1, DataFrozen: true}.Build(), + }, + }, + want: 
0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for _, role := range tt.args.role { + if got := allowedDisruptionsForRole(tt.args.es, role, tt.args.statefulSetsInPDB, tt.args.allStatefulSets); got != tt.want { + t.Errorf("allowedDisruptionsForRole() = %v, want %v for role: %s", got, tt.want, role) + } + } + }) + } +} + +func TestGetRolesForStatefulSet(t *testing.T) { + type args struct { + statefulSetName string + builder Builder + version string + } + tests := []struct { + name string + args args + want []esv1.NodeRole + wantErr bool + }{ + { + name: "unspecified roles (nil) - should represent all roles excluding coordinating", + args: args{ + statefulSetName: "all-roles", + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("all-roles", 3, "all_roles"), + version: "8.0.0", + }, + want: []esv1.NodeRole{esv1.DataRole, esv1.MasterRole, esv1.DataFrozenRole, esv1.IngestRole, esv1.MLRole, esv1.TransformRole}, + wantErr: false, + }, + { + name: "master only", + args: args{ + statefulSetName: "master-only", + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("master-only", 3, esv1.MasterRole), + version: "8.0.0", + }, + want: []esv1.NodeRole{esv1.MasterRole}, + wantErr: false, + }, + { + name: "data only", + args: args{ + statefulSetName: "data-only", + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("data-only", 3, esv1.DataRole), + version: "8.0.0", + }, + want: []esv1.NodeRole{esv1.DataRole}, + wantErr: false, + }, + { + name: "multiple roles", + args: args{ + statefulSetName: "master-data", + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("master-data", 3, esv1.MasterRole, esv1.DataRole), + version: "8.0.0", + }, + want: []esv1.NodeRole{esv1.MasterRole, esv1.DataRole}, + wantErr: false, + }, + { + name: "coordinating node (empty roles slice)", + args: args{ + statefulSetName: "coordinating", + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("coordinating", 2, esv1.CoordinatingRole), + version: "8.0.0", + }, + want: []esv1.NodeRole{esv1.CoordinatingRole}, + wantErr: false, + }, + { + name: "data tier roles", + args: args{ + statefulSetName: "data-hot-warm", + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). + WithNodeSet("data-hot-warm", 3, esv1.DataHotRole, esv1.DataWarmRole), + version: "8.0.0", + }, + want: []esv1.NodeRole{esv1.DataHotRole, esv1.DataWarmRole}, + wantErr: false, + }, + { + name: "non-existent statefulset", + args: args{ + statefulSetName: "non-existent", + builder: NewBuilder("test-es"). + WithNamespace("ns"). + WithVersion("8.0.0"). 
+ WithNodeSet("master-only", 3, esv1.MasterRole), + version: "8.0.0", + }, + want: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resourcesList, err := tt.args.builder.BuildResourcesList() + require.NoError(t, err) + + statefulSets := tt.args.builder.GetStatefulSets() + // get the specified statefulSet from the list to pass as argument to getRolesForStatefulSet + var statefulSet appsv1.StatefulSet + found := false + for _, sset := range statefulSets { + if sset.Name == tt.args.statefulSetName { + statefulSet = sset + found = true + break + } + } + + if !found && !tt.wantErr { + t.Fatalf("StatefulSet %s not found in test fixtures", tt.args.statefulSetName) + } + + v, err := version.Parse(tt.args.version) + require.NoError(t, err) + + got, err := getRolesForStatefulSet(statefulSet, resourcesList, v) + if (err != nil) != tt.wantErr { + t.Errorf("getRolesForStatefulSet() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getRolesForStatefulSet() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGroupBySharedRoles(t *testing.T) { + tests := []struct { + name string + builder Builder + want map[esv1.NodeRole][]appsv1.StatefulSet + wantSTSToRoles map[string]set.StringSet + }{ + { + name: "empty statefulsets", + builder: NewBuilder("test-es"), + want: map[esv1.NodeRole][]appsv1.StatefulSet{}, + wantSTSToRoles: nil, + }, + { + name: "single statefulset with no roles", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). + WithNodeSet("coordinating", 1, esv1.CoordinatingRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.CoordinatingRole: { + ssetfixtures.TestSset{Name: "coordinating", ClusterName: "test-es", Version: "9.0.1"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "coordinating": set.Make(""), + }, + }, + { + name: "all statefulsets with different roles", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). + WithNodeSet("master", 1, esv1.MasterRole). + WithNodeSet("ingest", 1, esv1.IngestRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.MasterRole: { + ssetfixtures.TestSset{Name: "master", Master: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + esv1.IngestRole: { + ssetfixtures.TestSset{Name: "ingest", Ingest: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "master": set.Make("master"), + "ingest": set.Make("ingest"), + }, + }, + { + name: "statefulsets with shared roles are grouped properly", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). + WithNodeSet("master", 1, esv1.MasterRole, esv1.DataRole). + WithNodeSet("data", 1, esv1.DataRole). + WithNodeSet("ingest", 1, esv1.IngestRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.DataRole: { + ssetfixtures.TestSset{Name: "master", Master: true, Data: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data", Data: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + esv1.IngestRole: { + ssetfixtures.TestSset{Name: "ingest", Ingest: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "master": set.Make("master", "data"), + "data": set.Make("data"), + "ingest": set.Make("ingest"), + }, + }, + { + name: "statefulsets with multiple shared roles in multiple groups, and data* roles are grouped properly", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). 
+ WithNodeSet("master", 1, esv1.MasterRole, esv1.DataRole). + WithNodeSet("data", 1, esv1.DataRole). + WithNodeSet("data_hot", 1, esv1.DataHotRole). + WithNodeSet("data_warm", 1, esv1.DataWarmRole). + WithNodeSet("data_cold", 1, esv1.DataColdRole). + WithNodeSet("data_frozen", 1, esv1.DataFrozenRole). + WithNodeSet("ingest", 1, esv1.IngestRole, esv1.MLRole). + WithNodeSet("ml", 1, esv1.MLRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.DataRole: { + ssetfixtures.TestSset{Name: "master", Master: true, Data: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data", Data: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data_hot", DataHot: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data_warm", DataWarm: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data_cold", DataCold: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + esv1.DataFrozenRole: { + ssetfixtures.TestSset{Name: "data_frozen", DataFrozen: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + esv1.IngestRole: { + ssetfixtures.TestSset{Name: "ingest", Ingest: true, ML: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "ml", ML: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "master": set.Make("master", "data"), + "data": set.Make("data"), + "data_hot": set.Make("data"), + "data_warm": set.Make("data"), + "data_cold": set.Make("data"), + "data_frozen": set.Make("data_frozen"), + "ingest": set.Make("ingest", "ml"), + "ml": set.Make("ml"), + }, + }, + { + name: "coordinating nodes (no roles) in separate group", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). + WithNodeSet("data", 1, esv1.DataRole). + WithNodeSet("coordinating1", 1, esv1.CoordinatingRole). + WithNodeSet("coordinating2", 1, esv1.CoordinatingRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.DataRole: { + ssetfixtures.TestSset{Name: "data", Data: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + esv1.CoordinatingRole: { + ssetfixtures.TestSset{Name: "coordinating1", Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "coordinating2", Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "data": set.Make("data"), + "coordinating1": set.Make(""), + "coordinating2": set.Make(""), + }, + }, + { + name: "statefulsets with multiple roles respect priority order", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). + WithNodeSet("master-data-ingest", 1, esv1.MasterRole, esv1.DataRole, esv1.IngestRole). + WithNodeSet("data-ingest", 1, esv1.DataRole, esv1.IngestRole). 
+ WithNodeSet("ingest-only", 1, esv1.IngestRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.DataRole: { + ssetfixtures.TestSset{Name: "master-data-ingest", Master: true, Data: true, Ingest: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data-ingest", Data: true, Ingest: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "ingest-only", Ingest: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "master-data-ingest": set.Make("master", "data", "ingest"), + "data-ingest": set.Make("data", "ingest"), + "ingest-only": set.Make("ingest"), + }, + }, + { + name: "mixed data role types are properly collapsed even with generic data role existing", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). + WithNodeSet("data", 1, esv1.DataRole). + WithNodeSet("data_hot", 1, esv1.DataHotRole). + WithNodeSet("data_content", 1, esv1.DataContentRole). + WithNodeSet("master", 1, esv1.MasterRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.MasterRole: { + ssetfixtures.TestSset{Name: "master", Master: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + esv1.DataRole: { + ssetfixtures.TestSset{Name: "data", Data: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data_hot", DataHot: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data_content", DataContent: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "data": set.Make("data"), + "data_hot": set.Make("data"), + "data_content": set.Make("data"), + "master": set.Make("master"), + }, + }, + { + name: "data roles without generic data role do not maintain separate groups", + builder: NewBuilder("test-es"). + WithVersion("9.0.1"). + WithNodeSet("data_hot", 1, esv1.DataHotRole). + WithNodeSet("data_cold", 1, esv1.DataColdRole). 
+ WithNodeSet("master", 1, esv1.MasterRole), + want: map[esv1.NodeRole][]appsv1.StatefulSet{ + esv1.MasterRole: { + ssetfixtures.TestSset{Name: "master", Master: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + esv1.DataRole: { + ssetfixtures.TestSset{Name: "data_hot", DataHot: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + ssetfixtures.TestSset{Name: "data_cold", DataCold: true, Version: "9.0.1", ClusterName: "test-es"}.Build(), + }, + }, + wantSTSToRoles: map[string]set.StringSet{ + "data_hot": set.Make("data"), + "data_cold": set.Make("data"), + "master": set.Make("master"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var resourcesList nodespec.ResourcesList + var err error + resourcesList, err = tt.builder.BuildResourcesList() + require.NoError(t, err) + + v := version.MustParse(tt.builder.Elasticsearch.Spec.Version) + stss := tt.builder.GetStatefulSets() + + got, gotSTSToRoles, err := groupBySharedRoles(stss, resourcesList, v) + assert.NoError(t, err) + + if !cmp.Equal(gotSTSToRoles, tt.wantSTSToRoles) { + t.Errorf("gotSTSToRoles: diff = %s", cmp.Diff(gotSTSToRoles, tt.wantSTSToRoles)) + } + + // Check that the number of groups matches + assert.Equal(t, len(tt.want), len(got), "Expected %d groups, got %d", len(tt.want), len(got)) + + // Check each expected group + for role, expectedSsets := range tt.want { + gotSsets, exists := got[role] + assert.True(t, exists, "Expected group for role %s not found", role) + if !exists { + continue + } + + // Sort both slices for consistent comparison + sort.Slice(expectedSsets, func(i, j int) bool { + return expectedSsets[i].Name < expectedSsets[j].Name + }) + sort.Slice(gotSsets, func(i, j int) bool { + return gotSsets[i].Name < gotSsets[j].Name + }) + + assert.Equal(t, len(expectedSsets), len(gotSsets), "Group %s has wrong size", role) + + // Check if all StatefulSets in the group match + for i := 0; i < len(expectedSsets); i++ { + if i >= len(gotSsets) { + t.Errorf("Missing StatefulSet at index %d in group %s", i, role) + continue + } + + assert.Equal(t, expectedSsets[i].Name, gotSsets[i].Name, + "StatefulSet names do not match in group %s", role) + assert.Equal(t, expectedSsets[i].Spec.Template.Labels, gotSsets[i].Spec.Template.Labels, + "StatefulSet labels do not match in group %s", role) + } + } + + // Check if there are any unexpected groups + for role := range got { + _, exists := tt.want[role] + assert.True(t, exists, "Unexpected group found: %s", role) + } + }) + } +} diff --git a/pkg/controller/elasticsearch/pdb/version.go b/pkg/controller/elasticsearch/pdb/version.go deleted file mode 100644 index d493b93040..0000000000 --- a/pkg/controller/elasticsearch/pdb/version.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License 2.0; -// you may not use this file except in compliance with the Elastic License 2.0. 
- -package pdb - -import ( - "reflect" - "sync" - - policyv1 "k8s.io/api/policy/v1" - policyv1beta1 "k8s.io/api/policy/v1beta1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/utils/ptr" - - "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" -) - -var ( - pdbVersionMutex sync.RWMutex - pdbV1Available *bool -) - -// convert converts v1 version of the PodDisruptionBudget resource to v1beta1 -func convert(toConvert *policyv1.PodDisruptionBudget) *policyv1beta1.PodDisruptionBudget { - v1beta1 := &policyv1beta1.PodDisruptionBudget{} - v1beta1.ObjectMeta = toConvert.ObjectMeta - v1beta1.Spec.MinAvailable = toConvert.Spec.MinAvailable - v1beta1.Spec.Selector = toConvert.Spec.Selector - v1beta1.Spec.MaxUnavailable = toConvert.Spec.MaxUnavailable - return v1beta1 -} - -func isPDBV1Available(k8sClient k8s.Client) (bool, error) { - isPDBV1Available := getPDBV1Available() - if isPDBV1Available != nil { - return *isPDBV1Available, nil - } - return initPDBV1Available(k8sClient) -} - -func getPDBV1Available() *bool { - pdbVersionMutex.RLock() - defer pdbVersionMutex.RUnlock() - return pdbV1Available -} - -func initPDBV1Available(k8sClient k8s.Client) (bool, error) { - pdbVersionMutex.Lock() - defer pdbVersionMutex.Unlock() - if pdbV1Available != nil { - return *pdbV1Available, nil - } - t := reflect.TypeOf(&policyv1.PodDisruptionBudget{}) - gk := schema.GroupKind{ - Group: policyv1.GroupName, - Kind: t.Elem().Name(), - } - preferredMapping, err := k8sClient.RESTMapper().RESTMapping(gk) - if err != nil { - return false, err - } - - // Rely on v1 as soon as v1beta1 is not the preferred version anymore. - pdbV1Available = ptr.To[bool](preferredMapping.Resource.Version != "v1beta1") - return *pdbV1Available, nil -}