# FastAPI application that ingests CFTC Commitments of Traders data into
# SQLite and exposes it via a REST API with analytics endpoints (screener,
# percentile rank, concentration). Includes CLI for historical and weekly
# data ingestion, Docker setup, and a frontend.
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
"""
|
|
CFTC COT Data Importer
|
|
|
|
Inserts parsed CommodityBlock objects into the SQLite database.
|
|
All inserts use INSERT OR IGNORE for idempotency — safe to re-run.
|
|
"""
|
|
|
|
import sqlite3
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
from app.db import get_db
|
|
from app.ingestion.parser import (
|
|
CommodityBlock,
|
|
parse_html_file,
|
|
parse_zip_file,
|
|
)
|
|
|
|
# Browser-like User-Agent sent with every CFTC request.
# NOTE(review): presumably needed because cftc.gov blocks the default
# python-requests agent — confirm before removing.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    )
}

# Base URL under which the historical deahistfo*.zip archives live.
HISTORICAL_BASE = "https://www.cftc.gov/files/dea/history"
# Current-week long-format futures-and-options report (HTML page).
WEEKLY_URL = "https://www.cftc.gov/dea/options/deacbtlof.htm"
|
|
@dataclass
class ImportResult:
    """Outcome of importing one source (weekly HTML file or historical ZIP)."""

    source: str                   # filename or label identifying the source
    rows_inserted: int = 0        # position rows newly inserted
    rows_skipped: int = 0         # reports/rows skipped because already present
    error: Optional[str] = None   # error message if the import failed, else None
|
|
|
def _upsert_commodity(conn: sqlite3.Connection, block: CommodityBlock) -> int:
    """Ensure a commodities row exists for *block* and return its primary key."""
    values = (
        block.cftc_code,
        block.name,
        block.exchange,
        block.exchange_abbr,
        block.contract_unit,
    )
    # INSERT OR IGNORE keeps this idempotent: an existing cftc_code row is
    # left untouched rather than updated.
    conn.execute(
        """
        INSERT OR IGNORE INTO commodities
            (cftc_code, name, exchange, exchange_abbr, contract_unit)
        VALUES (?, ?, ?, ?, ?)
        """,
        values,
    )
    # The row is guaranteed to exist at this point, so look up its id.
    (commodity_id,) = conn.execute(
        "SELECT id FROM commodities WHERE cftc_code = ?", (block.cftc_code,)
    ).fetchone()
    return commodity_id
|
|
def _upsert_report(conn: sqlite3.Connection, commodity_id: int,
                   block: CommodityBlock, source_file: str) -> Optional[int]:
    """
    Insert a reports row for this commodity and report date.

    Returns the new report id, or None when this report date was already
    imported for this commodity (INSERT OR IGNORE reports rowcount 0 then).
    """
    cursor = conn.execute(
        """
        INSERT OR IGNORE INTO reports
            (commodity_id, report_date, prev_report_date, source_file)
        VALUES (?, ?, ?, ?)
        """,
        (commodity_id, block.report_date, block.prev_report_date, source_file),
    )
    # rowcount == 0 means the insert was ignored, i.e. a duplicate report.
    return cursor.lastrowid if cursor.rowcount else None
