from fastapi import APIRouter, HTTPException, Query
from typing import Optional
from app.db import get_db
from app.api.models import (
    CommodityMeta,
    DisaggPositionPoint,
    DisaggHistoryResponse,
    DisaggScreenerRow,
    CompareResponse,
    ComparePoint,
)

router = APIRouter(prefix="/api/disagg", tags=["disaggregated"])

# Trader categories that have paired "<prefix>_long" / "<prefix>_short"
# columns in disagg_positions; each one gets a derived "<prefix>_net" value.
_NET_PREFIXES = ("prod_merc", "swap", "m_money", "other_rept", "nonrept")

# SQL expression for each derived net metric, e.g.
# "m_money_net" -> "(dp.m_money_long - dp.m_money_short)".
_NET_EXPR = {f"{p}_net": f"(dp.{p}_long - dp.{p}_short)" for p in _NET_PREFIXES}

# Raw disagg_positions columns that /compare may select directly. Only names in
# this set or in _NET_EXPR are ever interpolated into SQL.
_COMPARE_DB_FIELDS = frozenset({
    "open_interest",
    "m_money_long", "m_money_short", "m_money_spread",
    "prod_merc_long", "prod_merc_short",
    "swap_long", "swap_short", "swap_spread",
    "pct_m_money_long", "pct_m_money_short",
    "traders_total",
})

# Upper bound on the number of markets accepted by /compare.
_MAX_COMPARE_CODES = 8


def _disagg_commodity_meta(conn, cftc_code: str) -> dict:
    """Return summary metadata for a commodity that has disaggregated reports.

    The row includes the commodity's descriptive columns plus the first/last
    report_date and the number of distinct report dates in disagg_reports.

    Raises:
        HTTPException(404): if no commodity with ``cftc_code`` has any
            disagg_reports row (the inner JOIN drops commodities without one).
    """
    row = conn.execute(
        """
        SELECT c.cftc_code, c.name, c.exchange, c.exchange_abbr, c.contract_unit,
               MIN(dr.report_date) AS first_date,
               MAX(dr.report_date) AS last_date,
               COUNT(DISTINCT dr.report_date) AS week_count,
               1 AS has_disagg
        FROM commodities c
        JOIN disagg_reports dr ON dr.commodity_id = c.id
        WHERE c.cftc_code = ?
        GROUP BY c.id
        """,
        (cftc_code,),
    ).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail=f"No disaggregated data for {cftc_code}")
    # NOTE(review): dict(row) assumes a mapping-style row factory such as
    # sqlite3.Row is configured by get_db() -- confirm in app.db.
    return dict(row)


def _row_to_disagg_point(row) -> DisaggPositionPoint:
    """Convert a history query row into a DisaggPositionPoint.

    Adds a "<prefix>_net" (long - short) field for every trader category in
    _NET_PREFIXES; the net is None unless both legs are present. Keys not
    declared on DisaggPositionPoint are dropped, missing ones become None.
    """
    d = dict(row)
    for prefix in _NET_PREFIXES:
        long_qty = d.get(f"{prefix}_long")
        short_qty = d.get(f"{prefix}_short")
        d[f"{prefix}_net"] = (
            long_qty - short_qty
            if long_qty is not None and short_qty is not None
            else None
        )
    return DisaggPositionPoint(**{k: d.get(k) for k in DisaggPositionPoint.model_fields})


def _date_range_clause(
    from_date: Optional[str], to_date: Optional[str]
) -> tuple[str, list]:
    """Build inclusive ``dr.report_date`` bounds for a WHERE clause.

    Returns a SQL fragment (empty, or one/two " AND ..." conditions, each with
    a leading space) and the matching positional parameters. Empty strings are
    treated like None, i.e. no bound.

    NOTE(review): bounds are compared in SQL against dr.report_date exactly as
    given; presumably both are ISO "YYYY-MM-DD" strings so that lexical order
    matches date order -- confirm against the ingest code.
    """
    clauses = []
    params: list = []
    if from_date:
        clauses.append(" AND dr.report_date >= ?")
        params.append(from_date)
    if to_date:
        clauses.append(" AND dr.report_date <= ?")
        params.append(to_date)
    return "".join(clauses), params


