COTexplorer/app/ingestion/importer.py
Greg 37f8eac932 Initial commit: CFTC COT Explorer
FastAPI application that ingests CFTC Commitments of Traders data into SQLite
and exposes it via a REST API with analytics endpoints (screener, percentile rank,
concentration). Includes CLI for historical and weekly data ingestion, Docker setup,
and a frontend.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 11:23:00 +01:00

420 lines
14 KiB
Python

"""
CFTC COT Data Importer
Inserts parsed CommodityBlock objects into the SQLite database.
All inserts use INSERT OR IGNORE for idempotency — safe to re-run.
"""
import sqlite3
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import requests
from app.db import get_db
from app.ingestion.parser import (
CommodityBlock,
parse_html_file,
parse_zip_file,
)
# Browser-style User-Agent sent with every CFTC request — presumably to
# avoid the site rejecting default library agents (TODO confirm).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    )
}
# Base URL under which the historical archive ZIPs (deahistfo*.zip) live.
HISTORICAL_BASE = "https://www.cftc.gov/files/dea/history"
# URL of the current weekly COT report page (HTML).
WEEKLY_URL = "https://www.cftc.gov/dea/options/deacbtlof.htm"
@dataclass
class ImportResult:
    """Outcome summary for importing one source (weekly HTML file or ZIP)."""
    source: str  # source label, e.g. a filename such as "2026-03-17_deacbtlof.htm"
    rows_inserted: int = 0  # positions rows newly inserted
    rows_skipped: int = 0  # reports (or a whole source) skipped as already imported
    error: Optional[str] = None  # stringified exception, or None on success
def _upsert_commodity(conn: sqlite3.Connection, block: CommodityBlock) -> int:
    """Ensure a commodities row exists for *block*; return its primary key.

    Relies on INSERT OR IGNORE so re-running on a known cftc_code is a no-op.
    """
    fields = (
        block.cftc_code,
        block.name,
        block.exchange,
        block.exchange_abbr,
        block.contract_unit,
    )
    conn.execute(
        """
        INSERT OR IGNORE INTO commodities
        (cftc_code, name, exchange, exchange_abbr, contract_unit)
        VALUES (?, ?, ?, ?, ?)
        """,
        fields,
    )
    # The row now exists either way; look its id up by the natural key.
    (commodity_id,) = conn.execute(
        "SELECT id FROM commodities WHERE cftc_code = ?", (block.cftc_code,)
    ).fetchone()
    return commodity_id
def _upsert_report(conn: sqlite3.Connection, commodity_id: int,
                   block: CommodityBlock, source_file: str) -> Optional[int]:
    """
    Insert a reports row for (commodity_id, block.report_date).

    Returns the new report id, or None when that report date was already
    imported for this commodity (INSERT OR IGNORE inserted nothing).
    """
    cursor = conn.execute(
        """
        INSERT OR IGNORE INTO reports
        (commodity_id, report_date, prev_report_date, source_file)
        VALUES (?, ?, ?, ?)
        """,
        (commodity_id, block.report_date, block.prev_report_date, source_file),
    )
    # rowcount == 0 means the unique constraint suppressed the insert.
    return cursor.lastrowid if cursor.rowcount else None
