Skip to content

Commit

Permalink
NIFI-13652 Added Range Header Handling for Content Download (#9172)
Browse files Browse the repository at this point in the history
This closes #9172
  • Loading branch information
exceptionfactory authored Aug 14, 2024
1 parent 60e9918 commit 89c7579
Show file tree
Hide file tree
Showing 15 changed files with 905 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.nifi.web.api.config.NodeDisconnectionExceptionMapper;
import org.apache.nifi.web.api.config.NodeReconnectionExceptionMapper;
import org.apache.nifi.web.api.config.NotFoundExceptionMapper;
import org.apache.nifi.web.api.config.RangeNotSatisfiableExceptionMapper;
import org.apache.nifi.web.api.config.ResourceNotFoundExceptionMapper;
import org.apache.nifi.web.api.config.ThrowableMapper;
import org.apache.nifi.web.api.config.UnknownNodeExceptionMapper;
Expand Down Expand Up @@ -131,6 +132,7 @@ public NiFiWebApiResourceConfig(@Context ServletContext servletContext) {
register(NoResponseFromNodesExceptionMapper.class);
register(NodeDisconnectionExceptionMapper.class);
register(NodeReconnectionExceptionMapper.class);
register(RangeNotSatisfiableExceptionMapper.class);
register(ResourceNotFoundExceptionMapper.class);
register(NotFoundExceptionMapper.class);
register(UnknownNodeExceptionMapper.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package org.apache.nifi.web.api;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import io.swagger.v3.oas.annotations.Operation;
Expand All @@ -33,13 +30,13 @@
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
Expand All @@ -52,7 +49,6 @@
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.DropRequestDTO;
Expand All @@ -65,6 +61,7 @@
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.streaming.StreamingOutputResponseBuilder;
import org.apache.nifi.web.util.ResponseBuilderUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
Expand Down Expand Up @@ -194,7 +191,6 @@ public Response getFlowFile(
* @param flowFileUuid The flowfile uuid
* @param clusterNodeId The cluster node id
* @return The content stream
* @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
Expand All @@ -209,14 +205,20 @@ public Response getFlowFile(
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."),
@ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested")
}
)
public Response downloadFlowFileContent(
@Parameter(
description = "Range of bytes requested"
)
@HeaderParam("Range") final String rangeHeader,
@Parameter(
description = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response."
)
Expand Down Expand Up @@ -257,30 +259,14 @@ public Response downloadFlowFileContent(
// get the uri of the request
final String uri = generateResourceUri("flowfile-queues", connectionId, "flowfiles", flowFileUuid, "content");

// get an input stream to the content
final DownloadableContent content = serviceFacade.getContent(connectionId, flowFileUuid, uri);
final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build());

// generate a streaming response
final StreamingOutput response = new StreamingOutput() {
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
try (InputStream is = content.getContent()) {
// stream the content to the response
StreamUtils.copy(is, output);

// flush the response
output.flush();
}
}
};

// use the appropriate content type
String contentType = content.getType();
if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM;
}

final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType);
responseBuilder.type(contentType);
return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
Expand All @@ -48,7 +48,6 @@
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
Expand All @@ -59,15 +58,13 @@
import org.apache.nifi.web.api.entity.ReplayLastEventSnapshotDTO;
import org.apache.nifi.web.api.entity.SubmitReplayRequestEntity;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.nifi.web.api.streaming.StreamingOutputResponseBuilder;
import org.apache.nifi.web.util.ResponseBuilderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;

Expand Down Expand Up @@ -104,14 +101,20 @@ public class ProvenanceEventResource extends ApplicationResource {
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."),
@ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested")
}
)
public Response getInputContent(
@Parameter(
description = "Range of bytes requested"
)
@HeaderParam("Range") final String rangeHeader,
@Parameter(
description = "The id of the node where the content exists if clustered."
)
Expand Down Expand Up @@ -140,30 +143,13 @@ public Response getInputContent(
// get the uri of the request
final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "input");

// get an input stream to the content
final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.INPUT);

