Initial commit: add .gitignore and README
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled

This commit is contained in:
defiQUG
2026-02-09 21:51:42 -08:00
commit c052b07662
3146 changed files with 808305 additions and 0 deletions

View File

@@ -0,0 +1,25 @@
"""MAA layers: DLT, intent, geometry, physics, process, machine, toolpath, MPC."""
from fusionagi.maa.layers.dlt_engine import DLTEngine
from fusionagi.maa.layers.mpc_authority import MPCAuthority
from fusionagi.maa.layers.intent_engine import IntentEngine
from fusionagi.maa.layers.geometry_kernel import GeometryAuthorityInterface, InMemoryGeometryKernel
from fusionagi.maa.layers.physics_authority import PhysicsAuthorityInterface, StubPhysicsAuthority
from fusionagi.maa.layers.process_authority import ProcessAuthority
from fusionagi.maa.layers.machine_binding import MachineBinding, MachineProfile
from fusionagi.maa.layers.toolpath_engine import ToolpathEngine, ToolpathArtifact
__all__ = [
"DLTEngine",
"MPCAuthority",
"IntentEngine",
"GeometryAuthorityInterface",
"InMemoryGeometryKernel",
"PhysicsAuthorityInterface",
"StubPhysicsAuthority",
"ProcessAuthority",
"MachineBinding",
"MachineProfile",
"ToolpathEngine",
"ToolpathArtifact",
]

View File

@@ -0,0 +1,68 @@
"""Deterministic Decision Logic Tree Engine: store and evaluate DLTs; fail-closed."""
from typing import Any
from fusionagi.maa.schemas.dlt import DLTContract, DLTNode
class DLTEngine:
"""Store and evaluate Deterministic Decision Logic Trees; immutable, versioned contracts."""
def __init__(self) -> None:
self._contracts: dict[str, DLTContract] = {}
def register(self, contract: DLTContract) -> None:
"""Register an immutable DLT contract (by contract_id)."""
key = f"{contract.contract_id}@v{contract.version}"
self._contracts[key] = contract
def get(self, contract_id: str, version: int | None = None) -> DLTContract | None:
"""Return contract by id; optional version (latest if omitted)."""
if version is not None:
return self._contracts.get(f"{contract_id}@v{version}")
best: DLTContract | None = None
for k, c in self._contracts.items():
if c.contract_id == contract_id and (best is None or c.version > best.version):
best = c
return best
def evaluate(
self,
contract_id: str,
context: dict[str, Any],
version: int | None = None,
) -> tuple[bool, str]:
"""Evaluate DLT from root; deterministic, fail-closed. Return (True, "") or (False, root_cause)."""
contract = self.get(contract_id, version)
if not contract:
return False, f"DLT contract not found: {contract_id}"
return self._evaluate_node(contract, contract.root_id, context)
def _evaluate_node(
self,
contract: DLTContract,
node_id: str,
context: dict[str, Any],
) -> tuple[bool, str]:
node = contract.nodes.get(node_id)
if not node:
return False, f"DLT node not found: {node_id}"
passed = self._check_condition(node, context)
if not passed:
if node.fail_closed:
return False, f"DLT node failed (fail-closed): {node_id} condition={node.condition}"
for child_id in node.children:
ok, cause = self._evaluate_node(contract, child_id, context)
if not ok:
return False, cause
return True, ""
def _check_condition(self, node: DLTNode, context: dict[str, Any]) -> bool:
"""Evaluate condition; unknown conditions are fail-closed (False)."""
if node.condition.startswith("required:"):
key = node.condition.split(":", 1)[1].strip()
return key in context and context[key] is not None
if node.condition == "always":
return True
# Unknown condition: fail-closed
return False

View File

