Skip to content

Commit

Permalink
Merge pull request #31 from nanxstats/distributed
Browse files Browse the repository at this point in the history
Place dataset and loss into separate modules
  • Loading branch information
nanxstats authored Dec 27, 2024
2 parents 8553ef1 + d1fd0b2 commit b7cec3b
Show file tree
Hide file tree
Showing 14 changed files with 232 additions and 203 deletions.
9 changes: 9 additions & 0 deletions docs/reference/data.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Data

::: tinytopics.data
    options:
      members:
        - NumpyDiskDataset
        - IndexTrackingDataset
      show_root_heading: true
      show_source: false
1 change: 0 additions & 1 deletion docs/reference/fit.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@
options:
members:
- fit_model
- poisson_nmf_loss
show_root_heading: true
show_source: false
8 changes: 8 additions & 0 deletions docs/reference/loss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Losses

::: tinytopics.loss
    options:
      members:
        - poisson_nmf_loss
      show_root_heading: true
      show_source: false
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ nav:
- API Reference:
- Fit: reference/fit.md
- Models: reference/models.md
- Loss: reference/loss.md
- Data: reference/data.md
- Plot: reference/plot.md
- Colors: reference/colors.md
- Utilities: reference/utils.md
Expand Down
12 changes: 3 additions & 9 deletions src/tinytopics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
"""
Topic modeling via sum-to-one constrained neural Poisson NMF.
Modules:
fit: Model fitting and loss calculation.
models: NeuralPoissonNMF model definition.
plot: Functions for plotting loss curves, document-topic distributions, and top terms.
colors: Color palettes.
utils: Utility functions for data generation, topic alignment, and document sorting.
"""

from .fit import fit_model
from .models import NeuralPoissonNMF
from .fit import fit_model, poisson_nmf_loss
from .loss import poisson_nmf_loss
from .data import NumpyDiskDataset
from .utils import (
set_random_seed,
generate_synthetic_data,
align_topics,
sort_documents,
NumpyDiskDataset,
)
from .colors import pal_tinytopics, scale_color_tinytopics
from .plot import plot_loss, plot_structure, plot_top_terms
79 changes: 79 additions & 0 deletions src/tinytopics/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from collections.abc import Sequence
from pathlib import Path

import torch
import numpy as np
from torch import Tensor
from torch.utils.data import Dataset


class IndexTrackingDataset(Dataset):
"""Dataset wrapper that tracks indices through shuffling"""

def __init__(self, dataset: Dataset | Tensor) -> None:
self.dataset = dataset
self.shape: tuple[int, int] = (
dataset.shape
if hasattr(dataset, "shape")
else (len(dataset), dataset[0].shape[0])
)
self.is_tensor: bool = isinstance(dataset, torch.Tensor)

def __len__(self) -> int:
return len(self.dataset)

def __getitem__(self, idx: int) -> tuple[Tensor, Tensor]:
return self.dataset[idx], torch.tensor(idx)


class NumpyDiskDataset(Dataset):
"""
A PyTorch Dataset class for loading document-term matrices from disk.
The dataset can be initialized with either a path to a `.npy` file or
a NumPy array. When a file path is provided, the data is accessed
lazily using memory mapping, which is useful for handling large datasets
that do not fit entirely in (CPU) memory.
"""

def __init__(
self, data: str | Path | np.ndarray, indices: Sequence[int] | None = None
) -> None:
"""
Args:
data: Either path to `.npy` file (str or Path) or numpy array.
indices: Optional sequence of indices to use as valid indices.
"""
if isinstance(data, (str, Path)):
data_path = Path(data)
if not data_path.exists():
raise FileNotFoundError(f"Data file not found: {data_path}")
# Get shape without loading full array
self.shape: tuple[int, int] = tuple(np.load(data_path, mmap_mode="r").shape)
self.data_path: Path = data_path
self.mmap_data: np.ndarray | None = None
else:
self.shape: tuple[int, int] = data.shape
self.data_path: None = None
self.data: np.ndarray = data

self.indices: Sequence[int] = indices or range(self.shape[0])

def __len__(self) -> int:
return len(self.indices)

def __getitem__(self, idx: int) -> torch.Tensor:
real_idx = self.indices[idx]

if self.data_path is not None:
# Load mmap data lazily
if self.mmap_data is None:
self.mmap_data = np.load(self.data_path, mmap_mode="r")
return torch.tensor(self.mmap_data[real_idx], dtype=torch.float32)
else:
return torch.tensor(self.data[real_idx], dtype=torch.float32)

@property
def num_terms(self) -> int:
"""Return vocabulary size (number of columns)."""
return self.shape[1]
38 changes: 2 additions & 36 deletions src/tinytopics/fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,9 @@
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm

