Skip to content

[WIP] feat: support delete event reconiliation #2762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
wip
Signed-off-by: Attila Mészáros <[email protected]>
  • Loading branch information
csviri committed Apr 11, 2025

Verified

This commit was signed with the committer’s verified signature.
leokondrashov Leonid Kondrashov
commit 44b12457d68010f2c5c86d2a5b19deeabcd1aa6a
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
private final Map<String, Object> metricsMetadata;
private ExecutorService executor;

// todo handle/test case when there is finalizer but not ours
public EventProcessor(
EventSourceManager<P> eventSourceManager, ConfigurationService configurationService) {
this(
@@ -122,15 +123,15 @@ public synchronized void handleEvent(Event event) {
}

private void handleMarkedEventForResource(ResourceState<P> state) {
if (skipDeleteEventProcessing(state)) {
if (doCleanupForDeleteEvent(state)) {
cleanupForDeletedEvent(state.getId());
} else if (!state.processedMarkForDeletionPresent()
&& !state.deleteEventReconciliationSubmitted()) {
submitReconciliationExecution(state);
}
}

private boolean skipDeleteEventProcessing(ResourceState<P> state) {
private boolean doCleanupForDeleteEvent(ResourceState<P> state) {
return state.deleteEventPresent() && !controllerConfiguration.reconcileOnPrimaryDelete();
}

@@ -146,6 +147,7 @@ private void submitReconciliationExecution(ResourceState<P> state) {
rateLimit = rateLimiter.initState();
state.setRateLimit(rateLimit);
}
// todo rate limit handling
var rateLimiterPermission = rateLimiter.isLimited(rateLimit);
if (rateLimiterPermission.isPresent()) {
handleRateLimitedSubmission(resourceID, rateLimiterPermission.get());
@@ -158,13 +160,16 @@ private void submitReconciliationExecution(ResourceState<P> state) {

if (state.deleteEventPresent()) {
state.markDeleteEventReconciliationSubmitted();
} else {
} else if (!state.deleteEventReconciliationSubmitted()) { // if there is a retry
state.unMarkEventReceived();
}

metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ReconcilerExecutor(resourceID, executionScope));
executor.execute(
new ReconcilerExecutor(
resourceID,
executionScope,
state.deleteEventReconciliationSubmitted() ? latest : null));
} else {
log.debug(
"Skipping executing controller for resource id: {}. Controller in execution: {}. Latest"
@@ -191,7 +196,8 @@ private Optional<P> getCachedResource(ResourceID resourceID, ResourceState<P> st
if (resource.isPresent()) {
return resource;
}
if (controllerConfiguration.reconcileOnPrimaryDelete() && state.deleteEventPresent()) {
if (controllerConfiguration.reconcileOnPrimaryDelete()
&& (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted())) {
return Optional.of(state.getDeletedResource());
}
return Optional.empty();
@@ -206,10 +212,14 @@ private void handleEventMarking(Event event, ResourceState<P> state) {
// todo check can there be delete event without resource?
state.markDeleteEventReceived((P) resourceEvent.getResource().orElseThrow());
} else {
if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
if (state.deleteEventReconciliationSubmitted()
|| (state.processedMarkForDeletionPresent()
&& isResourceMarkedForDeletion(resourceEvent))) {
log.debug(
"Skipping mark of event received, since already processed mark for deletion and"
+ " resource marked for deletion: {}",
"Skipping mark of event received, delete event reconciliation submitted ({}), or"
+ " marked for deletion but already processed mark. resource marked for deletion:"
+ " {}",
state.deleteEventReconciliationSubmitted(),
relatedCustomResourceID);
return;
}
@@ -221,15 +231,17 @@ private void handleEventMarking(Event event, ResourceState<P> state) {
// event as below.
markEventReceived(state);
}
// todo this if is weird
} else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) {
} else if (!state.deleteEventPresent()
&& !state.processedMarkForDeletionPresent()
&& !state.deleteEventReconciliationSubmitted()) {
markEventReceived(state);
} else if (log.isDebugEnabled()) {
log.debug(
"Skipped marking event as received. Delete event present: {}, processed mark for"
+ " deletion: {}",
+ " deletion: {}, delete event reconciliation submitted: {}",
state.deleteEventPresent(),
state.processedMarkForDeletionPresent());
state.processedMarkForDeletionPresent(),
state.deleteEventReconciliationSubmitted());
}
}

@@ -469,10 +481,13 @@ private void handleAlreadyMarkedEvents() {
private class ReconcilerExecutor implements Runnable {
private final ExecutionScope<P> executionScope;
private final ResourceID resourceID;
private final P deleteEventResource;

private ReconcilerExecutor(ResourceID resourceID, ExecutionScope<P> executionScope) {
private ReconcilerExecutor(
ResourceID resourceID, ExecutionScope<P> executionScope, P deleteEventResource) {
this.executionScope = executionScope;
this.resourceID = resourceID;
this.deleteEventResource = deleteEventResource;
}

@Override
@@ -488,7 +503,9 @@ public void run() {
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
var actualResource = cache.get(resourceID);

var actualResource =
deleteEventResource != null ? Optional.of(deleteEventResource) : cache.get(resourceID);
if (actualResource.isEmpty()) {
log.debug("Skipping execution; primary resource missing from cache: {}", resourceID);
return;