Skip to content

Commit

Permalink
Added multiple task assignment implementations for source connectors.
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Nov 14, 2024
1 parent 5d3450b commit 4ae9732
Show file tree
Hide file tree
Showing 7 changed files with 553 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FilenamePartitionTaskAssignmentStrategy determines which files each task should process, based on the partition
 * number embedded in the file name. For example, with object names of the form
 * topicname-{{partition}}-{{start_offset}} the partition can be extracted from every name so that the files of one
 * partition are always handled by the same task.
 *
 * <p>
 * The expected source name format is turned into a regular expression: the {{partition}}, {{start_offset}} and
 * {{timestamp}} placeholders are replaced by digit matchers, and every other character of the format is interpreted
 * as regular expression syntax. Each placeholder may be written either plainly ({{partition}}) or with escaped
 * braces (\{\{partition}}).
 */
public final class FilenamePartitionTaskAssignmentStrategy implements TaskAssignmentStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(FilenamePartitionTaskAssignmentStrategy.class);
    // Matches a run of digits whose value is not needed (start offset, timestamp).
    private static final String NUMBER_REGEX_PATTERN = "\\d+";
    // Use a named group to return the partition in a complex string to always get the correct information for the
    // partition number. The quantifier must be INSIDE the group: "(?<partition>\\d)+" would only retain the last
    // digit of a multi-digit partition.
    private static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<partition>\\d+)";
    // Placeholders written with escaped braces.
    private static final String PARTITION_PATTERN = "\\{\\{partition}}";
    private static final String START_OFFSET_PATTERN = "\\{\\{start_offset}}";
    private static final String TIMESTAMP_PATTERN = "\\{\\{timestamp}}";
    // The same placeholders written plainly, as they appear in the documentation and in the error message.
    private static final String PLAIN_PARTITION_PATTERN = "{{partition}}";
    private static final String PLAIN_START_OFFSET_PATTERN = "{{start_offset}}";
    private static final String PLAIN_TIMESTAMP_PATTERN = "{{timestamp}}";
    /** Name of the regex group that captures the partition number. */
    public static final String PARTITION = "partition";

    private Pattern partitionPattern;

    private int maxTasks;

    /**
     * Creates the strategy.
     *
     * @param maxTasks
     *            maximum number of configured tasks for this connector
     * @param expectedSourceNameFormat
     *            the format of the source names; must contain the partition placeholder
     * @throws ConfigException
     *             if the format is null or does not contain the partition placeholder
     */
    FilenamePartitionTaskAssignmentStrategy(final int maxTasks, final String expectedSourceNameFormat) {
        configureTaskAssignment(maxTasks, expectedSourceNameFormat);
    }

    /**
     * Decides whether the given source belongs to the given task by extracting the partition number from its name.
     *
     * @param taskId
     *            the id of the task asking whether it should process the source
     * @param sourceNameToBeEvaluated
     *            is the filename/table name of the source for the connector.
     * @return true if the partition found in the name is assigned to {@code taskId}; false if it is assigned to
     *         another task, or if no usable partition number could be found in the name (a warning is logged)
     */
    @Override
    public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated) {
        final Matcher match = partitionPattern.matcher(sourceNameToBeEvaluated);
        if (!match.find()) {
            LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
            return false;
        }
        final String partitionText = match.group(PARTITION);
        final int partition;
        try {
            partition = Integer.parseInt(partitionText);
        } catch (NumberFormatException ex) {
            // The group only matches digits, so this can only be a value too large for an int.
            LOG.warn("Partition {} found in file name {} is not a valid partition number", partitionText,
                    sourceNameToBeEvaluated);
            return false;
        }
        return toBeProcessedByThisTask(taskId, maxTasks, partition);
    }

    /**
     * When a connector reconfiguration event is received this method should be called to ensure the correct Strategy
     * is being implemented by the connector.
     *
     * @param maxTasks
     *            maximum number of configured tasks for this connector
     * @param expectedSourceNameFormat
     *            what the format of the source should appear like so to configure the task distribution.
     * @throws ConfigException
     *             if the format is null or does not contain the partition placeholder; the previous configuration
     *             is kept in that case
     */
    @Override
    public void reconfigureTaskAssignmentStrategy(final int maxTasks, final String expectedSourceNameFormat) {
        configureTaskAssignment(maxTasks, expectedSourceNameFormat);
    }

    private void configureTaskAssignment(final int maxTasks, final String expectedSourceNameFormat) {
        // Build the pattern before touching any field so a rejected format leaves the current state untouched.
        final Pattern compiledPattern = buildPartitionPattern(expectedSourceNameFormat);
        setMaxTasks(maxTasks);
        partitionPattern = compiledPattern;
    }

    /** Validates the format and converts it into the regular expression used to locate the partition number. */
    private static Pattern buildPartitionPattern(final String expectedSourceNameFormat) {
        if (expectedSourceNameFormat == null || !(expectedSourceNameFormat.contains(PARTITION_PATTERN)
                || expectedSourceNameFormat.contains(PLAIN_PARTITION_PATTERN))) {
            throw new ConfigException(
                    "Partition pattern {{partition}} not found, please configure the expected source to include the partition pattern.");
        }
        // Build REGEX Matcher
        String regexString = replacePlaceholder(expectedSourceNameFormat, START_OFFSET_PATTERN,
                PLAIN_START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
        regexString = replacePlaceholder(regexString, TIMESTAMP_PATTERN, PLAIN_TIMESTAMP_PATTERN,
                NUMBER_REGEX_PATTERN);
        regexString = replacePlaceholder(regexString, PARTITION_PATTERN, PLAIN_PARTITION_PATTERN,
                PARTITION_NAMED_GROUP_REGEX_PATTERN);
        return Pattern.compile(regexString);
    }

    /** Replaces both spellings of one placeholder (escaped braces first, then plain) with the given regex. */
    private static String replacePlaceholder(final String format, final String escapedPlaceholder,
            final String plainPlaceholder, final String regex) {
        final String withoutEscaped = StringUtils.replace(format, escapedPlaceholder, regex);
        return StringUtils.replace(withoutEscaped, plainPlaceholder, regex);
    }

    private void setMaxTasks(final int maxTasks) {
        this.maxTasks = maxTasks;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

/**
 * HashTaskAssignmentStrategy decides which task should process a file from the hash of the filename and path
 * supplied by the iterator. It suits use cases where the ordering of events is not a requirement, and where the
 * files being added to Kafka were not previously created by a supported S3 Sink, were created manually, or were
 * created by another process.
 */
public final class HashTaskAssignmentStrategy implements TaskAssignmentStrategy {
    private int maxTasks;

    /**
     * Creates the strategy.
     *
     * @param maxTasks
     *            the number of tasks the hash values are distributed over
     */
    HashTaskAssignmentStrategy(final int maxTasks) {
        this.maxTasks = maxTasks;
    }

    /**
     * Checks whether the hash of the given name falls into the bucket of the given task.
     *
     * @param taskId
     *            the id of the task asking whether it should process the file
     * @param filenameToBeEvaluated
     *            the filename (and path) whose hash selects the task
     * @return true if the floor modulus of the name's hash code and the number of tasks equals {@code taskId}
     */
    @Override
    public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated) {
        final int nameHash = filenameToBeEvaluated.hashCode();
        // floorMod takes the sign of the divisor, so a negative hash code still yields a bucket starting at 0.
        // No offset is added: the bucket is compared with the task id as-is.
        final int bucket = Math.floorMod(nameHash, maxTasks);
        return bucket == taskId;
    }

    /**
     * Updates the number of tasks after a reconfiguration.
     *
     * @param maxTasks
     *            the new number of tasks
     * @param expectedFormat
     *            not used by this strategy
     */
    @Override
    public void reconfigureTaskAssignmentStrategy(final int maxTasks, final String expectedFormat) {
        this.maxTasks = maxTasks;
    }

    /**
     * Sets the number of tasks the hash values are distributed over.
     *
     * @param maxTasks
     *            the new number of tasks
     */
    public void setMaxTasks(final int maxTasks) {
        this.maxTasks = maxTasks;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * PartitionPathTaskAssignmentStrategy allows source connectors to distribute work between tasks based on the
 * partition number that is part of the folder structure, for example /PREFIX/partition={{partition}}/YYYY/MM/DD/mm.
 * Everything in the expected path format before the partition identifier is treated as a fixed prefix; the path
 * segment that follows the prefix (up to the next "/") is read as the partition number.
 *
 * <p>
 * The partition identifier may be written either plainly ({{partition}}) or with escaped braces
 * (\{\{partition}}).
 *
 * <p>
 * e.g. Task distribution in Connect with 10 Partitions and 3 tasks:
 *
 * <pre>
 * | Task | Partition |
 * |  0   |     0     |
 * |  1   |     1     |
 * |  2   |     2     |
 * |  0   |     3     |
 * |  1   |     4     |
 * |  2   |     5     |
 * |  0   |     6     |
 * |  1   |     7     |
 * |  2   |     8     |
 * |  0   |     9     |
 * </pre>
 */
public final class PartitionPathTaskAssignmentStrategy implements TaskAssignmentStrategy {
    /** Partition identifier written with escaped braces. */
    public static final String PARTITION_NUMBER_PATTERN = "\\{\\{partition}}";
    // The same identifier written plainly, as it appears in the documentation.
    private static final String PLAIN_PARTITION_NUMBER_PATTERN = "{{partition}}";
    private static final Logger LOG = LoggerFactory.getLogger(PartitionPathTaskAssignmentStrategy.class);

    private String prefix;
    private int maxTasks;

    /**
     * Creates the strategy.
     *
     * @param maxTasks
     *            The maximum number of configured tasks for this connector
     * @param expectedPathFormat
     *            The format of the path; must contain the partition identifier
     * @throws ConfigException
     *             if the format is null or does not contain the partition identifier
     */
    PartitionPathTaskAssignmentStrategy(final int maxTasks, final String expectedPathFormat) {
        configureTaskAssignment(maxTasks, expectedPathFormat);
    }

    /**
     * Decides whether the given path belongs to the given task by reading the partition number that follows the
     * configured prefix.
     *
     * @param taskId
     *            the id of the task asking whether it should process the path
     * @param valueToBeEvaluated
     *            the path to evaluate
     * @return true if the partition in the path is assigned to {@code taskId}; false if it is assigned to another
     *         task, if the path does not start with the configured prefix, or if nothing follows the partition
     *         segment (a warning is logged for the latter two)
     * @throws ConnectException
     *             if the path segment after the prefix is not an integer
     */
    @Override
    public boolean isPartOfTask(final int taskId, final String valueToBeEvaluated) {
        if (!valueToBeEvaluated.startsWith(prefix)) {
            LOG.warn("Ignoring path {}, does not contain the preconfigured prefix {} set up at startup",
                    valueToBeEvaluated, prefix);
            return false;
        }
        final String afterPrefix = StringUtils.substringAfter(valueToBeEvaluated, prefix);
        if (!afterPrefix.contains("/")) {
            LOG.warn("Ignoring path {}, does not contain any sub folders after partitionId prefix {}",
                    valueToBeEvaluated, prefix);
            return false;
        }
        final String partitionText = StringUtils.substringBefore(afterPrefix, "/");

        final int partitionId;
        try {
            partitionId = Integer.parseInt(partitionText);
        } catch (NumberFormatException ex) {
            // Keep the original exception as the cause so the offending value is visible in the stack trace.
            throw new ConnectException(String
                    .format("Unexpected non integer value found parsing path for partitionId: %s", valueToBeEvaluated),
                    ex);
        }
        return toBeProcessedByThisTask(taskId, maxTasks, partitionId);
    }

    /**
     * Applies a new configuration after a connector reconfiguration event.
     *
     * @param maxTasks
     *            The maximum number of configured tasks for this connector
     * @param expectedPathFormat
     *            The format of the path and where to identify the partition
     * @throws ConfigException
     *             if the format is null or does not contain the partition identifier; the previous configuration
     *             is kept in that case
     */
    @Override
    public void reconfigureTaskAssignmentStrategy(final int maxTasks, final String expectedPathFormat) {
        configureTaskAssignment(maxTasks, expectedPathFormat);
    }

    private void configureTaskAssignment(final int maxTasks, final String expectedPathFormat) {
        // Validate before touching any field so a rejected format leaves the current state untouched.
        final String newPrefix = extractPrefix(expectedPathFormat);
        setMaxTasks(maxTasks);
        prefix = newPrefix;
    }

    /** Returns the part of the format that precedes the partition identifier, or fails if there is none. */
    private static String extractPrefix(final String expectedPathFormat) {
        if (expectedPathFormat != null) {
            // The escaped spelling takes precedence, which keeps the result unchanged for formats that use it.
            if (expectedPathFormat.contains(PARTITION_NUMBER_PATTERN)) {
                return StringUtils.substringBefore(expectedPathFormat, PARTITION_NUMBER_PATTERN);
            }
            if (expectedPathFormat.contains(PLAIN_PARTITION_NUMBER_PATTERN)) {
                return StringUtils.substringBefore(expectedPathFormat, PLAIN_PARTITION_NUMBER_PATTERN);
            }
        }
        throw new ConfigException(String.format(
                "Expected path format is missing the identifier '%s' to correctly select the partition",
                PLAIN_PARTITION_NUMBER_PATTERN));
    }

    private void setMaxTasks(final int maxTasks) {
        this.maxTasks = maxTasks;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

/**
 * TaskAssignmentStrategy is the common interface through which source connectors decide how work is distributed
 * amongst their tasks in distributed mode.
 */
public interface TaskAssignmentStrategy {

    /**
     * Decides whether the given value should be handled by the given task.
     *
     * @param taskId
     *            the current running task
     * @param valueToBeEvaluated
     *            the value (for example a file name or path) to assign to a task
     * @return true if the task should handle the value
     */
    boolean isPartOfTask(int taskId, String valueToBeEvaluated);

    /**
     * Must be called when a connector receives a reconfigure event, so that the task assignment strategy is updated
     * correctly.
     *
     * @param maxTasks
     *            The maximum number of tasks created for the Connector
     * @param expectedFormat
     *            The expected format, of files, path, table names or other ways to partition the tasks.
     */
    void reconfigureTaskAssignmentStrategy(int maxTasks, String expectedFormat);

    /**
     * Checks whether the task is responsible for a set of files by comparing the task id with the partition id.
     *
     * @param taskId
     *            the current running task
     * @param partitionId
     *            The partitionId recovered from the file path.
     * @return true if this task is responsible for this partition. false if it is not responsible for this task.
     */
    default boolean taskMatchesPartition(final int taskId, final int partitionId) {
        // Both ids are compared as-is, so they must share the same base (both are expected to start at 0). If task
        // ids were changed to start at 1 this comparison would break.
        return partitionId == taskId;
    }

    /**
     * When more partitions exist than tasks are configured, a task has to take on additional partitions. The
     * partition id is reduced by the remainder of dividing it by the number of tasks before it is compared with the
     * task id.
     *
     * @param taskId
     *            the current running task.
     * @param maxTasks
     *            The maximum number of configured tasks allowed to run for this connector.
     * @param partitionId
     *            The partitionId recovered from the file path.
     * @return true if the task supplied should handle the supplied partition
     */
    default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) {
        final int wrappedPartitionId = partitionId % maxTasks;
        return taskMatchesPartition(taskId, wrappedPartitionId);
    }

    /**
     * Decides whether a partition is handled by a task: partition numbers below the number of tasks are compared
     * with the task id directly, all others are first reduced modulo the number of tasks.
     *
     * @param taskId
     *            the current running task.
     * @param maxTasks
     *            The maximum number of configured tasks allowed to run for this connector.
     * @param partitionNumber
     *            The partition number recovered from the source name or path.
     * @return true if the task supplied should handle the supplied partition
     */
    default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionNumber) {
        if (partitionNumber >= maxTasks) {
            return taskMatchesModOfPartitionAndMaxTask(taskId, maxTasks, partitionNumber);
        }
        return taskMatchesPartition(taskId, partitionNumber);
    }
}
Loading

0 comments on commit 4ae9732

Please sign in to comment.