from .data import IndexTrackingDataset
from .models import NeuralPoissonNMF


def poisson_nmf_loss(X: Tensor, X_reconstructed: Tensor) -> Tensor:
    """
    Compute the Poisson NMF loss function (negative log-likelihood).

    The loss is the sum over all entries of
    `X_reconstructed - X * log(X_reconstructed)`, with the reconstruction
    clamped from below before the logarithm is taken.

    Args:
        X: Original document-term matrix.
        X_reconstructed: Reconstructed matrix from the model.

    Returns:
        The computed Poisson NMF loss.
    """
    # Lower bound that keeps log() finite when a reconstructed entry is zero.
    floor: float = 1e-10
    log_rate = torch.log(torch.clamp(X_reconstructed, min=floor))
    elementwise = X_reconstructed - X * log_rate
    return elementwise.sum()


class IndexTrackingDataset(Dataset):
"""Dataset wrapper that tracks indices through shuffling"""

def __init__(self, dataset: Dataset | Tensor) -> None:
self.dataset = dataset
self.shape: tuple[int, int] = (
dataset.shape
if hasattr(dataset, "shape")
else (len(dataset), dataset[0].shape[0])
)
self.is_tensor: bool = isinstance(dataset, torch.Tensor)

def __len__(self) -> int:
return len(self.dataset)

def __getitem__(self, idx: int) -> tuple[Tensor, Tensor]:
return self.dataset[idx], torch.tensor(idx)
from .loss import poisson_nmf_loss


