Skip to content

Commit

Permalink
Limit the size of manifests/signatures sync/upload
Browse files Browse the repository at this point in the history
Adds new settings to limit the size of manifests and signatures as a safeguard
against DoS attacks during sync and upload operations.
Modifies the blob upload to read the layers in chunks.

closes: pulp#532
  • Loading branch information
git-hyagi committed Oct 4, 2024
1 parent bb7e18c commit a24678e
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGES/532.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added a limit of 4MB to Manifests and Signatures, through the `MANIFEST_PAYLOAD_MAX_SIZE` and
`SIGNATURE_PAYLOAD_MAX_SIZE` settings, to protect against OOM DoS attacks.
16 changes: 16 additions & 0 deletions docs/admin/guides/limit-manifest-and-signature-sizes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Limit the size of Manifests and Signatures

By default, Pulp is configured to block the synchronization and upload of image Manifests and/or
Signatures if they exceed a 4MB size limit. A use case for this feature is to avoid OOM DoS attacks
when synchronizing remote repositories with malicious or compromised container images.
To define a different limit, use the following settings:
```
MANIFEST_PAYLOAD_MAX_SIZE=<bytes>
SIGNATURE_PAYLOAD_MAX_SIZE=<bytes>
```

For example, to change the limits to 10MB:
```
MANIFEST_PAYLOAD_MAX_SIZE=10_000_000
SIGNATURE_PAYLOAD_MAX_SIZE=10_000_000
```
85 changes: 81 additions & 4 deletions pulp_container/app/downloaders.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,86 @@
import aiohttp
import asyncio
import fnmatch
import json
import re

from aiohttp.client_exceptions import ClientResponseError
from collections import namedtuple
from django.conf import settings
from logging import getLogger
from urllib import parse


from pulpcore.plugin.download import DownloaderFactory, HttpDownloader

from pulp_container.constants import V2_ACCEPT_HEADERS
from pulp_container.constants import (
MANIFEST_MEDIA_TYPES,
MEGABYTE,
V2_ACCEPT_HEADERS,
)
from pulp_container.app.exceptions import PayloadTooLarge

log = getLogger(__name__)

HeadResult = namedtuple(
"HeadResult",
["status_code", "path", "artifact_attributes", "url", "headers"],
)
DownloadResult = namedtuple("DownloadResult", ["url", "artifact_attributes", "path", "headers"])


class ValidateResourceSizeMixin:
    # Mixin shared by the downloader classes below; overrides HttpDownloader's
    # _handle_response so the streamed body can be capped at a maximum size.
    async def _handle_response(self, response, content_type=None, max_body_size=None):
        """
        Handle the aiohttp response by writing it to disk and calculating digests.

        Overrides the HttpDownloader method to be able to limit the request body size.
        The body is consumed in MEGABYTE-sized chunks; once the running total exceeds
        ``max_body_size`` (when given), the download is finalized and PayloadTooLarge
        is raised.

        Args:
            response (aiohttp.ClientResponse): The response to handle.
            content_type (string): Type of the resource (manifest or signature) whose size
                will be verified.  NOTE(review): accepted for symmetry with the caller but
                not read inside this method.
            max_body_size (int): Maximum allowed body size of the resource (manifest or
                signature), in bytes.  ``None`` (or 0) disables the size check.

        Returns:
            DownloadResult: Contains information about the result. See the DownloadResult
                docs for more information.

        Raises:
            PayloadTooLarge: if the downloaded body grows beyond ``max_body_size``.
        """
        if self.headers_ready_callback:
            await self.headers_ready_callback(response.headers)
        total_size = 0
        while True:
            chunk = await response.content.read(MEGABYTE)
            total_size += len(chunk)
            if max_body_size and total_size > max_body_size:
                # Finalize first so the digesters/temp file are closed before aborting.
                await self.finalize()
                raise PayloadTooLarge()
            if not chunk:
                await self.finalize()
                break  # the download is done
            await self.handle_data(chunk)
        return DownloadResult(
            path=self.path,
            artifact_attributes=self.artifact_attributes,
            url=self.url,
            headers=response.headers,
        )

