Skip to content

Commit bd8f6a5

Browse files
authored
Expend feature gates support to all Strimzi operators (strimzi#10141)
Signed-off-by: Jakub Scholz <[email protected]>
1 parent 6c24f81 commit bd8f6a5

File tree

20 files changed

+261
-71
lines changed

20 files changed

+261
-71
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
To use KRaft (ZooKeeper-less Apache Kafka), you still need to use the `strimzi.io/kraft: enabled` annotation on the `Kafka` custom resources or migrate from an existing ZooKeeper-based cluster using the `strimzi.io/kraft: migration` annotation.
77
* Update the base image used by Strimzi containers from UBI8 to UBI9
88
* Enhance `KafkaBridge` resource with consumer inactivity timeout and HTTP consumer/producer enablement.
9+
* Add support for feature gates to User and Topic Operators
910

1011
## 0.41.0
1112

cluster-operator/src/main/java/io/strimzi/operator/cluster/ClusterOperatorConfig.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.strimzi.operator.common.Util;
1515
import io.strimzi.operator.common.config.ConfigParameter;
1616
import io.strimzi.operator.common.config.ConfigParameterParser;
17+
import io.strimzi.operator.common.featuregates.FeatureGates;
1718
import io.strimzi.operator.common.model.Labels;
1819
import org.apache.logging.log4j.LogManager;
1920
import org.apache.logging.log4j.Logger;
@@ -32,6 +33,7 @@
3233
import static io.strimzi.operator.common.config.ConfigParameterParser.LONG;
3334
import static io.strimzi.operator.common.config.ConfigParameterParser.NAMESPACE_SET;
3435
import static io.strimzi.operator.common.config.ConfigParameterParser.STRING;
36+
import static io.strimzi.operator.common.config.ConfigParameterParser.parseFeatureGates;
3537

3638
/**
3739
* Cluster Operator configuration
@@ -391,10 +393,6 @@ private static KafkaVersion.Lookup parseKafkaVersions(String kafkaImages, String
391393
}
392394
}
393395

394-
static ConfigParameterParser<FeatureGates> parseFeatureGates() {
395-
return FeatureGates::new;
396-
}
397-
398396
static ConfigParameterParser<ImagePullPolicy> parseImagePullPolicy() {
399397
return imagePullPolicyEnvVar -> {
400398
ImagePullPolicy imagePullPolicy = null;

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/EntityOperator.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.strimzi.api.kafka.model.kafka.KafkaResources;
2525
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorSpec;
2626
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorTemplate;
27+
import io.strimzi.operator.cluster.ClusterOperatorConfig;
2728
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
2829
import io.strimzi.operator.common.Reconciliation;
2930
import io.strimzi.operator.common.Util;
@@ -105,20 +106,22 @@ protected EntityOperator(Reconciliation reconciliation, HasMetadata resource, Sh
105106
* @param reconciliation The reconciliation marker
106107
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity Operator one
107108
* @param sharedEnvironmentProvider Shared environment provider
109+
* @param config Cluster Operator configuration
108110
*
109111
* @return Entity Operator instance, null if not configured in the ConfigMap
110112
*/
111113
public static EntityOperator fromCrd(Reconciliation reconciliation,
112114
Kafka kafkaAssembly,
113-
SharedEnvironmentProvider sharedEnvironmentProvider) {
115+
SharedEnvironmentProvider sharedEnvironmentProvider,
116+
ClusterOperatorConfig config) {
114117
EntityOperatorSpec entityOperatorSpec = kafkaAssembly.getSpec().getEntityOperator();
115118

116119
if (entityOperatorSpec != null
117120
&& (entityOperatorSpec.getUserOperator() != null || entityOperatorSpec.getTopicOperator() != null)) {
118121
EntityOperator result = new EntityOperator(reconciliation, kafkaAssembly, sharedEnvironmentProvider);
119122

120-
EntityTopicOperator topicOperator = EntityTopicOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider);
121-
EntityUserOperator userOperator = EntityUserOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider);
123+
EntityTopicOperator topicOperator = EntityTopicOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider, config);
124+
EntityUserOperator userOperator = EntityUserOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider, config);
122125

123126
result.topicOperator = topicOperator;
124127
result.cruiseControlEnabled = kafkaAssembly.getSpec().getCruiseControl() != null;

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/EntityTopicOperator.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class EntityTopicOperator extends AbstractModel implements SupportsLoggin
8383
/* test */ String kafkaBootstrapServers;
8484
private boolean cruiseControlEnabled;
8585
private boolean rackAwarenessEnabled;
86+
private String featureGatesEnvVarValue;
8687

