Skip to content

Commit 57bf55f

Browse files
add ! External Beneficiary Reference field
1 parent 49f6d67 commit 57bf55f

File tree

4 files changed

+380
-0
lines changed

4 files changed

+380
-0
lines changed

src/country_workspace/contrib/hope/apps.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .geo import Admin1Choice, Admin2Choice, Admin3Choice, Admin4Choice, CountryChoice
44
from .lookups import FinancialInstitutionChoice
55
from .phone_numbers import PhoneNumberField
6+
from .beneficiary_reference import BeneficiaryReferenceModelChoice
67

78

89
class Config(AppConfig):
@@ -23,6 +24,7 @@ def ready(self) -> None:
2324
field_registry.register(Admin4Choice)
2425
field_registry.register(FinancialInstitutionChoice)
2526
field_registry.register(PhoneNumberField)
27+
field_registry.register(BeneficiaryReferenceModelChoice)
2628

2729
from country_workspace.contrib.hope.validators import FullHouseholdValidator
2830
from country_workspace.validators.registry import beneficiary_validator_registry
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from typing import Any, TYPE_CHECKING
2+
from django.forms import ModelChoiceField
3+
from django_select2.forms import ModelSelect2Widget
4+
from django.db.models import QuerySet
5+
from django.core.exceptions import ValidationError
6+
from django.urls import resolve
7+
from country_workspace.state import state
8+
9+
if TYPE_CHECKING:
10+
from country_workspace.models import Individual
11+
12+
13+
def get_household_id_from_request() -> int | None:
14+
if not state.request:
15+
return None
16+
17+
if (resolved := resolve(state.request.path)) and "object_id" in resolved.kwargs:
18+
return int(resolved.kwargs["object_id"])
19+
return None
20+
21+
22+
class BeneficiarySelect2Widget(ModelSelect2Widget):
23+
search_fields = ["name__icontains"]
24+
25+
def __init__(
26+
self, program_id: int | None = None, household_id: int | None = None, *args: Any, **kwargs: Any
27+
) -> None:
28+
self.program_id = program_id
29+
self.household_id = household_id
30+
kwargs.setdefault(
31+
"attrs",
32+
{
33+
"data-minimum-input-length": 0,
34+
"class": "form-control",
35+
},
36+
)
37+
super().__init__(*args, **kwargs)
38+
39+
def get_queryset(self) -> QuerySet["Individual"]:
40+
from django.apps import apps
41+
42+
Individual = apps.get_model("country_workspace", "Individual")
43+
44+
if not self.program_id:
45+
return Individual.objects.none()
46+
47+
try:
48+
Program = apps.get_model("country_workspace", "Program")
49+
program = Program.objects.get(id=self.program_id)
50+
queryset = program.individuals.filter(removed=False)
51+
if self.household_id:
52+
queryset = queryset.filter(household=self.household_id)
53+
except Program.DoesNotExist:
54+
return Individual.objects.none()
55+
else:
56+
return queryset
57+
58+
59+
class BeneficiaryReferenceModelChoice(ModelChoiceField):
60+
def __init__(self, filter_by_household: bool = False, *args: Any, **kwargs: Any) -> None:
61+
program = state.program
62+
household_id = get_household_id_from_request() if filter_by_household else None
63+
64+
queryset = program.individuals.filter(removed=False)
65+
if household_id:
66+
queryset = queryset.filter(household=household_id)
67+
68+
kwargs.setdefault("queryset", queryset)
69+
kwargs.setdefault(
70+
"widget", BeneficiarySelect2Widget(program_id=program.id, household_id=household_id, model=queryset.model)
71+
)
72+
73+
super().__init__(*args, **kwargs)
74+
75+
def to_python(self, value: Any) -> str | None:
76+
if value in self.empty_values:
77+
return None
78+
79+
match value:
80+
case int() | str() if str(value).isdigit():
81+
try:
82+
individual = self.queryset.get(pk=int(value))
83+
except self.queryset.model.DoesNotExist:
84+
raise ValidationError(self.error_messages["invalid_choice"], code="invalid_choice")
85+
return individual.name
86+
87+
case str() if self.queryset.filter(name=value).exists():
88+
return value
89+
90+
case _:
91+
raise ValidationError(self.error_messages["invalid_choice"], code="invalid_choice")
92+
93+
def prepare_value(self, value: Any) -> int | None:
94+
if isinstance(value, str) and value:
95+
try:
96+
individual = self.queryset.get(name=value)
97+
except self.queryset.model.DoesNotExist:
98+
return None
99+
else:
100+
return individual.pk
101+
return getattr(value, "pk", value)
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Generated by HCW 0.1.0 on 2025 07 29 07:39:07
2+
from typing import Any
3+
from packaging.version import Version
4+
from concurrency.utils import fqn
5+
from contextlib import suppress
6+
from django.db import transaction
7+
from django.utils.text import slugify
8+
9+
from hope_flex_fields.models import FieldDefinition, Fieldset
10+
from hope_flex_fields.utils import get_kwargs_from_field_class, get_common_attrs
11+
from country_workspace.contrib.hope.beneficiary_reference import BeneficiaryReferenceModelChoice
12+
from country_workspace.contrib.hope.constants import HOUSEHOLD_FIELDSET_NAME
13+
14+
_script_for_version = Version("0.1.0")
15+
16+
17+
attrs_default = lambda cls: get_kwargs_from_field_class(cls, get_common_attrs())
18+
19+
FIELDS: dict[str, dict[str, Any]] = {
20+
"alternate_collector": {
21+
"name": "Alternate Collector Reference",
22+
"defaults": {
23+
"field_type": fqn(BeneficiaryReferenceModelChoice),
24+
"attrs": {
25+
**attrs_default(BeneficiaryReferenceModelChoice),
26+
"filter_by_household": False,
27+
"help_text": "A unique reference for an alternate collector.",
28+
},
29+
},
30+
},
31+
"primary_collector": {
32+
"name": "Primary Collector Reference",
33+
"defaults": {
34+
"field_type": fqn(BeneficiaryReferenceModelChoice),
35+
"attrs": {
36+
**attrs_default(BeneficiaryReferenceModelChoice),
37+
"filter_by_household": True,
38+
"help_text": "A unique reference for a primary collector.",
39+
},
40+
},
41+
},
42+
}
43+
44+
45+
def forward() -> None:
46+
with transaction.atomic():
47+
fs, __ = Fieldset.objects.get_or_create(name=HOUSEHOLD_FIELDSET_NAME)
48+
for field_name, field_config in FIELDS.items():
49+
field_config["defaults"]["slug"] = slugify(field_config["name"])
50+
fd, __ = FieldDefinition.objects.update_or_create(
51+
name=field_config["name"], defaults=field_config["defaults"]
52+
)
53+
fs.fields.update_or_create(name=field_name, defaults={"definition": fd})
54+
55+
56+
def backward() -> None:
57+
with transaction.atomic():
58+
try:
59+
fs = Fieldset.objects.get(name=HOUSEHOLD_FIELDSET_NAME)
60+
except Fieldset.DoesNotExist:
61+
return
62+
63+
for field_name, field_config in FIELDS.items():
64+
fs.fields.filter(name=field_name).delete()
65+
with suppress(FieldDefinition.DoesNotExist):
66+
fd = FieldDefinition.objects.get(name=field_config["name"])
67+
fd.delete()
68+
69+
70+
class Scripts:
71+
requires = []
72+
operations = [(forward, backward)]
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import pytest
2+
from unittest.mock import patch, MagicMock
3+
from django.core.exceptions import ValidationError
4+
5+
from country_workspace.state import state
6+
from country_workspace.contrib.hope.beneficiary_reference import (
7+
get_household_id_from_request,
8+
BeneficiarySelect2Widget,
9+
BeneficiaryReferenceModelChoice,
10+
)
11+
12+
13+
@pytest.fixture
14+
def office():
15+
from testutils.factories import OfficeFactory
16+
17+
co = OfficeFactory()
18+
state.tenant = co
19+
return co
20+
21+
22+
@pytest.fixture
23+
def program(office):
24+
from testutils.factories import CountryProgramFactory
25+
26+
return CountryProgramFactory(country_office=office)
27+
28+
29+
@pytest.fixture
30+
def household(program):
31+
from testutils.factories import CountryHouseholdFactory
32+
33+
return CountryHouseholdFactory(batch__program=program)
34+
35+
36+
@pytest.fixture
37+
def individual(household):
38+
return household.members.first()
39+
40+
41+
@pytest.fixture
42+
def multiple_households(program):
43+
from testutils.factories import CountryHouseholdFactory
44+
45+
return [CountryHouseholdFactory(batch__program=program) for _ in range(3)]
46+
47+
48+
@pytest.fixture
49+
def all_individuals(multiple_households):
50+
individuals = []
51+
for hh in multiple_households:
52+
individuals.extend(hh.members.all())
53+
return individuals
54+
55+
56+
@pytest.mark.parametrize(
57+
("request_data", "expected"),
58+
[
59+
(None, None),
60+
(MagicMock(kwargs={"object_id": "456"}), 456),
61+
(MagicMock(kwargs={}), None),
62+
],
63+
)
64+
def test_get_household_id_from_request(request_data, expected):
65+
with (
66+
patch("country_workspace.contrib.hope.beneficiary_reference.state") as mock_state,
67+
patch("country_workspace.contrib.hope.beneficiary_reference.resolve") as mock_resolve,
68+
):
69+
if request_data is None:
70+
mock_state.request = None
71+
else:
72+
mock_resolve.return_value = request_data
73+
74+
assert get_household_id_from_request() == expected
75+
76+
77+
@pytest.mark.parametrize(
78+
("program_id", "household_id"),
79+
[
80+
(None, None),
81+
(123, None),
82+
(123, 456),
83+
],
84+
)
85+
def test_beneficiary_widget_init(program_id, household_id):
86+
widget = BeneficiarySelect2Widget(program_id=program_id, household_id=household_id)
87+
assert widget.program_id == program_id
88+
assert widget.household_id == household_id
89+
90+
91+
@pytest.mark.django_db
92+
@pytest.mark.parametrize("program_id", [99999, None], ids=["non_existing_program", "no_program"])
93+
def test_widget_get_queryset_returns_empty_for_invalid_or_missing_program(program_id):
94+
widget = BeneficiarySelect2Widget(program_id=program_id)
95+
queryset = widget.get_queryset()
96+
assert queryset.count() == 0
97+
98+
99+
@pytest.mark.django_db
100+
def test_widget_get_queryset_with_household_filter(program, household, individual):
101+
widget = BeneficiarySelect2Widget(program_id=program.id, household_id=household.id)
102+
assert individual in widget.get_queryset()
103+
104+
105+
@pytest.mark.django_db
106+
@pytest.mark.parametrize("filter_by_household", [True, False])
107+
def test_field_init(program, household, individual, multiple_households, all_individuals, filter_by_household):
108+
with (
109+
patch("country_workspace.contrib.hope.beneficiary_reference.state") as mock_state,
110+
patch("country_workspace.contrib.hope.beneficiary_reference.get_household_id_from_request") as mock_get,
111+
):
112+
mock_state.program = program
113+
mock_get.return_value = household.id if filter_by_household else None
114+
115+
field = BeneficiaryReferenceModelChoice(filter_by_household=filter_by_household)
116+
117+
if filter_by_household:
118+
assert individual in field.queryset
119+
assert all(ind.household == household for ind in field.queryset)
120+
else:
121+
for ind in all_individuals:
122+
assert ind in field.queryset
123+
124+
125+
@pytest.mark.django_db
126+
@pytest.mark.parametrize(
127+
("value", "expected"),
128+
[
129+
(None, None),
130+
("", None),
131+
([], None),
132+
],
133+
)
134+
def test_to_python_empty_values(program, value, expected):
135+
with patch("country_workspace.contrib.hope.beneficiary_reference.state") as mock_state:
136+
mock_state.program = program
137+
field = BeneficiaryReferenceModelChoice()
138+
assert field.to_python(value) == expected
139+
140+
141+
@pytest.mark.django_db
142+
def test_to_python_valid_id(program, all_individuals):
143+
with patch("country_workspace.contrib.hope.beneficiary_reference.state") as mock_state:
144+
mock_state.program = program
145+
field = BeneficiaryReferenceModelChoice()
146+
individual = all_individuals[0]
147+
assert field.to_python(str(individual.pk)) == individual.name
148+
149+
150+
@pytest.mark.django_db
151+
@pytest.mark.parametrize(
152+
("value", "should_raise"),
153+
[
154+
("99999", True), # Invalid ID
155+
(99999, True), # Invalid ID as int
156+
("Non Existing Name", True), # Non-existing name
157+
(["invalid", "list"], True), # Invalid type
158+
],
159+
)
160+
def test_to_python_validation_errors(program, value, should_raise):
161+
with patch("country_workspace.contrib.hope.beneficiary_reference.state") as mock_state:
162+
mock_state.program = program
163+
field = BeneficiaryReferenceModelChoice()
164+
165+
if should_raise:
166+
with pytest.raises(ValidationError):
167+
field.to_python(value)
168+
169+
170+
@pytest.mark.django_db
171+
def test_to_python_existing_name(program, all_individuals):
172+
with patch("country_workspace.contrib.hope.beneficiary_reference.state") as mock_state:
173+
mock_state.program = program
174+
field = BeneficiaryReferenceModelChoice()
175+
individual = all_individuals[0]
176+
177+
# Test with existing name - should return the same name
178+
result = field.to_python(individual.name)
179+
assert result == individual.name
180+
181+
182+
@pytest.mark.django_db
183+
@pytest.mark.parametrize("input_type", ["pk_int", "name", "object", "non_existing_name", "empty_string"])
184+
def test_prepare_value(program, all_individuals, input_type):
185+
with patch("country_workspace.contrib.hope.beneficiary_reference.state") as mock_state:
186+
mock_state.program = program
187+
field = BeneficiaryReferenceModelChoice()
188+
individual = all_individuals[0]
189+
190+
match input_type:
191+
case "pk_int":
192+
result = field.prepare_value(individual.pk)
193+
assert result == individual.pk
194+
case "name":
195+
result = field.prepare_value(individual.name)
196+
assert result == individual.pk
197+
case "object":
198+
result = field.prepare_value(individual)
199+
assert result == individual.pk
200+
case "non_existing_name":
201+
result = field.prepare_value("Non Existing Name")
202+
assert result is None
203+
case "empty_string":
204+
result = field.prepare_value("")
205+
assert result == ""

0 commit comments

Comments
 (0)