A treasury and trading ops team uses MuleRun Computer to run an automated morning brief every weekday at 06:00 UTC. The job queries 10 MuleRun market-data API categories, normalizes data into a unified schema, computes composite signal scores, archives all outputs to MuleRun Drive, and publishes a static MuleRun Page dashboard — the page you are reading now.
The entire pipeline — from cron trigger to published page — runs unattended on a persistent MuleRun Computer instance. No manual intervention, no external CI, no cloud-function glue. One VM, one script, one schedule.
Seven stages execute sequentially inside a single Python process on MuleRun Computer.
The pipeline queries all 10 categories from the MuleRun market-data MCP server. Endpoint examples shown below are illustrative.
| # | Category | MCP Tool | Endpoint Example | Usage in Pipeline |
|---|---|---|---|---|
| 1 | Crypto Price | crypto_price | e.g. crypto_price({symbol:"BTC"}) | BTC, ETH spot + 24h change for dashboard cards |
| 2 | Crypto Market | crypto_market | e.g. crypto_market({metric:"total_mcap"}) | Total market cap, dominance, volume for breadth signals |
| 3 | Technical Analysis | technical_analysis | e.g. technical_analysis({symbol:"BTC",indicators:["RSI","MACD"]}) | RSI, MACD, Bollinger Bands for momentum score |
| 4 | Sentiment Index | sentiment_index | e.g. sentiment_index({source:"aggregate"}) | Fear & Greed composite for sentiment Z-score |
| 5 | News Feed | news_feed | e.g. news_feed({category:"crypto",limit:20}) | Top 20 crypto headlines for narrative tagging |
| 6 | Macro Indicators | macro_indicators | e.g. macro_indicators({series:["CPI","NFP","PMI"]}) | CPI, NFP, PMI surprises for macro composite |
| 7 | Rates & Yields | rates_yields | e.g. rates_yields({curves:["US2Y","US10Y","US30Y"]}) | Yield curve shape, real rates for risk-free signal |
| 8 | Global Assets | global_assets | e.g. global_assets({assets:["SPX","DXY","XAU"]}) | S&P 500, DXY, Gold for cross-asset dashboard |
| 9 | Cross Asset | cross_asset | e.g. cross_asset({analysis:"correlation_matrix"}) | BTC vs SPX/Gold/DXY rolling correlation for regime detection |
| 10 | TradFi News | tradfi_news | e.g. tradfi_news({topics:["fed","earnings"],limit:10}) | Fed & earnings headlines for macro narrative overlay |
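The table's "Endpoint Example" column can be collected into one lookup keyed by tool name. The sketch below is only an illustration: the parameter names are copied from the illustrative examples above and may not match the real MCP server, and `build_request` is a hypothetical helper that produces the `{"tool": ..., "params": ...}` body shape used by the pipeline's fetch code.

```python
# Hypothetical PARAMS mapping: one entry per MCP tool, copied from the
# illustrative "Endpoint Example" column. Real parameter names may differ.
PARAMS = {
    "crypto_price": {"symbol": "BTC"},
    "crypto_market": {"metric": "total_mcap"},
    "technical_analysis": {"symbol": "BTC", "indicators": ["RSI", "MACD"]},
    "sentiment_index": {"source": "aggregate"},
    "news_feed": {"category": "crypto", "limit": 20},
    "macro_indicators": {"series": ["CPI", "NFP", "PMI"]},
    "rates_yields": {"curves": ["US2Y", "US10Y", "US30Y"]},
    "global_assets": {"assets": ["SPX", "DXY", "XAU"]},
    "cross_asset": {"analysis": "correlation_matrix"},
    "tradfi_news": {"topics": ["fed", "earnings"], "limit": 10},
}


def build_request(tool: str) -> dict:
    """Return the JSON body for one tool call (KeyError for unknown tools)."""
    return {"tool": tool, "params": PARAMS[tool]}


if __name__ == "__main__":
    for tool in PARAMS:
        print(build_request(tool))
```

Keeping the parameters in one dictionary means adding or removing a category is a one-line change.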
Each daily run writes six files to an immutable date-partitioned folder. Files are never overwritten; re-runs append a suffix.
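One way to get the "never overwritten" behaviour is to pick a fresh folder name whenever the date folder already exists. This is a minimal sketch, not the pipeline's actual logic: the `-rN` suffix format and the `run_folder` helper are assumptions, and the source does not say whether the suffix is applied to the folder or to each file.

```python
from typing import Set


def run_folder(base: str, run_date: str, existing: Set[str]) -> str:
    """Pick a folder for this run without reusing an existing one.

    `existing` is the set of folder paths already present on Drive.
    The "-rN" suffix is a hypothetical naming scheme.
    """
    first = f"{base}/{run_date}"
    if first not in existing:
        return first
    n = 2
    while f"{first}-r{n}" in existing:
        n += 1
    return f"{first}-r{n}"


if __name__ == "__main__":
    seen = {"/market-signal-ops/2026-04-28"}
    print(run_folder("/market-signal-ops", "2026-04-28", seen))
```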
# /etc/cron.d/market-signal-ops
0 6 * * 1-5 mulerun /opt/market-signal-ops/run.sh
#!/usr/bin/env bash
# run.sh: entry point on MuleRun Computer
set -euo pipefail
cd /opt/market-signal-ops
source .venv/bin/activate
export RUN_DATE=$(date -u +%Y-%m-%d)
export DRIVE_PATH="/market-signal-ops/${RUN_DATE}"
python run_morning_brief.py \
  --date "$RUN_DATE" \
  --drive-path "$DRIVE_PATH" \
  --publish 2>&1 | tee "logs/${RUN_DATE}.log"
# Upload log to Drive
mulerun drive upload \
  "logs/${RUN_DATE}.log" \
  "${DRIVE_PATH}/run-log.txt"

import asyncio, json, httpx
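from datetime import datetime, timezone

# MULERUN_MCP_ENDPOINT and PARAMS (per-category request parameters) are
# defined elsewhere in the script and are not shown in this excerpt.

API_CATEGORIES = [
    "crypto_price", "crypto_market",
    "technical_analysis", "sentiment_index",
    "news_feed", "macro_indicators",
    "rates_yields", "global_assets",
    "cross_asset", "tradfi_news",
]


class StaleDataError(Exception):
    """Assumed definition (not shown in the original excerpt)."""


async def fetch_all(session):
    """Fetch all categories concurrently; each call retries on its own."""
    tasks = [call_api(session, cat) for cat in API_CATEGORIES]
    # return_exceptions=True keeps one failed category from aborting the rest.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def call_api(session, category, retries=3, backoff=1.0):
    """POST one tool call, retrying HTTP errors and timeouts with backoff."""
    for attempt in range(retries):
        try:
            r = await session.post(
                MULERUN_MCP_ENDPOINT,
                json={"tool": category,
                      "params": PARAMS[category]},
                timeout=10,
            )
            r.raise_for_status()
            data = r.json()
            if is_stale(data):
                # Not in the except clause below, so stale data is not retried.
                raise StaleDataError(category)
            return data
        except (httpx.HTTPStatusError,
                httpx.TimeoutException):
            if attempt == retries - 1:
                raise
            # Sleeps 1 s, then 2 s, with the default backoff of 1.0.
            await asyncio.sleep(backoff * (2 ** attempt))


def zscore(value, mean, std):
    """Assumed helper: distance from the mean in units of std."""
    return (value - mean) / std


def compute_signals(raw: dict) -> dict:
    """Composite Z-score across dimensions."""
    z = {}
    z["momentum"] = zscore(
        raw["technical_analysis"]["rsi"], 50, 14)
    z["sentiment"] = zscore(
        raw["sentiment_index"]["value"], 50, 25)
    z["macro"] = zscore(
        raw["macro_indicators"]["surprise"], 0, 0.8)
    z["vol"] = zscore(
        raw["cross_asset"]["realised_vol"], 18, 6)
    # Equal-weighted mean of the dimensions computed above.
    z["composite"] = sum(z.values()) / len(z)
    return z

import subprocess, json, os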
def archive_to_drive(data: dict, drive_path: str):
    """Write all output files to MuleRun Drive."""
    files = {
        "prices.json": data["prices"],
        "macro.json": data["macro"],
        "signals.json": data["signals"],
        "news.json": data["news"],
        "dashboard-data.json": data["dashboard"],
    }
    for name, content in files.items():
        local = f"/tmp/{name}"
        with open(local, "w") as f:
            json.dump(content, f, indent=2)
        subprocess.run([
            "mulerun", "drive", "upload",
            local, f"{drive_path}/{name}"
        ], check=True)


def publish_page(dashboard_data: dict):
    """Inject data into template and publish."""
    with open("template.html") as f:
        html = f.read()
    html = html.replace(
        "/*__DATA__*/",
        json.dumps(dashboard_data))
    with open("/tmp/index.html", "w") as f:
        f.write(html)
    subprocess.run([
        "mulerun", "page", "deploy",
        "--name", "market-signal-ops",
        "--file", "/tmp/index.html"
    ], check=True)

Excerpt from signals.json: composite signal decomposition for 2026-04-28.
| Dimension | Raw Value | Mean | Std Dev | Z-Score | Signal |
|---|---|---|---|---|---|
| Momentum (RSI-14) | 62.4 | 50.0 | 14.0 | +0.89 | Green |
| Sentiment | 68 | 50 | 25 | +0.72 | Green |
| Macro Surprise | +0.32 | 0.00 | 0.80 | +0.40 | Green |
| Vol Regime | 24.1% | 18.0% | 6.0% | +1.02 | Amber |
| Yield Curve Δ | −3 bps | 0 bps | 5 bps | −0.60 | Green |
| BTC-SPX Correlation | 0.41 | 0.35 | 0.15 | +0.40 | Green |
| Composite | | | | +0.47 | Green |
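The Z-scores in the table follow from (raw value − mean) / std dev, and the composite equals the equal-weighted mean of the six rows. The sketch below reproduces that arithmetic from the table's own numbers; the dictionary keys and the `zscore` helper are illustrative names, not taken from signals.json.

```python
# (raw value, mean, std dev) copied from the table above.
ROWS = {
    "momentum_rsi14": (62.4, 50.0, 14.0),
    "sentiment": (68.0, 50.0, 25.0),
    "macro_surprise": (0.32, 0.00, 0.80),
    "vol_regime_pct": (24.1, 18.0, 6.0),
    "yield_curve_bps": (-3.0, 0.0, 5.0),
    "btc_spx_corr": (0.41, 0.35, 0.15),
}


def zscore(value: float, mean: float, std: float) -> float:
    """Distance from the mean in units of standard deviation."""
    return (value - mean) / std


z = {name: zscore(*row) for name, row in ROWS.items()}
composite = sum(z.values()) / len(z)  # equal-weighted mean

if __name__ == "__main__":
    for name, score in z.items():
        print(f"{name:16s} {score:+.2f}")
    print(f"{'composite':16s} {composite:+.2f}")
```

Rounded to two decimals, the six scores and the composite match the Z-Score column.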
Retries use exponential backoff with a maximum of 3 attempts per endpoint: the script waits 1 s after the first failure and 2 s after the second. If the third attempt also fails, the category is marked degraded in run-log.json and excluded from the composite score. The dashboard renders a caution badge for any degraded category.
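Excluding a degraded category from the composite can be as simple as averaging whatever dimensions remain. The following is a sketch under assumptions: `composite_score` is a hypothetical helper, and the source does not spell out how API categories map to signal dimensions, so the degraded set here is keyed by dimension name.

```python
from typing import Dict, Optional, Set


def composite_score(z: Dict[str, float], degraded: Set[str]) -> Optional[float]:
    """Equal-weighted mean of the non-degraded dimensions.

    Returns None when every dimension is degraded, so the caller can
    show a badge instead of a misleading number.
    """
    usable = [score for name, score in z.items() if name not in degraded]
    if not usable:
        return None
    return sum(usable) / len(usable)


if __name__ == "__main__":
    z = {"momentum": 0.89, "sentiment": 0.72, "macro": 0.40, "vol": 1.02}
    print(composite_score(z, degraded={"vol"}))
```

Dividing by the number of usable dimensions, rather than the full count, keeps a missing input from dragging the composite toward zero.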
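HTTP 429 responses are caught by the retry loop. The script reads the Retry-After header and waits accordingly before the next attempt. If rate limits are hit consistently, the cron schedule can be offset by a few minutes; for example, a minute field of 5 (5 6 * * 1-5) starts the run at 06:05 instead of 06:00. A minute field of */5 would not do this, because it means "every 5 minutes" rather than a 5-minute offset.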
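A Retry-After value can be either a number of seconds or an HTTP date, so the wait calculation needs to handle both and fall back to exponential backoff when the header is missing or unparseable. The sketch below shows one way to do that; `retry_delay` is a hypothetical helper rather than the script's actual code, and it takes a plain mapping of headers so that it runs without an HTTP client.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def retry_delay(headers: Mapping[str, str], attempt: int,
                backoff: float = 1.0,
                now: Optional[datetime] = None) -> float:
    """Seconds to wait before the next attempt.

    Prefers Retry-After (delta-seconds or HTTP-date); otherwise uses
    backoff * 2**attempt. A plain dict lookup is case-sensitive; an HTTP
    client's header object is usually case-insensitive.
    """
    fallback = backoff * (2 ** attempt)
    value = headers.get("Retry-After")
    if value is None:
        return fallback
    value = value.strip()
    if value.isdigit():
        return float(value)
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return fallback
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


if __name__ == "__main__":
    print(retry_delay({"Retry-After": "7"}, attempt=0))
    print(retry_delay({}, attempt=1))
```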
Each API response includes a timestamp field. If the data is older than 4 hours, it is flagged as stale. Stale data is still archived but excluded from signal scoring. The dashboard shows an amber "STALE" badge on the affected card.
def is_stale(data: dict, max_age_hours: int = 4) -> bool:
    """Return True if the response timestamp is older than max_age_hours."""
    ts = datetime.fromisoformat(data["timestamp"])
    if ts.tzinfo is None:
        # Assumption: timestamps without an offset are UTC. Without this,
        # subtracting a naive value from an aware one raises TypeError.
        ts = ts.replace(tzinfo=timezone.utc)
    age = datetime.now(timezone.utc) - ts
    return age.total_seconds() > max_age_hours * 3600


def classify_response(resp, category):
    """Classify API response for run-log."""
    if resp is None:
        return {"status": "failed", "category": category}
    if isinstance(resp, Exception):
        return {"status": "error", "category": category,
                "error": str(resp)}
    if is_stale(resp):
        return {"status": "stale", "category": category}
    return {"status": "ok", "category": category}

The MULERUN_API_KEY environment variable is set with mulerun computer env set and injected at runtime. Key rotation is handled through MuleRun's key management console.
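Because the key arrives through the environment, the script can check for it once at startup and fail with a clear message instead of partway through the fetch stage. This is a minimal sketch: `load_api_key` is a hypothetical helper, and how the key is then attached to requests is not described here, so it is left out.

```python
import os
from typing import Mapping


def load_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Return the API key from the environment, or raise if it is missing."""
    key = env.get("MULERUN_API_KEY", "").strip()
    if not key:
        raise RuntimeError("MULERUN_API_KEY is not set or is empty")
    return key


if __name__ == "__main__":
    try:
        load_api_key()
        print("API key found")
    except RuntimeError as exc:
        print(f"startup check failed: {exc}")
```

Passing the environment as a parameter keeps the check easy to test without touching the real process environment.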