8788
private String watchedNamespace;
8889
/* test */ int reconciliationIntervalMs;
@@ -110,15 +111,18 @@ protected EntityTopicOperator(Reconciliation reconciliation, HasMetadata resourc
110111
/**
111112
* Create an Entity Topic Operator from given desired resource. When Topic Operator (Or Entity Operator) are not
112113
* enabled, it returns null.
113-
* @param reconciliation The reconciliation
114-
* @param kafkaAssembly desired resource with cluster configuration containing the Entity Topic Operator one
115-
* @param sharedEnvironmentProvider Shared environment provider
114+
*
115+
* @param reconciliation The reconciliation marker
116+
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity Topic Operator one
117+
* @param sharedEnvironmentProvider Shared environment provider
118+
* @param config Cluster Operator configuration
116119
*
117120
* @return Entity Topic Operator instance, null if not configured
118121
*/
119122
public static EntityTopicOperator fromCrd(Reconciliation reconciliation,
120123
Kafka kafkaAssembly,
121-
SharedEnvironmentProvider sharedEnvironmentProvider) {
124+
SharedEnvironmentProvider sharedEnvironmentProvider,
125+
ClusterOperatorConfig config) {
122126
if (kafkaAssembly.getSpec().getEntityOperator() != null
123127
&& kafkaAssembly.getSpec().getEntityOperator().getTopicOperator() != null) {
124128
EntityTopicOperatorSpec topicOperatorSpec = kafkaAssembly.getSpec().getEntityOperator().getTopicOperator();
@@ -145,6 +149,7 @@ public static EntityTopicOperator fromCrd(Reconciliation reconciliation,
145149

146150
result.cruiseControlEnabled = kafkaAssembly.getSpec().getCruiseControl() != null;
147151
result.rackAwarenessEnabled = result.cruiseControlEnabled && kafkaAssembly.getSpec().getKafka().getRack() != null;
152+
result.featureGatesEnvVarValue = config.featureGates().toEnvironmentVariable();
148153

149154
return result;
150155
} else {
@@ -180,7 +185,12 @@ protected List<EnvVar> getEnvVars() {
180185
varList.add(ContainerUtils.createEnvVar(ENV_VAR_SECURITY_PROTOCOL, EntityTopicOperatorSpec.DEFAULT_SECURITY_PROTOCOL));
181186
varList.add(ContainerUtils.createEnvVar(ENV_VAR_TLS_ENABLED, Boolean.toString(true)));
182187
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, Boolean.toString(gcLoggingEnabled)));
183-
188+
189+
// Add feature gates configuration if not empty
190+
if (featureGatesEnvVarValue != null && !featureGatesEnvVarValue.isEmpty()) {
191+
varList.add(ContainerUtils.createEnvVar(ClusterOperatorConfig.FEATURE_GATES.key(), featureGatesEnvVarValue));
192+
}
193+
184194
// Add environment variables required for Cruise Control integration
185195
if (this.cruiseControlEnabled) {
186196
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_ENABLED, Boolean.toString(cruiseControlEnabled)));

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/EntityUserOperator.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public class EntityUserOperator extends AbstractModel implements SupportsLogging
7777
/* test */ int clientsCaValidityDays;
7878
/* test */ int clientsCaRenewalDays;
7979
private ResourceTemplate templateRoleBinding;
80+
private String featureGatesEnvVarValue;
8081

8182
private boolean aclsAdminApiSupported = false;
8283
private List<String> maintenanceWindows;
@@ -107,10 +108,14 @@ protected EntityUserOperator(Reconciliation reconciliation, HasMetadata resource
107108
* @param reconciliation The reconciliation
108109
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity User Operator one
109110
* @param sharedEnvironmentProvider Shared environment provider
111+
* @param config Cluster Operator configuration
110112
*
111113
* @return Entity User Operator instance, null if not configured
112114
*/
113-
public static EntityUserOperator fromCrd(Reconciliation reconciliation, Kafka kafkaAssembly, SharedEnvironmentProvider sharedEnvironmentProvider) {
115+
public static EntityUserOperator fromCrd(Reconciliation reconciliation,
116+
Kafka kafkaAssembly,
117+
SharedEnvironmentProvider sharedEnvironmentProvider,
118+
ClusterOperatorConfig config) {
114119
if (kafkaAssembly.getSpec().getEntityOperator() != null
115120
&& kafkaAssembly.getSpec().getEntityOperator().getUserOperator() != null) {
116121
EntityUserOperatorSpec userOperatorSpec = kafkaAssembly.getSpec().getEntityOperator().getUserOperator();
@@ -131,6 +136,7 @@ public static EntityUserOperator fromCrd(Reconciliation reconciliation, Kafka ka
131136
result.resources = userOperatorSpec.getResources();
132137
result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(userOperatorSpec, EntityOperator.DEFAULT_HEALTHCHECK_OPTIONS);
133138
result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(userOperatorSpec, EntityOperator.DEFAULT_HEALTHCHECK_OPTIONS);
139+
result.featureGatesEnvVarValue = config.featureGates().toEnvironmentVariable();
134140

135141
if (kafkaAssembly.getSpec().getEntityOperator().getTemplate() != null) {
136142
result.templateRoleBinding = kafkaAssembly.getSpec().getEntityOperator().getTemplate().getUserOperatorRoleBinding();
@@ -196,6 +202,11 @@ protected List<EnvVar> getEnvVars() {
196202
varList.add(ContainerUtils.createEnvVar(ENV_VAR_ACLS_ADMIN_API_SUPPORTED, String.valueOf(aclsAdminApiSupported)));
197203
JvmOptionUtils.javaOptions(varList, jvmOptions);
198204

205+
// Add feature gates configuration if not empty
206+
if (featureGatesEnvVarValue != null && !featureGatesEnvVarValue.isEmpty()) {
207+
varList.add(ContainerUtils.createEnvVar(ClusterOperatorConfig.FEATURE_GATES.key(), featureGatesEnvVarValue));
208+
}
209+
199210
// Add shared environment variables used for all containers
200211
varList.addAll(sharedEnvironmentProvider.variables());
201212

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/EntityOperatorReconciler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public EntityOperatorReconciler(
7979
) {
8080
this.reconciliation = reconciliation;
8181
this.operationTimeoutMs = config.getOperationTimeoutMs();
82-
this.entityOperator = EntityOperator.fromCrd(reconciliation, kafkaAssembly, supplier.sharedEnvironmentProvider);
82+
this.entityOperator = EntityOperator.fromCrd(reconciliation, kafkaAssembly, supplier.sharedEnvironmentProvider, config);
8383
this.clusterCa = clusterCa;
8484
this.maintenanceWindows = kafkaAssembly.getSpec().getMaintenanceTimeWindows();
8585
this.isNetworkPolicyGeneration = config.isNetworkPolicyGeneration();

cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.strimzi.operator.cluster.model.ImagePullPolicy;
1111
import io.strimzi.operator.cluster.model.UnsupportedVersionException;
1212
import io.strimzi.operator.common.InvalidConfigurationException;
13+
import io.strimzi.operator.common.featuregates.FeatureGates;
1314
import io.strimzi.operator.common.model.Labels;
1415
import org.junit.jupiter.api.Test;
1516

@@ -65,7 +66,7 @@ public void testDefaultConfig() {
6566
assertThat(config.getConnectBuildTimeoutMs(), is(Long.parseLong(ClusterOperatorConfig.CONNECT_BUILD_TIMEOUT_MS.defaultValue())));
6667
assertThat(config.getOperatorNamespace(), is("operator-namespace"));
6768
assertThat(config.getOperatorNamespaceLabels(), is(nullValue()));
68-
assertThat(config.featureGates().continueOnManualRUFailureEnabled(), is(false));
69+
assertThat(config.featureGates(), is(new FeatureGates("")));
6970
assertThat(config.isCreateClusterRoles(), is(false));
7071
assertThat(config.isNetworkPolicyGeneration(), is(true));
7172
assertThat(config.isPodSetReconciliationOnly(), is(false));
@@ -329,6 +330,8 @@ public void testInvalidOperatorNamespaceLabels() {
329330

330331
@Test
331332
public void testInvalidFeatureGate() {
333+
// We test that the configuration is really parsing the feature gates environment variable. We test it on
334+
// non-existing feature gate instead of a real one so that we do not have to change it when the FGs are promoted
332335
Map<String, String> envVars = new HashMap<>(ClusterOperatorConfigTest.ENV_VARS);
333336
envVars.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-NonExistingGate");
334337

0 commit comments

Comments
 (0)