class RegistryAuthHttpDownloader(HttpDownloader):
def get_content_type_and_max_resource_size(self, response):
    """
    Classify the downloaded resource and look up its configured size limit.

    Returns a ``(content_type, max_resource_size)`` pair: ``"Signature"`` or
    ``"Manifest"`` with the corresponding ``*_PAYLOAD_MAX_SIZE`` setting, or the
    raw response content type with ``None`` when no limit applies.
    """
    # Cosign stores signatures under tags shaped like "sha256-<digest>.sig".
    signature_request = isinstance(self, NoAuthSignatureDownloader) or fnmatch.fnmatch(
        response.url.name, "sha256-*.sig"
    )
    if signature_request:
        return "Signature", settings["SIGNATURE_PAYLOAD_MAX_SIZE"]
    if response.content_type in MANIFEST_MEDIA_TYPES.IMAGE + MANIFEST_MEDIA_TYPES.LIST:
        return "Manifest", settings["MANIFEST_PAYLOAD_MAX_SIZE"]
    # Anything else (e.g. a blob) is not size-limited here.
    return response.content_type, None


class RegistryAuthHttpDownloader(ValidateResourceSizeMixin, HttpDownloader):
"""
Custom Downloader that automatically handles Token Based and Basic Authentication.
Expand Down Expand Up @@ -104,7 +164,10 @@ async def _run(self, handle_401=True, extra_data=None):
if http_method == "head":
to_return = await self._handle_head_response(response)
else:
to_return = await self._handle_response(response)
content_type, max_resource_size = self.get_content_type_and_max_resource_size(
response
)
to_return = await self._handle_response(response, content_type, max_resource_size)

await response.release()
self.response_headers = response.headers
Expand Down Expand Up @@ -193,7 +256,7 @@ async def _handle_head_response(self, response):
)


class NoAuthSignatureDownloader(HttpDownloader):
class NoAuthSignatureDownloader(ValidateResourceSizeMixin, HttpDownloader):
"""A downloader class suited for signature downloads."""

def raise_for_status(self, response):
Expand All @@ -208,6 +271,20 @@ def raise_for_status(self, response):
else:
response.raise_for_status()

async def _run(self, extra_data=None):
    """
    Download the signature while enforcing the configured size limit.

    Mirrors the upstream ``HttpDownloader._run`` flow, but routes the response
    through ``get_content_type_and_max_resource_size`` / ``_handle_response`` so
    bodies larger than ``SIGNATURE_PAYLOAD_MAX_SIZE`` raise PayloadTooLarge.

    Args:
        extra_data: Unused here; kept for signature compatibility with the
            base downloader's ``_run``.

    Returns:
        DownloadResult: the result produced by ``_handle_response``.
    """
    if self.download_throttler:
        # Respect the remote's download rate limit, if one is configured.
        await self.download_throttler.acquire()
    async with self.session.get(
        self.url, proxy=self.proxy, proxy_auth=self.proxy_auth, auth=self.auth
    ) as response:
        self.raise_for_status(response)
        content_type, max_resource_size = self.get_content_type_and_max_resource_size(response)
        to_return = await self._handle_response(response, content_type, max_resource_size)
        await response.release()
    if self._close_session_on_finalize:
        await self.session.close()
    return to_return


class NoAuthDownloaderFactory(DownloaderFactory):
"""
Expand Down
23 changes: 23 additions & 0 deletions pulp_container/app/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from rest_framework import status
from rest_framework.exceptions import APIException, NotFound, ParseError


Expand Down Expand Up @@ -151,3 +152,25 @@ def __init__(self, message):
]
}
)


