FastAPI application that ingests CFTC Commitments of Traders data into SQLite and exposes it via a REST API with analytics endpoints (screener, percentile rank, concentration). Includes CLI for historical and weekly data ingestion, Docker setup, and a frontend. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
300 lines
11 KiB
Python
300 lines
11 KiB
Python
from fastapi import APIRouter, HTTPException, Query
|
|
from typing import Optional
|
|
|
|
from app.db import get_db
|
|
from app.api.models import (
|
|
CommodityMeta, HistoryResponse, LatestResponse,
|
|
LatestRowData, PositionPoint, ExtremesResponse, CompareResponse, ComparePoint,
|
|
)
|
|
|
|
router = APIRouter(prefix="/api/positions", tags=["positions"])
|
|
|
|
|
|
def _commodity_meta(conn, cftc_code: str) -> dict:
|
|
row = conn.execute(
|
|
"""
|
|
SELECT c.cftc_code, c.name, c.exchange, c.exchange_abbr, c.contract_unit,
|
|
MIN(r.report_date) AS first_date,
|
|
MAX(r.report_date) AS last_date,
|
|
COUNT(DISTINCT r.report_date) AS week_count
|
|
FROM commodities c
|
|
LEFT JOIN reports r ON r.commodity_id = c.id
|
|
WHERE c.cftc_code = ?
|
|
GROUP BY c.id
|
|
""",
|
|
(cftc_code,),
|
|
).fetchone()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail=f"Commodity {cftc_code} not found")
|
|
return dict(row)
|
|
|
|
|
|
def _row_to_point(row) -> PositionPoint:
|
|
d = dict(row)
|
|
d['noncomm_net'] = (
|
|
(d['noncomm_long'] or 0) - (d['noncomm_short'] or 0)
|
|
if d.get('noncomm_long') is not None and d.get('noncomm_short') is not None
|
|
else None
|
|
)
|
|
d['comm_net'] = (
|
|
(d['comm_long'] or 0) - (d['comm_short'] or 0)
|
|
if d.get('comm_long') is not None and d.get('comm_short') is not None
|
|
else None
|
|
)
|
|
d['nonrept_net'] = (
|
|
(d.get('nonrept_long') or 0) - (d.get('nonrept_short') or 0)
|
|
if d.get('nonrept_long') is not None and d.get('nonrept_short') is not None
|
|
else None
|
|
)
|
|
return PositionPoint(**{k: d.get(k) for k in PositionPoint.model_fields})
|
|
|
|
|
|
@router.get("/{cftc_code}/history", response_model=HistoryResponse)
|
|
def get_history(
|
|
cftc_code: str,
|
|
from_date: Optional[str] = Query(None),
|
|
to_date: Optional[str] = Query(None),
|
|
row_type: str = Query("All", pattern="^(All|Old|Other)$"),
|
|
):
|
|
with get_db() as conn:
|
|
meta = _commodity_meta(conn, cftc_code)
|
|
|
|
sql = """
|
|
SELECT r.report_date,
|
|
p.open_interest,
|
|
p.noncomm_long, p.noncomm_short, p.noncomm_spreading,
|
|
p.comm_long, p.comm_short,
|
|
p.nonrept_long, p.nonrept_short,
|
|
p.chg_open_interest, p.chg_noncomm_long, p.chg_noncomm_short,
|
|
p.chg_comm_long, p.chg_comm_short,
|
|
p.pct_noncomm_long, p.pct_noncomm_short,
|
|
p.pct_comm_long, p.pct_comm_short,
|
|
p.traders_total, p.traders_noncomm_long, p.traders_noncomm_short,
|
|
p.traders_comm_long, p.traders_comm_short
|
|
FROM positions p
|
|
JOIN reports r ON r.id = p.report_id
|
|
JOIN commodities c ON c.id = r.commodity_id
|
|
WHERE c.cftc_code = ? AND p.row_type = ?
|
|
"""
|
|
params: list = [cftc_code, row_type]
|
|
if from_date:
|
|
sql += " AND r.report_date >= ?"
|
|
params.append(from_date)
|
|
if to_date:
|
|
sql += " AND r.report_date <= ?"
|
|
params.append(to_date)
|
|
sql += " ORDER BY r.report_date ASC"
|
|
|
|
rows = conn.execute(sql, params).fetchall()
|
|
|
|
data = [_row_to_point(r) for r in rows]
|
|
return HistoryResponse(
|
|
commodity=CommodityMeta(**meta),
|
|
row_type=row_type,
|
|
data=data,
|
|
)
|
|
|
|
|
|
@router.get("/compare", response_model=CompareResponse)
|
|
def compare(
|
|
codes: str = Query(..., description="Comma-separated CFTC codes, max 8"),
|
|
metric: str = Query("noncomm_net"),
|
|
from_date: Optional[str] = Query(None),
|
|
to_date: Optional[str] = Query(None),
|
|
row_type: str = Query("All", pattern="^(All|Old|Other)$"),
|
|
):
|
|
code_list = [c.strip() for c in codes.split(",")][:8]
|
|
|
|
COMPUTED = {"noncomm_net", "comm_net", "nonrept_net"}
|
|
DB_FIELDS = {
|
|
"open_interest", "noncomm_long", "noncomm_short", "noncomm_spreading",
|
|
"comm_long", "comm_short", "nonrept_long", "nonrept_short",
|
|
"pct_noncomm_long", "pct_noncomm_short", "pct_comm_long", "pct_comm_short",
|
|
"traders_total",
|
|
}
|
|
|
|
if metric not in COMPUTED and metric not in DB_FIELDS:
|
|
raise HTTPException(status_code=400, detail=f"Unknown metric: {metric}")
|
|
|
|
commodities = []
|
|
series: dict[str, list[ComparePoint]] = {}
|
|
|
|
with get_db() as conn:
|
|
for code in code_list:
|
|
try:
|
|
meta = _commodity_meta(conn, code)
|
|
commodities.append(CommodityMeta(**meta))
|
|
except HTTPException:
|
|
continue
|
|
|
|
if metric == "noncomm_net":
|
|
select_expr = "(p.noncomm_long - p.noncomm_short)"
|
|
elif metric == "comm_net":
|
|
select_expr = "(p.comm_long - p.comm_short)"
|
|
elif metric == "nonrept_net":
|
|
select_expr = "(p.nonrept_long - p.nonrept_short)"
|
|
else:
|
|
select_expr = f"p.{metric}"
|
|
|
|
sql = f"""
|
|
SELECT r.report_date, {select_expr} AS value
|
|
FROM positions p
|
|
JOIN reports r ON r.id = p.report_id
|
|
JOIN commodities c ON c.id = r.commodity_id
|
|
WHERE c.cftc_code = ? AND p.row_type = ?
|
|
"""
|
|
params: list = [code, row_type]
|
|
if from_date:
|
|
sql += " AND r.report_date >= ?"
|
|
params.append(from_date)
|
|
if to_date:
|
|
sql += " AND r.report_date <= ?"
|
|
params.append(to_date)
|
|
sql += " ORDER BY r.report_date ASC"
|
|
|
|
rows = conn.execute(sql, params).fetchall()
|
|
series[code] = [ComparePoint(report_date=r[0], value=r[1]) for r in rows]
|
|
|
|
return CompareResponse(metric=metric, commodities=commodities, series=series)
|
|
|
|
|
|
@router.get("/{cftc_code}/latest", response_model=LatestResponse)
|
|
def get_latest(cftc_code: str):
|
|
with get_db() as conn:
|
|
meta = _commodity_meta(conn, cftc_code)
|
|
|
|
latest_date = conn.execute(
|
|
"""
|
|
SELECT MAX(r.report_date)
|
|
FROM reports r
|
|
JOIN commodities c ON c.id = r.commodity_id
|
|
WHERE c.cftc_code = ?
|
|
""",
|
|
(cftc_code,),
|
|
).fetchone()[0]
|
|
|
|
if not latest_date:
|
|
raise HTTPException(status_code=404, detail="No data for this commodity")
|
|
|
|
pos_rows = conn.execute(
|
|
"""
|
|
SELECT r.report_date, p.row_type,
|
|
p.open_interest, p.noncomm_long, p.noncomm_short, p.noncomm_spreading,
|
|
p.comm_long, p.comm_short, p.nonrept_long, p.nonrept_short,
|
|
p.chg_open_interest, p.chg_noncomm_long, p.chg_noncomm_short,
|
|
p.chg_comm_long, p.chg_comm_short,
|
|
p.pct_noncomm_long, p.pct_noncomm_short,
|
|
p.pct_comm_long, p.pct_comm_short,
|
|
p.traders_total, p.traders_noncomm_long, p.traders_noncomm_short,
|
|
p.traders_comm_long, p.traders_comm_short
|
|
FROM positions p
|
|
JOIN reports r ON r.id = p.report_id
|
|
JOIN commodities c ON c.id = r.commodity_id
|
|
WHERE c.cftc_code = ? AND r.report_date = ?
|
|
ORDER BY p.row_type
|
|
""",
|
|
(cftc_code, latest_date),
|
|
).fetchall()
|
|
|
|
conc_rows = conn.execute(
|
|
"""
|
|
SELECT cn.row_type,
|
|
cn.conc_gross_long_4, cn.conc_gross_short_4,
|
|
cn.conc_gross_long_8, cn.conc_gross_short_8,
|
|
cn.conc_net_long_4, cn.conc_net_short_4,
|
|
cn.conc_net_long_8, cn.conc_net_short_8
|
|
FROM concentration cn
|
|
JOIN reports r ON r.id = cn.report_id
|
|
JOIN commodities c ON c.id = r.commodity_id
|
|
WHERE c.cftc_code = ? AND r.report_date = ?
|
|
""",
|
|
(cftc_code, latest_date),
|
|
).fetchall()
|
|
|
|
conc_by_type = {r['row_type']: dict(r) for r in conc_rows}
|
|
|
|
result_rows = []
|
|
for row in pos_rows:
|
|
rt = row['row_type']
|
|
point = _row_to_point(row)
|
|
result_rows.append(LatestRowData(
|
|
row_type=rt,
|
|
positions=point,
|
|
concentration=conc_by_type.get(rt),
|
|
))
|
|
|
|
return LatestResponse(
|
|
commodity=CommodityMeta(**meta),
|
|
report_date=latest_date,
|
|
rows=result_rows,
|
|
)
|
|
|
|
|
|
@router.get("/{cftc_code}/extremes", response_model=ExtremesResponse)
|
|
def get_extremes(cftc_code: str):
|
|
with get_db() as conn:
|
|
meta = _commodity_meta(conn, cftc_code)
|
|
|
|
def minmax(col: str):
|
|
# col is a bare column name on the positions table
|
|
r = conn.execute(
|
|
f"""
|
|
SELECT
|
|
MAX(p.{col}) AS max_val,
|
|
MIN(p.{col}) AS min_val,
|
|
(SELECT r2.report_date FROM positions p2
|
|
JOIN reports r2 ON r2.id = p2.report_id
|
|
JOIN commodities c2 ON c2.id = r2.commodity_id
|
|
WHERE c2.cftc_code = ? AND p2.row_type = 'All'
|
|
ORDER BY p2.{col} DESC LIMIT 1) AS max_date,
|
|
(SELECT r2.report_date FROM positions p2
|
|
JOIN reports r2 ON r2.id = p2.report_id
|
|
JOIN commodities c2 ON c2.id = r2.commodity_id
|
|
WHERE c2.cftc_code = ? AND p2.row_type = 'All'
|
|
ORDER BY p2.{col} ASC LIMIT 1) AS min_date
|
|
FROM positions p
|
|
JOIN reports r ON r.id = p.report_id
|
|
JOIN commodities c ON c.id = r.commodity_id
|
|
WHERE c.cftc_code = ? AND p.row_type = 'All'
|
|
""",
|
|
(cftc_code, cftc_code, cftc_code),
|
|
).fetchone()
|
|
return {
|
|
"max": {"value": r[0], "date": r[2]},
|
|
"min": {"value": r[1], "date": r[3]},
|
|
}
|
|
|
|
oi = minmax("open_interest")
|
|
|
|
# For net positions, pull the full series and compute in Python
|
|
def net_minmax(long_col: str, short_col: str):
|
|
rows2 = conn.execute(
|
|
f"""
|
|
SELECT r.report_date, p.{long_col}, p.{short_col}
|
|
FROM positions p
|
|
JOIN reports r ON r.id = p.report_id
|
|
JOIN commodities c ON c.id = r.commodity_id
|
|
WHERE c.cftc_code = ? AND p.row_type = 'All'
|
|
AND p.{long_col} IS NOT NULL AND p.{short_col} IS NOT NULL
|
|
ORDER BY (p.{long_col} - p.{short_col}) DESC
|
|
""",
|
|
(cftc_code,),
|
|
).fetchall()
|
|
if not rows2:
|
|
return {"max": {"value": None, "date": None}, "min": {"value": None, "date": None}}
|
|
return {
|
|
"max": {"value": rows2[0][1] - rows2[0][2], "date": rows2[0][0]},
|
|
"min": {"value": rows2[-1][1] - rows2[-1][2], "date": rows2[-1][0]},
|
|
}
|
|
|
|
nc_net = net_minmax("noncomm_long", "noncomm_short")
|
|
cm_net = net_minmax("comm_long", "comm_short")
|
|
|
|
return ExtremesResponse(
|
|
cftc_code=cftc_code,
|
|
commodity=meta['name'],
|
|
noncomm_net=nc_net,
|
|
open_interest=oi,
|
|
comm_net=cm_net,
|
|
)
|