diff --git a/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java b/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java
index 98cc6deb3b18d4d3f6fcb14c3b4babe28f90257f..dae2f8069ae2363fffe92baf827a4c9a10eb9b0b 100755
--- a/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java
+++ b/hetu-carbondata/src/main/java/io/hetu/core/plugin/carbondata/CarbondataConnectorFactory.java
@@ -23,6 +23,10 @@ import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
import io.airlift.units.DataSize;
import io.hetu.core.plugin.carbondata.impl.CarbondataTableConfig;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.plugin.base.jmx.MBeanServerModule;
import io.prestosql.plugin.hive.ConnectorObjectNameGeneratorModule;
import io.prestosql.plugin.hive.HiveAnalyzeProperties;
@@ -55,10 +59,6 @@ import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.type.TypeManager;
import org.apache.carbondata.hive.CarbonHiveSerDe;
diff --git a/hetu-hazelcast/pom.xml b/hetu-hazelcast/pom.xml
index ee93729f7799d5b203ac3ddaa13c62165e77c84b..ce85ab9fe0b045277f2eb5391e62b5ac0b67cc47 100644
--- a/hetu-hazelcast/pom.xml
+++ b/hetu-hazelcast/pom.xml
@@ -15,7 +15,6 @@
${project.parent.basedir}
4.6.3
3.21.0
- 5.1
1.9.4
5.3.27
1.1.1
diff --git a/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java b/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java
index 844b1a80273ef8ec4528e65cc304da986df4ab83..b55b76031979900cde7a76a00108a26fc0d29310 100644
--- a/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java
+++ b/hetu-iceberg/src/main/java/io/hetu/core/plugin/iceberg/IcebergConnector.java
@@ -16,6 +16,7 @@ package io.hetu.core.plugin.iceberg;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.bootstrap.LifeCycleManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
import io.prestosql.plugin.base.session.SessionPropertiesProvider;
import io.prestosql.plugin.hive.HiveTransactionHandle;
import io.prestosql.spi.connector.Connector;
@@ -29,7 +30,6 @@ import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.TableProcedureMetadata;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.security.ConnectorIdentity;
import io.prestosql.spi.session.PropertyMetadata;
diff --git a/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java b/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java
index 6d143578543272a101a68619adc971f5b7c9c5e7..fc3fd2d4d73ca079a5e0aaa174198e913d32a0cb 100644
--- a/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java
+++ b/hetu-mpp/src/main/java/io/hetu/core/plugin/mpp/MppConnectorFactory.java
@@ -21,6 +21,10 @@ import io.airlift.bootstrap.Bootstrap;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.event.client.EventModule;
import io.airlift.json.JsonModule;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.plugin.base.jmx.MBeanServerModule;
import io.prestosql.plugin.hive.ConnectorObjectNameGeneratorModule;
import io.prestosql.plugin.hive.HiveAnalyzeProperties;
@@ -53,10 +57,6 @@ import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorPageSourceProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import io.prestosql.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.prestosql.spi.heuristicindex.IndexClient;
import io.prestosql.spi.procedure.Procedure;
import io.prestosql.spi.type.TypeManager;
diff --git a/pom.xml b/pom.xml
index 84168c5d552f2290cf957beebb736bb6554eb270..0a4b073efd91c6e3e868a38c3af94b8ddf74aeb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,8 @@
1.66
0.7.0
5.0.3
+ 2.4.1
+ 5.1
io.hetu.core
@@ -176,6 +161,11 @@
+
+ org.assertj
+ assertj-core
+
+
io.airlift
log-manager
@@ -194,31 +184,6 @@
-
- com.101tec
- zkclient
- 0.10
- runtime
-
-
- log4j
- log4j
-
-
- org.slf4j
- slf4j-log4j12
-
-
- zookeeper
- org.apache.zookeeper
-
-
- netty
- io.netty
-
-
-
-
org.testng
diff --git a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaAdminFactory.java
similarity index 39%
rename from presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java
rename to presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaAdminFactory.java
index 7807c7ff740750313e9a7c65121e6e62fabd5510..98554486c661fb0b274a1b3a53f2111e65528e29 100644
--- a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/TestKafkaIntegrationSmokeTest.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaAdminFactory.java
@@ -11,41 +11,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.prestosql.plugin.kafka;
-import io.prestosql.plugin.kafka.util.EmbeddedKafka;
-import io.prestosql.tests.AbstractTestIntegrationSmokeTest;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.Test;
+import io.prestosql.spi.HostAddress;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+
+import javax.inject.Inject;
-import java.io.IOException;
+import java.util.Properties;
+import java.util.Set;
-import static io.airlift.tpch.TpchTable.ORDERS;
-import static io.prestosql.plugin.kafka.KafkaQueryRunner.createKafkaQueryRunner;
-import static io.prestosql.plugin.kafka.util.EmbeddedKafka.createEmbeddedKafka;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
-@Test
-public class TestKafkaIntegrationSmokeTest
- extends AbstractTestIntegrationSmokeTest
+public class KafkaAdminFactory
{
- private final EmbeddedKafka embeddedKafka;
+ private final Set nodes;
- public TestKafkaIntegrationSmokeTest()
- throws Exception
+ @Inject
+ public KafkaAdminFactory(KafkaConfig kafkaConfig)
{
- this(createEmbeddedKafka());
+ requireNonNull(kafkaConfig, "kafkaConfig is null");
+ nodes = kafkaConfig.getNodes();
}
- public TestKafkaIntegrationSmokeTest(EmbeddedKafka embeddedKafka)
+ public AdminClient create()
{
- super(() -> createKafkaQueryRunner(embeddedKafka, ORDERS));
- this.embeddedKafka = embeddedKafka;
+ return KafkaAdminClient.create(configure());
}
- @AfterClass(alwaysRun = true)
- public void destroy()
- throws IOException
+ public Properties configure()
{
- embeddedKafka.close();
+ Properties properties = new Properties();
+ properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
+ .map(HostAddress::toString)
+ .collect(joining(",")));
+ return properties;
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java
index 64a53c8bff143ac6a62510374063dab45e6f8173..66846d65250ccdea12a5eb42874e5ff6828ffa23 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaColumnHandle.java
@@ -16,6 +16,7 @@ package io.prestosql.plugin.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.plugin.kafka.encoder.EncoderColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.Type;
@@ -24,14 +25,9 @@ import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
-/**
- * Kafka specific connector column handle.
- */
public final class KafkaColumnHandle
- implements DecoderColumnHandle, Comparable
+ implements EncoderColumnHandle, DecoderColumnHandle
{
- private final int ordinalPosition;
-
/**
* Column Name
*/
@@ -43,24 +39,24 @@ public final class KafkaColumnHandle
private final Type type;
/**
- * Mapping hint for the decoder. Can be null.
+ * Mapping hint for the codec. Can be null.
*/
private final String mapping;
/**
- * Data format to use (selects the decoder). Can be null.
+ * Data format to use (selects the codec). Can be null.
*/
private final String dataFormat;
/**
- * Additional format hint for the selected decoder. Selects a decoder subtype (e.g. which timestamp decoder).
+ * Additional format hint for the selected codec. Selects a codec subtype (e.g. which timestamp codec).
*/
private final String formatHint;
/**
- * True if the key decoder should be used, false if the message decoder should be used.
+ * True if the key codec should be used, false if the message codec should be used.
*/
- private final boolean keyDecoder;
+ private final boolean keyCodec;
/**
* True if the column should be hidden.
@@ -74,33 +70,25 @@ public final class KafkaColumnHandle
@JsonCreator
public KafkaColumnHandle(
- @JsonProperty("ordinalPosition") int ordinalPosition,
@JsonProperty("name") String name,
@JsonProperty("type") Type type,
@JsonProperty("mapping") String mapping,
@JsonProperty("dataFormat") String dataFormat,
@JsonProperty("formatHint") String formatHint,
- @JsonProperty("keyDecoder") boolean keyDecoder,
+ @JsonProperty("keyCodec") boolean keyCodec,
@JsonProperty("hidden") boolean hidden,
@JsonProperty("internal") boolean internal)
{
- this.ordinalPosition = ordinalPosition;
this.name = requireNonNull(name, "name is null");
this.type = requireNonNull(type, "type is null");
this.mapping = mapping;
this.dataFormat = dataFormat;
this.formatHint = formatHint;
- this.keyDecoder = keyDecoder;
+ this.keyCodec = keyCodec;
this.hidden = hidden;
this.internal = internal;
}
- @JsonProperty
- public int getOrdinalPosition()
- {
- return ordinalPosition;
- }
-
@Override
@JsonProperty
public String getName()
@@ -137,9 +125,9 @@ public final class KafkaColumnHandle
}
@JsonProperty
- public boolean isKeyDecoder()
+ public boolean isKeyCodec()
{
- return keyDecoder;
+ return keyCodec;
}
@JsonProperty
@@ -163,7 +151,7 @@ public final class KafkaColumnHandle
@Override
public int hashCode()
{
- return Objects.hash(ordinalPosition, name, type, mapping, dataFormat, formatHint, keyDecoder, hidden, internal);
+ return Objects.hash(name, type, mapping, dataFormat, formatHint, keyCodec, hidden, internal);
}
@Override
@@ -177,34 +165,26 @@ public final class KafkaColumnHandle
}
KafkaColumnHandle other = (KafkaColumnHandle) obj;
- return Objects.equals(this.ordinalPosition, other.ordinalPosition) &&
- Objects.equals(this.name, other.name) &&
+ return Objects.equals(this.name, other.name) &&
Objects.equals(this.type, other.type) &&
Objects.equals(this.mapping, other.mapping) &&
Objects.equals(this.dataFormat, other.dataFormat) &&
Objects.equals(this.formatHint, other.formatHint) &&
- Objects.equals(this.keyDecoder, other.keyDecoder) &&
+ Objects.equals(this.keyCodec, other.keyCodec) &&
Objects.equals(this.hidden, other.hidden) &&
Objects.equals(this.internal, other.internal);
}
- @Override
- public int compareTo(KafkaColumnHandle otherHandle)
- {
- return Integer.compare(this.getOrdinalPosition(), otherHandle.getOrdinalPosition());
- }
-
@Override
public String toString()
{
return toStringHelper(this)
- .add("ordinalPosition", ordinalPosition)
.add("name", name)
.add("type", type)
.add("mapping", mapping)
.add("dataFormat", dataFormat)
.add("formatHint", formatHint)
- .add("keyDecoder", keyDecoder)
+ .add("keyCodec", keyCodec)
.add("hidden", hidden)
.add("internal", internal)
.toString();
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConfig.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..f4c18d58f136a83b7ca3143cd4888d3a8658738b
--- /dev/null
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConfig.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.kafka;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.DefunctConfig;
+import io.airlift.units.DataSize;
+import io.airlift.units.DataSize.Unit;
+import io.prestosql.spi.HostAddress;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import java.io.File;
+import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+@DefunctConfig("kafka.connect-timeout")
+public class KafkaConfig
+{
+ private static final int KAFKA_DEFAULT_PORT = 9092;
+
+ private Set nodes = ImmutableSet.of();
+ private DataSize kafkaBufferSize = new DataSize(64, Unit.KILOBYTE);
+ private String defaultSchema = "default";
+ private Set tableNames = ImmutableSet.of();
+ private File tableDescriptionDir = new File("etc/kafka/");
+ private boolean hideInternalColumns = true;
+ private int messagesPerSplit = 100_000;
+ private boolean timestampUpperBoundPushDownEnabled;
+
+ @Size(min = 1)
+ public Set getNodes()
+ {
+ return nodes;
+ }
+
+ @Config("kafka.nodes")
+ @ConfigDescription("Seed nodes for Kafka cluster. At least one must exist")
+ public KafkaConfig setNodes(String nodes)
+ {
+ this.nodes = (nodes == null) ? null : parseNodes(nodes);
+ return this;
+ }
+
+ public DataSize getKafkaBufferSize()
+ {
+ return kafkaBufferSize;
+ }
+
+ @Config("kafka.buffer-size")
+ @ConfigDescription("Kafka message consumer buffer size")
+ public KafkaConfig setKafkaBufferSize(String kafkaBufferSize)
+ {
+ this.kafkaBufferSize = DataSize.valueOf(kafkaBufferSize);
+ return this;
+ }
+
+ @NotNull
+ public String getDefaultSchema()
+ {
+ return defaultSchema;
+ }
+
+ @Config("kafka.default-schema")
+ @ConfigDescription("Schema name to use in the connector")
+ public KafkaConfig setDefaultSchema(String defaultSchema)
+ {
+ this.defaultSchema = defaultSchema;
+ return this;
+ }
+
+ @NotNull
+ public Set getTableNames()
+ {
+ return tableNames;
+ }
+
+ @Config("kafka.table-names")
+ @ConfigDescription("Set of tables known to this connector")
+ public KafkaConfig setTableNames(String tableNames)
+ {
+ this.tableNames = ImmutableSet.copyOf(Splitter.on(',').omitEmptyStrings().trimResults().split(tableNames));
+ return this;
+ }
+
+ public boolean isHideInternalColumns()
+ {
+ return hideInternalColumns;
+ }
+
+ @Config("kafka.hide-internal-columns")
+ @ConfigDescription("Whether internal columns are shown in table metadata or not. Default is no")
+ public KafkaConfig setHideInternalColumns(boolean hideInternalColumns)
+ {
+ this.hideInternalColumns = hideInternalColumns;
+ return this;
+ }
+
+ @NotNull
+ public File getTableDescriptionDir()
+ {
+ return tableDescriptionDir;
+ }
+
+ @Config("kafka.table-description-dir")
+ @ConfigDescription("Folder holding JSON description files for Kafka topics")
+ public KafkaConfig setTableDescriptionDir(File tableDescriptionDir)
+ {
+ this.tableDescriptionDir = tableDescriptionDir;
+ return this;
+ }
+
+ private static ImmutableSet parseNodes(String nodes)
+ {
+ Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
+ return StreamSupport.stream(splitter.split(nodes).spliterator(), false)
+ .map(KafkaConfig::toHostAddress)
+ .collect(toImmutableSet());
+ }
+
+ private static HostAddress toHostAddress(String value)
+ {
+ return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT);
+ }
+
+ @Min(1)
+ public int getMessagesPerSplit()
+ {
+ return messagesPerSplit;
+ }
+
+ @Config("kafka.messages-per-split")
+ @ConfigDescription("Count of Kafka messages to be processed by single Presto Kafka connector split")
+ public KafkaConfig setMessagesPerSplit(int messagesPerSplit)
+ {
+ this.messagesPerSplit = messagesPerSplit;
+ return this;
+ }
+
+ public boolean isTimestampUpperBoundPushDownEnabled()
+ {
+ return timestampUpperBoundPushDownEnabled;
+ }
+
+ @Config("kafka.timestamp-upper-bound-force-push-down-enabled")
+ @ConfigDescription("timestamp upper bound force pushing down enabled")
+ public KafkaConfig setTimestampUpperBoundPushDownEnabled(boolean timestampUpperBoundPushDownEnabled)
+ {
+ this.timestampUpperBoundPushDownEnabled = timestampUpperBoundPushDownEnabled;
+ return this;
+ }
+}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java
index 610cdf0f40fad63e5369b3cdd81d05ea424c78fd..6241416f6e50f14324b45ba5c3dfa0795eceace1 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnector.java
@@ -14,44 +14,48 @@
package io.prestosql.plugin.kafka;
import io.airlift.bootstrap.LifeCycleManager;
-import io.airlift.log.Logger;
import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorMetadata;
+import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorRecordSetProvider;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.session.PropertyMetadata;
import io.prestosql.spi.transaction.IsolationLevel;
import javax.inject.Inject;
+import java.util.List;
+
import static io.prestosql.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.prestosql.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
-/**
- * Kafka specific implementation of the Presto Connector SPI. This is a read only connector.
- */
public class KafkaConnector
implements Connector
{
- private static final Logger log = Logger.get(KafkaConnector.class);
-
private final LifeCycleManager lifeCycleManager;
- private final KafkaMetadata metadata;
- private final KafkaSplitManager splitManager;
- private final KafkaRecordSetProvider recordSetProvider;
+ private final ConnectorMetadata metadata;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorRecordSetProvider recordSetProvider;
+ private final ConnectorPageSinkProvider pageSinkProvider;
+ private final KafkaSessionProperties sessionProperties;
@Inject
public KafkaConnector(
LifeCycleManager lifeCycleManager,
- KafkaMetadata metadata,
- KafkaSplitManager splitManager,
- KafkaRecordSetProvider recordSetProvider)
+ ConnectorMetadata metadata,
+ ConnectorSplitManager splitManager,
+ ConnectorRecordSetProvider recordSetProvider,
+ ConnectorPageSinkProvider pageSinkProvider,
+ KafkaSessionProperties sessionProperties)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
+ this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
}
@Override
@@ -79,14 +83,21 @@ public class KafkaConnector
return recordSetProvider;
}
+ @Override
+ public ConnectorPageSinkProvider getPageSinkProvider()
+ {
+ return pageSinkProvider;
+ }
+
+ @Override
+ public List> getSessionProperties()
+ {
+ return sessionProperties.getSessionProperties();
+ }
+
@Override
public final void shutdown()
{
- try {
- lifeCycleManager.stop();
- }
- catch (Exception e) {
- log.error(e, "Error shutting down connector");
- }
+ lifeCycleManager.stop();
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java
index 2668963add837524a139350e32cf613e988dc7ea..82072a7f65150e1ea6e162914a2c618855f4eb0b 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorFactory.java
@@ -14,8 +14,7 @@
package io.prestosql.plugin.kafka;
import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
+import com.google.inject.Module;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.prestosql.spi.NodeManager;
@@ -23,27 +22,20 @@ import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorContext;
import io.prestosql.spi.connector.ConnectorFactory;
import io.prestosql.spi.connector.ConnectorHandleResolver;
-import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.type.TypeManager;
import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
-import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;
-/**
- * Creates Kafka Connectors based off catalogName and specific configuration.
- */
public class KafkaConnectorFactory
implements ConnectorFactory
{
- private final Optional>> tableDescriptionSupplier;
+ private final Module extension;
- KafkaConnectorFactory(Optional>> tableDescriptionSupplier)
+ KafkaConnectorFactory(Module extension)
{
- this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
+ this.extension = requireNonNull(extension, "extension is null");
}
@Override
@@ -64,32 +56,22 @@ public class KafkaConnectorFactory
requireNonNull(catalogName, "catalogName is null");
requireNonNull(config, "config is null");
- try {
- Bootstrap app = new Bootstrap(
- new JsonModule(),
- new KafkaConnectorModule(),
- binder -> {
- binder.bind(TypeManager.class).toInstance(context.getTypeManager());
- binder.bind(NodeManager.class).toInstance(context.getNodeManager());
-
- if (tableDescriptionSupplier.isPresent()) {
- binder.bind(new TypeLiteral>>() {}).toInstance(tableDescriptionSupplier.get());
- }
- else {
- binder.bind(new TypeLiteral>>() {}).to(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
- }
- });
+ Bootstrap app = new Bootstrap(
+ new JsonModule(),
+ new KafkaConnectorModule(),
+ extension,
+ binder -> {
+ binder.bind(ClassLoader.class).toInstance(KafkaConnectorFactory.class.getClassLoader());
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ });
- Injector injector = app.strictConfig()
- .doNotInitializeLogging()
- .setRequiredConfigurationProperties(config)
- .initialize();
+ Injector injector = app
+ .strictConfig()
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(config)
+ .initialize();
- return injector.getInstance(KafkaConnector.class);
- }
- catch (Exception e) {
- throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
+ return injector.getInstance(KafkaConnector.class);
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java
index 20e0b30128d8a251185d4d2e027d0f04a5a150e9..806401279db50493e8f1b0fe38587fcf6efd56f2 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConnectorModule.java
@@ -19,40 +19,57 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.prestosql.decoder.DecoderModule;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorRecordSetProvider;
+import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.prestosql.plugin.base.classloader.ForClassLoaderSafe;
+import io.prestosql.plugin.kafka.encoder.EncoderModule;
+import io.prestosql.spi.connector.ConnectorMetadata;
+import io.prestosql.spi.connector.ConnectorPageSinkProvider;
+import io.prestosql.spi.connector.ConnectorRecordSetProvider;
+import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
import javax.inject.Inject;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.prestosql.spi.type.TypeSignature.parseTypeSignature;
import static java.util.Objects.requireNonNull;
-/**
- * Guice module for the Apache Kafka connector.
- */
public class KafkaConnectorModule
implements Module
{
@Override
public void configure(Binder binder)
{
+ binder.bind(ConnectorMetadata.class).to(KafkaMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorRecordSetProvider.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorRecordSetProvider.class).to(ClassLoaderSafeConnectorRecordSetProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorPageSinkProvider.class).annotatedWith(ForClassLoaderSafe.class).to(KafkaPageSinkProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaInternalFieldManager.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaSessionProperties.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaAdminFactory.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaFilterManager.class).in(Scopes.SINGLETON);
- binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON);
- binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
- binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
-
- binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
-
- configBinder(binder).bindConfig(KafkaConnectorConfig.class);
+ //binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
+ //configBinder(binder).bindConfig(KafkaConnectorConfig.class);
+ configBinder(binder).bindConfig(KafkaConfig.class);
+ newSetBinder(binder, TableDescriptionSupplier.class).addBinding().toProvider(KafkaTableDescriptionSupplier.class).in(Scopes.SINGLETON);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
binder.install(new DecoderModule());
+ binder.install(new EncoderModule());
+ binder.install(new KafkaProducerModule());
}
public static final class TypeDeserializer
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerFactory.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..9872e7809459b0293f35188bd11f20f4d9344fc3
--- /dev/null
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.prestosql.plugin.kafka;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Properties;
+
+public interface KafkaConsumerFactory
+{
+ default KafkaConsumer create()
+ {
+ return new KafkaConsumer<>(configure());
+ }
+
+ Properties configure();
+}
diff --git a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/NumberEncoder.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerModule.java
similarity index 52%
rename from presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/NumberEncoder.java
rename to presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerModule.java
index c7086a26434de719fbf5138deccebd417c262499..0759a716e065c9d0d197679e87d992f1b621d4b0 100644
--- a/presto-kafka/src/test/java/io/prestosql/plugin/kafka/util/NumberEncoder.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaConsumerModule.java
@@ -11,27 +11,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.plugin.kafka.util;
+package io.prestosql.plugin.kafka;
-import kafka.serializer.Encoder;
-import kafka.utils.VerifiableProperties;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
-import java.nio.ByteBuffer;
-
-public class NumberEncoder
- implements Encoder
+public class KafkaConsumerModule
+ implements Module
{
- @SuppressWarnings("UnusedParameters")
- public NumberEncoder(VerifiableProperties properties)
- {
- // constructor required by Kafka
- }
-
@Override
- public byte[] toBytes(Number value)
+ public void configure(Binder binder)
{
- ByteBuffer buf = ByteBuffer.allocate(8);
- buf.putLong(value == null ? 0L : value.longValue());
- return buf.array();
+ binder.bind(KafkaConsumerFactory.class).to(PlainTextKafkaConsumerFactory.class).in(Scopes.SINGLETON);
}
}
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java
index 006a70a52ac1974e898040889ca3e41662f7caf4..44a883464dee744658250e508a8a92a9f5ced003 100644
--- a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaErrorCode.java
@@ -18,6 +18,7 @@ import io.prestosql.spi.ErrorCodeSupplier;
import io.prestosql.spi.ErrorType;
import static io.prestosql.spi.ErrorType.EXTERNAL;
+import static io.prestosql.spi.ErrorType.INTERNAL_ERROR;
/**
* Kafka connector specific error codes.
@@ -25,7 +26,10 @@ import static io.prestosql.spi.ErrorType.EXTERNAL;
public enum KafkaErrorCode
implements ErrorCodeSupplier
{
- KAFKA_SPLIT_ERROR(0, EXTERNAL);
+ KAFKA_SPLIT_ERROR(0, EXTERNAL),
+ KAFKA_SCHEMA_ERROR(1, EXTERNAL),
+ KAFKA_PRODUCER_ERROR(2, INTERNAL_ERROR)
+ /**/;
private final ErrorCode errorCode;
diff --git a/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaFilterManager.java b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaFilterManager.java
new file mode 100644
index 0000000000000000000000000000000000000000..a2c7e962d82cd3c6554d3b7b2d48781517706ec0
--- /dev/null
+++ b/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaFilterManager.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.predicate.Domain;
+import io.prestosql.spi.predicate.Marker;
+import io.prestosql.spi.predicate.Ranges;
+import io.prestosql.spi.predicate.SortedRangeSet;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.predicate.ValueSet;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+
+import javax.inject.Inject;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.prestosql.plugin.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR;
+import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.OFFSET_TIMESTAMP_FIELD;
+import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD;
+import static io.prestosql.plugin.kafka.KafkaInternalFieldManager.PARTITION_OFFSET_FIELD;
+import static java.lang.Math.floorDiv;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class KafkaFilterManager
+{
+ public static final int MICROSECONDS_PER_MILLISECOND = 1_000;
+ private static final long INVALID_KAFKA_RANGE_INDEX = -1;
+ private static final String TOPIC_CONFIG_TIMESTAMP_KEY = "message.timestamp.type";
+ private static final String TOPIC_CONFIG_TIMESTAMP_VALUE_LOG_APPEND_TIME = "LogAppendTime";
+
+ private final KafkaConsumerFactory consumerFactory;
+ private final KafkaAdminFactory adminFactory;
+
+ @Inject
+ public KafkaFilterManager(KafkaConsumerFactory consumerFactory, KafkaAdminFactory adminFactory)
+ {
+ this.consumerFactory = requireNonNull(consumerFactory, "consumerManager is null");
+ this.adminFactory = requireNonNull(adminFactory, "adminFactory is null");
+ }
+
+ public KafkaFilteringResult getKafkaFilterResult(
+ ConnectorSession session,
+ KafkaTableHandle kafkaTableHandle,
+ List partitionInfos,
+ Map partitionBeginOffsets,
+ Map partitionEndOffsets)
+ {
+ requireNonNull(session, "session is null");
+ requireNonNull(kafkaTableHandle, "kafkaTableHandle is null");
+ requireNonNull(partitionInfos, "partitionInfos is null");
+ requireNonNull(partitionBeginOffsets, "partitionBeginOffsets is null");
+ requireNonNull(partitionEndOffsets, "partitionEndOffsets is null");
+
+ TupleDomain constraint = kafkaTableHandle.getConstraint();
+ verify(!constraint.isNone(), "constraint is none");
+
+ if (!constraint.isAll()) {
+ Set partitionIds = partitionInfos.stream().map(partitionInfo -> (long) partitionInfo.partition()).collect(toImmutableSet());
+ Optional offsetRanged = Optional.empty();
+ Optional offsetTimestampRanged = Optional.empty();
+ Set partitionIdsFiltered = partitionIds;
+ Optional
+
+ io.airlift
+ slice
+
+
com.fasterxml.jackson.core
jackson-annotations
diff --git a/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorAccessControl.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorAccessControl.java
new file mode 100644
index 0000000000000000000000000000000000000000..2d091cac947e39a734f44fa4ae6fb85c114bdcac
--- /dev/null
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorAccessControl.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+//package io.prestosql.plugin.base.classloader;
+//
+//import io.prestosql.spi.classloader.ThreadContextClassLoader;
+//import io.prestosql.spi.connector.ColumnMetadata;
+//import io.prestosql.spi.connector.ConnectorAccessControl;
+//import io.prestosql.spi.connector.ConnectorSecurityContext;
+//import io.prestosql.spi.connector.SchemaTableName;
+//import io.prestosql.spi.security.PrestoPrincipal;
+//import io.prestosql.spi.security.Privilege;
+//
+//import javax.inject.Inject;
+//
+//import java.util.List;
+//import java.util.Optional;
+//import java.util.Set;
+//
+//
+//public class ClassLoaderSafeConnectorAccessControl
+// implements ConnectorAccessControl
+//{
+// private final ConnectorAccessControl delegate;
+// private final ClassLoader classLoader;
+//
+// @Inject
+// public ClassLoaderSafeConnectorAccessControl(@ForClassLoaderSafe ConnectorAccessControl delegate, ClassLoader classLoader)
+// {
+// this.delegate = requireNonNull(delegate, "delegate is null");
+// this.classLoader = requireNonNull(classLoader, "classLoader is null");
+// }
+//
+// @Override
+// public void checkCanCreateSchema(ConnectorSecurityContext context, String schemaName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanCreateSchema(context, schemaName);
+// }
+// }
+//
+// @Override
+// public void checkCanDropSchema(ConnectorSecurityContext context, String schemaName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanDropSchema(context, schemaName);
+// }
+// }
+//
+// @Override
+// public void checkCanRenameSchema(ConnectorSecurityContext context, String schemaName, String newSchemaName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanRenameSchema(context, schemaName, newSchemaName);
+// }
+// }
+//
+// @Override
+// public void checkCanShowSchemas(ConnectorSecurityContext context)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanShowSchemas(context);
+// }
+// }
+//
+// @Override
+// public Set filterSchemas(ConnectorSecurityContext context, Set schemaNames)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// return delegate.filterSchemas(context, schemaNames);
+// }
+// }
+//
+// @Override
+// public void checkCanCreateTable(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanCreateTable(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanDropTable(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanDropTable(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanRenameTable(ConnectorSecurityContext context, SchemaTableName tableName, SchemaTableName newTableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanRenameTable(context, tableName, newTableName);
+// }
+// }
+//
+// @Override
+// public void checkCanSetTableComment(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanSetTableComment(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanShowTablesMetadata(ConnectorSecurityContext context, String schemaName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanShowTablesMetadata(context, schemaName);
+// }
+// }
+//
+// @Override
+// public Set filterTables(ConnectorSecurityContext context, Set tableNames)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// return delegate.filterTables(context, tableNames);
+// }
+// }
+//
+// @Override
+// public void checkCanShowColumnsMetadata(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanShowColumnsMetadata(context, tableName);
+// }
+// }
+//
+// @Override
+// public List filterColumns(ConnectorSecurityContext context, SchemaTableName tableName, List columns)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// return delegate.filterColumns(context, tableName, columns);
+// }
+// }
+//
+// @Override
+// public void checkCanAddColumn(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanAddColumn(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanDropColumn(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanDropColumn(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanRenameColumn(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanRenameColumn(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanSelectFromColumns(ConnectorSecurityContext context, SchemaTableName tableName, Set columnNames)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanSelectFromColumns(context, tableName, columnNames);
+// }
+// }
+//
+// @Override
+// public void checkCanInsertIntoTable(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanInsertIntoTable(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanDeleteFromTable(ConnectorSecurityContext context, SchemaTableName tableName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanDeleteFromTable(context, tableName);
+// }
+// }
+//
+// @Override
+// public void checkCanCreateView(ConnectorSecurityContext context, SchemaTableName viewName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanCreateView(context, viewName);
+// }
+// }
+//
+// @Override
+// public void checkCanRenameView(ConnectorSecurityContext context, SchemaTableName viewName, SchemaTableName newViewName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanRenameView(context, viewName, newViewName);
+// }
+// }
+//
+// @Override
+// public void checkCanDropView(ConnectorSecurityContext context, SchemaTableName viewName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanDropView(context, viewName);
+// }
+// }
+//
+// @Override
+// public void checkCanCreateViewWithSelectFromColumns(ConnectorSecurityContext context, SchemaTableName tableName, Set columnNames)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanCreateViewWithSelectFromColumns(context, tableName, columnNames);
+// }
+// }
+//
+// @Override
+// public void checkCanSetCatalogSessionProperty(ConnectorSecurityContext context, String propertyName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanSetCatalogSessionProperty(context, propertyName);
+// }
+// }
+//
+// @Override
+// public void checkCanGrantTablePrivilege(ConnectorSecurityContext context, Privilege privilege, SchemaTableName tableName, PrestoPrincipal grantee, boolean withGrantOption)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanGrantTablePrivilege(context, privilege, tableName, grantee, withGrantOption);
+// }
+// }
+//
+// @Override
+// public void checkCanRevokeTablePrivilege(ConnectorSecurityContext context, Privilege privilege, SchemaTableName tableName, PrestoPrincipal revokee, boolean grantOptionFor)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanRevokeTablePrivilege(context, privilege, tableName, revokee, grantOptionFor);
+// }
+// }
+//
+// @Override
+// public void checkCanCreateRole(ConnectorSecurityContext context, String role, Optional grantor)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanCreateRole(context, role, grantor);
+// }
+// }
+//
+// @Override
+// public void checkCanDropRole(ConnectorSecurityContext context, String role)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanDropRole(context, role);
+// }
+// }
+//
+// @Override
+// public void checkCanGrantRoles(ConnectorSecurityContext context, Set roles, Set grantees, boolean withAdminOption, Optional grantor, String catalogName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanGrantRoles(context, roles, grantees, withAdminOption, grantor, catalogName);
+// }
+// }
+//
+// @Override
+// public void checkCanRevokeRoles(ConnectorSecurityContext context, Set roles, Set grantees, boolean adminOptionFor, Optional grantor, String catalogName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanRevokeRoles(context, roles, grantees, adminOptionFor, grantor, catalogName);
+// }
+// }
+//
+// @Override
+// public void checkCanSetRole(ConnectorSecurityContext context, String role, String catalogName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanSetRole(context, role, catalogName);
+// }
+// }
+//
+// @Override
+// public void checkCanShowRoles(ConnectorSecurityContext context, String catalogName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanShowRoles(context, catalogName);
+// }
+// }
+//
+// @Override
+// public void checkCanShowCurrentRoles(ConnectorSecurityContext context, String catalogName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanShowCurrentRoles(context, catalogName);
+// }
+// }
+//
+// @Override
+// public void checkCanShowRoleGrants(ConnectorSecurityContext context, String catalogName)
+// {
+// try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+// delegate.checkCanShowRoleGrants(context, catalogName);
+// }
+// }
+//}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java
similarity index 99%
rename from presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java
rename to presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java
index 15fe7cc75527cf6b6074987f66437b99ab93ec0b..525df9aab62554a65d263582208321d2c78ec07b 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.airlift.slice.Slice;
import io.prestosql.spi.PartialAndFinalAggregationType;
@@ -64,6 +64,8 @@ import io.prestosql.spi.statistics.TableStatistics;
import io.prestosql.spi.statistics.TableStatisticsMetadata;
import io.prestosql.spi.type.Type;
+import javax.inject.Inject;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -80,7 +82,8 @@ public class ClassLoaderSafeConnectorMetadata
private final ConnectorMetadata delegate;
private final ClassLoader classLoader;
- public ClassLoaderSafeConnectorMetadata(ConnectorMetadata delegate, ClassLoader classLoader)
+ @Inject
+ public ClassLoaderSafeConnectorMetadata(@ForClassLoaderSafe ConnectorMetadata delegate, ClassLoader classLoader)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSink.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSink.java
similarity index 95%
rename from presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSink.java
rename to presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSink.java
index bc4d12dbb1436cdefcf1cfafe4fef52ab3f4116d..3fe398550638b82479cf47fb1f8867e95284568b 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSink.java
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSink.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.airlift.slice.Slice;
import io.prestosql.spi.Page;
@@ -24,6 +24,8 @@ import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.snapshot.BlockEncodingSerdeProvider;
import io.prestosql.spi.snapshot.RestorableConfig;
+import javax.inject.Inject;
+
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -37,7 +39,8 @@ public class ClassLoaderSafeConnectorPageSink
private final ConnectorPageSink delegate;
private final ClassLoader classLoader;
- public ClassLoaderSafeConnectorPageSink(ConnectorPageSink delegate, ClassLoader classLoader)
+ @Inject
+ public ClassLoaderSafeConnectorPageSink(@ForClassLoaderSafe ConnectorPageSink delegate, ClassLoader classLoader)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkProvider.java
similarity index 95%
rename from presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java
rename to presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkProvider.java
index 62d19d92755da4e42ced97131f77d1c81cb411fa..24fd5ae0b951a70faf68f72fc0c9d2172616e02d 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProvider.java
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkProvider.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.ConnectorDeleteAsInsertTableHandle;
@@ -25,6 +25,8 @@ import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.ConnectorUpdateTableHandle;
import io.prestosql.spi.connector.ConnectorVacuumTableHandle;
+import javax.inject.Inject;
+
import static java.util.Objects.requireNonNull;
public final class ClassLoaderSafeConnectorPageSinkProvider
@@ -33,7 +35,8 @@ public final class ClassLoaderSafeConnectorPageSinkProvider
private final ConnectorPageSinkProvider delegate;
private final ClassLoader classLoader;
- public ClassLoaderSafeConnectorPageSinkProvider(ConnectorPageSinkProvider delegate, ClassLoader classLoader)
+ @Inject
+ public ClassLoaderSafeConnectorPageSinkProvider(@ForClassLoaderSafe ConnectorPageSinkProvider delegate, ClassLoader classLoader)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSourceProvider.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProvider.java
similarity index 92%
rename from presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSourceProvider.java
rename to presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProvider.java
index 12cf3c521250d4a5df55c8b1599681c1687453f5..5f678a3f7e20bd9df513598fdd7d2d7b87144999 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSourceProvider.java
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProvider.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.ColumnHandle;
@@ -23,6 +23,8 @@ import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.dynamicfilter.DynamicFilterSupplier;
+import javax.inject.Inject;
+
import java.util.List;
import java.util.Optional;
@@ -34,7 +36,8 @@ public class ClassLoaderSafeConnectorPageSourceProvider
private final ConnectorPageSourceProvider delegate;
private final ClassLoader classLoader;
- public ClassLoaderSafeConnectorPageSourceProvider(ConnectorPageSourceProvider delegate, ClassLoader classLoader)
+ @Inject
+ public ClassLoaderSafeConnectorPageSourceProvider(@ForClassLoaderSafe ConnectorPageSourceProvider delegate, ClassLoader classLoader)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
diff --git a/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorRecordSetProvider.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorRecordSetProvider.java
new file mode 100644
index 0000000000000000000000000000000000000000..814402237456bbe521245cb0e743e682dc85f85c
--- /dev/null
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorRecordSetProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.prestosql.plugin.base.classloader;
+
+import io.prestosql.spi.classloader.ThreadContextClassLoader;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorRecordSetProvider;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorSplit;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.connector.RecordSet;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ClassLoaderSafeConnectorRecordSetProvider
+ implements ConnectorRecordSetProvider
+{
+ private final ConnectorRecordSetProvider delegate;
+ private final ClassLoader classLoader;
+
+ @Inject
+ public ClassLoaderSafeConnectorRecordSetProvider(@ForClassLoaderSafe ConnectorRecordSetProvider delegate, ClassLoader classLoader)
+ {
+ this.delegate = requireNonNull(delegate, "delegate is null");
+ this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ }
+
+ @Override
+ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List extends ColumnHandle> columns)
+ {
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ return new ClassLoaderSafeRecordSet(delegate.getRecordSet(transaction, session, split, table, columns), classLoader);
+ }
+ }
+
+ @Override
+ public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List extends ColumnHandle> columns)
+ {
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ return new ClassLoaderSafeRecordSet(delegate.getRecordSet(transactionHandle, session, split, columns), classLoader);
+ }
+ }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java
similarity index 94%
rename from presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java
rename to presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java
index 727955327deddde57de19e2e9ac2ecd7d0522468..7af96151663c9e90ee65ee8bd6dafac82e800772 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManager.java
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.ColumnMetadata;
@@ -26,6 +26,8 @@ import io.prestosql.spi.dynamicfilter.DynamicFilter;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.resourcegroups.QueryType;
+import javax.inject.Inject;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -40,7 +42,8 @@ public final class ClassLoaderSafeConnectorSplitManager
private final ConnectorSplitManager delegate;
private final ClassLoader classLoader;
- public ClassLoaderSafeConnectorSplitManager(ConnectorSplitManager delegate, ClassLoader classLoader)
+ @Inject
+ public ClassLoaderSafeConnectorSplitManager(@ForClassLoaderSafe ConnectorSplitManager delegate, ClassLoader classLoader)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeNodePartitioningProvider.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeNodePartitioningProvider.java
similarity index 93%
rename from presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeNodePartitioningProvider.java
rename to presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeNodePartitioningProvider.java
index d72719742b41272bb69c5d28680091e6d8a99053..3fd871294df812aa59f2aa8d25f1d06f6128f8b7 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeNodePartitioningProvider.java
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeNodePartitioningProvider.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.classloader.ThreadContextClassLoader;
import io.prestosql.spi.connector.BucketFunction;
@@ -24,6 +24,8 @@ import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.type.Type;
+import javax.inject.Inject;
+
import java.util.List;
import java.util.function.ToIntFunction;
@@ -35,7 +37,8 @@ public final class ClassLoaderSafeNodePartitioningProvider
private final ConnectorNodePartitioningProvider delegate;
private final ClassLoader classLoader;
- public ClassLoaderSafeNodePartitioningProvider(ConnectorNodePartitioningProvider delegate, ClassLoader classLoader)
+ @Inject
+ public ClassLoaderSafeNodePartitioningProvider(@ForClassLoaderSafe ConnectorNodePartitioningProvider delegate, ClassLoader classLoader)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
diff --git a/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeRecordSet.java b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeRecordSet.java
new file mode 100644
index 0000000000000000000000000000000000000000..00c1dc6901182b3665acc7b0df7ef7df31db1ce4
--- /dev/null
+++ b/presto-plugin-toolkit/src/main/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeRecordSet.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.prestosql.plugin.base.classloader;
+
+import io.prestosql.spi.classloader.ThreadContextClassLoader;
+import io.prestosql.spi.connector.RecordCursor;
+import io.prestosql.spi.connector.RecordSet;
+import io.prestosql.spi.type.Type;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ClassLoaderSafeRecordSet
+ implements RecordSet
+{
+ private final RecordSet delegate;
+ private final ClassLoader classLoader;
+
+ @Inject
+ public ClassLoaderSafeRecordSet(@ForClassLoaderSafe RecordSet delegate, ClassLoader classLoader)
+ {
+ this.delegate = requireNonNull(delegate, "delegate is null");
+ this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ }
+
+ @Override
+ public List getColumnTypes()
+ {
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ return delegate.getColumnTypes();
+ }
+ }
+
+ @Override
+ public RecordCursor cursor()
+ {
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ return delegate.cursor();
+ }
+ }
+}
diff --git a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadataTest.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorMetadataTest.java
similarity index 99%
rename from presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadataTest.java
rename to presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorMetadataTest.java
index f89c3c629e2b39648e47e931c0e97fb151a3f5fa..19ab4432c8d4fab111820db48c32969b1be2c90c 100644
--- a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorMetadataTest.java
+++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorMetadataTest.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.airlift.slice.Slice;
import io.prestosql.spi.PartialAndFinalAggregationType;
diff --git a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProviderTest.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkProviderTest.java
similarity index 99%
rename from presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProviderTest.java
rename to presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkProviderTest.java
index 0f4ce97461ca520c9291469555275430c8305c8d..f1b156c40d1ac243f4834deebf0f14a3d342c96e 100644
--- a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkProviderTest.java
+++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkProviderTest.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.connector.ConnectorDeleteAsInsertTableHandle;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
diff --git a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkTest.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkTest.java
similarity index 99%
rename from presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkTest.java
rename to presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkTest.java
index f02d581f8e5a7df6ce8ead9e3d490c53f8039579..a12dcc4ea1ca47e2c60286c38719a4b2aa8a2568 100644
--- a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSinkTest.java
+++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSinkTest.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.airlift.slice.Slice;
import io.prestosql.spi.Page;
diff --git a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSourceProviderTest.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProviderTest.java
similarity index 99%
rename from presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSourceProviderTest.java
rename to presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProviderTest.java
index 342a6087f238ad9686c4af313d9d41820a612bd1..4a28f8a59d8c549d37d1230ac44b2122e010c13c 100644
--- a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorPageSourceProviderTest.java
+++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProviderTest.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorPageSource;
diff --git a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManagerTest.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorSplitManagerTest.java
similarity index 99%
rename from presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManagerTest.java
rename to presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorSplitManagerTest.java
index 4e33fa1268637cf5c29516f6aa2a352760f85e45..a494b8378e732cace899e1e7d264541919b8f39d 100644
--- a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeConnectorSplitManagerTest.java
+++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeConnectorSplitManagerTest.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorSession;
diff --git a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeNodePartitioningProviderTest.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeNodePartitioningProviderTest.java
similarity index 99%
rename from presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeNodePartitioningProviderTest.java
rename to presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeNodePartitioningProviderTest.java
index cf74ecc9ecc1d2dcd28be9a5b404e2d6a7ce76ae..e55372d175e4d2d7f1da7b676ef4bc51368ad403 100644
--- a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/ClassLoaderSafeNodePartitioningProviderTest.java
+++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/ClassLoaderSafeNodePartitioningProviderTest.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.connector.BucketFunction;
import io.prestosql.spi.connector.ConnectorBucketNodeMap;
diff --git a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/TestClassLoaderSafeWrappers.java b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/TestClassLoaderSafeWrappers.java
similarity index 97%
rename from presto-spi/src/test/java/io/prestosql/spi/connector/classloader/TestClassLoaderSafeWrappers.java
rename to presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/TestClassLoaderSafeWrappers.java
index e58b71b340976a028276623950ad3225f719b39c..a4501cb767357a3b25f8da0b26226433c6cdcd54 100644
--- a/presto-spi/src/test/java/io/prestosql/spi/connector/classloader/TestClassLoaderSafeWrappers.java
+++ b/presto-plugin-toolkit/src/test/java/io/prestosql/plugin/base/classloader/TestClassLoaderSafeWrappers.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.prestosql.spi.connector.classloader;
+package io.prestosql.plugin.base.classloader;
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java
index c4cc4f906e2d1aa9e3b5a03f8496494171d6e3fb..afd3c0dcf2476fe0aee336925129b8f6eb63c9f7 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorMetadata.java
@@ -582,12 +582,6 @@ public interface ConnectorMetadata
return beginInsert(session, tableHandle, columns);
}
- @Deprecated
- default ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns)
- {
- return beginInsert(session, tableHandle);
- }
-
/**
* Begin the atomic creation of a table with data.
*
@@ -626,13 +620,30 @@ public interface ConnectorMetadata
default void cleanupQuery(ConnectorSession session) {}
/**
- * Begin insert query
+ * @deprecated Use {@link #beginInsert(ConnectorSession, ConnectorTableHandle, List)} instead.
*/
+ @Deprecated
default ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support inserts");
}
+ /**
+ * Begin insert query
+ */
+ default ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns)
+ {
+ return beginInsert(session, tableHandle);
+ }
+
+ /**
+ * @return whether connector handles missing columns during insert
+ */
+ default boolean supportsMissingColumnsOnInsert()
+ {
+ return false;
+ }
+
/**
* Begin insert query.
*/
diff --git a/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignature.java b/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignature.java
index b1e77fd8e53f61086791236010b2c3725517077f..cee88c29c1ffe6c4ca5969e71ca7e72510cb868f 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignature.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignature.java
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@@ -25,8 +26,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
-import static io.prestosql.spi.type.TypeSignatureParameter.typeParameter;
import static java.lang.Character.isDigit;
import static java.lang.String.format;
import static java.util.Arrays.asList;
@@ -50,11 +51,6 @@ public class TypeSignature
SIMPLE_TYPE_WITH_SPACES.add("double precision");
}
- public static TypeSignature mapType(TypeSignature keyType, TypeSignature valueType)
- {
- return new TypeSignature(StandardTypes.MAP, typeParameter(keyType), typeParameter(valueType));
- }
-
public TypeSignature(String base, TypeSignatureParameter... parameters)
{
this(base, asList(parameters));
@@ -100,7 +96,11 @@ public class TypeSignature
return calculated;
}
+ /**
+ * @deprecated Build TypeSignature programmatically instead
+ */
@JsonCreator
+ @Deprecated
public static TypeSignature parseTypeSignature(String signature)
{
return parseTypeSignature(signature, new HashSet<>());
@@ -394,4 +394,54 @@ public class TypeSignature
{
return Objects.hash(base.toLowerCase(Locale.ENGLISH), parameters);
}
+
+ // Type signature constructors for common types
+
+ public static TypeSignature arrayType(TypeSignature elementType)
+ {
+ return new TypeSignature(StandardTypes.ARRAY, TypeSignatureParameter.of(elementType));
+ }
+
+ public static TypeSignature arrayType(TypeSignatureParameter elementType)
+ {
+ return new TypeSignature(StandardTypes.ARRAY, elementType);
+ }
+
+ public static TypeSignature mapType(TypeSignature keyType, TypeSignature valueType)
+ {
+ return new TypeSignature(StandardTypes.MAP, TypeSignatureParameter.of(keyType), TypeSignatureParameter.of(valueType));
+ }
+
+ public static TypeSignature parametricType(String name, TypeSignature... parameters)
+ {
+ return new TypeSignature(
+ name,
+ Arrays.asList(parameters).stream()
+ .map(TypeSignatureParameter::of)
+ .collect(Collectors.toList()));
+ }
+
+ public static TypeSignature functionType(TypeSignature first, TypeSignature... rest)
+ {
+ List parameters = new ArrayList<>();
+ parameters.add(TypeSignatureParameter.of(first));
+
+ Arrays.asList(rest).stream()
+ .map(TypeSignatureParameter::of)
+ .forEach(parameters::add);
+
+ return new TypeSignature("function", parameters);
+ }
+
+ public static TypeSignature rowType(TypeSignatureParameter... fields)
+ {
+ return rowType(Arrays.asList(fields));
+ }
+
+ public static TypeSignature rowType(List fields)
+ {
+ checkArgument(fields.stream().allMatch(parameter -> parameter.getKind() == ParameterKind.NAMED_TYPE), "Parameters for ROW type must be NAMED_TYPE parameters");
+
+ return new TypeSignature(StandardTypes.ROW, fields);
+ }
}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignatureParameter.java b/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignatureParameter.java
index 1580fc2c0e67d8dc2e58f57112b005f242de2ba4..6f5b25c5e187767f68551953d64554cefbe79144 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignatureParameter.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/type/TypeSignatureParameter.java
@@ -44,6 +44,16 @@ public class TypeSignatureParameter
return new TypeSignatureParameter(ParameterKind.NAMED_TYPE, namedTypeSignature);
}
+ public static TypeSignatureParameter of(String name, TypeSignature type)
+ {
+ return new TypeSignatureParameter(ParameterKind.NAMED_TYPE, new NamedTypeSignature(Optional.of(new RowFieldName(name, false)), type));
+ }
+
+ /*public static TypeSignatureParameter anonymousField(TypeSignature type)
+ {
+ return new TypeSignatureParameter(ParameterKind.NAMED_TYPE, new NamedTypeSignature(Optional.empty(), type));
+ }*/
+
public static TypeSignatureParameter of(String variable)
{
return new TypeSignatureParameter(ParameterKind.VARIABLE, variable);