Greg 2c28ac3b0a Add Disaggregated COT data support (2019–2026)
Integrates the CFTC Disaggregated Commitments of Traders reports
(com_disagg_txt_YYYY.zip) which break positions down by Producer/Merchant,
Swap Dealers, Managed Money, and Other Reportables — a different report
type from the existing legacy COT data.

- schema.sql: add disagg_reports, disagg_positions, disagg_concentration tables
- parser.py: add DisaggPositionRow/DisaggBlock dataclasses and
  parse_disagg_csv_text()/parse_disagg_zip_file() for c_year.txt format
- importer.py: add import_disagg_block(), import_disagg_zip_file(),
  run_disagg_historical_import() for 2019–2026 yearly ZIPs
- cli.py: add import-disagg-history subcommand
- docker-compose.yaml: run import-disagg-history on startup (idempotent
  via import_log, so re-deploys skip already-imported years)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 18:11:59 +01:00

631 lines
22 KiB
Python

"""
CFTC COT Data Importer
Inserts parsed CommodityBlock objects into the SQLite database.
All inserts use INSERT OR IGNORE for idempotency — safe to re-run.
"""
import sqlite3
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import requests
from app.db import get_db
from app.ingestion.parser import (
CommodityBlock,
parse_html_file,
parse_zip_file,
parse_disagg_zip_file,
)
# Browser-style User-Agent sent with every CFTC request.
# NOTE(review): presumably needed to avoid bot-blocking on cftc.gov — confirm.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    )
}
# Base URL under which the historical COT ZIP archives live
# (both legacy deahistfo*.zip and disaggregated com_disagg_txt_*.zip).
HISTORICAL_BASE = "https://www.cftc.gov/files/dea/history"
# Current-week long-format futures-and-options report (HTML page).
WEEKLY_URL = "https://www.cftc.gov/dea/options/deacbtlof.htm"
@dataclass
class ImportResult:
    """Outcome of a single import operation (one file or one download)."""
    source: str                  # label of the imported source (filename or archive label)
    rows_inserted: int = 0       # count of newly inserted rows
    rows_skipped: int = 0        # count of rows/reports skipped as already present
    error: Optional[str] = None  # error message when the import failed, else None
def _upsert_commodity(conn: sqlite3.Connection, block: CommodityBlock) -> int:
    """Ensure a commodities row exists for this block and return its id."""
    values = (
        block.cftc_code,
        block.name,
        block.exchange,
        block.exchange_abbr,
        block.contract_unit,
    )
    # Idempotent: an existing cftc_code row is left untouched.
    conn.execute(
        """
        INSERT OR IGNORE INTO commodities
        (cftc_code, name, exchange, exchange_abbr, contract_unit)
        VALUES (?, ?, ?, ?, ?)
        """,
        values,
    )
    # A separate SELECT is needed because INSERT OR IGNORE does not
    # report the pre-existing row's id on conflict.
    found = conn.execute(
        "SELECT id FROM commodities WHERE cftc_code = ?", (block.cftc_code,)
    ).fetchone()
    return found[0]
def _upsert_report(conn: sqlite3.Connection, commodity_id: int,
                   block: CommodityBlock, source_file: str) -> Optional[int]:
    """
    Record a reports row for (commodity, report date).

    Returns the new report id, or None when this report date was
    already imported for the commodity (nothing inserted).
    """
    params = (commodity_id, block.report_date, block.prev_report_date, source_file)
    cur = conn.execute(
        """
        INSERT OR IGNORE INTO reports
        (commodity_id, report_date, prev_report_date, source_file)
        VALUES (?, ?, ?, ?)
        """,
        params,
    )
    # rowcount of 0 means OR IGNORE suppressed a duplicate insert.
    return cur.lastrowid if cur.rowcount else None
