Greg 2c28ac3b0a Add Disaggregated COT data support (2019–2026)
Integrates the CFTC Disaggregated Commitments of Traders reports
(com_disagg_txt_YYYY.zip) which break positions down by Producer/Merchant,
Swap Dealers, Managed Money, and Other Reportables — a different report
type from the existing legacy COT data.

- schema.sql: add disagg_reports, disagg_positions, disagg_concentration tables
- parser.py: add DisaggPositionRow/DisaggBlock dataclasses and
  parse_disagg_csv_text()/parse_disagg_zip_file() for c_year.txt format
- importer.py: add import_disagg_block(), import_disagg_zip_file(),
  run_disagg_historical_import() for 2019–2026 yearly ZIPs
- cli.py: add import-disagg-history subcommand
- docker-compose.yaml: run import-disagg-history on startup (idempotent
  via import_log, so re-deploys skip already-imported years)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 18:11:59 +01:00

157 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CFTC COT Ingestion CLI

Usage:
    python -m app.ingestion.cli init-db
    python -m app.ingestion.cli import-local-html [--data-dir ./data]
    python -m app.ingestion.cli import-history [--start-year 1995] [--end-year 2026]
    python -m app.ingestion.cli import-html <file.htm>
    python -m app.ingestion.cli import-zip <file.zip>
    python -m app.ingestion.cli import-disagg-history [--start-year 2019] [--end-year 2026]
    python -m app.ingestion.cli download-and-import
    python -m app.ingestion.cli status
