Skip to content

Commit

Permalink
Update Accumulo to 3.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Feb 15, 2024
1 parent 7cea733 commit 46598b1
Show file tree
Hide file tree
Showing 22 changed files with 493 additions and 401 deletions.
21 changes: 11 additions & 10 deletions plugin/trino-accumulo-iterators/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<project.build.targetJdk>8</project.build.targetJdk>
<project.build.targetJdk>11</project.build.targetJdk>
</properties>

<dependencies>
Expand All @@ -24,22 +24,23 @@
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>io.prestosql.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<version>${dep.accumulo-hadoop.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${dep.accumulo.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
82 changes: 53 additions & 29 deletions plugin/trino-accumulo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,9 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.curator.version>2.13.0</dep.curator.version>
<dep.curator.version>5.6.0</dep.curator.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.3-1</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -74,12 +64,6 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.prestosql.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<version>${dep.accumulo-hadoop.version}</version>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-accumulo-iterators</artifactId>
Expand All @@ -96,6 +80,11 @@
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
Expand Down Expand Up @@ -131,6 +120,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
Expand All @@ -144,24 +137,20 @@
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-api</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-provider-svnexe</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -231,6 +220,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-api</artifactId>
Expand Down Expand Up @@ -291,6 +286,35 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${dep.accumulo.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.hk2.external</groupId>
<artifactId>aopalliance-repackaged</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,28 @@

/**
* Trino metadata provider for Accumulo.
* Responsible for creating/dropping/listing tables, schemas, columns, all sorts of goodness. Heavily leverages {@link AccumuloClient}.
* Responsible for creating/dropping/listing tables, schemas, columns, all sorts of goodness. Heavily leverages {@link AccumuloMetadataManager}.
*/
public class AccumuloMetadata
implements ConnectorMetadata
{
private static final JsonCodec<ConnectorViewDefinition> VIEW_CODEC =
new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(ConnectorViewDefinition.class);

private final AccumuloClient client;
private final AccumuloMetadataManager metadataManager;
private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();

@Inject
public AccumuloMetadata(AccumuloClient client)
public AccumuloMetadata(AccumuloMetadataManager metadataManager)
{
this.client = requireNonNull(client, "client is null");
this.metadataManager = requireNonNull(metadataManager, "metadataManager is null");
}

@Override
public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties, TrinoPrincipal owner)
{
checkArgument(properties.isEmpty(), "Can't have properties for schema creation");
client.createSchema(schemaName);
metadataManager.createSchema(schemaName);
}

@Override
Expand All @@ -94,7 +94,7 @@ public void dropSchema(ConnectorSession session, String schemaName, boolean casc
if (cascade) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support dropping schemas with CASCADE option");
}
client.dropSchema(schemaName);
metadataManager.dropSchema(schemaName);
}

@Override
Expand All @@ -110,7 +110,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
checkNoRollback();

SchemaTableName tableName = tableMetadata.getTable();
AccumuloTable table = client.createTable(tableMetadata);
AccumuloTable table = metadataManager.createTable(tableMetadata);

AccumuloTableHandle handle = new AccumuloTableHandle(
tableName.getSchemaName(),
Expand All @@ -134,7 +134,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess

private void rollbackCreateTable(AccumuloTable table)
{
client.dropTable(table);
metadataManager.dropTable(table);
}

@Override
Expand All @@ -143,53 +143,53 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
if (tableMetadata.getComment().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
}
client.createTable(tableMetadata);
metadataManager.createTable(tableMetadata);
}

@Override
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;
AccumuloTable table = client.getTable(handle.toSchemaTableName());
AccumuloTable table = metadataManager.getTable(handle.toSchemaTableName());
if (table != null) {
client.dropTable(table);
metadataManager.dropTable(table);
}
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle,
SchemaTableName newTableName)
{
if (client.getTable(newTableName) != null) {
if (metadataManager.getTable(newTableName) != null) {
throw new TrinoException(ACCUMULO_TABLE_EXISTS, "Table " + newTableName + " already exists");
}

AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;
client.renameTable(handle.toSchemaTableName(), newTableName);
metadataManager.renameTable(handle.toSchemaTableName(), newTableName);
}

@Override
public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, boolean replace)
{
String viewData = VIEW_CODEC.toJson(definition);
if (replace) {
client.createOrReplaceView(viewName, viewData);
metadataManager.createOrReplaceView(viewName, viewData);
}
else {
client.createView(viewName, viewData);
metadataManager.createView(viewName, viewData);
}
}

@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
client.dropView(viewName);
metadataManager.dropView(viewName);
}

