Pipeline Operations
How to run the arktrace MPOL screening pipeline for each supported region, configure parameters, and interpret outputs.
If you need an operations-level summary of pipeline types (purpose, run timing, expected results), see Pipeline Catalog (Operations View).
Quick start
# Interactive — prompts for region selection, retries failed steps
uv run python scripts/run_pipeline.py
# Non-interactive — Singapore region, fails fast
uv run python scripts/run_pipeline.py --region singapore --non-interactive
Prerequisites: a valid AISSTREAM_API_KEY in .env and Python 3.12+.
Interactive operations shell
Run a menu-driven shell that lets you execute common operations jobs and prints a result summary after each run.
bash scripts/run_operations_shell.sh
Available menu jobs:
- Full Screening
- Review-Feedback Evaluation
- Historical Backtesting + Public Integration Batch
- Demo/Smoke
- SAR Feature Smoke Test (job 11)
- EO Feature Smoke Test (job 12)
- Ingest EO Detections from CSV (job 13) — load a local EO CSV into eo_detections without a GFW API token, then optionally run feature matrix + scoring to verify in the dashboard
- Ingest AIS CSV / NMEA file (job 14) — load a provider CSV or NMEA 0183 file into ais_positions
- Ingest custom feed drop-ins (job 15) — run all CSV files in _inputs/custom_feeds/ through the auto-detector
Delayed-label intelligence loop (backtracking)
Run after a confirmed label is submitted to surface precursor signals and uplift related entities.
# Full pass
uv run python scripts/run_backtracking.py
# Incremental — only labels confirmed since a checkpoint
uv run python scripts/run_backtracking.py --since 2026-04-01T00:00:00Z
See Backtracking Runbook for full options, output format, and demo scenario.
Region presets
| Region | Flag | DuckDB | Gap threshold | Feature window | Weights (A / G / I) |
|---|---|---|---|---|---|
| Singapore / Malacca | singapore |
singapore.duckdb |
6 h | 30 d | 0.40 / 0.40 / 0.20 |
| Japan Sea / DPRK | japan |
japansea.duckdb |
12 h | 60 d | 0.40 / 0.40 / 0.20 |
| Middle East | middleeast |
middleeast.duckdb |
12 h | 60 d | 0.40 / 0.40 / 0.20 |
| Europe / Baltic | europe |
europe.duckdb |
6 h | 45 d | 0.35 / 0.35 / 0.30 |
| US Gulf | gulf |
gulf.duckdb |
6 h | 14 d | 0.50 / 0.30 / 0.20 |
The preset weights are starting points. The pipeline auto-calibrates w_graph on every run via _calibrate_graph_weight(). Calling src.score.composite standalone still requires --w-graph (or the new --auto-calibrate flag). The calibrated value is printed at the end of Step 8 for reference.
Pipeline steps
The pipeline runs 10 steps in sequence. Each step prints a status line; in interactive mode, failed steps prompt retry or skip.
| # | Step | Key modules |
|---|---|---|
| 1 | Schema initialisation | src/ingest/schema.py |
| 2 | Marine Cadastre historical backfill | src/ingest/marine_cadastre.py |
| 3 | Live AIS streaming | src/ingest/ais_stream.py |
| 4 | Sanctions loading | src/ingest/sanctions.py |
| 5 | Custom feed drop-ins | src/ingest/custom_feeds.py |
| 6 | Ownership graph | src/ingest/vessel_registry.py (→ Lance Graph) + src/features/ownership_graph.py |
| 7 | Feature engineering | src/features/ais_behavior.py + identity.py + trade_mismatch.py + build_matrix.py |
| 8 | Scoring | src/score/causal_sanction.py + mpol_baseline.py + anomaly.py + composite.py + watchlist.py |
| 9 | GDELT ingestion | src/ingest/gdelt.py |
| 10 | Dashboard | src/api/main.py (uvicorn) |
Common CLI flags
Live AIS streaming (--stream-duration)
# Collect 300 seconds of live AIS before moving on
uv run python scripts/run_pipeline.py --region singapore \
--non-interactive --stream-duration 300
Without --stream-duration, non-interactive mode skips live streaming entirely. In interactive mode, streaming runs until Ctrl-C.
Historical Marine Cadastre backfill (--marine-cadastre-year)
Only available for US-covered regions (Gulf of Mexico and US coastal waters). Repeat for multiple years.
uv run python scripts/run_pipeline.py --region gulf \
--non-interactive \
--marine-cadastre-year 2022 \
--marine-cadastre-year 2023
Geopolitical rerouting filter (--geopolitical-event-filter)
Down-weights anomaly_score for vessels in declared rerouting corridors to reduce false positives. Supply the path to a JSON event file.
uv run python scripts/run_pipeline.py --region middleeast \
--non-interactive \
--geopolitical-event-filter config/geopolitical_events.json
config/geopolitical_events.json covers:
- Red Sea / Cape of Good Hope rerouting (2023-11-01 → ongoing), down_weight 0.5
- Taiwan Strait GPS spoofing zone (2024-01-01 → ongoing), down_weight 0.7
Add new events to the JSON file without code changes. Format:
{
"events": [
{
"name": "Description of event",
"active_from": "YYYY-MM-DD",
"active_to": "YYYY-MM-DD",
"corridors": [
{"lat_min": -40, "lat_max": -25, "lon_min": 10, "lon_max": 40}
],
"down_weight": 0.5
}
]
}
Dummy vessel seeding (--seed-dummy)
Injects four realistic shadow fleet vessels (PETROVSKY ZVEZDA, SARI NOUR, OCEAN VOYAGER, VERA SUNSET) into the DB after feature engineering so they appear on the dashboard during demos without requiring real AIS data.
uv run python scripts/run_pipeline.py --region singapore \
--non-interactive --seed-dummy
Custom feed drop-ins (_inputs/custom_feeds/)
Drop any CSV file into _inputs/custom_feeds/ and it is auto-detected and ingested on the next pipeline run — no code changes required. Step 5 (step_custom_feeds) runs before the ownership graph build so proprietary position and detection data is included in feature engineering.
Supported feed types
| Feed type | Required columns | Target table |
|---|---|---|
| AIS positions | mmsi (or MMSI), lat (or LAT), lon (or LON), timestamp (or BaseDateTime) |
ais_positions |
| SAR detections | lat, lon, detected_at |
sar_detections |
| Cargo manifest | reporter, partner, hs_code, period |
trade_flow |
| Custom sanctions | name, list_source |
sanctions_entities |
Detection is column-first; if columns are ambiguous, the filename prefix is used (ais_*, sar_*, cargo_*, manifest_*, sanctions_*).
Optional columns
| Feed | Optional columns | Default when absent |
|---|---|---|
| SAR | detection_id, length_m, source_scene, confidence |
UUID / null / filename stem / 1.0 |
| Cargo | trade_value_usd, route_key |
null |
| Sanctions | entity_id, mmsi, imo, flag, type |
UUID / null |
AIS column-map sidecar
For AIS feeds with non-standard column names, create a JSON sidecar alongside the CSV:
_inputs/custom_feeds/spire_feed.csv
_inputs/custom_feeds/spire_feed.columnmap.json
{"mmsi": "vessel_id", "lat": "latitude", "lon": "longitude", "timestamp": "time_utc"}
Run standalone
# Ingest all pending files
uv run python src/ingest/custom_feeds.py
# Dry-run (detect types without inserting)
uv run python src/ingest/custom_feeds.py --dry-run
# Custom directory
uv run python src/ingest/custom_feeds.py --dir /path/to/feeds
Or use the operations shell:
bash scripts/run_operations_shell.sh # → job 15: Ingest custom feed drop-ins
Running steps individually
Each module can be run standalone. Useful for re-running a single step after a failure without re-ingesting all data.
# Schema
uv run python -m src.ingest.schema --db data/processed/singapore.duckdb
# AIS feature engineering (30-day window, 6h gap threshold)
uv run python -m src.features.ais_behavior \
--db data/processed/singapore.duckdb --window 30 --gap-threshold-hours 6
# Build full feature matrix
uv run python -m src.features.build_matrix --db data/processed/singapore.duckdb
# MPOL baseline (service vessel exclusion on by default)
uv run python -m src.score.mpol_baseline --db data/processed/singapore.duckdb
# Anomaly scoring
uv run python -m src.score.anomaly --db data/processed/singapore.duckdb
# C3 causal calibration (produces causal_effects.parquet, prints calibrated w_graph)
uv run python -m src.score.causal_sanction \
--db data/processed/singapore.duckdb \
--output data/processed/singapore_causal_effects.parquet
# Composite scoring with calibrated weight
uv run python -m src.score.composite \
--db data/processed/singapore.duckdb \
--w-graph 0.52 \
--geopolitical-event-filter config/geopolitical_events.json
# Watchlist output
uv run python -m src.score.watchlist --db data/processed/singapore.duckdb
Environment variables
All configurable paths are also settable via environment variables (useful in Docker Compose):
| Variable | Default | Description |
|---|---|---|
DB_PATH |
data/processed/mpol.duckdb |
Active DuckDB path |
AISSTREAM_API_KEY |
— | Required for live AIS ingestion |
LLM_PROVIDER |
llamacpp |
LLM client: llamacpp, openai, anthropic, gemini, ollama |
LLM_BASE_URL |
— | OpenAI-compatible base URL (required for gemini, ollama, custom endpoints) |
LLM_MODEL |
— | Model identifier (e.g., gpt-4o-mini, llama3.2:3b) |
WATCHLIST_OUTPUT_PATH |
data/processed/candidate_watchlist.parquet |
Watchlist parquet output |
COMPOSITE_SCORES_PATH |
data/processed/composite_scores.parquet |
Composite scores output |
CAUSAL_EFFECTS_PATH |
data/processed/causal_effects.parquet |
C3 output |
VALIDATION_METRICS_PATH |
data/processed/validation_metrics.json |
Metrics for dashboard |
Docker Compose
# Full pipeline (non-interactive, Singapore preset)
PIPELINE_REGION=singapore docker compose run --rm pipeline
# With historical backfill
PIPELINE_REGION=gulf docker compose run --rm pipeline \
uv run python scripts/run_pipeline.py \
--region gulf --non-interactive \
--marine-cadastre-year 2023
# Dashboard only (after pipeline has run)
docker compose up dashboard
Cleared vessel feedback
When a Phase B physical inspection returns outcome = cleared, record the MMSI in the cleared_vessels table. On the next scoring cycle, the vessel will be used as a hard negative in HDBSCAN and Isolation Forest training, lowering false positive rates for similar vessels.
# Add a cleared vessel (replace values as appropriate)
duckdb data/processed/singapore.duckdb << 'SQL'
INSERT INTO cleared_vessels (mmsi, cleared_by, investigation_id, notes)
VALUES ('123456789', 'officer_kim', 'INV-2026-042',
'Boarded 2026-04-15; IMO confirmed, cargo documents valid');
SQL
Output files
| File | Description |
|---|---|
data/processed/<region>.duckdb |
All ingested and computed data for the region |
data/processed/<region>_causal_effects.parquet |
C3 causal model per-regime ATT estimates |
data/processed/composite_scores.parquet |
Full scored vessel frame (all vessels) |
data/processed/candidate_watchlist.parquet |
Top candidates, sorted by confidence |
data/processed/validation_metrics.json |
Precision@50, Recall@200, AUROC |
data/processed/review_feedback_evaluation.json |
Tier-aware metrics and threshold recommendations |
data/processed/backtracking_report.json |
Delayed-label intelligence loop report |
data/processed/backtracking_report.md |
Human-readable backtracking summary |
Troubleshooting
Step 3 (AIS stream) — 0 rows inserted
Check AISSTREAM_API_KEY in .env and verify the bounding box covers an area with vessel traffic.
Step 5 (Ownership graph) — vessel_registry fails
Run uv run python src/ingest/vessel_registry.py --db <db_path> to rebuild the Lance Graph datasets manually. Alternatively, pass --skip-graph to build_matrix.py to run without graph features (graph features default to safe values).
Step 7 (Scoring) — composite returns empty DataFrame
vessel_features is empty. Re-run step 6 (feature engineering) and confirm build_matrix.py completed without errors.
Dashboard shows no vessels
Confirm WATCHLIST_OUTPUT_PATH points to the correct parquet file and that it contains rows (polars.read_parquet(path).height > 0).