Commit 3112e62: add initial 2024 Insights Data processing
cak committed Dec 28, 2024 (1 parent: d40504d)
Showing 4 changed files with 941 additions and 0 deletions.
markdown/2024_insights/01_data_collection.md (96 additions, 0 deletions)
---
jupyter:
jupytext:
text_representation:
extension: .md
format_name: markdown
format_version: '1.3'
jupytext_version: 1.16.6
kernelspec:
display_name: Python 3
language: python
name: python3
---

# 2024 Insights - Data Collection

```python
import os
import requests
import zipfile
from pathlib import Path
```

## Project Setup

Before proceeding with data collection, we need to ensure that the directories for storing raw and processed data are in place. An organized structure is especially important when working with multiple datasets over time.

The following Python code will check if the required directories exist (`raw` and `processed` under `2024_insights`), and if not, it will create them. This approach ensures that the environment is always correctly set up before any data processing begins, even if you're running this notebook on a new machine or a fresh clone of the repository.


```python
# Directories to create
dirs = [
    "../../data/2024_insights/raw/",
    "../../data/2024_insights/processed/",
]

# Create 2024 Insights data directories if they don't exist
for d in dirs:
    os.makedirs(d, exist_ok=True)
```

## Data Collection

To automate the downloading, unzipping, and saving of required datasets, execute the Python code in the **next cell**.

This script will:
- Download the NIST NVD (2023 and 2024) and CISA KEV datasets.
- Extract JSON files from ZIP archives.
- Save all files to `data/2024_insights/raw/` (relative to the repository root).

Once the script has run successfully, proceed to the data preprocessing steps in the next notebook.

```python
# Target directory for raw data
DATA_DIR = Path("../../data/2024_insights/raw")
DATA_DIR.mkdir(parents=True, exist_ok=True)

# URLs for datasets
DATASETS = {
    "nvdcve-1.1-2024.json.zip": "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2024.json.zip",
    "nvdcve-1.1-2023.json.zip": "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2023.json.zip",
    "known_exploited_vulnerabilities.csv": "https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv",
}

def download_file(url, dest):
    """Download a file from a URL to a destination path."""
    print(f"Downloading: {url}")
    response = requests.get(url, stream=True, timeout=60)
    if response.status_code == 200:
        with open(dest, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
        print(f"Saved to: {dest}")
    else:
        print(f"Failed to download {url} - Status code: {response.status_code}")

def unzip_file(zip_path, dest_dir):
    """Unzip a file to a destination directory."""
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(dest_dir)
    print(f"Unzipped {zip_path} to {dest_dir}")

# Main execution
for filename, url in DATASETS.items():
    dest_path = DATA_DIR / filename

    # Download the file
    download_file(url, dest_path)

    # If it's a ZIP file, extract its contents
    if filename.endswith(".zip"):
        unzip_file(dest_path, DATA_DIR)
        dest_path.unlink()  # Remove the ZIP file after extraction
```
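After the downloads finish, a quick sanity check can confirm the expected raw files are in place before moving on. This is a minimal sketch; the `verify_raw_files` helper is illustrative and not part of the original script.

```python
from pathlib import Path

def verify_raw_files(data_dir, expected):
    """Return the expected filenames that are missing from data_dir."""
    data_dir = Path(data_dir)
    return [name for name in expected if not (data_dir / name).exists()]

expected_files = [
    "nvdcve-1.1-2023.json",
    "nvdcve-1.1-2024.json",
    "known_exploited_vulnerabilities.csv",
]
missing = verify_raw_files("../../data/2024_insights/raw", expected_files)
print("Missing files:", missing)
```

An empty list means all three datasets are ready for the cleaning notebook.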
markdown/2024_insights/02_data_cleaning.md (206 additions, 0 deletions)
---
jupyter:
jupytext:
text_representation:
extension: .md
format_name: markdown
format_version: '1.3'
jupytext_version: 1.16.6
kernelspec:
display_name: Python 3
language: python
name: python3
---

# 2024 Insights - Data Cleaning

```python
import json

import pandas as pd
```

## Script to Process NVD Data and Integrate CISA KEV Data

This script processes NVD JSON data for a specified year and enriches it with CISA Known Exploited Vulnerabilities (KEV) data.

### Features:
1. **Processes NVD Data**:
- Extracts key fields such as CVE ID, description, CWE, CVSS scores, vendor, product, and publication dates.
- Adds `CVE_Assigner` from the `CVE_data_meta.ASSIGNER` field.

2. **Enriches with CISA KEV Data**:
- Merges with CISA KEV dataset to include information on exploited vulnerabilities.
- Identifies if a CVE is part of CISA KEV.

3. **Generates Additional Fields**:
- Adds derived fields such as `Published_Year`, `Published_Month`, and `Published_YearMonth` for trend analysis.

4. **Final Output**:
- Saves the processed data to a CSV file, ready for analysis and visualization.

### Final Columns:
- **CVE_ID**: Unique identifier for the vulnerability.
- **Description**: English description of the CVE.
- **CWE**: Common Weakness Enumeration for the vulnerability.
- **CVSS_Base_Score**: CVSS v3 base score.
- **CVSS_Severity**: Severity rating (Critical, High, Medium, Low).
- **CVSS_Vector**: Attack vector for the CVE.
- **Exploitability_Score**: CVSS exploitability score.
- **Impact_Score**: CVSS impact score.
- **Vendor**: Vendor of the vulnerable product.
- **Product**: Vulnerable product name.
- **CVE_Assigner**: Organization or individual who assigned the CVE.
- **Published_Date**: Date the CVE was published.
- **Last_Modified_Date**: Date the CVE was last modified.
- **Published_Year**, **Published_Month**, **Published_YearMonth**: Derived fields for time-based analysis.
- **CISA_KEV**: Boolean indicating if the CVE is in the CISA KEV catalog.
- **KEV_DateAdded**: Date the CVE was added to CISA KEV.
- **KEV_Vendor**: Vendor information from CISA KEV.
- **KEV_Product**: Product information from CISA KEV.
- **KEV_ShortDescription**: Short description from CISA KEV.
- **KEV_KnownRansomware**: Ransomware association, if any.
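`Vendor` and `Product` come from positional fields of each CPE 2.3 URI in the NVD configuration nodes. A minimal sketch of the split the script performs (the sample URI is illustrative):

```python
# Fields 3 and 4 of a CPE 2.3 URI hold the vendor and product
cpe_uri = "cpe:2.3:a:apache:http_server:2.4.57:*:*:*:*:*:*:*"
cpe_parts = cpe_uri.split(":")
vendor = cpe_parts[3].title() if len(cpe_parts) > 3 else "Unknown Vendor"
product = cpe_parts[4].title() if len(cpe_parts) > 4 else "Unknown Product"
print(vendor, product)  # Apache Http_Server
```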


```python
import json
import pandas as pd

def process_nvd_data(year):
    # Define input & output paths
    file_path = f'../../data/2024_insights/raw/nvdcve-1.1-{year}.json'
    cisa_kev_path = '../../data/2024_insights/raw/known_exploited_vulnerabilities.csv'
    output_path_template = '../../data/2024_insights/processed/nvd_data_{}.csv'

    # Load the NVD JSON data for the given year
    with open(file_path, 'r') as file:
        nvd_data = json.load(file)

    records = []
    for item in nvd_data['CVE_Items']:
        cve = item['cve']
        cve_id = cve['CVE_data_meta']['ID']
        cve_assigner = cve.get('CVE_data_meta', {}).get('ASSIGNER', None)

        # Extract English description (if any)
        description = next(
            (desc['value'] for desc in cve['description']['description_data'] if desc['lang'] == 'en'),
            'No description available'
        )

        # Extract CWE
        cwe = next((
            desc['value']
            for problem in cve['problemtype']['problemtype_data']
            for desc in problem['description']
            if desc['lang'] == 'en'
        ), None)

        # Extract CVSS v3 details
        impact_data = item.get('impact', {})
        base_metric_v3 = impact_data.get('baseMetricV3', {})
        cvss_data = base_metric_v3.get('cvssV3', {})

        cvss_base_score = cvss_data.get('baseScore')
        cvss_severity = cvss_data.get('baseSeverity', 'UNKNOWN').upper()
        cvss_vector = cvss_data.get('vectorString')
        exploitability_score = base_metric_v3.get('exploitabilityScore')
        impact_score = base_metric_v3.get('impactScore')

        # Parse published & modified dates (errors='coerce' yields NaT on failure)
        published_date_str = item.get('publishedDate')
        last_modified_date_str = item.get('lastModifiedDate')
        published_date = pd.to_datetime(published_date_str, errors='coerce')
        last_modified_date = pd.to_datetime(last_modified_date_str, errors='coerce')

        # Create derived date fields
        published_year = published_date.year if pd.notnull(published_date) else None
        published_month = published_date.month if pd.notnull(published_date) else None
        published_ym = published_date.tz_localize(None).to_period('M') if pd.notnull(published_date) else None

        # Build records for each vendor-product pair
        cpe_found = False
        for node in item.get('configurations', {}).get('nodes', []):
            for cpe_match in node.get('cpe_match', []):
                if cpe_match.get('vulnerable'):
                    cpe_parts = cpe_match['cpe23Uri'].split(':')
                    vendor = cpe_parts[3].title() if len(cpe_parts) > 3 else "Unknown Vendor"
                    product = cpe_parts[4].title() if len(cpe_parts) > 4 else "Unknown Product"

                    records.append([
                        cve_id, description, cwe, cvss_base_score, cvss_severity,
                        cvss_vector, exploitability_score, impact_score, vendor,
                        product, cve_assigner, published_date_str, last_modified_date_str,
                        published_date, last_modified_date, published_year,
                        published_month, published_ym
                    ])
                    cpe_found = True

        # Fall back to a single record when no vulnerable CPE entry exists
        if not cpe_found:
            records.append([
                cve_id, description, cwe, cvss_base_score, cvss_severity,
                cvss_vector, exploitability_score, impact_score, None,
                None, cve_assigner, published_date_str, last_modified_date_str,
                published_date, last_modified_date, published_year,
                published_month, published_ym
            ])

    # Convert to DataFrame
    columns = [
        'CVE_ID', 'Description', 'CWE', 'CVSS_Base_Score', 'CVSS_Severity',
        'CVSS_Vector', 'Exploitability_Score', 'Impact_Score', 'Vendor',
        'Product', 'CVE_Assigner', 'Published_Date_Str', 'Last_Modified_Date_Str',
        'Published_Date', 'Last_Modified_Date', 'Published_Year',
        'Published_Month', 'Published_YearMonth'
    ]
    df = pd.DataFrame(records, columns=columns)

    # Merge with the CISA KEV data
    kev_df = pd.read_csv(cisa_kev_path, parse_dates=['dateAdded'])
    kev_df.rename(
        columns={
            'cveID': 'CVE_ID',
            'dateAdded': 'KEV_DateAdded',
            'vendorProject': 'KEV_Vendor',
            'product': 'KEV_Product',
            'shortDescription': 'KEV_ShortDescription',
            'knownRansomwareCampaignUse': 'KEV_KnownRansomware'
        },
        inplace=True
    )
    kev_df['KEV_KnownRansomware'] = kev_df['KEV_KnownRansomware'].fillna("Unknown").str.capitalize()
    kev_df['KEV_Notes'] = kev_df['notes'].str.split(';').str[0].str.strip()

    df = df.merge(
        kev_df[['CVE_ID', 'KEV_DateAdded', 'KEV_Vendor', 'KEV_Product', 'KEV_ShortDescription', 'KEV_KnownRansomware']],
        on='CVE_ID',
        how='left'
    )
    df['CISA_KEV'] = df['KEV_DateAdded'].notna()

    # Define final column order
    final_columns = [
        'CVE_ID', 'Description', 'CWE', 'CVSS_Base_Score', 'CVSS_Severity',
        'CVSS_Vector', 'Exploitability_Score', 'Impact_Score', 'Vendor',
        'Product', 'CVE_Assigner', 'Published_Date', 'Last_Modified_Date',
        'Published_Year', 'Published_Month', 'Published_YearMonth',
        'CISA_KEV', 'KEV_DateAdded', 'KEV_Vendor', 'KEV_Product',
        'KEV_ShortDescription', 'KEV_KnownRansomware'
    ]
    df = df[final_columns]

    # Save processed data to CSV
    output_path = output_path_template.format(year)
    df.to_csv(output_path, index=False, encoding='utf-8')

    return df

# Process NVD data for 2023
df_2023 = process_nvd_data(2023)

# Process NVD data for 2024
df_2024 = process_nvd_data(2024)
df_2024.head()
```
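The KEV enrichment step boils down to a left merge on `CVE_ID` plus a boolean flag derived from the merged date column. A toy example of the pattern, using made-up CVE IDs:

```python
import pandas as pd

nvd = pd.DataFrame({"CVE_ID": ["CVE-2024-0001", "CVE-2024-0002"]})
kev = pd.DataFrame({"CVE_ID": ["CVE-2024-0002"], "KEV_DateAdded": ["2024-03-01"]})

# A left merge keeps every NVD row; KEV columns are NaN where there is no match
merged = nvd.merge(kev, on="CVE_ID", how="left")
merged["CISA_KEV"] = merged["KEV_DateAdded"].notna()
print(merged["CISA_KEV"].tolist())  # [False, True]
```

Because the merge is a left join, CVEs absent from the KEV catalog are retained with `CISA_KEV = False` rather than dropped.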