Files
smom-dbis-138/services/financial-tokenization/financial_tokenization_service.py
defiQUG 1fb7266469 Add Oracle Aggregator and CCIP Integration
- Introduced Aggregator.sol for Chainlink-compatible oracle functionality, including round-based updates and access control.
- Added OracleWithCCIP.sol to extend Aggregator with CCIP cross-chain messaging capabilities.
- Created .gitmodules to include OpenZeppelin contracts as a submodule.
- Developed a comprehensive deployment guide in NEXT_STEPS_COMPLETE_GUIDE.md for Phase 2 and smart contract deployment.
- Implemented Vite configuration for the orchestration portal, supporting both Vue and React frameworks.
- Added server-side logic for the Multi-Cloud Orchestration Portal, including API endpoints for environment management and monitoring.
- Created scripts for resource import and usage validation across non-US regions.
- Added tests for CCIP error handling and integration to ensure robust functionality.
- Included various new files and directories for the orchestration portal and deployment scripts.
2025-12-12 14:57:48 -08:00

231 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Financial Tokenization Service
Tokenizes ISO-20022, SWIFT FIN, and other financial files using Hyperledger Firefly
"""
import os
import json
import logging
from typing import Dict, Any, List, Optional
from pathlib import Path
from datetime import datetime
from flask import Flask, request, jsonify
from flask_restx import Api, Resource, fields
import xml.etree.ElementTree as ET
from web3 import Web3
import requests
import structlog
from parsers.iso20022_parser import ISO20022Parser
from parsers.swift_fin_parser import SWIFTFINParser
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = structlog.get_logger()
# Flask app
app = Flask(__name__)
api = Api(app, version='1.0', title='Financial Tokenization Service',
description='Tokenize ISO-20022, SWIFT FIN, and other financial files')
# Configuration
FIREFLY_API_URL = os.getenv('FIREFLY_API_URL', 'http://firefly-api:5000')
FIREFLY_API_KEY = os.getenv('FIREFLY_API_KEY', '')
BESU_RPC_URL = os.getenv('BESU_RPC_URL', 'http://besu-rpc-service:8545')
CHAIN_ID = int(os.getenv('CHAIN_ID', '138'))
# Firefly client
class FireflyClient:
"""Client for interacting with Firefly API"""
def __init__(self, api_url: str, api_key: str):
self.api_url = api_url
self.api_key = api_key
self.session = requests.Session()
if api_key:
self.session.headers.update({'X-API-Key': api_key})
def create_token_pool(self, name: str, symbol: str, description: str) -> Dict[str, Any]:
"""Create a token pool in Firefly"""
url = f"{self.api_url}/api/v1/tokens/pools"
data = {
"name": name,
"symbol": symbol,
"description": description,
"type": "fungible"
}
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def mint_tokens(self, pool_id: str, amount: str, recipient: str) -> Dict[str, Any]:
"""Mint tokens in a pool"""
url = f"{self.api_url}/api/v1/tokens/mint"
data = {
"pool": pool_id,
"amount": amount,
"to": recipient
}
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def create_nft(self, pool_id: str, token_id: str, uri: str, recipient: str) -> Dict[str, Any]:
"""Create an NFT in a pool"""
url = f"{self.api_url}/api/v1/tokens/nfts"
data = {
"pool": pool_id,
"tokenId": token_id,
"uri": uri,
"to": recipient
}
response = self.session.post(url, json=data)
response.raise_for_status()
return response.json()
def upload_data(self, data: bytes, filename: str) -> str:
"""Upload data to IPFS via Firefly"""
url = f"{self.api_url}/api/v1/data"
files = {'file': (filename, data)}
response = self.session.post(url, files=files)
response.raise_for_status()
result = response.json()
return result.get('id', '')
# Import parsers from parsers module
# Financial Tokenization Service
class FinancialTokenizationService:
"""Service for tokenizing financial files"""
def __init__(self, firefly_client: FireflyClient):
self.firefly = firefly_client
self.iso20022_parser = ISO20022Parser()
self.swift_parser = SWIFTFINParser()
self.nft_pools = {} # Cache for NFT pools
def tokenize_iso20022(self, xml_content: str, file_name: str) -> Dict[str, Any]:
"""Tokenize ISO-20022 message"""
try:
# Parse ISO-20022 message (auto-detects type)
message = self.iso20022_parser.parse(xml_content)
# Upload to IPFS
ipfs_id = self.firefly.upload_data(xml_content.encode('utf-8'), file_name)
# Create NFT for the message
nft_pool_id = self._get_or_create_nft_pool("ISO20022-Messages")
nft = self.firefly.create_nft(
pool_id=nft_pool_id,
token_id=message.get('messageId', ''),
uri=f"ipfs://{ipfs_id}",
recipient=message.get('debtor', {}).get('account', '')
)
return {
'status': 'success',
'messageId': message.get('messageId', ''),
'nft': nft,
'ipfsId': ipfs_id,
'parsed': message,
}
except Exception as e:
logger.error("Failed to tokenize ISO-20022 message", error=str(e))
raise
def tokenize_swift_fin(self, swift_message: str, file_name: str) -> Dict[str, Any]:
"""Tokenize SWIFT FIN message"""
try:
# Parse SWIFT FIN message
message = self.swift_parser.parse(swift_message)
# Upload to IPFS
ipfs_id = self.firefly.upload_data(swift_message.encode('utf-8'), file_name)
# Create NFT for the message
nft_pool_id = self._get_or_create_nft_pool("SWIFT-FIN-Messages")
nft = self.firefly.create_nft(
pool_id=nft_pool_id,
token_id=message.get('applicationHeader', {}).get('messageType', ''),
uri=f"ipfs://{ipfs_id}",
recipient=message.get('parsed', {}).get('debtor', '')
)
return {
'status': 'success',
'messageType': message.get('messageType', ''),
'nft': nft,
'ipfsId': ipfs_id,
'parsed': message,
}
except Exception as e:
logger.error("Failed to tokenize SWIFT FIN message", error=str(e))
raise
def _get_or_create_nft_pool(self, pool_name: str) -> str:
"""Get or create NFT pool"""
# In production, this would check if pool exists first
# For now, create a new pool
pool = self.firefly.create_token_pool(
name=pool_name,
symbol=pool_name.replace('-', ''),
description=f"NFT pool for {pool_name}"
)
return pool.get('id', '')
# Initialize service
firefly_client = FireflyClient(FIREFLY_API_URL, FIREFLY_API_KEY)
tokenization_service = FinancialTokenizationService(firefly_client)
# API Models
iso20022_model = api.model('ISO20022', {
'xml_content': fields.String(required=True, description='ISO-20022 XML content'),
'file_name': fields.String(required=True, description='File name'),
})
swift_fin_model = api.model('SWIFTFIN', {
'swift_message': fields.String(required=True, description='SWIFT FIN message'),
'file_name': fields.String(required=True, description='File name'),
})
# API Endpoints
@api.route('/api/v1/tokenize/iso20022')
class TokenizeISO20022(Resource):
@api.expect(iso20022_model)
def post(self):
"""Tokenize ISO-20022 message"""
data = request.json
try:
result = tokenization_service.tokenize_iso20022(
data['xml_content'],
data['file_name']
)
return jsonify(result), 200
except Exception as e:
return jsonify({'error': str(e)}), 400
@api.route('/api/v1/tokenize/swift-fin')
class TokenizeSWIFTFIN(Resource):
@api.expect(swift_fin_model)
def post(self):
"""Tokenize SWIFT FIN message"""
data = request.json
try:
result = tokenization_service.tokenize_swift_fin(
data['swift_message'],
data['file_name']
)
return jsonify(result), 200
except Exception as e:
return jsonify({'error': str(e)}), 400
@api.route('/api/v1/health')
class Health(Resource):
def get(self):
"""Health check"""
return jsonify({'status': 'healthy'}), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, debug=True)