Skip to content

Commit f4d1ab1

Browse files
Merge pull request #246 from NASA-IMPACT/fix/jsonb_query_logic
Refactor Sentinel 2 error processing to use dedicated status fields instead of jsonb.
2 parents e03e676 + 87cd7ac commit f4d1ab1

14 files changed

+485
-313
lines changed

lambda_functions/process_sentinel_errors.py

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -62,12 +62,10 @@ def handler(event, context):
6262

6363
q = (
6464
"SELECT id, granule from sentinel_log WHERE"
65-
+ " (jobinfo->'Container'->>'ExitCode' NOT IN ('0', '137', '3', '4')"
66-
+ " or jobinfo->'Container'->>'ExitCode' is NULL)"
67-
+ " AND jobinfo is NOT NULL"
65+
+ " unexpected_error"
6866
+ " AND run_count < :retry_limit::integer"
6967
+ " AND historic = :historic_value::boolean"
70-
+ " LIMIT 4000;"
68+
+ " LIMIT 1000;"
7169
)
7270
print(q)
7371
response = execute_statement(

lambda_functions/sentinel_ac_logger.py

Lines changed: 40 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -38,15 +38,50 @@ def handler(event, context):
3838
jobinfo, jobinfostring, exitcode = itemgetter(
3939
"jobinfo", "jobinfostring", "exitcode"
4040
)(parsed_info)
41+
print(f"Exit Code is {exitcode}")
42+
43+
succeeded = False
44+
expected_error = False
45+
unexpected_error = True
46+
47+
if exitcode == 0:
48+
succeeded = True
49+
unexpected_error = False
50+
elif exitcode in [137, 3, 4]:
51+
succeeded = False
52+
expected_error = True
53+
unexpected_error = False
54+
else:
55+
succeeded = False
56+
expected_error = False
57+
unexpected_error = True
58+
59+
if "id" in event:
60+
selector_string = " WHERE id = :selector"
61+
selector_value = event["id"]
62+
selector_parameter = {
63+
"name": "selector",
64+
"value": {"longValue": selector_value},
65+
}
66+
else:
67+
selector_string = " WHERE granule = :selector::text"
68+
selector_value = event["granule"]
69+
selector_parameter = {
70+
"name": "selector",
71+
"value": {"stringValue": selector_value},
72+
}
4173
q = (
42-
"UPDATE sentinel_log SET (jobinfo, run_count) ="
43-
+ " (:jobinfo::jsonb, run_count + 1)"
44-
+ " WHERE granule = :granule::text"
74+
"UPDATE sentinel_log SET"
75+
+ " (jobinfo, run_count, succeeded, expected_error, unexpected_error) ="
76+
+ " (:jobinfo::jsonb, run_count + 1, :succeeded::boolean, :expected_error::boolean, :unexpected_error::boolean)"
77+
+ selector_string
4578
)
4679
sql_parameters = [
4780
{"name": "jobinfo", "value": {"stringValue": jobinfostring}},
48-
{"name": "granule", "value": {"stringValue": event["granule"]}},
81+
{"name": "succeeded", "value": {"booleanValue": succeeded}},
82+
{"name": "expected_error", "value": {"booleanValue": expected_error}},
83+
{"name": "unexpected_error", "value": {"booleanValue": unexpected_error}},
4984
]
85+
sql_parameters.append(selector_parameter)
5086
execute_statement(q, sql_parameters=sql_parameters)
51-
print(f"Exit Code is {exitcode}")
5287
return exitcode

lambda_functions/setupdb.py

Lines changed: 6 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -72,44 +72,23 @@ def execute_statement(sql, sql_parameters=[]):
7272
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS run_count INTEGER;
7373
7474
DROP VIEW IF EXISTS sentinel_granule_log;
75-
CREATE VIEW sentinel_granule_log AS
76-
select id, ts, granule, run_count,
77-
jobinfo->>'Status' as status,
78-
to_timestamp((jobinfo->>'CreatedAt')::float/1000) as job_created,
79-
to_timestamp((jobinfo->>'StartedAt')::float/1000) as job_started,
80-
to_timestamp((jobinfo->>'StoppedAt')::float/1000) as job_stopped,
81-
jobinfo
82-
from sentinel_log WHERE jobinfo IS NOT NULL;
83-
84-
DROP VIEW IF EXISTS granule_log;
75+
8576
DROP FUNCTION IF EXISTS granule(IN event jsonb, OUT granule text);
8677
8778
ALTER TABLE landsat_ac_log ADD COLUMN IF NOT EXISTS run_count INTEGER;
8879
ALTER TABLE landsat_mgrs_log ADD COLUMN IF NOT EXISTS run_count INTEGER;
8980
9081
DROP VIEW IF EXISTS landsat_ac_granule_log;
91-
CREATE VIEW landsat_ac_granule_log AS
92-
select id, ts, run_count, scene_id,
93-
jobinfo->>'Status' as status,
94-
to_timestamp((jobinfo->>'CreatedAt')::float/1000) as job_created,
95-
to_timestamp((jobinfo->>'StartedAt')::float/1000) as job_started,
96-
to_timestamp((jobinfo->>'StoppedAt')::float/1000) as job_stopped,
97-
jobinfo
98-
from landsat_ac_log WHERE jobinfo IS NOT NULL;
99-
10082
DROP VIEW IF EXISTS landsat_mgrs_granule_log;
101-
CREATE VIEW landsat_mgrs_granule_log AS
102-
select id, ts, run_count,
103-
jobinfo->>'Status' as status,
104-
to_timestamp((jobinfo->>'CreatedAt')::float/1000) as job_created,
105-
to_timestamp((jobinfo->>'StartedAt')::float/1000) as job_started,
106-
to_timestamp((jobinfo->>'StoppedAt')::float/1000) as job_stopped,
107-
jobinfo
108-
from landsat_mgrs_log WHERE jobinfo IS NOT NULL;
10983
11084
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS historic BOOLEAN;
11185
ALTER TABLE landsat_ac_log ADD COLUMN IF NOT EXISTS historic BOOLEAN;
11286
ALTER TABLE landsat_mgrs_log ADD COLUMN IF NOT EXISTS historic BOOLEAN;
87+
88+
89+
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS succeeded BOOLEAN;
90+
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS expected_error BOOLEAN;
91+
ALTER TABLE sentinel_log ADD COLUMN IF NOT EXISTS unexpected_error BOOLEAN;
11392
"""
11493

11594

lambda_functions/tests/test_sentinel_ac_logger.py

Lines changed: 75 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33

44
import pytest
55
from hls_lambda_layer.batch_test_events import (
6+
batch_expected_failed_event,
67
batch_failed_event,
78
batch_failed_event_string_cause,
89
batch_succeeded_event,
@@ -20,15 +21,45 @@ def test_handler_keyError(client):
2021
}
2122
cause = json.loads(event["jobinfo"]["Cause"])
2223
jobinfo = {"name": "jobinfo", "value": {"stringValue": json.dumps(cause)}}
23-
granule = {"name": "granule", "value": {"stringValue": event["granule"]}}
24+
selector = {"name": "selector", "value": {"stringValue": event["granule"]}}
25+
succeeded = {"name": "succeeded", "value": {"booleanValue": False}}
26+
expected_error = {"name": "expected_error", "value": {"booleanValue": False}}
27+
unexpected_error = {"name": "unexpected_error", "value": {"booleanValue": True}}
2428
client.execute_statement.return_value = {}
2529
output = handler(event, {})
2630
args, kwargs = client.execute_statement.call_args
2731
assert jobinfo in kwargs["parameters"]
28-
assert granule in kwargs["parameters"]
32+
assert selector in kwargs["parameters"]
33+
assert succeeded in kwargs["parameters"]
34+
assert expected_error in kwargs["parameters"]
35+
assert unexpected_error in kwargs["parameters"]
2936
assert output == 1
3037

3138

@patch("lambda_functions.sentinel_ac_logger.rds_client")
def test_handler_expected_keyError(client):
    """Check the parameters logged for the expected-failure batch event.

    The handler is run against ``batch_expected_failed_event`` and must
    return 137 while sending ``expected_error`` as the only true status flag.
    """
    granule_id = "S2A_MSIL1C_20200708T232851_N0209_R044_T58LEP_20200709T005119"
    event = {"granule": granule_id, "jobinfo": batch_expected_failed_event}

    # The test expects the Cause payload re-serialised as the jobinfo string.
    parsed_cause = json.loads(event["jobinfo"]["Cause"])
    expected_parameters = [
        {"name": "jobinfo", "value": {"stringValue": json.dumps(parsed_cause)}},
        {"name": "selector", "value": {"stringValue": event["granule"]}},
        {"name": "succeeded", "value": {"booleanValue": False}},
        {"name": "expected_error", "value": {"booleanValue": True}},
        {"name": "unexpected_error", "value": {"booleanValue": False}},
    ]

    client.execute_statement.return_value = {}
    output = handler(event, {})
    _, kwargs = client.execute_statement.call_args

    for parameter in expected_parameters:
        assert parameter in kwargs["parameters"]
    assert output == 137
61+
62+
3263
@patch("lambda_functions.sentinel_ac_logger.rds_client")
3364
def test_handler(client):
3465
"""Test handler."""
@@ -43,8 +74,44 @@ def test_handler(client):
4374
"name": "jobinfo",
4475
"value": {"stringValue": json.dumps(event["jobinfo"])},
4576
}
77+
selector = {"name": "selector", "value": {"stringValue": event["granule"]}}
78+
succeeded = {"name": "succeeded", "value": {"booleanValue": True}}
79+
expected_error = {"name": "expected_error", "value": {"booleanValue": False}}
80+
unexpected_error = {"name": "unexpected_error", "value": {"booleanValue": False}}
81+
assert jobinfo in kwargs["parameters"]
82+
assert succeeded in kwargs["parameters"]
83+
assert expected_error in kwargs["parameters"]
84+
assert unexpected_error in kwargs["parameters"]
85+
assert selector in kwargs["parameters"]
86+
assert output == 0
87+
assert "WHERE granule" in kwargs["sql"]
88+
89+
@patch("lambda_functions.sentinel_ac_logger.rds_client")
def test_handler_update_by_id(client):
    """Check that an event carrying ``id`` updates the row selected by id.

    The selector parameter must be sent as a ``longValue`` and the SQL must
    contain ``WHERE id``.
    """
    event = {"id": 1, "jobinfo": batch_succeeded_event}
    client.execute_statement.return_value = {}

    output = handler(event, {})
    _, kwargs = client.execute_statement.call_args

    # Expected values are built after the call, from the event as the
    # handler left it.
    status_parameters = [
        {"name": "jobinfo", "value": {"stringValue": json.dumps(event["jobinfo"])}},
        {"name": "succeeded", "value": {"booleanValue": True}},
        {"name": "expected_error", "value": {"booleanValue": False}},
        {"name": "unexpected_error", "value": {"booleanValue": False}},
    ]
    id_selector = {"name": "selector", "value": {"longValue": event["id"]}}

    for parameter in status_parameters:
        assert parameter in kwargs["parameters"]
    assert output == 0
    assert id_selector in kwargs["parameters"]
    assert "WHERE id" in kwargs["sql"]
48115

49116

50117
@patch("lambda_functions.sentinel_ac_logger.rds_client")
@@ -58,8 +125,14 @@ def test_handler_valueError(client):
58125
jobinfo_value = {"cause": event["jobinfo"]["Cause"]}
59126

60127
jobinfo = {"name": "jobinfo", "value": {"stringValue": json.dumps(jobinfo_value)}}
128+
succeeded = {"name": "succeeded", "value": {"booleanValue": False}}
129+
expected_error = {"name": "expected_error", "value": {"booleanValue": False}}
130+
unexpected_error = {"name": "unexpected_error", "value": {"booleanValue": True}}
61131
client.execute_statement.return_value = {}
62132
output = handler(event, {})
63133
args, kwargs = client.execute_statement.call_args
64134
assert jobinfo in kwargs["parameters"]
135+
assert succeeded in kwargs["parameters"]
136+
assert expected_error in kwargs["parameters"]
137+
assert unexpected_error in kwargs["parameters"]
65138
assert output == "nocode"

lambda_functions/update_sentinel_failure.py

Lines changed: 0 additions & 44 deletions
This file was deleted.

layers/hls_lambda_layer/python/hls_lambda_layer/batch_test_events.py

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,14 @@
1+
cause = '{"Attempts":[{"Container":{"ContainerInstanceArn":"arn:aws:ecs:us-west-2:018923174646:container-instance/cebc97a6-5c16-4987-9d24-27b96246dcdd","ExitCode":1,"LogStreamName":"LandsatTaskBatchJob1274-131d33be785ce5d/default/7c7cd09e-33c0-44f5-919a-2f66e0b8e995","NetworkInterfaces":[],"TaskArn":"arn:aws:ecs:us-west-2:018923174646:task/7c7cd09e-33c0-44f5-919a-2f66e0b8e995"},"StartedAt":1591403928719,"StatusReason":"Essential container in task exited","StoppedAt":1591403929450}],"Container":{"Command":["export && landsat.sh"],"ContainerInstanceArn":"arn:aws:ecs:us-west-2:018923174646:container-instance/cebc97a6-5c16-4987-9d24-27b96246dcdd","Environment":[{"Name":"PREFIX","Value":"x1/L8/127/010/LC08_L1GT_127010_20200527_20200527_01_RT"},{"Name":"GRANULE","Value":"LC08_L1GT_127010_20200527_20200527_01_RT"},{"Name":"OUTPUT_BUCKET","Value":"hls-development-landsat-intermediate-output"},{"Name":"LASRC_AUX_DIR","Value":"/var/lasrc_aux"},{"Name":"MANAGED_BY_AWS","Value":"STARTED_BY_STEP_FUNCTIONS"},{"Name":"INPUT_BUCKET","Value":"landsat-pds"},{"Name":"REPLACE_EXISTING","Value":"replace"}],"ExitCode":1,"Image":"018923174646.dkr.ecr.us-west-2.amazonaws.com/hls-landsat:latest","JobRoleArn":"arn:aws:iam::018923174646:role/hls-development-LandsatTaskTaskRoleFD2391A2-440VUZKTYZ2O","LogStreamName":"LandsatTaskBatchJob1274-131d33be785ce5d/default/7c7cd09e-33c0-44f5-919a-2f66e0b8e995","Memory":12000,"MountPoints":[{"ContainerPath":"/var/lasrc_aux","ReadOnly":false,"SourceVolume":"volume"},{"ContainerPath":"/var/scratch","ReadOnly":false,"SourceVolume":"scratch_volume"}],"NetworkInterfaces":[],"ResourceRequirements":[],"TaskArn":"arn:aws:ecs:us-west-2:018923174646:task/7c7cd09e-33c0-44f5-919a-2f66e0b8e995","Ulimits":[],"Vcpus":2,"Volumes":[{"Host":{"SourcePath":"/mnt/efs"},"Name":"volume"},{"Host":{"SourcePath":"/scratch"},"Name":"scratch_volume"}]},"CreatedAt":1591403386307,"DependsOn":[],"JobDefinition":"arn:aws:batch:us-west-2:018923174646:job-definition/LandsatTaskBatchJob12
74-131d33be785ce5d:2","JobId":"5ce9a71e-2f18-4dd1-b9ac-9d3618774d3f","JobName":"LandsatAcJob","JobQueue":"arn:aws:batch:us-west-2:018923174646:job-queue/BatchJobQueueFD3B0361-88c4344c33bfb4a","Parameters":{},"RetryStrategy":{"Attempts":1},"StartedAt":1591403928719,"Status":"FAILED","StatusReason":"Essential container in task exited","StoppedAt":1591403929450,"Timeout":{"AttemptDurationSeconds":5400}}'
2+
expected_cause = cause.replace('"ExitCode":1', '"ExitCode":137')
3+
14
batch_failed_event = {
25
"Error": "States.TaskFailed",
3-
"Cause": '{"Attempts":[{"Container":{"ContainerInstanceArn":"arn:aws:ecs:us-west-2:018923174646:container-instance/cebc97a6-5c16-4987-9d24-27b96246dcdd","ExitCode":1,"LogStreamName":"LandsatTaskBatchJob1274-131d33be785ce5d/default/7c7cd09e-33c0-44f5-919a-2f66e0b8e995","NetworkInterfaces":[],"TaskArn":"arn:aws:ecs:us-west-2:018923174646:task/7c7cd09e-33c0-44f5-919a-2f66e0b8e995"},"StartedAt":1591403928719,"StatusReason":"Essential container in task exited","StoppedAt":1591403929450}],"Container":{"Command":["export && landsat.sh"],"ContainerInstanceArn":"arn:aws:ecs:us-west-2:018923174646:container-instance/cebc97a6-5c16-4987-9d24-27b96246dcdd","Environment":[{"Name":"PREFIX","Value":"x1/L8/127/010/LC08_L1GT_127010_20200527_20200527_01_RT"},{"Name":"GRANULE","Value":"LC08_L1GT_127010_20200527_20200527_01_RT"},{"Name":"OUTPUT_BUCKET","Value":"hls-development-landsat-intermediate-output"},{"Name":"LASRC_AUX_DIR","Value":"/var/lasrc_aux"},{"Name":"MANAGED_BY_AWS","Value":"STARTED_BY_STEP_FUNCTIONS"},{"Name":"INPUT_BUCKET","Value":"landsat-pds"},{"Name":"REPLACE_EXISTING","Value":"replace"}],"ExitCode":1,"Image":"018923174646.dkr.ecr.us-west-2.amazonaws.com/hls-landsat:latest","JobRoleArn":"arn:aws:iam::018923174646:role/hls-development-LandsatTaskTaskRoleFD2391A2-440VUZKTYZ2O","LogStreamName":"LandsatTaskBatchJob1274-131d33be785ce5d/default/7c7cd09e-33c0-44f5-919a-2f66e0b8e995","Memory":12000,"MountPoints":[{"ContainerPath":"/var/lasrc_aux","ReadOnly":false,"SourceVolume":"volume"},{"ContainerPath":"/var/scratch","ReadOnly":false,"SourceVolume":"scratch_volume"}],"NetworkInterfaces":[],"ResourceRequirements":[],"TaskArn":"arn:aws:ecs:us-west-2:018923174646:task/7c7cd09e-33c0-44f5-919a-2f66e0b8e995","Ulimits":[],"Vcpus":2,"Volumes":[{"Host":{"SourcePath":"/mnt/efs"},"Name":"volume"},{"Host":{"SourcePath":"/scratch"},"Name":"scratch_volume"}]},"CreatedAt":1591403386307,"DependsOn":[],"JobDefinition":"arn:aws:batch:us-west-2:018923174646:job-definition/LandsatTaskBatchJob1
274-131d33be785ce5d:2","JobId":"5ce9a71e-2f18-4dd1-b9ac-9d3618774d3f","JobName":"LandsatAcJob","JobQueue":"arn:aws:batch:us-west-2:018923174646:job-queue/BatchJobQueueFD3B0361-88c4344c33bfb4a","Parameters":{},"RetryStrategy":{"Attempts":1},"StartedAt":1591403928719,"Status":"FAILED","StatusReason":"Essential container in task exited","StoppedAt":1591403929450,"Timeout":{"AttemptDurationSeconds":5400}}',
6+
"Cause": cause,
47
}
58

9+
batch_expected_failed_event = {"Error": "States.TaskFailed", "Cause": expected_cause}
10+
11+
612
batch_succeeded_event = {
713
"Attempts": [
814
{

scripts/create_job_definition.py

Lines changed: 45 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,45 @@
# Register an AWS Batch job definition for an image built from a pull request.
#
# Usage: create_job_definition.py <image> <pr_number>
#
# The definition is named "<image>_<pr_number>" and runs the ECR image tagged
# with the pull-request number. The ARN of the registered definition is
# printed on success.
import sys

import boto3

# IAM role each supported image's job definition is registered with.
ROLE_ARNS = {
    "hls-laads": "arn:aws:iam::018923174646:role/hls-development-LaadsTaskTaskRoleC8A42E20-2V99O0N3RMJ0",
    "hls-sentinel": "arn:aws:iam::018923174646:role/hls-development-SentinelTaskTaskRoleEB97BB3F-1AIL1PXLA4JKJ",
    "hls-landsat-c2": "arn:aws:iam::018923174646:role/hls-development-LandsatTaskTaskRoleFD2391A2-440VUZKTYZ2O",
}

# Fail with a usage message instead of a bare IndexError.
if len(sys.argv) < 3:
    sys.exit("usage: create_job_definition.py <image> <pr_number>")

image = sys.argv[1]
pr_number = sys.argv[2]

# Reject unsupported images up front; previously role_arn was simply left
# unbound and the script died later with a NameError.
if image not in ROLE_ARNS:
    sys.exit(
        f"Unknown image {image!r}; expected one of: {', '.join(sorted(ROLE_ARNS))}"
    )
role_arn = ROLE_ARNS[image]


client = boto3.client("batch")
response = client.register_job_definition(
    jobDefinitionName=f"{image}_{pr_number}",
    type="container",
    containerProperties={
        "image": f"018923174646.dkr.ecr.us-west-2.amazonaws.com/{image}:{pr_number}",
        "vcpus": 2,
        "memory": 10000,
        "command": [],
        "jobRoleArn": role_arn,
        "volumes": [
            {"host": {"sourcePath": "/mnt/efs"}, "name": "volume"},
            {"host": {"sourcePath": "/scratch"}, "name": "scratch_volume"},
        ],
        "mountPoints": [
            {
                "containerPath": "/var/lasrc_aux",
                "readOnly": False,
                "sourceVolume": "volume",
            },
            {
                "containerPath": "/var/scratch",
                "readOnly": False,
                "sourceVolume": "scratch_volume",
            },
        ],
    },
    timeout={"attemptDurationSeconds": 259200},
)
print(response["jobDefinitionArn"])

scripts/run_landsat_tile_validation.py

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)