Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 202 additions & 9 deletions extropy/population/sampler/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,32 @@

logger = logging.getLogger(__name__)

_MINOR_EMPLOYMENT_ALLOWED = {
"student",
"part_time",
"part time",
"intern",
"none",
"unemployed",
"not_employed",
"not employed",
"no_job",
"no job",
}
_INCOME_KEYWORDS = ("income", "salary", "wage", "earnings", "compensation")
_OCCUPATION_KEYWORDS = (
"occupation",
"profession",
"job",
"job_title",
"job_role",
"employment_sector",
"industry",
"career",
)
_EDUCATION_KEYWORDS = ("education", "degree")
_EMPLOYMENT_KEYWORDS = ("employment", "work_status", "labor_status")


class SamplingError(Exception):
"""Raised when sampling fails for an agent."""
Expand Down Expand Up @@ -300,13 +326,22 @@ def _sample_dependent_as_agent(
Uses the dependent's known attributes (age, gender) as seeds,
then samples remaining attributes normally.
"""
fixed_attrs = {
"age": dependent.age,
"gender": dependent.gender,
}
agent = _sample_single_agent(
spec, attr_map, rng, index, id_width, stats, numeric_values
spec,
attr_map,
rng,
index,
id_width,
stats,
numeric_values,
fixed_attrs=fixed_attrs,
)

# Override with dependent's known attributes
agent["age"] = dependent.age
agent["gender"] = dependent.gender
agent["household_id"] = household_id
agent["household_role"] = f"dependent_{dependent.relationship}"
agent["relationship_to_primary"] = dependent.relationship
Expand All @@ -315,15 +350,169 @@ def _sample_dependent_as_agent(
agent["first_name"] = dep_name
if parent.get("last_name"):
agent["last_name"] = parent["last_name"]
if getattr(dependent, "school_status", None):
agent["school_status"] = dependent.school_status

# Copy household-scoped attributes from parent
for attr in spec.attributes:
if attr.scope == "household" and attr.name in parent:
agent[attr.name] = parent[attr.name]

_normalize_minor_agent_attributes(
agent=agent,
spec=spec,
attr_map=attr_map,
parent=parent,
)

return agent


def _normalize_minor_agent_attributes(
agent: dict[str, Any],
spec: PopulationSpec,
attr_map: dict[str, AttributeSpec],
parent: dict[str, Any],
) -> None:
"""Normalize promoted dependent attributes to keep minors age-appropriate."""
age = agent.get("age")
if not isinstance(age, (int, float)) or age >= 18:
return

for attr in spec.attributes:
name = attr.name
if name not in agent or attr.scope == "household":
continue

key = name.lower()
options = _get_categorical_options(attr)
value = agent[name]

if any(token in key for token in _EDUCATION_KEYWORDS):
agent[name] = _coerce_minor_education(value, int(age), options)
continue

if any(token in key for token in _EMPLOYMENT_KEYWORDS):
agent[name] = _coerce_minor_employment(value, int(age), options)
continue

if any(token in key for token in _OCCUPATION_KEYWORDS):
agent[name] = _coerce_minor_occupation(value, options)
continue

if any(token in key for token in _INCOME_KEYWORDS):
if "household" in key and name in parent:
agent[name] = parent[name]
elif isinstance(value, (int, float)):
agent[name] = 0
elif options:
agent[name] = _choose_best_option(options, ["none", "no_income", "student"])

if not agent.get("first_name"):
agent["first_name"] = "Child"


def _get_categorical_options(attr: AttributeSpec) -> list[str]:
"""Get categorical options from an attribute spec, if available."""
dist = attr.sampling.distribution
if dist is not None and hasattr(dist, "options"):
return [str(opt) for opt in getattr(dist, "options", [])]
return []


def _choose_best_option(options: list[str], preferred_tokens: list[str]) -> str:
"""Pick the first option matching preferred tokens, else first option."""
normalized = [opt.lower().replace("-", "_").replace(" ", "_") for opt in options]
for token in preferred_tokens:
token_norm = token.lower().replace("-", "_").replace(" ", "_")
for i, opt in enumerate(normalized):
if token_norm in opt:
return options[i]
return options[0] if options else "none"


def _education_stage(value: str) -> int | None:
"""Map education-like labels to a comparable stage integer."""
v = value.lower().replace("-", "_").replace(" ", "_")
stage_map: list[tuple[int, tuple[str, ...]]] = [
(0, ("none", "home", "preschool", "pre_k", "kindergarten")),
(1, ("elementary", "primary", "grade_school")),
(2, ("middle_school", "middle", "junior_high")),
(3, ("high_school", "secondary", "ged")),
(4, ("associate", "vocational", "trade", "certificate")),
(5, ("bachelor", "college", "undergraduate", "university")),
(6, ("master", "graduate", "postgraduate", "mba")),
(7, ("doctorate", "phd", "jd", "md")),
]
for stage, markers in stage_map:
if any(marker in v for marker in markers):
return stage
return None


def _max_education_stage_for_age(age: int) -> int:
if age <= 10:
return 1
if age <= 13:
return 2
if age <= 17:
return 3
if age <= 20:
return 4
if age <= 22:
return 5
if age <= 24:
return 6
return 7


def _coerce_minor_education(value: Any, age: int, options: list[str]) -> Any:
"""Cap education to an age-plausible level for promoted minors."""
value_str = str(value)
current_stage = _education_stage(value_str)
max_stage = _max_education_stage_for_age(age)
if current_stage is not None and current_stage <= max_stage:
return value

if options:
candidate: str | None = None
candidate_stage = -1
for opt in options:
stage = _education_stage(opt)
if stage is None or stage > max_stage:
continue
if stage > candidate_stage:
candidate = opt
candidate_stage = stage
if candidate is not None:
return candidate
return _choose_best_option(options, ["high_school", "middle_school", "elementary"])
return value


def _coerce_minor_employment(value: Any, age: int, options: list[str]) -> Any:
"""Restrict employment-like fields for minors."""
norm = str(value).lower().replace("-", "_").replace(" ", "_")
if norm in {x.replace("-", "_").replace(" ", "_") for x in _MINOR_EMPLOYMENT_ALLOWED}:
return value

if options:
if age < 16:
return _choose_best_option(options, ["student", "none", "unemployed"])
return _choose_best_option(options, ["student", "part_time", "intern", "none"])
return "student" if age >= 14 else "none"


def _coerce_minor_occupation(value: Any, options: list[str]) -> Any:
"""Coerce occupation-like fields for minors."""
norm = str(value).lower().replace("-", "_").replace(" ", "_")
if "student" in norm or norm in ("none", "no_job", "unemployed"):
return value
if options:
return _choose_best_option(options, ["student", "none", "unemployed"])
return "student"


def _sample_population_households(
spec: PopulationSpec,
attr_map: dict[str, AttributeSpec],
Expand Down Expand Up @@ -576,6 +765,7 @@ def _sample_single_agent(
id_width: int,
stats: SamplingStats,
numeric_values: dict[str, list[float]],
fixed_attrs: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Sample a single agent following the sampling order."""
agent: dict[str, Any] = {"_id": f"agent_{index:0{id_width}d}"}
Expand All @@ -586,12 +776,15 @@ def _sample_single_agent(
logger.warning(f"Attribute '{attr_name}' in sampling_order not found")
continue

try:
value = _sample_attribute(attr, rng, agent, stats)
except FormulaError as e:
raise SamplingError(
f"Agent {index}: Failed to sample '{attr_name}': {e}"
) from e
if fixed_attrs and attr_name in fixed_attrs:
value = fixed_attrs[attr_name]
else:
try:
value = _sample_attribute(attr, rng, agent, stats)
except FormulaError as e:
raise SamplingError(
f"Agent {index}: Failed to sample '{attr_name}': {e}"
) from e

# Coerce to declared type
value = coerce_to_type(value, attr.type)
Expand Down
Loading
Loading