class PayloadTooLarge(APIException):
    """An exception to render an HTTP 413 response."""

    status_code = status.HTTP_413_REQUEST_ENTITY_TOO_LARGE
    default_code = "payload_too_large"

    def __init__(self, message=None):
        """Build the registry-style error body; falls back to "manifest invalid"."""
        errors = [
            {
                "code": "MANIFEST_INVALID",
                "message": message or "manifest invalid",
                "detail": "http: request body too large",
            }
        ]
        super().__init__(detail={"errors": errors})
43 changes: 28 additions & 15 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import re

from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError
from io import BytesIO
from itertools import chain
from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode
from tempfile import NamedTemporaryFile
Expand Down Expand Up @@ -69,6 +70,7 @@
ManifestNotFound,
ManifestInvalid,
ManifestSignatureInvalid,
PayloadTooLarge,
)
from pulp_container.app.redirects import (
FileStorageRedirects,
Expand All @@ -92,13 +94,14 @@
EMPTY_BLOB,
SIGNATURE_API_EXTENSION_VERSION,
SIGNATURE_HEADER,
SIGNATURE_PAYLOAD_MAX_SIZE,
SIGNATURE_TYPE,
V2_ACCEPT_HEADERS,
MEGABYTE,
)

log = logging.getLogger(__name__)


IGNORED_PULL_THROUGH_REMOTE_ATTRIBUTES = [
"remote_ptr",
"pulp_type",
Expand Down Expand Up @@ -790,7 +793,8 @@ def create_single_chunk_artifact(self, chunk):
with transaction.atomic():
# 1 chunk, create artifact right away
with NamedTemporaryFile("ab") as temp_file:
temp_file.write(chunk.read())
while subchunk := chunk.read(MEGABYTE):
temp_file.write(subchunk)
temp_file.flush()

uploaded_file = PulpTemporaryUploadedFile.from_file(
Expand Down Expand Up @@ -827,8 +831,7 @@ def create_blob(self, artifact, digest):

def single_request_upload(self, request, path, repository, digest):
    """
    Monolithic upload.

    Streams the whole blob from the request body into a single-chunk artifact
    (read in chunks by ``create_single_chunk_artifact``), creates the Blob with
    the client-provided digest, and tracks it as pending on the push repository.

    Returns:
        BlobResponse: a 201 response describing the created blob.
    """
    artifact = self.create_single_chunk_artifact(request)
    blob = self.create_blob(artifact, digest)
    repository.pending_blobs.add(blob)
    return BlobResponse(blob, path, 201, request)
Expand Down Expand Up @@ -867,7 +870,7 @@ def partial_update(self, request, path, pk=None):
"""
_, repository = self.get_dr_push(request, path)
upload = get_object_or_404(models.Upload, repository=repository, pk=pk)
chunk = request.META["wsgi.input"]
chunk = request.stream or BytesIO()
if range_header := request.headers.get("Content-Range"):
found = self.content_range_pattern.match(range_header)
if not found:
Expand All @@ -891,7 +894,7 @@ def partial_update(self, request, path, pk=None):
chunk = ContentFile(chunk.read())
upload.append(chunk, upload.size)
else:
artifact = self.create_single_chunk_artifact(chunk)
artifact = self.create_single_chunk_artifact(request)
upload.artifact = artifact
if not length:
length = artifact.size
Expand All @@ -912,10 +915,9 @@ def put(self, request, path, pk=None):
_, repository = self.get_dr_push(request, path)

digest = request.query_params["digest"]
chunk = request.META["wsgi.input"]
# last chunk (and the only one) from monolitic upload
# or last chunk from chunked upload
last_chunk = ContentFile(chunk.read())
# if request body is 0 no instance of stream will be created
chunk = request.stream or BytesIO()
last_chunk = ContentFile(request.read())
upload = get_object_or_404(models.Upload, pk=pk, repository=repository)

if artifact := upload.artifact:
Expand Down Expand Up @@ -1171,8 +1173,7 @@ def put(self, request, path, pk=None):
Responds with the actual manifest
"""
# iterate over all the layers and create
chunk = request.META["wsgi.input"]
artifact = self.receive_artifact(chunk)
artifact = self.receive_artifact(request)
manifest_digest = "sha256:{id}".format(id=artifact.sha256)

with storage.open(artifact.file.name, mode="rb") as artifact_file:
Expand Down Expand Up @@ -1379,8 +1380,11 @@ def receive_artifact(self, chunk):
subchunk = chunk.read(2000000)
if not subchunk:
break
temp_file.write(subchunk)
size += len(subchunk)
if size > settings["MANIFEST_PAYLOAD_MAX_SIZE"]:
temp_file.flush()
raise PayloadTooLarge()
temp_file.write(subchunk)
for algorithm in Artifact.DIGEST_FIELDS:
hashers[algorithm].update(subchunk)
temp_file.flush()
Expand Down Expand Up @@ -1451,9 +1455,18 @@ def put(self, request, path, pk):
except models.Manifest.DoesNotExist:
raise ManifestNotFound(reference=pk)

signature_payload = request.META["wsgi.input"].read(SIGNATURE_PAYLOAD_MAX_SIZE)
signature_max_size = settings["SIGNATURE_PAYLOAD_MAX_SIZE"]
if signature_max_size:
meta = request.META
try:
content_length = int(meta.get("CONTENT_LENGTH", meta.get("HTTP_CONTENT_LENGTH", 0)))
except (ValueError, TypeError):
content_length = 0
if content_length > signature_max_size:
raise PayloadTooLarge()

try:
signature_dict = json.loads(signature_payload)
signature_dict = json.loads(request.read(signature_max_size))
except json.decoder.JSONDecodeError:
raise ManifestSignatureInvalid(digest=pk)

Expand Down
3 changes: 3 additions & 0 deletions pulp_container/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@

# The number of allowed threads to sign manifests in parallel
MAX_PARALLEL_SIGNING_TASKS = 10

MANIFEST_PAYLOAD_MAX_SIZE = 4_000_000
SIGNATURE_PAYLOAD_MAX_SIZE = 4_000_000
21 changes: 20 additions & 1 deletion pulp_container/app/tasks/sync_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ManifestSignature,
Tag,
)
from pulp_container.app.exceptions import PayloadTooLarge
from pulp_container.app.utils import (
extract_data_from_signature,
urlpath_sanitize,
Expand Down Expand Up @@ -62,7 +63,14 @@ def __init__(self, remote, signed_only):

async def _download_manifest_data(self, manifest_url):
downloader = self.remote.get_downloader(url=manifest_url)
response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS})
try:
response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS})
except PayloadTooLarge as e:
# if it failed to download the manifest, log the error and
# there is nothing to return
log.warning(e.args[0])
return None, None, None