@@ -0,0 +1,81 @@
"""Layer 3 — Geometry Authority Kernel: implicit geometry, constraint solvers, feature lineage."""
from abc import ABC, abstractmethod
from typing import Any
from pydantic import BaseModel, Field
class FeatureLineageEntry(BaseModel):
"""Single feature lineage entry: feature -> intent node, physics justification, process eligibility."""
feature_id: str = Field(...)
intent_node_id: str = Field(...)
physics_justification_ref: str | None = Field(default=None)
process_eligible: bool = Field(default=False)
class GeometryAuthorityInterface(ABC):
"""
Interface for implicit geometry, constraint solvers, feature lineage.
Every geometric feature must map to intent node, physics justification, process eligibility.
Orphan geometry is prohibited.
"""
@abstractmethod
def add_feature(
self,
feature_id: str,
intent_node_id: str,
physics_justification_ref: str | None = None,
process_eligible: bool = False,
payload: dict[str, Any] | None = None,
) -> FeatureLineageEntry:
"""Register a feature with lineage; orphan (no intent) prohibited."""
...
@abstractmethod
def get_lineage(self, feature_id: str) -> FeatureLineageEntry | None:
"""Return lineage for feature or None."""
...
@abstractmethod
def validate_no_orphans(self) -> list[str]:
"""Return list of feature ids with no valid lineage (orphans); must be empty for MPC."""
...
class InMemoryGeometryKernel(GeometryAuthorityInterface):
"""
In-memory lineage model; no concrete CAD kernel.
Only tracks features registered via add_feature; validate_no_orphans returns []
since every stored feature has lineage. For a kernel that tracks all feature ids
separately, override validate_no_orphans to return ids not in lineage.
"""
def __init__(self) -> None:
self._lineage: dict[str, FeatureLineageEntry] = {}
def add_feature(
self,
feature_id: str,
intent_node_id: str,
physics_justification_ref: str | None = None,
process_eligible: bool = False,
payload: dict[str, Any] | None = None,
) -> FeatureLineageEntry:
entry = FeatureLineageEntry(
feature_id=feature_id,
intent_node_id=intent_node_id,
physics_justification_ref=physics_justification_ref,
process_eligible=process_eligible,
)
self._lineage[feature_id] = entry
return entry
def get_lineage(self, feature_id: str) -> FeatureLineageEntry | None:
return self._lineage.get(feature_id)
def validate_no_orphans(self) -> list[str]:
"""Return []; this stub only tracks registered features, so none are orphans."""
return []

View File