// generate a streaming response
final StreamingOutput response = new StreamingOutput() {
@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
try (InputStream is = content.getContent()) {
// stream the content to the response
StreamUtils.copy(is, output);

// flush the response
output.flush();
}
}
};

// use the appropriate content type
final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build());
String contentType = content.getType();
if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM;
}

final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType);
responseBuilder.type(contentType);
return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build();
}

Expand All @@ -188,14 +174,20 @@ public void write(OutputStream output) throws IOException, WebApplicationExcepti
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "206", description = "Partial Content with range of bytes requested"),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it."),
@ApiResponse(responseCode = "416", description = "Requested Range Not Satisfiable based on bytes requested"),
}
)
public Response getOutputContent(
@Parameter(
description = "Range of bytes requested"
)
@HeaderParam("Range") final String rangeHeader,
@Parameter(
description = "The id of the node where the content exists if clustered."
)
Expand Down Expand Up @@ -224,30 +216,13 @@ public Response getOutputContent(
// get the uri of the request
final String uri = generateResourceUri("provenance", "events", String.valueOf(id.getLong()), "content", "output");

// get an input stream to the content
final DownloadableContent content = serviceFacade.getContent(id.getLong(), uri, ContentDirection.OUTPUT);

// generate a streaming response
final StreamingOutput response = new StreamingOutput() {
@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
try (InputStream is = content.getContent()) {
// stream the content to the response
StreamUtils.copy(is, output);

// flush the response
output.flush();
}
}
};

// use the appropriate content type
final Response.ResponseBuilder responseBuilder = noCache(new StreamingOutputResponseBuilder(content.getContent()).range(rangeHeader).build());
String contentType = content.getType();
if (contentType == null) {
contentType = MediaType.APPLICATION_OCTET_STREAM;
}

final Response.ResponseBuilder responseBuilder = generateOkResponse(response).type(contentType);
responseBuilder.type(contentType);
return ResponseBuilderUtils.setContentDisposition(responseBuilder, content.getFilename()).build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.config;

import jakarta.ws.rs.core.MediaType;
import org.apache.nifi.web.api.streaming.RangeNotSatisfiableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.ExceptionMapper;
import jakarta.ws.rs.ext.Provider;

/**
* Map Range Not Satisfiable Exception to HTTP 416 Responses
*/
@Provider
public class RangeNotSatisfiableExceptionMapper implements ExceptionMapper<RangeNotSatisfiableException> {

private static final Logger logger = LoggerFactory.getLogger(RangeNotSatisfiableExceptionMapper.class);

@Override
public Response toResponse(final RangeNotSatisfiableException exception) {
logger.info("HTTP 416 Range Not Satisfiable: {}", exception.getMessage());

return Response.status(Response.Status.REQUESTED_RANGE_NOT_SATISFIABLE)
.entity(exception.getMessage())
.type(MediaType.TEXT_PLAIN_TYPE)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;

import java.util.Objects;
import java.util.OptionalLong;

/**
* Range of bytes requested as described in RFC 9110 Section 14.1.2 with optional first and last positions
*/
public class ByteRange {
private final Long firstPosition;

private final Long lastPosition;

public ByteRange(final Long firstPosition, final Long lastPosition) {
if (firstPosition == null) {
Objects.requireNonNull(lastPosition, "Last Position required");
}
this.firstPosition = firstPosition;
this.lastPosition = lastPosition;
}

/**
* Get first position in byte range which can be empty indicating the last position must be specified
*
* @return First position starting with 0 or empty
*/
public OptionalLong getFirstPosition() {
return firstPosition == null ? OptionalLong.empty() : OptionalLong.of(firstPosition);
}

/**
* Get last position in byte range which can empty indicating the first position must be specified
*
* @return Last position starting with 0 or empty
*/
public OptionalLong getLastPosition() {
return lastPosition == null ? OptionalLong.empty() : OptionalLong.of(lastPosition);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.streaming;

/**
* Byte Range Format Exception indicating invalid units specified in Range Header
*/
public class ByteRangeFormatException extends IllegalArgumentException {

public ByteRangeFormatException(final String message) {
super(message);
}
}
Loading

0 comments on commit 89c7579

Please sign in to comment.