@router.get("/{cftc_code}/history", response_model=DisaggHistoryResponse)
def get_disagg_history(
    cftc_code: str,
    from_date: Optional[str] = Query(None),
    to_date: Optional[str] = Query(None),
    row_type: str = Query("All", pattern="^(All|Old|Other)$"),
):
    """Return one market's weekly disaggregated positions, oldest first.

    Args:
        cftc_code: CFTC contract market code.
        from_date / to_date: optional inclusive report_date bounds.
        row_type: which disagg_positions row to read ("All", "Old" or "Other").

    Raises:
        HTTPException(404): if the market has no disaggregated reports.
    """
    date_sql, date_params = _date_range_clause(from_date, to_date)
    with get_db() as conn:
        meta = _disagg_commodity_meta(conn, cftc_code)
        rows = conn.execute(
            f"""
            SELECT dr.report_date,
                   dp.open_interest,
                   dp.prod_merc_long, dp.prod_merc_short,
                   dp.swap_long, dp.swap_short, dp.swap_spread,
                   dp.m_money_long, dp.m_money_short, dp.m_money_spread,
                   dp.other_rept_long, dp.other_rept_short,
                   dp.nonrept_long, dp.nonrept_short,
                   dp.chg_open_interest,
                   dp.chg_m_money_long, dp.chg_m_money_short,
                   dp.chg_prod_merc_long, dp.chg_prod_merc_short,
                   dp.chg_swap_long, dp.chg_swap_short,
                   dp.pct_open_interest,
                   dp.pct_m_money_long, dp.pct_m_money_short,
                   dp.pct_prod_merc_long, dp.pct_prod_merc_short,
                   dp.pct_swap_long, dp.pct_swap_short,
                   dp.traders_total,
                   dp.traders_m_money_long, dp.traders_m_money_short,
                   dp.traders_prod_merc_long, dp.traders_prod_merc_short
            FROM disagg_positions dp
            JOIN disagg_reports dr ON dr.id = dp.report_id
            JOIN commodities c ON c.id = dr.commodity_id
            WHERE c.cftc_code = ? AND dp.row_type = ?{date_sql}
            ORDER BY dr.report_date ASC
            """,
            [cftc_code, row_type, *date_params],
        ).fetchall()
        data = [_row_to_disagg_point(r) for r in rows]
    return DisaggHistoryResponse(
        commodity=CommodityMeta(**meta),
        row_type=row_type,
        data=data,
    )