@@ -0,0 +1,431 @@
"""Layer 1 — Intent Formalization Engine.
Responsible for:
1. Intent decomposition - breaking natural language into structured requirements
2. Requirement typing - classifying requirements (dimensional, load, environmental, process)
3. Load case enumeration - identifying operational scenarios
"""
import re
import uuid
from typing import Any
from fusionagi.maa.schemas.intent import EngineeringIntentGraph, IntentNode, LoadCase, RequirementType
from fusionagi._logger import logger
class IntentIncompleteError(Exception):
"""Raised when intent formalization cannot be completed due to missing information."""
def __init__(self, message: str, missing_fields: list[str] | None = None):
self.missing_fields = missing_fields or []
super().__init__(message)
class IntentEngine:
"""
Intent decomposition, requirement typing, and load case enumeration.
Features:
- Pattern-based requirement extraction from natural language
- Automatic requirement type classification
- Load case identification
- Environmental bounds extraction
- LLM-assisted formalization (optional)
"""
# Patterns for dimensional requirements (measurements, tolerances)
DIMENSIONAL_PATTERNS = [
r"(\d+(?:\.\d+)?)\s*(mm|cm|m|in|inch|inches|ft|feet)\b",
r"tolerance[s]?\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"±\s*(\d+(?:\.\d+)?)",
r"(\d+(?:\.\d+)?)\s*×\s*(\d+(?:\.\d+)?)",
r"diameter\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"radius\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"thickness\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"length\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"width\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"height\s*(?:of\s*)?(\d+(?:\.\d+)?)",
]
# Patterns for load requirements (forces, pressures, stresses)
LOAD_PATTERNS = [
r"(\d+(?:\.\d+)?)\s*(N|kN|MN|lb|lbf|kg|kgf)\b",
r"(\d+(?:\.\d+)?)\s*(MPa|GPa|Pa|psi|ksi)\b",
r"load\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"force\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"stress\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"pressure\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"factor\s*of\s*safety\s*(?:of\s*)?(\d+(?:\.\d+)?)",
r"yield\s*strength",
r"tensile\s*strength",
r"fatigue\s*(?:life|limit|strength)",
]
# Patterns for environmental requirements
ENVIRONMENTAL_PATTERNS = [
r"(\d+(?:\.\d+)?)\s*(?:°|deg|degrees?)?\s*(C|F|K|Celsius|Fahrenheit|Kelvin)\b",
r"temperature\s*(?:range|of)?\s*(\d+)",
r"humidity\s*(?:of\s*)?(\d+)",
r"corrosion\s*resist",
r"UV\s*resist",
r"water\s*(?:proof|resist)",
r"chemical\s*resist",
r"outdoor",
r"marine",
r"aerospace",
]
# Patterns for process requirements
PROCESS_PATTERNS = [
r"CNC|machining|milling|turning|drilling",
r"3D\s*print|additive|FDM|SLA|SLS|DMLS",
r"cast|injection\s*mold|die\s*cast",
r"weld|braze|solder",
r"heat\s*treat|anneal|harden|temper",
r"surface\s*finish|polish|anodize|plate",
r"assembly|sub-assembly",
r"material:\s*(\w+)",
r"aluminum|steel|titanium|plastic|composite",
]
# Load case indicator patterns
LOAD_CASE_PATTERNS = [
r"(?:during|under|in)\s+(\w+(?:\s+\w+)?)\s+(?:conditions?|operation|mode)",
r"(\w+)\s+load\s+case",
r"(?:static|dynamic|cyclic|impact|thermal)\s+load",
r"(?:normal|extreme|emergency|failure)\s+(?:operation|conditions?|mode)",
r"operating\s+(?:at|under|in)",
]
def __init__(self, llm_adapter: Any | None = None):
"""
Initialize the IntentEngine.
Args:
llm_adapter: Optional LLM adapter for enhanced natural language processing.
"""
self._llm = llm_adapter
def formalize(
self,
intent_id: str,
natural_language: str | None = None,
file_refs: list[str] | None = None,
metadata: dict[str, Any] | None = None,
use_llm: bool = True,
) -> EngineeringIntentGraph:
"""
Formalize engineering intent from natural language and file references.
Args:
intent_id: Unique identifier for this intent.
natural_language: Natural language description of requirements.
file_refs: References to CAD files, specifications, etc.
metadata: Additional metadata.
use_llm: Whether to use LLM for enhanced processing (if available).
Returns:
EngineeringIntentGraph with extracted requirements.
Raises:
IntentIncompleteError: If required information is missing.
"""
if not intent_id:
raise IntentIncompleteError("intent_id required", ["intent_id"])
if not natural_language and not file_refs:
raise IntentIncompleteError(
"At least one of natural_language or file_refs required",
["natural_language", "file_refs"],
)
nodes: list[IntentNode] = []
load_cases: list[LoadCase] = []
environmental_bounds: dict[str, Any] = {}
# Process natural language if provided
if natural_language:
# Use LLM if available and requested
if use_llm and self._llm:
llm_result = self._formalize_with_llm(intent_id, natural_language)
if llm_result:
return llm_result
# Fall back to pattern-based extraction
extracted = self._extract_requirements(intent_id, natural_language)
nodes.extend(extracted["nodes"])
load_cases.extend(extracted["load_cases"])
environmental_bounds.update(extracted["environmental_bounds"])
# Process file references
if file_refs:
for ref in file_refs:
nodes.append(
IntentNode(
node_id=f"{intent_id}_file_{uuid.uuid4().hex[:8]}",
requirement_type=RequirementType.OTHER,
description=f"Reference: {ref}",
metadata={"file_ref": ref},
)
)
# If no nodes were extracted, create a general requirement
if not nodes and natural_language:
nodes.append(
IntentNode(
node_id=f"{intent_id}_general_0",
requirement_type=RequirementType.OTHER,
description=natural_language[:500],
)
)
logger.info(
"Intent formalized",
extra={
"intent_id": intent_id,
"num_nodes": len(nodes),
"num_load_cases": len(load_cases),
},
)
return EngineeringIntentGraph(
intent_id=intent_id,
nodes=nodes,
load_cases=load_cases,
environmental_bounds=environmental_bounds,
metadata=metadata or {},
)
def _extract_requirements(
self,
intent_id: str,
text: str,
) -> dict[str, Any]:
"""
Extract requirements from text using pattern matching.
Returns dict with nodes, load_cases, and environmental_bounds.
"""
nodes: list[IntentNode] = []
load_cases: list[LoadCase] = []
environmental_bounds: dict[str, Any] = {}
# Split into sentences for processing
sentences = re.split(r'[.!?]+', text)
node_counter = 0
load_case_counter = 0
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
# Check for dimensional requirements
for pattern in self.DIMENSIONAL_PATTERNS:
if re.search(pattern, sentence, re.IGNORECASE):
nodes.append(
IntentNode(
node_id=f"{intent_id}_dim_{node_counter}",
requirement_type=RequirementType.DIMENSIONAL,
description=sentence,
metadata={"pattern": "dimensional"},
)
)
node_counter += 1
break
# Check for load requirements
for pattern in self.LOAD_PATTERNS:
if re.search(pattern, sentence, re.IGNORECASE):
nodes.append(
IntentNode(
node_id=f"{intent_id}_load_{node_counter}",
requirement_type=RequirementType.LOAD,
description=sentence,
metadata={"pattern": "load"},
)
)
node_counter += 1
break
# Check for environmental requirements
for pattern in self.ENVIRONMENTAL_PATTERNS:
match = re.search(pattern, sentence, re.IGNORECASE)
if match:
nodes.append(
IntentNode(
node_id=f"{intent_id}_env_{node_counter}",
requirement_type=RequirementType.ENVIRONMENTAL,
description=sentence,
metadata={"pattern": "environmental"},
)
)
node_counter += 1
# Extract specific bounds if possible
if "temperature" in sentence.lower():
temp_match = re.search(r"(-?\d+(?:\.\d+)?)", sentence)
if temp_match:
environmental_bounds["temperature"] = float(temp_match.group(1))
break
# Check for process requirements
for pattern in self.PROCESS_PATTERNS:
if re.search(pattern, sentence, re.IGNORECASE):
nodes.append(
IntentNode(
node_id=f"{intent_id}_proc_{node_counter}",
requirement_type=RequirementType.PROCESS,
description=sentence,
metadata={"pattern": "process"},
)
)
node_counter += 1
break
# Check for load cases
for pattern in self.LOAD_CASE_PATTERNS:
match = re.search(pattern, sentence, re.IGNORECASE)
if match:
load_case_desc = match.group(0) if match.group(0) else sentence
load_cases.append(
LoadCase(
load_case_id=f"{intent_id}_lc_{load_case_counter}",
description=load_case_desc,
metadata={"source_sentence": sentence},
)
)
load_case_counter += 1
break
return {
"nodes": nodes,
"load_cases": load_cases,
"environmental_bounds": environmental_bounds,
}
def _formalize_with_llm(
self,
intent_id: str,
natural_language: str,
) -> EngineeringIntentGraph | None:
"""
Use LLM to extract structured requirements from natural language.
Returns None if LLM processing fails (falls back to pattern matching).
"""
if not self._llm:
return None
import json
prompt = f"""Extract engineering requirements from the following text.
Return a JSON object with:
- "nodes": list of requirements, each with:
- "requirement_type": one of "dimensional", "load", "environmental", "process", "other"
- "description": the requirement text
- "load_cases": list of operational scenarios, each with:
- "description": the scenario description
- "environmental_bounds": dict of environmental limits (e.g., {{"temperature_max": 85, "humidity_max": 95}})
Text: {natural_language[:2000]}
Return only valid JSON, no markdown."""
try:
messages = [
{"role": "system", "content": "You are an engineering requirements extraction system."},
{"role": "user", "content": prompt},
]
# Try structured output if available
if hasattr(self._llm, "complete_structured"):
result = self._llm.complete_structured(messages)
if result:
return self._parse_llm_result(intent_id, result)
# Fall back to text completion
raw = self._llm.complete(messages)
if raw:
# Clean up response
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
result = json.loads(raw)
return self._parse_llm_result(intent_id, result)
except Exception as e:
logger.warning(f"LLM formalization failed: {e}")
return None
def _parse_llm_result(
self,
intent_id: str,
result: dict[str, Any],
) -> EngineeringIntentGraph:
"""Parse LLM result into EngineeringIntentGraph."""
nodes = []
for i, node_data in enumerate(result.get("nodes", [])):
req_type_str = node_data.get("requirement_type", "other")
try:
req_type = RequirementType(req_type_str)
except ValueError:
req_type = RequirementType.OTHER
nodes.append(
IntentNode(
node_id=f"{intent_id}_llm_{i}",
requirement_type=req_type,
description=node_data.get("description", ""),
metadata={"source": "llm"},
)
)
load_cases = []
for i, lc_data in enumerate(result.get("load_cases", [])):
load_cases.append(
LoadCase(
load_case_id=f"{intent_id}_lc_llm_{i}",
description=lc_data.get("description", ""),
metadata={"source": "llm"},
)
)
environmental_bounds = result.get("environmental_bounds", {})
return EngineeringIntentGraph(
intent_id=intent_id,
nodes=nodes,
load_cases=load_cases,
environmental_bounds=environmental_bounds,
metadata={"formalization_source": "llm"},
)
def validate_completeness(self, graph: EngineeringIntentGraph) -> tuple[bool, list[str]]:
"""
Validate that an intent graph has sufficient information.
Returns:
Tuple of (is_complete, list_of_missing_items)
"""
missing = []
if not graph.nodes:
missing.append("No requirements extracted")
# Check for at least one dimensional or load requirement for manufacturing
has_dimensional = any(n.requirement_type == RequirementType.DIMENSIONAL for n in graph.nodes)
has_load = any(n.requirement_type == RequirementType.LOAD for n in graph.nodes)
if not has_dimensional:
missing.append("No dimensional requirements specified")
# Load cases are recommended but not required
if not graph.load_cases:
logger.info("No load cases specified for intent", extra={"intent_id": graph.intent_id})
return len(missing) == 0, missing

