Files
smom-dbis-138/services/financial-tokenization/parsers/iso20022_parser.py
defiQUG 1fb7266469 Add Oracle Aggregator and CCIP Integration
- Introduced Aggregator.sol for Chainlink-compatible oracle functionality, including round-based updates and access control.
- Added OracleWithCCIP.sol to extend Aggregator with CCIP cross-chain messaging capabilities.
- Created .gitmodules to include OpenZeppelin contracts as a submodule.
- Developed a comprehensive deployment guide in NEXT_STEPS_COMPLETE_GUIDE.md for Phase 2 and smart contract deployment.
- Implemented Vite configuration for the orchestration portal, supporting both Vue and React frameworks.
- Added server-side logic for the Multi-Cloud Orchestration Portal, including API endpoints for environment management and monitoring.
- Created scripts for resource import and usage validation across non-US regions.
- Added tests for CCIP error handling and integration to ensure robust functionality.
- Included various new files and directories for the orchestration portal and deployment scripts.
2025-12-12 14:57:48 -08:00

383 lines
19 KiB
Python

#!/usr/bin/env python3
"""
ISO-20022 Parser
Parses ISO-20022 financial messages (pacs.008, camt.054, etc.)
"""
import xml.etree.ElementTree as ET
from typing import Dict, Any, List, Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)


