#!/usr/bin/env python3 """ ISO-20022 Parser Parses ISO-20022 financial messages (pacs.008, camt.054, etc.) """ import xml.etree.ElementTree as ET from typing import Dict, Any, List, Optional from datetime import datetime import logging logger = logging.getLogger(__name__) class ISO20022Parser: """Parser for ISO-20022 financial messages""" # ISO-20022 namespaces NAMESPACES = { 'pacs': 'urn:iso:std:iso:20022:tech:xsd:pacs.008.001.10', 'paint': 'urn:iso:std:iso:20022:tech:xsd:paint.001.001.11', 'camt': 'urn:iso:std:iso:20022:tech:xsd:camt.054.001.10', 'pacs009': 'urn:iso:std:iso:20022:tech:xsd:pacs.009.001.10', 'camt053': 'urn:iso:std:iso:20022:tech:xsd:camt.053.001.10', 'camt052': 'urn:iso:std:iso:20022:tech:xsd:camt.052.001.10', } def __init__(self): self.supported_types = [ 'pacs.008', # Payment Instruction 'pacs.009', # Financial Institution Credit Transfer 'camt.052', # Bank-to-Customer Account Report 'camt.053', # Bank-to-Customer Statement 'camt.054', # Bank-to-Customer Debit Credit Notification 'paint.001', # Payment Initiation ] def parse(self, xml_content: str) -> Dict[str, Any]: """Parse ISO-20022 message (auto-detect type)""" try: root = ET.fromstring(xml_content) # Detect message type message_type = self._detect_message_type(root) if message_type == 'pacs.008': return self.parse_pacs008(xml_content) elif message_type == 'pacs.009': return self.parse_pacs009(xml_content) elif message_type == 'camt.052': return self.parse_camt052(xml_content) elif message_type == 'camt.053': return self.parse_camt053(xml_content) elif message_type == 'camt.054': return self.parse_camt054(xml_content) elif message_type == 'paint.001': return self.parse_paint001(xml_content) else: raise ValueError(f"Unsupported ISO-20022 message type: {message_type}") except ET.ParseError as e: logger.error(f"Failed to parse ISO-20022 XML: {e}") raise ValueError(f"Invalid ISO-20022 XML: {e}") def _detect_message_type(self, root: ET.Element) -> str: """Detect ISO-20022 message type from root element""" tag = root.tag if 'pacs.008' in tag: return 'pacs.008' elif 'pacs.009' in tag: return 'pacs.009' elif 'camt.052' in tag: return 'camt.052' elif 'camt.053' in tag: return 'camt.053' elif 'camt.054' in tag: return 'camt.054' elif 'paint.001' in tag: return 'paint.001' else: raise ValueError(f"Unknown ISO-20022 message type: {tag}") def parse_pacs008(self, xml_content: str) -> Dict[str, Any]: """Parse ISO-20022 pacs.008 (Payment Instruction) message""" root = ET.fromstring(xml_content) message = { 'type': 'pacs.008', 'messageId': self._get_text(root, './/pacs:MsgId'), 'creationDateTime': self._get_text(root, './/pacs:CreDtTm'), 'instructionId': self._get_text(root, './/pacs:InstrId'), 'endToEndId': self._get_text(root, './/pacs:EndToEndId'), 'transactionId': self._get_text(root, './/pacs:TxId'), 'paymentInformationId': self._get_text(root, './/pacs:PmtInfId'), 'paymentMethod': self._get_text(root, './/pacs:PmtMtd'), 'amount': self._get_text(root, './/pacs:InstdAmt'), 'currency': self._get_text(root, './/pacs:InstdAmt/@Ccy'), 'debtor': self._extract_party(root, './/pacs:Dbtr'), 'debtorAccount': self._extract_account(root, './/pacs:DbtrAcct'), 'debtorAgent': self._extract_agent(root, './/pacs:DbtrAgt'), 'creditor': self._extract_party(root, './/pacs:Cdtr'), 'creditorAccount': self._extract_account(root, './/pacs:CdtrAcct'), 'creditorAgent': self._extract_agent(root, './/pacs:CdtrAgt'), 'remittanceInfo': self._extract_remittance_info(root), 'purpose': self._get_text(root, './/pacs:Purp'), 'executionDate': self._get_text(root, './/pacs:ReqdExctnDt'), } return message def parse_pacs009(self, xml_content: str) -> Dict[str, Any]: """Parse ISO-20022 pacs.009 (Financial Institution Credit Transfer) message""" root = ET.fromstring(xml_content) message = { 'type': 'pacs.009', 'messageId': self._get_text(root, './/pacs009:MsgId'), 'creationDateTime': self._get_text(root, './/pacs009:CreDtTm'), 'instructionId': self._get_text(root, './/pacs009:InstrId'), 'endToEndId': self._get_text(root, './/pacs009:EndToEndId'), 'transactionId': self._get_text(root, './/pacs009:TxId'), 'amount': self._get_text(root, './/pacs009:InstdAmt'), 'currency': self._get_text(root, './/pacs009:InstdAmt/@Ccy'), 'debtorAgent': self._extract_agent(root, './/pacs009:DbtrAgt'), 'creditorAgent': self._extract_agent(root, './/pacs009:CdtrAgt'), } return message def parse_camt052(self, xml_content: str) -> Dict[str, Any]: """Parse ISO-20022 camt.052 (Bank-to-Customer Account Report) message""" root = ET.fromstring(xml_content) message = { 'type': 'camt.052', 'messageId': self._get_text(root, './/camt:MsgId'), 'creationDateTime': self._get_text(root, './/camt:CreDtTm'), 'account': self._extract_account_info(root), 'balance': self._extract_balance(root), 'entries': self._extract_entries(root), } return message def parse_camt053(self, xml_content: str) -> Dict[str, Any]: """Parse ISO-20022 camt.053 (Bank-to-Customer Statement) message""" root = ET.fromstring(xml_content) message = { 'type': 'camt.053', 'messageId': self._get_text(root, './/camt:MsgId'), 'creationDateTime': self._get_text(root, './/camt:CreDtTm'), 'account': self._extract_account_info(root), 'openingBalance': self._extract_balance(root, './/camt:Bal[@Tp/@Cd="OPBD"]'), 'closingBalance': self._extract_balance(root, './/camt:Bal[@Tp/@Cd="CLBD"]'), 'entries': self._extract_entries(root), } return message def parse_camt054(self, xml_content: str) -> Dict[str, Any]: """Parse ISO-20022 camt.054 (Bank-to-Customer Debit Credit Notification) message""" root = ET.fromstring(xml_content) message = { 'type': 'camt.054', 'messageId': self._get_text(root, './/camt:MsgId'), 'creationDateTime': self._get_text(root, './/camt:CreDtTm'), 'account': self._extract_account_info(root), 'entries': self._extract_entries(root), } return message def parse_paint001(self, xml_content: str) -> Dict[str, Any]: """Parse ISO-20022 paint.001 (Payment Initiation) message""" root = ET.fromstring(xml_content) message = { 'type': 'paint.001', 'messageId': self._get_text(root, './/paint:MsgId'), 'creationDateTime': self._get_text(root, './/paint:CreDtTm'), 'paymentInformationId': self._get_text(root, './/paint:PmtInfId'), 'paymentMethod': self._get_text(root, './/paint:PmtMtd'), 'debtor': self._extract_party(root, './/paint:Dbtr'), 'debtorAccount': self._extract_account(root, './/paint:DbtrAcct'), 'payments': self._extract_payments(root), } return message def _get_text(self, root: ET.Element, xpath: str, default: str = '') -> str: """Get text from XML element""" try: element = root.find(xpath, self.NAMESPACES) return element.text if element is not None else default except Exception: return default def _get_attribute(self, root: ET.Element, xpath: str, attribute: str, default: str = '') -> str: """Get attribute from XML element""" try: element = root.find(xpath, self.NAMESPACES) return element.get(attribute, default) if element is not None else default except Exception: return default def _extract_party(self, root: ET.Element, xpath: str) -> Dict[str, Any]: """Extract party information from XML""" party = root.find(xpath, self.NAMESPACES) if party is None: return {} return { 'name': self._get_text(party, './/pacs:Nm') or self._get_text(party, './/camt:Nm') or self._get_text(party, './/paint:Nm'), 'postalAddress': self._extract_postal_address(party), 'identification': self._extract_identification(party), 'contactDetails': self._extract_contact_details(party), } def _extract_account(self, root: ET.Element, xpath: str) -> Dict[str, Any]: """Extract account information from XML""" account = root.find(xpath, self.NAMESPACES) if account is None: return {} return { 'id': self._get_text(account, './/pacs:Id') or self._get_text(account, './/camt:Id') or self._get_text(account, './/paint:Id'), 'type': self._get_text(account, './/pacs:Tp') or self._get_text(account, './/camt:Tp') or self._get_text(account, './/paint:Tp'), 'currency': self._get_text(account, './/pacs:Ccy') or self._get_text(account, './/camt:Ccy') or self._get_text(account, './/paint:Ccy'), 'name': self._get_text(account, './/pacs:Nm') or self._get_text(account, './/camt:Nm') or self._get_text(account, './/paint:Nm'), } def _extract_agent(self, root: ET.Element, xpath: str) -> Dict[str, Any]: """Extract agent (financial institution) information from XML""" agent = root.find(xpath, self.NAMESPACES) if agent is None: return {} return { 'bic': self._get_text(agent, './/pacs:BIC') or self._get_text(agent, './/camt:BIC') or self._get_text(agent, './/paint:BIC'), 'name': self._get_text(agent, './/pacs:Nm') or self._get_text(agent, './/camt:Nm') or self._get_text(agent, './/paint:Nm'), 'postalAddress': self._extract_postal_address(agent), } def _extract_account_info(self, root: ET.Element) -> Dict[str, Any]: """Extract account information from camt messages""" account = root.find('.//camt:Acct', self.NAMESPACES) if account is None: return {} return { 'id': self._get_text(account, './/camt:Id'), 'type': self._get_text(account, './/camt:Tp'), 'currency': self._get_text(account, './/camt:Ccy'), 'name': self._get_text(account, './/camt:Nm'), 'owner': self._extract_party(account, './/camt:Ownr'), } def _extract_balance(self, root: ET.Element, xpath: str = './/camt:Bal') -> Dict[str, Any]: """Extract balance information from XML""" balance = root.find(xpath, self.NAMESPACES) if balance is None: return {} return { 'amount': self._get_text(balance, './/camt:Amt'), 'currency': self._get_attribute(balance, './/camt:Amt', 'Ccy'), 'creditDebitIndicator': self._get_text(balance, './/camt:CdtDbtInd'), 'type': self._get_text(balance, './/camt:Tp'), 'date': self._get_text(balance, './/camt:Dt'), } def _extract_entries(self, root: ET.Element) -> List[Dict[str, Any]]: """Extract entry information from XML""" entries = [] for entry in root.findall('.//camt:Ntry', self.NAMESPACES): entries.append({ 'amount': self._get_text(entry, './/camt:Amt'), 'currency': self._get_attribute(entry, './/camt:Amt', 'Ccy'), 'creditDebitIndicator': self._get_text(entry, './/camt:CdtDbtInd'), 'status': self._get_text(entry, './/camt:Sts'), 'bookingDate': self._get_text(entry, './/camt:BookgDt'), 'valueDate': self._get_text(entry, './/camt:ValDt'), 'transactionId': self._get_text(entry, './/camt:AcctSvcrRef'), 'accountServicerReference': self._get_text(entry, './/camt:AcctSvcrRef'), 'transactionDetails': self._extract_transaction_details(entry), }) return entries def _extract_payments(self, root: ET.Element) -> List[Dict[str, Any]]: """Extract payment information from XML""" payments = [] for payment in root.findall('.//paint:CdtTrfTxInf', self.NAMESPACES): payments.append({ 'paymentId': self._get_text(payment, './/paint:PmtId'), 'amount': self._get_text(payment, './/paint:InstdAmt'), 'currency': self._get_attribute(payment, './/paint:InstdAmt', 'Ccy'), 'creditor': self._extract_party(payment, './/paint:Cdtr'), 'creditorAccount': self._extract_account(payment, './/paint:CdtrAcct'), 'creditorAgent': self._extract_agent(payment, './/paint:CdtrAgt'), 'remittanceInfo': self._extract_remittance_info(payment), }) return payments def _extract_transaction_details(self, entry: ET.Element) -> List[Dict[str, Any]]: """Extract transaction details from entry""" details = [] for detail in entry.findall('.//camt:TxDtls', self.NAMESPACES): details.append({ 'references': { 'accountServicerReference': self._get_text(detail, './/camt:AcctSvcrRef'), 'endToEndId': self._get_text(detail, './/camt:EndToEndId'), 'instructionId': self._get_text(detail, './/camt:InstrId'), 'transactionId': self._get_text(detail, './/camt:TxId'), }, 'relatedParties': { 'debtor': self._extract_party(detail, './/camt:RltdPties/camt:Dbtr'), 'creditor': self._extract_party(detail, './/camt:RltdPties/camt:Cdtr'), }, 'remittanceInfo': self._extract_remittance_info(detail), 'additionalInfo': self._get_text(detail, './/camt:AddtlTxInf'), }) return details def _extract_remittance_info(self, root: ET.Element) -> Dict[str, Any]: """Extract remittance information from XML""" remittance = root.find('.//pacs:RmtInf', self.NAMESPACES) or root.find('.//camt:RmtInf', self.NAMESPACES) or root.find('.//paint:RmtInf', self.NAMESPACES) if remittance is None: return {} return { 'unstructured': self._get_text(remittance, './/pacs:Ustrd') or self._get_text(remittance, './/camt:Ustrd') or self._get_text(remittance, './/paint:Ustrd'), 'structured': self._extract_structured_remittance(remittance), } def _extract_structured_remittance(self, remittance: ET.Element) -> List[Dict[str, Any]]: """Extract structured remittance information""" structured = [] for struct in remittance.findall('.//pacs:Strd', self.NAMESPACES) or remittance.findall('.//camt:Strd', self.NAMESPACES) or remittance.findall('.//paint:Strd', self.NAMESPACES): structured.append({ 'referredDocumentInformation': self._get_text(struct, './/pacs:RfrdDocInf') or self._get_text(struct, './/camt:RfrdDocInf') or self._get_text(struct, './/paint:RfrdDocInf'), 'referredDocumentAmount': self._get_text(struct, './/pacs:RfrdDocAmt') or self._get_text(struct, './/camt:RfrdDocAmt') or self._get_text(struct, './/paint:RfrdDocAmt'), }) return structured def _extract_postal_address(self, root: ET.Element) -> Dict[str, Any]: """Extract postal address information""" address = root.find('.//pacs:PstlAdr', self.NAMESPACES) or root.find('.//camt:PstlAdr', self.NAMESPACES) or root.find('.//paint:PstlAdr', self.NAMESPACES) if address is None: return {} return { 'streetName': self._get_text(address, './/pacs:StrtNm') or self._get_text(address, './/camt:StrtNm') or self._get_text(address, './/paint:StrtNm'), 'buildingNumber': self._get_text(address, './/pacs:BldgNb') or self._get_text(address, './/camt:BldgNb') or self._get_text(address, './/paint:BldgNb'), 'postCode': self._get_text(address, './/pacs:PstCd') or self._get_text(address, './/camt:PstCd') or self._get_text(address, './/paint:PstCd'), 'townName': self._get_text(address, './/pacs:TwnNm') or self._get_text(address, './/camt:TwnNm') or self._get_text(address, './/paint:TwnNm'), 'country': self._get_text(address, './/pacs:Ctry') or self._get_text(address, './/camt:Ctry') or self._get_text(address, './/paint:Ctry'), } def _extract_identification(self, root: ET.Element) -> Dict[str, Any]: """Extract identification information""" identification = root.find('.//pacs:Id', self.NAMESPACES) or root.find('.//camt:Id', self.NAMESPACES) or root.find('.//paint:Id', self.NAMESPACES) if identification is None: return {} return { 'organisationId': self._get_text(identification, './/pacs:OrgId') or self._get_text(identification, './/camt:OrgId') or self._get_text(identification, './/paint:OrgId'), 'privateId': self._get_text(identification, './/pacs:PrvtId') or self._get_text(identification, './/camt:PrvtId') or self._get_text(identification, './/paint:PrvtId'), 'other': self._get_text(identification, './/pacs:Othr') or self._get_text(identification, './/camt:Othr') or self._get_text(identification, './/paint:Othr'), } def _extract_contact_details(self, root: ET.Element) -> Dict[str, Any]: """Extract contact details""" contact = root.find('.//pacs:CtctDtls', self.NAMESPACES) or root.find('.//camt:CtctDtls', self.NAMESPACES) or root.find('.//paint:CtctDtls', self.NAMESPACES) if contact is None: return {} return { 'name': self._get_text(contact, './/pacs:Nm') or self._get_text(contact, './/camt:Nm') or self._get_text(contact, './/paint:Nm'), 'phoneNumber': self._get_text(contact, './/pacs:PhneNb') or self._get_text(contact, './/camt:PhneNb') or self._get_text(contact, './/paint:PhneNb'), 'emailAddress': self._get_text(contact, './/pacs:EmailAdr') or self._get_text(contact, './/camt:EmailAdr') or self._get_text(contact, './/paint:EmailAdr'), }