From cd37507523315e307f45fb712ab7253161af37f3 Mon Sep 17 00:00:00 2001 From: Ina Panova Date: Tue, 2 May 2023 12:59:56 +0200 Subject: [PATCH] Added support for monolithic upload. closes #1219 --- CHANGES/1219.feature | 1 + pulp_container/app/registry_api.py | 102 +++++++++++++++++++---------- 2 files changed, 68 insertions(+), 35 deletions(-) create mode 100644 CHANGES/1219.feature diff --git a/CHANGES/1219.feature b/CHANGES/1219.feature new file mode 100644 index 000000000..dda24782b --- /dev/null +++ b/CHANGES/1219.feature @@ -0,0 +1 @@ +Added support for monolithic upload. diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index 489bb3f44..9e5d68c05 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -560,19 +560,69 @@ def create(self, request, path): """ Create a new upload. - Note: We do not support monolithic upload. """ _, repository = self.get_dr_push(request, path, create=True) if self.tries_to_mount_blob(request): response = self.mount_blob(request, path, repository) + elif digest := request.query_params.get("digest"): + # if the digest parameter is present, the request body will be + # used to complete the upload in a single request. 
+ # this is monolithic upload + response = self.single_request_upload(request, path, repository, digest) else: upload = models.Upload(repository=repository, size=0) upload.save() response = UploadResponse(upload=upload, path=path, request=request) - return response + def create_single_chunk_artifact(self, chunk): + with transaction.atomic(): + # 1 chunk, create artifact right away + with NamedTemporaryFile("ab") as temp_file: + temp_file.write(chunk.read()) + temp_file.flush() + + uploaded_file = PulpTemporaryUploadedFile.from_file( + File(open(temp_file.name, "rb")) + ) + try: + artifact = Artifact.init_and_validate(uploaded_file) + artifact.save() + except IntegrityError: + artifact = Artifact.objects.get(sha256=artifact.sha256) + artifact.touch() + return artifact + + def create_blob(self, artifact, digest): + with transaction.atomic(): + try: + blob = models.Blob(digest=digest) + blob.save() + except IntegrityError: + blob = models.Blob.objects.get(digest=digest) + blob.touch() + try: + blob_artifact = ContentArtifact( + artifact=artifact, content=blob, relative_path=digest + ) + blob_artifact.save() + except IntegrityError: + # re-upload artifact in case it was previously removed. 
+ ca = ContentArtifact.objects.get(content=blob, relative_path=digest) + if not ca.artifact: + ca.artifact = artifact + ca.save(update_fields=["artifact"]) + return blob + + def single_request_upload(self, request, path, repository, digest): + """Monolithic upload.""" + chunk = request.META["wsgi.input"] + artifact = self.create_single_chunk_artifact(chunk) + blob = self.create_blob(artifact, digest) + repository.pending_blobs.add(blob) + return BlobResponse(blob, path, 201, request) + @staticmethod def tries_to_mount_blob(request): """Check if a client is trying to perform cross repository blob mounting.""" @@ -617,6 +667,8 @@ def partial_update(self, request, path, pk=None): length = end - start + 1 else: + # podman client claims to do chunked upload, but does not send the range header + # it sends just one chunk in PATCH and nothing in PUT length = int(request.headers.get("Content-Length", 0)) start = 0 @@ -629,21 +681,7 @@ def partial_update(self, request, path, pk=None): chunk = ContentFile(chunk.read()) upload.append(chunk, upload.size) else: - # 1 chunk - # do not add to the upload, create artifact right away - with NamedTemporaryFile("ab") as temp_file: - temp_file.write(chunk.read()) - temp_file.flush() - - uploaded_file = PulpTemporaryUploadedFile.from_file( - File(open(temp_file.name, "rb")) - ) - try: - artifact = Artifact.init_and_validate(uploaded_file) - artifact.save() - except IntegrityError: - artifact = Artifact.objects.get(sha256=artifact.sha256) - artifact.touch() + artifact = self.create_single_chunk_artifact(chunk) upload.artifact = artifact if not length: length = artifact.size @@ -657,11 +695,17 @@ def put(self, request, path, pk=None): """ Create a blob from uploaded chunks. - Note: We do not support monolithic upload. + This request completes the upload. It can either carry a zero-length + body or include the last chunk of the blob. 
+ """ _, repository = self.get_dr_push(request, path) digest = request.query_params["digest"] + chunk = request.META["wsgi.input"] + # last chunk (and the only one) from monolithic upload + # or last chunk from chunked upload + last_chunk = ContentFile(chunk.read()) upload = get_object_or_404(models.Upload, pk=pk, repository=repository) if artifact := upload.artifact: @@ -674,6 +718,9 @@ def put(self, request, path, pk=None): for chunk in chunks: temp_file.write(chunk.file.read()) chunk.file.close() + if last_chunk.size: + temp_file.write(last_chunk.read()) + last_chunk.file.close() temp_file.flush() uploaded_file = PulpTemporaryUploadedFile.from_file( @@ -689,23 +736,7 @@ def put(self, request, path, pk=None): artifact = Artifact.objects.get(sha256=artifact.sha256) artifact.touch() - with transaction.atomic(): - try: - blob = models.Blob(digest=digest) - blob.save() - except IntegrityError: - blob = models.Blob.objects.get(digest=digest) - blob.touch() - try: - blob_artifact = ContentArtifact( - artifact=artifact, content=blob, relative_path=digest - ) - blob_artifact.save() - except IntegrityError: - ca = ContentArtifact.objects.get(content=blob, relative_path=digest) - if not ca.artifact: - ca.artifact = artifact - ca.save(update_fields=["artifact"]) + blob = self.create_blob(artifact, digest) upload.delete() repository.pending_blobs.add(blob) @@ -1022,6 +1053,7 @@ def _save_manifest(self, artifact, manifest_digest, content_type, config_blob=No try: ca.save() except IntegrityError: + # re-upload artifact in case it was previously removed. ca = ContentArtifact.objects.get(content=manifest, relative_path=manifest.digest) if not ca.artifact: ca.artifact = artifact