def import_commodity_block(conn: sqlite3.Connection, block: CommodityBlock,
                           source: str) -> tuple[int, int]:
    """
    Insert one CommodityBlock into the DB.

    Writes the commodity (if new), the report row, and then one positions
    row — plus an optional concentration row — per COT row type present.
    Returns (rows_inserted, rows_skipped) counting positions rows;
    (0, 1) means this report date was already imported for this commodity.
    """
    commodity_id = _upsert_commodity(conn, block)
    report_id = _upsert_report(conn, commodity_id, block, source)
    if report_id is None:
        return 0, 1  # already imported
    inserted = 0
    # 'All' is the combined row; 'Old'/'Other' only exist for commodities
    # published with an old-crop/other-crop split — presumably; TODO confirm
    # against the parser. Missing row types are simply skipped.
    for row_type in ('All', 'Old', 'Other'):
        pos = block.positions.get(row_type)
        if pos is None:
            continue
        # Week-over-week changes are stored only on the 'All' row; the
        # chg_* columns stay NULL for 'Old'/'Other'.
        chg = block.changes if row_type == 'All' else None
        pct = block.percentages.get(row_type)
        trd = block.traders.get(row_type)
        # NOTE: the placeholder groups below mirror the column list
        # line-for-line — keep columns, placeholders and the parameter
        # tuple in sync when editing.
        conn.execute(
            """
            INSERT OR IGNORE INTO positions (
                report_id, row_type,
                open_interest,
                noncomm_long, noncomm_short, noncomm_spreading,
                comm_long, comm_short,
                total_long, total_short,
                nonrept_long, nonrept_short,
                chg_open_interest,
                chg_noncomm_long, chg_noncomm_short, chg_noncomm_spreading,
                chg_comm_long, chg_comm_short,
                chg_total_long, chg_total_short,
                chg_nonrept_long, chg_nonrept_short,
                pct_open_interest,
                pct_noncomm_long, pct_noncomm_short, pct_noncomm_spreading,
                pct_comm_long, pct_comm_short,
                pct_total_long, pct_total_short,
                pct_nonrept_long, pct_nonrept_short,
                traders_total,
                traders_noncomm_long, traders_noncomm_short, traders_noncomm_spread,
                traders_comm_long, traders_comm_short,
                traders_total_long, traders_total_short
            ) VALUES (
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?
            )
            """,
            (
                report_id, row_type,
                pos.open_interest,
                pos.noncomm_long, pos.noncomm_short, pos.noncomm_spreading,
                pos.comm_long, pos.comm_short,
                pos.total_long, pos.total_short,
                pos.nonrept_long, pos.nonrept_short,
                chg.chg_open_interest if chg else None,
                chg.chg_noncomm_long if chg else None,
                chg.chg_noncomm_short if chg else None,
                chg.chg_noncomm_spreading if chg else None,
                chg.chg_comm_long if chg else None,
                chg.chg_comm_short if chg else None,
                chg.chg_total_long if chg else None,
                chg.chg_total_short if chg else None,
                chg.chg_nonrept_long if chg else None,
                chg.chg_nonrept_short if chg else None,
                pct.pct_open_interest if pct else None,
                pct.pct_noncomm_long if pct else None,
                pct.pct_noncomm_short if pct else None,
                pct.pct_noncomm_spreading if pct else None,
                pct.pct_comm_long if pct else None,
                pct.pct_comm_short if pct else None,
                pct.pct_total_long if pct else None,
                pct.pct_total_short if pct else None,
                pct.pct_nonrept_long if pct else None,
                pct.pct_nonrept_short if pct else None,
                trd.traders_total if trd else None,
                trd.traders_noncomm_long if trd else None,
                trd.traders_noncomm_short if trd else None,
                trd.traders_noncomm_spread if trd else None,
                trd.traders_comm_long if trd else None,
                trd.traders_comm_short if trd else None,
                trd.traders_total_long if trd else None,
                trd.traders_total_short if trd else None,
            ),
        )
        inserted += 1
        # Concentration of the largest 4 / 8 traders, when reported for
        # this row type.
        conc = block.concentration.get(row_type)
        if conc:
            conn.execute(
                """
                INSERT OR IGNORE INTO concentration (
                    report_id, row_type,
                    conc_gross_long_4, conc_gross_short_4,
                    conc_gross_long_8, conc_gross_short_8,
                    conc_net_long_4, conc_net_short_4,
                    conc_net_long_8, conc_net_short_8
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    report_id, row_type,
                    conc.conc_gross_long_4, conc.conc_gross_short_4,
                    conc.conc_gross_long_8, conc.conc_gross_short_8,
                    conc.conc_net_long_4, conc.conc_net_short_4,
                    conc.conc_net_long_8, conc.conc_net_short_8,
                ),
            )
    return inserted, 0
def import_html_file(html_path: str) -> ImportResult:
    """Import a single weekly HTML file, recording progress in import_log."""
    name = Path(html_path).name
    outcome = ImportResult(source=name)
    # First connection: skip if done, otherwise mark as running.
    with get_db() as conn:
        if _already_imported(conn, name):
            outcome.rows_skipped = 1
            return outcome
        _log_start(conn, name, 'weekly_html')
    # Second connection: do the actual import; any parse/DB failure is
    # captured in outcome.error rather than propagated.
    try:
        with get_db() as conn:
            for commodity in parse_html_file(html_path):
                added, skipped = import_commodity_block(conn, commodity, name)
                outcome.rows_inserted += added
                outcome.rows_skipped += skipped
            conn.commit()
    except Exception as e:
        outcome.error = str(e)
    with get_db() as conn:
        _log_done(conn, name, outcome.rows_inserted, outcome.rows_skipped, outcome.error)
    return outcome
def import_zip_file(zip_path: str, source_label: Optional[str] = None) -> ImportResult:
    """Import a historical ZIP archive; label rows with *source_label* or the filename."""
    label = source_label or Path(zip_path).name
    outcome = ImportResult(source=label)
    try:
        with get_db() as conn:
            for commodity in parse_zip_file(zip_path):
                added, skipped = import_commodity_block(conn, commodity, label)
                outcome.rows_inserted += added
                outcome.rows_skipped += skipped
            conn.commit()
    except Exception as e:
        # Best-effort: report the failure via the result, don't raise.
        outcome.error = str(e)
    return outcome
def _download_zip(url: str, dest: Path) -> bool:
    """
    Download *url* to *dest*; return True on success, False on failure.

    Streams the response to disk in chunks instead of buffering the whole
    archive in memory (the original `r.content` read defeated `stream=True`
    and never closed the response). OSError is caught too, so a failed
    write to *dest* also yields False rather than crashing the import loop.
    """
    try:
        with requests.get(url, headers=HEADERS, timeout=120, stream=True) as r:
            r.raise_for_status()
            with dest.open('wb') as fh:
                # 1 MiB chunks: fast, yet keeps memory flat for large archives.
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    fh.write(chunk)
        return True
    except (requests.RequestException, OSError):
        return False
def _log_start(conn: sqlite3.Connection, source: str, source_type: str) -> None:
conn.execute(
"""
INSERT OR REPLACE INTO import_log (source, source_type, status, started_at)
VALUES (?, ?, 'running', datetime('now'))
""",
(source, source_type),
)
conn.commit()
def _log_done(conn: sqlite3.Connection, source: str,
inserted: int, skipped: int, error: Optional[str] = None) -> None:
status = 'error' if error else 'done'
conn.execute(
"""
UPDATE import_log
SET status = ?, rows_inserted = ?, rows_skipped = ?,
completed_at = datetime('now'), error_message = ?
WHERE source = ?
""",
(status, inserted, skipped, error, source),
)
conn.commit()
def _already_imported(conn: sqlite3.Connection, source: str) -> bool:
row = conn.execute(
"SELECT status FROM import_log WHERE source = ?", (source,)
).fetchone()
return row is not None and row[0] == 'done'
def run_historical_import(start_year: int = 1995, end_year: int = 2026,
                          verbose: bool = True) -> None:
    """
    Download and import the full historical archive: the combined
    1995-2016 ZIP plus one per-year ZIP from 2017 onward.
    Uses import_log to skip already-completed sources.
    """
    # (url, label, source_type) triples, oldest first.
    sources: list[tuple[str, str, str]] = [(
        f"{HISTORICAL_BASE}/deahistfo_1995_2016.zip",
        "deahistfo_1995_2016.zip",
        "historical_zip",
    )]
    sources.extend(
        (f"{HISTORICAL_BASE}/deahistfo{year}.zip",
         f"deahistfo{year}.zip",
         "annual_zip")
        for year in range(max(start_year, 2017), end_year + 1)
    )
    # Open and release one connection up front so DB problems surface
    # before any downloads begin.
    with get_db() as conn:
        pass
    for url, label, source_type in sources:
        with get_db() as conn:
            finished = _already_imported(conn, label)
        if finished:
            if verbose:
                print(f" [skip] {label} (already imported)")
            continue
        if verbose:
            print(f" [download] {label} ...")
        # Only the temp file's name is needed; the handle closes immediately.
        with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
            archive = Path(tmp.name)
        if not _download_zip(url, archive):
            if verbose:
                print(f" [error] Failed to download {url}")
            with get_db() as conn:
                _log_start(conn, label, source_type)
                _log_done(conn, label, 0, 0, f"Download failed: {url}")
            archive.unlink(missing_ok=True)
            continue
        if verbose:
            print(f" [import] {label} ...")
        with get_db() as conn:
            _log_start(conn, label, source_type)
        result = import_zip_file(str(archive), source_label=label)
        archive.unlink(missing_ok=True)
        with get_db() as conn:
            _log_done(conn, label, result.rows_inserted, result.rows_skipped, result.error)
        if verbose:
            status = "ERROR: " + result.error if result.error else "OK"
            print(f" [done] {label}: {result.rows_inserted} inserted, "
                  f"{result.rows_skipped} skipped — {status}")
def download_and_import() -> ImportResult:
    """
    Download the current weekly report, save it under the data directory
    as <report-date>_deacbtlof.htm, and import it. Used by the cron job.

    Returns an ImportResult; ``error`` is set on download/parse failure.

    Fixes over the previous version: the unused ``from datetime import
    datetime`` is gone, and the import_log bookkeeping (already-imported
    check, _log_start, _log_done) is no longer duplicated here — it is
    delegated entirely to import_html_file, which performs the same steps
    and leaves import_log in the same final state.
    """
    import re
    from datetime import date

    result = ImportResult(source="weekly")
    try:
        r = requests.get(WEEKLY_URL, headers=HEADERS, timeout=30)
        r.raise_for_status()
        # The page is Latin-1 encoded text.
        html = r.content.decode('latin-1')
    except requests.RequestException as e:
        result.error = str(e)
        return result

    # Extract the report date ("Month D, YYYY") to build a stable,
    # sortable filename.
    m = re.search(
        r'(January|February|March|April|May|June|July|August|September|'
        r'October|November|December)\s+(\d{1,2}),\s+(\d{4})',
        html,
    )
    if not m:
        result.error = "Could not extract report date"
        return result
    month, day, year = m.groups()
    # Explicit month list rather than strptime's %B, which is locale-dependent.
    months = ['January', 'February', 'March', 'April', 'May', 'June',
              'July', 'August', 'September', 'October', 'November', 'December']
    report_date = date(int(year), months.index(month) + 1, int(day)).isoformat()

    from app.db import DB_PATH
    data_dir = DB_PATH.parent
    data_dir.mkdir(exist_ok=True)
    filename = f"{report_date}_deacbtlof.htm"
    filepath = data_dir / filename
    if not filepath.exists():
        # Keep the first downloaded copy; re-runs reuse it.
        filepath.write_text(html, encoding='latin-1')

    # import_html_file skips already-imported files (rows_skipped=1),
    # logs start/done, and sets source to the filename — exactly the
    # result this function should return.
    return import_html_file(str(filepath))