Commit 2a3a376

feat: delete s2 granules after success (#309)

1 parent ce1f0cf
5 files changed: +204 -6 lines changed
lambda_functions/cleanup_sentinel2_granules.py

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
"""
HLS: Remove Sentinel-2 granules after successful processing

This Lambda function is connected in our workflow after the check for
failures, in the "success" pathway. As such we do not need to verify
within this scope that the granule was successfully processed.

The only complication is the handling of "twin" granules: we need to
ensure the first granule-processing workflow does not delete the second
of the "twin" granules. A "twin" granule occurs when the same tile and
date are acquired twice because the satellite switches receiving
stations during the downlink. Two granules exist for the same date and
tile, and for complete coverage we must process and combine the two
granules together.

A "twin" granule is processed across two workflow runs:

1. The first of the "twin" is downloaded into the input bucket, triggering
   a workflow to process this granule.
2. This first workflow only finds one of the two granules that will
   eventually be downloaded, and only processes the first.
3. The second of the "twin" granules is downloaded into the input bucket,
   triggering a workflow to process this granule.
4. This second workflow finds two input granule IDs, and processes them both.

In this scenario we only want to delete the inputs for the second workflow.
"""
import os

import boto3

s3 = boto3.client("s3")
bucket = os.getenv("SENTINEL_INPUT_BUCKET", None)
if bucket is None:
    raise Exception("No Input Bucket set")


def handler(event: dict, context: dict):
    # We may receive 2 granule IDs split by a comma if this is a twin
    # granule workflow.
    granules = event["granule"].split(",")

    # Both IDs of a twin pair share a common prefix once the trailing
    # 6 characters are stripped; anything else is a bad input.
    prefixes = {granule[0:-6] for granule in granules}
    if len(prefixes) != 1:
        raise ValueError(f"Received {len(prefixes)} granule prefixes")
    prefix = list(prefixes)[0]

    response = s3.list_objects_v2(
        Bucket=bucket,
        Prefix=prefix,
    )

    granule_zips = [obj["Key"] for obj in response["Contents"]]

    # We have three possible cases:
    # 1. Non-twin granule (1 ID, 1 zip)
    # 2. Twin granule input and twin granule job (2 IDs, 2 zips)
    if len(granules) == len(granule_zips):
        if len(granules) > 1:
            print(f"Deleting inputs of twin granule case: {granule_zips}")
        else:
            print(f"Deleting input of single granule case: {granule_zips[0]}")

        for granule_zip in granule_zips:
            s3.delete_object(Bucket=bucket, Key=granule_zip)
        return granule_zips

    # 3. Twin granules downloaded but only one was processed in this workflow
    else:
        print(
            "Twin granule case detected but this workflow did not process it. "
            f"Skipping deletion (IDs={granules}, zips={granule_zips})"
        )
        return []
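
A note on the prefix check above: granule[0:-6] drops the final six characters of each ID, so both halves of a twin pair collapse to one shared prefix while unrelated IDs do not. A minimal sketch of that behavior, reusing the hypothetical IDs from the tests below (they are not real Sentinel-2 names):

# Twin IDs differ only in a trailing 6-character suffix, so they
# reduce to a single common prefix.
granules = "one12345_111111,one12345_222222".split(",")
prefixes = {granule[0:-6] for granule in granules}
assert prefixes == {"one12345_"}

# Unrelated IDs reduce to distinct prefixes; handler() raises
# ValueError("Received 2 granule prefixes") for such input.
bad = {g[0:-6] for g in ["thisiswronggranuleid", "obviouslynotagranuleid"]}
assert len(bad) == 2
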
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
from unittest.mock import patch

import pytest


@pytest.fixture(autouse=True)
def env_setup(monkeypatch):
    monkeypatch.setenv("SENTINEL_INPUT_BUCKET", "sentinelinput")


@pytest.fixture
def mock_delete_object():
    from lambda_functions.cleanup_sentinel2_granules import s3

    with patch.object(s3, "delete_object") as mock_delete_object:
        yield mock_delete_object


def test_single_granule_case(mock_delete_object, capsys):
    from lambda_functions.cleanup_sentinel2_granules import handler, s3

    with patch.object(
        s3,
        "list_objects_v2",
        return_value={"Contents": [{"Key": "one.zip"}]},
    ):
        handler({"granule": "one12345"}, {})
    assert mock_delete_object.call_count == 1

    captured = capsys.readouterr()
    assert "Deleting input of single granule case" in captured.out


def test_twin_granule_only_one_skip_delete(mock_delete_object, capsys):
    from lambda_functions.cleanup_sentinel2_granules import handler, s3

    with patch.object(
        s3,
        "list_objects_v2",
        return_value={"Contents": [{"Key": "one.zip"}, {"Key": "two.zip"}]},
    ):
        handler({"granule": "one12345"}, {})
    mock_delete_object.assert_not_called()

    captured = capsys.readouterr()
    assert (
        "Twin granule case detected but this workflow did not process it. Skipping"
        in captured.out
    )


def test_twin_granule_has_both_deletes_both(mock_delete_object, capsys):
    from lambda_functions.cleanup_sentinel2_granules import handler, s3

    with patch.object(
        s3,
        "list_objects_v2",
        return_value={"Contents": [{"Key": "one.zip"}, {"Key": "two.zip"}]},
    ):
        handler({"granule": "one12345_111111,one12345_222222"}, {})
    assert mock_delete_object.call_count == 2

    captured = capsys.readouterr()
    assert "Deleting inputs of twin granule case" in captured.out


def test_bad_granule_id_inputs(mock_delete_object):
    from lambda_functions.cleanup_sentinel2_granules import handler, s3

    with (
        patch.object(
            s3,
            "list_objects_v2",
            return_value={"Contents": [{"Key": "one.zip"}, {"Key": "two.zip"}]},
        ) as mock_list_objects,
        pytest.raises(ValueError, match=r"Received 2 granule prefixes"),
    ):
        handler({"granule": "thisiswronggranuleid,obviouslynotagranuleid"}, {})
    mock_list_objects.assert_not_called()
    mock_delete_object.assert_not_called()
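
If you would rather not patch the client, botocore's built-in request stubber can fake the same S3 calls end to end. A minimal sketch, not part of this commit; it reuses the fake bucket and key from the fixtures above, and sets the environment variables before the import because the module reads the bucket name (and creates the client) at import time:

import os

os.environ["SENTINEL_INPUT_BUCKET"] = "sentinelinput"
os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")

from botocore.stub import Stubber

from lambda_functions.cleanup_sentinel2_granules import handler, s3

with Stubber(s3) as stubber:
    # "one12345"[0:-6] == "on", so that is the prefix being listed.
    stubber.add_response(
        "list_objects_v2",
        {"Contents": [{"Key": "one.zip"}]},
        {"Bucket": "sentinelinput", "Prefix": "on"},
    )
    stubber.add_response(
        "delete_object", {}, {"Bucket": "sentinelinput", "Key": "one.zip"}
    )
    assert handler({"granule": "one12345"}, {}) == ["one.zip"]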

stack/hlsconstructs/lambdafunc.py

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ def __init__(
         code_str: str = None,
         package_code_dir: str = None,
         env: Dict = None,
-        runtime: aws_lambda.Runtime = aws_lambda.Runtime.PYTHON_3_8,
+        runtime: aws_lambda.Runtime = aws_lambda.Runtime.PYTHON_3_9,
         handler: str = "index.handler",
         layers: list = None,
         cron_str: str = None,

stack/hlsconstructs/sentinel_step_function.py

Lines changed: 22 additions & 4 deletions
@@ -1,7 +1,7 @@
 import json
-from typing import Union
+from typing import Optional, Union

-from aws_cdk import aws_iam, aws_stepfunctions
+from aws_cdk import aws_stepfunctions
 from constructs import Construct
 from hlsconstructs.batch_step_function import BatchStepFunction
 from hlsconstructs.lambdafunc import Lambda
@@ -22,6 +22,7 @@ def __init__(
         sentinel_ac_logger: Lambda,
         sentinel_logger: Lambda,
         check_exit_code: Lambda,
+        cleanup_granule: Optional[Lambda],
         replace_existing: bool,
         gibs_outputbucket: str,
         debug_bucket: Union[bool, str] = False,
@@ -124,24 +125,27 @@ def __init__(
                     "Type": "Task",
                     "Resource": sentinel_ac_logger.function.function_arn,
                     "Next": "CheckSentinelExitCode",
+                    "ResultPath": "$.exitCode",
                     "Retry": [retry],
                 },
                 "CheckSentinelExitCode": {
                     "Type": "Task",
                     "Resource": check_exit_code.function.function_arn,
                     "Next": "HadSentinelFailure",
+                    "InputPath": "$.exitCode",
+                    "ResultPath": "$.success",
                     "Retry": [retry],
                 },
                 "HadSentinelFailure": {
                     "Type": "Choice",
                     "Choices": [
                         {
-                            "Variable": "$",
+                            "Variable": "$.success",
                             "BooleanEquals": True,
                             "Next": "Done",
                         },
                         {
-                            "Variable": "$",
+                            "Variable": "$.success",
                             "BooleanEquals": False,
                             "Next": "Error",
                         },
@@ -153,6 +157,20 @@ def __init__(
             },
         }

+        # Add "cleanup" step to delete successfully processed granules for
+        # forward processing, but not historic
+        if cleanup_granule is not None:
+            sentinel_state_definition["States"]["CleanupGranule"] = {
+                "Type": "Task",
+                "Resource": cleanup_granule.function.function_arn,
+                "Next": "Done",
+                "InputPath": "$",
+                "Retry": [retry],
+            }
+            sentinel_state_definition["States"]["HadSentinelFailure"]["Choices"][0][
+                "Next"
+            ] = "CleanupGranule"
+
         if debug_bucket:
             sentinel_state_definition["States"]["ProcessSentinel"]["Parameters"][
                 "ContainerOverrides"

stack/stack.py

Lines changed: 27 additions & 1 deletion
@@ -304,7 +304,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
                     os.path.dirname(__file__), "..", "layers", "hls_lambda_layer"
                 )
             ),
-            compatible_runtimes=[aws_lambda.Runtime.PYTHON_3_8],
+            compatible_runtimes=[aws_lambda.Runtime.PYTHON_3_9],
         )

         self.pr2mgrs_lambda = Lambda(
@@ -492,6 +492,14 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
             layers=[self.hls_lambda_layer],
         )

+        self.cleanup_sentinel2_granule = Lambda(
+            self,
+            "CleanupSentinelSuccesses",
+            code_file="cleanup_sentinel2_granules.py",
+            env={"SENTINEL_INPUT_BUCKET": SENTINEL_INPUT_BUCKET},
+            timeout=120,
+        )
+
         self.get_random_wait = Lambda(
             self,
             "GetRandomWait",
@@ -625,6 +633,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
             sentinel_ac_logger=self.sentinel_ac_logger,
             sentinel_logger=self.sentinel_logger,
             check_exit_code=self.check_exit_code,
+            cleanup_granule=self.cleanup_sentinel2_granule,
             outputbucket_role_arn=OUTPUT_BUCKET_ROLE_ARN,
             replace_existing=REPLACE_EXISTING,
             gibs_outputbucket=GIBS_OUTPUT_BUCKET,
@@ -643,6 +652,9 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
             sentinel_ac_logger=self.sentinel_ac_logger,
             sentinel_logger=self.sentinel_logger_historic,
             check_exit_code=self.check_exit_code,
+            # Do not cleanup granules for historic workflow to avoid
+            # twin granule race condition
+            cleanup_granule=None,
             outputbucket_role_arn=OUTPUT_BUCKET_ROLE_ARN,
             replace_existing=REPLACE_EXISTING,
             gibs_outputbucket=GIBS_OUTPUT_BUCKET_HISTORIC,
@@ -1042,6 +1054,20 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:
             self.sentinel_input_bucket_historic_policy
         )

+        self.cleanup_sentinel_input_bucket_policy = aws_iam.PolicyStatement(
+            resources=[
+                self.sentinel_input_bucket.bucket_arn,
+                f"{self.sentinel_input_bucket.bucket_arn}/*",
+            ],
+            actions=[
+                "s3:List*",
+                "s3:DeleteObject",
+            ],
+        )
+        self.cleanup_sentinel2_granule.function.add_to_role_policy(
+            self.cleanup_sentinel_input_bucket_policy
+        )
+
         self.laads_task.role.add_to_policy(
             aws_iam.PolicyStatement(
                 resources=[
