#!/usr/bin/env python3
"""
Financial Tokenization Service
Tokenizes ISO-20022, SWIFT FIN, and other financial files using Hyperledger Firefly
"""

import json
import logging
import os
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
import structlog
from flask import Flask, request, jsonify
from flask_restx import Api, Resource, fields
from web3 import Web3

from parsers.iso20022_parser import ISO20022Parser
from parsers.swift_fin_parser import SWIFTFINParser

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = structlog.get_logger()

# Flask app
app = Flask(__name__)
api = Api(app, version='1.0', title='Financial Tokenization Service',
          description='Tokenize ISO-20022, SWIFT FIN, and other financial files')

# Configuration
FIREFLY_API_URL = os.getenv('FIREFLY_API_URL', 'http://firefly-api:5000')
FIREFLY_API_KEY = os.getenv('FIREFLY_API_KEY', '')
BESU_RPC_URL = os.getenv('BESU_RPC_URL', 'http://besu-rpc-service:8545')
CHAIN_ID = int(os.getenv('CHAIN_ID', '138'))
# Upper bound (seconds) for each outbound Firefly HTTP call; without a timeout
# `requests` waits indefinitely and a stalled upstream would pin a worker.
FIREFLY_TIMEOUT_SECONDS = float(os.getenv('FIREFLY_TIMEOUT_SECONDS', '30'))
# The Werkzeug debugger allows arbitrary code execution, so it is opt-in only.
FLASK_DEBUG = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')


# Firefly client
class FireflyClient:
    """Client for interacting with Firefly API.

    All requests go through one ``requests.Session``. When an API key is
    supplied it is sent as the ``X-API-Key`` header on every request.
    """

    def __init__(self, api_url: str, api_key: str,
                 timeout: float = FIREFLY_TIMEOUT_SECONDS):
        """Store connection settings and prepare the HTTP session.

        Args:
            api_url: Base URL of the Firefly API (no trailing path).
            api_key: API key; an empty string disables the auth header.
            timeout: Per-request timeout in seconds.
        """
        self.api_url = api_url
        self.api_key = api_key
        self.timeout = timeout
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({'X-API-Key': api_key})

    def _post(self, path: str, **kwargs: Any) -> Dict[str, Any]:
        """POST to ``path`` under the API base URL and return the decoded JSON.

        Raises:
            requests.HTTPError: If the response status is 4xx/5xx.
            requests.RequestException: On connection failure or timeout.
        """
        response = self.session.post(
            f"{self.api_url}{path}", timeout=self.timeout, **kwargs
        )
        response.raise_for_status()
        return response.json()

    def create_token_pool(self, name: str, symbol: str, description: str,
                          pool_type: str = "fungible") -> Dict[str, Any]:
        """Create a token pool in Firefly.

        Args:
            name: Pool name.
            symbol: Pool symbol.
            description: Human-readable description.
            pool_type: Value sent as the pool ``type``. Defaults to
                ``"fungible"`` so existing callers are unaffected; pass
                ``"nonfungible"`` for NFT pools.

        Returns:
            The JSON body returned by Firefly.
        """
        data = {
            "name": name,
            "symbol": symbol,
            "description": description,
            "type": pool_type,
        }
        return self._post("/api/v1/tokens/pools", json=data)

    def mint_tokens(self, pool_id: str, amount: str, recipient: str) -> Dict[str, Any]:
        """Mint tokens in a pool and return Firefly's JSON response."""
        data = {
            "pool": pool_id,
            "amount": amount,
            "to": recipient,
        }
        return self._post("/api/v1/tokens/mint", json=data)

    def create_nft(self, pool_id: str, token_id: str, uri: str,
                   recipient: str) -> Dict[str, Any]:
        """Create an NFT in a pool and return Firefly's JSON response."""
        data = {
            "pool": pool_id,
            "tokenId": token_id,
            "uri": uri,
            "to": recipient,
        }
        return self._post("/api/v1/tokens/nfts", json=data)

    def upload_data(self, data: bytes, filename: str) -> str:
        """Upload data to IPFS via Firefly.

        The payload is sent as a multipart ``file`` field to the Firefly
        data endpoint.

        Returns:
            The ``id`` field of the response, or ``''`` if it is absent.
        """
        files = {'file': (filename, data)}
        result = self._post("/api/v1/data", files=files)
        return result.get('id', '')


