"""
CFTC COT Data Importer

Inserts parsed CommodityBlock objects into the SQLite database.
All inserts use INSERT OR IGNORE for idempotency — safe to re-run.
"""

import re
import sqlite3
import tempfile
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Optional

import requests

from app.db import get_db
from app.ingestion.parser import (
    CommodityBlock,
    parse_html_file,
    parse_zip_file,
)

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    )
}

HISTORICAL_BASE = "https://www.cftc.gov/files/dea/history"
WEEKLY_URL = "https://www.cftc.gov/dea/options/deacbtlof.htm"

# The single combined archive covers these years; per-year archives start
# the year after.
_COMBINED_FIRST_YEAR = 1995
_COMBINED_LAST_YEAR = 2016

# Row types looked up in each block's per-row-type mappings, in insert order.
_ROW_TYPES = ("All", "Old", "Other")

# Column names double as attribute names on the parsed objects, so a single
# tuple drives both the SQL column list and the value extraction.
_POSITION_COLUMNS = (
    "open_interest",
    "noncomm_long",
    "noncomm_short",
    "noncomm_spreading",
    "comm_long",
    "comm_short",
    "total_long",
    "total_short",
    "nonrept_long",
    "nonrept_short",
)
# Change and percentage columns mirror the position columns one-for-one.
_CHANGE_COLUMNS = tuple(f"chg_{name}" for name in _POSITION_COLUMNS)
_PERCENT_COLUMNS = tuple(f"pct_{name}" for name in _POSITION_COLUMNS)
_TRADER_COLUMNS = (
    "traders_total",
    "traders_noncomm_long",
    "traders_noncomm_short",
    "traders_noncomm_spread",
    "traders_comm_long",
    "traders_comm_short",
    "traders_total_long",
    "traders_total_short",
)
_CONCENTRATION_COLUMNS = (
    "conc_gross_long_4",
    "conc_gross_short_4",
    "conc_gross_long_8",
    "conc_gross_short_8",
    "conc_net_long_4",
    "conc_net_short_4",
    "conc_net_long_8",
    "conc_net_short_8",
)

_KEY_COLUMNS = ("report_id", "row_type")
_POSITIONS_TABLE_COLUMNS = (
    _KEY_COLUMNS
    + _POSITION_COLUMNS
    + _CHANGE_COLUMNS
    + _PERCENT_COLUMNS
    + _TRADER_COLUMNS
)
_CONCENTRATION_TABLE_COLUMNS = _KEY_COLUMNS + _CONCENTRATION_COLUMNS

_MONTHS = (
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)
# Matches e.g. "January 7, 2025"; groups are (month name, day, year).
_REPORT_DATE_RE = re.compile(
    r"(" + "|".join(_MONTHS) + r")\s+(\d{1,2}),\s+(\d{4})"
)

# Chunk size used when streaming archive downloads to disk.
_DOWNLOAD_CHUNK_BYTES = 64 * 1024


@dataclass
class ImportResult:
    """Outcome of importing one source (file, archive, or weekly download)."""

    # Label of the imported source (file name or caller-supplied label).
    source: str
    # Number of position rows written.
    rows_inserted: int = 0
    # Number of blocks/sources skipped because they were already imported.
    rows_skipped: int = 0
    # Stringified error, or None when the import finished without raising.
    error: Optional[str] = None


def _upsert_commodity(conn: sqlite3.Connection, block: CommodityBlock) -> int:
    """Insert commodity if not exists; return its id."""
    conn.execute(
        """
        INSERT OR IGNORE INTO commodities
            (cftc_code, name, exchange, exchange_abbr, contract_unit)
        VALUES (?, ?, ?, ?, ?)
        """,
        (
            block.cftc_code,
            block.name,
            block.exchange,
            block.exchange_abbr,
            block.contract_unit,
        ),
    )
    # Look the id up explicitly: when the insert was ignored there is no
    # fresh row id to read from the cursor.
    row = conn.execute(
        "SELECT id FROM commodities WHERE cftc_code = ?",
        (block.cftc_code,),
    ).fetchone()
    return row[0]


def _upsert_report(
    conn: sqlite3.Connection,
    commodity_id: int,
    block: CommodityBlock,
    source_file: str,
) -> Optional[int]:
    """
    Insert report row.

    Returns report id, or None if already exists (i.e. this report date
    was already imported for this commodity).
    """
    cur = conn.execute(
        """
        INSERT OR IGNORE INTO reports
            (commodity_id, report_date, prev_report_date, source_file)
        VALUES (?, ?, ?, ?)
        """,
        (commodity_id, block.report_date, block.prev_report_date, source_file),
    )
    if cur.rowcount == 0:
        return None  # already existed
    return cur.lastrowid


def _values_or_nulls(obj, columns: tuple) -> tuple:
    """Return obj's attributes named by *columns*, or all-None if obj is falsy."""
    if not obj:
        return (None,) * len(columns)
    return tuple(getattr(obj, name) for name in columns)


