diff --git a/Dockerfile b/Dockerfile index e3e9be1b..dc34311e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,7 +40,7 @@ COPY ./kafka-connect-fitbit-source/src/ /code/kafka-connect-fitbit-source/src RUN ./gradlew jar -FROM confluentinc/cp-kafka-connect-base:5.1.0 +FROM confluentinc/cp-kafka-connect-base:5.5.0 MAINTAINER Joris Borgdorff diff --git a/README.md b/README.md index 0bd6bb40..88279ca3 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,12 @@ your Fitbit App client ID and client secret. The following tables shows the poss fitbit.user.repository.urlURL for webservice containing user credentials. Only used if a webservice-based user repository is configured.string""low +fitbit.user.repository.client.idClient ID for connecting to the service repository.string""medium + +fitbit.user.repository.client.secretClient secret for connecting to the service repository.string""medium + +fitbit.user.repository.oauth2.token.urlOAuth 2.0 token url for retrieving client credentials.string""medium + fitbit.intraday.steps.topicTopic for Fitbit intraday stepsstringconnect_fitbit_intraday_stepsnon-empty string without control characterslow fitbit.intraday.heart.rate.topicTopic for Fitbit intraday heart_ratestringconnect_fitbit_intraday_heart_ratenon-empty string without control characterslow @@ -76,8 +82,26 @@ your Fitbit App client ID and client secret. The following tables shows the poss fitbit.activity.log.topicTopic for Fitbit activity log.stringconnect_fitbit_activity_lognon-empty string without control characterslow fitbit.intraday.calories.topicTopic for Fitbit intraday caloriesstringconnect_fitbit_intraday_caloriesnon-empty string without control characterslow + +fitbit.user.firebase.collection.fitbit.nameFirestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used.stringfitbitlow + +fitbit.user.firebase.collection.user.nameFirestore Collection for retrieving User details. Only used when a Firebase based user repository is used.stringuserslow +If the ManagementPortal is used to authenticate against the user repository, please add an OAuth client to ManagementPortal with the following properties: + +``` +Client ID: fitbit.user.repository.client.id +Client Secret: fitbit.user.repository.client.secret +Scope: SUBJECT.READ +Resources: res_restAuthorizer +Grant types: client_credentials +Access Token validity: 600 +Refresh Token validity: 0 +``` + +Finally set the `fitbit.user.repository.oauth.token.url` to `http://managementportal-app:8080/managementportal/oauth/token`. + Now you can run a full Kafka stack using ```shell diff --git a/build.gradle b/build.gradle index 6dff616c..d58194dc 100644 --- a/build.gradle +++ b/build.gradle @@ -2,16 +2,16 @@ description = 'kafka-connect-rest-source' subprojects { ext { - kafkaVersion = '2.2.0-cp2' - confluentVersion = '5.2.1' - jacksonVersion = '2.9.9' + kafkaVersion = '2.5.0' + confluentVersion = '5.5.0' + jacksonVersion = '2.11.0' } apply plugin: 'java' apply plugin: 'java-library' group = 'org.radarbase' - version = '0.3.0' + version = '0.3.1' sourceCompatibility = 1.8 targetCompatibility = 1.8 @@ -21,12 +21,13 @@ subprojects { maven { url "https://packages.confluent.io/maven/" } maven { url "https://repo.maven.apache.org/maven2" } jcenter() + maven { url "https://dl.bintray.com/radar-cns/org.radarcns" } maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local/' } } } wrapper { - gradleVersion '5.4.1' + gradleVersion '6.4.1' } evaluationDependsOnChildren() diff --git a/docker/kafka-wait b/docker/kafka-wait index a971cd30..98d8b8d1 100755 --- a/docker/kafka-wait +++ b/docker/kafka-wait @@ -18,12 +18,11 @@ max_timeout=32 tries=10 timeout=1 while true; do - ZOOKEEPER_CHECK=$(zookeeper-shell ${CONNECT_ZOOKEEPER_CONNECT} <<< "ls /brokers/ids" | tail -2 | head -1) + ZOOKEEPER_CHECK=$(zookeeper-shell ${CONNECT_ZOOKEEPER_CONNECT} <<< "ls /brokers/ids" | tail -n 1) echo "Zookeeper response: ${ZOOKEEPER_CHECK}" - # ZOOKEEPER_CHECK="${ZOOKEEPER_CHECK##*$'\n'}" ZOOKEEPER_CHECK="$(echo -e "${ZOOKEEPER_CHECK}" | tr -d '[:space:]' | tr -d '[' | tr -d ']')" - IFS=',' read -r -a array <<< ${ZOOKEEPER_CHECK} + IFS=',' read -r -a array <<< "${ZOOKEEPER_CHECK}" LENGTH=${#array[@]} if [ "$LENGTH" -eq "$KAFKA_BROKERS" ]; then echo "Kafka brokers available." diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 5c2d1cf0..62d4c053 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index f4d7b2bf..a4f0001d 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index b0d6d0ab..fbd7c515 100755 --- a/gradlew +++ b/gradlew @@ -7,7 +7,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -82,6 +82,7 @@ esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then @@ -125,10 +126,11 @@ if $darwin; then GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" fi -# For Cygwin, switch paths to Windows format before running java -if $cygwin ; then +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath @@ -154,19 +156,19 @@ if $cygwin ; then else eval `echo args$i`="\"$arg\"" fi - i=$((i+1)) + i=`expr $i + 1` done case $i in - (0) set -- ;; - (1) set -- "$args0" ;; - (2) set -- "$args0" "$args1" ;; - (3) set -- "$args0" "$args1" "$args2" ;; - (4) set -- "$args0" "$args1" "$args2" "$args3" ;; - (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; esac fi @@ -175,14 +177,9 @@ save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } -APP_ARGS=$(save "$@") +APP_ARGS=`save "$@"` # Collect all arguments for the java command, following the shell quoting and substitution rules eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" -# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong -if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then - cd "$(dirname "$0")" -fi - exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index 15e1ee37..a9f778a7 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -5,7 +5,7 @@ @rem you may not use this file except in compliance with the License. @rem You may obtain a copy of the License at @rem -@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem https://www.apache.org/licenses/LICENSE-2.0 @rem @rem Unless required by applicable law or agreed to in writing, software @rem distributed under the License is distributed on an "AS IS" BASIS, @@ -29,6 +29,9 @@ if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" @@ -81,6 +84,7 @@ set CMD_LINE_ARGS=%* set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + @rem Execute Gradle "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% diff --git a/kafka-connect-fitbit-source/build.gradle b/kafka-connect-fitbit-source/build.gradle index b05f1632..1ae0fe16 100644 --- a/kafka-connect-fitbit-source/build.gradle +++ b/kafka-connect-fitbit-source/build.gradle @@ -3,6 +3,7 @@ dependencies { api group: 'io.confluent', name: 'kafka-connect-avro-converter', version: confluentVersion api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.5.3' + implementation group: 'org.radarcns', name: 'oauth-client-util', version: '0.5.8' implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index 9611cf4a..5e8e0f1b 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -20,6 +20,8 @@ import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; @@ -122,6 +124,19 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig { private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY = "Intraday calories topic"; private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DEFAULT = "connect_fitbit_intraday_calories"; + + public static final String FITBIT_USER_REPOSITORY_CLIENT_ID_CONFIG = "fitbit.user.repository.client.id"; + private static final String FITBIT_USER_REPOSITORY_CLIENT_ID_DOC = "Client ID for connecting to the service repository."; + private static final String FITBIT_USER_REPOSITORY_CLIENT_ID_DISPLAY = "Client ID for user repository."; + + public static final String FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG = "fitbit.user.repository.client.secret"; + private static final String FITBIT_USER_REPOSITORY_CLIENT_SECRET_DOC = "Client secret for connecting to the service repository."; + private static final String FITBIT_USER_REPOSITORY_CLIENT_SECRET_DISPLAY = "Client Secret for user repository."; + + public static final String FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG = "fitbit.user.repository.oauth2.token.url"; + private static final String FITBIT_USER_REPOSITORY_TOKEN_URL_DOC = "OAuth 2.0 token url for retrieving client credentials."; + private static final String FITBIT_USER_REPOSITORY_TOKEN_URL_DISPLAY = "OAuth 2.0 token URL."; + public static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG = "fitbit.user.firebase.collection.fitbit.name"; private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC = "Firestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used."; private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY = "Firebase Fitbit collection name."; @@ -242,6 +257,37 @@ public String toString() { Width.SHORT, FITBIT_USER_REPOSITORY_URL_DISPLAY) + + .define(FITBIT_USER_REPOSITORY_CLIENT_ID_CONFIG, + Type.STRING, + "", + Importance.MEDIUM, + FITBIT_USER_REPOSITORY_CLIENT_ID_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_USER_REPOSITORY_CLIENT_ID_DISPLAY) + + .define(FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG, + Type.PASSWORD, + "", + Importance.MEDIUM, + FITBIT_USER_REPOSITORY_CLIENT_SECRET_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_USER_REPOSITORY_CLIENT_SECRET_DISPLAY) + + .define(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG, + Type.STRING, + "", + Importance.MEDIUM, + FITBIT_USER_REPOSITORY_TOKEN_URL_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_USER_REPOSITORY_TOKEN_URL_DISPLAY) + .define(FITBIT_INTRADAY_STEPS_TOPIC_CONFIG, Type.STRING, FITBIT_INTRADAY_STEPS_TOPIC_DEFAULT, @@ -447,4 +493,25 @@ public String getFitbitUserRepositoryFirestoreFitbitCollection() { public String getFitbitUserRepositoryFirestoreUserCollection() { return getString(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG); } + + public String getFitbitUserRepositoryClientId() { + return getString(FITBIT_USER_REPOSITORY_CLIENT_ID_CONFIG); + } + + public String getFitbitUserRepositoryClientSecret() { + return getPassword(FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG).value(); + } + + public URL getFitbitUserRepositoryTokenUrl() { + String value = getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG); + if (value == null || value.isEmpty()) { + return null; + } else { + try { + return new URL(getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG)); + } catch (MalformedURLException e) { + throw new ConfigException("Fitbit user repository token URL is invalid."); + } + } + } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java index f0ee03c9..f441301c 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitActivityLogAvroConverter.java @@ -85,7 +85,7 @@ private FitbitActivityLogRecord getRecord(JsonNode s, OffsetDateTime startTime) .setLogType(optString(s, "logType").orElse(null)) .setType(optLong(s, "activityType").orElse(null)) .setSpeed(optDouble(s, "speed").orElse(null)) - .setDistance(optDouble(s, "distance").map(Float::new).orElse(null)) + .setDistance(optDouble(s, "distance").map(Double::floatValue).orElse(null)) .setSteps(optInt(s, "steps").orElse(null)) .setEnergy(optInt(s, "calories").map(e -> e * FOOD_CAL_TO_KJOULE_FACTOR) .orElse(null)) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java index 4e636ce5..7913493a 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java @@ -25,6 +25,7 @@ import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT; import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.TIMESTAMP_OFFSET_KEY; import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.nearFuture; +import static org.radarbase.connect.rest.request.PollingRequestRoute.max; import java.io.IOException; import java.time.Duration; @@ -316,7 +317,7 @@ protected Instant nextPoll(User user) { } else { Instant nextPoll = lastPollPerUser.getOrDefault(user.getId(), MIN_INSTANT) .plus(getPollIntervalPerUser()); - return PollingRequestRoute.max(offset.plus(getLookbackTime()), nextPoll); + return max(offset.plus(getLookbackTime()), nextPoll); } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java index 9a7e066c..63318a44 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectReader; +import io.confluent.common.config.ConfigException; import java.io.IOException; +import java.net.URL; import java.time.Duration; import java.time.Instant; import java.util.HashMap; @@ -34,6 +36,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.ws.rs.NotAuthorizedException; +import okhttp3.Credentials; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.OkHttpClient; @@ -43,6 +46,8 @@ import okhttp3.ResponseBody; import org.radarbase.connect.rest.RestSourceConnectorConfig; import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig; +import org.radarcns.exception.TokenException; +import org.radarcns.oauth.OAuth2Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +67,10 @@ public class ServiceUserRepository implements UserRepository { private final AtomicReference nextFetch = new AtomicReference<>(MIN_INSTANT); private HttpUrl baseUrl; - private HashSet containedUsers; + private final HashSet containedUsers; private Set timedCachedUsers = new HashSet<>(); + private OAuth2Client repositoryClient; + private String basicCredentials; public ServiceUserRepository() { this.client = new OkHttpClient(); @@ -82,10 +89,35 @@ public void initialize(RestSourceConnectorConfig config) { FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config; this.baseUrl = fitbitConfig.getFitbitUserRepositoryUrl(); this.containedUsers.addAll(fitbitConfig.getFitbitUsers()); + + URL tokenUrl = fitbitConfig.getFitbitUserRepositoryTokenUrl(); + String clientId = fitbitConfig.getFitbitUserRepositoryClientId(); + String clientSecret = fitbitConfig.getFitbitUserRepositoryClientSecret(); + + if (tokenUrl != null) { + if (clientId.isEmpty()) { + throw new ConfigException("Client ID for user repository is not set."); + } + this.repositoryClient = new OAuth2Client.Builder() + .credentials(clientId, clientSecret) + .endpoint(tokenUrl) + .scopes("SUBJECT.READ") + .httpClient(client) + .build(); + } else if (clientId != null) { + basicCredentials = Credentials.basic(clientId, clientSecret); + } } @Override public Stream stream() { + if (nextFetch.get().equals(MIN_INSTANT)) { + try { + applyPendingUpdates(); + } catch (IOException ex) { + logger.error("Failed to initially get users from repository"); + } + } return this.timedCachedUsers.stream(); } @@ -121,21 +153,39 @@ public void applyPendingUpdates() throws IOException { Request request = requestFor("users?source-type=FitBit").build(); this.timedCachedUsers = this.makeRequest(request, USER_LIST_READER).getUsers().stream() - .filter( - u -> - u.isComplete() - && (containedUsers.isEmpty() - || containedUsers.contains(u.getVersionedId()))) + .filter(u -> u.isComplete() + && (containedUsers.isEmpty() + || containedUsers.contains(u.getVersionedId()))) .collect(Collectors.toSet()); nextFetch.set(Instant.now().plus(FETCH_THRESHOLD)); } - private Request.Builder requestFor(String relativeUrl) { + private Request.Builder requestFor(String relativeUrl) throws IOException { HttpUrl url = baseUrl.resolve(relativeUrl); if (url == null) { throw new IllegalArgumentException("Relative URL is invalid"); } - return new Request.Builder().url(url); + Request.Builder builder = new Request.Builder().url(url); + String authorization = requestAuthorization(); + if (authorization != null) { + builder.addHeader("Authorization", authorization); + } + + return builder; + } + + private String requestAuthorization() throws IOException { + if (repositoryClient != null) { + try { + return "Bearer " + repositoryClient.getValidToken().getAccessToken(); + } catch (TokenException ex) { + throw new IOException(ex); + } + } else if (basicCredentials != null) { + return basicCredentials; + } else { + return null; + } } private T makeRequest(Request request, ObjectReader reader) throws IOException { diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java index 60ca17de..75026d0a 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java @@ -86,7 +86,7 @@ public class YamlUserRepository implements UserRepository { private Set configuredUsers; private Headers headers; - private ConcurrentMap users = new ConcurrentHashMap<>(); + private final ConcurrentMap users = new ConcurrentHashMap<>(); private final AtomicReference nextFetch = new AtomicReference<>(MIN_INSTANT); private Path credentialsDir; diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java index 6fdf54fc..cb8ab697 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java @@ -37,7 +37,7 @@ public class CovidCollabFirebaseUserRepository extends FirebaseUserRepository { private static final Logger logger = LoggerFactory.getLogger(CovidCollabFirebaseUserRepository.class); - private Map cachedUsers = new HashMap<>(); + private final Map cachedUsers = new HashMap<>(); private CollectionReference userCollection; private CollectionReference fitbitCollection; private FitbitTokenService fitbitTokenService; diff --git a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/config/ValidClass.java b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/config/ValidClass.java index daba5ec6..91844795 100644 --- a/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/config/ValidClass.java +++ b/kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/config/ValidClass.java @@ -49,11 +49,9 @@ public void ensureValid(String name, Object obj) { "Class " + obj + " must be subclass of " + superClass.getName()); } try { - cls.newInstance(); - } catch (InstantiationException ex) { + cls.getConstructor().newInstance(); + } catch (ReflectiveOperationException ex) { throw new ConfigException(name, obj, "Class " + obj + " must be instantiable: " + ex); - } catch (IllegalAccessException ex) { - throw new ConfigException(name, obj, "Class " + obj + " must be accessible: " + ex); } }