StruktX Documentation
StruktX is a lean, typed framework for building Natural Language → Action applications. It provides swappable components for LLMs, classifiers, handlers, and optional memory, along with middleware hooks and LangChain helpers.
Getting Started
Quickstart
Minimal example with the default components:
from strukt import create, StruktConfig, HandlersConfig
app = create(StruktConfig(
handlers=HandlersConfig(default_route="general")
))
print(app.invoke("Hello, StruktX!").response)
With LangChain and memory augmentation:
import os
from strukt import create, StruktConfig, HandlersConfig, LLMClientConfig, ClassifierConfig, MemoryConfig
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
config = StruktConfig(
llm=LLMClientConfig("langchain_openai:ChatOpenAI", dict(model="gpt-4o-mini")),
classifier=ClassifierConfig("strukt.classifiers:LLMClassifier"),
handlers=HandlersConfig(default_route="general"),
memory=MemoryConfig(
factory="strukt.memory:UpstashVectorMemoryEngine",
params={"index_url": "...", "index_token": "...", "namespace": "app1"},
use_store=True,
augment_llm=True
),
)
app = create(config)
res = app.invoke("I live in Beirut, what's the time?", context={"user_id": "u1"})
print(res.response)
Architecture
Core flow: text → classify → group parts → route to handlers → combine responses. Components are swappable via factories and follow interfaces for type safety.
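A minimal sketch of that flow using the documented Classifier and Handler interfaces (the grouping and combining logic here is illustrative, and constructing InvocationState directly is an assumption, not actual engine internals):
from strukt.types import InvocationState

def run_once(text: str, context: dict, classifier, handlers: dict, default_route: str) -> str:
    # Conceptual sketch only; the real engine adds middleware hooks, memory, and async paths
    state = InvocationState(text=text, context=context)
    classification = classifier.classify(state)
    # Group parts by query type
    grouped: dict[str, list[str]] = {}
    for qt, part in zip(classification.query_types, classification.parts):
        grouped.setdefault(qt, []).append(part)
    # Route each group to its handler, falling back to the default route
    results = [
        handlers.get(qt, handlers[default_route]).handle(state, parts)
        for qt, parts in grouped.items()
    ]
    # Combine responses
    return " ".join(r.response for r in results)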
Configuration
Factory-based config supports callables, classes, instances, or import strings like "module:attr". Dicts are coerced into dataclasses.
from strukt import (
create, StruktConfig, LLMClientConfig, ClassifierConfig,
HandlersConfig, MemoryConfig, MiddlewareConfig
)
config = StruktConfig(
llm=LLMClientConfig(factory="langchain_openai:ChatOpenAI", params=dict(model="gpt-4o-mini")),
classifier=ClassifierConfig(factory="strukt.classifiers:LLMClassifier"),
handlers=HandlersConfig(
registry={
# "time_service": "your_pkg.handlers:TimeHandler",
},
default_route="general",
),
memory=MemoryConfig(
factory="strukt.memory:UpstashVectorMemoryEngine",
params={"index_url": "...", "index_token": "...", "namespace": "app1"},
use_store=True,
augment_llm=True
),
middleware=[MiddlewareConfig(factory="strukt.logging:LoggingMiddleware", params=dict(verbose=True))],
)
app = create(config)
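Because factories accept callables, classes, instances, or import strings, the following forms are interchangeable (a sketch; the class and instance forms assume langchain_openai is installed):
from langchain_openai import ChatOpenAI
from strukt import LLMClientConfig

# Import string, resolved at create() time
LLMClientConfig(factory="langchain_openai:ChatOpenAI", params=dict(model="gpt-4o-mini"))
# Class, instantiated with params
LLMClientConfig(factory=ChatOpenAI, params=dict(model="gpt-4o-mini"))
# Instance, used as-is
LLMClientConfig(factory=ChatOpenAI(model="gpt-4o-mini"))
# Callable, invoked with params
LLMClientConfig(factory=lambda **kw: ChatOpenAI(**kw), params=dict(model="gpt-4o-mini"))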
Swap LLMs and pass custom parameters (including OpenAI-compatible providers):
from strukt import StruktConfig, LLMClientConfig
# OpenAI-compatible provider
cfg = StruktConfig(
llm=LLMClientConfig(
factory="langchain_openai:ChatOpenAI",
params=dict(
model="gpt-4o-mini",
api_key="{OPENAI_API_KEY}",
base_url="https://api.openai.com/v1" # or your compatible endpoint
),
)
)
Core Components
Providers (OpenAI-compatible)
OpenRouter, Groq, and Cerebras expose OpenAI-style APIs and work via LangChain's ChatOpenAI or direct OpenAI clients using a custom base URL.
OpenRouter
import os
from strukt import create, StruktConfig, LLMClientConfig
os.environ["OPENROUTER_API_KEY"] = "..."
cfg = StruktConfig(
llm=LLMClientConfig(
factory="langchain_openai:ChatOpenAI",
params=dict(
model="openrouter/auto",
api_key=os.environ["OPENROUTER_API_KEY"],
base_url="https://openrouter.ai/api/v1",
),
)
)
app = create(cfg)
Groq
import os
from strukt import create, StruktConfig, LLMClientConfig
os.environ["GROQ_API_KEY"] = "..."
cfg = StruktConfig(
llm=LLMClientConfig(
factory="langchain_openai:ChatOpenAI",
params=dict(
model="llama3-70b-8192",
api_key=os.environ["GROQ_API_KEY"],
base_url="https://api.groq.com/openai/v1",
),
)
)
app = create(cfg)
Cerebras
import os
from strukt import create, StruktConfig, LLMClientConfig
os.environ["CEREBRAS_API_KEY"] = "..."
cfg = StruktConfig(
llm=LLMClientConfig(
factory="langchain_openai:ChatOpenAI",
params=dict(
model="llama3.1-70b",
api_key=os.environ["CEREBRAS_API_KEY"],
base_url="https://api.cerebras.ai/v1",
),
)
)
app = create(cfg)
Alternatively, if you use the OpenAI SDK directly, set the OPENAI_BASE_URL environment variable and pass your key. StruktX auto-adapts LangChain runnables to its LLMClient interface.
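For example, a sketch of adapting an already-constructed LangChain model via the documented adapt_to_llm_client helper (its exact signature is assumed here):
import os
from langchain_openai import ChatOpenAI
from strukt.langchain_helpers import adapt_to_llm_client

os.environ["OPENAI_BASE_URL"] = "https://your-compatible-endpoint/v1"  # illustrative endpoint
llm = ChatOpenAI(model="gpt-4o-mini")
client = adapt_to_llm_client(llm)  # wraps the runnable behind the LLMClient interface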
LLM Clients
Provide your own LLMClient or adapt LangChain with LangChainLLMClient.
from strukt.interfaces import LLMClient
class MyLLM(LLMClient):
    def invoke(self, prompt: str, **kwargs):
        # Return any object exposing a .content attribute
        return type("Resp", (), {"content": prompt.upper()})()
def structured(self, prompt: str, output_model, **kwargs):
return output_model()
Classifier
Return query types, confidences, and parts.
from strukt.interfaces import Classifier
from strukt.types import InvocationState, QueryClassification
class MyClassifier(Classifier):
def classify(self, state: InvocationState) -> QueryClassification:
return QueryClassification(query_types=["general"], confidences=[1.0], parts=[state.text])
Handlers
Handle grouped parts for a given query type.
from strukt.interfaces import Handler, LLMClient
from strukt.types import InvocationState, HandlerResult
class EchoHandler(Handler):
def __init__(self, llm: LLMClient):
self.llm = llm
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
return HandlerResult(response=" | ".join(parts), status="general")
Middleware
Hooks: before_classify, after_classify, before_handle, after_handle.
import time

from strukt.middleware import Middleware
from strukt.types import InvocationState, HandlerResult, QueryClassification

class Metrics(Middleware):
    def before_classify(self, state: InvocationState):
        state.context["t0"] = time.time()
        return state

    def after_handle(self, state: InvocationState, query_type: str, result: HandlerResult):
        return result
Memory extraction middleware (packaged):
from strukt import StruktConfig, MemoryConfig, MiddlewareConfig
config = StruktConfig(
memory=MemoryConfig(
factory="strukt.memory:UpstashVectorMemoryEngine",
params={"index_url": "...", "index_token": "...", "namespace": "app1"},
use_store=True,
augment_llm=True,
),
middleware=[
MiddlewareConfig("strukt.memory.middleware:MemoryExtractionMiddleware"),
],
)
Background Task Middleware
Execute handlers in background threads so users receive an immediate response while work continues, with built-in task tracking.
Configuration
from strukt import StruktConfig, MiddlewareConfig
from strukt.middleware import BackgroundTaskMiddleware
config = StruktConfig(
# ... other config
middleware=[
MiddlewareConfig(BackgroundTaskMiddleware, dict(
max_workers=6, # Number of concurrent background tasks
default_message="Your request is being processed.",
# Always run these handlers in background
enable_background_for={"device_control"},
# Run specific actions in background for specific handlers
action_based_background={
"maintenance_or_helpdesk": {"create"}, # Only "create" action
"some_handler": {"create", "update"}, # Multiple actions
},
# Custom messages for different handlers
custom_messages={
"device_control": "Device control successful",
"maintenance_or_helpdesk": "I've created your helpdesk ticket. Someone will be in touch shortly.",
},
# Custom return query types for different handlers
return_query_types={
"device_control": "DEVICE_CONTROL_SUCCESS",
"maintenance_or_helpdesk": "HELPDESK_TICKET_CREATED",
},
)),
],
)
Handler Integration
To enable action-based background execution, handlers should set the extracted action in the context:
from typing import List

from strukt.interfaces import Handler
from strukt.types import InvocationState, HandlerResult

class MyHandler(Handler):
def handle(self, state: InvocationState, parts: List[str]) -> HandlerResult:
# Extract intent/action from user input
intent = self._extract_intent(parts)
# Set the action in context for middleware to use
state.context['extracted_action'] = intent.action
# Handle based on action
if intent.action == "create":
return self._handle_create()
elif intent.action == "status":
return self._handle_status()
Task Management API
# Get all background tasks
all_tasks = app.get_all_background_tasks()
# Get tasks by status
running_tasks = app.get_running_background_tasks()
completed_tasks = app.get_completed_background_tasks()
failed_tasks = app.get_failed_background_tasks()
# Get specific task info
task_info = app.get_background_task_info("task-id-123")
# Get tasks filtered by status
tasks = app.get_background_tasks_by_status("running")
Complete Example
from strukt import create, StruktConfig, MiddlewareConfig
from strukt.middleware import BackgroundTaskMiddleware
# Configure with action-based background execution
config = StruktConfig(
# ... other config
middleware=[
MiddlewareConfig(BackgroundTaskMiddleware, dict(
max_workers=4,
default_message="Processing your request...",
enable_background_for={"device_control"},
action_based_background={
"maintenance_or_helpdesk": {"create"},
},
custom_messages={
"device_control": "Device control successful",
"maintenance_or_helpdesk": "Ticket created successfully. You'll receive a confirmation shortly.",
},
)),
],
)
app = create(config)
# Execute requests
result = app.invoke("turn on the bedroom lights", context={"user_id": "user1"})
print(result.response) # "Device control successful" (immediate)
# Check background tasks
running = app.get_running_background_tasks()
for task in running:
print(f"Task {task['task_id'][:8]}... is {task['progress']*100:.1f}% complete")
Custom Return Query Types
The middleware supports custom return query types, allowing handlers to specify what query type should be returned in the response instead of the generic "background_task_created:..." format. This is useful for maintaining consistent API responses and providing meaningful status information to clients.
Configuration
# Configure custom return query types
return_query_types={
"device_control": "DEVICE_CONTROL_SUCCESS",
"maintenance_or_helpdesk": "HELPDESK_TICKET_CREATED",
}
Handler Integration
Handlers can also specify return query types dynamically by setting them in the context. Since multiple handlers may run simultaneously, use a dictionary format where the key is the handler name and the value is the return query type:
class MyHandler(Handler):
def handle(self, state: InvocationState, parts: List[str]) -> HandlerResult:
# Set custom return query type for this specific request
# Use dictionary format to support multiple handlers
state.context['return_query_types'] = {
'my_handler_name': "CUSTOM_SUCCESS_STATUS"
}
# ... rest of handler logic
For backward compatibility, the old single return_query_type format is still supported, but the dictionary format is recommended when multiple handlers are involved.
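A sketch of the legacy form inside a handler:
# Legacy single-value format (still supported when only one handler is involved)
state.context['return_query_type'] = "CUSTOM_SUCCESS_STATUS"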
Multiple Handler Example
When multiple handlers are involved in a single request, each can specify its own return query type:
class DeviceHandler(Handler):
def handle(self, state: InvocationState, parts: List[str]) -> HandlerResult:
# Set return query type for this handler
if 'return_query_types' not in state.context:
state.context['return_query_types'] = {}
state.context['return_query_types']['device_control'] = "DEVICE_CONTROL_SUCCESS"
# ... handler logic
return HandlerResult(response="Device control initiated", status="device_control")
class NotificationHandler(Handler):
def handle(self, state: InvocationState, parts: List[str]) -> HandlerResult:
# Set return query type for this handler
if 'return_query_types' not in state.context:
state.context['return_query_types'] = {}
state.context['return_query_types']['notification'] = "NOTIFICATION_SENT"
# ... handler logic
return HandlerResult(response="Notification sent", status="notification")
This approach ensures that each handler can specify its own meaningful return query type while maintaining backward compatibility.
Response Format
With custom return query types, the response will look like:
{
"response": "Device control successful",
"background_tasks": [],
"query_type": "DEVICE_CONTROL_SUCCESS",
"query_types": [
"DEVICE_CONTROL_SUCCESS"
],
"transcript_parts": [
"Turn on the kitchen AC",
"Set temperature to 25 degrees"
]
}
Instead of the generic format:
{
"response": "Device control successful",
"background_tasks": [],
"query_type": "background_task_created:9541dc3f-cf4b-4257-a3e6-9d08ca77f702",
"query_types": [
"background_task_created:9541dc3f-cf4b-4257-a3e6-9d08ca77f702"
],
"transcript_parts": [
"Turn on the kitchen AC",
"Set temperature to 25 degrees"
]
}
Handler Intents System
The background task middleware uses a handler intents system to determine which actions should run in background. Handlers extract intents and store them in the context for the middleware to use.
state.context['handler_intents'][handler_name] = action
Handler Integration
class HelpdeskHandler(Handler):
    def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
        user_id = state.context.get("user_id")
        unit_id = state.context.get("unit_id")
        full_request = " ".join(parts)
        # Extract intent from user input using the LLM
        intent = self._extract_intent(full_request, user_id, unit_id)
# Set the extracted action in context for middleware to use
# The handler_intents dict is automatically initialized
state.context['handler_intents']['maintenance_or_helpdesk'] = intent.action
# Handle based on action
if intent.action == "create":
return self._handle_create_ticket(intent, user_id, unit_id)
elif intent.action == "status":
return self._handle_status_check(intent, user_id, unit_id)
Configuration Mapping
# Configuration
action_based_background={
"maintenance_or_helpdesk": {"create", "update"}, # These actions run in background
"device_control": {"turn_on", "turn_off"}, # These actions run in background
}
# Handler sets intent
state.context['handler_intents']['maintenance_or_helpdesk'] = "create" # ✅ Runs in background
state.context['handler_intents']['maintenance_or_helpdesk'] = "status" # ❌ Runs in foreground
The handler_intents dictionary is automatically initialized in InvocationState, so handlers don't need to check if it exists before setting values.
This system allows multiple handlers to set their intents independently, and the middleware can handle action-based background execution for each handler type.
Memory
Enable scoped memory and automatic prompt augmentation.
from strukt import StruktConfig, MemoryConfig
cfg = StruktConfig(
memory=MemoryConfig(
factory="strukt.memory:UpstashVectorMemoryEngine",
params={"index_url": "...", "index_token": "...", "namespace": "app1"},
use_store=True,
augment_llm=True
)
)
Ecosystem
MCP Server Beta
Expose StruktX handlers as MCP tools over HTTP. Configure tools, auth, and consent, then serve via FastAPI.
Auto-discovery (recommended)
Handlers that define mcp_* methods are automatically exposed as MCP tools. Tool names are generated as <handler_key>_<method_suffix>, descriptions default to the method docstring, and the input schema is inferred from type hints (including Optional, list, dict). Add precise type hints on parameters for best schemas.
from strukt import StruktConfig
config = StruktConfig(
# ... llm, handlers, etc.
mcp=dict(
enabled=True,
server_name="struktmcp",
include_handlers=["device_control","amenity_service","maintenance_or_helpdesk","bill_service","event_service","weather_service","schedule_future_event"],
default_consent_policy="ask-once",
# Optional overlays (no method_name): tweak descriptions/prompts for auto tools
tools={
"device_control": [
dict(
name="device_control_execute", # auto from handler "device_control" + mcp_execute
description="Execute device commands",
usage_prompt="Call device_control_list first. Use attributes.identifier as deviceId; see provider rules.",
)
]
}
)
)
Descriptions come from mcp_* docstrings by default. You can override per tool via an overlay entry (without method_name) to keep everything else auto-generated.
Explicit tool mapping (advanced)
from strukt import StruktConfig
config = StruktConfig(
# ... llm, handlers, etc.
mcp=dict(
enabled=True,
server_name="struktmcp",
include_handlers=["device_control","amenity_service","maintenance_or_helpdesk","bill_service","event_service","weather_service","schedule_future_event"],
default_consent_policy="ask-once",
tools={
"device_control": [
dict(
name="device_list",
description="List devices for a user/unit",
method_name="mcp_list",
parameters_schema={"type":"object","properties":{"user_id":{"type":"string"},"unit_id":{"type":"string"}},"required":["user_id","unit_id"]},
),
dict(
name="device_execute",
description="Execute device commands",
method_name="mcp_execute",
usage_prompt="Call device_list first. Use attributes.identifier as deviceId; see provider rules.",
parameters_schema={"type":"object","properties":{"commands":{"type":"array"},"user_id":{"type":"string"},"unit_id":{"type":"string"}},"required":["commands","user_id","unit_id"]},
),
],
# ... other handlers
}
)
)
Serve
Use the helper to create or extend a FastAPI app. The unified endpoint is GET/POST on the same path.
from fastapi import FastAPI
from strukt import create, build_fastapi_app
app = create(config)
fastapi_app = build_fastapi_app(app, config) # creates new app with /mcp
# Or mount onto an existing FastAPI app under a prefix
existing = FastAPI()
build_fastapi_app(app, config, app=existing, prefix="/mcp")
Endpoints
# List tools
curl -H "x-api-key: dev-key" http://localhost:8000/mcp
# Call tool (implicit op=call_tool)
curl -X POST -H "Content-Type: application/json" -H "x-api-key: dev-key" -d '{"tool_name":"amenity_check","args":{"user_id":"u1","unit_id":"UNIT","facility_name_query":"gym"}}' http://localhost:8000/mcp
# Explicit list
curl -X POST -H "Content-Type: application/json" -H "x-api-key: dev-key" -d '{"op":"list_tools"}' http://localhost:8000/mcp
Auth & Consent
API key via header (default x-api-key), and per-tool consent policies (ask-once by default) with persistence via MemoryEngine.
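With the defaults, the server reads the key from the STRUKTX_MCP_API_KEY environment variable and clients send it in the x-api-key header:
export STRUKTX_MCP_API_KEY="dev-key"
curl -H "x-api-key: dev-key" http://localhost:8000/mcp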
Extending Handlers for MCP
Add mcp_* methods to your handlers for precise, typed tool entrypoints. Descriptions default to the method docstring, and input schemas are inferred from type hints.
from strukt.interfaces import Handler, LLMClient
from strukt.types import InvocationState, HandlerResult
class MyHandler(Handler):
def __init__(self, llm: LLMClient, toolkit):
self.llm = llm
self.toolkit = toolkit
# Regular Strukt entrypoint
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
...
# MCP tool entrypoints (keyword-only args, add type hints for schema)
def mcp_list(self, *, user_id: str, unit_id: str):
"""List items for a given user and unit."""
return self.toolkit.list(user_id, unit_id)
def mcp_create(self, *, user_id: str, unit_id: str, payload: dict):
"""Create an item with the provided payload."""
return self.toolkit.create(user_id=user_id, unit_id=unit_id, payload=payload)
You can still map methods explicitly via method_name, but overlays let you keep auto-discovery and only change text:
mcp=dict(
tools={
"my_service": [
dict(
name="my_service_list", # auto from handler "my_service" + mcp_list
description="Custom list description", # override docstring
# no method_name → overlay only
)
]
}
)
Dotted method_name resolution also supports toolkit.* and _toolkit.* when you need to call directly into toolkits.
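For example, assuming your handler exposes a toolkit attribute with a list method (names are illustrative):
dict(
    name="device_list_direct",
    description="List devices by calling the toolkit directly",
    method_name="toolkit.list",  # dotted resolution into the handler's toolkit
)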
MCP Config Reference
- enabled: Enable the MCP server integration.
- server_name: Server identifier (e.g. "struktmcp").
- include_handlers: List of handler keys to expose.
- auth_api_key.header_name: Header to read the API key from (default x-api-key).
- auth_api_key.env_var: Env var holding the API key (default STRUKTX_MCP_API_KEY).
- default_consent_policy: Default tool consent (always-ask | ask-once | always-allow | never-allow).
- tools: Map of handler_name → list of tool configs:
# Per tool (MCPToolConfig)
dict(
name="tool_name", # required
description="what it does", # optional for overlays; defaults to docstring
parameters_schema={...}, # optional; inferred from type hints when omitted
method_name="mcp_list", # optional; use for explicit mapping only
required_scopes=["scope:read"], # optional OAuth scopes metadata
consent_policy="ask-once", # optional per-tool consent override
usage_prompt="LLM guidance...", # optional extra prompt appended to description
)
Consent decisions are persisted through your configured MemoryEngine when available.
LangChain Helpers
Use LangChainLLMClient and create_structured_chain to generate typed outputs.
from pydantic import BaseModel
from strukt.langchain_helpers import create_structured_chain
class Foo(BaseModel):
query_types: list[str] = []
confidences: list[float] = []
parts: list[str] = []
# chain = create_structured_chain(llm_client=your_langchain_client, prompt_template="...", output_model=Foo)
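A sketch of wiring the chain end to end (the LangChainLLMClient constructor details are assumptions; adapt to your client):
from langchain_openai import ChatOpenAI
from strukt.langchain_helpers import LangChainLLMClient, create_structured_chain

llm_client = LangChainLLMClient(ChatOpenAI(model="gpt-4o-mini"))  # assumed constructor
chain = create_structured_chain(
    llm_client=llm_client,
    prompt_template="Classify the request into parts: {text}",
    output_model=Foo,
)
# result = chain.invoke({"text": "what's the weather and the time?"})  # -> Foo instance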
Logging
Use get_logger and LoggingMiddleware.
Optional logging variables: STRUKTX_LOG_LEVEL, STRUKTX_LOG_MAXLEN, STRUKTX_RICH_TRACEBACK, STRUKTX_DEBUG.
from strukt import get_logger
log = get_logger("struktx")
log.info("Hello logs")
Augmented memory injections appear under the memory logger with the provided augment_source label.
Async Performance Optimizations
StruktX includes comprehensive async/await optimizations for fast Natural Language → Action workflows. These optimizations provide true concurrency, automatic sync/async handler compatibility, and advanced performance monitoring.
Configuration
Enable async optimizations in your StruktX configuration:
from strukt import StruktConfig, EngineOptimizationsConfig
config = StruktConfig(
# ... other config
optimizations=EngineOptimizationsConfig(
enable_performance_monitoring=True,
max_concurrent_handlers=15,
enable_llm_streaming=False, # Disabled for LangChain compatibility
enable_llm_batching=True,
enable_llm_caching=True,
llm_batch_size=10,
llm_cache_size=1000,
llm_cache_ttl=3600,
)
)
Async Invocation
Use the async API for maximum performance:
# Async invocation with full optimizations
result = await app.ainvoke(
"turn on the kitchen AC and tell me the weather in Tokyo",
context={"user_id": "user1", "unit_id": "unit1"}
)
# Access performance metrics
metrics = app._engine._performance_monitor.get_metrics()
print(f"Average handler time: {metrics['handler_time_query'].average_duration_ms}ms")
Handler Compatibility
Handlers can implement either sync or async methods - StruktX automatically bridges them:
# Sync handler (automatically runs in thread pool when called via ainvoke)
class TimeHandler(Handler):
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
return HandlerResult(response=f"Current time: {datetime.now()}", status="time")
# Async handler (runs natively with true concurrency)
class WeatherHandler(Handler):
async def ahandle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
weather_data = await weather_api.get_weather(parts[0])
return HandlerResult(response=weather_data, status="weather")
# Hybrid handler (implements both for optimal performance)
class DeviceHandler(Handler):
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
# Sync implementation for quick operations
return self._process_sync(state, parts)
async def ahandle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
# Async implementation for I/O operations
return await self._process_async(state, parts)
Performance Monitoring
Access real-time performance metrics:
# Get performance metrics endpoint (if using FastAPI)
@app.get("/metrics")
async def get_performance_metrics():
monitor = app._engine._performance_monitor
return {
"metrics": {
operation: {
"count": metrics.count,
"average_duration_ms": metrics.average_duration * 1000,
"p95_latency_ms": metrics.p95_latency * 1000,
"success_rate": metrics.success_rate,
}
for operation, metrics in monitor._metrics.items()
},
"rate_limiter": {
"active_requests": app._engine._rate_limiter._active_count,
"max_concurrent": app._engine._rate_limiter.max_concurrent,
}
}
Pydantic Response Preservation
StruktX intelligently preserves structured Pydantic responses when multiple handlers execute together:
# Single handler - returns full Pydantic object
result = await app.ainvoke("show me available facilities")
# result.response = {"status": "success", "available_facilities": [...], "current_date": "..."}
# Multiple handlers - preserves all structured data
result = await app.ainvoke("show facilities and weather in Tokyo")
# result.response = [
# {"status": "success", "available_facilities": [...], "current_date": "..."},
# {"status": "success", "temperature": 22.8, "conditions": "broken clouds", "location": "Tokyo"}
# ]
# Mixed responses - falls back to string concatenation
result = await app.ainvoke("turn on AC and tell me the time")
# result.response = "Device control successful. Current time is 2:39 PM"
Migration Guide
Existing applications require no code changes; pointing callers at ainvoke() unlocks:
- 3x faster concurrent execution for async handlers
- Automatic compatibility between sync and async handlers
- Real-time monitoring of performance metrics
- Intelligent response handling preserves structured data
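Migration is a one-line change at the call site; a sketch:
import asyncio

# Before: synchronous invocation
result = app.invoke("turn on the AC", context={"user_id": "u1"})

# After: async invocation with identical arguments
async def main():
    return await app.ainvoke("turn on the AC", context={"user_id": "u1"})

result = asyncio.run(main())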
LLM Retry Mechanism
StruktX includes built-in retry functionality for LLM calls to handle transient failures and improve reliability. The retry mechanism supports both synchronous and asynchronous LLM operations with configurable backoff strategies.
Configuration
Enable retry functionality in your LLM client configuration:
from strukt import StruktConfig, LLMClientConfig
config = StruktConfig(
llm=LLMClientConfig(
factory="langchain_openai:ChatOpenAI",
params=dict(
model="gpt-4o-mini",
api_key="your-api-key",
base_url="https://api.openai.com/v1"
),
retry={
"max_retries": 3,
"base_delay": 1.0,
"max_delay": 30.0,
"exponential_base": 2.0,
"jitter": True,
"retryable_exceptions": (Exception,), # Retry on any exception
}
)
)
Retry Parameters
- max_retries: Maximum number of retry attempts (default: 3)
- base_delay: Initial delay between retries in seconds (default: 1.0)
- max_delay: Maximum delay between retries in seconds (default: 30.0)
- exponential_base: Base for exponential backoff (default: 2.0)
- jitter: Add random jitter to prevent thundering herd (default: True)
- retryable_exceptions: Tuple of exception types to retry on (default: (Exception,))
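Together these parameters imply a capped exponential schedule; a sketch of the delay computation (the exact jitter strategy StruktX uses may differ):
import random

def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 30.0,
                  exponential_base: float = 2.0, jitter: bool = True) -> float:
    # Exponential backoff capped at max_delay
    delay = min(max_delay, base_delay * (exponential_base ** attempt))
    if jitter:
        # Randomize to avoid thundering-herd retries (full-jitter variant)
        delay = random.uniform(0, delay)
    return delay

# Attempts 0..3 with defaults -> ~1s, 2s, 4s, 8s before jitter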
Supported Operations
The retry mechanism automatically applies to all LLM operations:
- invoke() - Text generation
- structured() - Structured output generation
- ainvoke() - Async text generation
- astructured() - Async structured output generation
Example Usage
from strukt import create, StruktConfig, LLMClientConfig
# Configure with retry
config = StruktConfig(
llm=LLMClientConfig(
factory="langchain_openai:ChatOpenAI",
params=dict(model="gpt-4o-mini"),
retry={
"max_retries": 2,
"base_delay": 0.5,
"max_delay": 10.0,
"jitter": True
}
)
)
app = create(config)
# All LLM calls will automatically retry on failure
result = app.invoke("Hello, world!")
Intent Caching
StruktX provides intelligent caching for handler results based on semantic similarity of user queries. This reduces redundant processing and improves response times for similar requests.
Configuration
Enable intent caching in your StruktX configuration:
from strukt import StruktConfig, HandlersConfig
from strukt.memory import InMemoryIntentCacheEngine, IntentCacheConfig, HandlerCacheConfig, CacheStrategy, CacheScope
# Create intent cache configuration
intent_cache_config = IntentCacheConfig(
enabled=True,
default_strategy=CacheStrategy.SEMANTIC,
default_ttl_seconds=3600,
similarity_threshold=0.7,
max_entries_per_handler=1000,
handler_configs={
"WeatherHandler": HandlerCacheConfig(
handler_name="WeatherHandler",
strategy=CacheStrategy.SEMANTIC,
scope=CacheScope.USER,
ttl_seconds=1800,
max_entries=500,
similarity_threshold=0.6,
enable_fast_track=True
),
"DeviceHandler": HandlerCacheConfig(
handler_name="DeviceHandler",
strategy=CacheStrategy.SEMANTIC,
scope=CacheScope.USER,
ttl_seconds=1800,
max_entries=500,
similarity_threshold=0.8,
enable_fast_track=True
)
}
)
# Create cache engine
intent_cache_engine = InMemoryIntentCacheEngine(intent_cache_config)
# Configure handlers with caching
config = StruktConfig(
handlers=HandlersConfig(
registry={
"weather_service": CachedWeatherHandler,
"device_control": CachedDeviceHandler,
},
handler_params={
"weather_service": dict(intent_cache_engine=intent_cache_engine),
"device_control": dict(intent_cache_engine=intent_cache_engine),
}
)
)
Cache Strategies
- EXACT: Match exact text queries
- SEMANTIC: Match based on semantic similarity
- FUZZY: Match with fuzzy string matching
- HYBRID: Combine multiple strategies
Cache Scopes
- GLOBAL: Cache entries visible to all users
- USER: Cache entries scoped to specific users
- UNIT: Cache entries scoped to specific units
- SESSION: Cache entries scoped to specific sessions
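For example, results that do not depend on the user (like city weather) can share one cache across all users with a GLOBAL scope (values are illustrative):
from strukt.memory import CacheStrategy, CacheScope, HandlerCacheConfig

shared_weather_cache = HandlerCacheConfig(
    handler_name="WeatherHandler",
    strategy=CacheStrategy.SEMANTIC,
    scope=CacheScope.GLOBAL,  # entries visible to all users
    ttl_seconds=900,
)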
Cache Management
# Get cache statistics
stats = intent_cache_engine.get_stats()
print(f"Hit rate: {stats.hit_rate:.2%}")
# Clean up expired entries
cleanup_stats = intent_cache_engine.cleanup()
print(f"Removed {cleanup_stats.expired_entries} expired entries")
# Clear all cache entries
intent_cache_engine.clear()
Cache Management API Endpoints
When using StruktX with FastAPI, you can expose cache management endpoints:
from fastapi import FastAPI
from strukt import build_fastapi_app
app = FastAPI()
strukt_app = create(config)
build_fastapi_app(strukt_app, config, app=app)
# Cache management endpoints are automatically available:
# GET /cache/stats - Get cache statistics
# POST /cache/cleanup - Clean up expired entries
# DELETE /cache/clear - Clear all cache entries
Example usage with curl:
# Get cache statistics
curl -H "x-api-key: dev-key" http://localhost:8000/cache/stats
# Clean up expired entries
curl -X POST -H "x-api-key: dev-key" http://localhost:8000/cache/cleanup
# Clear all cache entries
curl -X DELETE -H "x-api-key: dev-key" http://localhost:8000/cache/clear
Pretty Logging
The caching system provides rich console output for cache operations:
╭───────────────── 📦 JSON: Cache Hit Result ─────────────────╮
│ { │
│ "cache_hit": true, │
│ "handler_name": "CachedWeatherHandler", │
│ "similarity": 0.95, │
│ "match_type": "semantic", │
│ "key": "weather:dubai:what is the weat..." │
│ } │
╰─────────────────────────────────────────────────────────────╯
Multi-Request Handling
The caching system properly handles multi-request transcripts by caching each individual request component separately:
# This multi-request will cache each component individually
result = app.invoke(
"turn off the kitchen AC and tell me the weather in Dubai",
context={"user_id": "user1", "unit_id": "unit1"}
)
# Each component (device control, weather) is cached separately
# Subsequent similar requests will hit the cache for individual components
Creating Cached Handlers
To create a cached version of your handler:
from typing import Any, Dict, Optional

from strukt.memory import CacheAwareHandler, HandlerCacheConfig
from strukt.types import InvocationState, HandlerResult
class CachedMyHandler(CacheAwareHandler, MyHandler):
"""My handler with intent caching support."""
def __init__(self, *args, intent_cache_engine=None, **kwargs):
super().__init__(*args, **kwargs)
self.intent_cache_engine = intent_cache_engine
self._cache_config = None
self._cache_data_type = MyCacheData
def get_cache_config(self) -> Optional[HandlerCacheConfig]:
return self._cache_config
def should_cache(self, state: InvocationState) -> bool:
return True # Cache all requests
def build_cache_key(self, state: InvocationState) -> str:
return f"my_handler:{state.text}:{state.context.get('user_id', '')}"
def extract_cache_data(self, result: HandlerResult) -> Dict[str, Any]:
return {"response": result.response, "status": result.status}
def apply_cached_data(self, cached_data: Dict[str, Any]) -> HandlerResult:
return HandlerResult(
response=cached_data["response"],
status=cached_data["status"]
)
Weave Logging Integration
StruktX includes comprehensive Weave and OpenTelemetry integration for detailed operation tracking, performance monitoring, and debugging. The system creates a unified trace tree where all operations, including background tasks and parallel execution, are nested under a single root trace for complete visibility.
Root traces are named using the format userID-unitID-threadID-timestamp.
Configuration
Enable Weave logging in your StruktX configuration:
from strukt import StruktConfig, WeaveConfig, TracingConfig, OpenTelemetryConfig
config = StruktConfig(
# ... other config
weave=WeaveConfig(
enabled=True,
project_name="my-ai-app", # Or use PROJECT_NAME env var
environment="development", # Or use CURRENT_ENV env var
api_key_env="WANDB_API_KEY" # Environment variable for Weave API key
),
tracing=TracingConfig(
component_label="StruktX", # Customize component name (default: "Engine")
collapse_status_ops=True, # Collapse status operations into attributes
enable_middleware_tracing=False # Optional middleware tracing
),
opentelemetry=OpenTelemetryConfig(
enabled=True,
project_id="my-project",
api_key_env="WANDB_API_KEY",
use_openai_instrumentation=True # Auto-instrument OpenAI SDK calls
)
)
Environment Variables
Set these environment variables for Weave integration:
export WANDB_API_KEY="your-weave-api-key"
export PROJECT_NAME="my-project" # Optional, defaults to "struktx"
export CURRENT_ENV="production" # Optional, defaults to "development"
Unified Trace Architecture
The system creates a single root trace session that contains all operations:
StruktX.run(user123) [userID-unitID-threadID-timestamp]
├── StruktX.Engine.classify
├── StruktX.Engine._execute_grouped_handlers
│ ├── StruktX.Handler.handle (parallel)
│ └── BackgroundTask.device_control
│ └── StruktX.Handler.handle (background thread)
├── StruktX.LLMClient.invoke
└── StruktX.Engine.log_post_run_evaluation
Automatic Operation Tracking
When enabled, Weave automatically tracks:
- Root trace sessions named with user context (e.g. user123-unit456-thread789-1234567890)
- Engine operations: Engine.run, classify, execute_grouped_handlers, execute_handlers_parallel
- Background tasks: BackgroundTask.execute nested within the original trace, including results
Custom Trace Naming
StruktX automatically generates meaningful trace names using context information:
# Context provided in invoke call
response = ai.invoke(
"Turn on the living room lights",
context={
"user_id": "user123",
"unit_id": "apartment456",
"thread_id": "session_789" # Optional, UUID generated if missing
}
)
# Creates trace: user123-apartment456-session_789-1234567890
Component Label Customization
Customize the component label shown in traces:
config = StruktConfig(
tracing=TracingConfig(
component_label="StruktX" # Changes "Engine.run(user123)" to "StruktX.run(user123)"
)
)
Background Task Integration
Background tasks are automatically nested within the original trace:
# Main request creates root trace
response = ai.invoke("Control bedroom AC", context={"user_id": "user123"})
# Background task appears nested in Weave:
# StruktX.run(user123)
# └── BackgroundTask.device_control
# ├── Input: {task_id, query_type, parts, user_id}
# └── Output: {status: "completed", result: {...}}
OpenTelemetry Integration
StruktX exports all traces to Weave via OpenTelemetry Protocol (OTLP):
config = StruktConfig(
opentelemetry=OpenTelemetryConfig(
enabled=True,
project_id="my-project",
api_key_env="WANDB_API_KEY",
export_endpoint="https://trace.wandb.ai/otel/v1/traces", # Optional, auto-detected
use_openai_instrumentation=True # Auto-instrument OpenAI SDK calls
)
)
Advanced Configuration
config = StruktConfig(
tracing=TracingConfig(
component_label="MyApp", # Custom component name
collapse_status_ops=True, # Collapse status events into attributes
enable_middleware_tracing=False # Optional middleware operation tracing
)
)
User Context Tracking
Track operations with user context using the weave_context method. You can provide context in multiple ways:
Explicit values:
# Track all operations within a user context
with ai.weave_context(
user_id="user123",
unit_id="apartment456",
unit_name="Sunset Apartments"
):
# All operations within this context will have user context
response = ai.invoke("What's the weather like today?")
response2 = ai.invoke("Can you help me schedule maintenance?")
From context dictionary:
# Extract user context from a dictionary
user_context = {
"user_id": "user456",
"unit_id": "apartment789",
"unit_name": "Downtown Loft"
}
with ai.weave_context(context=user_context):
response = ai.invoke("Can you help me schedule maintenance?")
From InvocationState (for handlers):
# Automatically extract context from InvocationState
with ai.weave_context_from_state(state):
# All operations will have user context from state.context
response = ai.invoke("What's my current temperature setting?")
Custom Operation Tracking
Decorate functions with Weave tracking:
@ai.create_weave_op(name="process_user_request", call_display_name="Process Request")
def process_user_request(user_input: str, user_context: dict) -> str:
"""This function will be automatically tracked by Weave."""
# Function logic here
return f"Processed: {user_input}"
# Call the decorated function
result = process_user_request("Hello", {"user_id": "user123"})
Weave Dashboard Information
In your Weave dashboard, you'll see comprehensive tracking:
- Engine Operations: Complete request lifecycle from start to completion
- LLM Operations: Input prompts, outputs, timing, and performance metrics
- Handler Operations: Input/output tracking, execution times, success rates
- Memory Operations: Retrieval patterns, injection sources, context usage
- User Context: All operations tagged with user_id, unit_id, unit_name
- Performance Metrics: Execution times, throughput, latency, success/failure rates
- Error Tracking: Error types, messages, context at time of error, stack traces
Advanced Usage
Access Weave functionality directly through the Strukt instance:
# Check if Weave is available
if ai.is_weave_available():
print("Weave logging is enabled")
# Get project information
project_name, environment = ai.get_weave_project_info()
print(f"Project: {project_name}-{environment}")
# Create custom Weave operations
@ai.create_weave_op(name="custom_operation")
def my_custom_function():
pass
# Use context managers for user tracking
with ai.weave_context(user_id="user1", unit_id="unit1"):
# Operations tracked with user context
pass
Installation
Install Weave as an optional dependency:
# Install with Weave support
pip install struktx[weave]
# Or install Weave separately
pip install weave
Best Practices
- Project Naming: Use descriptive project names that reflect your application
- Environment Separation: Use different environments for development, staging, and production
- User Context: Always provide user context for better tracking and debugging
- Custom Operations: Decorate important business logic functions for detailed tracking
- Error Handling: Weave automatically tracks errors, but ensure proper exception handling
- Performance Monitoring: Use the tracked metrics to identify bottlenecks and optimize performance
Memory Extraction Middleware
Automatically extracts durable facts from conversations and stores them in your memory engine (e.g., Upstash Vector). On subsequent requests, MemoryAugmentedLLMClient retrieves relevant items and prepends them to prompts.
Memories are scoped by context.user_id and optionally context.unit_id.
from strukt import create, StruktConfig, MemoryConfig
from strukt import HandlersConfig, LLMClientConfig, ClassifierConfig, MiddlewareConfig
cfg = StruktConfig(
llm=LLMClientConfig("langchain_openai:ChatOpenAI", dict(model="gpt-4o-mini")),
handlers=HandlersConfig(default_route="general"),
memory=MemoryConfig(
factory="strukt.memory:UpstashVectorMemoryEngine",
params={"index_url": "...", "index_token": "...", "namespace": "app1"},
use_store=True,
augment_llm=True,
),
middleware=[
MiddlewareConfig("strukt.memory.middleware:MemoryExtractionMiddleware", params={"max_items": 5}),
],
)
app = create(cfg)
MemoryConfig.use_store=True is required for MemoryExtractionMiddleware to work.
The middleware currently reads only user_id and unit_id from the context; to change this behavior, implement a custom MemoryAugmentedLLMClient and MemoryExtractionMiddleware.
Tips:
- Always scope memories per user/session.
- Keep extracted items concise and factual. Avoid storing ephemeral content.
- Control verbosity with env and middleware params. Logs show when memory is injected.
Extensions
Build reusable packages exposing factories for handlers, middleware, and memory engines.
# your_extension/__init__.py
from .handlers import DeviceHandler
from .middleware import DeviceAuthMiddleware
from .models import DeviceCommand
__all__ = ["DeviceHandler", "DeviceAuthMiddleware", "DeviceCommand"]
Devices Extension
Example of building a devices extension that receives natural language and triggers device actions.
Models
from pydantic import BaseModel, Field
class DeviceCommand(BaseModel):
device_id: str = Field(..., min_length=1)
action: str = Field(..., min_length=1) # e.g., "turn_on", "set_temp"
value: str | int | float | None = None
Handler
from strukt.interfaces import Handler, LLMClient
from strukt.types import InvocationState, HandlerResult
from .models import DeviceCommand
class DeviceHandler(Handler):
def __init__(self, llm: LLMClient):
self.llm = llm
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
        # Use structured output to extract a DeviceCommand from the text
        try:
            cmd = self.llm.structured(
                prompt=f"Extract a device command from: {state.text}",
                output_model=DeviceCommand,
query_hint="device_command",
augment_source="devices",
context=state.context,
)
# TODO: execute the command using your device client
return HandlerResult(response=f"Executed {cmd.action} on {cmd.device_id}", status="devices")
except Exception:
return HandlerResult(response="Could not understand device command.", status="error")
Middleware (optional)
from strukt.middleware import Middleware
from strukt.types import InvocationState
class DeviceAuthMiddleware(Middleware):
def before_handle(self, state: InvocationState, query_type: str, parts: list[str]):
if query_type == "devices":
if not state.context.get("auth_token"):
# deny by changing parts to an error sentinel or add a flag
parts = ["UNAUTHORIZED"]
return state, parts
Register
from strukt import create, StruktConfig, HandlersConfig, MiddlewareConfig
from your_extension.handlers import DeviceHandler
from your_extension.middleware import DeviceAuthMiddleware
cfg = StruktConfig(
handlers=HandlersConfig(
registry={"devices": DeviceHandler},
default_route="general",
),
middleware=[MiddlewareConfig(DeviceAuthMiddleware)],
)
app = create(cfg)
Best practices:
- Validate structured outputs with Pydantic models that reflect your device API.
- Keep device-side effects idempotent; return clear status strings for telemetry.
- Use query_hint and augment_source to improve logging and memory quality.
Advanced
Query Hints
Pass query_hint to help memory retrieval and logging. It is supplied automatically when StruktX calls an LLM, and can be overridden when invoking any LLM that inherits from the StruktX LLM classes.
resp = app.invoke("recommend lunch", context={"user_id": "u1"})
# or: llm.invoke(prompt, query_hint="recommendation")
Augment Source
Provide augment_source when calling an LLM client to label memory injection source in logs.
# Inside a handler
llm.invoke(prompt, context=state.context, query_hint=state.text, augment_source="recommendations")
Context & Scoping
Use user_id and optionally unit_id in context for scoped memory retrieval. If a KnowledgeStore is enabled, StruktX may use it to further scope memory.
# Handlers receive the full invocation state
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
user_id = state.context.get("user_id")
# enrich prompts or enforce auth
...
Step-by-Step: Build Your First App
This guided tutorial creates two handlers (a Time Handler and a Weather Handler), adds a custom Rate Limit Middleware, and enables Memory Extraction.
Setup
Create a virtual environment and install dependencies.
uv venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
uv pip install struktX
Weather Model
Typed outputs reduce parsing errors and improve reliability when extracting fields from natural language.
from pydantic import BaseModel, Field
class WeatherQuery(BaseModel):
city: str = Field(..., min_length=1)
unit: str | None = Field(default="celsius", description="celsius or fahrenheit")
Time Handler
Handlers encapsulate business logic per query type and receive the full InvocationState.
from strukt.interfaces import Handler, LLMClient
from strukt.types import InvocationState, HandlerResult
from datetime import datetime
class TimeHandler(Handler):
def __init__(self, llm: LLMClient):
self.llm = llm
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
now = datetime.utcnow().strftime("%H:%M UTC")
return HandlerResult(response=f"Current time: {now}", status="time")
Weather Handler
Use llm.structured to reliably extract fields into WeatherQuery, then call a weather client.
from strukt.interfaces import Handler, LLMClient
from strukt.types import InvocationState, HandlerResult
from step_01_models import WeatherQuery
def get_weather(city: str, unit: str | None = "celsius") -> str:
unit_symbol = "°C" if (unit or "celsius").lower().startswith("c") else "°F"
return f"22{unit_symbol} and clear in {city}"
class WeatherHandler(Handler):
def __init__(self, llm: LLMClient):
self.llm = llm
def handle(self, state: InvocationState, parts: list[str]) -> HandlerResult:
q = self.llm.structured(
prompt=f"Extract weather query fields from: {state.text}",
output_model=WeatherQuery,
query_hint="weather_query",
augment_source="weather",
context=state.context,
)
return HandlerResult(response=get_weather(q.city, q.unit), status="weather")
Rate Limit Middleware
Centralize rate limiting so handlers remain clean. This example counts calls by user_id and returns an error sentinel when exceeded.
from strukt.middleware import Middleware
from strukt.types import InvocationState
class RateLimitMiddleware(Middleware):
def __init__(self, max_calls: int = 5):
self.max_calls = max_calls
self._counts: dict[str, int] = {}
def before_handle(self, state: InvocationState, query_type: str, parts: list[str]):
user_id = state.context.get("user_id", "anon")
self._counts[user_id] = self._counts.get(user_id, 0) + 1
if self._counts[user_id] > self.max_calls:
return state, ["RATE_LIMITED"]
return state, parts
Enable Memory
Enable MemoryExtractionMiddleware and a vector store to persist durable facts. StruktX will auto-inject them via MemoryAugmentedLLMClient during LLM calls.
Use MemoryConfig.use_store=True and MemoryConfig.augment_llm=True to enable a KnowledgeStore to further scope memory.
MemoryConfig.use_store=True is required for MemoryExtractionMiddleware to work.
from strukt import MemoryConfig, MiddlewareConfig
memory = MemoryConfig(
factory="strukt.memory:UpstashVectorMemoryEngine",
params={"index_url": "...", "index_token": "...", "namespace": "demo"},
use_store=True,
augment_llm=True,
)
memory_mw = MiddlewareConfig("strukt.memory.middleware:MemoryExtractionMiddleware", params={"max_items": 5})
Wire It Up
Register handlers under query types and compose middleware. Set the default route for unclassified requests.
import os
from strukt import create, StruktConfig, HandlersConfig, LLMClientConfig, MiddlewareConfig
from step_02_time_handler import TimeHandler
from step_03_weather_handler import WeatherHandler
from step_04_rate_limit_middleware import RateLimitMiddleware
from step_05_memory import memory, memory_mw
os.environ.setdefault("OPENAI_API_KEY", "...")
cfg = StruktConfig(
llm=LLMClientConfig("langchain_openai:ChatOpenAI", dict(model="gpt-4o-mini")),
handlers=HandlersConfig(
registry={
"time": TimeHandler,
"weather": WeatherHandler,
},
default_route="time",
),
memory=memory,
middleware=[MiddlewareConfig(RateLimitMiddleware, params=dict(max_calls=3)), memory_mw],
)
app = create(cfg)
Run
Invoke with user_id so rate-limiting and memory are scoped per user.
print(app.invoke("what's the time now?", context={"user_id": "u1"}).response)
print(app.invoke("weather in Paris in celsius", context={"user_id": "u1"}).response)
for _ in range(5):
r = app.invoke("time please", context={"user_id": "u1"})
print(r.response)
Reference (Overview)
- strukt.create(config): builds the app with LLM, classifier, handlers, memory, middleware.
- Strukt.invoke(text, context) / Strukt.ainvoke: run requests.
- StruktConfig: top-level config dataclass; subconfigs: LLMClientConfig, ClassifierConfig, HandlersConfig, MemoryConfig, MiddlewareConfig.
- interfaces.LLMClient: invoke, structured.
- interfaces.Classifier: classify → QueryClassification.
- interfaces.Handler: handle → HandlerResult.
- interfaces.MemoryEngine: add, get, get_scoped, remove, cleanup.
- defaults.MemoryAugmentedLLMClient: auto-injects relevant memory into prompts; supports augment_source and query_hint.
- logging.get_logger, LoggingMiddleware: structured, Rich-powered console logging.
- langchain_helpers.LangChainLLMClient, adapt_to_llm_client, create_structured_chain.
- utils.load_factory, utils.coerce_factory: resolve factories from strings/callables/classes/instances.
- types: InvocationState, QueryClassification, HandlerResult, StruktResponse, StruktQueryEnum.
Best Practices
- Prefer typed handlers with clear responsibilities.
- Keep middleware small and composable.
- Scope memory using user-specific / traceable context.
FAQ
Can I use non-LangChain LLMs? Yes—implement LLMClient or provide an adapter.
How do I add a new query type? Implement a handler and register it in HandlersConfig.registry and include it in the classifier config.
How is memory injected? If MemoryConfig.augment_llm=True, MemoryAugmentedLLMClient retrieves relevant docs and prepends them to prompts.
How do handler intents work? Handlers extract intents and store them in state.context['handler_intents'][handler_name] = action. The background task middleware uses these intents to determine if a task should run in background based on the configured action_based_background rules.
When should I use background tasks? Use background tasks for operations that take time (device control, ticket creation) while keeping quick operations (status checks, queries) synchronous for immediate responses.
How do async optimizations work? StruktX automatically bridges sync and async handlers. Use await app.ainvoke() for maximum performance with up to 15 concurrent handlers. Existing sync handlers work without changes.
What's the difference between sync and async handlers? Sync handlers run in thread pools when called via ainvoke(). Async handlers run natively with true concurrency. Hybrid handlers can implement both for optimal performance.
How are Pydantic responses preserved? When multiple handlers return structured objects, StruktX preserves them as a list. Single responses return the full object. Mixed responses fall back to string concatenation.
How do I monitor performance? Enable EngineOptimizationsConfig and access metrics via app._engine._performance_monitor or the /metrics endpoint in FastAPI applications.
Extras
MCP Server
Claude Desktop
{
"mcpServers": {
"struktx": {
"command": "npx",
"args": ["-y", "mcp-remote", "https://struktx.snowheap.ai/api/mcp"]
}
}
}
Cursor
{
"mcpServers": {
"struktx": {
"url": "https://struktx.snowheap.ai/api/mcp"
}
}
}
Windsurf
{
"mcpServers": {
"struktx": {
"url": "https://struktx.snowheap.ai/api/mcp"
}
}
}