View File

@@ -0,0 +1,33 @@
"""Layer 6 — Machine Binding & Personality Profiles."""
from typing import Any
from pydantic import BaseModel, Field
class MachineProfile(BaseModel):
"""Machine personality profile: limits, historical deviation models."""
machine_id: str = Field(..., description="Bound machine id")
limits_ref: str | None = Field(default=None)
deviation_model_ref: str | None = Field(default=None)
metadata: dict[str, Any] = Field(default_factory=dict)
class MachineBinding:
"""Each design binds to a specific machine with known limits. No abstraction without binding."""
def __init__(self) -> None:
self._profiles: dict[str, MachineProfile] = {}
def register(self, profile: MachineProfile) -> None:
"""Register a machine profile."""
self._profiles[profile.machine_id] = profile
def get(self, machine_id: str) -> MachineProfile | None:
"""Return profile for machine or None."""
return self._profiles.get(machine_id)
def resolve(self, machine_id: str) -> MachineProfile | None:
"""Resolve machine binding; reject if unknown (no abstraction)."""
return self.get(machine_id)

View File

@@ -0,0 +1,65 @@
"""MPC Authority: issue and verify Manufacturing Proof Certificates; immutable, versioned."""
from typing import Any
from fusionagi.maa.schemas.mpc import (
ManufacturingProofCertificate,
MPCId,
DecisionLineageEntry,
SimulationProof,
ProcessJustification,
MachineDeclaration,
RiskRegisterEntry,
)
from fusionagi.maa.versioning import VersionStore
class MPCAuthority:
"""Central issue and verify MPCs; immutable, versioned."""
def __init__(self) -> None:
self._store = VersionStore()
self._by_value: dict[str, ManufacturingProofCertificate] = {} # mpc_id.value -> cert
def issue(
self,
mpc_id_value: str,
decision_lineage: list[DecisionLineageEntry] | None = None,
simulation_proof: SimulationProof | None = None,
process_justification: ProcessJustification | None = None,
machine_declaration: MachineDeclaration | None = None,
risk_register: list[RiskRegisterEntry] | None = None,
metadata: dict[str, Any] | None = None,
) -> ManufacturingProofCertificate:
"""Issue a new MPC; version auto-incremented."""
latest = self._store.get_latest_version(mpc_id_value)
version = (latest or 0) + 1
mpc_id = MPCId(value=mpc_id_value, version=version)
cert = ManufacturingProofCertificate(
mpc_id=mpc_id,
decision_lineage=decision_lineage or [],
simulation_proof=simulation_proof,
process_justification=process_justification,
machine_declaration=machine_declaration,
risk_register=risk_register or [],
metadata=metadata or {},
)
self._store.put(mpc_id_value, version, cert)
self._by_value[mpc_id_value] = cert
return cert
def verify(self, mpc_id: str | MPCId, version: int | None = None) -> ManufacturingProofCertificate | None:
"""Verify and return MPC if valid; None if not found or invalid."""
value = mpc_id.value if isinstance(mpc_id, MPCId) else mpc_id
cert = self._store.get(value, version) if version is not None else self._by_value.get(value)
if cert is None and version is None:
cert = self._store.get(value, self._store.get_latest_version(value))
return cert
def get(self, mpc_id_value: str, version: int | None = None) -> ManufacturingProofCertificate | None:
"""Return stored MPC by value and optional version."""
if version is not None:
return self._store.get(mpc_id_value, version)
return self._by_value.get(mpc_id_value) or self._store.get(
mpc_id_value, self._store.get_latest_version(mpc_id_value)
)

