import weave
from contextlib import contextmanager
from typing import Dict
# Global thread context for coordinating nested threads
class ThreadContext:
def __init__(self):
self.app_thread_id = None
self.infra_thread_id = None
self.logic_thread_id = None
def setup_for_request(self, request_id: str):
self.app_thread_id = f"app_{request_id}"
self.infra_thread_id = f"{self.app_thread_id}_infra"
self.logic_thread_id = f"{self.app_thread_id}_logic"
# Global instance
thread_ctx = ThreadContext()
class InfrastructureLayer:
"""Handles all infrastructure operations in a dedicated thread"""
@weave.op
def authenticate_user(self, user_id: str) -> Dict:
# Authentication logic...
return {"user_id": user_id, "authenticated": True}
@weave.op
def call_payment_gateway(self, amount: float) -> Dict:
# Payment processing...
return {"status": "approved", "amount": amount}
@weave.op
def update_inventory(self, product_id: str, quantity: int) -> Dict:
# Inventory management...
return {"product_id": product_id, "updated": True}
def execute_operations(self, user_id: str, order_data: Dict) -> Dict:
"""Execute all infrastructure operations in dedicated thread context"""
with weave.thread(thread_ctx.infra_thread_id):
auth_result = self.authenticate_user(user_id)
payment_result = self.call_payment_gateway(order_data["amount"])
inventory_result = self.update_inventory(order_data["product_id"], order_data["quantity"])
return {
"auth": auth_result,
"payment": payment_result,
"inventory": inventory_result
}
class BusinessLogicLayer:
"""Handles business logic in a dedicated thread"""
@weave.op
def validate_order(self, order_data: Dict) -> Dict:
# Validation logic...
return {"valid": True}
@weave.op
def calculate_pricing(self, order_data: Dict) -> Dict:
# Pricing calculations...
return {"total": order_data["amount"], "tax": order_data["amount"] * 0.08}
@weave.op
def apply_business_rules(self, order_data: Dict) -> Dict:
# Business rules...
return {"rules_applied": ["standard_processing"], "priority": "normal"}
def execute_logic(self, order_data: Dict) -> Dict:
"""Execute all business logic in dedicated thread context"""
with weave.thread(thread_ctx.logic_thread_id):
validation = self.validate_order(order_data)
pricing = self.calculate_pricing(order_data)
rules = self.apply_business_rules(order_data)
return {"validation": validation, "pricing": pricing, "rules": rules}
class OrderProcessingApp:
"""Main application orchestrator"""
def __init__(self):
self.infra = InfrastructureLayer()
self.business = BusinessLogicLayer()
@weave.op
def process_order(self, user_id: str, order_data: Dict) -> Dict:
"""Main order processing - becomes a turn in the app thread"""
# Execute nested operations in their dedicated threads
infra_results = self.infra.execute_operations(user_id, order_data)
logic_results = self.business.execute_logic(order_data)
# Final orchestration
return {
"order_id": f"order_12345",
"status": "completed",
"infra_results": infra_results,
"logic_results": logic_results
}
# Usage with global thread context coordination
def handle_order_request(request_id: str, user_id: str, order_data: Dict):
# Setup thread context for this request
thread_ctx.setup_for_request(request_id)
# Execute in app thread context
with weave.thread(thread_ctx.app_thread_id):
app = OrderProcessingApp()
result = app.process_order(user_id, order_data)
return result
# Example usage
order_result = handle_order_request(
request_id="req_789",
user_id="user_001",
order_data={"product_id": "laptop", "quantity": 1, "amount": 1299.99}
)
# Expected Thread Structure:
#
# App Thread: app_req_789
# └── Turn: process_order() ← Main orchestration
#
# Infra Thread: app_req_789_infra
# ├── Turn: authenticate_user() ← Infrastructure operation 1
# ├── Turn: call_payment_gateway() ← Infrastructure operation 2
# └── Turn: update_inventory() ← Infrastructure operation 3
#
# Logic Thread: app_req_789_logic
# ├── Turn: validate_order() ← Business logic operation 1
# ├── Turn: calculate_pricing() ← Business logic operation 2
# └── Turn: apply_business_rules() ← Business logic operation 3
#
# Benefits:
# - Clear separation of concerns across threads
# - No parameter drilling of thread IDs
# - Independent monitoring of app/infra/logic layers
# - Global coordination through thread context