def _insert_or_ignore(
    conn: sqlite3.Connection, table: str, columns: tuple, values: tuple
) -> None:
    """
    Run ``INSERT OR IGNORE INTO table (columns) VALUES (...)``.

    *table* and *columns* are interpolated into the SQL text, so they must
    be module constants, never external input. *values* are bound as
    parameters.
    """
    column_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    conn.execute(
        f"INSERT OR IGNORE INTO {table} ({column_list}) VALUES ({placeholders})",
        values,
    )


def import_commodity_block(
    conn: sqlite3.Connection, block: CommodityBlock, source: str
) -> tuple[int, int]:
    """
    Insert one CommodityBlock into the DB.

    Writes the commodity and report rows, then one ``positions`` row (and,
    when available, one ``concentration`` row) per row type present in the
    block. Does not commit; the caller owns the transaction.

    Returns (rows_inserted, rows_skipped).
    """
    commodity_id = _upsert_commodity(conn, block)
    report_id = _upsert_report(conn, commodity_id, block, source)
    if report_id is None:
        return 0, 1  # already imported

    inserted = 0
    for row_type in _ROW_TYPES:
        pos = block.positions.get(row_type)
        if pos is None:
            continue

        # Changes are only attached to the 'All' row; other rows store NULLs.
        chg = block.changes if row_type == "All" else None
        pct = block.percentages.get(row_type)
        trd = block.traders.get(row_type)

        position_values = (
            (report_id, row_type)
            + tuple(getattr(pos, name) for name in _POSITION_COLUMNS)
            + _values_or_nulls(chg, _CHANGE_COLUMNS)
            + _values_or_nulls(pct, _PERCENT_COLUMNS)
            + _values_or_nulls(trd, _TRADER_COLUMNS)
        )
        _insert_or_ignore(
            conn, "positions", _POSITIONS_TABLE_COLUMNS, position_values
        )
        inserted += 1

        # Concentration
        conc = block.concentration.get(row_type)
        if conc:
            concentration_values = (report_id, row_type) + tuple(
                getattr(conc, name) for name in _CONCENTRATION_COLUMNS
            )
            _insert_or_ignore(
                conn,
                "concentration",
                _CONCENTRATION_TABLE_COLUMNS,
                concentration_values,
            )

    return inserted, 0


def import_html_file(html_path: str) -> ImportResult:
    """
    Import a single weekly HTML file.

    The file's base name is the import_log source key. A source already
    logged as 'done' is skipped (rows_skipped = 1). Any exception raised
    while parsing or inserting is captured in ``result.error`` and recorded
    in import_log rather than propagated.
    """
    source = Path(html_path).name
    result = ImportResult(source=source)

    with get_db() as conn:
        if _already_imported(conn, source):
            result.rows_skipped = 1
            return result
        _log_start(conn, source, 'weekly_html')

    try:
        with get_db() as conn:
            for block in parse_html_file(html_path):
                ins, skp = import_commodity_block(conn, block, source)
                result.rows_inserted += ins
                result.rows_skipped += skp
            conn.commit()
    except Exception as e:
        # NOTE(review): counts accumulated before the failure are still
        # reported; whether those rows persist depends on get_db() — confirm.
        result.error = str(e)

    with get_db() as conn:
        _log_done(
            conn, source, result.rows_inserted, result.rows_skipped, result.error
        )

    return result


def import_zip_file(
    zip_path: str, source_label: Optional[str] = None
) -> ImportResult:
    """
    Import a historical ZIP file.

    *source_label* overrides the archive's base name as the source recorded
    on report rows. Unlike import_html_file, this function does not touch
    import_log; the caller does the logging. Exceptions are captured in
    ``result.error``.
    """
    source = source_label or Path(zip_path).name
    result = ImportResult(source=source)

    try:
        with get_db() as conn:
            for block in parse_zip_file(zip_path):
                ins, skp = import_commodity_block(conn, block, source)
                result.rows_inserted += ins
                result.rows_skipped += skp
            conn.commit()
    except Exception as e:
        result.error = str(e)

    return result


def _download_zip(url: str, dest: Path) -> bool:
    """
    Download a ZIP file, return True on success.

    The body is streamed to *dest* in chunks so the archive is never held
    in memory in full, and the response is closed on every path. Returns
    False on any requests error; *dest* may then hold a partial file, which
    the caller is expected to delete.
    """
    try:
        with requests.get(
            url, headers=HEADERS, timeout=120, stream=True
        ) as response:
            response.raise_for_status()
            with dest.open("wb") as out:
                for chunk in response.iter_content(
                    chunk_size=_DOWNLOAD_CHUNK_BYTES
                ):
                    out.write(chunk)
        return True
    except requests.RequestException:
        return False


def _log_start(conn: sqlite3.Connection, source: str, source_type: str) -> None:
    """Record (or reset) an import_log row for *source* as 'running' and commit."""
    conn.execute(
        """
        INSERT OR REPLACE INTO import_log
            (source, source_type, status, started_at)
        VALUES (?, ?, 'running', datetime('now'))
        """,
        (source, source_type),
    )
    conn.commit()