with open(response.path, "rb") as content_file:
raw_bytes_data = content_file.read()
response.artifact_attributes["file"] = response.path
Expand Down Expand Up @@ -146,6 +154,11 @@ async def run(self):
for artifact in asyncio.as_completed(to_download_artifact):
content_data, raw_text_data, response = await artifact

# skip the current object if it failed to be downloaded
if not content_data:
await pb_parsed_tags.aincrement()
continue

digest = calculate_digest(raw_text_data)
tag_name = response.url.split("/")[-1]

Expand Down Expand Up @@ -542,6 +555,12 @@ async def create_signatures(self, man_dc, signature_source):
"{} is not accessible, can't sync an image signature. "
"Error: {} {}".format(signature_url, exc.status, exc.message)
)
except PayloadTooLarge as e:
log.warning(
"Failed to sync signature {}. Error: {}".format(signature_url, e.args[0])
)
signature_counter += 1
continue

with open(signature_download_result.path, "rb") as f:
signature_raw = f.read()
Expand Down
1 change: 0 additions & 1 deletion pulp_container/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,5 @@
SIGNATURE_HEADER = "X-Registry-Supports-Signatures"

MEGABYTE = 1_000_000
SIGNATURE_PAYLOAD_MAX_SIZE = 4 * MEGABYTE

SIGNATURE_API_EXTENSION_VERSION = 2

0 comments on commit a24678e

Please sign in to comment.