def import_commodity_block(conn: sqlite3.Connection, block: CommodityBlock,
                           source: str) -> tuple[int, int]:
    """
    Insert one CommodityBlock into the DB.

    Writes the commodity (if new), one reports row, and up to three
    positions rows — one per row_type 'All'/'Old'/'Other' — plus a
    concentration row for each row_type that has concentration data.
    Returns (rows_inserted, rows_skipped); rows_inserted counts
    positions rows only.
    """
    commodity_id = _upsert_commodity(conn, block)
    report_id = _upsert_report(conn, commodity_id, block, source)
    if report_id is None:
        return 0, 1  # already imported
    inserted = 0
    for row_type in ('All', 'Old', 'Other'):
        pos = block.positions.get(row_type)
        # Not every commodity publishes Old/Other splits; skip absent rows.
        if pos is None:
            continue
        # Week-over-week changes apply only to the 'All' row; the chg_*
        # columns are NULL for 'Old'/'Other'.
        chg = block.changes if row_type == 'All' else None
        pct = block.percentages.get(row_type)
        trd = block.traders.get(row_type)
        # Column groups, in order: raw positions, changes, percentages,
        # trader counts. Placeholder rows below mirror the column layout.
        conn.execute(
            """
            INSERT OR IGNORE INTO positions (
                report_id, row_type,
                open_interest,
                noncomm_long, noncomm_short, noncomm_spreading,
                comm_long, comm_short,
                total_long, total_short,
                nonrept_long, nonrept_short,
                chg_open_interest,
                chg_noncomm_long, chg_noncomm_short, chg_noncomm_spreading,
                chg_comm_long, chg_comm_short,
                chg_total_long, chg_total_short,
                chg_nonrept_long, chg_nonrept_short,
                pct_open_interest,
                pct_noncomm_long, pct_noncomm_short, pct_noncomm_spreading,
                pct_comm_long, pct_comm_short,
                pct_total_long, pct_total_short,
                pct_nonrept_long, pct_nonrept_short,
                traders_total,
                traders_noncomm_long, traders_noncomm_short, traders_noncomm_spread,
                traders_comm_long, traders_comm_short,
                traders_total_long, traders_total_short
            ) VALUES (
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?, ?,
                ?, ?,
                ?, ?
            )
            """,
            (
                report_id, row_type,
                pos.open_interest,
                pos.noncomm_long, pos.noncomm_short, pos.noncomm_spreading,
                pos.comm_long, pos.comm_short,
                pos.total_long, pos.total_short,
                pos.nonrept_long, pos.nonrept_short,
                chg.chg_open_interest if chg else None,
                chg.chg_noncomm_long if chg else None,
                chg.chg_noncomm_short if chg else None,
                chg.chg_noncomm_spreading if chg else None,
                chg.chg_comm_long if chg else None,
                chg.chg_comm_short if chg else None,
                chg.chg_total_long if chg else None,
                chg.chg_total_short if chg else None,
                chg.chg_nonrept_long if chg else None,
                chg.chg_nonrept_short if chg else None,
                pct.pct_open_interest if pct else None,
                pct.pct_noncomm_long if pct else None,
                pct.pct_noncomm_short if pct else None,
                pct.pct_noncomm_spreading if pct else None,
                pct.pct_comm_long if pct else None,
                pct.pct_comm_short if pct else None,
                pct.pct_total_long if pct else None,
                pct.pct_total_short if pct else None,
                pct.pct_nonrept_long if pct else None,
                pct.pct_nonrept_short if pct else None,
                trd.traders_total if trd else None,
                trd.traders_noncomm_long if trd else None,
                trd.traders_noncomm_short if trd else None,
                trd.traders_noncomm_spread if trd else None,
                trd.traders_comm_long if trd else None,
                trd.traders_comm_short if trd else None,
                trd.traders_total_long if trd else None,
                trd.traders_total_short if trd else None,
            ),
        )
        inserted += 1
        # Concentration (largest-4 / largest-8 trader shares) for this row_type.
        conc = block.concentration.get(row_type)
        if conc:
            conn.execute(
                """
                INSERT OR IGNORE INTO concentration (
                    report_id, row_type,
                    conc_gross_long_4, conc_gross_short_4,
                    conc_gross_long_8, conc_gross_short_8,
                    conc_net_long_4, conc_net_short_4,
                    conc_net_long_8, conc_net_short_8
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    report_id, row_type,
                    conc.conc_gross_long_4, conc.conc_gross_short_4,
                    conc.conc_gross_long_8, conc.conc_gross_short_8,
                    conc.conc_net_long_4, conc.conc_net_short_4,
                    conc.conc_net_long_8, conc.conc_net_short_8,
                ),
            )
    return inserted, 0
def import_html_file(html_path: str) -> ImportResult:
    """Parse a single weekly HTML report and load it into the database."""
    label = Path(html_path).name
    outcome = ImportResult(source=label)
    with get_db() as conn:
        # Files whose import previously completed are skipped entirely.
        if _already_imported(conn, label):
            outcome.rows_skipped = 1
            return outcome
        _log_start(conn, label, 'weekly_html')
    try:
        with get_db() as conn:
            for block in parse_html_file(html_path):
                added, skipped = import_commodity_block(conn, block, label)
                outcome.rows_inserted += added
                outcome.rows_skipped += skipped
            conn.commit()
    except Exception as exc:
        outcome.error = str(exc)
    # Record the final status regardless of success or failure.
    with get_db() as conn:
        _log_done(conn, label, outcome.rows_inserted, outcome.rows_skipped, outcome.error)
    return outcome
def import_zip_file(zip_path: str, source_label: Optional[str] = None) -> ImportResult:
    """Parse a historical COT ZIP archive and import every block it contains."""
    label = source_label if source_label else Path(zip_path).name
    outcome = ImportResult(source=label)
    try:
        with get_db() as conn:
            for block in parse_zip_file(zip_path):
                added, skipped = import_commodity_block(conn, block, label)
                outcome.rows_inserted += added
                outcome.rows_skipped += skipped
            conn.commit()
    except Exception as exc:
        outcome.error = str(exc)
    return outcome
def _download_zip(url: str, dest: Path) -> bool:
    """
    Download *url* to *dest*; return True on success, False on failure.

    The original passed stream=True but then read ``r.content``, which
    buffers the whole archive in memory and never closes the response
    (with stream=True the pooled connection is not released until the
    response is consumed or closed). Now the response is used as a
    context manager and the body is streamed to disk in chunks.
    """
    try:
        with requests.get(url, headers=HEADERS, timeout=120, stream=True) as r:
            r.raise_for_status()
            with dest.open('wb') as fh:
                for chunk in r.iter_content(chunk_size=1 << 16):
                    fh.write(chunk)
        return True
    except requests.RequestException:
        return False
    except OSError:
        # Local write failures (full disk, permissions) also mean "no file".
        return False
def _log_start(conn: sqlite3.Connection, source: str, source_type: str) -> None:
    """Mark *source* as 'running' in import_log, replacing any prior entry."""
    sql = """
        INSERT OR REPLACE INTO import_log (source, source_type, status, started_at)
        VALUES (?, ?, 'running', datetime('now'))
        """
    conn.execute(sql, (source, source_type))
    conn.commit()
def _log_done(conn: sqlite3.Connection, source: str,
              inserted: int, skipped: int, error: Optional[str] = None) -> None:
    """Finalize the import_log entry for *source* with counts and status."""
    # A truthy error message marks the run as failed; otherwise it is done.
    final_status = 'done' if not error else 'error'
    sql = """
        UPDATE import_log
        SET status = ?, rows_inserted = ?, rows_skipped = ?,
            completed_at = datetime('now'), error_message = ?
        WHERE source = ?
        """
    conn.execute(sql, (final_status, inserted, skipped, error, source))
    conn.commit()
def _already_imported(conn: sqlite3.Connection, source: str) -> bool:
    """Return True when import_log records a completed ('done') import of *source*."""
    found = conn.execute(
        "SELECT status FROM import_log WHERE source = ?", (source,)
    ).fetchone()
    if found is None:
        return False
    return found[0] == 'done'
def run_historical_import(start_year: int = 1995, end_year: int = 2026,
                          verbose: bool = True) -> None:
    """
    Download and import the full historical archive.

    One combined 1995-2016 ZIP plus one per-year ZIP from 2017 onward.
    Uses import_log to skip already-completed sources, so re-runs are cheap.
    """
    # Build the download worklist: (url, label, source_type) triples.
    archives = [(
        f"{HISTORICAL_BASE}/deahistfo_1995_2016.zip",
        "deahistfo_1995_2016.zip",
        "historical_zip",
    )]
    archives += [
        (f"{HISTORICAL_BASE}/deahistfo{yr}.zip", f"deahistfo{yr}.zip", "annual_zip")
        for yr in range(max(start_year, 2017), end_year + 1)
    ]
    # Probe the database once up front so an inaccessible DB fails early.
    with get_db():
        pass
    for url, label, source_type in archives:
        with get_db() as conn:
            if _already_imported(conn, label):
                if verbose:
                    print(f" [skip] {label} (already imported)")
                continue
        if verbose:
            print(f" [download] {label} ...")
        # delete=False: the handle closes here, but the path must outlive
        # the with-block for the download and import steps below.
        with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
            tmp_path = Path(tmp.name)
        if not _download_zip(url, tmp_path):
            if verbose:
                print(f" [error] Failed to download {url}")
            with get_db() as conn:
                _log_start(conn, label, source_type)
                _log_done(conn, label, 0, 0, f"Download failed: {url}")
            tmp_path.unlink(missing_ok=True)
            continue
        if verbose:
            print(f" [import] {label} ...")
        with get_db() as conn:
            _log_start(conn, label, source_type)
        result = import_zip_file(str(tmp_path), source_label=label)
        tmp_path.unlink(missing_ok=True)
        with get_db() as conn:
            _log_done(conn, label, result.rows_inserted, result.rows_skipped, result.error)
        if verbose:
            status = "ERROR: " + result.error if result.error else "OK"
            print(f" [done] {label}: {result.rows_inserted} inserted, "
                  f"{result.rows_skipped} skipped — {status}")
def import_disagg_block(conn: sqlite3.Connection, block, source: str) -> tuple[int, int]:
    """
    Insert one DisaggBlock into the disaggregated tables.

    Writes the commodity (if new), one disagg_reports row, then one
    disagg_positions row per parsed position row plus a matching
    disagg_concentration row where available.
    Returns (rows_inserted, rows_skipped); rows_inserted counts
    disagg_positions rows only.
    """
    commodity_id = _upsert_commodity(conn, block)
    cur = conn.execute(
        """
        INSERT OR IGNORE INTO disagg_reports
        (commodity_id, report_date, source_file)
        VALUES (?, ?, ?)
        """,
        (commodity_id, block.report_date, source),
    )
    # rowcount == 0: this (commodity, report_date) was already imported.
    if cur.rowcount == 0:
        return 0, 1
    report_id = cur.lastrowid
    inserted = 0
    for pos in block.rows:
        sfx = pos.row_type
        # Week-over-week changes are only meaningful for the 'All' row;
        # other row types get 16 NULLs (matching the 16 chg_* columns).
        chg = (
            pos.chg_open_interest,
            pos.chg_prod_merc_long, pos.chg_prod_merc_short,
            pos.chg_swap_long, pos.chg_swap_short, pos.chg_swap_spread,
            pos.chg_m_money_long, pos.chg_m_money_short, pos.chg_m_money_spread,
            pos.chg_other_rept_long, pos.chg_other_rept_short, pos.chg_other_rept_spread,
            pos.chg_tot_rept_long, pos.chg_tot_rept_short,
            pos.chg_nonrept_long, pos.chg_nonrept_short,
        ) if sfx == 'All' else (None,) * 16
        # Column groups, in order: raw positions, changes (spliced in via
        # *chg), percentages, trader counts.
        conn.execute(
            """
            INSERT OR IGNORE INTO disagg_positions (
                report_id, row_type,
                open_interest,
                prod_merc_long, prod_merc_short,
                swap_long, swap_short, swap_spread,
                m_money_long, m_money_short, m_money_spread,
                other_rept_long, other_rept_short, other_rept_spread,
                tot_rept_long, tot_rept_short,
                nonrept_long, nonrept_short,
                chg_open_interest,
                chg_prod_merc_long, chg_prod_merc_short,
                chg_swap_long, chg_swap_short, chg_swap_spread,
                chg_m_money_long, chg_m_money_short, chg_m_money_spread,
                chg_other_rept_long, chg_other_rept_short, chg_other_rept_spread,
                chg_tot_rept_long, chg_tot_rept_short,
                chg_nonrept_long, chg_nonrept_short,
                pct_open_interest,
                pct_prod_merc_long, pct_prod_merc_short,
                pct_swap_long, pct_swap_short, pct_swap_spread,
                pct_m_money_long, pct_m_money_short, pct_m_money_spread,
                pct_other_rept_long, pct_other_rept_short, pct_other_rept_spread,
                pct_tot_rept_long, pct_tot_rept_short,
                pct_nonrept_long, pct_nonrept_short,
                traders_total,
                traders_prod_merc_long, traders_prod_merc_short,
                traders_swap_long, traders_swap_short, traders_swap_spread,
                traders_m_money_long, traders_m_money_short, traders_m_money_spread,
                traders_other_rept_long, traders_other_rept_short, traders_other_rept_spread,
                traders_tot_rept_long, traders_tot_rept_short
            ) VALUES (
                ?, ?,
                ?,
                ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?,
                ?, ?,
                ?,
                ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?, ?,
                ?, ?
            )
            """,
            (
                report_id, sfx,
                pos.open_interest,
                pos.prod_merc_long, pos.prod_merc_short,
                pos.swap_long, pos.swap_short, pos.swap_spread,
                pos.m_money_long, pos.m_money_short, pos.m_money_spread,
                pos.other_rept_long, pos.other_rept_short, pos.other_rept_spread,
                pos.tot_rept_long, pos.tot_rept_short,
                pos.nonrept_long, pos.nonrept_short,
                *chg,
                pos.pct_open_interest,
                pos.pct_prod_merc_long, pos.pct_prod_merc_short,
                pos.pct_swap_long, pos.pct_swap_short, pos.pct_swap_spread,
                pos.pct_m_money_long, pos.pct_m_money_short, pos.pct_m_money_spread,
                pos.pct_other_rept_long, pos.pct_other_rept_short, pos.pct_other_rept_spread,
                pos.pct_tot_rept_long, pos.pct_tot_rept_short,
                pos.pct_nonrept_long, pos.pct_nonrept_short,
                pos.traders_total,
                pos.traders_prod_merc_long, pos.traders_prod_merc_short,
                pos.traders_swap_long, pos.traders_swap_short, pos.traders_swap_spread,
                pos.traders_m_money_long, pos.traders_m_money_short, pos.traders_m_money_spread,
                pos.traders_other_rept_long, pos.traders_other_rept_short, pos.traders_other_rept_spread,
                pos.traders_tot_rept_long, pos.traders_tot_rept_short,
            ),
        )
        inserted += 1
        # Concentration (largest-4 / largest-8 trader shares) for this row_type.
        conc = block.concentration.get(sfx)
        if conc:
            conn.execute(
                """
                INSERT OR IGNORE INTO disagg_concentration (
                    report_id, row_type,
                    conc_gross_long_4, conc_gross_short_4,
                    conc_gross_long_8, conc_gross_short_8,
                    conc_net_long_4, conc_net_short_4,
                    conc_net_long_8, conc_net_short_8
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    report_id, sfx,
                    conc.conc_gross_long_4, conc.conc_gross_short_4,
                    conc.conc_gross_long_8, conc.conc_gross_short_8,
                    conc.conc_net_long_4, conc.conc_net_short_4,
                    conc.conc_net_long_8, conc.conc_net_short_8,
                ),
            )
    return inserted, 0
