Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move to hbase bigtable #5010

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ updates.ignore = [
{ groupId = "org.apache.commons", artifactId = "commons-math3" },
{ groupId = "org.apache.flink" },
{ groupId = "org.apache.hadoop" },
{ groupId = "org.apache.hbase" },
{ groupId = "org.apache.httpcomponents" },
{ groupId = "org.apache.spark" },
{ groupId = "org.checkerframework" },
Expand Down
48 changes: 34 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ val autoServiceVersion = "1.0.1"
val autoValueVersion = "1.9"
val avroVersion = "1.8.2"
val bigdataossVersion = "2.2.16"
val bigtableClientVersion = "1.28.0"
val commonsCodecVersion = "1.15"
val commonsCompressVersion = "1.21"
val commonsIoVersion = "2.13.0"
Expand Down Expand Up @@ -77,7 +76,6 @@ val floggerVersion = "0.7.4"
val gaxVersion = "2.32.0"
val googleApiCommonVersion = "2.15.0"
val googleAuthVersion = "1.19.0"
val googleCloudBigTableVersion = "2.26.0"
val googleCloudCoreVersion = "2.22.0"
val googleCloudDatastoreVersion = "0.107.3"
val googleCloudMonitoringVersion = "3.24.0"
Expand All @@ -99,6 +97,7 @@ val algebraVersion = "2.9.0"
val annoy4sVersion = "0.10.0"
val annoyVersion = "0.2.6"
val breezeVersion = "2.1.0"
val bigtableHbaseBeamVersion = "2.11.0"
val caffeineVersion = "2.9.3"
val cassandraDriverVersion = "3.11.4"
val cassandraVersion = "3.11.16"
Expand All @@ -110,6 +109,7 @@ val elasticsearch7Version = "7.17.9"
val elasticsearch8Version = "8.9.1"
val fansiVersion = "0.4.0"
val featranVersion = "0.8.0"
val hbaseVersion = "1.7.2"
val httpAsyncClientVersion = "4.1.5"
val hamcrestVersion = "2.2"
val jakartaJsonVersion = "2.1.2"
Expand Down Expand Up @@ -234,6 +234,12 @@ lazy val java17Settings = sys.props("java.version") match {
case _ => Def.settings()
}

// test libs to exclude
val testLibs: Seq[ExclusionRule] = Seq(
"junit" % "junit",
"org.hamcrest" % "hamcrest-core"
)

val commonSettings = formatSettings ++
mimaSettings ++
java17Settings ++
Expand Down Expand Up @@ -602,7 +608,6 @@ lazy val `scio-test`: Project = project
moduleFilter("junit", "junit")
).reduce(_ | _),
libraryDependencies ++= Seq(
"com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % googleCloudBigTableVersion,
"com.google.http-client" % "google-http-client" % googleHttpClientsVersion,
"com.lihaoyi" %% "fansi" % fansiVersion,
"com.lihaoyi" %% "pprint" % pprintVersion,
Expand Down Expand Up @@ -721,21 +726,18 @@ lazy val `scio-google-cloud-platform`: Project = project
"com.google.api-client" % "google-api-client" % googleClientsVersion,
"com.google.api.grpc" % "grpc-google-cloud-pubsub-v1" % googleCloudPubSubVersion,
"com.google.api.grpc" % "proto-google-cloud-bigquerystorage-v1beta1" % bigQueryStorageBetaVersion,
"com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % googleCloudBigTableVersion,
"com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % googleCloudBigTableVersion,
"com.google.api.grpc" % "proto-google-cloud-datastore-v1" % googleCloudDatastoreVersion,
"com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % googleCloudPubSubVersion,
"com.google.apis" % "google-api-services-bigquery" % googleApiServicesBigQueryVersion,
"com.google.auth" % "google-auth-library-credentials" % googleAuthVersion,
"com.google.auth" % "google-auth-library-oauth2-http" % googleAuthVersion,
"com.google.cloud" % "google-cloud-bigquerystorage" % bigQueryStorageVersion,
"com.google.cloud" % "google-cloud-bigtable" % googleCloudBigTableVersion,
"com.google.cloud" % "google-cloud-core" % googleCloudCoreVersion,
"com.google.cloud" % "google-cloud-spanner" % googleCloudSpannerVersion,
"com.google.cloud.bigdataoss" % "util" % bigdataossVersion,
"com.google.cloud.bigtable" % "bigtable-client-core" % bigtableClientVersion,
"com.google.cloud.bigtable" % "bigtable-client-core-config" % bigtableClientVersion,
"com.google.guava" % "guava" % guavaVersion,
"com.google.cloud.bigtable" % "bigtable-hbase-beam" % bigtableHbaseBeamVersion excludeAll (testLibs: _*),
"com.google.cloud.bigtable" % "bigtable-hbase-1.x-shaded" % bigtableHbaseBeamVersion excludeAll (testLibs: _*),
"org.apache.hbase" % "hbase-shaded-client" % hbaseVersion excludeAll (testLibs: _*),
"com.google.http-client" % "google-http-client" % googleHttpClientsVersion,
"com.google.http-client" % "google-http-client-gson" % googleHttpClientsVersion,
"com.google.protobuf" % "protobuf-java" % protobufVersion,
Expand All @@ -753,6 +755,7 @@ lazy val `scio-google-cloud-platform`: Project = project
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-hbase" % beamVersion,
"org.apache.beam" % "beam-vendor-guava-26_0-jre" % beamVendorVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
// test
Expand Down Expand Up @@ -1151,7 +1154,6 @@ lazy val `scio-examples`: Project = project
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"com.google.api-client" % "google-api-client" % googleClientsVersion,
"com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % googleCloudBigTableVersion,
"com.google.api.grpc" % "proto-google-cloud-datastore-v1" % googleCloudDatastoreVersion,
"com.google.apis" % "google-api-services-bigquery" % googleApiServicesBigQueryVersion,
"com.google.apis" % "google-api-services-pubsub" % googleApiServicesPubsubVersion,
Expand All @@ -1165,7 +1167,7 @@ lazy val `scio-examples`: Project = project
"com.google.protobuf" % "protobuf-java" % protobufVersion,
"com.softwaremill.magnolia1_2" %% "magnolia" % magnoliaVersion,
"com.spotify" %% "magnolify-avro" % magnolifyVersion,
"com.spotify" %% "magnolify-bigtable" % magnolifyVersion,
// "com.spotify" %% "magnolify-bigtable" % magnolifyVersion,
"com.spotify" %% "magnolify-datastore" % magnolifyVersion,
"com.spotify" %% "magnolify-shared" % magnolifyVersion,
"com.spotify" %% "magnolify-tensorflow" % magnolifyVersion,
Expand Down Expand Up @@ -1276,6 +1278,12 @@ lazy val `scio-repl`: Project = project
case s if s.endsWith(".proto") =>
// arbitrarily pick the last conflicting proto file
MergeStrategy.last
case PathList("dependencies.properties") =>
// arbitrarily pick the last conflicting dependencies property file
MergeStrategy.last
case PathList("THIRD-PARTY.txt") =>
// drop conflicting THIRD-PARTY.txt
MergeStrategy.discard
case PathList("git.properties") =>
// drop conflicting git properties
MergeStrategy.discard
Expand All @@ -1285,12 +1293,27 @@ lazy val `scio-repl`: Project = project
case PathList("META-INF", "gradle", "incremental.annotation.processors") =>
// drop conflicting kotlin compiler info
MergeStrategy.discard
case PathList("META-INF", "native-image", "incremental.annotation.processors") =>
// drop conflicting incremental annotation processor info
// NOTE(review): comment was copy-pasted from the gradle case above; also confirm this
// file really appears under META-INF/native-image — the path mirrors the gradle entry
MergeStrategy.discard
case PathList("META-INF", "io.netty.versions.properties") =>
// merge conflicting netty property files
MergeStrategy.filterDistinctLines
case PathList("META-INF", "native-image", "native-image.properties") =>
// merge conflicting native-image property files
MergeStrategy.filterDistinctLines
case PathList(
"META-INF",
"native-image",
"com.google.api",
"gax-grpc",
"native-image.properties"
) =>
// merge conflicting native-image property files
MergeStrategy.filterDistinctLines
case PathList("mozilla", "public-suffix-list.txt") =>
// merge conflicting public-suffix-list files
MergeStrategy.filterDistinctLines
case s => old(s)
}
}
Expand Down Expand Up @@ -1481,7 +1504,6 @@ lazy val site: Project = project
ScalaUnidoc / unidoc / unidocAllClasspaths := (ScalaUnidoc / unidoc / unidocAllClasspaths).value
.map { cp =>
cp.filterNot(_.data.getCanonicalPath.matches(""".*guava-11\..*"""))
.filterNot(_.data.getCanonicalPath.matches(""".*bigtable-client-core-0\..*"""))
},
// mdoc
// pre-compile md using mdoc
Expand Down Expand Up @@ -1554,8 +1576,6 @@ ThisBuild / dependencyOverrides ++= Seq(
"com.google.api" % "gax-httpjson" % gaxVersion,
"com.google.api-client" % "google-api-client" % googleClientsVersion,
"com.google.api.grpc" % "grpc-google-common-protos" % googleCommonsProtoVersion,
"com.google.api.grpc" % "proto-google-cloud-bigtable-admin-v2" % googleCloudBigTableVersion,
"com.google.api.grpc" % "proto-google-cloud-bigtable-v2" % googleCloudBigTableVersion,
"com.google.api.grpc" % "proto-google-cloud-datastore-v1" % googleCloudDatastoreVersion,
"com.google.api.grpc" % "proto-google-common-protos" % googleCommonsProtoVersion,
"com.google.api.grpc" % "proto-google-iam-v1" % googleIAMVersion,
Expand Down
4 changes: 2 additions & 2 deletions scio-core/src/main/scala/com/spotify/scio/io/ScioIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ object ScioIO {

/** Base trait for [[ScioIO]] without business logic, for stubbing mock data with `JobTest`. */
trait TestIO[T] extends ScioIO[T] {
override type ReadP = Nothing
override type WriteP = Nothing
override type ReadP = Unit
override type WriteP = Unit

override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
throw new UnsupportedOperationException(s"$this is for testing purpose only")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,31 @@
// Example: Bigtable Input and Output
package com.spotify.scio.examples.extra

import com.google.bigtable.v2.{Mutation, Row}
import com.google.protobuf.ByteString
import com.spotify.scio._
import com.spotify.scio.bigtable._
import com.spotify.scio.examples.common.ExampleData
import org.apache.hadoop.hbase.client.{Mutation, Put, Result}
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.joda.time.Duration

import java.nio.charset.StandardCharsets

// This depends on APIs from `scio-bigtable` and imports from `com.spotify.scio.bigtable._`.
object BigtableExample {
val FAMILY_NAME: String = "count"
val COLUMN_QUALIFIER: ByteString = ByteString.copyFromUtf8("long")
val FAMILY_NAME: Array[Byte] = "count".getBytes(StandardCharsets.UTF_8)
val COLUMN_QUALIFIER: Array[Byte] = "long".getBytes(StandardCharsets.UTF_8)

// Convert a key-value pair to a Bigtable `Mutation` for writing
def toMutation(key: String, value: Long): (ByteString, Iterable[Mutation]) = {
val m = Mutations.newSetCell(
FAMILY_NAME,
COLUMN_QUALIFIER,
ByteString.copyFromUtf8(value.toString),
0L
)
(ByteString.copyFromUtf8(key), Iterable(m))
def toPutMutation(key: String, value: Long): Mutation =
new Put(key.getBytes(StandardCharsets.UTF_8))
.addColumn(FAMILY_NAME, COLUMN_QUALIFIER, 0L, BigInt(value).toByteArray)

// Convert a Bigtable `Result` from reading to a formatted key-value string
def fromResult(r: Result): String = {
val key = new String(r.getRow, StandardCharsets.UTF_8)
val value = BigInt(r.getValue(FAMILY_NAME, COLUMN_QUALIFIER)).toLong
s"$key:$value"
}

// Convert a Bigtable `Row` from reading to a formatted key-value string
def fromRow(r: Row): String =
r.getKey.toStringUtf8 + ": " + r
.getValue(FAMILY_NAME, COLUMN_QUALIFIER)
.get
.toStringUtf8
}

// ## Bigtable Write example
Expand All @@ -70,28 +66,28 @@ object BigtableWriteExample {
// Bump up the number of bigtable nodes before writing so that the extra traffic does not
// affect production service. A sleep period is inserted to ensure all new nodes are online
// before the ingestion starts.
sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 15)
sc.resizeClusters(btProjectId, btInstanceId, 15)

// Ensure that destination tables and column families exist
val table = new HTableDescriptor(TableName.valueOf(btTableId))
.addFamily(new HColumnDescriptor(BigtableExample.FAMILY_NAME))
sc.ensureTables(
btProjectId,
btInstanceId,
Map(
btTableId -> List(BigtableExample.FAMILY_NAME)
)
List(table)
)

sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
.countByValue
.map(kv => BigtableExample.toMutation(kv._1, kv._2))
.map { case (word, count) => BigtableExample.toPutMutation(word, count) }
.saveAsBigtable(btProjectId, btInstanceId, btTableId)

sc.run().waitUntilDone()

// Bring down the number of nodes after the job ends to save cost. There is no need to wait
// after bumping the nodes down.
sc.updateNumberOfBigtableNodes(btProjectId, btInstanceId, 3, Duration.ZERO)
sc.resizeClusters(btProjectId, btInstanceId, 3, Duration.ZERO)
}
}

Expand All @@ -114,7 +110,7 @@ object BigtableReadExample {
val btTableId = args("bigtableTableId")

sc.bigtable(btProjectId, btInstanceId, btTableId)
.map(BigtableExample.fromRow)
.map(BigtableExample.fromResult)
.saveAsTextFile(args("output"))

sc.run()
Expand Down
Loading