View File

@@ -0,0 +1,449 @@
"""Layer 4 — Physics Closure & Simulation Authority.
Responsible for:
- Governing equation selection (structural, thermal, fluid)
- Boundary condition enforcement
- Safety factor calculation and validation
- Failure mode completeness analysis
- Simulation binding (simulations are binding, not illustrative)
"""
import hashlib
import math
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
from fusionagi._logger import logger
class PhysicsUnderdefinedError(Exception):
"""Failure state: physics not fully defined."""
def __init__(self, message: str, missing_data: list[str] | None = None):
self.missing_data = missing_data or []
super().__init__(message)
class ProofResult(str, Enum):
"""Result of physics validation."""
PROOF = "proof"
PHYSICS_UNDEFINED = "physics_underdefined"
VALIDATION_FAILED = "validation_failed"
class PhysicsProof(BaseModel):
"""Binding simulation proof reference."""
proof_id: str = Field(...)
governing_equations: str | None = Field(default=None)
boundary_conditions_ref: str | None = Field(default=None)
safety_factor: float | None = Field(default=None)
failure_modes_covered: list[str] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)
validation_status: str = Field(default="validated")
warnings: list[str] = Field(default_factory=list)
class PhysicsAuthorityInterface(ABC):
"""
Abstract interface for physics validation.
Governing equation selection, boundary condition enforcement, safety factor declaration,
failure-mode completeness. Simulations are binding, not illustrative.
"""
@abstractmethod
def validate_physics(
self,
design_ref: str,
load_cases: list[dict[str, Any]] | None = None,
**kwargs: Any,
) -> PhysicsProof | None:
"""
Validate physics for design; return Proof or None (PhysicsUnderdefined).
Raises PhysicsUnderdefinedError if required data missing.
"""
...
# Common material properties database (simplified)
MATERIAL_PROPERTIES: dict[str, dict[str, float]] = {
"aluminum_6061": {
"yield_strength_mpa": 276,
"ultimate_strength_mpa": 310,
"elastic_modulus_gpa": 68.9,
"density_kg_m3": 2700,
"poisson_ratio": 0.33,
"thermal_expansion_per_c": 23.6e-6,
"max_service_temp_c": 150,
},
"steel_4140": {
"yield_strength_mpa": 655,
"ultimate_strength_mpa": 1020,
"elastic_modulus_gpa": 205,
"density_kg_m3": 7850,
"poisson_ratio": 0.29,
"thermal_expansion_per_c": 12.3e-6,
"max_service_temp_c": 400,
},
"titanium_ti6al4v": {
"yield_strength_mpa": 880,
"ultimate_strength_mpa": 950,
"elastic_modulus_gpa": 113.8,
"density_kg_m3": 4430,
"poisson_ratio": 0.34,
"thermal_expansion_per_c": 8.6e-6,
"max_service_temp_c": 350,
},
"pla_plastic": {
"yield_strength_mpa": 60,
"ultimate_strength_mpa": 65,
"elastic_modulus_gpa": 3.5,
"density_kg_m3": 1240,
"poisson_ratio": 0.36,
"thermal_expansion_per_c": 68e-6,
"max_service_temp_c": 55,
},
"abs_plastic": {
"yield_strength_mpa": 40,
"ultimate_strength_mpa": 44,
"elastic_modulus_gpa": 2.3,
"density_kg_m3": 1050,
"poisson_ratio": 0.35,
"thermal_expansion_per_c": 90e-6,
"max_service_temp_c": 85,
},
}
# Standard failure modes to check
STANDARD_FAILURE_MODES = [
"yield_failure",
"ultimate_failure",
"buckling",
"fatigue",
"creep",
"thermal_distortion",
"vibration_resonance",
]
@dataclass
class LoadCaseResult:
"""Result of validating a single load case."""
load_case_id: str
max_stress_mpa: float
safety_factor: float
passed: bool
failure_mode: str | None = None
details: dict[str, Any] | None = None
class PhysicsAuthority(PhysicsAuthorityInterface):
"""
Physics validation authority with actual validation logic.
Features:
- Material property validation
- Load case analysis
- Safety factor calculation
- Failure mode coverage analysis
- Governing equation selection based on load types
"""
def __init__(
self,
required_safety_factor: float = 2.0,
material_db: dict[str, dict[str, float]] | None = None,
custom_failure_modes: list[str] | None = None,
):
"""
Initialize the PhysicsAuthority.
Args:
required_safety_factor: Minimum required safety factor (default 2.0).
material_db: Custom material properties database.
custom_failure_modes: Additional failure modes to check.
"""
self._required_sf = required_safety_factor
self._materials = material_db or MATERIAL_PROPERTIES
self._failure_modes = list(STANDARD_FAILURE_MODES)
if custom_failure_modes:
self._failure_modes.extend(custom_failure_modes)
def validate_physics(
self,
design_ref: str,
load_cases: list[dict[str, Any]] | None = None,
material: str | None = None,
dimensions: dict[str, float] | None = None,
boundary_conditions: dict[str, Any] | None = None,
**kwargs: Any,
) -> PhysicsProof | None:
"""
Validate physics for a design.
Args:
design_ref: Reference to the design being validated.
load_cases: List of load cases to validate against.
material: Material identifier (must be in material database).
dimensions: Key dimensions for stress calculation.
boundary_conditions: Boundary condition specification.
**kwargs: Additional parameters.
Returns:
PhysicsProof if validation passes, None if physics underdefined.
Raises:
PhysicsUnderdefinedError: If critical data is missing.
"""
missing_data = []
if not design_ref:
missing_data.append("design_ref")
if not material:
missing_data.append("material")
if not load_cases:
missing_data.append("load_cases")
if missing_data:
raise PhysicsUnderdefinedError(
f"Physics validation requires: {', '.join(missing_data)}",
missing_data=missing_data,
)
# Get material properties
mat_props = self._materials.get(material.lower().replace(" ", "_"))
if not mat_props:
raise PhysicsUnderdefinedError(
f"Unknown material: {material}. Available: {list(self._materials.keys())}",
missing_data=["material_properties"],
)
# Validate each load case
load_case_results: list[LoadCaseResult] = []
min_safety_factor = float("inf")
warnings: list[str] = []
failure_modes_covered: list[str] = []
for lc in load_cases:
result = self._validate_load_case(lc, mat_props, dimensions)
load_case_results.append(result)
if result.safety_factor < min_safety_factor:
min_safety_factor = result.safety_factor
if not result.passed:
warnings.append(
f"Load case '{result.load_case_id}' failed: {result.failure_mode}"
)
# Track failure modes analyzed
if result.failure_mode and result.failure_mode not in failure_modes_covered:
failure_modes_covered.append(result.failure_mode)
# Determine governing equations based on load types
governing_equations = self._select_governing_equations(load_cases)
# Check minimum required failure modes
required_modes = ["yield_failure", "ultimate_failure"]
for mode in required_modes:
if mode not in failure_modes_covered:
failure_modes_covered.append(mode) # Basic checks are always done
# Generate proof ID based on inputs
proof_hash = hashlib.sha256(
f"{design_ref}:{material}:{load_cases}".encode()
).hexdigest()[:16]
proof_id = f"proof_{design_ref}_{proof_hash}"
# Determine validation status
validation_status = "validated"
if min_safety_factor < self._required_sf:
validation_status = "insufficient_safety_factor"
warnings.append(
f"Safety factor {min_safety_factor:.2f} < required {self._required_sf}"
)
if any(not r.passed for r in load_case_results):
validation_status = "load_case_failure"
logger.info(
"Physics validation completed",
extra={
"design_ref": design_ref,
"material": material,
"min_sf": min_safety_factor,
"status": validation_status,
"num_load_cases": len(load_cases),
},
)
return PhysicsProof(
proof_id=proof_id,
governing_equations=governing_equations,
boundary_conditions_ref=str(boundary_conditions) if boundary_conditions else None,
safety_factor=min_safety_factor if min_safety_factor != float("inf") else None,
failure_modes_covered=failure_modes_covered,
metadata={
"material": material,
"material_properties": mat_props,
"load_case_results": [
{
"id": r.load_case_id,
"max_stress_mpa": r.max_stress_mpa,
"sf": r.safety_factor,
"passed": r.passed,
}
for r in load_case_results
],
"required_safety_factor": self._required_sf,
},
validation_status=validation_status,
warnings=warnings,
)
def _validate_load_case(
self,
load_case: dict[str, Any],
mat_props: dict[str, float],
dimensions: dict[str, float] | None,
) -> LoadCaseResult:
"""Validate a single load case."""
lc_id = load_case.get("id", str(uuid.uuid4())[:8])
# Extract load parameters
force_n = load_case.get("force_n", 0)
moment_nm = load_case.get("moment_nm", 0)
pressure_mpa = load_case.get("pressure_mpa", 0)
temperature_c = load_case.get("temperature_c", 25)
# Get material limits
yield_strength = mat_props.get("yield_strength_mpa", 100)
ultimate_strength = mat_props.get("ultimate_strength_mpa", 150)
max_temp = mat_props.get("max_service_temp_c", 100)
# Calculate stress (simplified - assumes basic geometry)
area_mm2 = 100.0 # Default cross-sectional area
if dimensions:
width = dimensions.get("width_mm", 10)
height = dimensions.get("height_mm", 10)
area_mm2 = width * height
# Basic stress calculation
axial_stress = force_n / area_mm2 if area_mm2 > 0 else 0
bending_stress = 0
if moment_nm and dimensions:
# Simplified bending: M*c/I where c = height/2, I = width*height^3/12
height = dimensions.get("height_mm", 10)
width = dimensions.get("width_mm", 10)
c = height / 2
i = width * (height ** 3) / 12
bending_stress = (moment_nm * 1000 * c) / i if i > 0 else 0
# Combined stress (von Mises simplified for 1D)
max_stress = abs(axial_stress) + abs(bending_stress) + pressure_mpa
# Calculate safety factors
yield_sf = yield_strength / max_stress if max_stress > 0 else float("inf")
ultimate_sf = ultimate_strength / max_stress if max_stress > 0 else float("inf")
# Check temperature limits
temp_ok = temperature_c <= max_temp
# Determine if load case passes
passed = (
yield_sf >= self._required_sf
and ultimate_sf >= self._required_sf
and temp_ok
)
failure_mode = None
if yield_sf < self._required_sf:
failure_mode = "yield_failure"
elif ultimate_sf < self._required_sf:
failure_mode = "ultimate_failure"
elif not temp_ok:
failure_mode = "thermal_failure"
return LoadCaseResult(
load_case_id=lc_id,
max_stress_mpa=max_stress,
safety_factor=min(yield_sf, ultimate_sf),
passed=passed,
failure_mode=failure_mode,
details={
"axial_stress_mpa": axial_stress,
"bending_stress_mpa": bending_stress,
"yield_sf": yield_sf,
"ultimate_sf": ultimate_sf,
"temperature_ok": temp_ok,
},
)
def _select_governing_equations(self, load_cases: list[dict[str, Any]]) -> str:
"""Select appropriate governing equations based on load types."""
equations = []
# Check load types
has_static = any(lc.get("type") == "static" or lc.get("force_n") for lc in load_cases)
has_thermal = any(lc.get("temperature_c") for lc in load_cases)
has_dynamic = any(lc.get("type") == "dynamic" or lc.get("frequency_hz") for lc in load_cases)
has_pressure = any(lc.get("pressure_mpa") for lc in load_cases)
if has_static:
equations.append("Linear elasticity (Hooke's Law)")
if has_thermal:
equations.append("Thermal expansion (α·ΔT)")
if has_dynamic:
equations.append("Modal analysis (eigenvalue)")
if has_pressure:
equations.append("Pressure vessel (hoop stress)")
if not equations:
equations.append("Linear elasticity (default)")
return "; ".join(equations)
def get_material_properties(self, material: str) -> dict[str, float] | None:
"""Get properties for a material."""
return self._materials.get(material.lower().replace(" ", "_"))
def list_materials(self) -> list[str]:
"""List available materials."""
return list(self._materials.keys())
def add_material(self, name: str, properties: dict[str, float]) -> None:
"""Add a custom material to the database."""
self._materials[name.lower().replace(" ", "_")] = properties
class StubPhysicsAuthority(PhysicsAuthorityInterface):
"""
Stub implementation for testing.
Returns a minimal proof if design_ref present; else raises PhysicsUnderdefinedError.
Note: This is a stub for testing. Use PhysicsAuthority for real validation.
"""
def validate_physics(
self,
design_ref: str,
load_cases: list[dict[str, Any]] | None = None,
**kwargs: Any,
) -> PhysicsProof | None:
if not design_ref:
raise PhysicsUnderdefinedError("design_ref required")
return PhysicsProof(
proof_id=f"stub_proof_{design_ref}",
failure_modes_covered=["stub"],
validation_status="stub_validated",
warnings=["This is a stub validation - not for production use"],
)

