Greg 2c28ac3b0a Add Disaggregated COT data support (2019–2026)
Integrates the CFTC Disaggregated Commitments of Traders reports
(com_disagg_txt_YYYY.zip) which break positions down by Producer/Merchant,
Swap Dealers, Managed Money, and Other Reportables — a different report
type from the existing legacy COT data.

- schema.sql: add disagg_reports, disagg_positions, disagg_concentration tables
- parser.py: add DisaggPositionRow/DisaggBlock dataclasses and
  parse_disagg_csv_text()/parse_disagg_zip_file() for c_year.txt format
- importer.py: add import_disagg_block(), import_disagg_zip_file(),
  run_disagg_historical_import() for 2019–2026 yearly ZIPs
- cli.py: add import-disagg-history subcommand
- docker-compose.yaml: run import-disagg-history on startup (idempotent
  via import_log, so re-deploys skip already-imported years)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 18:11:59 +01:00

859 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
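CFTC COT Report Parser
Parses Commitments of Traders (COT) reports published by the CFTC.
Legacy long reports use a fixed-width text format with ':' as the group
separator within each data line. That format is read from both:
- Weekly HTML files (text wrapped in a <pre> tag)
- Historical ZIP files (.txt members, same format without the HTML wrapper)
ZIP members whose first line begins with a quoted "Market..." header are parsed
as CSV instead, and Disaggregated COT archives (com_disagg_txt_YYYY.zip) are
parsed from CSV into DisaggBlock objects.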
"""
import re
import zipfile
from dataclasses import dataclass, field
from datetime import date
from pathlib import Path
from typing import Iterator, Optional
# Regex patterns

# Commodity header line: "<name - exchange>", two or more spaces, "Code-<digits>".
COMMODITY_HEADER_RE = re.compile(r'^(\S.+?)\s{2,}Code-(\d+)\s*$')

# Long-form date such as "February 17, 2026" -> (month name, day, year).
DATE_RE = re.compile(
    r'(January|February|March|April|May|June|July|August|September|October|November|December)'
    r'\s+(\d{1,2}),\s+(\d{4})'
)

# Upper-cased exchange name -> short code. Spellings with and without a
# trailing period are both listed so either form resolves to the same code.
EXCHANGE_ABBR = {
    'CHICAGO BOARD OF TRADE': 'CBT',
    'CHICAGO MERCANTILE EXCHANGE': 'CME',
    'NEW YORK MERCANTILE EXCHANGE': 'NYMEX',
    'COMMODITY EXCHANGE INC': 'COMEX',
    'COMMODITY EXCHANGE INC.': 'COMEX',
    'ICE FUTURES U.S.': 'ICE',
    'ICE FUTURES U.S': 'ICE',
    'ICE FUTURES EUROPE': 'ICE-EU',
    'KANSAS CITY BOARD OF TRADE': 'KCBT',
    'MINNEAPOLIS GRAIN EXCHANGE': 'MGE',
}
def _parse_date(text: str) -> Optional[str]:
    """Return the first 'Month D, YYYY' date in *text* as an ISO string.

    Returns None when DATE_RE finds no date or the matched day/month/year do
    not form a valid calendar date (e.g. 'February 30, 2026').
    """
    found = DATE_RE.search(text)
    if found is None:
        return None

    month_name, day_text, year_text = found.groups()
    month_names = (
        'January', 'February', 'March', 'April', 'May', 'June',
        'July', 'August', 'September', 'October', 'November', 'December',
    )
    try:
        month_number = month_names.index(month_name) + 1
        return date(int(year_text), month_number, int(day_text)).isoformat()
    except (ValueError, IndexError):
        return None
def _nums(s: str, as_float: bool = False) -> list:
"""Parse whitespace-separated numbers from a string, stripping commas."""
clean = s.replace(',', '').strip()
if not clean:
return []
result = []
for tok in clean.split():
try:
result.append(float(tok) if as_float else int(float(tok)))
except ValueError:
pass
return result
def _parse_position_line(line: str, as_float: bool = False) -> tuple[str, list]:
    """
    Split a ':'-grouped data line into its label and a flat list of numbers.

    Example input:
    'All : 544,127: 117,677 175,249 205,702 184,989 124,796 508,367 505,746: 35,760 38,381'

    The text before the first ':' is the row label; every later group is
    parsed with _nums and concatenated. For a position line the values are:
    [open_interest, noncomm_long, noncomm_short, spreading,
    comm_long, comm_short, total_long, total_short,
    nonrept_long, nonrept_short] -- 10 values total

    Returns (row_label, flat_list_of_values); values are floats when
    as_float is True, ints otherwise.
    """
    label, *groups = line.split(':')
    numbers = [n for group in groups for n in _nums(group, as_float=as_float)]
    return label.strip(), numbers
def _parse_trader_line(line: str) -> tuple[str, list]:
    """
    Split a trader-count line into its label and a flat list of ints.

    Example input:
    'All : 375: 122 119 146 105 104 309 296:'

    Returns (label, [total, noncomm_long, noncomm_short, spread, comm_long,
    comm_short, total_long, total_short]).
    """
    head, _, rest = line.partition(':')
    counts = []
    # Splitting an empty remainder yields [''], which _nums turns into [].
    for group in rest.split(':'):
        counts += _nums(group)
    return head.strip(), counts
def _parse_concentration_line(line: str) -> tuple[str, list]:
    """
    Split a concentration line into its label and a list of floats.

    Example input:
    'All : 12.5 11.6 21.6 20.1 9.2 7.2 15.3 12.5'

    Only the first ':' separates the label; everything after it (including
    any further colons left by header artefacts) is handed to _nums.

    Returns (label, [gross_long_4, gross_short_4, gross_long_8, gross_short_8,
    net_long_4, net_short_4, net_long_8, net_short_8]).

    Raises ValueError when the line contains no ':'.
    """
    label, remainder = line.split(':', 1)
    return label.strip(), _nums(remainder, as_float=True)
@dataclass
class PositionRow:
    """Position counts for one row ('All', 'Old' or 'Other') of a legacy report.

    Every field defaults to None so partially populated rows can be built.
    """

    open_interest: Optional[int] = None

    # Non-commercial positions
    noncomm_long: Optional[int] = None
    noncomm_short: Optional[int] = None
    noncomm_spreading: Optional[int] = None

    # Commercial positions
    comm_long: Optional[int] = None
    comm_short: Optional[int] = None

    # Total reportable positions
    total_long: Optional[int] = None
    total_short: Optional[int] = None

    # Non-reportable positions
    nonrept_long: Optional[int] = None
    nonrept_short: Optional[int] = None
@dataclass
class ChangesRow:
    """Change in each position figure relative to the previous report.

    Field order mirrors PositionRow with a 'chg_' prefix; all default to None.
    """

    chg_open_interest: Optional[int] = None

    # Non-commercial
    chg_noncomm_long: Optional[int] = None
    chg_noncomm_short: Optional[int] = None
    chg_noncomm_spreading: Optional[int] = None

    # Commercial
    chg_comm_long: Optional[int] = None
    chg_comm_short: Optional[int] = None

    # Total reportable
    chg_total_long: Optional[int] = None
    chg_total_short: Optional[int] = None

    # Non-reportable
    chg_nonrept_long: Optional[int] = None
    chg_nonrept_short: Optional[int] = None
@dataclass
class PctRow:
    """Percent-of-open-interest figures for one row ('All', 'Old' or 'Other').

    Field order mirrors PositionRow with a 'pct_' prefix; all default to None.
    """

    pct_open_interest: Optional[float] = None

    # Non-commercial
    pct_noncomm_long: Optional[float] = None
    pct_noncomm_short: Optional[float] = None
    pct_noncomm_spreading: Optional[float] = None

    # Commercial
    pct_comm_long: Optional[float] = None
    pct_comm_short: Optional[float] = None

    # Total reportable
    pct_total_long: Optional[float] = None
    pct_total_short: Optional[float] = None

    # Non-reportable
    pct_nonrept_long: Optional[float] = None
    pct_nonrept_short: Optional[float] = None
@dataclass
class TraderRow:
    """Number of traders per category for one row ('All', 'Old' or 'Other').

    All fields default to None.
    """

    traders_total: Optional[int] = None

    # Non-commercial
    traders_noncomm_long: Optional[int] = None
    traders_noncomm_short: Optional[int] = None
    traders_noncomm_spread: Optional[int] = None

    # Commercial
    traders_comm_long: Optional[int] = None
    traders_comm_short: Optional[int] = None

    # Total reportable
    traders_total_long: Optional[int] = None
    traders_total_short: Optional[int] = None
@dataclass
class ConcentrationRow:
    """Concentration percentages held by the 4 and 8 largest traders.

    'gross' and 'net' figures are stored separately for the long and short
    side; all fields default to None.
    """

    # Gross positions
    conc_gross_long_4: Optional[float] = None
    conc_gross_short_4: Optional[float] = None
    conc_gross_long_8: Optional[float] = None
    conc_gross_short_8: Optional[float] = None

    # Net positions
    conc_net_long_4: Optional[float] = None
    conc_net_short_4: Optional[float] = None
    conc_net_long_8: Optional[float] = None
    conc_net_short_8: Optional[float] = None
@dataclass
class CommodityBlock:
    """One commodity's legacy COT data for a single report date."""

    # Identification
    cftc_code: str
    name: str
    exchange: str
    exchange_abbr: str
    contract_unit: str

    # Dates as strings; prev_report_date is None when no previous date is known
    report_date: str
    prev_report_date: Optional[str]

    # Parsed sections. The dict-valued sections are keyed by row type.
    positions: dict = field(default_factory=dict)  # row_type -> PositionRow
    changes: Optional[ChangesRow] = None
    percentages: dict = field(default_factory=dict)  # row_type -> PctRow
    traders: dict = field(default_factory=dict)  # row_type -> TraderRow
    concentration: dict = field(default_factory=dict)  # row_type -> ConcentrationRow
def _assign_position_values(values: list, as_float: bool = False) -> dict:
"""Map a 10-value list to position field names."""
keys = ['open_interest', 'noncomm_long', 'noncomm_short', 'noncomm_spreading',
'comm_long', 'comm_short', 'total_long', 'total_short',
'nonrept_long', 'nonrept_short']
return {k: values[i] if i < len(values) else None for i, k in enumerate(keys)}
# Field names, in column order, for each section of a fixed-width block.
_CHG_ROW_KEYS = (
    'chg_open_interest', 'chg_noncomm_long', 'chg_noncomm_short',
    'chg_noncomm_spreading', 'chg_comm_long', 'chg_comm_short',
    'chg_total_long', 'chg_total_short', 'chg_nonrept_long', 'chg_nonrept_short',
)
_PCT_ROW_KEYS = (
    'pct_open_interest', 'pct_noncomm_long', 'pct_noncomm_short',
    'pct_noncomm_spreading', 'pct_comm_long', 'pct_comm_short',
    'pct_total_long', 'pct_total_short', 'pct_nonrept_long', 'pct_nonrept_short',
)
_TRADER_ROW_KEYS = (
    'traders_total', 'traders_noncomm_long', 'traders_noncomm_short',
    'traders_noncomm_spread', 'traders_comm_long', 'traders_comm_short',
    'traders_total_long', 'traders_total_short',
)
# Row labels that carry data in every section except the changes row.
_ROW_LABELS = ('All', 'Old', 'Other')


def _zip_fields(keys, values) -> dict:
    """Pair *keys* with *values* by position, using None for missing values."""
    return {k: values[i] if i < len(values) else None for i, k in enumerate(keys)}


def _parse_block(lines: list[str]) -> Optional[CommodityBlock]:
    """Parse a single commodity block into a CommodityBlock.

    Args:
        lines: Raw lines of one block. lines[0] must match COMMODITY_HEADER_RE
            ("NAME - EXCHANGE   Code-XXXXXX") and lines[1] must contain the
            report date.

    Returns:
        The parsed block, or None when the input is empty, the header does not
        match, no report date is found on the second line, or no position row
        was parsed.
    """
    if not lines:
        return None

    # --- Header line (line 0): NAME - EXCHANGE ... Code-XXXXXX ---
    m = COMMODITY_HEADER_RE.match(lines[0].rstrip())
    if not m:
        return None
    full_name = m.group(1).strip()
    cftc_code = m.group(2)

    # Split "NAME - EXCHANGE" on first " - "
    if ' - ' in full_name:
        name, exchange = full_name.split(' - ', 1)
    else:
        name, exchange = full_name, ''
    name = name.strip()
    exchange = exchange.strip()
    # Unknown exchanges fall back to their first six characters, minus spaces.
    exchange_abbr = EXCHANGE_ABBR.get(exchange.upper(), exchange[:6].upper().replace(' ', ''))

    # --- Report date line (line 1) ---
    report_date = _parse_date(lines[1]) if len(lines) > 1 else None
    if not report_date:
        return None

    contract_unit = ''
    prev_report_date = None
    positions: dict = {}
    changes: Optional[ChangesRow] = None
    percentages: dict = {}
    traders: dict = {}
    concentration: dict = {}

    # State machine: section header lines switch the section, data lines are
    # interpreted according to the current section.
    section = 'POSITIONS'
    for line in lines[2:]:
        stripped = line.strip()
        # Skip pure separator / empty lines
        if not stripped or stripped.startswith('---') or stripped == ':':
            continue

        # Contract unit
        if '(CONTRACTS OF' in line:
            m2 = re.search(r'\(CONTRACTS OF[^)]+\)', line)
            if m2:
                contract_unit = m2.group(0)
            continue

        # Section triggers
        if 'Changes in Commitments from' in line:
            # The same line carries the previous report's date.
            prev_report_date = _parse_date(line)
            section = 'CHANGES'
            continue
        if 'Percent of Open Interest Represented' in line:
            section = 'PERCENT'
            continue
        if '# Traders' in line or 'Number of Traders in Each Category' in line:
            section = 'TRADERS'
            continue
        if 'Percent of Open Interest Held by' in line:
            section = 'CONCENTRATION'
            continue

        # Skip other header/label-only lines
        if ':' not in line:
            continue

        label_part = line.split(':')[0].strip()

        if section == 'CHANGES':
            # The changes row has a blank label. Once one populated row has
            # been read, later lines are ignored until the next section.
            if label_part == '':
                _, vals = _parse_position_line(line, as_float=False)
                if vals:
                    changes = ChangesRow(**_zip_fields(_CHG_ROW_KEYS, vals))
                    section = 'CHANGES_DONE'
            continue

        # Every remaining section only carries data on All/Old/Other rows.
        if label_part not in _ROW_LABELS:
            continue

        if section == 'POSITIONS':
            _, vals = _parse_position_line(line, as_float=False)
            if vals:
                positions[label_part] = PositionRow(**_assign_position_values(vals))
        elif section == 'PERCENT':
            _, vals = _parse_position_line(line, as_float=True)
            if vals:
                percentages[label_part] = PctRow(**_zip_fields(_PCT_ROW_KEYS, vals))
        elif section == 'TRADERS':
            _, vals = _parse_trader_line(line)
            if vals:
                traders[label_part] = TraderRow(**_zip_fields(_TRADER_ROW_KEYS, vals))
        elif section == 'CONCENTRATION':
            _, vals = _parse_concentration_line(line)
            # A concentration row is only kept when all eight figures are present.
            if len(vals) >= 8:
                concentration[label_part] = ConcentrationRow(
                    conc_gross_long_4=vals[0],
                    conc_gross_short_4=vals[1],
                    conc_gross_long_8=vals[2],
                    conc_gross_short_8=vals[3],
                    conc_net_long_4=vals[4],
                    conc_net_short_4=vals[5],
                    conc_net_long_8=vals[6],
                    conc_net_short_8=vals[7],
                )

    if not positions:
        return None

    return CommodityBlock(
        cftc_code=cftc_code,
        name=name,
        exchange=exchange,
        exchange_abbr=exchange_abbr,
        contract_unit=contract_unit,
        report_date=report_date,
        prev_report_date=prev_report_date,
        positions=positions,
        changes=changes,
        percentages=percentages,
        traders=traders,
        concentration=concentration,
    )
def parse_text_blocks(text: str) -> Iterator[CommodityBlock]:
    """
    Yield a CommodityBlock for each commodity section of fixed-width text.

    A new section starts at every line matching COMMODITY_HEADER_RE. Lines
    seen before the first header are collected as well, but _parse_block
    rejects them because they do not begin with a header. Sections that
    _parse_block cannot parse are skipped.
    """
    pending: list[str] = []
    for raw_line in text.splitlines():
        starts_block = COMMODITY_HEADER_RE.match(raw_line.rstrip())
        if not starts_block:
            pending.append(raw_line)
            continue
        # A header closes whatever was collected so far.
        if pending:
            parsed = _parse_block(pending)
            if parsed:
                yield parsed
        pending = [raw_line]

    # Flush the final section.
    if pending:
        parsed = _parse_block(pending)
        if parsed:
            yield parsed
def extract_text_from_html(html: str) -> str:
    """Return the text of the first <pre> element in a CFTC HTML page.

    When the page has no <pre> element, the whole document is returned with
    anything that looks like an HTML tag removed.
    """
    from bs4 import BeautifulSoup

    pre_tag = BeautifulSoup(html, 'html.parser').find('pre')
    if not pre_tag:
        # Fallback: strip HTML tags
        return re.sub(r'<[^>]+>', '', html)
    return pre_tag.get_text()
def parse_html_file(path: str) -> Iterator[CommodityBlock]:
    """Yield the commodity blocks of a weekly HTML file downloaded from CFTC.

    The file is read as latin-1, reduced to its text content and passed to
    parse_text_blocks.
    """
    raw_html = Path(path).read_text(encoding='latin-1')
    for block in parse_text_blocks(extract_text_from_html(raw_html)):
        yield block
def parse_zip_file(zip_path: str) -> Iterator[CommodityBlock]:
    """
    Yield commodity blocks from every .txt member of a historical CFTC ZIP.

    Each member is decoded as latin-1. A member whose first non-blank text is
    a quoted header starting with "Market" is parsed as CSV; any other member
    is parsed as fixed-width text.
    """
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.namelist():
            if not member.lower().endswith('.txt'):
                continue
            text = archive.read(member).decode('latin-1')
            # Detect CSV by checking for quoted header on first line
            looks_like_csv = text.lstrip().startswith('"Market')
            parser = parse_csv_text if looks_like_csv else parse_text_blocks
            yield from parser(text)
# ── CSV format (historical annual ZIPs) ────────────────────────────────────
# Map CSV column name suffixes to our field names
_POS_FIELDS = {
'Open Interest': 'open_interest',
'Noncommercial Positions-Long': 'noncomm_long',
'Noncommercial Positions-Short': 'noncomm_short',
'Noncommercial Positions-Spreading': 'noncomm_spreading',
'Commercial Positions-Long': 'comm_long',
'Commercial Positions-Short': 'comm_short',
'Total Reportable Positions-Long': 'total_long',
'Total Reportable Positions-Short': 'total_short',
'Nonreportable Positions-Long': 'nonrept_long',
'Nonreportable Positions-Short': 'nonrept_short',
}
_CHG_FIELDS = {
'Change in Open Interest': 'chg_open_interest',
'Change in Noncommercial-Long': 'chg_noncomm_long',
'Change in Noncommercial-Short': 'chg_noncomm_short',
'Change in Noncommercial-Spreading': 'chg_noncomm_spreading',
'Change in Commercial-Long': 'chg_comm_long',
'Change in Commercial-Short': 'chg_comm_short',
'Change in Total Reportable-Long': 'chg_total_long',
'Change in Total Reportable-Short': 'chg_total_short',
'Change in Nonreportable-Long': 'chg_nonrept_long',
'Change in Nonreportable-Short': 'chg_nonrept_short',
}
_PCT_FIELDS = {
'% of Open Interest (OI)': 'pct_open_interest',
'% of OI-Noncommercial-Long': 'pct_noncomm_long',
'% of OI-Noncommercial-Short': 'pct_noncomm_short',
'% of OI-Noncommercial-Spreading': 'pct_noncomm_spreading',
'% of OI-Commercial-Long': 'pct_comm_long',
'% of OI-Commercial-Short': 'pct_comm_short',
'% of OI-Total Reportable-Long': 'pct_total_long',
'% of OI-Total Reportable-Short': 'pct_total_short',
'% of OI-Nonreportable-Long': 'pct_nonrept_long',
'% of OI-Nonreportable-Short': 'pct_nonrept_short',
}
_TRD_FIELDS = {
'Traders-Total': 'traders_total',
'Traders-Noncommercial-Long': 'traders_noncomm_long',
'Traders-Noncommercial-Short':'traders_noncomm_short',
'Traders-Noncommercial-Spreading': 'traders_noncomm_spread',
'Traders-Commercial-Long': 'traders_comm_long',
'Traders-Commercial-Short': 'traders_comm_short',
'Traders-Total Reportable-Long': 'traders_total_long',
'Traders-Total Reportable-Short': 'traders_total_short',
}
_CONC_FIELDS = {} # populated dynamically — column names are inconsistent
def _csv_val(row: dict, key: str, as_float: bool = False):
"""Get a value from a CSV row by key prefix match, stripping whitespace."""
# Try exact key first, then strip leading/trailing spaces from all keys
for k, v in row.items():
if k.strip() == key.strip():
v = v.strip()
if not v:
return None
try:
return float(v) if as_float else int(float(v))
except ValueError:
return None
return None
def _build_position_row_from_csv(row: dict, suffix: str) -> PositionRow:
    """Build a PositionRow from the '<column> (<suffix>)' columns of a CSV row.

    Columns listed in _POS_FIELDS that are missing or non-numeric leave the
    corresponding field as None.
    """
    return PositionRow(**{
        attr: _csv_val(row, f'{column} ({suffix})')
        for column, attr in _POS_FIELDS.items()
    })
def _build_changes_from_csv(row: dict) -> ChangesRow:
    """Build a ChangesRow from the '<column> (All)' change columns of a CSV row.

    Only the 'All' suffix is read; missing or non-numeric columns yield None.
    """
    return ChangesRow(**{
        attr: _csv_val(row, f'{column} (All)')
        for column, attr in _CHG_FIELDS.items()
    })
def _build_pct_row_from_csv(row: dict, suffix: str) -> PctRow:
    """Build a PctRow from the percent-of-open-interest columns of a CSV row.

    Percent column names are not spelled consistently between the All and
    Old/Other groups, so several spellings are tried for each field and the
    first one that yields a number wins.

    Args:
        row: A csv.DictReader row.
        suffix: Row type used in the column names ('All', 'Old' or 'Other').

    Returns:
        A PctRow whose fields are None wherever no spelling produced a value.
    """
    kwargs = {}
    for prefix, attr in _PCT_FIELDS.items():
        candidates = (
            f'{prefix} ({suffix})',
            # NOTE(review): spelling without a space before the suffix, e.g.
            # '% of Open Interest (OI)(Old)' — believed to occur in CFTC
            # annual files; confirm against a real header.
            f'{prefix}({suffix})',
            # Spelling probed by earlier revisions; kept so that any file
            # that matched before still matches.
            f'{prefix}(OI) ({suffix})',
        )
        val = None
        for col in candidates:
            val = _csv_val(row, col, as_float=True)
            if val is not None:
                break
        kwargs[attr] = val
    return PctRow(**kwargs)
def _build_trader_row_from_csv(row: dict, suffix: str) -> TraderRow:
    """Build a TraderRow from the '<column> (<suffix>)' trader-count columns.

    Columns listed in _TRD_FIELDS that are missing or non-numeric leave the
    corresponding field as None.
    """
    return TraderRow(**{
        attr: _csv_val(row, f'{column} ({suffix})')
        for column, attr in _TRD_FIELDS.items()
    })
def _build_concentration_from_csv(row: dict, suffix: str) -> ConcentrationRow:
    """
    Build a ConcentrationRow from the concentration columns of a CSV row.

    Concentration columns have inconsistent spacing in CFTC CSVs, e.g.:
    'Concentration-Gross LT = 4 TDR-Long (All)'
    'Concentration-Gross LT =4 TDR-Short (All)'
    so column names are matched after removing all whitespace and lowercasing.

    Args:
        row: A csv.DictReader row. The None key (surplus fields) and None
            values (missing fields) of ragged rows are ignored.
        suffix: Row type used in the column names ('All', 'Old' or 'Other').

    Returns:
        A ConcentrationRow whose fields are None wherever the column is
        missing, empty, or not numeric.
    """
    def _norm(s: str) -> str:
        return re.sub(r'\s+', '', s).lower()

    # Build a normalized lookup for this row, skipping DictReader's None key.
    norm_row = {_norm(k): v for k, v in row.items() if isinstance(k, str)}

    def _get(pattern: str):
        v = (norm_row.get(_norm(pattern + f'({suffix})')) or '').strip()
        if not v:
            return None
        try:
            return float(v)
        except ValueError:
            return None

    return ConcentrationRow(
        conc_gross_long_4=_get('Concentration-Gross LT =4 TDR-Long '),
        conc_gross_short_4=_get('Concentration-Gross LT =4 TDR-Short '),
        conc_gross_long_8=_get('Concentration-Gross LT =8 TDR-Long '),
        conc_gross_short_8=_get('Concentration-Gross LT =8 TDR-Short '),
        conc_net_long_4=_get('Concentration-Net LT =4 TDR-Long '),
        conc_net_short_4=_get('Concentration-Net LT =4 TDR-Short '),
        conc_net_long_8=_get('Concentration-Net LT =8 TDR-Long '),
        conc_net_short_8=_get('Concentration-Net LT =8 TDR-Short '),
    )
def _csv_row_to_block(row: dict) -> Optional[CommodityBlock]:
    """Convert one CSV row (= one commodity × one date) to a CommodityBlock.

    Args:
        row: A csv.DictReader row of a legacy annual CSV file.

    Returns:
        The block, or None when the market name, report date or CFTC code is
        missing or blank (including the None that DictReader stores for
        fields absent from a short row). contract_unit is always '' and
        prev_report_date is always None for CSV rows.
    """
    # `or ''` guards against None values from truncated rows.
    full_name = (row.get('Market and Exchange Names') or '').strip()
    report_date = (row.get('As of Date in Form YYYY-MM-DD') or '').strip()
    cftc_code = (row.get('CFTC Contract Market Code') or '').strip()
    if not full_name or not report_date or not cftc_code:
        return None

    # Split "NAME - EXCHANGE" on the first " - ".
    if ' - ' in full_name:
        name, exchange = full_name.split(' - ', 1)
    else:
        name, exchange = full_name, ''
    name = name.strip()
    exchange = exchange.strip()
    # Unknown exchanges fall back to their first six characters, minus spaces.
    exchange_abbr = EXCHANGE_ABBR.get(exchange.upper(),
                                      exchange[:6].upper().replace(' ', ''))

    positions = {}
    percentages = {}
    traders = {}
    concentration = {}
    for label in ('All', 'Old', 'Other'):
        positions[label] = _build_position_row_from_csv(row, label)
        percentages[label] = _build_pct_row_from_csv(row, label)
        traders[label] = _build_trader_row_from_csv(row, label)
        concentration[label] = _build_concentration_from_csv(row, label)

    return CommodityBlock(
        cftc_code=cftc_code,
        name=name,
        exchange=exchange,
        exchange_abbr=exchange_abbr,
        contract_unit='',
        report_date=report_date,
        prev_report_date=None,
        positions=positions,
        changes=_build_changes_from_csv(row),
        percentages=percentages,
        traders=traders,
        concentration=concentration,
    )
def parse_csv_text(text: str) -> Iterator[CommodityBlock]:
    """Parse a CFTC historical CSV file (annualof.txt format).

    Args:
        text: Full CSV content, header line included.

    Yields:
        One CommodityBlock per data row that carries a market name, report
        date and CFTC code; other rows are skipped.
    """
    import csv as _csv
    import io as _io

    # Let the csv module find record boundaries itself. str.splitlines()
    # would cut quoted fields that contain a newline and also splits on
    # characters such as U+0085, which latin-1 decoding can produce.
    reader = _csv.DictReader(_io.StringIO(text, newline=''))
    for row in reader:
        block = _csv_row_to_block(row)
        if block:
            yield block
# ── Disaggregated COT format (com_disagg_txt_YYYY.zip / c_year.txt) ─────────
@dataclass
class DisaggPositionRow:
    """All per-row figures of a Disaggregated COT record.

    One instance holds the positions, changes, percentages and trader counts
    for a single row type. Only row_type is required; every figure defaults
    to None.
    """

    row_type: str

    # --- Positions ---
    open_interest: Optional[int] = None
    prod_merc_long: Optional[int] = None
    prod_merc_short: Optional[int] = None
    swap_long: Optional[int] = None
    swap_short: Optional[int] = None
    swap_spread: Optional[int] = None
    m_money_long: Optional[int] = None
    m_money_short: Optional[int] = None
    m_money_spread: Optional[int] = None
    other_rept_long: Optional[int] = None
    other_rept_short: Optional[int] = None
    other_rept_spread: Optional[int] = None
    tot_rept_long: Optional[int] = None
    tot_rept_short: Optional[int] = None
    nonrept_long: Optional[int] = None
    nonrept_short: Optional[int] = None

    # --- Changes ---
    chg_open_interest: Optional[int] = None
    chg_prod_merc_long: Optional[int] = None
    chg_prod_merc_short: Optional[int] = None
    chg_swap_long: Optional[int] = None
    chg_swap_short: Optional[int] = None
    chg_swap_spread: Optional[int] = None
    chg_m_money_long: Optional[int] = None
    chg_m_money_short: Optional[int] = None
    chg_m_money_spread: Optional[int] = None
    chg_other_rept_long: Optional[int] = None
    chg_other_rept_short: Optional[int] = None
    chg_other_rept_spread: Optional[int] = None
    chg_tot_rept_long: Optional[int] = None
    chg_tot_rept_short: Optional[int] = None
    chg_nonrept_long: Optional[int] = None
    chg_nonrept_short: Optional[int] = None

    # --- Percent of open interest ---
    pct_open_interest: Optional[float] = None
    pct_prod_merc_long: Optional[float] = None
    pct_prod_merc_short: Optional[float] = None
    pct_swap_long: Optional[float] = None
    pct_swap_short: Optional[float] = None
    pct_swap_spread: Optional[float] = None
    pct_m_money_long: Optional[float] = None
    pct_m_money_short: Optional[float] = None
    pct_m_money_spread: Optional[float] = None
    pct_other_rept_long: Optional[float] = None
    pct_other_rept_short: Optional[float] = None
    pct_other_rept_spread: Optional[float] = None
    pct_tot_rept_long: Optional[float] = None
    pct_tot_rept_short: Optional[float] = None
    pct_nonrept_long: Optional[float] = None
    pct_nonrept_short: Optional[float] = None

    # --- Number of traders ---
    traders_total: Optional[int] = None
    traders_prod_merc_long: Optional[int] = None
    traders_prod_merc_short: Optional[int] = None
    traders_swap_long: Optional[int] = None
    traders_swap_short: Optional[int] = None
    traders_swap_spread: Optional[int] = None
    traders_m_money_long: Optional[int] = None
    traders_m_money_short: Optional[int] = None
    traders_m_money_spread: Optional[int] = None
    traders_other_rept_long: Optional[int] = None
    traders_other_rept_short: Optional[int] = None
    traders_other_rept_spread: Optional[int] = None
    traders_tot_rept_long: Optional[int] = None
    traders_tot_rept_short: Optional[int] = None
@dataclass
class DisaggBlock:
    """One market's Disaggregated COT data for a single report date."""

    # Identification
    cftc_code: str
    name: str
    exchange: str
    exchange_abbr: str
    contract_unit: str
    report_date: str

    # Parsed data
    rows: list  # list of DisaggPositionRow (one per row_type)
    concentration: dict  # row_type -> ConcentrationRow
def _di(row: dict, col: str) -> Optional[int]:
v = row.get(col, '').strip()
if not v:
return None
try:
return int(float(v))
except ValueError:
return None
def _df(row: dict, col: str) -> Optional[float]:
v = row.get(col, '').strip()
if not v:
return None
try:
return float(v)
except ValueError:
return None
def _parse_disagg_position_row(csv_row: dict, sfx: str) -> DisaggPositionRow:
    """Build the DisaggPositionRow for row type *sfx* from a Disaggregated CSV row.

    Positions, percentages and trader counts are read from the columns ending
    in '_<sfx>'. Change columns are read only when sfx is 'All'; for any
    other row type the chg_* fields stay None. Missing or non-numeric columns
    yield None.
    """
    # Integer-valued fields -> source column.
    int_columns = {
        'open_interest': f'Open_Interest_{sfx}',
        'prod_merc_long': f'Prod_Merc_Positions_Long_{sfx}',
        'prod_merc_short': f'Prod_Merc_Positions_Short_{sfx}',
        # NB: CFTC uses double underscore for Swap Short/Spread columns
        'swap_long': f'Swap_Positions_Long_{sfx}',
        'swap_short': f'Swap__Positions_Short_{sfx}',
        'swap_spread': f'Swap__Positions_Spread_{sfx}',
        'm_money_long': f'M_Money_Positions_Long_{sfx}',
        'm_money_short': f'M_Money_Positions_Short_{sfx}',
        'm_money_spread': f'M_Money_Positions_Spread_{sfx}',
        'other_rept_long': f'Other_Rept_Positions_Long_{sfx}',
        'other_rept_short': f'Other_Rept_Positions_Short_{sfx}',
        'other_rept_spread': f'Other_Rept_Positions_Spread_{sfx}',
        'tot_rept_long': f'Tot_Rept_Positions_Long_{sfx}',
        'tot_rept_short': f'Tot_Rept_Positions_Short_{sfx}',
        'nonrept_long': f'NonRept_Positions_Long_{sfx}',
        'nonrept_short': f'NonRept_Positions_Short_{sfx}',
        'traders_total': f'Traders_Tot_{sfx}',
        'traders_prod_merc_long': f'Traders_Prod_Merc_Long_{sfx}',
        'traders_prod_merc_short': f'Traders_Prod_Merc_Short_{sfx}',
        'traders_swap_long': f'Traders_Swap_Long_{sfx}',
        'traders_swap_short': f'Traders_Swap_Short_{sfx}',
        'traders_swap_spread': f'Traders_Swap_Spread_{sfx}',
        'traders_m_money_long': f'Traders_M_Money_Long_{sfx}',
        'traders_m_money_short': f'Traders_M_Money_Short_{sfx}',
        'traders_m_money_spread': f'Traders_M_Money_Spread_{sfx}',
        'traders_other_rept_long': f'Traders_Other_Rept_Long_{sfx}',
        'traders_other_rept_short': f'Traders_Other_Rept_Short_{sfx}',
        'traders_other_rept_spread': f'Traders_Other_Rept_Spread_{sfx}',
        'traders_tot_rept_long': f'Traders_Tot_Rept_Long_{sfx}',
        'traders_tot_rept_short': f'Traders_Tot_Rept_Short_{sfx}',
    }
    if sfx == 'All':
        # Change columns are only read for the 'All' row type.
        int_columns.update({
            'chg_open_interest': 'Change_in_Open_Interest_All',
            'chg_prod_merc_long': 'Change_in_Prod_Merc_Long_All',
            'chg_prod_merc_short': 'Change_in_Prod_Merc_Short_All',
            'chg_swap_long': 'Change_in_Swap_Long_All',
            'chg_swap_short': 'Change_in_Swap_Short_All',
            'chg_swap_spread': 'Change_in_Swap_Spread_All',
            'chg_m_money_long': 'Change_in_M_Money_Long_All',
            'chg_m_money_short': 'Change_in_M_Money_Short_All',
            'chg_m_money_spread': 'Change_in_M_Money_Spread_All',
            'chg_other_rept_long': 'Change_in_Other_Rept_Long_All',
            'chg_other_rept_short': 'Change_in_Other_Rept_Short_All',
            'chg_other_rept_spread': 'Change_in_Other_Rept_Spread_All',
            'chg_tot_rept_long': 'Change_in_Tot_Rept_Long_All',
            'chg_tot_rept_short': 'Change_in_Tot_Rept_Short_All',
            'chg_nonrept_long': 'Change_in_NonRept_Long_All',
            'chg_nonrept_short': 'Change_in_NonRept_Short_All',
        })

    # Float-valued fields -> source column.
    float_columns = {
        'pct_open_interest': f'Pct_of_Open_Interest_{sfx}',
        'pct_prod_merc_long': f'Pct_of_OI_Prod_Merc_Long_{sfx}',
        'pct_prod_merc_short': f'Pct_of_OI_Prod_Merc_Short_{sfx}',
        'pct_swap_long': f'Pct_of_OI_Swap_Long_{sfx}',
        'pct_swap_short': f'Pct_of_OI_Swap_Short_{sfx}',
        'pct_swap_spread': f'Pct_of_OI_Swap_Spread_{sfx}',
        'pct_m_money_long': f'Pct_of_OI_M_Money_Long_{sfx}',
        'pct_m_money_short': f'Pct_of_OI_M_Money_Short_{sfx}',
        'pct_m_money_spread': f'Pct_of_OI_M_Money_Spread_{sfx}',
        'pct_other_rept_long': f'Pct_of_OI_Other_Rept_Long_{sfx}',
        'pct_other_rept_short': f'Pct_of_OI_Other_Rept_Short_{sfx}',
        'pct_other_rept_spread': f'Pct_of_OI_Other_Rept_Spread_{sfx}',
        'pct_tot_rept_long': f'Pct_of_OI_Tot_Rept_Long_{sfx}',
        'pct_tot_rept_short': f'Pct_of_OI_Tot_Rept_Short_{sfx}',
        'pct_nonrept_long': f'Pct_of_OI_NonRept_Long_{sfx}',
        'pct_nonrept_short': f'Pct_of_OI_NonRept_Short_{sfx}',
    }

    parsed = {attr: _di(csv_row, column) for attr, column in int_columns.items()}
    parsed.update(
        (attr, _df(csv_row, column)) for attr, column in float_columns.items()
    )
    return DisaggPositionRow(row_type=sfx, **parsed)
def _parse_disagg_concentration(csv_row: dict, sfx: str) -> ConcentrationRow:
    """Build the ConcentrationRow for row type *sfx* from a Disaggregated CSV row.

    Each field is read as a float from its 'Conc_..._<sfx>' column; missing or
    non-numeric columns yield None.
    """
    columns = {
        'conc_gross_long_4': f'Conc_Gross_LE_4_TDR_Long_{sfx}',
        'conc_gross_short_4': f'Conc_Gross_LE_4_TDR_Short_{sfx}',
        'conc_gross_long_8': f'Conc_Gross_LE_8_TDR_Long_{sfx}',
        'conc_gross_short_8': f'Conc_Gross_LE_8_TDR_Short_{sfx}',
        'conc_net_long_4': f'Conc_Net_LE_4_TDR_Long_{sfx}',
        'conc_net_short_4': f'Conc_Net_LE_4_TDR_Short_{sfx}',
        'conc_net_long_8': f'Conc_Net_LE_8_TDR_Long_{sfx}',
        'conc_net_short_8': f'Conc_Net_LE_8_TDR_Short_{sfx}',
    }
    return ConcentrationRow(
        **{attr: _df(csv_row, column) for attr, column in columns.items()}
    )
def _disagg_csv_row_to_block(csv_row: dict) -> Optional[DisaggBlock]:
    """Convert one Disaggregated CSV row (one market, one date) to a DisaggBlock.

    Args:
        csv_row: A csv.DictReader row of a Disaggregated COT CSV file.

    Returns:
        The block, or None when the market name, report date or CFTC code is
        missing or blank (including the None that DictReader stores for
        fields absent from a short row).
    """
    # `or ''` guards against None values from truncated rows.
    full_name = (csv_row.get('Market_and_Exchange_Names') or '').strip()
    report_date = (csv_row.get('Report_Date_as_YYYY-MM-DD') or '').strip()
    cftc_code = (csv_row.get('CFTC_Contract_Market_Code') or '').strip()
    contract_unit = (csv_row.get('Contract_Units') or '').strip()
    if not full_name or not report_date or not cftc_code:
        return None

    # Split "NAME - EXCHANGE" on the first " - ".
    if ' - ' in full_name:
        name, exchange = full_name.split(' - ', 1)
    else:
        name, exchange = full_name, ''
    name = name.strip()
    exchange = exchange.strip()
    # Unknown exchanges fall back to their first six characters, minus spaces.
    exchange_abbr = EXCHANGE_ABBR.get(exchange.upper(),
                                      exchange[:6].upper().replace(' ', ''))

    row_types = ('All', 'Old', 'Other')
    rows = [_parse_disagg_position_row(csv_row, sfx) for sfx in row_types]
    concentration = {sfx: _parse_disagg_concentration(csv_row, sfx)
                     for sfx in row_types}

    return DisaggBlock(
        cftc_code=cftc_code,
        name=name,
        exchange=exchange,
        exchange_abbr=exchange_abbr,
        contract_unit=contract_unit,
        report_date=report_date,
        rows=rows,
        concentration=concentration,
    )
def parse_disagg_csv_text(text: str) -> Iterator[DisaggBlock]:
    """Parse a CFTC Disaggregated COT CSV file (c_year.txt format).

    Args:
        text: Full CSV content, header line included.

    Yields:
        One DisaggBlock per data row that carries a market name, report date
        and CFTC code; other rows are skipped.
    """
    import csv as _csv
    import io as _io

    # Let the csv module find record boundaries itself. str.splitlines()
    # would cut quoted fields that contain a newline and also splits on
    # characters such as U+0085, which latin-1 decoding can produce.
    reader = _csv.DictReader(_io.StringIO(text, newline=''))
    for row in reader:
        block = _disagg_csv_row_to_block(row)
        if block:
            yield block
def parse_disagg_zip_file(zip_path: str) -> Iterator[DisaggBlock]:
    """Yield DisaggBlocks from every .txt member of a Disaggregated COT ZIP.

    Applies to archives such as com_disagg_txt_YYYY.zip. Each member is
    decoded as latin-1 and parsed as Disaggregated CSV.
    """
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.namelist():
            if not member.lower().endswith('.txt'):
                continue
            csv_text = archive.read(member).decode('latin-1')
            yield from parse_disagg_csv_text(csv_text)