def _log_done(
    conn: sqlite3.Connection,
    source: str,
    inserted: int,
    skipped: int,
    error: Optional[str] = None,
) -> None:
    """Mark the import_log row for *source* 'done' or 'error' and commit."""
    status = 'error' if error else 'done'
    conn.execute(
        """
        UPDATE import_log
        SET status = ?,
            rows_inserted = ?,
            rows_skipped = ?,
            completed_at = datetime('now'),
            error_message = ?
        WHERE source = ?
        """,
        (status, inserted, skipped, error, source),
    )
    conn.commit()


def _already_imported(conn: sqlite3.Connection, source: str) -> bool:
    """Return True only if import_log has *source* with status 'done'."""
    row = conn.execute(
        "SELECT status FROM import_log WHERE source = ?",
        (source,),
    ).fetchone()
    return row is not None and row[0] == 'done'


def run_historical_import(
    start_year: int = 1995, end_year: int = 2026, verbose: bool = True
) -> None:
    """
    Download and import the full historical archive.

    Uses import_log to skip already-completed sources. The combined
    1995-2016 archive is queued only when the requested year range overlaps
    it; per-year archives are queued for each requested year from 2017 on.
    Progress is printed when *verbose* is true.
    """
    sources = []

    # Combined 1995-2016 archive
    if start_year <= _COMBINED_LAST_YEAR and end_year >= _COMBINED_FIRST_YEAR:
        sources.append((
            f"{HISTORICAL_BASE}/deahistfo_1995_2016.zip",
            "deahistfo_1995_2016.zip",
            "historical_zip",
        ))

    # Per-year archives from 2017 onwards
    first_annual_year = max(start_year, _COMBINED_LAST_YEAR + 1)
    for year in range(first_annual_year, end_year + 1):
        sources.append((
            f"{HISTORICAL_BASE}/deahistfo{year}.zip",
            f"deahistfo{year}.zip",
            "annual_zip",
        ))

    with get_db() as conn:
        pass  # just to verify DB is accessible

    for url, label, source_type in sources:
        with get_db() as conn:
            if _already_imported(conn, label):
                if verbose:
                    print(f" [skip] {label} (already imported)")
                continue

        if verbose:
            print(f" [download] {label} ...")

        # Reserve a path only; the handle is closed before the download
        # reopens the file for writing.
        with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
            tmp_path = Path(tmp.name)

        try:
            if not _download_zip(url, tmp_path):
                if verbose:
                    print(f" [error] Failed to download {url}")
                with get_db() as conn:
                    _log_start(conn, label, source_type)
                    _log_done(conn, label, 0, 0, f"Download failed: {url}")
                continue

            if verbose:
                print(f" [import] {label} ...")

            with get_db() as conn:
                _log_start(conn, label, source_type)

            result = import_zip_file(str(tmp_path), source_label=label)
        finally:
            # Runs on success, on the download-failure `continue`, and when
            # logging/import raises, so the temp archive never leaks.
            tmp_path.unlink(missing_ok=True)

        with get_db() as conn:
            _log_done(
                conn, label, result.rows_inserted, result.rows_skipped, result.error
            )

        if verbose:
            status = f"ERROR: {result.error}" if result.error else "OK"
            print(f" [done] {label}: {result.rows_inserted} inserted, "
                  f"{result.rows_skipped} skipped — {status}")


def download_and_import() -> ImportResult:
    """
    Download the current weekly report and import it.
    Used by the cron job.

    The page is saved next to the database as ``<ISO date>_deacbtlof.htm``
    (an existing file of that name is left untouched) and then handed to
    import_html_file, which does the already-imported check and the
    import_log bookkeeping under that file name. Download failures and an
    unparseable report date are returned via ``ImportResult.error``.
    """
    result = ImportResult(source="weekly")

    try:
        r = requests.get(WEEKLY_URL, headers=HEADERS, timeout=30)
        r.raise_for_status()
        html = r.content.decode('latin-1')
    except requests.RequestException as e:
        result.error = str(e)
        return result

    # Extract report date for filename
    m = _REPORT_DATE_RE.search(html)
    if not m:
        result.error = "Could not extract report date"
        return result

    month, day, year = m.groups()
    try:
        report_date = date(
            int(year), _MONTHS.index(month) + 1, int(day)
        ).isoformat()
    except ValueError:
        # The regex accepts impossible dates such as "February 30, 2024".
        result.error = f"Could not extract report date (invalid date: {m.group(0)})"
        return result

    # Imported at call time, as in the original, so the current DB_PATH is used.
    from app.db import DB_PATH

    data_dir = DB_PATH.parent
    data_dir.mkdir(parents=True, exist_ok=True)
    filename = f"{report_date}_deacbtlof.htm"
    filepath = data_dir / filename

    if not filepath.exists():
        filepath.write_text(html, encoding='latin-1')

    # import_html_file keys on the file's base name (== filename), skips it
    # when already logged as 'done', and writes the import_log entries.
    return import_html_file(str(filepath))