Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Exporter] Improve exporting of databricks_pipeline resources #4142

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ var meAdminFixture = qa.HTTPFixture{
var emptyPipelines = qa.HTTPFixture{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{},
}

Expand Down Expand Up @@ -2021,7 +2021,7 @@ func TestImportingDLTPipelines(t *testing.T) {
emptyIpAccessLIst,
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down Expand Up @@ -2236,7 +2236,7 @@ func TestImportingDLTPipelinesMatchingOnly(t *testing.T) {
userReadFixture,
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down Expand Up @@ -2601,7 +2601,7 @@ func TestIncrementalDLTAndMLflowWebhooks(t *testing.T) {
},
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down
51 changes: 33 additions & 18 deletions exporter/importables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2002,23 +2002,22 @@ var resourcesMap map[string]importable = map[string]importable{
return name + "_" + d.Id()
},
List: func(ic *importContext) error {
w, err := ic.Client.WorkspaceClient()
if err != nil {
return err
}
pipelinesList, err := w.Pipelines.ListPipelinesAll(ic.Context, pipelines.ListPipelinesRequest{
MaxResults: 50,
it := ic.workspaceClient.Pipelines.ListPipelines(ic.Context, pipelines.ListPipelinesRequest{
MaxResults: 100,
})
if err != nil {
return err
}
for i, q := range pipelinesList {
i := 0
for it.HasNext(ic.Context) {
q, err := it.Next(ic.Context)
if err != nil {
return err
}
i++
if !ic.MatchesName(q.Name) {
continue
}
var modifiedAt int64
if ic.incremental {
pipeline, err := w.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{
pipeline, err := ic.workspaceClient.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{
PipelineId: q.PipelineId,
})
if err != nil {
Expand All @@ -2030,21 +2029,37 @@ var resourcesMap map[string]importable = map[string]importable{
Resource: "databricks_pipeline",
ID: q.PipelineId,
}, modifiedAt, fmt.Sprintf("DLT Pipeline '%s'", q.Name))
log.Printf("[INFO] Imported %d of %d DLT Pipelines", i+1, len(pipelinesList))
if i%100 == 0 {
log.Printf("[INFO] Imported %d DLT Pipelines", i)
}
}
log.Printf("[INFO] Listed %d DLT pipelines", i)
return nil
},
Import: func(ic *importContext, r *resource) error {
var pipeline tfpipelines.Pipeline
s := ic.Resources["databricks_pipeline"].Schema
common.DataToStructPointer(r.Data, s, &pipeline)
if pipeline.Catalog != "" && pipeline.Target != "" {
ic.Emit(&resource{
Resource: "databricks_schema",
ID: pipeline.Catalog + "." + pipeline.Target,
})
if pipeline.Deployment != nil && pipeline.Deployment.Kind == "BUNDLE" {
log.Printf("[INFO] Skipping processing of DLT Pipeline with ID %s (%s) as deployed with DABs",
r.ID, pipeline.Name)
return nil
}
if pipeline.Catalog != "" {
var schemaName string
if pipeline.Target != "" {
schemaName = pipeline.Target
} else if pipeline.Schema != "" {
schemaName = pipeline.Schema
}
if schemaName != "" {
ic.Emit(&resource{
Resource: "databricks_schema",
ID: pipeline.Catalog + "." + pipeline.Target,
})
}
}
if pipeline.Deployment == nil || pipeline.Deployment.Kind == "BUNDLE" {
if pipeline.Deployment == nil || pipeline.Deployment.Kind != "BUNDLE" {
for _, lib := range pipeline.Libraries {
if lib.Notebook != nil {
ic.emitNotebookOrRepo(lib.Notebook.Path)
Expand Down
2 changes: 1 addition & 1 deletion exporter/importables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ func TestIncrementalListDLT(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down
Loading