Skip to content

Commit

Permalink
NEOS-1695: support creating schemas if they dont exist not just table (
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Jan 8, 2025
1 parent 79b11fd commit 6bd2bb8
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 20 deletions.
15 changes: 15 additions & 0 deletions backend/pkg/sqlmanager/mysql/mysql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
const (
columnDefaultDefault = "Default"
columnDefaultString = "String"

SchemasLabel = "schemas"
)

type MysqlManager struct {
Expand Down Expand Up @@ -629,6 +631,18 @@ func (m *MysqlManager) GetSchemaInitStatements(
errgrp, errctx := errgroup.WithContext(ctx)
errgrp.SetLimit(5)

schemaStmts := []string{}
errgrp.Go(func() error {
uniqueSchemas := map[string]struct{}{}
for _, table := range tables {
uniqueSchemas[table.Schema] = struct{}{}
}
for schema := range uniqueSchemas {
schemaStmts = append(schemaStmts, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`;", schema))
}
return nil
})

tableTriggerStmts := []string{}
errgrp.Go(func() error {
tableTriggers, err := m.GetSchemaTableTriggers(errctx, tables)
Expand Down Expand Up @@ -669,6 +683,7 @@ func (m *MysqlManager) GetSchemaInitStatements(
}

return []*sqlmanager_shared.InitSchemaStatements{
{Label: SchemasLabel, Statements: schemaStmts},
{Label: "data types"},
{Label: "create table", Statements: createTables},
{Label: "non-fk alter table", Statements: nonFkAlterStmts},
Expand Down
18 changes: 18 additions & 0 deletions backend/pkg/sqlmanager/postgres/postgres-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"golang.org/x/sync/errgroup"
)

const (
SchemasLabel = "schemas"
)

type PostgresManager struct {
querier pg_queries.Querier
db pg_queries.DBTX
Expand Down Expand Up @@ -512,6 +516,19 @@ func (p *PostgresManager) GetSchemaInitStatements(
tables []*sqlmanager_shared.SchemaTable,
) ([]*sqlmanager_shared.InitSchemaStatements, error) {
errgrp, errctx := errgroup.WithContext(ctx)

schemaStmts := []string{}
errgrp.Go(func() error {
uniqueSchemas := map[string]struct{}{}
for _, table := range tables {
uniqueSchemas[table.Schema] = struct{}{}
}
for schema := range uniqueSchemas {
schemaStmts = append(schemaStmts, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %q;", schema))
}
return nil
})

dataTypeStmts := []string{}
errgrp.Go(func() error {
datatypeCfg, err := p.GetSchemaTableDataTypes(errctx, tables)
Expand Down Expand Up @@ -562,6 +579,7 @@ func (p *PostgresManager) GetSchemaInitStatements(
}

return []*sqlmanager_shared.InitSchemaStatements{
{Label: SchemasLabel, Statements: schemaStmts},
{Label: "data types", Statements: dataTypeStmts},
{Label: "create table", Statements: createTables},
{Label: "non-fk alter table", Statements: nonFkAlterStmts},
Expand Down
2 changes: 1 addition & 1 deletion backend/services/mgmt/v1alpha1/job-service/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func (s *Service) SetRunContexts(
id := req.GetId()
user, err := s.userdataclient.GetUser(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to get user: %w", err)
}
if err := user.EnforceJob(ctx, userdata.NewWildcardDomainEntity(id.GetAccountId()), rbac.JobAction_Edit); err != nil {
return nil, err
Expand Down
10 changes: 8 additions & 2 deletions docs/docs/connections/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ description: Postgres is one of the most commonly used databases in the world an
id: postgres
hide_title: false
slug: /connections/postgres
# cSpell:words myrole
# cSpell:words myrole,mydatabase
---

## Introduction
Expand Down Expand Up @@ -63,7 +63,7 @@ The following TLS/SSL modes are available for Postgres via the `sslMode` query p

> **NB:** if using the `URL` configuration, you will need to specify this directly in the query parameters. If using the host configuration, be sure to select the correct option in the dropdown that you intend to use.
```
```console
disable - No SSL at all, plain TCP
allow - First try non-SSL, if that fails, try SSL
prefer - First try SSL, if that fails, try non-SSL (default)
Expand Down Expand Up @@ -133,6 +133,12 @@ If you are planning to allow Neosync to truncate data prior to a job run, then t
GRANT TRUNCATE on public.users TO myrole;
```

Neosync will attempt to create schemas that do not exist. If you plan on allowing Neosync to do this, you will need to grant the `CREATE` permission on the schema.

```sql
GRANT CREATE ON DATABASE mydatabase TO myrole;
```

If you are planning to allow Neosync to initialize tables within a schema, you will need to grant more permissions in order to do so.

```sql
Expand Down
16 changes: 8 additions & 8 deletions docs/docs/guides/incremental-additions.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ Each destination can have different destination options. For example, one destin

Here is a matrix showing support for each destination option under the **Option** column and which destinations support it.

| Option | Description | Postgres | MySQL | SQL Server | DynamoDB | MongoDB | S3 |
| --------------------------- | ------------------------------------------------------------------------------- | -------- | ----- | ---------- | -------- | ------- | --- |
| Truncate Before Insert | Truncates table before inserting data |||||||
| Init Table Schema | Creates table(s) and their constraints. The database schema must already exist. ||| ||||
| On Conflict Do Nothing | If there is a conflict when inserting data do not insert |||||||
| Skip Foreign Key Violations | Insert all valid records, bypassing any that violate foreign key constraints. |||||||
| Truncate CASCADE | Truncate cascade all tables |||||||
| Option | Description | Postgres | MySQL | SQL Server | DynamoDB | MongoDB | S3 |
| --------------------------- | ----------------------------------------------------------------------------- | -------- | ----- | ---------- | -------- | ------- | --- |
| Truncate Before Insert | Truncates table before inserting data |||||||
| Init Table Schema | Creates schemas, tables, and their constraints. ||| ||||
| On Conflict Do Nothing | If there is a conflict when inserting data do not insert |||||||
| Skip Foreign Key Violations | Insert all valid records, bypassing any that violate foreign key constraints. |||||||
| Truncate CASCADE | Truncate cascade all tables |||||||

## Full Refresh

Expand Down Expand Up @@ -70,6 +70,6 @@ The other way to do incremental data additions is to use Subsetting. Using subse

This is generally more efficient and more flexible way to do incremental data refreshes. It's also, generally, a more powerful way to subset your data.

# Conclusion
## Conclusion

Full refreshes and incremental data syncs are two ways to sync data using Neosync. There are plenty of valid use-cases for both and depending on your requirements, one may make more sense than the other. If you have other requirements or questions, please don't hesitate to submit a feature request or talk to us in Discord.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export default function MssqlDBDestinationOptionsForm(
});
}}
title="Init Table Schema"
description="Creates table(s) and their constraints. The database schema must already exist. "
description="Creates schemas, tables, and their constraints."
/>
<FormErrorMessage message={errors?.initTableSchema?.message} />
</FormItemContainer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export default function MysqlDBDestinationOptionsForm(
});
}}
title="Init Table Schema"
description="Creates table(s) and their constraints. The database schema must already exist. "
description="Creates schemas, tables, and their constraints."
/>
<FormErrorMessage message={errors?.initTableSchema?.message} />
</FormItemContainer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export default function PostgresDBDestinationOptionsForm(
});
}}
title="Init Table Schema"
description="Creates table(s) and their constraints. The database schema must already exist. "
description="Creates schemas, tables, and their constraints."
/>
<FormErrorMessage message={errors?.initTableSchema?.message} />
</FormItemContainer>
Expand Down
11 changes: 11 additions & 0 deletions internal/ee/mssql-manager/ee-mssql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

const (
SchemasLabel = "schemas"
ViewsFunctionsLabel = "view and functions"
TableIndexLabel = "table index"
)
Expand Down Expand Up @@ -149,6 +150,15 @@ func (m *Manager) GetSchemaInitStatements(ctx context.Context, tables []*sqlmana
}

errgrp, errctx := errgroup.WithContext(ctx)

schemaStmts := []string{}
errgrp.Go(func() error {
for schema := range schemasMap {
schemaStmts = append(schemaStmts, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %q;", schema))
}
return nil
})

dataTypeStmts := []string{}
errgrp.Go(func() error {
datatypeCfg, err := m.GetSchemaTableDataTypes(errctx, tables)
Expand Down Expand Up @@ -212,6 +222,7 @@ func (m *Manager) GetSchemaInitStatements(ctx context.Context, tables []*sqlmana
}

return []*sqlmanager_shared.InitSchemaStatements{
{Label: SchemasLabel, Statements: schemaStmts},
{Label: "data types", Statements: dataTypeStmts},
{Label: "create table", Statements: createTables},
{Label: ViewsFunctionsLabel, Statements: viewAndFuncStmts},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,28 @@ func (b *initStatementBuilder) RunSqlInitTableStatements(
return nil, err
}

initErrors := []*InitSchemaError{}
for _, block := range initblocks {
slogger.Info(fmt.Sprintf("[%s] found %d statements to execute during schema initialization", block.Label, len(block.Statements)))
if len(block.Statements) == 0 {
continue
}
err = destdb.Db().BatchExec(ctx, batchSizeConst, block.Statements, &sqlmanager_shared.BatchExecOpts{})
if err != nil {
return nil, fmt.Errorf("unable to exec pg %s statements: %w", block.Label, err)
slogger.Error(fmt.Sprintf("unable to exec pg %s statements: %s", block.Label, err.Error()))
if block.Label != sqlmanager_postgres.SchemasLabel {
return nil, fmt.Errorf("unable to exec pg %s statements: %w", block.Label, err)
}
initErrors = append(initErrors, &InitSchemaError{
Statement: strings.Join(block.Statements, "\n"),
Error: err.Error(),
})
}
}
initSchemaRunContext = append(initSchemaRunContext, &InitSchemaRunContext{
ConnectionId: destination.ConnectionId,
Errors: initErrors,
})
}
// truncate statements
if sqlopts.TruncateCascade {
Expand Down Expand Up @@ -257,16 +269,28 @@ func (b *initStatementBuilder) RunSqlInitTableStatements(
return nil, err
}

initErrors := []*InitSchemaError{}
for _, block := range initblocks {
slogger.Info(fmt.Sprintf("[%s] found %d statements to execute during schema initialization", block.Label, len(block.Statements)))
if len(block.Statements) == 0 {
continue
}
err = destdb.Db().BatchExec(ctx, batchSizeConst, block.Statements, &sqlmanager_shared.BatchExecOpts{})
if err != nil {
return nil, fmt.Errorf("unable to exec mysql %s statements: %w", block.Label, err)
slogger.Error(fmt.Sprintf("unable to exec mysql %s statements: %s", block.Label, err.Error()))
if block.Label != sqlmanager_mysql.SchemasLabel {
return nil, fmt.Errorf("unable to exec mysql %s statements: %w", block.Label, err)
}
initErrors = append(initErrors, &InitSchemaError{
Statement: strings.Join(block.Statements, "\n"),
Error: err.Error(),
})
}
}
initSchemaRunContext = append(initSchemaRunContext, &InitSchemaRunContext{
ConnectionId: destination.ConnectionId,
Errors: initErrors,
})
}
}
// truncate statements
Expand Down Expand Up @@ -316,14 +340,14 @@ func (b *initStatementBuilder) RunSqlInitTableStatements(
for _, stmt := range block.Statements {
err = destdb.Db().Exec(ctx, stmt)
if err != nil {
slogger.Error("unable to exec mssql %s statements: %w", block.Label, err)
slogger.Error(fmt.Sprintf("unable to exec mssql %s statements: %s", block.Label, err.Error()))
if block.Label != ee_sqlmanager_mssql.SchemasLabel && block.Label != ee_sqlmanager_mssql.ViewsFunctionsLabel && block.Label != ee_sqlmanager_mssql.TableIndexLabel {
return nil, fmt.Errorf("unable to exec mssql %s statements: %w", block.Label, err)
}
initErrors = append(initErrors, &InitSchemaError{
Statement: stmt,
Error: err.Error(),
})
if block.Label != ee_sqlmanager_mssql.ViewsFunctionsLabel && block.Label != ee_sqlmanager_mssql.TableIndexLabel {
return nil, fmt.Errorf("unable to exec mssql %s statements: %w", block.Label, err)
}
}
}
}
Expand Down

0 comments on commit 6bd2bb8

Please sign in to comment.