# Financial Tokenization Service
class FinancialTokenizationService:
    """Service for tokenizing financial files.

    Each tokenize call parses the message, uploads the raw content through
    Firefly, and creates an NFT whose URI points at the uploaded data.
    """

    def __init__(self, firefly_client: FireflyClient):
        self.firefly = firefly_client
        self.iso20022_parser = ISO20022Parser()
        self.swift_parser = SWIFTFINParser()
        self.nft_pools: Dict[str, str] = {}  # Cache: pool name -> pool id

    def tokenize_iso20022(self, xml_content: str, file_name: str) -> Dict[str, Any]:
        """Tokenize ISO-20022 message.

        Args:
            xml_content: Raw ISO-20022 XML document.
            file_name: File name used for the upload.

        Returns:
            Dict with ``status``, ``messageId``, ``nft``, ``ipfsId`` and
            ``parsed`` keys.

        Raises:
            Exception: Any parser or Firefly error is logged and re-raised.
        """
        try:
            # Parse ISO-20022 message (auto-detects type)
            message = self.iso20022_parser.parse(xml_content)
            message_id = message.get('messageId', '')

            # Upload to IPFS
            ipfs_id = self.firefly.upload_data(xml_content.encode('utf-8'), file_name)

            # Create NFT for the message
            nft_pool_id = self._get_or_create_nft_pool("ISO20022-Messages")
            nft = self.firefly.create_nft(
                pool_id=nft_pool_id,
                token_id=message_id,
                uri=f"ipfs://{ipfs_id}",
                recipient=message.get('debtor', {}).get('account', ''),
            )

            return {
                'status': 'success',
                'messageId': message_id,
                'nft': nft,
                'ipfsId': ipfs_id,
                'parsed': message,
            }
        except Exception as e:
            logger.error("Failed to tokenize ISO-20022 message", error=str(e))
            raise

    def tokenize_swift_fin(self, swift_message: str, file_name: str) -> Dict[str, Any]:
        """Tokenize SWIFT FIN message.

        Args:
            swift_message: Raw SWIFT FIN message text.
            file_name: File name used for the upload.

        Returns:
            Dict with ``status``, ``messageType``, ``nft``, ``ipfsId`` and
            ``parsed`` keys.

        Raises:
            Exception: Any parser or Firefly error is logged and re-raised.
        """
        try:
            # Parse SWIFT FIN message
            message = self.swift_parser.parse(swift_message)

            # Upload to IPFS
            ipfs_id = self.firefly.upload_data(swift_message.encode('utf-8'), file_name)

            # Create NFT for the message
            nft_pool_id = self._get_or_create_nft_pool("SWIFT-FIN-Messages")
            # NOTE(review): the token id is the header's message type, which is
            # presumably shared by every message of that type, and it is read
            # from a different path than the 'messageType' returned below.
            # Confirm against the parser's output schema.
            nft = self.firefly.create_nft(
                pool_id=nft_pool_id,
                token_id=message.get('applicationHeader', {}).get('messageType', ''),
                uri=f"ipfs://{ipfs_id}",
                recipient=message.get('parsed', {}).get('debtor', ''),
            )

            return {
                'status': 'success',
                'messageType': message.get('messageType', ''),
                'nft': nft,
                'ipfsId': ipfs_id,
                'parsed': message,
            }
        except Exception as e:
            logger.error("Failed to tokenize SWIFT FIN message", error=str(e))
            raise

    def _get_or_create_nft_pool(self, pool_name: str) -> str:
        """Get or create NFT pool.

        Returns the cached pool id for ``pool_name`` when one exists;
        otherwise creates a non-fungible pool and caches its id so later
        requests in this process reuse it.

        Returns:
            The pool id, or ``''`` if Firefly's response carried no ``id``.
        """
        cached_id = self.nft_pools.get(pool_name)
        if cached_id:
            return cached_id

        # TODO: the cache is in-memory only; after a restart this will try to
        # create the pool again. Look the pool up in Firefly first.
        pool = self.firefly.create_token_pool(
            name=pool_name,
            symbol=pool_name.replace('-', ''),
            description=f"NFT pool for {pool_name}",
            pool_type="nonfungible",
        )
        pool_id = pool.get('id', '')
        if pool_id:
            # Never cache an empty id, so a later call can retry creation.
            self.nft_pools[pool_name] = pool_id
        return pool_id


# Initialize service
firefly_client = FireflyClient(FIREFLY_API_URL, FIREFLY_API_KEY)
tokenization_service = FinancialTokenizationService(firefly_client)

# API Models
iso20022_model = api.model('ISO20022', {
    'xml_content': fields.String(required=True, description='ISO-20022 XML content'),
    'file_name': fields.String(required=True, description='File name'),
})

swift_fin_model = api.model('SWIFTFIN', {
    'swift_message': fields.String(required=True, description='SWIFT FIN message'),
    'file_name': fields.String(required=True, description='File name'),
})


def _validate_payload(payload: Any, required: List[str]) -> Optional[str]:
    """Return an error message if ``payload`` lacks a required string field.

    Returns ``None`` when ``payload`` is a dict and every name in
    ``required`` maps to a ``str`` value.
    """
    if not isinstance(payload, dict):
        return 'Request body must be a JSON object'
    missing = [name for name in required if not isinstance(payload.get(name), str)]
    if missing:
        return f"Missing or non-string field(s): {', '.join(missing)}"
    return None


# API Endpoints
# Resource methods return plain ``(dict, status)`` tuples: flask-restx
# serializes the first element itself and cannot serialize a Response object
# produced by ``jsonify`` when it is wrapped in a tuple.
@api.route('/api/v1/tokenize/iso20022')
class TokenizeISO20022(Resource):
    @api.expect(iso20022_model)
    def post(self):
        """Tokenize ISO-20022 message"""
        data = request.get_json(silent=True)
        problem = _validate_payload(data, ['xml_content', 'file_name'])
        if problem:
            return {'error': problem}, 400
        try:
            result = tokenization_service.tokenize_iso20022(
                data['xml_content'],
                data['file_name'],
            )
        except Exception as e:
            return {'error': str(e)}, 400
        return result, 200


@api.route('/api/v1/tokenize/swift-fin')
class TokenizeSWIFTFIN(Resource):
    @api.expect(swift_fin_model)
    def post(self):
        """Tokenize SWIFT FIN message"""
        data = request.get_json(silent=True)
        problem = _validate_payload(data, ['swift_message', 'file_name'])
        if problem:
            return {'error': problem}, 400
        try:
            result = tokenization_service.tokenize_swift_fin(
                data['swift_message'],
                data['file_name'],
            )
        except Exception as e:
            return {'error': str(e)}, 400
        return result, 200


@api.route('/api/v1/health')
class Health(Resource):
    def get(self):
        """Health check"""
        return {'status': 'healthy'}, 200


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=FLASK_DEBUG)