Skip to content

Commit

Permalink
chore/improve inserts (teaxyz#7)
Browse files Browse the repository at this point in the history
* preload and cache versions and licenses

* buffered items

* write my own insert

* clean up insert_versions

* updated rest of pg functions

except package url

* params passed to functions

* uncomment fetch

* dupe username warning

* configurable fetch and warn on missing package

* wrong key

* licenses

* licenses, again

* user_versions bugs

* re-enable everything

* None for versions

* packages has to be first

* reuse caches

* monitor query stats

* dependencies cache / params
  • Loading branch information
sanchitram1 authored Oct 11, 2024
1 parent daf182a commit f111f66
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 265 deletions.
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,23 @@ docker compose build
### start

Requires: build
Env: PKG_MANAGER=${PKG_MANAGER:-crates}
Env: CHAI_DATABASE_URL=${CHAI_DATABASE_URL:-"postgresql://postgres:s3cr3t@localhost:5435/chai"}

```sh
docker compose up -d
```

### test

Env: TEST=true
Env: DEBUG=true

```sh
docker compose up
```

### full-test

Requires: build
Env: PKG_MANAGER=${PKG_MANAGER:-crates}
Env: CHAI_DATABASE_URL=${CHAI_DATABASE_URL:-"postgresql://postgres:s3cr3t@localhost:5435/chai"}
Env: TEST=true
Env: DEBUG=true

Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ services:
- PYTHONPATH=/app
- TEST=${TEST:-false}
- DEBUG=${DEBUG:-false}
- FETCH=${FETCH:-true}
volumes:
- .:/app
working_dir: /app
Expand Down
72 changes: 45 additions & 27 deletions monitor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
from typing import Tuple, Optional, Dict
import docker
import json
from sqlalchemy import event
from sqlalchemy.orm import Session
from collections import defaultdict

PIPELINE_CONTAINER = "chai-oss-pipeline-1"
DATABASE_CONTAINER = "chai-oss-db-1"
Expand Down Expand Up @@ -57,6 +56,18 @@ def read_logs(logs: str) -> Tuple[int, int, float]:
return select_count, insert_count, total_sql_time


def capture_stats(container, start_time):
    """Sample the container's current stats and pair them with elapsed time.

    Returns a dict with the seconds elapsed since ``start_time`` plus the
    sampled cpu/memory figures, or ``None`` when no stats are available
    (e.g. the container has already exited).

    NOTE(review): the returned keys are named ``max_*`` but hold a single
    instantaneous sample — the caller is expected to fold them with max().
    """
    snapshot = get_container_stats(container)
    if snapshot is None:
        # Nothing to report — propagate the sentinel to the caller.
        return None
    return {
        "duration": time.time() - start_time,
        "max_cpu_percent": snapshot["cpu_percent"],
        "max_memory_usage": snapshot["memory_usage"],
    }


def monitor_pipeline() -> None:
client = docker.from_env()
pipeline_container = None
Expand All @@ -71,36 +82,43 @@ def monitor_pipeline() -> None:
except docker.errors.NotFound:
time.sleep(1)

# get the pipeline container stats
max_cpu_percent = 0
max_memory_usage = 0

while pipeline_container.status == "running":
stats = get_container_stats(pipeline_container)

if stats is None:
print("pipeline container stats not found...exiting")
# Initialize stats tracking
model_stats = defaultdict(
lambda: {"duration": 0, "max_cpu_percent": 0, "max_memory_usage": 0}
)
current_model = None
model_start_time = None

for line in pipeline_container.logs(stream=True, follow=True):
line = line.decode("utf-8").strip()

if "inserted" in line and "objects into" in line:
model_name = line.split("objects into")[-1].strip()

if current_model:
stats = capture_stats(pipeline_container, model_start_time)
if stats:
model_stats[current_model]["duration"] += stats["duration"]
model_stats[current_model]["max_cpu_percent"] = max(
model_stats[current_model]["max_cpu_percent"],
stats["max_cpu_percent"],
)
model_stats[current_model]["max_memory_usage"] = max(
model_stats[current_model]["max_memory_usage"],
stats["max_memory_usage"],
)

current_model = model_name
model_start_time = time.time()

if "✅ crates" in line:
break

max_cpu_percent = max(max_cpu_percent, stats["cpu_percent"])
max_memory_usage = max(max_memory_usage, stats["memory_usage"])
time.sleep(1)

end_time = time.time()
total_runtime = end_time - start_time

# parse db logs to extract query info
# logs = database_container.logs().decode("utf-8")
# select_count, insert_count, total_sql_time = read_logs(logs)

# prep report!
report = {
"total_runtime": total_runtime,
"max_cpu_percent": max_cpu_percent,
"max_memory_usage": max_memory_usage,
# "select_count": select_count,
# "insert_count": insert_count,
}
# Prepare the report
report = {"total_runtime": total_runtime, "model_stats": dict(model_stats)}

print("report:")
print(json.dumps(report, indent=2))
Expand Down
23 changes: 11 additions & 12 deletions src/pipeline/crates.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
class Config:
file_location: str
test: bool
fetch: bool
package_manager_id: str
url_types: URLTypes
user_types: UserTypes

def __str__(self):
return f"Config(file_location={self.file_location}, test={self.test}, \
package_manager_id={self.package_manager_id}, url_types={self.url_types}, \
user_types={self.user_types})"
fetch={self.fetch}, package_manager_id={self.package_manager_id}, \
url_types={self.url_types}, user_types={self.user_types})"