def import_disagg_zip_file(zip_path: str, source_label: Optional[str] = None) -> ImportResult:
    """Parse a Disaggregated COT ZIP archive and import every block in it."""
    label = source_label if source_label else Path(zip_path).name
    outcome = ImportResult(source=label)
    try:
        with get_db() as conn:
            for block in parse_disagg_zip_file(zip_path):
                added, skipped = import_disagg_block(conn, block, label)
                outcome.rows_inserted += added
                outcome.rows_skipped += skipped
            conn.commit()
    except Exception as exc:
        outcome.error = str(exc)
    return outcome
def run_disagg_historical_import(start_year: int = 2019, end_year: int = 2026,
                                 verbose: bool = True) -> None:
    """
    Download and import the Disaggregated COT yearly ZIP files.

    One com_disagg_txt_<year>.zip per year in [start_year, end_year].
    Uses import_log to skip already-completed sources, so re-runs are cheap.
    """
    for yr in range(start_year, end_year + 1):
        name = f"com_disagg_txt_{yr}.zip"
        url = f"{HISTORICAL_BASE}/{name}"
        with get_db() as conn:
            if _already_imported(conn, name):
                if verbose:
                    print(f" [skip] {name} (already imported)")
                continue
        if verbose:
            print(f" [download] {name} ...")
        # delete=False: the handle closes here, but the path must outlive
        # the with-block for the download and import steps below.
        with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
            tmp_path = Path(tmp.name)
        if not _download_zip(url, tmp_path):
            if verbose:
                print(f" [error] Failed to download {url}")
            with get_db() as conn:
                _log_start(conn, name, 'disagg_annual_zip')
                _log_done(conn, name, 0, 0, f"Download failed: {url}")
            tmp_path.unlink(missing_ok=True)
            continue
        if verbose:
            print(f" [import] {name} ...")
        with get_db() as conn:
            _log_start(conn, name, 'disagg_annual_zip')
        result = import_disagg_zip_file(str(tmp_path), source_label=name)
        tmp_path.unlink(missing_ok=True)
        with get_db() as conn:
            _log_done(conn, name, result.rows_inserted, result.rows_skipped, result.error)
        if verbose:
            status = "ERROR: " + result.error if result.error else "OK"
            print(f" [done] {name}: {result.rows_inserted} inserted, "
                  f"{result.rows_skipped} skipped — {status}")