@router.get("/screener", response_model=list[DisaggScreenerRow])
def disagg_screener(
    exchange: Optional[str] = Query(None),
    lookback_weeks: int = Query(156, ge=4, le=1560),
    top_n: int = Query(500, ge=1, le=1000),
    direction: Optional[str] = Query(None, pattern="^(long|short)$"),
):
    """
    Return markets ranked by current Managed Money net position relative to
    the historical distribution (percentile rank).

    For each market's latest report, pct_rank is the percentage (0-100) of
    non-NULL Managed Money nets among its most recent ``lookback_weeks``
    "All" rows that are strictly below the current net (the current week is
    part of the denominator). pct_rank is NULL when the current net is NULL.

    ``direction`` keeps only pct_rank >= 50 ("long") or < 50 ("short"); it is
    applied before ``top_n`` so the limit counts matching markets only.
    Results are ordered by pct_rank descending.
    """
    exchange_filter = "WHERE c.exchange_abbr = ?" if exchange else ""
    exchange_params = [exchange] if exchange else []
    # Fixed SQL snippets only -- the user value never reaches the SQL text.
    direction_filter = {
        "long": "WHERE pct_rank >= 50",
        "short": "WHERE pct_rank < 50",
    }.get(direction, "")

    with get_db() as conn:
        rows = conn.execute(
            f"""
            WITH latest AS (
                SELECT c.cftc_code,
                       c.name AS commodity,
                       c.exchange_abbr AS exchange,
                       MAX(dr.report_date) AS latest_date
                FROM commodities c
                JOIN disagg_reports dr ON dr.commodity_id = c.id
                {exchange_filter}
                GROUP BY c.cftc_code
            ),
            latest_pos AS (
                SELECT l.cftc_code, l.commodity, l.exchange, l.latest_date,
                       dp.open_interest,
                       (dp.m_money_long - dp.m_money_short) AS m_money_net,
                       dp.chg_m_money_long,
                       dp.chg_m_money_short
                FROM latest l
                JOIN commodities c ON c.cftc_code = l.cftc_code
                JOIN disagg_reports dr
                  ON dr.commodity_id = c.id AND dr.report_date = l.latest_date
                JOIN disagg_positions dp
                  ON dp.report_id = dr.id AND dp.row_type = 'All'
            ),
            lookback AS (
                -- Only markets that survived the exchange filter are ranked,
                -- so only their history needs a window number.
                SELECT c.cftc_code,
                       (dp.m_money_long - dp.m_money_short) AS net,
                       ROW_NUMBER() OVER (
                           PARTITION BY c.cftc_code ORDER BY dr.report_date DESC
                       ) AS rn
                FROM latest l
                JOIN commodities c ON c.cftc_code = l.cftc_code
                JOIN disagg_reports dr ON dr.commodity_id = c.id
                JOIN disagg_positions dp
                  ON dp.report_id = dr.id AND dp.row_type = 'All'
            ),
            pct AS (
                SELECT lp.cftc_code, lp.commodity, lp.exchange, lp.latest_date,
                       lp.open_interest, lp.m_money_net,
                       lp.chg_m_money_long, lp.chg_m_money_short,
                       CASE
                           -- "net < NULL" counts nothing, which would wrongly
                           -- rank a market with missing data at 0.
                           WHEN lp.m_money_net IS NULL THEN NULL
                           ELSE CAST(
                                    (SELECT COUNT(*) FROM lookback lb2
                                     WHERE lb2.cftc_code = lp.cftc_code
                                       AND lb2.rn <= ?
                                       AND lb2.net < lp.m_money_net) AS REAL
                                )
                                -- COUNT(net) skips NULL weeks, which can never
                                -- be counted as "below" in the numerator.
                                / NULLIF(
                                    (SELECT COUNT(lb3.net) FROM lookback lb3
                                     WHERE lb3.cftc_code = lp.cftc_code
                                       AND lb3.rn <= ?),
                                    0
                                ) * 100.0
                       END AS pct_rank
                FROM latest_pos lp
            )
            SELECT cftc_code, commodity, exchange, latest_date,
                   m_money_net, open_interest, pct_rank,
                   chg_m_money_long, chg_m_money_short
            FROM pct
            {direction_filter}
            ORDER BY pct_rank DESC
            LIMIT ?
            """,
            exchange_params + [lookback_weeks, lookback_weeks, top_n],
        ).fetchall()
        return [DisaggScreenerRow(**dict(r)) for r in rows]


