Files
smom-dbis-138/connectors/firefly-cacti/connector.py
defiQUG 1fb7266469 Add Oracle Aggregator and CCIP Integration
- Introduced Aggregator.sol for Chainlink-compatible oracle functionality, including round-based updates and access control.
- Added OracleWithCCIP.sol to extend Aggregator with CCIP cross-chain messaging capabilities.
- Created .gitmodules to include OpenZeppelin contracts as a submodule.
- Developed a comprehensive deployment guide in NEXT_STEPS_COMPLETE_GUIDE.md for Phase 2 and smart contract deployment.
- Implemented Vite configuration for the orchestration portal, supporting both Vue and React frameworks.
- Added server-side logic for the Multi-Cloud Orchestration Portal, including API endpoints for environment management and monitoring.
- Created scripts for resource import and usage validation across non-US regions.
- Added tests for CCIP error handling and integration to ensure robust functionality.
- Included various new files and directories for the orchestration portal and deployment scripts.
2025-12-12 14:57:48 -08:00

60 lines
2.2 KiB
Python

#!/usr/bin/env python3
"""
Firefly-Cacti Connector
Connects Hyperledger Firefly to Hyperledger Cacti
"""
import os
import json
import logging
from typing import Dict, Any, Optional
import requests
logger = logging.getLogger(__name__)
class FireflyCactiConnector:
"""Connector for Firefly-Cacti integration"""
def __init__(self, firefly_api_url: str, firefly_api_key: str, cactus_api_url: str):
self.firefly_api_url = firefly_api_url
self.firefly_api_key = firefly_api_key
self.cactus_api_url = cactus_api_url
self.firefly_session = requests.Session()
if firefly_api_key:
self.firefly_session.headers.update({'X-API-Key': firefly_api_key})
self.cactus_session = requests.Session()
def create_cross_chain_bridge(self, source_network: str, target_network: str) -> Dict[str, Any]:
"""Create cross-chain bridge between Firefly networks via Cacti"""
url = f"{self.cactus_api_url}/api/v1/plugins/ledger-connector/bridge"
data = {
"sourceNetwork": source_network,
"targetNetwork": target_network,
"connector": "firefly"
}
response = self.cactus_session.post(url, json=data)
response.raise_for_status()
return response.json()
def transfer_tokens_cross_chain(self, token_pool_id: str, amount: str, target_network: str, recipient: str) -> Dict[str, Any]:
"""Transfer tokens cross-chain via Cacti"""
url = f"{self.cactus_api_url}/api/v1/plugins/ledger-connector/bridge/transfer"
data = {
"tokenPoolId": token_pool_id,
"amount": amount,
"targetNetwork": target_network,
"recipient": recipient,
}
response = self.cactus_session.post(url, json=data)
response.raise_for_status()
return response.json()
def get_bridge_status(self, bridge_id: str) -> Dict[str, Any]:
"""Get bridge status"""
url = f"{self.cactus_api_url}/api/v1/plugins/ledger-connector/bridge/status"
params = {"bridgeId": bridge_id}
response = self.cactus_session.get(url, params=params)
response.raise_for_status()
return response.json()