@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
return Optional.ofNullable(client.getView(viewName))
return Optional.ofNullable(metadataManager.getView(viewName))
.map(view -> VIEW_CODEC.fromJson(view.getData()));
}

Expand All @@ -209,13 +209,13 @@ private List<SchemaTableName> listViews(Optional<String> filterSchema)
{
ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
if (filterSchema.isPresent()) {
for (String view : client.getViewNames(filterSchema.get())) {
for (String view : metadataManager.getViewNames(filterSchema.get())) {
builder.add(new SchemaTableName(filterSchema.get(), view));
}
}
else {
for (String schemaName : client.getSchemaNames()) {
for (String view : client.getViewNames(schemaName)) {
for (String schemaName : metadataManager.getSchemaNames()) {
for (String view : metadataManager.getViewNames(schemaName)) {
builder.add(new SchemaTableName(schemaName, view));
}
}
Expand Down Expand Up @@ -264,7 +264,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable

// Need to validate that SchemaTableName is a table
if (!this.listViews(session, Optional.of(tableName.getSchemaName())).contains(tableName)) {
AccumuloTable table = client.getTable(tableName);
AccumuloTable table = metadataManager.getTable(tableName);
if (table == null) {
return null;
}
Expand Down Expand Up @@ -298,7 +298,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;

AccumuloTable table = client.getTable(handle.toSchemaTableName());
AccumuloTable table = metadataManager.getTable(handle.toSchemaTableName());
if (table == null) {
throw new TableNotFoundException(handle.toSchemaTableName());
}
Expand All @@ -321,29 +321,29 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan
{
AccumuloTableHandle handle = (AccumuloTableHandle) tableHandle;
AccumuloColumnHandle columnHandle = (AccumuloColumnHandle) source;
AccumuloTable table = client.getTable(handle.toSchemaTableName());
AccumuloTable table = metadataManager.getTable(handle.toSchemaTableName());
if (table == null) {
throw new TableNotFoundException(new SchemaTableName(handle.getSchema(), handle.getTable()));
}

client.renameColumn(table, columnHandle.getName(), target);
metadataManager.renameColumn(table, columnHandle.getName(), target);
}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
return ImmutableList.copyOf(client.getSchemaNames());
return ImmutableList.copyOf(metadataManager.getSchemaNames());
}

@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> filterSchema)
{
Set<String> schemaNames = filterSchema.<Set<String>>map(ImmutableSet::of)
.orElseGet(client::getSchemaNames);
.orElseGet(metadataManager::getSchemaNames);

ImmutableSet.Builder<SchemaTableName> builder = ImmutableSet.builder();
for (String schemaName : schemaNames) {
for (String tableName : client.getTableNames(schemaName)) {
for (String tableName : metadataManager.getTableNames(schemaName)) {
builder.add(new SchemaTableName(schemaName, tableName));
}
}
Expand Down Expand Up @@ -415,13 +415,13 @@ public void rollback()

private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
{
if (!client.getSchemaNames().contains(tableName.getSchemaName())) {
if (!metadataManager.getSchemaNames().contains(tableName.getSchemaName())) {
return null;
}

// Need to validate that SchemaTableName is a table
if (!this.listViews(Optional.ofNullable(tableName.getSchemaName())).contains(tableName)) {
AccumuloTable table = client.getTable(tableName);
AccumuloTable table = metadataManager.getTable(tableName);
if (table == null) {
return null;
}
Expand Down
Loading

0 comments on commit 46598b1

Please sign in to comment.