Integrates the CFTC Disaggregated Commitments of Traders reports (com_disagg_txt_YYYY.zip), which break positions down by Producer/Merchant, Swap Dealers, Managed Money, and Other Reportables — a different report type from the existing legacy COT data.

- schema.sql: add disagg_reports, disagg_positions, disagg_concentration tables
- parser.py: add DisaggPositionRow/DisaggBlock dataclasses and parse_disagg_csv_text()/parse_disagg_zip_file() for c_year.txt format
- importer.py: add import_disagg_block(), import_disagg_zip_file(), run_disagg_historical_import() for 2019–2026 yearly ZIPs
- cli.py: add import-disagg-history subcommand
- docker-compose.yaml: run import-disagg-history on startup (idempotent via import_log, so re-deploys skip already-imported years)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
157 lines
5.6 KiB
Python
157 lines
5.6 KiB
Python
"""
CFTC COT Ingestion CLI

Usage:
    python -m app.ingestion.cli init-db
    python -m app.ingestion.cli import-local-html [--data-dir ./data]
    python -m app.ingestion.cli import-history [--start-year 1995] [--end-year 2026]
    python -m app.ingestion.cli import-html <file.htm>
    python -m app.ingestion.cli import-zip <file.zip>
    python -m app.ingestion.cli import-disagg-history [--start-year 2019] [--end-year 2026]
    python -m app.ingestion.cli download-and-import
    python -m app.ingestion.cli status
"""
|
||
|
||
import argparse
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
|
||
def cmd_init_db(args):
    """Handle the ``init-db`` subcommand by calling ``app.db.init_db``.

    Args:
        args: Parsed CLI namespace. Accepted so every handler shares the
            same call signature; nothing is read from it here.
    """
    # Imported inside the handler, so ``app.db`` is only loaded when this
    # subcommand actually runs.
    from app.db import init_db

    init_db()
|
||
|
||
|
||
def cmd_import_local_html(args):
    """Handle ``import-local-html``: import every matching HTML file in a directory.

    Files named ``*_deacbtlof.htm`` directly inside ``args.data_dir`` are
    imported in sorted order. One summary line is printed per file, followed
    by a grand total. A per-file error is reported on that file's line but
    neither stops the loop nor changes the exit status.

    Args:
        args: Parsed CLI namespace; ``args.data_dir`` is the directory to scan.
    """
    from app.ingestion.importer import import_html_file

    directory = Path(args.data_dir)
    html_paths = sorted(directory.glob("*_deacbtlof.htm"))

    # Nothing to do: say so and leave before printing any totals.
    if not html_paths:
        print(f"No HTML files found in {directory}")
        return

    inserted_sum = 0
    skipped_sum = 0
    for path in html_paths:
        outcome = import_html_file(str(path))
        if outcome.error:
            status = f"ERROR: {outcome.error}"
        else:
            status = "OK"
        print(f" {path.name}: {outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped — {status}")
        inserted_sum += outcome.rows_inserted
        skipped_sum += outcome.rows_skipped

    print(f"\nTotal: {inserted_sum} inserted, {skipped_sum} skipped")
|
||
|
||
|
||
def cmd_import_history(args):
    """Handle ``import-history``: run the historical import for a year range.

    Prints a start banner, calls ``run_historical_import`` with
    ``verbose=True``, then prints ``Done.``.

    Args:
        args: Parsed CLI namespace providing ``start_year`` and ``end_year``.
    """
    from app.ingestion.importer import run_historical_import

    first_year = args.start_year
    last_year = args.end_year

    print(f"Importing historical data {first_year}–{last_year}...")
    run_historical_import(start_year=first_year, end_year=last_year, verbose=True)
    print("Done.")
|
||
|
||
|
||
def cmd_import_html(args):
    """Handle ``import-html``: import one HTML report file.

    On success the inserted/skipped row counts are printed to stdout. If the
    import result carries an error, it is printed to stderr and the process
    exits with status 1.

    Args:
        args: Parsed CLI namespace; ``args.file`` is the path to import.
    """
    from app.ingestion.importer import import_html_file

    outcome = import_html_file(args.file)

    # Success path first; anything falling through is a failure.
    if not outcome.error:
        print(f"{outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped")
        return

    print(f"Error: {outcome.error}", file=sys.stderr)
    sys.exit(1)
|
||
|
||
|
||
def cmd_import_zip(args):
    """Handle ``import-zip``: import one ZIP archive.

    On success the inserted/skipped row counts are printed to stdout. If the
    import result carries an error, it is printed to stderr and the process
    exits with status 1.

    Args:
        args: Parsed CLI namespace; ``args.file`` is the path to import.
    """
    from app.ingestion.importer import import_zip_file

    outcome = import_zip_file(args.file)

    # Success path first; anything falling through is a failure.
    if not outcome.error:
        print(f"{outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped")
        return

    print(f"Error: {outcome.error}", file=sys.stderr)
    sys.exit(1)
|
||
|
||
|
||
def cmd_import_disagg_history(args):
    """Handle ``import-disagg-history``: run the Disaggregated COT import.

    Prints a start banner, calls ``run_disagg_historical_import`` with
    ``verbose=True``, then prints ``Done.``.

    Args:
        args: Parsed CLI namespace providing ``start_year`` and ``end_year``.
    """
    from app.ingestion.importer import run_disagg_historical_import

    first_year = args.start_year
    last_year = args.end_year

    print(f"Importing Disaggregated COT data {first_year}–{last_year}...")
    run_disagg_historical_import(start_year=first_year, end_year=last_year, verbose=True)
    print("Done.")