def download_and_import() -> ImportResult:
    """
    Download the current weekly report, cache it locally, and import it.
    Used by the cron job.

    Returns an ImportResult whose source is the dated local filename
    once the report date is known (or "weekly" if download/date
    extraction fails first).

    Cleanup vs. original: the unused ``from datetime import datetime``
    was removed and the two scattered local datetime imports merged into
    one ``from datetime import date`` at the top of the function.
    """
    import re
    from datetime import date

    result = ImportResult(source="weekly")
    try:
        r = requests.get(WEEKLY_URL, headers=HEADERS, timeout=30)
        r.raise_for_status()
        # Decoded as latin-1 to match how the file is re-saved below.
        html = r.content.decode('latin-1')
    except requests.RequestException as e:
        result.error = str(e)
        return result
    # Extract the report date (e.g. "March 22, 2026") for the local filename.
    m = re.search(
        r'(January|February|March|April|May|June|July|August|September|'
        r'October|November|December)\s+(\d{1,2}),\s+(\d{4})',
        html,
    )
    if not m:
        result.error = "Could not extract report date"
        return result
    month, day, year = m.groups()
    # Explicit month list keeps the name->number mapping locale-independent.
    months = ['January', 'February', 'March', 'April', 'May', 'June',
              'July', 'August', 'September', 'October', 'November', 'December']
    report_date = date(int(year), months.index(month) + 1, int(day)).isoformat()
    from app.db import DB_PATH
    data_dir = DB_PATH.parent
    data_dir.mkdir(exist_ok=True)
    filename = f"{report_date}_deacbtlof.htm"
    filepath = data_dir / filename
    # Keep a local copy of the raw HTML; never overwrite an existing one.
    if not filepath.exists():
        filepath.write_text(html, encoding='latin-1')
    result.source = filename
    with get_db() as conn:
        if _already_imported(conn, filename):
            result.rows_skipped = 1
            return result
    with get_db() as conn:
        _log_start(conn, filename, 'weekly_html')
    r2 = import_html_file(str(filepath))
    result.rows_inserted = r2.rows_inserted
    result.rows_skipped = r2.rows_skipped
    result.error = r2.error
    with get_db() as conn:
        _log_done(conn, filename, r2.rows_inserted, r2.rows_skipped, r2.error)
    return result