|
19 | 19 |
|
20 | 20 | import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
|
21 | 21 | import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
|
| 22 | +import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState; |
22 | 23 | import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
|
23 |
| -import org.apache.flink.kubernetes.operator.controller.BlueGreenStateMachine.BlueGreenTransitionContext; |
| 24 | +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry; |
| 25 | +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext; |
| 26 | +import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler; |
| 27 | +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService; |
24 | 28 | import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
|
25 | 29 |
|
26 | 30 | import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
|
@@ -48,13 +52,13 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG
|
48 | 52 | private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
|
49 | 53 |
|
50 | 54 | private final FlinkResourceContextFactory ctxFactory;
|
51 |
| - private final BlueGreenStateMachine stateMachine; |
| 55 | + private final BlueGreenStateHandlerRegistry handlerRegistry; |
52 | 56 |
|
53 | 57 | public static long minimumAbortGracePeriodMs = ABORT_GRACE_PERIOD.defaultValue().toMillis();
|
54 | 58 |
|
55 | 59 | public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory ctxFactory) {
|
56 | 60 | this.ctxFactory = ctxFactory;
|
57 |
| - this.stateMachine = new BlueGreenStateMachine(); |
| 61 | + this.handlerRegistry = new BlueGreenStateHandlerRegistry(); |
58 | 62 | }
|
59 | 63 |
|
60 | 64 | @Override
|
@@ -85,27 +89,36 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
|
85 | 89 |
|
86 | 90 | if (deploymentStatus == null) {
|
87 | 91 | var context =
|
88 |
| - new BlueGreenTransitionContext( |
| 92 | + new BlueGreenContext( |
89 | 93 | bgDeployment,
|
90 | 94 | new FlinkBlueGreenDeploymentStatus(),
|
91 | 95 | josdkContext,
|
92 | 96 | null,
|
93 | 97 | ctxFactory);
|
94 |
| - return stateMachine |
| 98 | + return BlueGreenDeploymentService |
95 | 99 | .patchStatusUpdateControl(context, INITIALIZING_BLUE, null)
|
96 | 100 | .rescheduleAfter(100);
|
97 | 101 | } else {
|
| 102 | + FlinkBlueGreenDeploymentState currentState = deploymentStatus.getBlueGreenState(); |
98 | 103 | var context =
|
99 |
| - new BlueGreenTransitionContext( |
| 104 | + new BlueGreenContext( |
100 | 105 | bgDeployment,
|
101 | 106 | deploymentStatus,
|
102 | 107 | josdkContext,
|
103 |
| - deploymentStatus.getBlueGreenState() == INITIALIZING_BLUE |
| 108 | + currentState == INITIALIZING_BLUE |
104 | 109 | ? null
|
105 | 110 | : FlinkBlueGreenDeployments.fromSecondaryResources(
|
106 | 111 | josdkContext),
|
107 | 112 | ctxFactory);
|
108 |
| - return stateMachine.processState(context); |
| 113 | + |
| 114 | + LOG.debug( |
| 115 | + "Processing state: {} for deployment: {}", |
| 116 | + currentState, |
| 117 | + context.getDeploymentName()); |
| 118 | + |
| 119 | + BlueGreenStateHandler handler = handlerRegistry.getHandler(currentState); |
| 120 | + return handler.handle(context); |
| 121 | +// return stateMachine.processState(context); |
109 | 122 | }
|
110 | 123 | }
|
111 | 124 |
|
|
0 commit comments