"""
import argparse
import sys
from pathlib import Path
def cmd_init_db(args):
    """Handle ``init-db`` by calling ``app.db.init_db()``.

    ``args`` is the parsed argparse namespace; nothing is read from it here.
    """
    # Imported inside the function so the database module is only loaded
    # when this command actually runs.
    from app.db import init_db as initialize_schema

    initialize_schema()
def cmd_import_local_html(args):
    """Handle ``import-local-html``: import each matching HTML file in a directory.

    Files named ``*_deacbtlof.htm`` directly inside ``args.data_dir`` are
    passed to ``import_html_file`` in sorted order. One line is printed per
    file (inserted/skipped counts and OK or the error), then the totals.
    If no file matches, a notice is printed and nothing is imported.
    """
    from app.ingestion.importer import import_html_file

    source_dir = Path(args.data_dir)
    html_paths = sorted(source_dir.glob("*_deacbtlof.htm"))
    if not html_paths:
        print(f"No HTML files found in {source_dir}")
        return

    inserted_sum = 0
    skipped_sum = 0
    for html_path in html_paths:
        outcome = import_html_file(str(html_path))
        if outcome.error:
            status = f"ERROR: {outcome.error}"
        else:
            status = "OK"
        print(f" {html_path.name}: {outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped — {status}")
        # Counts are accumulated even for files that reported an error.
        inserted_sum += outcome.rows_inserted
        skipped_sum += outcome.rows_skipped

    print(f"\nTotal: {inserted_sum} inserted, {skipped_sum} skipped")
def cmd_import_history(args):
    """Handle ``import-history``: run the historical import for a year range.

    Reads ``args.start_year`` and ``args.end_year`` and forwards them to
    ``run_historical_import`` with ``verbose=True``. Prints a progress line
    before the import and ``Done.`` after it returns.
    """
    from app.ingestion.importer import run_historical_import

    # The years need an explicit separator; formatted back to back they run
    # together in the output (e.g. "19952026").
    print(f"Importing historical data {args.start_year}–{args.end_year}...")
    run_historical_import(start_year=args.start_year, end_year=args.end_year, verbose=True)
    print("Done.")
def cmd_import_html(args):
    """Handle ``import-html``: import the single HTML file at ``args.file``.

    On success prints the inserted/skipped row counts. If the import result
    carries an error, prints it to stderr and exits with status 1.
    """
    from app.ingestion.importer import import_html_file

    outcome = import_html_file(args.file)
    if not outcome.error:
        print(f"{outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped")
        return

    # Failure path: report on stderr and signal failure through the exit code.
    print(f"Error: {outcome.error}", file=sys.stderr)
    sys.exit(1)
def cmd_import_zip(args):
    """Handle ``import-zip``: import the single ZIP file at ``args.file``.

    On success prints the inserted/skipped row counts. If the import result
    carries an error, prints it to stderr and exits with status 1.
    """
    from app.ingestion.importer import import_zip_file

    outcome = import_zip_file(args.file)
    if not outcome.error:
        print(f"{outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped")
        return

    # Failure path: report on stderr and signal failure through the exit code.
    print(f"Error: {outcome.error}", file=sys.stderr)
    sys.exit(1)
def cmd_import_disagg_history(args):
    """Handle ``import-disagg-history``: import Disaggregated COT data by year.

    Reads ``args.start_year`` and ``args.end_year`` and forwards them to
    ``run_disagg_historical_import`` with ``verbose=True``. Prints a progress
    line before the import and ``Done.`` after it returns.
    """
    from app.ingestion.importer import run_disagg_historical_import

    # The years need an explicit separator; formatted back to back they run
    # together in the output (e.g. "20192026").
    print(f"Importing Disaggregated COT data {args.start_year}–{args.end_year}...")
    run_disagg_historical_import(start_year=args.start_year, end_year=args.end_year, verbose=True)
    print("Done.")
def cmd_download_and_import(args):
    """Handle ``download-and-import`` by calling ``download_and_import()``.

    On success prints the result's ``source`` followed by the inserted/skipped
    row counts. If the result carries an error, prints it to stderr and exits
    with status 1.
    """
    from app.ingestion.importer import download_and_import

    outcome = download_and_import()
    if not outcome.error:
        print(f"Source: {outcome.source}")
        print(f"{outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped")
        return

    # Failure path: report on stderr and signal failure through the exit code.
    print(f"Error: {outcome.error}", file=sys.stderr)
    sys.exit(1)
def cmd_status(args):
    """Handle ``status``: print summary statistics read from the database.

    Prints, in order: the number of commodities, the number of distinct
    report dates, the number of position rows, the min/max report date,
    the market count per exchange (largest first), and the ten most recent
    ``import_log`` entries.

    ``args`` is the parsed argparse namespace; nothing is read from it here.
    """
    from app.db import get_db

    with get_db() as conn:
        # Summary counts
        row = conn.execute("SELECT COUNT(*) FROM commodities").fetchone()
        print(f"Commodities: {row[0]}")
        row = conn.execute("SELECT COUNT(DISTINCT report_date) FROM reports").fetchone()
        print(f"Report dates: {row[0]}")
        row = conn.execute("SELECT COUNT(*) FROM positions").fetchone()
        print(f"Position rows: {row[0]}")
        row = conn.execute("SELECT MIN(report_date), MAX(report_date) FROM reports").fetchone()
        print(f"Date range: {row[0]} to {row[1]}")

        # Exchanges
        print("\nBy exchange:")
        for r in conn.execute(
            "SELECT exchange_abbr, COUNT(*) FROM commodities GROUP BY exchange_abbr ORDER BY COUNT(*) DESC"
        ):
            print(f" {r[0]}: {r[1]} markets")

        # Import log
        # NOTE(review): rows are read by column name here, so the connection
        # presumably yields mapping-style rows (e.g. sqlite3.Row) — confirm
        # against app.db.get_db.
        print("\nImport log (last 10):")
        for r in conn.execute(
            "SELECT source, status, rows_inserted, rows_skipped, completed_at "
            "FROM import_log ORDER BY id DESC LIMIT 10"
        ):
            # The status and the row counts need a separator; otherwise they
            # are printed fused together (e.g. "ok123 inserted").
            print(
                f" {r['source']}: {r['status']} — "
                f"{r['rows_inserted']} inserted, {r['rows_skipped']} skipped "
                f"({r['completed_at']})"
            )
def _build_parser():
    """Build the argument parser with one subparser per CLI command."""
    parser = argparse.ArgumentParser(description="CFTC COT data ingestion CLI")
    sub = parser.add_subparsers(dest="command", required=True)

    sub.add_parser("init-db", help="Initialize the database schema")

    p = sub.add_parser("import-local-html", help="Import all local HTML files")
    p.add_argument("--data-dir", default="data", help="Directory with HTML files")

    p = sub.add_parser("import-history", help="Download and import full historical archive")
    p.add_argument("--start-year", type=int, default=1995)
    p.add_argument("--end-year", type=int, default=2026)

    p = sub.add_parser("import-html", help="Import a single HTML file")
    p.add_argument("file")

    p = sub.add_parser("import-zip", help="Import a single ZIP file")
    p.add_argument("file")

    # The year range in the help text needs a dash; without it the two years
    # read as a single number ("20192026").
    p = sub.add_parser(
        "import-disagg-history",
        help="Download and import Disaggregated COT yearly ZIPs (2019–2026)",
    )
    p.add_argument("--start-year", type=int, default=2019)
    p.add_argument("--end-year", type=int, default=2026)

    sub.add_parser("download-and-import", help="Download latest weekly report and import it")
    sub.add_parser("status", help="Show database statistics")
    return parser


def main(argv=None):
    """Parse command-line arguments and dispatch to the matching ``cmd_*`` handler.

    Args:
        argv: Optional list of argument strings (without the program name).
            When ``None`` (the default), argparse reads ``sys.argv[1:]``, which
            is what the original zero-argument ``main()`` did.

    Raises:
        SystemExit: Raised by argparse for ``--help`` or invalid arguments,
            and by handlers that call ``sys.exit`` on failure.
    """
    args = _build_parser().parse_args(argv)

    # Subcommand name -> handler. Every name registered in _build_parser
    # must have an entry here.
    commands = {
        "init-db": cmd_init_db,
        "import-local-html": cmd_import_local_html,
        "import-history": cmd_import_history,
        "import-html": cmd_import_html,
        "import-zip": cmd_import_zip,
        "import-disagg-history": cmd_import_disagg_history,
        "download-and-import": cmd_download_and_import,
        "status": cmd_status,
    }
    commands[args.command](args)


if __name__ == "__main__":
    main()