class ISO20022Parser:
    """Parser for ISO-20022 financial messages.

    Converts pacs.008, pacs.009, camt.052, camt.053, camt.054 and pain.001
    XML documents into plain dictionaries whose leaf values are strings
    (``''`` when a field is absent).

    Lookups are schema-version agnostic: the namespace is read from the
    element being searched instead of from a fixed URI, so every prefix in
    ``NAMESPACES`` (and the internal ``iso`` prefix) resolves to the
    namespace of the document at hand.

    NOTE(security): ``xml.etree.ElementTree`` is documented as not secure
    against maliciously constructed data. Vet or size-limit untrusted
    input before handing it to this parser.
    """

    # Prefixes accepted in XPath expressions. The URIs are only used as a
    # fallback for elements that carry no namespace of their own; for
    # namespaced documents every prefix is rebound to the document namespace.
    NAMESPACES = {
        'pacs': 'urn:iso:std:iso:20022:tech:xsd:pacs.008.001.10',
        'paint': 'urn:iso:std:iso:20022:tech:xsd:paint.001.001.11',
        'camt': 'urn:iso:std:iso:20022:tech:xsd:camt.054.001.10',
        'pacs009': 'urn:iso:std:iso:20022:tech:xsd:pacs.009.001.10',
        'camt053': 'urn:iso:std:iso:20022:tech:xsd:camt.053.001.10',
        'camt052': 'urn:iso:std:iso:20022:tech:xsd:camt.052.001.10',
    }

    # Prefix used by the parser's own lookups; always bound to the namespace
    # of the element being searched.
    _GENERIC_PREFIX = 'iso'

    # (marker searched for in the root tag, reported message type).
    # 'pain.001' is the real ISO 20022 identifier for payment initiation;
    # it is reported under the historical 'paint.001' label so existing
    # consumers of the 'type' field keep working.
    _TYPE_MARKERS = (
        ('pacs.008', 'pacs.008'),
        ('pacs.009', 'pacs.009'),
        ('camt.052', 'camt.052'),
        ('camt.053', 'camt.053'),
        ('camt.054', 'camt.054'),
        ('paint.001', 'paint.001'),
        ('pain.001', 'paint.001'),
    )

    def __init__(self):
        self.supported_types = [
            'pacs.008',  # Payment Instruction
            'pacs.009',  # Financial Institution Credit Transfer
            'camt.052',  # Bank-to-Customer Account Report
            'camt.053',  # Bank-to-Customer Statement
            'camt.054',  # Bank-to-Customer Debit Credit Notification
            'paint.001',  # Payment Initiation (ISO name: pain.001)
        ]

    # ------------------------------------------------------------------
    # Entry points
    # ------------------------------------------------------------------

    def parse(self, xml_content: str) -> Dict[str, Any]:
        """Parse an ISO-20022 message, auto-detecting its type.

        Args:
            xml_content: The XML document as a string.

        Returns:
            The dictionary produced by the matching ``parse_*`` method.

        Raises:
            ValueError: If the XML is malformed or the message type is
                unknown or unsupported.
        """
        try:
            root = ET.fromstring(xml_content)
        except ET.ParseError as e:
            logger.error("Failed to parse ISO-20022 XML: %s", e)
            raise ValueError(f"Invalid ISO-20022 XML: {e}") from e

        message_type = self._detect_message_type(root)
        handlers = {
            'pacs.008': self.parse_pacs008,
            'pacs.009': self.parse_pacs009,
            'camt.052': self.parse_camt052,
            'camt.053': self.parse_camt053,
            'camt.054': self.parse_camt054,
            'paint.001': self.parse_paint001,
        }
        handler = handlers.get(message_type)
        if handler is None:
            raise ValueError(f"Unsupported ISO-20022 message type: {message_type}")
        return handler(xml_content)

    def _detect_message_type(self, root: ET.Element) -> str:
        """Return the message type found in the root element's tag.

        The tag of a namespaced root includes the namespace URI, which is
        where the message identifier (e.g. ``pacs.008``) appears.

        Raises:
            ValueError: If no known identifier occurs in the tag.
        """
        tag = root.tag
        for marker, message_type in self._TYPE_MARKERS:
            if marker in tag:
                return message_type
        raise ValueError(f"Unknown ISO-20022 message type: {tag}")

    def parse_pacs008(self, xml_content: str) -> Dict[str, Any]:
        """Parse ISO-20022 pacs.008 (Payment Instruction) message"""
        root = ET.fromstring(xml_content)
        amount, currency = self._extract_amount(root)
        return {
            'type': 'pacs.008',
            'messageId': self._get_text(root, './/iso:MsgId'),
            'creationDateTime': self._get_text(root, './/iso:CreDtTm'),
            'instructionId': self._get_text(root, './/iso:InstrId'),
            'endToEndId': self._get_text(root, './/iso:EndToEndId'),
            'transactionId': self._get_text(root, './/iso:TxId'),
            'paymentInformationId': self._get_text(root, './/iso:PmtInfId'),
            'paymentMethod': self._get_text(root, './/iso:PmtMtd'),
            'amount': amount,
            'currency': currency,
            'debtor': self._extract_party(root, './/iso:Dbtr'),
            'debtorAccount': self._extract_account(root, './/iso:DbtrAcct'),
            'debtorAgent': self._extract_agent(root, './/iso:DbtrAgt'),
            'creditor': self._extract_party(root, './/iso:Cdtr'),
            'creditorAccount': self._extract_account(root, './/iso:CdtrAcct'),
            'creditorAgent': self._extract_agent(root, './/iso:CdtrAgt'),
            'remittanceInfo': self._extract_remittance_info(root),
            'purpose': self._get_text(root, './/iso:Purp'),
            'executionDate': self._get_text(root, './/iso:ReqdExctnDt'),
        }

    def parse_pacs009(self, xml_content: str) -> Dict[str, Any]:
        """Parse ISO-20022 pacs.009 (Financial Institution Credit Transfer) message"""
        root = ET.fromstring(xml_content)
        amount, currency = self._extract_amount(root)
        return {
            'type': 'pacs.009',
            'messageId': self._get_text(root, './/iso:MsgId'),
            'creationDateTime': self._get_text(root, './/iso:CreDtTm'),
            'instructionId': self._get_text(root, './/iso:InstrId'),
            'endToEndId': self._get_text(root, './/iso:EndToEndId'),
            'transactionId': self._get_text(root, './/iso:TxId'),
            'amount': amount,
            'currency': currency,
            'debtorAgent': self._extract_agent(root, './/iso:DbtrAgt'),
            'creditorAgent': self._extract_agent(root, './/iso:CdtrAgt'),
        }

    def parse_camt052(self, xml_content: str) -> Dict[str, Any]:
        """Parse ISO-20022 camt.052 (Bank-to-Customer Account Report) message"""
        root = ET.fromstring(xml_content)
        return {
            'type': 'camt.052',
            'messageId': self._get_text(root, './/iso:MsgId'),
            'creationDateTime': self._get_text(root, './/iso:CreDtTm'),
            'account': self._extract_account_info(root),
            'balance': self._extract_balance(root),
            'entries': self._extract_entries(root),
        }

    def parse_camt053(self, xml_content: str) -> Dict[str, Any]:
        """Parse ISO-20022 camt.053 (Bank-to-Customer Statement) message"""
        root = ET.fromstring(xml_content)
        return {
            'type': 'camt.053',
            'messageId': self._get_text(root, './/iso:MsgId'),
            'creationDateTime': self._get_text(root, './/iso:CreDtTm'),
            'account': self._extract_account_info(root),
            # OPBD / CLBD are the balance type codes for the opening and
            # closing booked balances.
            'openingBalance': self._extract_balance(root, balance_code='OPBD'),
            'closingBalance': self._extract_balance(root, balance_code='CLBD'),
            'entries': self._extract_entries(root),
        }

    def parse_camt054(self, xml_content: str) -> Dict[str, Any]:
        """Parse ISO-20022 camt.054 (Bank-to-Customer Debit Credit Notification) message"""
        root = ET.fromstring(xml_content)
        return {
            'type': 'camt.054',
            'messageId': self._get_text(root, './/iso:MsgId'),
            'creationDateTime': self._get_text(root, './/iso:CreDtTm'),
            'account': self._extract_account_info(root),
            'entries': self._extract_entries(root),
        }

    def parse_paint001(self, xml_content: str) -> Dict[str, Any]:
        """Parse ISO-20022 payment initiation (pain.001) message.

        The method name and the reported ``'paint.001'`` type are kept for
        backward compatibility.
        """
        root = ET.fromstring(xml_content)
        return {
            'type': 'paint.001',
            'messageId': self._get_text(root, './/iso:MsgId'),
            'creationDateTime': self._get_text(root, './/iso:CreDtTm'),
            'paymentInformationId': self._get_text(root, './/iso:PmtInfId'),
            'paymentMethod': self._get_text(root, './/iso:PmtMtd'),
            'debtor': self._extract_party(root, './/iso:Dbtr'),
            'debtorAccount': self._extract_account(root, './/iso:DbtrAcct'),
            'payments': self._extract_payments(root),
        }

    # ------------------------------------------------------------------
    # Low-level lookup helpers
    # ------------------------------------------------------------------

    def _namespaces_for(self, element: ET.Element) -> Dict[str, str]:
        """Build the prefix map used to search below ``element``.

        For a namespaced element every known prefix is bound to that
        element's namespace. For an un-namespaced element the static
        ``NAMESPACES`` map is kept and the generic prefix is bound to the
        empty namespace.
        """
        tag = element.tag
        uri = ''
        if isinstance(tag, str) and tag.startswith('{'):
            uri = tag[1:].partition('}')[0]
        if uri:
            mapping = dict.fromkeys(self.NAMESPACES, uri)
        else:
            mapping = dict(self.NAMESPACES)
        mapping[self._GENERIC_PREFIX] = uri
        return mapping

    def _find(self, root: ET.Element, xpath: str) -> Optional[ET.Element]:
        """Return the first element matching ``xpath`` below ``root``, or None.

        Raises:
            SyntaxError: If ``xpath`` is not valid ElementTree XPath.
        """
        return root.find(xpath, self._namespaces_for(root))

    def _findall(self, root: ET.Element, xpath: str) -> List[ET.Element]:
        """Return every element matching ``xpath`` below ``root``."""
        return root.findall(xpath, self._namespaces_for(root))

    @staticmethod
    def _text_of(element: Optional[ET.Element], default: str = '') -> str:
        """Return the first non-blank text inside ``element``.

        The element's own text wins. Container elements (e.g. ``Id``
        wrapping ``IBAN``, ``BookgDt`` wrapping ``Dt``) have none, so the
        first non-blank descendant text is used instead.
        """
        if element is None:
            return default
        for chunk in element.itertext():
            text = chunk.strip()
            if text:
                return text
        return default

    def _get_text(self, root: ET.Element, xpath: str, default: str = '') -> str:
        """Get stripped text for the first element matching ``xpath``.

        Returns ``default`` when nothing matches, when the match holds no
        text, or when ``xpath`` is invalid (ElementTree cannot select
        attributes such as ``/@Ccy``; use ``_get_attribute`` for those).
        """
        if root is None:
            return default
        try:
            element = self._find(root, xpath)
        except SyntaxError as exc:
            # Best-effort lookup: report the bad path instead of hiding it.
            logger.warning("Ignoring invalid XPath %r: %s", xpath, exc)
            return default
        return self._text_of(element, default)

    def _get_attribute(self, root: ET.Element, xpath: str, attribute: str, default: str = '') -> str:
        """Get ``attribute`` of the first element matching ``xpath``.

        Returns ``default`` when nothing matches, the attribute is absent,
        or ``xpath`` is invalid.
        """
        if root is None:
            return default
        try:
            element = self._find(root, xpath)
        except SyntaxError as exc:
            logger.warning("Ignoring invalid XPath %r: %s", xpath, exc)
            return default
        if element is None:
            return default
        return element.get(attribute, default)

    def _extract_amount(self, root: ET.Element) -> tuple:
        """Return ``(amount, currency)`` for a credit transfer.

        ``InstdAmt`` is preferred; ``IntrBkSttlmAmt`` is the fallback
        because pacs.009 carries only the interbank settlement amount.
        """
        for path in ('.//iso:InstdAmt', './/iso:IntrBkSttlmAmt'):
            element = self._find(root, path)
            if element is not None:
                return self._text_of(element), element.get('Ccy', '')
        return '', ''

    # ------------------------------------------------------------------
    # Structured extractors
    # ------------------------------------------------------------------

    def _extract_party(self, root: ET.Element, xpath: str) -> Dict[str, Any]:
        """Extract party information; ``{}`` when ``xpath`` matches nothing."""
        party = self._find(root, xpath)
        if party is None:
            return {}
        return {
            'name': self._get_text(party, './/iso:Nm'),
            'postalAddress': self._extract_postal_address(party),
            'identification': self._extract_identification(party),
            'contactDetails': self._extract_contact_details(party),
        }

    def _extract_account(self, root: ET.Element, xpath: str) -> Dict[str, Any]:
        """Extract account information; ``{}`` when ``xpath`` matches nothing."""
        account = self._find(root, xpath)
        if account is None:
            return {}
        return {
            'id': self._get_text(account, './/iso:Id'),
            'type': self._get_text(account, './/iso:Tp'),
            'currency': self._get_text(account, './/iso:Ccy'),
            'name': self._get_text(account, './/iso:Nm'),
        }

    def _extract_agent(self, root: ET.Element, xpath: str) -> Dict[str, Any]:
        """Extract agent (financial institution) information from XML"""
        agent = self._find(root, xpath)
        if agent is None:
            return {}
        return {
            # Current schema versions name the element BICFI; older ones BIC.
            'bic': self._get_text(agent, './/iso:BICFI') or self._get_text(agent, './/iso:BIC'),
            'name': self._get_text(agent, './/iso:Nm'),
            'postalAddress': self._extract_postal_address(agent),
        }

    def _extract_account_info(self, root: ET.Element) -> Dict[str, Any]:
        """Extract account information from camt messages"""
        account = self._find(root, './/iso:Acct')
        if account is None:
            return {}
        return {
            'id': self._get_text(account, './/iso:Id'),
            'type': self._get_text(account, './/iso:Tp'),
            'currency': self._get_text(account, './/iso:Ccy'),
            'name': self._get_text(account, './/iso:Nm'),
            'owner': self._extract_party(account, './/iso:Ownr'),
        }

    def _extract_balance(
        self,
        root: ET.Element,
        xpath: str = './/camt:Bal',
        balance_code: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Extract one balance.

        Args:
            root: Element to search below.
            xpath: Path selecting candidate balance elements.
            balance_code: When given, pick the first candidate whose
                ``Tp`` subtree holds this code (e.g. ``'OPBD'``); otherwise
                pick the first candidate.

        Returns:
            The balance fields, or ``{}`` when no candidate qualifies.
        """
        if balance_code is None:
            balance = self._find(root, xpath)
        else:
            # ElementTree predicates cannot test nested element text, so
            # the candidates are filtered in Python.
            balance = next(
                (
                    candidate
                    for candidate in self._findall(root, xpath)
                    if self._get_text(candidate, 'iso:Tp//iso:Cd') == balance_code
                ),
                None,
            )
        if balance is None:
            return {}
        # Direct children only: a preceding CdtLine may hold its own Amt/Tp/Dt
        # that a descendant search would pick up first.
        return {
            'amount': self._get_text(balance, 'iso:Amt'),
            'currency': self._get_attribute(balance, 'iso:Amt', 'Ccy'),
            'creditDebitIndicator': self._get_text(balance, 'iso:CdtDbtInd'),
            'type': self._get_text(balance, 'iso:Tp'),
            'date': self._get_text(balance, 'iso:Dt'),
        }

    def _extract_entries(self, root: ET.Element) -> List[Dict[str, Any]]:
        """Extract every ``Ntry`` element below ``root``."""
        return [
            {
                'amount': self._get_text(entry, './/iso:Amt'),
                'currency': self._get_attribute(entry, './/iso:Amt', 'Ccy'),
                'creditDebitIndicator': self._get_text(entry, './/iso:CdtDbtInd'),
                'status': self._get_text(entry, './/iso:Sts'),
                'bookingDate': self._get_text(entry, './/iso:BookgDt'),
                'valueDate': self._get_text(entry, './/iso:ValDt'),
                'transactionId': self._get_text(entry, './/iso:AcctSvcrRef'),
                'accountServicerReference': self._get_text(entry, './/iso:AcctSvcrRef'),
                'transactionDetails': self._extract_transaction_details(entry),
            }
            for entry in self._findall(root, './/iso:Ntry')
        ]

    def _extract_payments(self, root: ET.Element) -> List[Dict[str, Any]]:
        """Extract every ``CdtTrfTxInf`` element below ``root``."""
        return [
            {
                'paymentId': self._get_text(payment, './/iso:PmtId'),
                'amount': self._get_text(payment, './/iso:InstdAmt'),
                'currency': self._get_attribute(payment, './/iso:InstdAmt', 'Ccy'),
                'creditor': self._extract_party(payment, './/iso:Cdtr'),
                'creditorAccount': self._extract_account(payment, './/iso:CdtrAcct'),
                'creditorAgent': self._extract_agent(payment, './/iso:CdtrAgt'),
                'remittanceInfo': self._extract_remittance_info(payment),
            }
            for payment in self._findall(root, './/iso:CdtTrfTxInf')
        ]

    def _extract_transaction_details(self, entry: ET.Element) -> List[Dict[str, Any]]:
        """Extract every ``TxDtls`` element below ``entry``."""
        return [
            {
                'references': {
                    'accountServicerReference': self._get_text(detail, './/iso:AcctSvcrRef'),
                    'endToEndId': self._get_text(detail, './/iso:EndToEndId'),
                    'instructionId': self._get_text(detail, './/iso:InstrId'),
                    'transactionId': self._get_text(detail, './/iso:TxId'),
                },
                'relatedParties': {
                    'debtor': self._extract_party(detail, './/iso:RltdPties/iso:Dbtr'),
                    'creditor': self._extract_party(detail, './/iso:RltdPties/iso:Cdtr'),
                },
                'remittanceInfo': self._extract_remittance_info(detail),
                'additionalInfo': self._get_text(detail, './/iso:AddtlTxInf'),
            }
            for detail in self._findall(entry, './/iso:TxDtls')
        ]

    def _extract_remittance_info(self, root: ET.Element) -> Dict[str, Any]:
        """Extract remittance information; ``{}`` when there is no ``RmtInf``."""
        remittance = self._find(root, './/iso:RmtInf')
        if remittance is None:
            return {}
        return {
            'unstructured': self._get_text(remittance, './/iso:Ustrd'),
            'structured': self._extract_structured_remittance(remittance),
        }

    def _extract_structured_remittance(self, remittance: ET.Element) -> List[Dict[str, Any]]:
        """Extract every ``Strd`` element below ``remittance``."""
        return [
            {
                'referredDocumentInformation': self._get_text(struct, './/iso:RfrdDocInf'),
                'referredDocumentAmount': self._get_text(struct, './/iso:RfrdDocAmt'),
            }
            for struct in self._findall(remittance, './/iso:Strd')
        ]

    def _extract_postal_address(self, root: ET.Element) -> Dict[str, Any]:
        """Extract postal address; ``{}`` when there is no ``PstlAdr``."""
        address = self._find(root, './/iso:PstlAdr')
        if address is None:
            return {}
        return {
            'streetName': self._get_text(address, './/iso:StrtNm'),
            'buildingNumber': self._get_text(address, './/iso:BldgNb'),
            'postCode': self._get_text(address, './/iso:PstCd'),
            'townName': self._get_text(address, './/iso:TwnNm'),
            'country': self._get_text(address, './/iso:Ctry'),
        }

    def _extract_identification(self, root: ET.Element) -> Dict[str, Any]:
        """Extract identification; ``{}`` when there is no ``Id``."""
        identification = self._find(root, './/iso:Id')
        if identification is None:
            return {}
        return {
            'organisationId': self._get_text(identification, './/iso:OrgId'),
            'privateId': self._get_text(identification, './/iso:PrvtId'),
            'other': self._get_text(identification, './/iso:Othr'),
        }

    def _extract_contact_details(self, root: ET.Element) -> Dict[str, Any]:
        """Extract contact details; ``{}`` when there is no ``CtctDtls``."""
        contact = self._find(root, './/iso:CtctDtls')
        if contact is None:
            return {}
        return {
            'name': self._get_text(contact, './/iso:Nm'),
            'phoneNumber': self._get_text(contact, './/iso:PhneNb'),
            'emailAddress': self._get_text(contact, './/iso:EmailAdr'),
        }