From ce7df98d6ce640a052821d338c3de1e8207f10f6 Mon Sep 17 00:00:00 2001 From: Pauline Date: Tue, 5 Mar 2024 14:10:57 +0000 Subject: [PATCH 1/3] Add Fitbit spo2 route and converter --- .../FitbitRestSourceConnectorConfig.java | 20 +++++ .../FitbitIntradaySpo2AvroConverter.java | 79 +++++++++++++++++++ .../fitbit/route/FitbitIntradaySpo2Route.java | 63 +++++++++++++++ 3 files changed, 162 insertions(+) create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradaySpo2AvroConverter.java create mode 100644 kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index ceafad5a..5d3f1f9b 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -106,6 +106,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig { private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY = "Intraday heart rate variability topic"; private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DEFAULT = "connect_fitbit_intraday_heart_rate_variability"; + private static final String FITBIT_INTRADAY_SPO2_TOPIC_CONFIG = "fitbit.intraday.spo2.topic"; + private static final String FITBIT_INTRADAY_SPO2_TOPIC_DOC = "Topic for Fitbit intraday intraday_spo2"; + private static final String FITBIT_INTRADAY_SPO2_TOPIC_DISPLAY = "Intraday spo2 topic"; + private static final String FITBIT_INTRADAY_SPO2_TOPIC_DEFAULT = "connect_fitbit_intraday_spo2"; + private static final String FITBIT_BREATHING_RATE_TOPIC_CONFIG = "fitbit.breathing.rate.topic"; private static final String FITBIT_BREATHING_RATE_TOPIC_DOC = "Topic for Fitbit breathing rate"; private static final String FITBIT_BREATHING_RATE_TOPIC_DISPLAY = "Breathing rate topic"; @@ -343,6 +348,17 @@ public String toString() { Width.SHORT, FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY) + .define(FITBIT_INTRADAY_SPO2_TOPIC_CONFIG, + Type.STRING, + FITBIT_INTRADAY_SPO2_TOPIC_DEFAULT, + nonControlChar, + Importance.LOW, + FITBIT_INTRADAY_SPO2_TOPIC_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_INTRADAY_SPO2_TOPIC_DISPLAY) + .define(FITBIT_BREATHING_RATE_TOPIC_CONFIG, Type.STRING, FITBIT_BREATHING_RATE_TOPIC_DEFAULT, @@ -511,6 +527,10 @@ public String getFitbitIntradayHeartRateVariabilityTopic() { return getString(FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_CONFIG); } + public String getFitbitIntradaySpo2Topic() { + return getString(FITBIT_INTRADAY_SPO2_TOPIC_CONFIG); + } + public String getFitbitBreathingRateTopic() { return getString(FITBIT_BREATHING_RATE_TOPIC_CONFIG); } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradaySpo2AvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradaySpo2AvroConverter.java new file mode 100644 index 00000000..2f49aaca --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradaySpo2AvroConverter.java @@ -0,0 +1,79 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.radarbase.connect.rest.fitbit.converter; + +import com.fasterxml.jackson.databind.JsonNode; +import io.confluent.connect.avro.AvroData; +import org.radarbase.connect.rest.RestSourceConnectorConfig; +import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig; +import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest; +import org.radarcns.connector.fitbit.FitbitIntradaySpo2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; +import java.util.stream.Stream; + +import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull; + +public class FitbitIntradaySpo2AvroConverter extends FitbitAvroConverter { + private static final Logger logger = LoggerFactory.getLogger(FitbitIntradaySpo2AvroConverter.class); + private String spo2Topic; + + public FitbitIntradaySpo2AvroConverter(AvroData avroData) { + super(avroData); + } + + @Override + public void initialize(RestSourceConnectorConfig config) { + spo2Topic = ((FitbitRestSourceConnectorConfig) config).getFitbitIntradaySpo2Topic(); + logger.info("Using intraday spo2 topic {}", spo2Topic); + } + + @Override + protected Stream processRecords(FitbitRestRequest request, JsonNode root, double timeReceived) { + JsonNode spo2 = root; + if (spo2 == null || !spo2.isArray()) { + logger.warn("No Spo2 is provided for {}: {}", request, root); + return Stream.empty(); + } + ZonedDateTime startDate = request.getDateRange().end(); + + return iterableToStream(spo2) + .filter(m -> m != null && m.isObject()) + .map(m -> m.get("minutes")) + .filter(minutes -> minutes != null && minutes.isArray()) + .flatMap(FitbitAvroConverter::iterableToStream) + .map(tryOrNull(minuteData -> parseSpo2(minuteData, startDate, timeReceived), + (a, ex) -> logger.warn("Failed to convert spo2 from request {}, {}", request, a, ex))); + } + + private TopicData parseSpo2(JsonNode minuteData, ZonedDateTime startDate, double timeReceived) { + Instant time = startDate.with(LocalDateTime.parse(minuteData.get("minute").asText())).toInstant(); + Float value = (float) minuteData.get("value").asDouble(); + if (value == null) { + return null; + } + FitbitIntradaySpo2 fitbitSpo2 = new FitbitIntradaySpo2(time.toEpochMilli() / 1000d, + timeReceived, + (float) value); + return new TopicData(time, spo2Topic, fitbitSpo2); + } +} diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java new file mode 100644 index 00000000..f7d3a3e2 --- /dev/null +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java @@ -0,0 +1,63 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.radarbase.connect.rest.fitbit.route; + +import io.confluent.connect.avro.AvroData; +import org.radarbase.connect.rest.fitbit.converter.FitbitIntradaySpo2AvroConverter; +import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator; +import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest; +import org.radarbase.connect.rest.fitbit.user.User; +import org.radarbase.connect.rest.fitbit.user.UserRepository; + +import java.util.stream.Stream; + +import static java.time.temporal.ChronoUnit.SECONDS; +import java.time.Duration; + + +public class FitbitIntradaySpo2Route extends FitbitPollingRoute { + private final FitbitIntradaySpo2AvroConverter converter; + protected static final Duration REQUEST_INTERVAL = Duration.ofDays(30); + + public FitbitIntradaySpo2Route(FitbitRequestGenerator generator, + UserRepository userRepository, AvroData avroData) { + super(generator, userRepository, "intraday_spo2"); + this.converter = new FitbitIntradaySpo2AvroConverter(avroData); + } + + @Override + protected String getUrlFormat(String baseUrl) { + return baseUrl + "/1/user/%s/spo2/date/%s/%s/all.json"; + } + + protected Stream createRequests(User user) { + return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) + .map(dateRange -> newRequest(user, dateRange, + user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()))); + } + + @Override + Duration getDateRangeInterval() { + return REQUEST_INTERVAL; + } + + @Override + public FitbitIntradaySpo2AvroConverter converter() { + return converter; + } +} \ No newline at end of file From 6d0fdb43e97ce2b29b01ab907d9cd8da3fb3473b Mon Sep 17 00:00:00 2001 From: Pauline Date: Tue, 5 Mar 2024 15:20:21 +0000 Subject: [PATCH 2/3] Update Spo2 route --- .../connect/rest/fitbit/route/FitbitIntradaySpo2Route.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java index f7d3a3e2..d5844c17 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java @@ -32,7 +32,6 @@ public class FitbitIntradaySpo2Route extends FitbitPollingRoute { private final FitbitIntradaySpo2AvroConverter converter; - protected static final Duration REQUEST_INTERVAL = Duration.ofDays(30); public FitbitIntradaySpo2Route(FitbitRequestGenerator generator, UserRepository userRepository, AvroData avroData) { @@ -51,9 +50,10 @@ protected Stream createRequests(User user) { user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()))); } + /** Limit range to 30 days as documented here: https://dev.fitbit.com/build/reference/web-api/intraday/get-spo2-intraday-by-interval/ */ @Override Duration getDateRangeInterval() { - return REQUEST_INTERVAL; + return THIRTY_DAYS; } @Override From 835101d580b5ee090dda4eeed871ec4a1851d2bf Mon Sep 17 00:00:00 2001 From: Pauline Date: Thu, 7 Mar 2024 12:50:20 +0000 Subject: [PATCH 3/3] Update creating of requests in spo2 route --- .../connect/rest/fitbit/route/FitbitIntradaySpo2Route.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java index d5844c17..75b2c171 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradaySpo2Route.java @@ -47,7 +47,7 @@ protected String getUrlFormat(String baseUrl) { protected Stream createRequests(User user) { return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) .map(dateRange -> newRequest(user, dateRange, - user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()))); + user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()), DATE_FORMAT.format(dateRange.end()))); } /** Limit range to 30 days as documented here: https://dev.fitbit.com/build/reference/web-api/intraday/get-spo2-intraday-by-interval/ */