def initialize(db: DB) -> Config:
file_location = "https://static.crates.io/db-dump.tar.gz"
test = getenv("TEST", "false").lower() == "true"
fetch = getenv("FETCH", "true").lower() == "true"
package_manager = db.select_package_manager_by_name("crates", create=True)
homepage_url = db.select_url_types_homepage(create=True)
repository_url = db.select_url_types_repository(create=True)
Expand All @@ -47,6 +49,7 @@ def initialize(db: DB) -> Config:
return Config(
file_location=file_location,
test=test,
fetch=fetch,
package_manager_id=package_manager.id,
url_types=url_types,
user_types=user_types,
Expand All @@ -61,20 +64,15 @@ def fetch(config: Config) -> None:

def load(db: DB, transformer: CratesTransformer, config: Config) -> None:
db.insert_packages(transformer.packages(), config.package_manager_id, "crates")

# crates provides a gh_login for every single crate publisher
# so, we use the GitHub source as `source_id`
db.insert_users(transformer.users())
db.insert_user_packages(transformer.user_packages(), config.user_types.github)

# crates provides a homepage, repository, and documentation url for every crate
db.insert_versions(transformer.versions())
db.insert_users(transformer.users(), config.user_types.crates)
db.insert_user_packages(transformer.user_packages())
db.insert_urls(transformer.urls())

if not config.test:
# these are bigger files, so we skip them in tests
db.insert_versions(transformer.versions())
db.insert_user_versions(transformer.user_versions(), config.user_types.github)
db.insert_package_urls(transformer.package_urls())
# db.insert_package_urls(transformer.package_urls()) FIXME
db.insert_dependencies(transformer.dependencies())

db.insert_load_history(config.package_manager_id)
Expand All @@ -84,7 +82,8 @@ def load(db: DB, transformer: CratesTransformer, config: Config) -> None:
def main(db: DB) -> None:
config = initialize(db)
logger.debug(config)
fetch(config)
if config.fetch:
fetch(config)

transformer = CratesTransformer(config.url_types, config.user_types)
load(db, transformer, config)
Expand Down
18 changes: 2 additions & 16 deletions src/pipeline/utils/crates/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def dependencies(self) -> Generator[Dict[str, str], None, None]:
dependency_type = DependencyType(kind)

yield {
"start_id": start_id,
"end_id": end_id,
"version_id": start_id,
"crate_id": end_id,
"semver_range": req,
"dependency_type": dependency_type,
}
Expand Down Expand Up @@ -199,17 +199,3 @@ def package_urls(self) -> Generator[Dict[str, str], None, None]:
"url": documentation,
"url_type_id": self.url_types.documentation,
}


if __name__ == "__main__":
    # Ad-hoc manual run: hydrate the URL tables directly with a fresh
    # transformer, bypassing the full pipeline entry point.
    from src.pipeline.crates import initialize
    from src.pipeline.utils.pg import DB

    database = DB()
    config = initialize(database)
    transformer = CratesTransformer(config.url_types, config.user_types)

    database.insert_urls(transformer.urls())
    print("***** done urls *****")
    database.insert_package_urls(transformer.package_urls())
    print("***** done package urls *****")
Loading

0 comments on commit f111f66

Please sign in to comment.