@router.get("/{cftc_code}/net-position-percentile")
def disagg_net_position_percentile(
    cftc_code: str,
    lookback_weeks: int = Query(156, ge=4, le=1560),
):
    """
    Where does the current Managed Money net position sit in the historical
    distribution over the last N weeks?

    The window is the ``lookback_weeks`` most recent "All" rows; weeks with a
    NULL net are ignored. ``percentile`` is the share of earlier (non-NULL)
    weeks strictly below the latest week; ``z_score`` uses the population
    standard deviation and is 0.0 when every value is equal. The returned
    ``lookback_weeks`` is the number of non-NULL weeks actually used.
    When the latest week's net is NULL, every statistic is None.

    Raises:
        HTTPException(404): unknown commodity, or no disaggregated rows.
    """
    with get_db() as conn:
        row = conn.execute(
            "SELECT id, name FROM commodities WHERE cftc_code = ?", (cftc_code,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"Commodity {cftc_code} not found")
        commodity_name = row['name']

        history = conn.execute(
            """
            SELECT (dp.m_money_long - dp.m_money_short) AS net
            FROM disagg_positions dp
            JOIN disagg_reports dr ON dr.id = dp.report_id
            JOIN commodities c ON c.id = dr.commodity_id
            WHERE c.cftc_code = ? AND dp.row_type = 'All'
            ORDER BY dr.report_date DESC
            LIMIT ?
            """,
            (cftc_code, lookback_weeks),
        ).fetchall()

    if not history:
        raise HTTPException(status_code=404, detail="No disaggregated data found")

    # history is newest-first; take "current" from the latest row itself so a
    # missing latest value is never replaced by an older week's net.
    current = history[0][0]
    if current is None:
        return {
            "cftc_code": cftc_code,
            "commodity": commodity_name,
            "current_net": None,
            "percentile": None,
            "z_score": None,
            "lookback_weeks": lookback_weeks,
            "period_min": None,
            "period_max": None,
        }

    # current is non-NULL, so it is nets[0] and nets is non-empty.
    nets = [r[0] for r in history if r[0] is not None]
    n = len(nets)
    below = sum(1 for v in nets[1:] if v < current)
    percentile = round(below / max(n - 1, 1) * 100, 1)

    mean = sum(nets) / n
    variance = sum((v - mean) ** 2 for v in nets) / n
    std = variance ** 0.5
    z_score = round((current - mean) / std, 2) if std > 0 else 0.0

    return {
        "cftc_code": cftc_code,
        "commodity": commodity_name,
        "current_net": current,
        "percentile": percentile,
        "z_score": z_score,
        "lookback_weeks": n,
        "period_min": min(nets),
        "period_max": max(nets),
    }


@router.get("/compare", response_model=CompareResponse)
def disagg_compare(
    codes: str = Query(..., description="Comma-separated CFTC codes, max 8"),
    metric: str = Query("m_money_net"),
    from_date: Optional[str] = Query(None),
    to_date: Optional[str] = Query(None),
    row_type: str = Query("All", pattern="^(All|Old|Other)$"),
):
    """Return one metric's weekly series for several markets side by side.

    ``codes`` is split on commas; blank entries and repeats are dropped and at
    most 8 distinct codes are used. Codes without disaggregated data are
    skipped silently. ``metric`` is either a derived net ("<prefix>_net") or a
    whitelisted raw column.

    Raises:
        HTTPException(400): if ``metric`` is not recognised.
    """
    # Drop blanks/duplicates *before* truncating so "A,,A,B" still yields A and B.
    stripped = (c.strip() for c in codes.split(","))
    code_list = list(dict.fromkeys(c for c in stripped if c))[:_MAX_COMPARE_CODES]

    if metric not in _NET_EXPR and metric not in _COMPARE_DB_FIELDS:
        raise HTTPException(status_code=400, detail=f"Unknown metric: {metric}")

    # Safe to interpolate: metric was validated against fixed whitelists above.
    select_expr = _NET_EXPR.get(metric, f"dp.{metric}")
    date_sql, date_params = _date_range_clause(from_date, to_date)
    sql = f"""
        SELECT dr.report_date, {select_expr} AS value
        FROM disagg_positions dp
        JOIN disagg_reports dr ON dr.id = dp.report_id
        JOIN commodities c ON c.id = dr.commodity_id
        WHERE c.cftc_code = ? AND dp.row_type = ?{date_sql}
        ORDER BY dr.report_date ASC
    """

    commodities = []
    series: dict[str, list[ComparePoint]] = {}
    with get_db() as conn:
        for code in code_list:
            try:
                meta = _disagg_commodity_meta(conn, code)
            except HTTPException:
                # Unknown code: skip it rather than failing the whole request.
                continue
            commodities.append(CommodityMeta(**meta))
            rows = conn.execute(sql, [code, row_type, *date_params]).fetchall()
            series[code] = [ComparePoint(report_date=r[0], value=r[1]) for r in rows]

    return CompareResponse(metric=metric, commodities=commodities, series=series)