Skip to content

Commit 34dafdd

Browse files
add ! External Beneficiary Reference field
1 parent 5aa9a2f commit 34dafdd

File tree

3 files changed

+175
-0
lines changed

3 files changed

+175
-0
lines changed

src/country_workspace/contrib/hope/apps.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .geo import Admin1Choice, Admin2Choice, Admin3Choice, Admin4Choice, CountryChoice
44
from .lookups import FinancialInstitutionChoice
55
from .phone_numbers import PhoneNumberField
6+
from .beneficiary_reference import BeneficiaryReferenceModelChoice
67

78

89
class Config(AppConfig):
@@ -23,6 +24,7 @@ def ready(self) -> None:
2324
field_registry.register(Admin4Choice)
2425
field_registry.register(FinancialInstitutionChoice)
2526
field_registry.register(PhoneNumberField)
27+
field_registry.register(BeneficiaryReferenceModelChoice)
2628

2729
from country_workspace.contrib.hope.validators import FullHouseholdValidator
2830
from country_workspace.validators.registry import beneficiary_validator_registry
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from typing import Any, TYPE_CHECKING
2+
from django.forms import ModelChoiceField
3+
from django_select2.forms import ModelSelect2Widget
4+
from django.db.models import QuerySet
5+
from django.core.exceptions import ValidationError
6+
from django.urls import resolve
7+
from country_workspace.state import state
8+
9+
if TYPE_CHECKING:
10+
from country_workspace.models import Individual
11+
12+
13+
def get_household_id_from_request() -> int | None:
14+
if not state.request:
15+
return None
16+
17+
if (resolved := resolve(state.request.path)) and "object_id" in resolved.kwargs:
18+
return int(resolved.kwargs["object_id"])
19+
return None
20+
21+
22+
class BeneficiarySelect2Widget(ModelSelect2Widget):
23+
search_fields = ["name__icontains"]
24+
25+
def __init__(
26+
self, program_id: int | None = None, household_id: int | None = None, *args: Any, **kwargs: Any
27+
) -> None:
28+
self.program_id = program_id
29+
self.household_id = household_id
30+
kwargs.setdefault(
31+
"attrs",
32+
{
33+
"data-minimum-input-length": 0,
34+
"class": "form-control",
35+
},
36+
)
37+
super().__init__(*args, **kwargs)
38+
39+
def get_queryset(self) -> QuerySet["Individual"]:
40+
from django.apps import apps
41+
42+
Individual = apps.get_model("country_workspace", "Individual")
43+
44+
if not self.program_id:
45+
return Individual.objects.none()
46+
47+
try:
48+
Program = apps.get_model("country_workspace", "Program")
49+
program = Program.objects.get(id=self.program_id)
50+
queryset = program.individuals.filter(removed=False)
51+
if self.household_id:
52+
queryset = queryset.filter(household=self.household_id)
53+
except Program.DoesNotExist:
54+
return Individual.objects.none()
55+
else:
56+
return queryset
57+
58+
59+
class BeneficiaryReferenceModelChoice(ModelChoiceField):
60+
def __init__(self, filter_by_household: bool = False, *args: Any, **kwargs: Any) -> None:
61+
program = state.program
62+
household_id = get_household_id_from_request() if filter_by_household else None
63+
64+
queryset = program.individuals.filter(removed=False)
65+
if household_id:
66+
queryset = queryset.filter(household=household_id)
67+
68+
kwargs.setdefault("queryset", queryset)
69+
kwargs.setdefault(
70+
"widget", BeneficiarySelect2Widget(program_id=program.id, household_id=household_id, model=queryset.model)
71+
)
72+
73+
super().__init__(*args, **kwargs)
74+
75+
def to_python(self, value: Any) -> str | None:
76+
if value in self.empty_values:
77+
return None
78+
79+
match value:
80+
case int() | str() if str(value).isdigit():
81+
try:
82+
individual = self.queryset.get(pk=int(value))
83+
except self.queryset.model.DoesNotExist:
84+
raise ValidationError(self.error_messages["invalid_choice"], code="invalid_choice")
85+
return individual.name
86+
87+
case str() if self.queryset.filter(name=value).exists():
88+
return value
89+
90+
case _:
91+
raise ValidationError(self.error_messages["invalid_choice"], code="invalid_choice")
92+
93+
def prepare_value(self, value: Any) -> int | None:
94+
if isinstance(value, str) and value:
95+
try:
96+
individual = self.queryset.get(name=value)
97+
except self.queryset.model.DoesNotExist:
98+
return None
99+
else:
100+
return individual.pk
101+
return getattr(value, "pk", value)
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Generated by HCW 0.1.0 on 2025 07 29 07:39:07
2+
from typing import Any
3+
from packaging.version import Version
4+
from concurrency.utils import fqn
5+
from contextlib import suppress
6+
from django.db import transaction
7+
from django.utils.text import slugify
8+
9+
from hope_flex_fields.models import FieldDefinition, Fieldset
10+
from hope_flex_fields.utils import get_kwargs_from_field_class, get_common_attrs
11+
from country_workspace.contrib.hope.beneficiary_reference import BeneficiaryReferenceModelChoice
12+
from country_workspace.contrib.hope.constants import HOUSEHOLD_FIELDSET_NAME
13+
14+
_script_for_version = Version("0.1.0")
15+
16+
17+
attrs_default = lambda cls: get_kwargs_from_field_class(cls, get_common_attrs())
18+
19+
FIELDS: dict[str, dict[str, Any]] = {
20+
"alternate_collector": {
21+
"name": "Alternate Collector Reference",
22+
"defaults": {
23+
"field_type": fqn(BeneficiaryReferenceModelChoice),
24+
"attrs": {
25+
**attrs_default(BeneficiaryReferenceModelChoice),
26+
"filter_by_household": False,
27+
"help_text": "A unique reference for an alternate collector.",
28+
},
29+
},
30+
},
31+
"primary_collector": {
32+
"name": "Primary Collector Reference",
33+
"defaults": {
34+
"field_type": fqn(BeneficiaryReferenceModelChoice),
35+
"attrs": {
36+
**attrs_default(BeneficiaryReferenceModelChoice),
37+
"filter_by_household": True,
38+
"help_text": "A unique reference for a primary collector.",
39+
},
40+
},
41+
},
42+
}
43+
44+
45+
def forward() -> None:
46+
with transaction.atomic():
47+
fs, __ = Fieldset.objects.get_or_create(name=HOUSEHOLD_FIELDSET_NAME)
48+
for field_name, field_config in FIELDS.items():
49+
field_config["defaults"]["slug"] = slugify(field_config["name"])
50+
fd, __ = FieldDefinition.objects.update_or_create(
51+
name=field_config["name"], defaults=field_config["defaults"]
52+
)
53+
fs.fields.update_or_create(name=field_name, defaults={"definition": fd})
54+
55+
56+
def backward() -> None:
57+
with transaction.atomic():
58+
try:
59+
fs = Fieldset.objects.get(name=HOUSEHOLD_FIELDSET_NAME)
60+
except Fieldset.DoesNotExist:
61+
return
62+
63+
for field_name, field_config in FIELDS.items():
64+
fs.fields.filter(name=field_name).delete()
65+
with suppress(FieldDefinition.DoesNotExist):
66+
fd = FieldDefinition.objects.get(name=field_config["name"])
67+
fd.delete()
68+
69+
70+
class Scripts:
71+
requires = []
72+
operations = [(forward, backward)]

0 commit comments

Comments
 (0)