Learning Objectives

By the end of this chapter, you will be able to:
- Understand different data sources for NFL and college football
- Set up data storage and management systems
- Work with APIs and web scraping techniques
- Implement data versioning and backup strategies
- Design efficient data pipelines for football analytics
Introduction
Modern football analytics depends on having reliable access to high-quality data. Whether you're analyzing play-by-play data, player tracking information, or scouting reports, the foundation of any analytics project is a well-designed data infrastructure.
This chapter explores the complete data ecosystem for football analytics—from where data comes from to how it should be stored, managed, and accessed. We'll cover both free and commercial data sources, discuss best practices for data management, and build practical data pipelines that you can use in your own projects.
What is Data Infrastructure?
Data infrastructure encompasses all the systems, processes, and tools used to acquire, store, manage, and deliver data to end users. For football analytics, this includes data sources (APIs, web scraping), storage systems (files, databases), pipelines (automated data collection), and quality control mechanisms.

The Football Data Landscape
Types of Football Data
Football data comes in many forms, each serving different analytical purposes:
Play-by-Play Data: The foundation of most football analytics
- Every play from every game
- Down, distance, field position
- Play outcomes, yards gained
- Pre-snap formations and personnel
Player Tracking Data: Detailed movement information
- X, Y coordinates of all 22 players
- Speed, acceleration, direction
- Available through NFL Next Gen Stats
- Limited historical availability
Scouting Data: Qualitative evaluations
- Player grades and rankings
- Formation identification
- Route concepts
- Available from services like Pro Football Focus
Contextual Data: Supporting information
- Weather conditions
- Injury reports
- Betting lines and spreads
- Referee assignments
Data Quality Considerations
Not all football data is created equal. When evaluating data sources, consider:
- Accuracy: How reliable is the data?
- Completeness: Are there missing values or games?
- Timeliness: How quickly is data updated?
- Granularity: What level of detail is provided?
- Cost: Free vs. commercial sources
- Documentation: Is the data well-documented?
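These criteria can be made concrete. A minimal sketch of one of them, completeness, computed over a sample of records (the field names and sample values are illustrative, not from any real source):

```python
def completeness(records, required_fields):
    """Fraction of required fields that are populated across a sample of records."""
    if not records:
        return 0.0
    filled = sum(
        1
        for rec in records
        for field in required_fields
        if rec.get(field) is not None
    )
    return filled / (len(records) * len(required_fields))

# Two hypothetical play records, one missing its EPA value
sample = [
    {"game_id": "2023_01_BUF_NYJ", "down": 1, "epa": 0.45},
    {"game_id": "2023_01_BUF_NYJ", "down": 2, "epa": None},
]
score = completeness(sample, ["game_id", "down", "epa"])  # 5 of 6 fields filled
```

Running a check like this on a small sample from each candidate source gives you a quick, comparable number before committing to one.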
The nflverse Ecosystem
The nflverse is a collection of R and Python packages that provide free, well-maintained access to NFL data. It has become the gold standard for open-source NFL analytics.
Core nflverse Packages
nflfastR (R) / nfl_data_py (Python)
- Play-by-play data from 1999-present
- Advanced metrics pre-calculated (EPA, WP, etc.)
- Weekly roster data
- Team schedules and results
nflreadr (R)
- Efficient data loading with caching
- Access to supplementary datasets
- Player statistics and IDs
- Draft picks and combine data
nflplotR (R) / nfl_data_py's `import_team_desc()` (Python)
- NFL team logos and colors
- Player headshots
- Visualization helpers
Installing nflverse Packages
#| eval: false
#| echo: true
# Install nflverse packages
install.packages("nflreadr")
install.packages("nflfastR")
install.packages("nflplotR")
# Data manipulation and visualization
install.packages("tidyverse")
install.packages("arrow") # For Parquet files
install.packages("DBI") # For databases
install.packages("RSQLite") # SQLite database
#| eval: false
#| echo: true
# Install core packages
pip install nfl-data-py pandas numpy
# Storage and database packages
pip install pyarrow fastparquet sqlalchemy
# Additional utilities
pip install requests beautifulsoup4 tqdm
Loading Play-by-Play Data
Let's load NFL play-by-play data using nflverse:
#| label: load-pbp-r
#| message: false
#| warning: false
#| cache: true
library(nflreadr)
library(tidyverse)
# Load single season
pbp_2023 <- load_pbp(2023)
# Load multiple seasons
pbp_multi <- load_pbp(2021:2023)
# Display basic info
cat("2023 season: ", nrow(pbp_2023), "plays,", ncol(pbp_2023), "columns\n")
cat("Multi-season:", nrow(pbp_multi), "plays\n")
# Sample data
pbp_2023 %>%
select(game_id, week, posteam, defteam, desc, epa, wpa) %>%
slice(1:5) %>%
print()
#| label: load-pbp-py
#| message: false
#| warning: false
#| cache: true
import nfl_data_py as nfl
import pandas as pd
# Load single season
pbp_2023 = nfl.import_pbp_data([2023])
# Load multiple seasons
pbp_multi = nfl.import_pbp_data([2021, 2022, 2023])
# Display basic info
print(f"2023 season: {len(pbp_2023):,} plays, {len(pbp_2023.columns)} columns")
print(f"Multi-season: {len(pbp_multi):,} plays\n")
# Sample data
print(pbp_2023[['game_id', 'week', 'posteam', 'defteam',
'desc', 'epa', 'wpa']].head())
Caching for Efficiency
Both nflfastR and nfl_data_py cache data locally after the first download, so subsequent loads are nearly instantaneous. The cache is automatically updated when new data becomes available.

Player and Roster Data
#| label: roster-data-r
#| message: false
#| warning: false
#| cache: true
# Load roster data
rosters <- load_rosters(2023)
# Load player statistics
player_stats <- load_player_stats(2023)
# Load draft picks
draft_picks <- load_draft_picks(2023)
# Example: Find all quarterbacks
qbs <- rosters %>%
filter(position == "QB") %>%
select(full_name, team, position, depth_chart_position, years_exp)
cat("Found", nrow(qbs), "quarterbacks in 2023\n")
head(qbs, 5)
#| label: roster-data-py
#| message: false
#| warning: false
#| cache: true
# Load roster data
rosters = nfl.import_rosters([2023])
# Load seasonal statistics
player_stats = nfl.import_seasonal_data([2023])
# Load draft picks
draft_picks = nfl.import_draft_picks([2023])
# Example: Find all quarterbacks
qbs = rosters[rosters['position'] == 'QB'][
['full_name', 'team', 'position', 'depth_chart_position', 'years_exp']
]
print(f"Found {len(qbs)} quarterbacks in 2023\n")
print(qbs.head())
NFL Official Data Sources
Next Gen Stats (NGS)
Next Gen Stats provides player tracking data captured at 10 times per second. While the complete dataset is not publicly available, summary statistics are accessible through the NFL website and API.
Available NGS Metrics:
- Average separation for receivers
- Time to throw for quarterbacks
- Completion probability
- Average cushion for defensive backs
- Rushing yards over expected
NFL API
The NFL and its media partners maintain various APIs for different data types. While not officially documented, endpoints such as ESPN's scoreboard API (used below) are publicly accessible:
#| eval: false
#| echo: true
library(httr)
library(jsonlite)
# Example: Fetch current week scores
url <- "https://site.api.espn.com/apis/site/v2/sports/football/nfl/scoreboard"
response <- GET(url)
data <- content(response, as = "text") %>%
fromJSON()
# Extract game information
games <- data$events
#| eval: false
#| echo: true
import requests
import json
# Example: Fetch current week scores
url = "https://site.api.espn.com/apis/site/v2/sports/football/nfl/scoreboard"
response = requests.get(url)
data = response.json()
# Extract game information
games = data['events']
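Because endpoints like the one above are unofficial, it pays to throttle your own requests rather than hammering the server. A minimal sketch of a rate limiter that enforces a minimum interval between calls (the class name and interval are illustrative):

```python
import time

class Throttle:
    """Enforce a minimum delay between successive calls (e.g., API requests)."""

    def __init__(self, min_interval_s=1.0):
        self.min_interval_s = min_interval_s
        self._last_call = None

    def wait(self):
        """Block until at least min_interval_s has passed since the last call."""
        now = time.monotonic()
        if self._last_call is not None:
            elapsed = now - self._last_call
            if elapsed < self.min_interval_s:
                time.sleep(self.min_interval_s - elapsed)
        self._last_call = time.monotonic()

# Usage: call throttle.wait() before each requests.get(...)
throttle = Throttle(min_interval_s=0.1)
start = time.monotonic()
for _ in range(3):
    throttle.wait()
elapsed = time.monotonic() - start  # roughly 2 x 0.1s of enforced waiting
```

Pair this with local caching of responses so repeated runs don't re-fetch data at all.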
API Rate Limits
When using unofficial APIs, be respectful of rate limits. Implement delays between requests and cache data locally to avoid overwhelming servers. Excessive requests may result in your IP being blocked.

College Football Data
cfbfastR
The cfbfastR package provides play-by-play data for college football, similar to nflfastR for the NFL:
#| eval: false
#| echo: true
# Install cfbfastR
install.packages("cfbfastR")
library(cfbfastR)
# Load college play-by-play data
cfb_pbp <- load_cfb_pbp(2023)
# Load team information
teams <- cfbd_team_info(year = 2023)
# Load recruiting rankings
recruiting <- cfbd_recruiting_player(2023)
#| eval: false
#| echo: true
# Install collegefootballdata
pip install cfbd
import cfbd
from cfbd.rest import ApiException
# Configure API (the cfbd client expects a Bearer prefix on the key)
configuration = cfbd.Configuration()
configuration.api_key['Authorization'] = 'YOUR_API_KEY'
configuration.api_key_prefix['Authorization'] = 'Bearer'
api_instance = cfbd.GamesApi(cfbd.ApiClient(configuration))
# Get games
games = api_instance.get_games(year=2023)
ESPN College Football API
ESPN provides extensive college football data through their APIs:
#| eval: false
#| echo: true
library(httr)
library(jsonlite)
# Fetch college football scoreboard
cfb_url <- "https://site.api.espn.com/apis/site/v2/sports/football/college-football/scoreboard"
response <- GET(cfb_url)
cfb_data <- content(response, as = "text") %>%
fromJSON()
# Get team rankings
rankings_url <- "https://site.api.espn.com/apis/site/v2/sports/football/college-football/rankings"
rankings <- GET(rankings_url) %>%
content(as = "text") %>%
fromJSON()
#| eval: false
#| echo: true
import requests
# Fetch college football scoreboard
cfb_url = "https://site.api.espn.com/apis/site/v2/sports/football/college-football/scoreboard"
response = requests.get(cfb_url)
cfb_data = response.json()
# Get team rankings
rankings_url = "https://site.api.espn.com/apis/site/v2/sports/football/college-football/rankings"
rankings = requests.get(rankings_url).json()
Data Storage Formats
Choosing the right storage format is crucial for performance, scalability, and collaboration. Let's compare common formats:
Format Comparison
| Format | Size | Speed | Portability | Use Case |
|---|---|---|---|---|
| CSV | Large | Slow | Excellent | Sharing, simple data |
| Parquet | Small | Fast | Good | Large datasets, analytics |
| RDS/Pickle | Medium | Fast | Language-specific | R/Python objects |
| SQLite | Medium | Medium | Excellent | Relational queries |
| PostgreSQL | Varies | Fast | Server-based | Multi-user, large scale |
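The size differences in the table come largely from redundancy in tabular text. A small sketch using only the standard library, gzip-compressing synthetic play-by-play-style CSV text (the columns are made up for illustration):

```python
import csv
import gzip
import io

# Build a synthetic CSV in memory: 10,000 rows of repetitive play data
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["game_id", "play_type", "yards_gained"])
for i in range(10_000):
    writer.writerow(["2023_01_KC_DET", "pass" if i % 2 else "run", i % 15])

raw = buf.getvalue().encode("utf-8")
compressed = gzip.compress(raw)
ratio = len(raw) / len(compressed)
# Repetitive tabular text compresses heavily; columnar formats like
# Parquet exploit the same redundancy, plus typed, per-column encoding.
```

The ratio you see on real play-by-play data will vary, but the direction is the same: repeated team codes, play types, and game IDs shrink dramatically under compression.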
CSV Files
CSV (Comma-Separated Values) is the most portable format but least efficient:
#| eval: false
#| echo: true
# Save to CSV
write_csv(pbp_2023, "data/pbp_2023.csv")
# Load from CSV
pbp_loaded <- read_csv("data/pbp_2023.csv")
# Compressed CSV
write_csv(pbp_2023, "data/pbp_2023.csv.gz")
#| eval: false
#| echo: true
# Save to CSV
pbp_2023.to_csv("data/pbp_2023.csv", index=False)
# Load from CSV
pbp_loaded = pd.read_csv("data/pbp_2023.csv")
# Compressed CSV
pbp_2023.to_csv("data/pbp_2023.csv.gz",
index=False, compression='gzip')
Parquet Files
Parquet is a columnar storage format that offers excellent compression and fast read times:
#| eval: false
#| echo: true
library(arrow)
# Save to Parquet
write_parquet(pbp_2023, "data/pbp_2023.parquet")
# Load from Parquet
pbp_loaded <- read_parquet("data/pbp_2023.parquet")
# Partitioned Parquet (by season)
pbp_multi %>%
group_by(season) %>%
write_dataset("data/pbp_partitioned", format = "parquet")
# Read partitioned data
pbp_all <- open_dataset("data/pbp_partitioned") %>%
collect()
#| eval: false
#| echo: true
import pyarrow.parquet as pq
import pyarrow as pa
# Save to Parquet
pbp_2023.to_parquet("data/pbp_2023.parquet")
# Load from Parquet
pbp_loaded = pd.read_parquet("data/pbp_2023.parquet")
# Partitioned Parquet (by season)
pbp_multi.to_parquet("data/pbp_partitioned",
partition_cols=['season'],
engine='pyarrow')
# Read partitioned data
pbp_all = pd.read_parquet("data/pbp_partitioned")
Storage Benchmark
Let's compare storage formats using actual NFL data:
#| label: storage-benchmark-r
#| message: false
#| warning: false
#| cache: true
library(nflreadr)
library(arrow)
library(tictoc)
# Load sample data
pbp <- load_pbp(2023)
# Create temporary directory
temp_dir <- tempdir()
# Benchmark CSV
tic("CSV Write")
write_csv(pbp, file.path(temp_dir, "test.csv"))
csv_time_write <- toc(quiet = TRUE)
tic("CSV Read")
csv_data <- read_csv(file.path(temp_dir, "test.csv"), show_col_types = FALSE)
csv_time_read <- toc(quiet = TRUE)
# Benchmark Parquet
tic("Parquet Write")
write_parquet(pbp, file.path(temp_dir, "test.parquet"))
parquet_time_write <- toc(quiet = TRUE)
tic("Parquet Read")
parquet_data <- read_parquet(file.path(temp_dir, "test.parquet"))
parquet_time_read <- toc(quiet = TRUE)
# Get file sizes
csv_size <- file.size(file.path(temp_dir, "test.csv")) / 1024^2
parquet_size <- file.size(file.path(temp_dir, "test.parquet")) / 1024^2
# Create comparison table
storage_comparison <- tibble(
Format = c("CSV", "Parquet"),
`Size (MB)` = c(csv_size, parquet_size),
`Write Time (s)` = c(
csv_time_write$toc - csv_time_write$tic,
parquet_time_write$toc - parquet_time_write$tic
),
`Read Time (s)` = c(
csv_time_read$toc - csv_time_read$tic,
parquet_time_read$toc - parquet_time_read$tic
)
)
library(gt)
storage_comparison %>%
gt() %>%
fmt_number(columns = c(`Size (MB)`, `Write Time (s)`, `Read Time (s)`),
decimals = 2) %>%
tab_header(
title = "Storage Format Comparison",
subtitle = "2023 NFL Play-by-Play Data"
)
#| label: storage-benchmark-py
#| message: false
#| warning: false
#| cache: true
import nfl_data_py as nfl
import pandas as pd
import time
import os
# Load sample data
pbp = nfl.import_pbp_data([2023])
# Benchmark CSV
start = time.time()
pbp.to_csv("temp_test.csv", index=False)
csv_write_time = time.time() - start
start = time.time()
csv_data = pd.read_csv("temp_test.csv")
csv_read_time = time.time() - start
csv_size = os.path.getsize("temp_test.csv") / 1024**2
# Benchmark Parquet
start = time.time()
pbp.to_parquet("temp_test.parquet")
parquet_write_time = time.time() - start
start = time.time()
parquet_data = pd.read_parquet("temp_test.parquet")
parquet_read_time = time.time() - start
parquet_size = os.path.getsize("temp_test.parquet") / 1024**2
# Create comparison
comparison = pd.DataFrame({
'Format': ['CSV', 'Parquet'],
'Size (MB)': [csv_size, parquet_size],
'Write Time (s)': [csv_write_time, parquet_write_time],
'Read Time (s)': [csv_read_time, parquet_read_time]
})
print("\nStorage Format Comparison - 2023 NFL Play-by-Play Data")
print(comparison.to_string(index=False))
# Clean up
os.remove("temp_test.csv")
os.remove("temp_test.parquet")
When to Use Each Format
- **CSV**: When sharing data with non-technical users or using tools that don't support Parquet
- **Parquet**: For large datasets, analytical workloads, and long-term storage
- **RDS/Pickle**: For saving R/Python objects with complex structures
- **Databases**: When you need ACID properties, concurrent access, or complex queries

Database Storage
For larger projects or multi-user environments, databases provide better performance and concurrency control.
SQLite Database
SQLite is a serverless database perfect for single-user analytics:
#| eval: false
#| echo: true
library(DBI)
library(RSQLite)
# Create/connect to database
con <- dbConnect(SQLite(), "data/football.db")
# Write data to database
dbWriteTable(con, "pbp_2023", pbp_2023, overwrite = TRUE)
dbWriteTable(con, "rosters", rosters, overwrite = TRUE)
# Query data
result <- dbGetQuery(con, "
SELECT posteam,
COUNT(*) as plays,
AVG(epa) as avg_epa
FROM pbp_2023
WHERE play_type IN ('pass', 'run')
GROUP BY posteam
ORDER BY avg_epa DESC
")
# Create indexes for performance
dbExecute(con, "CREATE INDEX idx_posteam ON pbp_2023(posteam)")
dbExecute(con, "CREATE INDEX idx_play_type ON pbp_2023(play_type)")
# Close connection
dbDisconnect(con)
#| eval: false
#| echo: true
from sqlalchemy import create_engine
import sqlite3
# Create database engine
engine = create_engine('sqlite:///data/football.db')
# Write data to database
pbp_2023.to_sql('pbp_2023', engine, if_exists='replace', index=False)
rosters.to_sql('rosters', engine, if_exists='replace', index=False)
# Query data using pandas
query = """
SELECT posteam,
COUNT(*) as plays,
AVG(epa) as avg_epa
FROM pbp_2023
WHERE play_type IN ('pass', 'run')
GROUP BY posteam
ORDER BY avg_epa DESC
"""
result = pd.read_sql(query, engine)
# Create indexes for performance (SQLAlchemy 2.x requires text() for raw SQL)
from sqlalchemy import text
with engine.begin() as conn:
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_posteam ON pbp_2023(posteam)"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_play_type ON pbp_2023(play_type)"))
PostgreSQL Database
For production environments or team settings, PostgreSQL offers robust features:
#| eval: false
#| echo: true
library(RPostgreSQL)
# Connect to PostgreSQL
con <- dbConnect(
PostgreSQL(),
dbname = "football",
host = "localhost",
port = 5432,
user = "postgres",
password = "your_password"
)
# Write data
dbWriteTable(con, "pbp_2023", pbp_2023, overwrite = TRUE)
# Create materialized view for common queries
dbExecute(con, "
CREATE MATERIALIZED VIEW team_epa AS
SELECT
posteam,
season,
COUNT(*) as plays,
AVG(epa) as avg_epa,
STDDEV(epa) as sd_epa
FROM pbp_2023
WHERE play_type IN ('pass', 'run')
GROUP BY posteam, season
")
dbDisconnect(con)
#| eval: false
#| echo: true
from sqlalchemy import create_engine
# Connect to PostgreSQL
engine = create_engine(
'postgresql://postgres:your_password@localhost:5432/football'
)
# Write data
pbp_2023.to_sql('pbp_2023', engine, if_exists='replace', index=False)
# Create materialized view (SQLAlchemy 2.x requires text() for raw SQL)
from sqlalchemy import text
with engine.begin() as conn:
    conn.execute(text("""
        CREATE MATERIALIZED VIEW team_epa AS
        SELECT
            posteam,
            season,
            COUNT(*) as plays,
            AVG(epa) as avg_epa,
            STDDEV(epa) as sd_epa
        FROM pbp_2023
        WHERE play_type IN ('pass', 'run')
        GROUP BY posteam, season
    """))
Building Data Pipelines
A data pipeline automates the process of acquiring, transforming, and storing data. Let's build a simple pipeline that updates NFL data weekly.
Basic Pipeline Architecture
#| label: fig-pipeline-architecture
#| fig-cap: "Data pipeline architecture for football analytics"
#| echo: false
library(ggplot2)
library(ggraph)
library(tidygraph)
# Create pipeline graph
pipeline_data <- data.frame(
from = c("API", "API", "Transform", "Transform", "Storage", "Storage"),
to = c("Transform", "Transform", "Storage", "Storage", "Analytics", "Reports"),
label = c("Play-by-Play", "Roster", "Clean", "Enrich", "Query", "Export")
)
# Create graph
g <- as_tbl_graph(pipeline_data)
# Plot
ggraph(g, layout = 'sugiyama') +
geom_edge_link(aes(label = label),
angle_calc = 'along',
label_dodge = unit(2.5, 'mm'),
arrow = arrow(length = unit(3, 'mm')),
end_cap = circle(3, 'mm')) +
geom_node_point(size = 15, color = "#4A90E2") +
geom_node_text(aes(label = name), color = "white", fontface = "bold") +
theme_void() +
labs(title = "Football Analytics Data Pipeline",
subtitle = "From raw data to insights")
📊 Visualization Output
The code above generates a visualization. To see the output, run this code in your R or Python environment. The resulting plot will help illustrate the concepts discussed in this section.
Implementing a Data Pipeline
#| eval: false
#| echo: true
library(nflreadr)
library(arrow)
library(lubridate)
library(logger)
# Set up logging
log_info("Starting NFL data pipeline...")
# Define data directory
data_dir <- "data/nfl"
if (!dir.exists(data_dir)) dir.create(data_dir, recursive = TRUE)
# Function to update play-by-play data
update_pbp_data <- function(seasons = 2023, force = FALSE) {
log_info("Updating play-by-play data for seasons: {paste(seasons, collapse=', ')}")
# Check if update is needed
parquet_file <- file.path(data_dir, "pbp_data.parquet")
if (!force && file.exists(parquet_file)) {
file_time <- file.mtime(parquet_file)
if (difftime(Sys.time(), file_time, units = "hours") < 24) {
log_info("Data is recent (< 24 hours old). Skipping update.")
return(read_parquet(parquet_file))
}
}
# Download data
tryCatch({
pbp <- load_pbp(seasons)
# Data validation
log_info("Validating data...")
# Check for required columns
required_cols <- c("game_id", "play_id", "posteam", "epa", "wpa")
missing_cols <- setdiff(required_cols, names(pbp))
if (length(missing_cols) > 0) {
log_error("Missing required columns: {paste(missing_cols, collapse=', ')}")
stop("Data validation failed")
}
# Check for reasonable data ranges
if (any(pbp$epa < -20 | pbp$epa > 20, na.rm = TRUE)) {
log_warn("Found extreme EPA values outside expected range")
}
# Save data
log_info("Saving data to {parquet_file}")
write_parquet(pbp, parquet_file)
log_info("Pipeline completed successfully. Processed {nrow(pbp)} plays.")
return(pbp)
}, error = function(e) {
log_error("Pipeline failed: {e$message}")
stop(e)
})
}
# Function to update roster data
update_roster_data <- function(seasons = 2023) {
log_info("Updating roster data for seasons: {paste(seasons, collapse=', ')}")
rosters <- load_rosters(seasons)
roster_file <- file.path(data_dir, "roster_data.parquet")
write_parquet(rosters, roster_file)
log_info("Roster data saved: {nrow(rosters)} players")
return(rosters)
}
# Run pipeline
pbp <- update_pbp_data(2021:2023)
rosters <- update_roster_data(2021:2023)
log_info("All pipeline tasks completed")
#| eval: false
#| echo: true
import nfl_data_py as nfl
import pandas as pd
import logging
from pathlib import Path
from datetime import datetime, timedelta
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info("Starting NFL data pipeline...")
# Define data directory
data_dir = Path("data/nfl")
data_dir.mkdir(parents=True, exist_ok=True)
def update_pbp_data(seasons=[2023], force=False):
    """Update play-by-play data"""
    logger.info(f"Updating play-by-play data for seasons: {seasons}")
    parquet_file = data_dir / "pbp_data.parquet"
    # Check if update is needed
    if not force and parquet_file.exists():
        file_time = datetime.fromtimestamp(parquet_file.stat().st_mtime)
        if datetime.now() - file_time < timedelta(hours=24):
            logger.info("Data is recent (< 24 hours old). Skipping update.")
            return pd.read_parquet(parquet_file)
    try:
        # Download data
        pbp = nfl.import_pbp_data(seasons)
        # Data validation
        logger.info("Validating data...")
        # Check for required columns
        required_cols = ['game_id', 'play_id', 'posteam', 'epa', 'wpa']
        missing_cols = set(required_cols) - set(pbp.columns)
        if missing_cols:
            logger.error(f"Missing required columns: {missing_cols}")
            raise ValueError("Data validation failed")
        # Check for reasonable data ranges (NaN comparisons are False, so NAs pass)
        extreme_epa = (pbp['epa'] < -20) | (pbp['epa'] > 20)
        if extreme_epa.any():
            logger.warning("Found extreme EPA values outside expected range")
        # Save data
        logger.info(f"Saving data to {parquet_file}")
        pbp.to_parquet(parquet_file)
        logger.info(f"Pipeline completed successfully. Processed {len(pbp):,} plays.")
        return pbp
    except Exception as e:
        logger.error(f"Pipeline failed: {str(e)}")
        raise
def update_roster_data(seasons=[2023]):
    """Update roster data"""
    logger.info(f"Updating roster data for seasons: {seasons}")
    rosters = nfl.import_rosters(seasons)
    roster_file = data_dir / "roster_data.parquet"
    rosters.to_parquet(roster_file)
    logger.info(f"Roster data saved: {len(rosters):,} players")
    return rosters
# Run pipeline
if __name__ == "__main__":
    pbp = update_pbp_data([2021, 2022, 2023])
    rosters = update_roster_data([2021, 2022, 2023])
    logger.info("All pipeline tasks completed")
Scheduling Automated Updates
For production pipelines, automate updates using cron (Linux/Mac) or Task Scheduler (Windows):
Linux/Mac cron example:
# Run every Tuesday at 3 AM (after Monday Night Football)
0 3 * * 2 /usr/bin/Rscript /path/to/pipeline.R >> /var/log/nfl-pipeline.log 2>&1
Python with schedule library:
import schedule
import time

def job():
    update_pbp_data([2023])
    update_roster_data([2023])

# Schedule weekly updates
schedule.every().tuesday.at("03:00").do(job)

while True:
    schedule.run_pending()
    time.sleep(3600)  # Check every hour
Data Versioning and Reproducibility
Data versioning ensures that analyses can be reproduced even as underlying data changes.
Version Control Strategies
1. Timestamped Files
data/
pbp_2023_20240101.parquet
pbp_2023_20240108.parquet
pbp_2023_20240115.parquet
2. Git-based Versioning (for small data)
Use Git LFS (Large File Storage) for tracking data files:
# Install Git LFS
git lfs install
# Track parquet files
git lfs track "*.parquet"
# Commit and push
git add .gitattributes
git commit -m "Add data versioning"
3. Data Version Control (DVC)
DVC is designed specifically for versioning large datasets:
# Install DVC
pip install dvc
# Initialize DVC
dvc init
# Track data file
dvc add data/pbp_2023.parquet
# Commit DVC file
git add data/pbp_2023.parquet.dvc .gitignore
git commit -m "Track play-by-play data"
# Configure remote storage (S3, GCS, etc.)
dvc remote add -d myremote s3://mybucket/dvc-storage
dvc push
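Even without DVC, two lightweight habits go a long way: generate timestamped filenames programmatically (strategy 1 above) and record a content hash of each snapshot so you can later verify an analysis ran against the exact bytes you think it did. A minimal sketch (prefix and sample bytes are illustrative):

```python
import hashlib
from datetime import date

def timestamped_filename(prefix, snapshot_date, ext="parquet"):
    """Versioned filename matching the convention above, e.g. pbp_2023_20240108.parquet."""
    return f"{prefix}_{snapshot_date:%Y%m%d}.{ext}"

def fingerprint(data: bytes) -> str:
    """SHA-256 hex digest of a file's bytes; store it alongside the snapshot's metadata."""
    return hashlib.sha256(data).hexdigest()

name = timestamped_filename("pbp_2023", date(2024, 1, 8))
# name == "pbp_2023_20240108.parquet"

# In practice: fingerprint(Path("data/pbp_2023.parquet").read_bytes())
digest = fingerprint(b"example bytes")  # same bytes always yield the same digest
```

A changed fingerprint for the "same" file is an immediate signal that the upstream source revised the data, which is common as play-by-play providers issue stat corrections.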
Reproducibility Checklist
Ensuring Reproducible Analyses
1. **Document data sources**: Record where data came from and when it was acquired
2. **Version your data**: Use timestamps, hashes, or version control
3. **Pin package versions**: Use `renv` (R) or `requirements.txt` (Python)
4. **Set random seeds**: For any stochastic processes
5. **Document transformations**: Keep clear records of data cleaning steps
6. **Use relative paths**: Make code portable across systems
7. **Containerize**: Use Docker for complete environment reproducibility

Example: Reproducible Project Structure
nfl-analytics-project/
├── data/
│ ├── raw/ # Original, immutable data
│ │ └── pbp_2023.parquet
│ ├── processed/ # Cleaned, transformed data
│ │ └── team_stats.parquet
│ └── external/ # Third-party data
├── src/
│ ├── data_pipeline.R # Data acquisition
│ ├── clean_data.R # Data cleaning
│ └── analysis.R # Analysis code
├── outputs/
│ ├── figures/
│ └── tables/
├── renv/ # R environment (renv)
├── renv.lock # Package versions
├── README.md
└── Makefile # Reproducible workflow
Data Quality and Validation
Ensuring data quality is critical for reliable analytics.
Common Data Quality Issues
- Missing Values: Incomplete play records
- Duplicates: Same play recorded multiple times
- Inconsistencies: Conflicting information
- Outliers: Unrealistic values
- Schema Changes: Column names or types change
Data Validation Pipeline
#| eval: false
#| echo: true
library(tidyverse)
library(assertr)
library(logger)
validate_pbp_data <- function(pbp) {
log_info("Running data quality checks...")
# Check 1: Required columns exist
required_cols <- c("game_id", "play_id", "posteam", "defteam",
                   "down", "ydstogo", "epa", "wpa", "wp")
pbp %>%
verify(all(required_cols %in% names(.))) %>%
# Check 2: No duplicate plays
assert(is_uniq, game_id, play_id) %>%
# Check 3: Valid down values
assert(in_set(1, 2, 3, 4, NA), down) %>%
# Check 4: EPA in reasonable range
assert(within_bounds(-15, 15), epa) %>%
# Check 5: Win probability between 0 and 1
assert(within_bounds(0, 1), wp) %>%
# Check 6: Yards to go is non-negative (within_bounds() allows NA by default)
assert(within_bounds(0, 100), ydstogo) %>%
# Check 7: Season is valid
assert(within_bounds(1999, year(Sys.Date())), season) -> validated_pbp
log_info("All data quality checks passed")
return(validated_pbp)
}
# Run validation
pbp_validated <- validate_pbp_data(pbp_2023)
#| eval: false
#| echo: true
import pandas as pd
import numpy as np
import logging
logger = logging.getLogger(__name__)
def validate_pbp_data(pbp):
    """Validate play-by-play data quality"""
    logger.info("Running data quality checks...")
    errors = []
    warnings = []
    # Check 1: Required columns exist
    required_cols = ['game_id', 'play_id', 'posteam', 'defteam',
                     'down', 'ydstogo', 'epa', 'wpa', 'wp']
    missing_cols = set(required_cols) - set(pbp.columns)
    if missing_cols:
        errors.append(f"Missing required columns: {missing_cols}")
    # Check 2: No duplicate plays
    duplicates = pbp.duplicated(subset=['game_id', 'play_id'])
    if duplicates.any():
        errors.append(f"Found {duplicates.sum()} duplicate plays")
    # Check 3: Valid down values
    invalid_downs = pbp['down'].notna() & ~pbp['down'].isin([1, 2, 3, 4])
    if invalid_downs.any():
        warnings.append(f"Found {invalid_downs.sum()} invalid down values")
    # Check 4: EPA in reasonable range
    extreme_epa = pbp['epa'].notna() & ((pbp['epa'] < -15) | (pbp['epa'] > 15))
    if extreme_epa.any():
        warnings.append(f"Found {extreme_epa.sum()} extreme EPA values")
    # Check 5: Win probability between 0 and 1
    invalid_wp = pbp['wp'].notna() & ((pbp['wp'] < 0) | (pbp['wp'] > 1))
    if invalid_wp.any():
        errors.append(f"Found {invalid_wp.sum()} invalid WP values")
    # Check 6: Yards to go is non-negative
    invalid_ydstogo = pbp['ydstogo'].notna() & ((pbp['ydstogo'] < 0) | (pbp['ydstogo'] > 100))
    if invalid_ydstogo.any():
        warnings.append(f"Found {invalid_ydstogo.sum()} invalid ydstogo values")
    # Report results
    if errors:
        for error in errors:
            logger.error(error)
        raise ValueError("Data validation failed")
    if warnings:
        for warning in warnings:
            logger.warning(warning)
    logger.info("All data quality checks passed")
    return pbp
# Run validation
pbp_validated = validate_pbp_data(pbp_2023)
Data Profiling
Understanding your data's characteristics helps identify quality issues:
#| label: data-profiling-r
#| message: false
#| warning: false
#| cache: true
library(nflreadr)
library(tidyverse)
library(gt)
pbp <- load_pbp(2023)
# Create data profile
data_profile <- tibble(
Column = names(pbp),
Type = sapply(pbp, function(x) class(x)[1]),
Missing = sapply(pbp, function(x) sum(is.na(x))),
`Missing %` = sapply(pbp, function(x) mean(is.na(x)) * 100),
Unique = sapply(pbp, function(x) length(unique(x)))
) %>%
arrange(desc(`Missing %`))
# Display top columns with missing data
data_profile %>%
filter(`Missing %` > 50) %>%
head(10) %>%
gt() %>%
tab_header(
title = "Data Quality Profile",
subtitle = "Columns with >50% missing values"
) %>%
fmt_number(columns = `Missing %`, decimals = 1) %>%
fmt_number(columns = c(Missing, Unique), decimals = 0, use_seps = TRUE)
#| label: data-profiling-py
#| message: false
#| warning: false
#| cache: true
import nfl_data_py as nfl
import pandas as pd
pbp = nfl.import_pbp_data([2023])
# Create data profile
profile = pd.DataFrame({
'Column': pbp.columns,
'Type': pbp.dtypes.values,
'Missing': pbp.isna().sum().values,
'Missing %': (pbp.isna().sum() / len(pbp) * 100).values,
'Unique': pbp.nunique().values
})
profile = profile.sort_values('Missing %', ascending=False)
# Display top columns with missing data
print("\nData Quality Profile - Columns with >50% missing values:")
print(profile[profile['Missing %'] > 50].head(10).to_string(index=False))
Cloud Storage and Scalability
As data grows, cloud storage becomes essential for collaboration and scalability.
Cloud Storage Options
| Provider | Service | Best For | Approx. Pricing |
|---|---|---|---|
| AWS | S3 | General purpose | $0.023/GB/month |
| Google Cloud | GCS | BigQuery integration | $0.020/GB/month |
| Azure | Blob Storage | Microsoft ecosystem | $0.018/GB/month |
| Dropbox | Business | Easy sharing | $12.50/user/month |
AWS S3 Integration
#| eval: false
#| echo: true
library(aws.s3)
library(arrow)
# Configure AWS credentials (set environment variables)
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION
# Upload to S3
s3write_using(
pbp_2023,
FUN = write_parquet,
bucket = "my-football-data",
object = "pbp/season=2023/data.parquet"
)
# Read from S3
pbp_from_s3 <- s3read_using(
FUN = read_parquet,
bucket = "my-football-data",
object = "pbp/season=2023/data.parquet"
)
# List files in bucket
files <- get_bucket("my-football-data", prefix = "pbp/")
#| eval: false
#| echo: true
import boto3
import pandas as pd
from io import BytesIO
# Initialize S3 client
s3 = boto3.client('s3',
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY')
# Upload to S3
buffer = BytesIO()
pbp_2023.to_parquet(buffer)
buffer.seek(0)
s3.put_object(
Bucket='my-football-data',
Key='pbp/season=2023/data.parquet',
Body=buffer
)
# Read from S3
obj = s3.get_object(Bucket='my-football-data',
Key='pbp/season=2023/data.parquet')
pbp_from_s3 = pd.read_parquet(BytesIO(obj['Body'].read()))
# List files in bucket
response = s3.list_objects_v2(Bucket='my-football-data',
Prefix='pbp/')
files = [obj['Key'] for obj in response.get('Contents', [])]
Data Lake Architecture
For large-scale analytics, implement a data lake:
#| label: fig-data-lake
#| fig-cap: "Data lake architecture for football analytics"
#| echo: false
library(ggplot2)
library(ggraph)
library(tidygraph)
# Create data lake layers
lake_data <- data.frame(
from = c(rep("Sources", 3), rep("Bronze", 3), rep("Silver", 2)),
to = c(rep("Bronze", 3), rep("Silver", 3), rep("Gold", 2)),
layer = c("Raw", "Raw", "Raw", "Cleaned", "Cleaned", "Cleaned",
"Analytics", "Reports")
)
lake_data$label <- c("NFL API", "ESPN", "NGS",
"Standardize", "Validate", "Enrich",
"Aggregate", "Export")
# Create graph and tag each node with its layer
g <- as_tbl_graph(lake_data) %>%
  activate(nodes) %>%
  mutate(layer = c("Source", "Bronze", "Silver", "Gold"))
# Plot
ggraph(g, layout = 'sugiyama') +
geom_edge_link(aes(label = label),
angle_calc = 'along',
label_dodge = unit(2.5, 'mm'),
arrow = arrow(length = unit(3, 'mm')),
end_cap = circle(5, 'mm'),
alpha = 0.6) +
geom_node_point(aes(color = layer), size = 18) +
geom_node_text(aes(label = name), color = "white", fontface = "bold") +
scale_color_manual(values = c("Source" = "#E74C3C", "Bronze" = "#CD7F32",
"Silver" = "#C0C0C0", "Gold" = "#FFD700")) +
theme_void() +
theme(legend.position = "bottom") +
labs(title = "Data Lake Architecture",
subtitle = "Bronze → Silver → Gold progression",
color = "Layer")
📊 Visualization Output
Run the code above in your R environment to render the diagram. It shows data flowing from raw sources through the Bronze, Silver, and Gold layers described below.
Bronze Layer: Raw, immutable data as ingested
Silver Layer: Cleaned, validated, standardized data
Gold Layer: Aggregated, business-ready datasets
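The three-layer progression can be sketched end-to-end on a toy table. The column names below mirror the play-by-play fields used elsewhere in this chapter, but the values are made up purely for illustration:

```python
import pandas as pd

# Bronze: raw data exactly as ingested (messy case, missing values)
bronze = pd.DataFrame({
    "posteam": ["kc", "KC", None, "buf"],
    "epa": [0.5, -0.2, 0.1, None],
    "season": [2023, 2023, 2023, 2023],
})

# Silver: cleaned and standardized -- drop invalid rows, normalize team
# codes, add derived fields
silver = (bronze
          .dropna(subset=["posteam", "epa"])
          .assign(posteam=lambda d: d["posteam"].str.upper(),
                  is_success=lambda d: d["epa"] > 0))

# Gold: business-ready aggregate -- plays and average EPA per team-season
gold = (silver.groupby(["season", "posteam"])
        .agg(plays=("epa", "size"), avg_epa=("epa", "mean"))
        .reset_index())

print(gold)
```

The same shape scales up directly: swap the toy frame for partitioned Parquet reads and writes, as in the challenge exercise at the end of this chapter.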
Summary
Building robust data infrastructure is foundational to successful football analytics. In this chapter, we covered:
- Data Sources: nflverse ecosystem, NFL official APIs, college football data
- Storage Formats: CSV, Parquet, databases, and their trade-offs
- Data Pipelines: Automated data acquisition and updating
- Data Quality: Validation, profiling, and quality control
- Versioning: Reproducibility through data version control
- Cloud Storage: Scalable solutions for large datasets
Key takeaways:
- Use Parquet for large datasets; it's 5-10x smaller and faster than CSV
- Automate data updates with pipelines to ensure fresh data
- Implement data validation to catch quality issues early
- Version your data for reproducibility
- Leverage cloud storage for collaboration and scalability
Exercises
Conceptual Questions
1. Data Source Selection: You're building an analytics platform that needs play-by-play data, player tracking data, and injury reports. What data sources would you use for each, and why?
2. Storage Format Decision: You have 10 years of play-by-play data (500GB uncompressed). Compare the trade-offs between storing this in:
- A single Parquet file
- Partitioned Parquet files (by season)
- A PostgreSQL database
- Multiple CSV files
What would you recommend and why?
3. Data Pipeline Frequency: How often should an NFL analytics pipeline update data during the season vs. the offseason? What factors should influence this decision?
4. Data Quality: List five specific data quality checks you would implement for NFL play-by-play data. For each, explain why it's important and what issues it would catch.
Coding Exercises
Exercise 1: Multi-Season Data Pipeline
Build a data pipeline that:
a) Downloads play-by-play data for 2020-2023
b) Saves each season as a separate Parquet file
c) Creates a combined dataset with all seasons
d) Implements at least 3 data quality checks
e) Logs all operations
**Bonus**: Add error handling to retry failed downloads.
Solution
#| eval: false
library(nflreadr)
library(arrow)
library(dplyr)
library(logger)
library(purrr)
# Set up logging
log_threshold(INFO)
# Data directory
data_dir <- "data/multi_season"
dir.create(data_dir, showWarnings = FALSE, recursive = TRUE)
# Pipeline function
download_season <- function(season, max_retries = 3) {
file_path <- file.path(data_dir, paste0("pbp_", season, ".parquet"))
for (attempt in 1:max_retries) {
tryCatch({
log_info("Downloading season {season} (attempt {attempt})")
# Download data
pbp <- load_pbp(season)
# Data quality checks
log_info("Running quality checks for {season}")
# Check 1: Minimum number of plays
min_plays <- 30000 # Expect ~45k plays per season
if (nrow(pbp) < min_plays) {
stop(paste0("Too few plays: ", nrow(pbp)))
}
# Check 2: All weeks present (1-18 for regular season)
expected_weeks <- 1:18
actual_weeks <- unique(pbp$week[pbp$season_type == "REG"])
if (length(setdiff(expected_weeks, actual_weeks)) > 2) {
log_warn("Missing weeks in season {season}")
}
# Check 3: No all-NA columns
na_cols <- sapply(pbp, function(x) all(is.na(x)))
if (any(na_cols)) {
log_warn("Found all-NA columns: {paste(names(pbp)[na_cols], collapse=', ')}")
}
# Save data
write_parquet(pbp, file_path)
log_info("Successfully saved {season} data: {nrow(pbp)} plays")
return(TRUE)
}, error = function(e) {
log_error("Attempt {attempt} failed for {season}: {e$message}")
if (attempt == max_retries) {
log_error("Max retries reached for {season}")
return(FALSE)
}
Sys.sleep(5 * attempt) # Linearly increasing backoff between retries
})
}
FALSE  # reached only if every attempt failed
}
# Download all seasons
seasons <- 2020:2023
results <- map_lgl(seasons, download_season)
# Combine datasets
if (all(results)) {
log_info("Combining all seasons...")
pbp_combined <- open_dataset(data_dir) %>%
collect()
combined_path <- file.path(data_dir, "pbp_all_seasons.parquet")
write_parquet(pbp_combined, combined_path)
log_info("Pipeline complete! Total plays: {nrow(pbp_combined)}")
} else {
log_error("Pipeline failed for some seasons")
}
#| eval: false
import nfl_data_py as nfl
import pandas as pd
import logging
from pathlib import Path
import time
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Data directory
data_dir = Path("data/multi_season")
data_dir.mkdir(parents=True, exist_ok=True)
def download_season(season, max_retries=3):
"""Download and validate season data"""
file_path = data_dir / f"pbp_{season}.parquet"
for attempt in range(1, max_retries + 1):
try:
logger.info(f"Downloading season {season} (attempt {attempt})")
# Download data
pbp = nfl.import_pbp_data([season])
# Data quality checks
logger.info(f"Running quality checks for {season}")
# Check 1: Minimum number of plays
min_plays = 30000
if len(pbp) < min_plays:
raise ValueError(f"Too few plays: {len(pbp)}")
# Check 2: All weeks present
expected_weeks = set(range(1, 19))
actual_weeks = set(pbp[pbp['season_type'] == 'REG']['week'].unique())
missing_weeks = expected_weeks - actual_weeks
if len(missing_weeks) > 2:
logger.warning(f"Missing weeks in {season}: {missing_weeks}")
# Check 3: No all-NA columns
na_cols = pbp.columns[pbp.isna().all()].tolist()
if na_cols:
logger.warning(f"Found all-NA columns: {na_cols}")
# Save data
pbp.to_parquet(file_path)
logger.info(f"Successfully saved {season} data: {len(pbp):,} plays")
return True
except Exception as e:
logger.error(f"Attempt {attempt} failed for {season}: {str(e)}")
if attempt == max_retries:
logger.error(f"Max retries reached for {season}")
return False
time.sleep(5 * attempt)  # Linearly increasing backoff between retries
return False
# Download all seasons
seasons = range(2020, 2024)
results = [download_season(season) for season in seasons]
# Combine datasets
if all(results):
logger.info("Combining all seasons...")
pbp_combined = pd.concat([
pd.read_parquet(data_dir / f"pbp_{season}.parquet")
for season in seasons
], ignore_index=True)
combined_path = data_dir / "pbp_all_seasons.parquet"
pbp_combined.to_parquet(combined_path)
logger.info(f"Pipeline complete! Total plays: {len(pbp_combined):,}")
else:
logger.error("Pipeline failed for some seasons")
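The retry logic inside `download_season` is a reusable pattern worth factoring out. Below is a minimal, framework-free sketch; the `flaky_download` stand-in simulates two network failures before succeeding:

```python
import time


def with_retries(fn, max_retries=3, base_delay=0.01):
    """Call fn(), retrying with a linearly increasing delay on failure.

    Returns fn()'s result, or re-raises the last exception once
    max_retries attempts are exhausted.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(base_delay * attempt)


# Simulate a download that fails twice, then succeeds
calls = {"n": 0}

def flaky_download():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated network failure")
    return "pbp data"

result = with_retries(flaky_download)
print(result, "after", calls["n"], "attempts")  # pbp data after 3 attempts
```

In the pipeline above you would wrap only the download call (`nfl.import_pbp_data`) this way, keeping the quality checks outside the retry loop so a validation failure is not retried pointlessly.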
Exercise 2: Data Quality Report
Create a comprehensive data quality report for a season of NFL data that includes:
a) Missing value analysis by column
b) Distribution of key metrics (EPA, WP)
c) Duplicate detection
d) Temporal coverage (games per week)
e) Team coverage (plays per team)
Generate both summary statistics and visualizations.
Solution
#| eval: false
library(nflreadr)
library(tidyverse)
library(gt)
library(patchwork)
pbp <- load_pbp(2023)
# 1. Missing Value Analysis
missing_analysis <- tibble(
column = names(pbp),
missing_count = sapply(pbp, function(x) sum(is.na(x))),
missing_pct = sapply(pbp, function(x) mean(is.na(x)) * 100)
) %>%
filter(missing_pct > 0) %>%
arrange(desc(missing_pct))
# 2. Key Metrics Distribution
p1 <- ggplot(pbp %>% filter(!is.na(epa), abs(epa) < 10),
aes(x = epa)) +
geom_histogram(bins = 50, fill = "#4A90E2", alpha = 0.7) +
labs(title = "EPA Distribution", x = "EPA", y = "Count") +
theme_minimal()
p2 <- ggplot(pbp %>% filter(!is.na(wp)),
aes(x = wp)) +
geom_histogram(bins = 50, fill = "#E74C3C", alpha = 0.7) +
labs(title = "Win Probability Distribution", x = "WP", y = "Count") +
theme_minimal()
# 3. Duplicate Detection
duplicates <- pbp %>%
group_by(game_id, play_id) %>%
filter(n() > 1) %>%
nrow()
# 4. Temporal Coverage
temporal_coverage <- pbp %>%
filter(season_type == "REG") %>%
group_by(week) %>%
summarise(games = n_distinct(game_id),
plays = n())
p3 <- ggplot(temporal_coverage, aes(x = week, y = plays)) +
geom_col(fill = "#27AE60") +
labs(title = "Plays per Week", x = "Week", y = "Total Plays") +
theme_minimal()
# 5. Team Coverage
team_coverage <- pbp %>%
filter(!is.na(posteam)) %>%
group_by(posteam) %>%
summarise(plays = n()) %>%
arrange(desc(plays))
p4 <- ggplot(team_coverage, aes(x = reorder(posteam, plays), y = plays)) +
geom_col(fill = "#9B59B6") +
coord_flip() +
labs(title = "Plays per Team", x = "Team", y = "Total Plays") +
theme_minimal()
# Create report
cat("===== DATA QUALITY REPORT =====\n\n")
cat("Dataset: 2023 NFL Play-by-Play\n")
cat("Total Plays:", nrow(pbp), "\n")
cat("Total Columns:", ncol(pbp), "\n")
cat("Duplicates Found:", duplicates, "\n\n")
cat("Top 10 Columns with Missing Values:\n")
print(missing_analysis %>% head(10))
cat("\n\nMetric Summaries:\n")
cat("EPA - Mean:", mean(pbp$epa, na.rm = TRUE), "SD:", sd(pbp$epa, na.rm = TRUE), "\n")
cat("WP - Mean:", mean(pbp$wp, na.rm = TRUE), "SD:", sd(pbp$wp, na.rm = TRUE), "\n")
# Combined plot
(p1 + p2) / (p3 + p4) +
plot_annotation(title = "Data Quality Visualizations",
theme = theme(plot.title = element_text(size = 16, face = "bold")))
#| eval: false
import nfl_data_py as nfl
import pandas as pd
import matplotlib.pyplot as plt
pbp = nfl.import_pbp_data([2023])
# 1. Missing Value Analysis
missing_analysis = pd.DataFrame({
'column': pbp.columns,
'missing_count': pbp.isna().sum(),
'missing_pct': (pbp.isna().sum() / len(pbp) * 100)
})
missing_analysis = missing_analysis[missing_analysis['missing_pct'] > 0]
missing_analysis = missing_analysis.sort_values('missing_pct', ascending=False)
# 2. Key Metrics Distribution
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# EPA Distribution
epa_data = pbp['epa'].dropna()
epa_data = epa_data[abs(epa_data) < 10]
axes[0, 0].hist(epa_data, bins=50, color='#4A90E2', alpha=0.7)
axes[0, 0].set_title('EPA Distribution')
axes[0, 0].set_xlabel('EPA')
axes[0, 0].set_ylabel('Count')
# WP Distribution
wp_data = pbp['wp'].dropna()
axes[0, 1].hist(wp_data, bins=50, color='#E74C3C', alpha=0.7)
axes[0, 1].set_title('Win Probability Distribution')
axes[0, 1].set_xlabel('WP')
axes[0, 1].set_ylabel('Count')
# 3. Duplicate Detection
duplicates = pbp.duplicated(subset=['game_id', 'play_id']).sum()
# 4. Temporal Coverage
temporal_coverage = (pbp[pbp['season_type'] == 'REG']
.groupby('week')
.agg(games=('game_id', 'nunique'),
plays=('play_id', 'count'))
.reset_index())
axes[1, 0].bar(temporal_coverage['week'], temporal_coverage['plays'],
color='#27AE60')
axes[1, 0].set_title('Plays per Week')
axes[1, 0].set_xlabel('Week')
axes[1, 0].set_ylabel('Total Plays')
# 5. Team Coverage
team_coverage = (pbp[pbp['posteam'].notna()]
.groupby('posteam')
.size()
.sort_values(ascending=True)
.reset_index(name='plays'))
axes[1, 1].barh(range(len(team_coverage)), team_coverage['plays'],
color='#9B59B6')
axes[1, 1].set_yticks(range(len(team_coverage)))
axes[1, 1].set_yticklabels(team_coverage['posteam'])
axes[1, 1].set_title('Plays per Team')
axes[1, 1].set_xlabel('Total Plays')
plt.tight_layout()
plt.savefig('data_quality_report.png', dpi=300, bbox_inches='tight')
plt.show()
# Print report
print("===== DATA QUALITY REPORT =====\n")
print(f"Dataset: 2023 NFL Play-by-Play")
print(f"Total Plays: {len(pbp):,}")
print(f"Total Columns: {len(pbp.columns)}")
print(f"Duplicates Found: {duplicates}\n")
print("Top 10 Columns with Missing Values:")
print(missing_analysis.head(10).to_string(index=False))
print("\n\nMetric Summaries:")
print(f"EPA - Mean: {pbp['epa'].mean():.4f}, SD: {pbp['epa'].std():.4f}")
print(f"WP - Mean: {pbp['wp'].mean():.4f}, SD: {pbp['wp'].std():.4f}")
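One subtlety in the duplicate check above: pandas `DataFrame.duplicated` with the default `keep='first'` flags only the second and later copies, so the count reported is the number of extra rows, not the number of rows involved in duplication. A toy example:

```python
import pandas as pd

# Two rows share the same (game_id, play_id) key
plays = pd.DataFrame({
    "game_id": ["g1", "g1", "g1", "g2"],
    "play_id": [1, 1, 2, 1],
})

# Default keep='first': flags only the repeats (1 extra row here)
extra_rows = plays.duplicated(subset=["game_id", "play_id"]).sum()

# keep=False: flags every row in a duplicated group (2 rows here)
all_involved = plays.duplicated(subset=["game_id", "play_id"], keep=False).sum()

print(extra_rows, all_involved)  # 1 2
```

Use `keep=False` when you want to inspect the offending rows themselves rather than just count them.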
Challenge Problem
Exercise 3: Cloud-Based Data Lake
Design and implement a mini data lake for football analytics:
**Requirements:**
1. **Bronze Layer**: Store raw play-by-play data partitioned by season
2. **Silver Layer**: Clean data with validated schemas
3. **Gold Layer**: Create aggregated team statistics
**Implementation:**
a) Set up local directory structure mimicking S3
b) Create ETL pipeline that processes Bronze → Silver → Gold
c) Implement incremental updates (only process new data)
d) Add data lineage tracking (record transformations)
e) Create a simple query interface
**Bonus**: Deploy to actual S3 or Google Cloud Storage
**Evaluation Criteria:**
- Correctness of transformations
- Code organization and documentation
- Error handling
- Performance optimization
- Reproducibility
Partial Solution Framework
#| eval: false
library(nflreadr)
library(tidyverse)
library(arrow)
library(logger)
# Data lake structure
lake_root <- "data_lake"
bronze_path <- file.path(lake_root, "bronze/pbp")
silver_path <- file.path(lake_root, "silver/pbp")
gold_path <- file.path(lake_root, "gold/team_stats")
# Create directories
walk(c(bronze_path, silver_path, gold_path),
~dir.create(.x, recursive = TRUE, showWarnings = FALSE))
# Bronze Layer: Raw data ingestion
ingest_to_bronze <- function(seasons) {
for (season in seasons) {
log_info("Ingesting season {season} to bronze layer")
pbp <- load_pbp(season)
# Save partitioned by season
pbp %>%
write_dataset(
path = bronze_path,
format = "parquet",
partitioning = "season"
)
}
}
# Silver Layer: Cleaned and validated
process_to_silver <- function() {
log_info("Processing bronze to silver layer")
# Read from bronze
bronze_data <- open_dataset(bronze_path)
# Clean and validate
silver_data <- bronze_data %>%
filter(!is.na(posteam), !is.na(epa)) %>%
mutate(
# Standardize team abbreviations
posteam = toupper(posteam),
defteam = toupper(defteam),
# Add derived fields
is_success = epa > 0,
big_play = yards_gained >= 20
) %>%
collect()
# Write to silver
silver_data %>%
write_dataset(
path = silver_path,
format = "parquet",
partitioning = c("season", "week")
)
log_info("Silver layer updated")
}
# Gold Layer: Aggregated analytics
create_gold_layer <- function() {
log_info("Creating gold layer aggregations")
# Read from silver
silver_data <- open_dataset(silver_path)
# Create team statistics
team_stats <- silver_data %>%
filter(play_type %in% c("pass", "run")) %>%
group_by(season, posteam) %>%
summarise(
plays = n(),
avg_epa = mean(epa, na.rm = TRUE),
success_rate = mean(is_success, na.rm = TRUE),
explosive_rate = mean(big_play, na.rm = TRUE),
.groups = "drop"
) %>%
collect()
# Save to gold
team_stats %>%
write_dataset(
path = gold_path,
format = "parquet",
partitioning = "season"
)
log_info("Gold layer updated")
}
# Run pipeline
ingest_to_bronze(2021:2023)
process_to_silver()
create_gold_layer()
#| eval: false
import nfl_data_py as nfl
import pandas as pd
import pyarrow.parquet as pq
import pyarrow.dataset as ds
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
# Data lake structure
lake_root = Path("data_lake")
bronze_path = lake_root / "bronze/pbp"
silver_path = lake_root / "silver/pbp"
gold_path = lake_root / "gold/team_stats"
# Create directories
for path in [bronze_path, silver_path, gold_path]:
path.mkdir(parents=True, exist_ok=True)
def ingest_to_bronze(seasons):
"""Ingest raw data to bronze layer"""
for season in seasons:
logger.info(f"Ingesting season {season} to bronze layer")
pbp = nfl.import_pbp_data([season])
# Save partitioned by season
season_path = bronze_path / f"season={season}"
season_path.mkdir(exist_ok=True)
pbp.to_parquet(season_path / "data.parquet")
def process_to_silver():
"""Process bronze to silver layer"""
logger.info("Processing bronze to silver layer")
# Read all bronze data
bronze_dataset = ds.dataset(bronze_path, format="parquet")
bronze_data = bronze_dataset.to_table().to_pandas()
# Clean and validate
silver_data = bronze_data.copy()
silver_data = silver_data[silver_data['posteam'].notna() &
silver_data['epa'].notna()]
# Standardize
silver_data['posteam'] = silver_data['posteam'].str.upper()
silver_data['defteam'] = silver_data['defteam'].str.upper()
# Add derived fields
silver_data['is_success'] = silver_data['epa'] > 0
silver_data['big_play'] = silver_data['yards_gained'] >= 20
# Save partitioned
for (season, week), group in silver_data.groupby(['season', 'week']):
partition_path = silver_path / f"season={season}/week={week}"
partition_path.mkdir(parents=True, exist_ok=True)
group.to_parquet(partition_path / "data.parquet")
logger.info("Silver layer updated")
def create_gold_layer():
"""Create gold layer aggregations"""
logger.info("Creating gold layer aggregations")
# Read silver data
silver_dataset = ds.dataset(silver_path, format="parquet")
silver_data = silver_dataset.to_table().to_pandas()
# Aggregate team statistics
team_stats = (silver_data[silver_data['play_type'].isin(['pass', 'run'])]
.groupby(['season', 'posteam'])
.agg(
plays=('play_id', 'count'),
avg_epa=('epa', 'mean'),
success_rate=('is_success', 'mean'),
explosive_rate=('big_play', 'mean')
)
.reset_index())
# Save partitioned
for season, group in team_stats.groupby('season'):
season_path = gold_path / f"season={season}"
season_path.mkdir(exist_ok=True)
group.to_parquet(season_path / "data.parquet")
logger.info("Gold layer updated")
# Run pipeline
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
ingest_to_bronze([2021, 2022, 2023])
process_to_silver()
create_gold_layer()
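The framework above re-ingests every season on each run, but requirement (c) asks for incremental updates. Assuming the hive-style `season=YYYY` directory names used in `ingest_to_bronze`, one minimal approach is to compare the requested seasons against partitions already on disk. The `missing_seasons` helper below is illustrative, not part of any library:

```python
import tempfile
from pathlib import Path


def missing_seasons(bronze_path, wanted):
    """Return seasons in `wanted` that have no season=YYYY partition on disk."""
    existing = {
        int(p.name.split("=")[1])
        for p in Path(bronze_path).glob("season=*")
        if p.is_dir()
    }
    return sorted(set(wanted) - existing)


# Demo with a temporary directory standing in for the bronze layer
with tempfile.TemporaryDirectory() as d:
    (Path(d) / "season=2021").mkdir()
    (Path(d) / "season=2022").mkdir()
    todo = missing_seasons(d, [2021, 2022, 2023])
    print(todo)  # [2023]
```

In the pipeline you would then call `ingest_to_bronze(missing_seasons(bronze_path, seasons))`, and likewise limit the silver and gold rebuilds to the partitions that changed.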
Further Reading
Books and Papers
- Wickham, H. & Grolemund, G. (2017). R for Data Science. O'Reilly Media.
- McKinney, W. (2022). Python for Data Analysis, 3rd Edition. O'Reilly Media.
- Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly Media.
Online Resources
- nflverse Documentation: https://nflverse.nflverse.com/
- nfl_data_py Documentation: https://github.com/nflverse/nfl_data_py
- Apache Parquet: https://parquet.apache.org/
- Data Version Control (DVC): https://dvc.org/
Technical Articles
- Baldwin, B. (2020). "nflfastR: Efficient Data Acquisition for NFL Analytics"
- Yurko, R. et al. (2020). "Going Deep: Models for Continuous-Time Within-Play Valuation of Game Outcomes in American Football"
References
:::