Skip to content

Commit

Permalink
Added suport for monolithic upload.
Browse files Browse the repository at this point in the history
closes #1219
  • Loading branch information
ipanova committed May 26, 2023
1 parent 63e33c2 commit 4d5b05d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGES/1219.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for monolithic upload.
98 changes: 63 additions & 35 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,19 +560,69 @@ def create(self, request, path):
"""
Create a new upload.
Note: We do not support monolithic upload.
"""
_, repository = self.get_dr_push(request, path, create=True)

if self.tries_to_mount_blob(request):
response = self.mount_blob(request, path, repository)
elif digest := request.query_params.get("digest"):
# if the digest parameter is present, the request body will be
# used to complete the upload in a single request.
# this is monolithic upload
response = self.single_request_upload(request, path, repository, digest)
else:
upload = models.Upload(repository=repository, size=0)
upload.save()
response = UploadResponse(upload=upload, path=path, request=request)

return response

def create_single_chunk_artifact(self, chunk):
with transaction.atomic():
# 1 chunk, create artifact right away
with NamedTemporaryFile("ab") as temp_file:
temp_file.write(chunk.read())
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
return artifact

def create_blob(self, artifact, digest):
with transaction.atomic():
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(
artifact=artifact, content=blob, relative_path=digest
)
blob_artifact.save()
except IntegrityError:
# re-upload artifact in case it was previously removed.
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
return blob

def single_request_upload(self, request, path, repository, digest):
"""Monolithic upload."""
chunk = request.META["wsgi.input"]
artifact = self.create_single_chunk_artifact(chunk)
blob = self.create_blob(artifact, digest)
repository.pending_blobs.add(blob)
return BlobResponse(blob, path, 201, request)

@staticmethod
def tries_to_mount_blob(request):
"""Check if a client is trying to perform cross repository blob mounting."""
Expand Down Expand Up @@ -629,21 +679,7 @@ def partial_update(self, request, path, pk=None):
chunk = ContentFile(chunk.read())
upload.append(chunk, upload.size)
else:
# 1 chunk
# do not add to the upload, create artifact right away
with NamedTemporaryFile("ab") as temp_file:
temp_file.write(chunk.read())
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
File(open(temp_file.name, "rb"))
)
try:
artifact = Artifact.init_and_validate(uploaded_file)
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()
artifact = self.create_single_chunk_artifact(chunk)
upload.artifact = artifact
if not length:
length = artifact.size
Expand All @@ -657,11 +693,15 @@ def put(self, request, path, pk=None):
"""
Create a blob from uploaded chunks.
Note: We do not support monolithic upload.
This request makes the upload complete. It can whether carry a zero-length
body or last chunk can be uploaded.
"""
_, repository = self.get_dr_push(request, path)

digest = request.query_params["digest"]
chunk = request.META["wsgi.input"]
last_chunk = ContentFile(chunk.read())
upload = get_object_or_404(models.Upload, pk=pk, repository=repository)

if artifact := upload.artifact:
Expand All @@ -674,6 +714,9 @@ def put(self, request, path, pk=None):
for chunk in chunks:
temp_file.write(chunk.file.read())
chunk.file.close()
if last_chunk.size:
temp_file.write(last_chunk.read())
last_chunk.file.close()
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
Expand All @@ -689,23 +732,7 @@ def put(self, request, path, pk=None):
artifact = Artifact.objects.get(sha256=artifact.sha256)
artifact.touch()

with transaction.atomic():
try:
blob = models.Blob(digest=digest)
blob.save()
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(
artifact=artifact, content=blob, relative_path=digest
)
blob_artifact.save()
except IntegrityError:
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])
blob = self.create_blob(artifact, digest)
upload.delete()

repository.pending_blobs.add(blob)
Expand Down Expand Up @@ -1022,6 +1049,7 @@ def _save_manifest(self, artifact, manifest_digest, content_type, config_blob=No
try:
ca.save()
except IntegrityError:
# re-upload artifact in case it was previously removed.
ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest)
if not ca.artifact:
ca.artifact = artifact
Expand Down

0 comments on commit 4d5b05d

Please sign in to comment.