|
||
|
||
|
||
def cmd_download_and_import(args):
    """Handle ``download-and-import`` by calling ``download_and_import``.

    On success the result's ``source`` and its inserted/skipped row counts
    are printed to stdout. If the result carries an error, it is printed to
    stderr and the process exits with status 1.

    Args:
        args: Parsed CLI namespace. Accepted for a uniform handler signature;
            nothing is read from it here.
    """
    from app.ingestion.importer import download_and_import

    outcome = download_and_import()

    # Success path first; anything falling through is a failure.
    if not outcome.error:
        print(f"Source: {outcome.source}")
        print(f"{outcome.rows_inserted} inserted, {outcome.rows_skipped} skipped")
        return

    print(f"Error: {outcome.error}", file=sys.stderr)
    sys.exit(1)
|
||
|
||
|
||
def cmd_status(args):
    """Handle ``status``: print a summary of what the database contains.

    Output, in order: the commodity count, the number of distinct report
    dates, the position row count, the min/max report date, a per-exchange
    market count (largest first), and the ten most recent ``import_log``
    entries.

    Args:
        args: Parsed CLI namespace. Accepted for a uniform handler signature;
            nothing is read from it here.
    """
    from app.db import get_db

    with get_db() as conn:
        # --- Headline counts -------------------------------------------
        commodity_row = conn.execute("SELECT COUNT(*) FROM commodities").fetchone()
        print(f"Commodities: {commodity_row[0]}")

        date_count_row = conn.execute(
            "SELECT COUNT(DISTINCT report_date) FROM reports"
        ).fetchone()
        print(f"Report dates: {date_count_row[0]}")

        position_row = conn.execute("SELECT COUNT(*) FROM positions").fetchone()
        print(f"Position rows: {position_row[0]}")

        span_row = conn.execute(
            "SELECT MIN(report_date), MAX(report_date) FROM reports"
        ).fetchone()
        print(f"Date range: {span_row[0]} to {span_row[1]}")

        # --- Markets per exchange --------------------------------------
        print("\nBy exchange:")
        exchange_sql = (
            "SELECT exchange_abbr, COUNT(*) FROM commodities GROUP BY exchange_abbr ORDER BY COUNT(*) DESC"
        )
        for exchange_row in conn.execute(exchange_sql):
            print(f" {exchange_row[0]}: {exchange_row[1]} markets")

        # --- Most recent imports ---------------------------------------
        print("\nImport log (last 10):")
        log_sql = (
            "SELECT source, status, rows_inserted, rows_skipped, completed_at "
            "FROM import_log ORDER BY id DESC LIMIT 10"
        )
        # Rows are read by column name here and by position above, so the
        # connection's row type has to support both.
        for log_row in conn.execute(log_sql):
            summary = (
                f" {log_row['source']}: {log_row['status']} — "
                f"{log_row['rows_inserted']} inserted, {log_row['rows_skipped']} skipped "
                f"({log_row['completed_at']})"
            )
            print(summary)
|
||
|
||
|
||
def main(argv=None):
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Args:
        argv: Optional list of argument strings, excluding the program name.
            When ``None`` (the default) argparse reads ``sys.argv[1:]``, so
            existing ``main()`` callers behave exactly as before. Passing an
            explicit list allows the CLI to be driven from other code and
            from tests.

    Raises:
        SystemExit: Raised by argparse for ``--help`` (status 0) and for
            usage errors such as a missing or unknown subcommand (status 2).
            Individual handlers may also exit with status 1 on import errors.
    """
    parser = argparse.ArgumentParser(description="CFTC COT data ingestion CLI")
    # A subcommand is mandatory; its name is stored in ``args.command``.
    sub = parser.add_subparsers(dest="command", required=True)

    sub.add_parser("init-db", help="Initialize the database schema")

    p = sub.add_parser("import-local-html", help="Import all local HTML files")
    p.add_argument("--data-dir", default="data", help="Directory with HTML files")

    # NOTE(review): the end-year defaults below are hard-coded to 2026 and
    # will need bumping once later yearly archives exist.
    p = sub.add_parser("import-history", help="Download and import full historical archive")
    p.add_argument("--start-year", type=int, default=1995)
    p.add_argument("--end-year", type=int, default=2026)

    p = sub.add_parser("import-html", help="Import a single HTML file")
    p.add_argument("file")

    p = sub.add_parser("import-zip", help="Import a single ZIP file")
    p.add_argument("file")

    p = sub.add_parser(
        "import-disagg-history",
        help="Download and import Disaggregated COT yearly ZIPs (2019–2026)",
    )
    p.add_argument("--start-year", type=int, default=2019)
    p.add_argument("--end-year", type=int, default=2026)

    sub.add_parser("download-and-import", help="Download latest weekly report and import it")
    sub.add_parser("status", help="Show database statistics")

    args = parser.parse_args(argv)

    # Subcommand name -> handler. Every handler takes the parsed namespace.
    commands = {
        "init-db": cmd_init_db,
        "import-local-html": cmd_import_local_html,
        "import-history": cmd_import_history,
        "import-html": cmd_import_html,
        "import-zip": cmd_import_zip,
        "import-disagg-history": cmd_import_disagg_history,
        "download-and-import": cmd_download_and_import,
        "status": cmd_status,
    }
    commands[args.command](args)
|
||
|
||
|
||
# Run the CLI only when this module is executed as a script (for example via
# ``python -m app.ingestion.cli``); importing it has no side effects.
if __name__ == "__main__":
    main()
|