1
+ from collections .abc import Callable
1
2
from typing import Any , Final , Mapping
2
3
3
4
from django .db .models import Model
21
22
MODELS : Final [tuple [type [Model ], ...]] = (Office , Program , BeneficiaryGroup )
22
23
"""List of models to synchronize."""
23
24
25
+ OFFICE_FIELDS = "name" , "slug" , "code" , "long_name" , "active"
26
+ BENEFICIARY_GROUP_FIELDS = (
27
+ "name" ,
28
+ "group_label" ,
29
+ "group_label_plural" ,
30
+ "member_label" ,
31
+ "member_label_plural" ,
32
+ "master_detail" ,
33
+ )
34
+ Program_FIELDS = "name" , "code" , "status" , "sector" , "country_office" , "beneficiary_group"
35
+
36
+ HOPE_ID = "hope_id"
37
+ BUSINESS_AREAS = "business_areas"
38
+ BENEFICIARY_GROUPS = "beneficiary-groups"
39
+ PROGRAMS = "programs"
40
+
24
41
25
42
def get_default_checkers () -> Mapping [str , DataChecker ]:
26
43
return {
@@ -33,15 +50,26 @@ def get_default_checkers() -> Mapping[str, DataChecker]:
33
50
}
34
51
35
52
53
+ def get_field_extractor (fields : tuple [str , ...]) -> Callable [[dict [str , Any ]], dict [str , Any ]]:
54
+ def field_extractor (record : dict [str , Any ]) -> dict [str , Any ]:
55
+ return {field : record .get (field ) for field in fields }
56
+
57
+ return field_extractor
58
+
59
+
60
+ def should_process_office (record : dict [str , Any ]) -> bool :
61
+ return bool (record .get ("active" ))
62
+
63
+
36
64
def sync_offices (delta_sync : bool = False ) -> None :
37
65
"""Fetch and process Office records from the remote API, deactivating those not present in the source."""
38
66
sync_entity (
39
- SyncConfig (
67
+ SyncConfig [ Office ] (
40
68
model = Office ,
41
- reference_id = "hope_id" ,
42
- endpoint = build_endpoint ("business_areas" , Office , ParamDateName .UPDATED , delta_sync ),
43
- prepare_defaults = lambda r : { f : r . get ( f ) for f in ( "name" , "slug" , "code" , "long_name" , "active" )} ,
44
- should_process = lambda r : r . get ( "active" ) ,
69
+ reference_id = HOPE_ID ,
70
+ endpoint = build_endpoint (BUSINESS_AREAS , Office , ParamDateName .UPDATED , delta_sync ),
71
+ prepare_defaults = get_field_extractor ( OFFICE_FIELDS ) ,
72
+ should_process = should_process_office ,
45
73
delta_sync = delta_sync ,
46
74
)
47
75
)
@@ -50,75 +78,72 @@ def sync_offices(delta_sync: bool = False) -> None:
50
78
def sync_beneficiary_groups (delta_sync : bool = False ) -> None :
51
79
"""Fetch and process BeneficiaryGroup records from the remote API."""
52
80
sync_entity (
53
- SyncConfig (
81
+ SyncConfig [ BeneficiaryGroup ] (
54
82
model = BeneficiaryGroup ,
55
- reference_id = "hope_id" ,
56
- endpoint = EndpointConfig (path = "beneficiary-groups" ),
57
- prepare_defaults = lambda r : {
58
- f : r .get (f )
59
- for f in (
60
- "name" ,
61
- "group_label" ,
62
- "group_label_plural" ,
63
- "member_label" ,
64
- "member_label_plural" ,
65
- "master_detail" ,
66
- )
67
- },
83
+ reference_id = HOPE_ID ,
84
+ endpoint = EndpointConfig (path = BENEFICIARY_GROUPS ),
85
+ prepare_defaults = get_field_extractor (BENEFICIARY_GROUP_FIELDS ),
68
86
delta_sync = delta_sync ,
69
87
)
70
88
)
71
89
72
90
91
+ def get_should_process_program (programs_limit_to_office : Office | None ) -> Callable [[dict [str , Any ]], bool ]:
92
+ def should_process_program (record : dict [str , Any ]) -> bool :
93
+ return record .get ("status" ) in [Program .ACTIVE , Program .DRAFT ] and (
94
+ not programs_limit_to_office or record ["business_area_code" ] == programs_limit_to_office .code
95
+ )
96
+
97
+ return should_process_program
98
+
99
+
100
+ def prepare_program_defaults (record : dict [str , Any ]) -> dict [str , Any ] | None :
101
+ try :
102
+ office = Office .objects .get (code = record ["business_area_code" ])
103
+ except Office .DoesNotExist as e :
104
+ raise SkipRecordError ("Office not found" ) from e
105
+
106
+ try :
107
+ bg = BeneficiaryGroup .objects .get (hope_id = record ["beneficiary_group" ])
108
+ except BeneficiaryGroup .DoesNotExist as e :
109
+ raise SkipRecordError ("Beneficiary group not found" ) from e
110
+
111
+ return {
112
+ "name" : record ["name" ],
113
+ "code" : record ["programme_code" ],
114
+ "status" : record ["status" ],
115
+ "sector" : record ["sector" ],
116
+ "country_office" : office ,
117
+ "beneficiary_group" : bg ,
118
+ }
119
+
120
+
121
+ def post_process_program (program : Program , created : bool ) -> None :
122
+ default_checkers = get_default_checkers ()
123
+ if created and default_checkers :
124
+ program .household_checker = default_checkers .get ("hh" )
125
+ program .individual_checker = (
126
+ default_checkers .get ("ind" ) if program .beneficiary_group .master_detail else default_checkers .get ("ppl" )
127
+ )
128
+ if program .household_checker or program .individual_checker :
129
+ program .save (update_fields = ("household_checker" , "individual_checker" ))
130
+
131
+
73
132
def sync_programs (delta_sync : bool = False , programs_limit_to_office : Office | None = None ) -> None :
74
133
"""Synchronize and process Program records from the remote API, applying filters and post-processing.
75
134
76
135
Notes:
77
136
Calls sync_beneficiary_groups to ensure dependencies are synchronized.
78
137
79
138
"""
80
-
81
- def _should_process (record : dict [str , Any ]) -> bool :
82
- return record .get ("status" ) in [Program .ACTIVE , Program .DRAFT ] and (
83
- not programs_limit_to_office or record ["business_area_code" ] == programs_limit_to_office .code
84
- )
85
-
86
- def _prepare_defaults (record : dict [str , Any ]) -> dict [str , Any ] | None :
87
- try :
88
- office = Office .objects .get (code = record ["business_area_code" ])
89
- except Office .DoesNotExist as e :
90
- raise SkipRecordError ("Office not found" ) from e
91
- try :
92
- bg = BeneficiaryGroup .objects .get (hope_id = record ["beneficiary_group" ])
93
- except BeneficiaryGroup .DoesNotExist as e :
94
- raise SkipRecordError ("Beneficiary group not found" ) from e
95
- return {
96
- "name" : record ["name" ],
97
- "code" : record ["programme_code" ],
98
- "status" : record ["status" ],
99
- "sector" : record ["sector" ],
100
- "country_office" : office ,
101
- "beneficiary_group" : bg ,
102
- }
103
-
104
- def _post_process (program : Program , created : bool ) -> None :
105
- default_checkers = get_default_checkers ()
106
- if created and default_checkers :
107
- program .household_checker = default_checkers .get ("hh" )
108
- program .individual_checker = (
109
- default_checkers .get ("ind" ) if program .beneficiary_group .master_detail else default_checkers .get ("ppl" )
110
- )
111
- if program .household_checker or program .individual_checker :
112
- program .save (update_fields = ("household_checker" , "individual_checker" ))
113
-
114
139
sync_entity (
115
- SyncConfig (
140
+ SyncConfig [ Program ] (
116
141
model = Program ,
117
- reference_id = "hope_id" ,
118
- endpoint = build_endpoint ("programs" , Program , ParamDateName .UPDATED , delta_sync ),
119
- prepare_defaults = _prepare_defaults ,
120
- should_process = _should_process ,
121
- post_process = _post_process ,
142
+ reference_id = HOPE_ID ,
143
+ endpoint = build_endpoint (PROGRAMS , Program , ParamDateName .UPDATED , delta_sync ),
144
+ prepare_defaults = prepare_program_defaults ,
145
+ should_process = get_should_process_program ( programs_limit_to_office ) ,
146
+ post_process = post_process_program ,
122
147
delta_sync = delta_sync ,
123
148
)
124
149
)
0 commit comments