From d0b29ada3b33de3bb764f640b34705a0b006aced Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Wed, 23 Oct 2024 12:44:44 +0200 Subject: [PATCH] [Exporter] Improve exporting of `databricks_pipeline` resources Changes include: - Use `List` + iterator instead of waiting for full list - improves performance in big workspaces with a lot of DLT pipelines - Better handling of pipelines deployed via DABs - fix error that lead to emitting of notebooks even for DLT pipelines deployed with DABs. - Emit `databricks_schema` for pipelines with direct publishing mode enabled. --- exporter/exporter_test.go | 8 +++--- exporter/importables.go | 51 +++++++++++++++++++++++------------- exporter/importables_test.go | 2 +- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index 3f605930c..2b528f56b 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -252,7 +252,7 @@ var meAdminFixture = qa.HTTPFixture{ var emptyPipelines = qa.HTTPFixture{ Method: "GET", ReuseRequest: true, - Resource: "/api/2.0/pipelines?max_results=50", + Resource: "/api/2.0/pipelines?max_results=100", Response: pipelines.ListPipelinesResponse{}, } @@ -2021,7 +2021,7 @@ func TestImportingDLTPipelines(t *testing.T) { emptyIpAccessLIst, { Method: "GET", - Resource: "/api/2.0/pipelines?max_results=50", + Resource: "/api/2.0/pipelines?max_results=100", Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ { @@ -2236,7 +2236,7 @@ func TestImportingDLTPipelinesMatchingOnly(t *testing.T) { userReadFixture, { Method: "GET", - Resource: "/api/2.0/pipelines?max_results=50", + Resource: "/api/2.0/pipelines?max_results=100", Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ { @@ -2601,7 +2601,7 @@ func TestIncrementalDLTAndMLflowWebhooks(t *testing.T) { }, { Method: "GET", - Resource: "/api/2.0/pipelines?max_results=50", + Resource: "/api/2.0/pipelines?max_results=100", Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ { diff --git a/exporter/importables.go b/exporter/importables.go index 5426845f3..54f0fb6da 100644 --- a/exporter/importables.go +++ b/exporter/importables.go @@ -2002,23 +2002,22 @@ var resourcesMap map[string]importable = map[string]importable{ return name + "_" + d.Id() }, List: func(ic *importContext) error { - w, err := ic.Client.WorkspaceClient() - if err != nil { - return err - } - pipelinesList, err := w.Pipelines.ListPipelinesAll(ic.Context, pipelines.ListPipelinesRequest{ - MaxResults: 50, + it := ic.workspaceClient.Pipelines.ListPipelines(ic.Context, pipelines.ListPipelinesRequest{ + MaxResults: 100, }) - if err != nil { - return err - } - for i, q := range pipelinesList { + i := 0 + for it.HasNext(ic.Context) { + q, err := it.Next(ic.Context) + if err != nil { + return err + } + i++ if !ic.MatchesName(q.Name) { continue } var modifiedAt int64 if ic.incremental { - pipeline, err := w.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{ + pipeline, err := ic.workspaceClient.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{ PipelineId: q.PipelineId, }) if err != nil { @@ -2030,21 +2029,37 @@ var resourcesMap map[string]importable = map[string]importable{ Resource: "databricks_pipeline", ID: q.PipelineId, }, modifiedAt, fmt.Sprintf("DLT Pipeline '%s'", q.Name)) - log.Printf("[INFO] Imported %d of %d DLT Pipelines", i+1, len(pipelinesList)) + if i%100 == 0 { + log.Printf("[INFO] Imported %d DLT Pipelines", i) + } } + log.Printf("[INFO] Listed %d DLT pipelines", i) return nil }, Import: func(ic *importContext, r *resource) error { var pipeline tfpipelines.Pipeline s := ic.Resources["databricks_pipeline"].Schema common.DataToStructPointer(r.Data, s, &pipeline) - if pipeline.Catalog != "" && pipeline.Target != "" { - ic.Emit(&resource{ - Resource: "databricks_schema", - ID: pipeline.Catalog + "." + pipeline.Target, - }) + if pipeline.Deployment != nil && pipeline.Deployment.Kind == "BUNDLE" { + log.Printf("[INFO] Skipping processing of DLT Pipeline with ID %s (%s) as deployed with DABs", + r.ID, pipeline.Name) + return nil + } + if pipeline.Catalog != "" { + var schemaName string + if pipeline.Target != "" { + schemaName = pipeline.Target + } else if pipeline.Schema != "" { + schemaName = pipeline.Schema + } + if schemaName != "" { + ic.Emit(&resource{ + Resource: "databricks_schema", + ID: pipeline.Catalog + "." + pipeline.Target, + }) + } } - if pipeline.Deployment == nil || pipeline.Deployment.Kind == "BUNDLE" { + if pipeline.Deployment == nil || pipeline.Deployment.Kind != "BUNDLE" { for _, lib := range pipeline.Libraries { if lib.Notebook != nil { ic.emitNotebookOrRepo(lib.Notebook.Path) diff --git a/exporter/importables_test.go b/exporter/importables_test.go index 8e82f7c4f..645fba93e 100644 --- a/exporter/importables_test.go +++ b/exporter/importables_test.go @@ -1369,7 +1369,7 @@ func TestIncrementalListDLT(t *testing.T) { qa.HTTPFixturesApply(t, []qa.HTTPFixture{ { Method: "GET", - Resource: "/api/2.0/pipelines?max_results=50", + Resource: "/api/2.0/pipelines?max_results=100", Response: pipelines.ListPipelinesResponse{ Statuses: []pipelines.PipelineStateInfo{ {