|
|
def import_commodity_block(conn: sqlite3.Connection, block: CommodityBlock,
                           source: str) -> tuple[int, int]:
    """
    Insert one CommodityBlock into the DB.

    Upserts the commodity and its report header, then inserts one positions
    row per aggregation row type ('All', 'Old', 'Other') plus an optional
    concentration row for each.

    Returns (rows_inserted, rows_skipped) — (0, 1) when the report date was
    already imported for this commodity.
    """
    commodity_id = _upsert_commodity(conn, block)
    report_id = _upsert_report(conn, commodity_id, block, source)

    if report_id is None:
        return 0, 1  # already imported

    inserted = 0
    for row_type in ('All', 'Old', 'Other'):
        pos = block.positions.get(row_type)
        if pos is None:
            # This aggregation row is absent for this commodity; skip it.
            continue

        # Week-over-week changes are only attached to the 'All' row.
        # NOTE(review): presumably the parser supplies block.changes
        # unsegmented by row type — confirm against parser.
        chg = block.changes if row_type == 'All' else None
        pct = block.percentages.get(row_type)
        trd = block.traders.get(row_type)

        # 40 columns / 40 placeholders / 40 params, grouped as:
        # key (2) + positions (10) + changes (10) + percentages (10)
        # + trader counts (8). chg/pct/trd may each be None, in which case
        # their whole column group is inserted as NULLs.
        conn.execute(
            """
            INSERT OR IGNORE INTO positions (
                report_id, row_type,
                open_interest,
                noncomm_long, noncomm_short, noncomm_spreading,
                comm_long, comm_short,
                total_long, total_short,
                nonrept_long, nonrept_short,
                chg_open_interest,
                chg_noncomm_long, chg_noncomm_short, chg_noncomm_spreading,
                chg_comm_long, chg_comm_short,
                chg_total_long, chg_total_short,
                chg_nonrept_long, chg_nonrept_short,
                pct_open_interest,
                pct_noncomm_long, pct_noncomm_short, pct_noncomm_spreading,
                pct_comm_long, pct_comm_short,
                pct_total_long, pct_total_short,
                pct_nonrept_long, pct_nonrept_short,
                traders_total,
                traders_noncomm_long, traders_noncomm_short, traders_noncomm_spread,
                traders_comm_long, traders_comm_short,
                traders_total_long, traders_total_short
            ) VALUES (
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?
            )
            """,
            (
                report_id, row_type,
                pos.open_interest,
                pos.noncomm_long, pos.noncomm_short, pos.noncomm_spreading,
                pos.comm_long, pos.comm_short,
                pos.total_long, pos.total_short,
                pos.nonrept_long, pos.nonrept_short,
                chg.chg_open_interest if chg else None,
                chg.chg_noncomm_long if chg else None,
                chg.chg_noncomm_short if chg else None,
                chg.chg_noncomm_spreading if chg else None,
                chg.chg_comm_long if chg else None,
                chg.chg_comm_short if chg else None,
                chg.chg_total_long if chg else None,
                chg.chg_total_short if chg else None,
                chg.chg_nonrept_long if chg else None,
                chg.chg_nonrept_short if chg else None,
                pct.pct_open_interest if pct else None,
                pct.pct_noncomm_long if pct else None,
                pct.pct_noncomm_short if pct else None,
                pct.pct_noncomm_spreading if pct else None,
                pct.pct_comm_long if pct else None,
                pct.pct_comm_short if pct else None,
                pct.pct_total_long if pct else None,
                pct.pct_total_short if pct else None,
                pct.pct_nonrept_long if pct else None,
                pct.pct_nonrept_short if pct else None,
                trd.traders_total if trd else None,
                trd.traders_noncomm_long if trd else None,
                trd.traders_noncomm_short if trd else None,
                trd.traders_noncomm_spread if trd else None,
                trd.traders_comm_long if trd else None,
                trd.traders_comm_short if trd else None,
                trd.traders_total_long if trd else None,
                trd.traders_total_short if trd else None,
            ),
        )
        inserted += 1

        # Concentration (largest 4/8 traders, gross and net) — optional per
        # row type.
        conc = block.concentration.get(row_type)
        if conc:
            conn.execute(
                """
                INSERT OR IGNORE INTO concentration (
                    report_id, row_type,
                    conc_gross_long_4, conc_gross_short_4,
                    conc_gross_long_8, conc_gross_short_8,
                    conc_net_long_4, conc_net_short_4,
                    conc_net_long_8, conc_net_short_8
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    report_id, row_type,
                    conc.conc_gross_long_4, conc.conc_gross_short_4,
                    conc.conc_gross_long_8, conc.conc_gross_short_8,
                    conc.conc_net_long_4, conc.conc_net_short_4,
                    conc.conc_net_long_8, conc.conc_net_short_8,
                ),
            )

    return inserted, 0
|
|
|
def import_html_file(html_path: str) -> ImportResult:
    """Import a single weekly HTML file, skipping it if already logged done."""
    name = Path(html_path).name
    outcome = ImportResult(source=name)

    # Guard against re-importing a source already marked 'done', then mark
    # this run as started.
    with get_db() as conn:
        if _already_imported(conn, name):
            outcome.rows_skipped = 1
            return outcome
        _log_start(conn, name, 'weekly_html')

    try:
        with get_db() as conn:
            for parsed in parse_html_file(html_path):
                added, dupes = import_commodity_block(conn, parsed, name)
                outcome.rows_inserted += added
                outcome.rows_skipped += dupes
            conn.commit()
    except Exception as exc:  # recorded in the log instead of propagating
        outcome.error = str(exc)

    with get_db() as conn:
        _log_done(conn, name, outcome.rows_inserted, outcome.rows_skipped, outcome.error)
    return outcome
|
|
def import_zip_file(zip_path: str, source_label: Optional[str] = None) -> ImportResult:
    """Import a historical ZIP file, labelling rows with *source_label* if given."""
    label = source_label if source_label else Path(zip_path).name
    outcome = ImportResult(source=label)
    try:
        with get_db() as conn:
            for parsed in parse_zip_file(zip_path):
                added, dupes = import_commodity_block(conn, parsed, label)
                outcome.rows_inserted += added
                outcome.rows_skipped += dupes
            conn.commit()
    except Exception as exc:  # surfaced via ImportResult.error, not raised
        outcome.error = str(exc)
    return outcome
|
|
def _download_zip(url: str, dest: Path) -> bool:
    """
    Download *url* to *dest*; return True on success, False on HTTP failure.

    Fixes: the previous version passed stream=True but then read r.content,
    which buffers the entire archive (the combined 1995-2016 ZIP is large)
    in memory, and never closed the response. Now the response is used as a
    context manager and streamed to disk in 1 MiB chunks.
    """
    try:
        with requests.get(url, headers=HEADERS, timeout=120, stream=True) as r:
            r.raise_for_status()
            with dest.open('wb') as fh:
                for chunk in r.iter_content(chunk_size=1 << 20):
                    fh.write(chunk)
        return True
    except requests.RequestException:
        return False
|
|
def _log_start(conn: sqlite3.Connection, source: str, source_type: str) -> None:
|
|
conn.execute(
|
|
"""
|
|
INSERT OR REPLACE INTO import_log (source, source_type, status, started_at)
|
|
VALUES (?, ?, 'running', datetime('now'))
|
|
""",
|
|
(source, source_type),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def _log_done(conn: sqlite3.Connection, source: str,
|
|
inserted: int, skipped: int, error: Optional[str] = None) -> None:
|
|
status = 'error' if error else 'done'
|
|
conn.execute(
|
|
"""
|
|
UPDATE import_log
|
|
SET status = ?, rows_inserted = ?, rows_skipped = ?,
|
|
completed_at = datetime('now'), error_message = ?
|
|
WHERE source = ?
|
|
""",
|
|
(status, inserted, skipped, error, source),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def _already_imported(conn: sqlite3.Connection, source: str) -> bool:
|
|
row = conn.execute(
|
|
"SELECT status FROM import_log WHERE source = ?", (source,)
|
|
).fetchone()
|
|
return row is not None and row[0] == 'done'
|
|
|
|
|
|
def run_historical_import(start_year: int = 1995, end_year: int = 2026,
                          verbose: bool = True) -> None:
    """
    Download and import the full historical archive.

    Sources are the combined 1995-2016 archive plus per-year archives from
    2017 on.  import_log is consulted so already-completed sources are
    skipped, which makes the run safely resumable.

    Bug fixed: the combined 1995-2016 archive was queued unconditionally,
    so e.g. start_year=2020 still downloaded and imported 21 years of data.
    It is now only queued when the requested range overlaps 1995-2016.
    """
    sources: list[tuple[str, str, str]] = []

    # Combined 1995-2016 archive — only when the range reaches into it.
    if start_year <= 2016:
        sources.append((
            f"{HISTORICAL_BASE}/deahistfo_1995_2016.zip",
            "deahistfo_1995_2016.zip",
            "historical_zip",
        ))

    # Per-year archives from 2017 onwards
    for year in range(max(start_year, 2017), end_year + 1):
        sources.append((
            f"{HISTORICAL_BASE}/deahistfo{year}.zip",
            f"deahistfo{year}.zip",
            "annual_zip",
        ))

    # Fail fast if the database is unreachable, before any downloads start.
    with get_db():
        pass

    for url, label, source_type in sources:
        with get_db() as conn:
            if _already_imported(conn, label):
                if verbose:
                    print(f" [skip] {label} (already imported)")
                continue

        if verbose:
            print(f" [download] {label} ...")

        # delete=False so the file survives past the context manager; we
        # unlink it explicitly after the import (or on failure).
        with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
            tmp_path = Path(tmp.name)

        if not _download_zip(url, tmp_path):
            if verbose:
                print(f" [error] Failed to download {url}")
            with get_db() as conn:
                _log_start(conn, label, source_type)
                _log_done(conn, label, 0, 0, f"Download failed: {url}")
            tmp_path.unlink(missing_ok=True)
            continue

        if verbose:
            print(f" [import] {label} ...")

        with get_db() as conn:
            _log_start(conn, label, source_type)

        result = import_zip_file(str(tmp_path), source_label=label)
        tmp_path.unlink(missing_ok=True)

        with get_db() as conn:
            _log_done(conn, label, result.rows_inserted, result.rows_skipped, result.error)

        if verbose:
            status = "ERROR: " + result.error if result.error else "OK"
            print(f" [done] {label}: {result.rows_inserted} inserted, "
                  f"{result.rows_skipped} skipped — {status}")
|
|
|
def download_and_import() -> ImportResult:
    """
    Download the current weekly report, cache it on disk, and import it.

    Used by the cron job.  Returns an ImportResult whose source is the
    cached filename (or "weekly" when the download/date extraction fails
    before a filename can be derived).

    Fixes: removed the unused `from datetime import datetime` import, and
    removed the duplicate _log_start/_log_done around import_html_file —
    that function does its own import_log bookkeeping, so the outer calls
    reset started_at (INSERT OR REPLACE) and wrote the completion row twice.
    """
    import re
    from datetime import date

    result = ImportResult(source="weekly")

    # Fetch the report page; CFTC serves it as latin-1 text.
    try:
        r = requests.get(WEEKLY_URL, headers=HEADERS, timeout=30)
        r.raise_for_status()
        html = r.content.decode('latin-1')
    except requests.RequestException as e:
        result.error = str(e)
        return result

    # Extract the report date (e.g. "January 7, 2025") to build a stable
    # cache filename.
    m = re.search(
        r'(January|February|March|April|May|June|July|August|September|'
        r'October|November|December)\s+(\d{1,2}),\s+(\d{4})',
        html,
    )
    if not m:
        result.error = "Could not extract report date"
        return result

    month, day, year = m.groups()
    months = ['January','February','March','April','May','June',
              'July','August','September','October','November','December']
    report_date = date(int(year), months.index(month) + 1, int(day)).isoformat()

    # Cache the raw HTML next to the database so it can be re-imported later.
    from app.db import DB_PATH
    data_dir = DB_PATH.parent
    data_dir.mkdir(exist_ok=True)
    filename = f"{report_date}_deacbtlof.htm"
    filepath = data_dir / filename

    if not filepath.exists():
        filepath.write_text(html, encoding='latin-1')

    result.source = filename
    with get_db() as conn:
        if _already_imported(conn, filename):
            result.rows_skipped = 1
            return result

    # import_html_file handles _log_start/_log_done itself.
    r2 = import_html_file(str(filepath))
    result.rows_inserted = r2.rows_inserted
    result.rows_skipped = r2.rows_skipped
    result.error = r2.error

    return result