Files
Sankofa/api/src/services/sovereign-stack/tx-orchestrator-service.ts
defiQUG 8436e22f4c API: Phoenix railing proxy, API key auth for /api/v1/*, schema export, docs, migrations, tests
- Phoenix API Railing: proxy to PHOENIX_RAILING_URL, tenant me routes
- Tenant-auth: X-API-Key support for /api/v1/* (api_keys table)
- Migration 026: api_keys table; 025 sovereign stack marketplace
- GET /graphql/schema, GET /graphql-playground, api/docs OpenAPI
- Integration tests: phoenix-railing.test.ts
- docs/api/API_VERSIONING: /api/v1/ railing alignment
- docs/phoenix/PORTAL_RAILING_WIRING

Made-with: Cursor
2026-03-11 12:57:41 -07:00

128 lines
3.1 KiB
TypeScript

/**
* Phoenix Transaction Orchestrator Service
* On-chain/off-chain workflow orchestration with retries and compensations
*/
import { getDb } from '../../db/index.js'
import { logger } from '../../lib/logger.js'
export interface Workflow {
workflowId: string
correlationId: string
state: 'INITIATED' | 'AUTHORIZED' | 'CAPTURED' | 'SETTLED' | 'REVERSED' | 'FAILED'
steps: WorkflowStep[]
retryCount: number
maxRetries: number
}
export interface WorkflowStep {
stepId: string
type: 'ON_CHAIN' | 'OFF_CHAIN'
action: string
status: 'PENDING' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED'
retryCount: number
compensation?: string
}
class TransactionOrchestratorService {
/**
* Create a workflow
*/
async createWorkflow(
correlationId: string,
steps: Omit<WorkflowStep, 'stepId' | 'status' | 'retryCount'>[]
): Promise<Workflow> {
const db = getDb()
const result = await db.query(
`INSERT INTO workflows (correlation_id, state, max_retries, metadata)
VALUES ($1, 'INITIATED', 3, $2)
RETURNING *`,
[correlationId, JSON.stringify({ steps })]
)
const workflowId = result.rows[0].id
// Create workflow steps
for (const step of steps) {
await db.query(
`INSERT INTO workflow_steps (workflow_id, type, action, status, compensation)
VALUES ($1, $2, $3, 'PENDING', $4)`,
[workflowId, step.type, step.action, step.compensation || null]
)
}
logger.info('Workflow created', { workflowId, correlationId })
return this.mapWorkflow(result.rows[0])
}
/**
* Execute workflow step
*/
async executeStep(workflowId: string, stepId: string): Promise<void> {
const db = getDb()
// Update step status
await db.query(
`UPDATE workflow_steps SET status = 'IN_PROGRESS' WHERE id = $1`,
[stepId]
)
try {
// Execute step logic here
// This would route to appropriate provider adapter
await db.query(
`UPDATE workflow_steps SET status = 'COMPLETED' WHERE id = $1`,
[stepId]
)
logger.info('Workflow step completed', { workflowId, stepId })
} catch (error) {
await db.query(
`UPDATE workflow_steps SET status = 'FAILED' WHERE id = $1`,
[stepId]
)
throw error
}
}
/**
* Retry failed step
*/
async retryStep(workflowId: string, stepId: string): Promise<void> {
const db = getDb()
const step = await db.query(
`SELECT * FROM workflow_steps WHERE id = $1`,
[stepId]
)
if (step.rows[0].retry_count >= 3) {
throw new Error('Max retries exceeded')
}
await db.query(
`UPDATE workflow_steps
SET status = 'PENDING', retry_count = retry_count + 1
WHERE id = $1`,
[stepId]
)
await this.executeStep(workflowId, stepId)
}
private mapWorkflow(row: any): Workflow {
return {
workflowId: row.id,
correlationId: row.correlation_id,
state: row.state,
steps: [],
retryCount: row.retry_count || 0,
maxRetries: row.max_retries || 3
}
}
}
export const txOrchestratorService = new TransactionOrchestratorService()