Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Snowflake (#5500) #5502

Merged
merged 9 commits into from
Jan 13, 2025
Merged

Support Snowflake (#5500) #5502

merged 9 commits into from
Jan 13, 2025

Conversation

turb
Copy link
Contributor

@turb turb commented Sep 25, 2024

Fix #5500

First draft of SnowflakeIO for scio, using the one from Beam.

It has:

  • SnowflakeSelect (read only), allowing to read from a select
  • SnowflakeTable (rw), allowing to read or write from/to a table
  • ScioContext and SCollection implicits

Notes:

@turb turb mentioned this pull request Sep 25, 2024
@turb turb force-pushed the snowflake branch 3 times, most recently from b53021e to 1cf2986 Compare September 25, 2024 15:48
Copy link

codecov bot commented Sep 25, 2024

Codecov Report

Attention: Patch coverage is 35.36585% with 53 lines in your changes missing coverage. Please review.

Project coverage is 61.25%. Comparing base (c45685a) to head (2a2ac46).
Report is 15 commits behind head on main.

Files with missing lines Patch % Lines
...scala/com/spotify/scio/snowflake/SnowflakeIO.scala 27.39% 53 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5502      +/-   ##
==========================================
- Coverage   61.43%   61.25%   -0.18%     
==========================================
  Files         312      315       +3     
  Lines       11103    11187      +84     
  Branches      762      765       +3     
==========================================
+ Hits         6821     6853      +32     
- Misses       4282     4334      +52     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@turb turb force-pushed the snowflake branch 2 times, most recently from d179ff2 to 9f6010d Compare September 25, 2024 16:37
@turb
Copy link
Contributor Author

turb commented Sep 25, 2024

The build wants me to have "com.nrinaudo" %% "kantan.csv" % kantanCsvVersion as a runtime dep, but then it cannot compile. I thought that a compile dep was also supposed to be in runtime (at least in the Maven model).

(edit: that has been fixed)

@turb turb force-pushed the snowflake branch 4 times, most recently from 74d15ae to 901625d Compare October 16, 2024 12:18
@turb
Copy link
Contributor Author

turb commented Oct 16, 2024

@RustedBones I finally had the time to fix this build :)

@RustedBones
Copy link
Contributor

Sorry for the delay. Will look at it this week !

build.sbt Outdated Show resolved Hide resolved

package com.spotify.scio.snowflake

trait SnowflakeAuthenticationOptions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
trait SnowflakeAuthenticationOptions
sealed trait SnowflakeAuthenticationOptions

To avoid the match may not be exhaustive warning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixup 9ede3f3

Comment on lines 50 to 54
.withWarehouse(connectionOptions.warehouse)

connectionOptions.schema
.map(schema => datasourceBeforeSchema.withSchema(schema))
.getOrElse(datasourceBeforeSchema)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: to avoid using intermediate variable, you can use syntactic sugar from scala.util.chaining._.
(can be done above too)

Suggested change
.withWarehouse(connectionOptions.warehouse)
connectionOptions.schema
.map(schema => datasourceBeforeSchema.withSchema(schema))
.getOrElse(datasourceBeforeSchema)
.withWarehouse(connectionOptions.warehouse)
.pipe(ds => connectionOptions.schema.fold(ds)(ds.withSchema))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL
fixup 25c5a76

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With fold: a9b56c4

new CsvMapper[T] {
override def mapRow(parts: Array[String]): T = {
val unsnowedParts = parts.map {
case "\\N" => "" // needs to be mapped to an Option
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never used Snowflake. Can you give some context for this case ?

Copy link
Contributor Author

@turb turb Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SnowflakeIO from Apache Beam uses COPY from Snowflake, that simply exports to CSV, that is then read from storage. And this COPY uses \N to represent null.

Thing is, kantan maps empty string with None, so we provide it.

@turb turb requested a review from RustedBones November 12, 2024 16:39
@RustedBones
Copy link
Contributor

Hey @turb, can you test the latest change on a real Snowflake DB to make sure everything still works ?

@turb
Copy link
Contributor Author

turb commented Nov 19, 2024

Hey @turb, can you test the latest change on a real Snowflake DB to make sure everything still works ?

Still works! (on read)

private[snowflake] def snowflakeIoId(opts: SnowflakeConnectionOptions, target: String): String = {
// source params
val params = Option(opts.database).map(db => s"db=$db") ++
Option(opts.warehouse).map(db => s"warehouse=$db")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comparing this to other complex testId implementations in our Scio IOs (example 1, 2), I think a format like this would fit better:

SnowflakeIO(url, target, warehouse?, db?)

wdyt @RustedBones

Copy link
Contributor

@RustedBones RustedBones Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as JdbcIO where we 'normalize' the connection url, hiding credentials. See https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-parameters

@RustedBones RustedBones merged commit 8bfb590 into spotify:main Jan 13, 2025
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Snowflake Support
3 participants