Files
smom-dbis-138/services/oracle-publisher/oracle_publisher_optimized.py
defiQUG 1fb7266469 Add Oracle Aggregator and CCIP Integration
- Introduced Aggregator.sol for Chainlink-compatible oracle functionality, including round-based updates and access control.
- Added OracleWithCCIP.sol to extend Aggregator with CCIP cross-chain messaging capabilities.
- Created .gitmodules to include OpenZeppelin contracts as a submodule.
- Developed a comprehensive deployment guide in NEXT_STEPS_COMPLETE_GUIDE.md for Phase 2 and smart contract deployment.
- Implemented Vite configuration for the orchestration portal, supporting both Vue and React frameworks.
- Added server-side logic for the Multi-Cloud Orchestration Portal, including API endpoints for environment management and monitoring.
- Created scripts for resource import and usage validation across non-US regions.
- Added tests for CCIP error handling and integration to ensure robust functionality.
- Included various new files and directories for the orchestration portal and deployment scripts.
2025-12-12 14:57:48 -08:00

241 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
Optimized Oracle Publisher Service with caching and load balancing support
"""
import asyncio
import hashlib
import logging
import os
import time
from dataclasses import dataclass, field
from decimal import Decimal
from functools import lru_cache
from typing import List, Optional, Dict

import requests
from eth_account import Account
from prometheus_client import Counter, Gauge, start_http_server
from web3 import Web3
from web3.middleware import geth_poa_middleware
# Configure logging: INFO level on the root logger, with timestamp, logger
# name and level in front of every message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
# Prometheus metrics. They are registered at import time, so this module must
# only be imported once per process.
# Counters only ever increase; gauges hold the most recently set value.
updates_sent = Counter('oracle_updates_sent_total', 'Total number of oracle updates sent')
update_errors = Counter('oracle_update_errors_total', 'Total number of oracle update errors')
current_price = Gauge('oracle_current_price', 'Current oracle price')
price_deviation = Gauge('oracle_price_deviation', 'Price deviation from last update')
cache_hits = Counter('oracle_cache_hits_total', 'Total cache hits')
cache_misses = Counter('oracle_cache_misses_total', 'Total cache misses')
# Cache for oracle data: maps a cache key to a (price, unix_timestamp) tuple.
# The dict is module-global, so it is shared by every publisher instance in
# the process.
price_cache: Dict[str, tuple] = {}
CACHE_TTL = 30  # seconds a cached price is considered fresh
@dataclass
class DataSource:
    """Description of one upstream price feed.

    Instances are plain value objects: they carry no behaviour and compare
    equal when all four fields are equal.
    """

    # Human-readable label for the feed.
    name: str
    # Endpoint the price is requested from.
    url: str
    # Identifier of the response parser to apply to this feed.
    parser: str
    # Relative influence of this feed when several feeds are combined.
    weight: float = 1.0
@dataclass
class OracleConfig:
    """Settings for one oracle publisher.

    The signing key is deliberately left out of the generated ``repr`` so
    that logging or printing a config object (or a traceback that includes
    one) cannot leak it. Equality and attribute access are unaffected.
    """

    # JSON-RPC endpoint of the chain that hosts the aggregator contract.
    rpc_url: str
    # Address of the aggregator contract that receives price updates.
    aggregator_address: str
    # Private key used to sign update transactions; hidden from repr().
    private_key: str = field(repr=False)
    # Maximum number of seconds allowed between two on-chain updates.
    heartbeat: int
    # Minimum price move, in percent of the last published price, that
    # triggers an update before the heartbeat expires.
    deviation_threshold: int
    # Upstream feeds to aggregate.
    data_sources: List["DataSource"]
    # Optional priority-fee cap.
    # NOTE(review): not read anywhere in the visible code - confirm intended use.
    max_priority_fee: Optional[int] = None
class OraclePublisherOptimized:
    """Publish an aggregated off-chain price to an on-chain aggregator.

    Prices are fetched over HTTP from every configured data source, kept in
    the module-level ``price_cache`` for ``CACHE_TTL`` seconds, combined with
    a weighted median, and written to the contract through ``updateAnswer``
    whenever the heartbeat interval has elapsed or the price has moved by at
    least the configured deviation percentage.
    """

    def __init__(self, config: "OracleConfig"):
        """Connect to the RPC endpoint and bind the aggregator contract.

        Args:
            config: RPC URL, contract address, signing key and update policy.
        """
        self.config = config
        self.w3 = Web3(Web3.HTTPProvider(config.rpc_url))
        # Proof-of-authority middleware goes innermost (layer 0) so PoA
        # block headers are normalised before anything else sees them.
        self.w3.middleware_onion.inject(geth_poa_middleware, layer=0)
        self.account = Account.from_key(config.private_key)
        # Last price confirmed on-chain by this process (None until the
        # first successful update) and the wall-clock time it was confirmed.
        self.last_price: Optional[Decimal] = None
        self.last_update_time: float = 0
        self.aggregator_abi = self._load_aggregator_abi()
        self.aggregator_contract = self.w3.eth.contract(
            address=Web3.to_checksum_address(config.aggregator_address),
            abi=self.aggregator_abi,
        )

    def _load_aggregator_abi(self) -> List[dict]:
        """Return the minimal ABI needed here: ``updateAnswer`` and ``latestAnswer``."""
        return [
            {
                "inputs": [{"internalType": "uint256", "name": "answer", "type": "uint256"}],
                "name": "updateAnswer",
                "outputs": [],
                "stateMutability": "nonpayable",
                "type": "function",
            },
            {
                "inputs": [],
                "name": "latestAnswer",
                "outputs": [{"internalType": "int256", "name": "", "type": "int256"}],
                "stateMutability": "view",
                "type": "function",
            },
        ]

    def _get_cached_price(self, source_name: str, cache_key: str) -> Optional[Decimal]:
        """Return the cached price for ``cache_key`` if it is still fresh.

        This method must not be memoized: the answer depends on the current
        time and on the mutable ``price_cache``, so a memoized miss would
        hide every later store and a memoized hit would never expire.

        Args:
            source_name: Name of the data source (kept for call
                compatibility; the lookup itself uses ``cache_key``).
            cache_key: Key under which the price was stored.

        Returns:
            The cached price, or ``None`` when absent or older than
            ``CACHE_TTL`` seconds. Updates the hit/miss counters.
        """
        entry = price_cache.get(cache_key)
        if entry is not None:
            price, timestamp = entry
            if time.time() - timestamp < CACHE_TTL:
                cache_hits.inc()
                return price
            # Expired: evict so stale entries do not accumulate.
            price_cache.pop(cache_key, None)
        cache_misses.inc()
        return None

    def _cache_price(self, source_name: str, price: Decimal,
                     cache_key: Optional[str] = None):
        """Store ``price`` in the shared cache together with the current time.

        Args:
            source_name: Name of the data source; used to derive a stable
                key when ``cache_key`` is not given.
            price: Price to remember.
            cache_key: Key to store under. Pass the same key that is later
                given to ``_get_cached_price`` so the lookup can hit.
        """
        if cache_key is None:
            # Stable per-source key; freshness is enforced by the stored
            # timestamp, so no time bucket is mixed into the key.
            cache_key = hashlib.md5(source_name.encode()).hexdigest()
        price_cache[cache_key] = (price, time.time())

    async def fetch_price_from_source(self, source: "DataSource") -> Optional[Decimal]:
        """Fetch one source's price, using the cache when it is fresh.

        The HTTP request runs in the default executor so the event loop is
        not blocked while waiting for the response.

        Args:
            source: Data source to query. Its JSON response is expected to
                be an object with a ``price`` field; ``source.parser`` is
                not consulted yet.

        Returns:
            The price, or ``None`` when the request fails, the response has
            no usable ``price`` field, or the value is not a finite positive
            number. Failures are logged, never raised.
        """
        try:
            # Check cache first. The same key is used for lookup and store.
            cache_key = hashlib.md5(source.url.encode()).hexdigest()
            cached_price = self._get_cached_price(source.name, cache_key)
            if cached_price is not None:
                return cached_price

            # Fetch from source without blocking the event loop.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None, lambda: requests.get(source.url, timeout=5)
            )
            response.raise_for_status()
            data = response.json()

            # Parse price (simplified). A missing field must not silently
            # become a price of 0 that could end up published on-chain.
            if not isinstance(data, dict) or data.get('price') is None:
                logger.error(
                    "Error fetching from %s: response has no 'price' field",
                    source.name,
                )
                return None
            price = Decimal(str(data['price']))
            if not price.is_finite() or price <= 0:
                logger.error(
                    "Error fetching from %s: invalid price %s", source.name, price
                )
                return None

            self._cache_price(source.name, price, cache_key)
            return price
        except Exception as e:
            # Best effort by design: one bad source must not stop the others.
            logger.error("Error fetching from %s: %s", source.name, e)
            return None

    @staticmethod
    def _weighted_median(quotes: List[tuple]) -> Decimal:
        """Return the weighted median of non-empty ``(price, weight)`` pairs."""
        ordered = sorted(quotes, key=lambda quote: quote[0])
        total_weight = sum(weight for _, weight in ordered)
        if total_weight > 0:
            cumulative_weight = 0
            for price, weight in ordered:
                cumulative_weight += weight
                if cumulative_weight >= total_weight / 2:
                    return price
        # Degenerate weights (all zero, negative or NaN): fall back to the
        # plain median of the sorted prices.
        return ordered[len(ordered) // 2][0]

    async def fetch_aggregated_price(self) -> Optional[Decimal]:
        """Query every configured source concurrently and combine the results.

        Returns:
            The weighted median of all prices that could be fetched, or
            ``None`` when no source returned a price.
        """
        sources = list(self.config.data_sources)
        results = await asyncio.gather(
            *(self.fetch_price_from_source(source) for source in sources)
        )
        quotes = [
            (price, source.weight)
            for source, price in zip(sources, results)
            if price is not None
        ]
        if not quotes:
            return None
        return self._weighted_median(quotes)

    def should_update(self, price: Decimal) -> bool:
        """Decide whether ``price`` should be published now.

        Args:
            price: Candidate price.

        Returns:
            ``True`` when the heartbeat interval has elapsed since the last
            successful update, or when ``price`` differs from the last
            published price by at least ``deviation_threshold`` percent.
        """
        # Check heartbeat
        if time.time() - self.last_update_time >= self.config.heartbeat:
            return True
        # Check deviation
        if self.last_price is None:
            return False
        if self.last_price == 0:
            # Relative deviation from zero is undefined; treat any non-zero
            # price as a change instead of dividing by zero.
            return price != 0
        deviation = abs((price - self.last_price) / self.last_price * 100)
        return deviation >= self.config.deviation_threshold

    def _send_update_transaction(self, price_int: int):
        """Build, sign and send ``updateAnswer(price_int)``; block until mined.

        Synchronous on purpose: it is meant to be run in an executor.
        Returns the transaction receipt.
        """
        tx = self.aggregator_contract.functions.updateAnswer(price_int).build_transaction({
            'from': self.account.address,
            'nonce': self.w3.eth.get_transaction_count(self.account.address),
            'gas': 200000,
            'gasPrice': self.w3.eth.gas_price,
        })
        signed_tx = self.account.sign_transaction(tx)
        tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
        return self.w3.eth.wait_for_transaction_receipt(tx_hash, timeout=120)

    async def update_oracle(self, price: Decimal) -> bool:
        """Publish ``price`` on-chain, scaled to 8 decimals.

        The blocking RPC calls (including the wait for the receipt, up to
        120 seconds) run in the default executor so the event loop stays
        responsive.

        Args:
            price: Price to publish.

        Returns:
            ``True`` when the transaction was mined successfully; ``False``
            when it reverted or any step raised. Failures are logged and
            counted in ``update_errors``.
        """
        try:
            price_int = int(price * 10**8)  # 8 decimals
            loop = asyncio.get_running_loop()
            receipt = await loop.run_in_executor(
                None, self._send_update_transaction, price_int
            )
            if receipt.status == 1:
                self.last_price = price
                self.last_update_time = time.time()
                updates_sent.inc()
                current_price.set(float(price))
                return True
            logger.error(
                "Oracle update transaction reverted (status=%s)", receipt.status
            )
            update_errors.inc()
            return False
        except Exception as e:
            logger.error("Error updating oracle: %s", e)
            update_errors.inc()
            return False

    async def run(self):
        """Run the publish loop forever.

        Starts the Prometheus HTTP endpoint on port 8000, then repeatedly
        fetches the aggregated price and publishes it when required. Errors
        inside an iteration are logged and the loop continues after a pause.
        """
        logger.info("Starting optimized oracle publisher service")
        start_http_server(8000)
        while True:
            try:
                price = await self.fetch_aggregated_price()
                if price is None:
                    logger.warning("No price available from any data source")
                    await asyncio.sleep(10)
                    continue
                if self.should_update(price):
                    success = await self.update_oracle(price)
                    if success:
                        logger.info("Oracle updated: %s", price)
                # Adaptive sleep: an update that is still due (for example
                # because the last attempt failed) is retried sooner.
                sleep_time = 5 if self.should_update(price) else 10
                await asyncio.sleep(sleep_time)
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.error("Error in main loop: %s", e)
                await asyncio.sleep(10)