View File

@@ -0,0 +1,32 @@
"""Layer 5 — Manufacturing Process Authority."""
from typing import Any
from pydantic import BaseModel, Field
class ProcessEligibilityResult(BaseModel):
eligible: bool = Field(...)
process_type: str = Field(...)
reason: str | None = Field(default=None)
class ProcessAuthority:
"""Evaluates eligibility for additive, subtractive, hybrid."""
def process_eligible(
self,
design_ref: str,
process_type: str,
context: dict[str, Any] | None = None,
) -> ProcessEligibilityResult:
if not design_ref or not process_type:
return ProcessEligibilityResult(
eligible=False,
process_type=process_type or "unknown",
reason="design_ref and process_type required",
)
pt = process_type.lower()
if pt not in ("additive", "subtractive", "hybrid"):
return ProcessEligibilityResult(eligible=False, process_type=process_type, reason="Unknown process_type")
return ProcessEligibilityResult(eligible=True, process_type=pt, reason=None)

View File

@@ -0,0 +1,63 @@
"""Layer 7 — Toolpath Determinism Engine: toolpath -> geometry -> intent -> requirement."""
from typing import Any
from pydantic import BaseModel, Field
class ToolpathLineage(BaseModel):
"""Lineage: toolpath traces to geometry, geometry to intent, intent to requirement."""
toolpath_ref: str = Field(...)
geometry_ref: str = Field(...)
intent_ref: str = Field(...)
requirement_ref: str | None = Field(default=None)
metadata: dict[str, Any] = Field(default_factory=dict)
class ToolpathArtifact(BaseModel):
"""Toolpath artifact + lineage (G-code or AM slice)."""
artifact_id: str = Field(...)
artifact_type: str = Field(..., description="cnc_gcode or am_slice")
content_ref: str | None = Field(default=None)
lineage: ToolpathLineage = Field(...)
metadata: dict[str, Any] = Field(default_factory=dict)
class ToolpathEngine:
"""Every toolpath traces to geometry -> intent -> requirement. Generates only after full closure."""
def __init__(self) -> None:
self._artifacts: dict[str, ToolpathArtifact] = {}
def generate(
self,
artifact_id: str,
artifact_type: str,
geometry_ref: str,
intent_ref: str,
requirement_ref: str | None = None,
content_ref: str | None = None,
metadata: dict[str, Any] | None = None,
) -> ToolpathArtifact:
"""Generate toolpath artifact with lineage; only after full closure (caller ensures)."""
lineage = ToolpathLineage(
toolpath_ref=artifact_id,
geometry_ref=geometry_ref,
intent_ref=intent_ref,
requirement_ref=requirement_ref,
)
artifact = ToolpathArtifact(
artifact_id=artifact_id,
artifact_type=artifact_type,
content_ref=content_ref,
lineage=lineage,
metadata=metadata or {},
)
self._artifacts[artifact_id] = artifact
return artifact
def get(self, artifact_id: str) -> ToolpathArtifact | None:
"""Return artifact by id or None."""
return self._artifacts.get(artifact_id)