def fit_model(
Expand Down
19 changes: 19 additions & 0 deletions src/tinytopics/loss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import torch
from torch import Tensor


def poisson_nmf_loss(X: Tensor, X_reconstructed: Tensor) -> Tensor:
    """
    Compute the Poisson NMF loss function (negative log-likelihood).

    The loss is the sum over all entries of
    `X_reconstructed - X * log(X_reconstructed)`, with the reconstruction
    clamped from below before the logarithm is taken.

    Args:
        X: Original document-term matrix.
        X_reconstructed: Reconstructed matrix from the model.

    Returns:
        The computed Poisson NMF loss.
    """
    # Lower bound that keeps log() finite when a reconstructed entry is zero.
    floor: float = 1e-10
    log_rate = torch.log(torch.clamp(X_reconstructed, min=floor))
    elementwise = X_reconstructed - X * log_rate
    return elementwise.sum()
55 changes: 0 additions & 55 deletions src/tinytopics/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Tuple
from collections.abc import Sequence, MutableMapping
from collections import defaultdict
from pathlib import Path

import torch
import numpy as np
from torch.utils.data import Dataset
from scipy.optimize import linear_sum_assignment
from tqdm.auto import tqdm

Expand Down Expand Up @@ -131,56 +129,3 @@ def sort_topic_groups(grouped_docs: MutableMapping[int, list]) -> Sequence[int]:
doc_info = get_document_info()
grouped_docs = group_by_topic(doc_info)
return sort_topic_groups(grouped_docs)


class NumpyDiskDataset(Dataset):
"""
A PyTorch Dataset class for loading document-term matrices from disk.
The dataset can be initialized with either a path to a `.npy` file or
a NumPy array. When a file path is provided, the data is accessed
lazily using memory mapping, which is useful for handling large datasets
that do not fit entirely in (CPU) memory.
"""

def __init__(
self, data: str | Path | np.ndarray, indices: Sequence[int] | None = None
) -> None:
"""
Args:
data: Either path to `.npy` file (str or Path) or numpy array.
indices: Optional sequence of indices to use as valid indices.
"""
if isinstance(data, (str, Path)):
data_path = Path(data)
if not data_path.exists():
raise FileNotFoundError(f"Data file not found: {data_path}")
# Get shape without loading full array
self.shape: tuple[int, int] = tuple(np.load(data_path, mmap_mode="r").shape)
self.data_path: Path = data_path
self.mmap_data: np.ndarray | None = None
else:
self.shape: tuple[int, int] = data.shape
self.data_path: None = None
self.data: np.ndarray = data

self.indices: Sequence[int] = indices or range(self.shape[0])

def __len__(self) -> int:
return len(self.indices)

def __getitem__(self, idx: int) -> torch.Tensor:
real_idx = self.indices[idx]

if self.data_path is not None:
# Load mmap data lazily
if self.mmap_data is None:
self.mmap_data = np.load(self.data_path, mmap_mode="r")
return torch.tensor(self.mmap_data[real_idx], dtype=torch.float32)
else:
return torch.tensor(self.data[real_idx], dtype=torch.float32)

@property
def num_terms(self) -> int:
"""Return vocabulary size (number of columns)."""
return self.shape[1]
90 changes: 90 additions & 0 deletions tests/test_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import pytest
import torch
import numpy as np

from tinytopics.data import NumpyDiskDataset


def test_numpy_disk_dataset_from_array():
    """Check that an in-memory array is served row by row as float32 tensors."""
    matrix = np.random.rand(10, 5).astype(np.float32)

    ds = NumpyDiskDataset(matrix)

    # Size and shape metadata mirror the source array
    assert len(ds) == 10
    assert ds.num_terms == 5
    assert ds.shape == (10, 5)

    # Each row comes back as a 1-D tensor equal to the source row
    for row_idx in range(len(ds)):
        row = ds[row_idx]
        expected = torch.tensor(matrix[row_idx], dtype=torch.float32)
        assert isinstance(row, torch.Tensor)
        assert row.shape == (5,)
        assert torch.allclose(row, expected)


def test_numpy_disk_dataset_from_file(tmp_path):
    """Check that a `.npy` file on disk is served row by row as float32 tensors."""
    matrix = np.random.rand(10, 5).astype(np.float32)
    npy_file = tmp_path / "test_data.npy"
    np.save(npy_file, matrix)

    ds = NumpyDiskDataset(npy_file)

    # Size and shape metadata mirror the saved array
    assert len(ds) == 10
    assert ds.num_terms == 5
    assert ds.shape == (10, 5)

    # Each row read from disk equals the row that was saved
    for row_idx in range(len(ds)):
        row = ds[row_idx]
        expected = torch.tensor(matrix[row_idx], dtype=torch.float32)
        assert isinstance(row, torch.Tensor)
        assert row.shape == (5,)
        assert torch.allclose(row, expected)


def test_numpy_disk_dataset_with_indices():
    """Check that custom indices select and reorder rows of the source array."""
    matrix = np.random.rand(10, 5).astype(np.float32)
    selected = [3, 1, 4]

    ds = NumpyDiskDataset(matrix, indices=selected)

    # Length follows the selection; shape still describes the full array
    assert len(ds) == len(selected)
    assert ds.num_terms == 5
    assert ds.shape == (10, 5)

    # Position i in the dataset maps to row selected[i] of the source
    for position, source_row in enumerate(selected):
        row = ds[position]
        expected = torch.tensor(matrix[source_row], dtype=torch.float32)
        assert isinstance(row, torch.Tensor)
        assert row.shape == (5,)
        assert torch.allclose(row, expected)


def test_numpy_disk_dataset_file_not_found():
    """Check that a path which does not exist is rejected with FileNotFoundError."""
    missing_path = "non_existent_file.npy"
    with pytest.raises(FileNotFoundError):
        NumpyDiskDataset(missing_path)


def test_numpy_disk_dataset_memory_efficiency(tmp_path):
    """Test that NumpyDiskDataset opens its memory map lazily and reads rows correctly."""
    shape = (1000, 500)  # 500K elements
    data = np.random.rand(*shape).astype(np.float32)
    file_path = tmp_path / "large_data.npy"
    np.save(file_path, data)

    dataset = NumpyDiskDataset(file_path)

    # Constructing the dataset must not open the memory map yet; without
    # this check an eager implementation would pass the test unnoticed.
    assert dataset.mmap_data is None

    # Access data in random order
    indices = np.random.permutation(shape[0])[:100]  # Sample 100 random rows
    for idx in indices:
        item = dataset[idx]
        assert torch.allclose(item, torch.tensor(data[idx], dtype=torch.float32))

    # After the first row access the memory map must be initialized
    assert dataset.mmap_data is not None
16 changes: 1 addition & 15 deletions tests/test_fit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
import torch

from tinytopics.fit import poisson_nmf_loss, fit_model
from tinytopics.fit import fit_model
from tinytopics.utils import set_random_seed, generate_synthetic_data

# Test data dimensions
Expand All @@ -17,20 +17,6 @@ def sample_data():
return generate_synthetic_data(n=N_DOCS, m=N_TERMS, k=N_TOPICS)


def test_poisson_nmf_loss():
    """Check that a perfect reconstruction scores a lower Poisson NMF loss."""
    target = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
    approximation = torch.tensor([[1.1, 1.9], [2.9, 4.1]])

    imperfect_loss = poisson_nmf_loss(target, approximation)

    # Reconstructing the target exactly is the reference case
    perfect_loss = poisson_nmf_loss(target, target)

    # Perfect reconstruction should have lower loss
    assert perfect_loss < imperfect_loss


def test_fit_model_basic(sample_data):
"""Test basic model fitting functionality."""
X, _, _ = sample_data
Expand Down
Loading

0 comments on commit b7cec3b

Please sign in to comment.