Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Elasticsearch client to 7.x #20248

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugin/trino-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.elasticsearch.version>6.8.23</dep.elasticsearch.version>
<dep.elasticsearch.version>7.17.16</dep.elasticsearch.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -114,7 +114,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.2</version>
<version>4.1.4</version>
<exclusions>
<!-- Brings in duplicate classes already in org.slf4j:jcl-over-slf4j (from io.airlift:log-manager) -->
<exclusion>
Expand Down Expand Up @@ -158,6 +158,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<!-- trino-main brings in 8.4.1 (vs 7.7.3) -->
<exclusion>
<groupId>org.apache.lucene</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
import org.elasticsearch.rest.RestStatus;

import java.io.Closeable;
Expand All @@ -58,7 +59,9 @@ public class BackpressureRestHighLevelClient
public BackpressureRestHighLevelClient(RestClientBuilder restClientBuilder, ElasticsearchConfig config, TimeStat backpressureStats)
{
this.backpressureStats = requireNonNull(backpressureStats, "backpressureStats is null");
delegate = new RestHighLevelClient(requireNonNull(restClientBuilder, "restClientBuilder is null"));
delegate = new RestHighLevelClientBuilder(requireNonNull(restClientBuilder, "restClientBuilder is null").build())
.setApiCompatibilityMode(true)
.build();
backpressureRestClient = new BackpressureRestClient(delegate.getLowLevelClient(), config, backpressureStats);
retryPolicy = RetryPolicy.<ActionResponse>builder()
.withMaxAttempts(-1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.weakref.jmx.Managed;
Expand Down Expand Up @@ -201,8 +201,7 @@ private static BackpressureRestHighLevelClient createClient(
RestClientBuilder builder = RestClient.builder(
config.getHosts().stream()
.map(httpHost -> new HttpHost(httpHost, config.getPort(), config.isTlsEnabled() ? "https" : "http"))
.toArray(HttpHost[]::new))
.setMaxRetryTimeoutMillis(toIntExact(config.getMaxRetryTime().toMillis()));
.toArray(HttpHost[]::new));

builder.setHttpClientConfigCallback(ignored -> {
RequestConfig requestConfig = RequestConfig.custom()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.elasticsearch;

import com.amazonaws.util.Base64;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -27,8 +28,8 @@
import io.trino.testing.TestingConnectorBehavior;
import io.trino.tpch.TpchTable;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.intellij.lang.annotations.Language;
Expand All @@ -38,11 +39,15 @@
import org.junit.jupiter.api.TestInstance;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

import static io.trino.plugin.elasticsearch.ElasticsearchQueryRunner.PASSWORD;
import static io.trino.plugin.elasticsearch.ElasticsearchQueryRunner.USER;
import static io.trino.plugin.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
import static io.trino.plugin.elasticsearch.ElasticsearchQueryRunner.getSSLContext;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.MaterializedResult.resultBuilder;
Expand Down Expand Up @@ -72,10 +77,13 @@ public abstract class BaseElasticsearchConnectorTest
protected QueryRunner createQueryRunner()
throws Exception
{
elasticsearch = new ElasticsearchServer(image, ImmutableMap.of());
elasticsearch = new ElasticsearchServer(image);

HostAndPort address = elasticsearch.getAddress();
client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort())));
client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort(), "https")).setHttpClientConfigCallback(
callback -> callback
.setSSLContext(getSSLContext())
.setDefaultHeaders(ImmutableList.of(new BasicHeader("Authorization", format("Basic %s", Base64.encodeAsString(format("%s:%s", USER, PASSWORD).getBytes(StandardCharsets.UTF_8))))))));

return createElasticsearchQueryRunner(
elasticsearch.getAddress(),
Expand Down Expand Up @@ -123,7 +131,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)

/**
* This method overrides the default values used for the data provider
* of the test {@link AbstractTestQueries#testLargeIn(int)} by taking
* of the test {@link AbstractTestQueries#testLargeIn()} by taking
* into account that by default Elasticsearch supports only up to `1024`
* clauses in query.
* <p>
Expand Down Expand Up @@ -1910,53 +1918,63 @@ protected void assertTableDoesNotExist(String name)
assertQueryFails("SELECT * FROM " + name, ".*Table '" + catalogName + ".tpch." + name + "' does not exist");
}

protected abstract String indexEndpoint(String index, String docId);
protected String indexEndpoint(String index, String docId)
{
return format("/%s/_doc/%s", index, docId);
}

private void index(String index, Map<String, Object> document)
throws IOException
{
String json = new ObjectMapper().writeValueAsString(document);
String endpoint = format("%s?refresh", indexEndpoint(index, String.valueOf(System.nanoTime())));
client.getLowLevelClient()
.performRequest("PUT", endpoint, ImmutableMap.of(), new NStringEntity(json, ContentType.APPLICATION_JSON));

Request request = new Request("PUT", endpoint);
request.setJsonEntity(json);

client.getLowLevelClient().performRequest(request);
}

private void addAlias(String index, String alias)
throws IOException
{
client.getLowLevelClient()
.performRequest("PUT", format("/%s/_alias/%s", index, alias));
.performRequest(new Request("PUT", format("/%s/_alias/%s", index, alias)));

refreshIndex(alias);
}

protected abstract String indexMapping(@Language("JSON") String properties);
protected String indexMapping(@Language("JSON") String properties)
{
return "{\"mappings\": " + properties + "}";
}

private void createIndex(String indexName)
throws IOException
{
client.getLowLevelClient().performRequest("PUT", "/" + indexName);
client.getLowLevelClient().performRequest(new Request("PUT", "/" + indexName));
}

private void createIndex(String indexName, @Language("JSON") String properties)
throws IOException
{
String mappings = indexMapping(properties);
client.getLowLevelClient()
.performRequest("PUT", "/" + indexName, ImmutableMap.of(), new NStringEntity(mappings, ContentType.APPLICATION_JSON));

Request request = new Request("PUT", "/" + indexName);
request.setJsonEntity(mappings);

client.getLowLevelClient().performRequest(request);
}

private void refreshIndex(String index)
throws IOException
{
client.getLowLevelClient()
.performRequest("GET", format("/%s/_refresh", index));
client.getLowLevelClient().performRequest(new Request("GET", format("/%s/_refresh", index)));
}

private void deleteIndex(String indexName)
throws IOException
{
client.getLowLevelClient()
.performRequest("DELETE", "/" + indexName);
client.getLowLevelClient().performRequest(new Request("DELETE", "/" + indexName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -42,7 +43,7 @@
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

public class ElasticsearchLoader
extends AbstractTestingTrinoClient<Void>
Expand Down Expand Up @@ -109,7 +110,7 @@ public void addResults(QueryStatusInfo statusInfo, QueryData data)

request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
client.bulk(request);
client.bulk(request, RequestOptions.DEFAULT);
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.elasticsearch;

import com.amazonaws.util.Base64;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import io.airlift.log.Level;
Expand All @@ -25,14 +26,25 @@
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingTrinoClient;
import io.trino.tpch.TpchTable;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;

import javax.net.ssl.SSLContext;

import java.io.File;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

import static com.google.common.io.Resources.getResource;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.plugin.base.ssl.SslUtils.createSSLContext;
import static io.trino.plugin.elasticsearch.ElasticsearchServer.ELASTICSEARCH_7_IMAGE;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.testing.TestingSession.testSessionBuilder;
Expand All @@ -42,6 +54,9 @@

public final class ElasticsearchQueryRunner
{
public static final String USER = "elastic_user";
public static final String PASSWORD = "123456";

static {
Logging logging = Logging.initialize();
logging.setLevel("org.elasticsearch.client.RestClient", Level.OFF);
Expand Down Expand Up @@ -97,7 +112,14 @@ public static DistributedQueryRunner createElasticsearchQueryRunner(

LOG.info("Loading data...");

client = new RestHighLevelClient(RestClient.builder(HttpHost.create(address.toString())));
client = new RestHighLevelClientBuilder(RestClient.builder(HttpHost.create("https://" + address))
.setDefaultHeaders(new Header[]{new BasicHeader("Authorization", format("Basic %s", Base64.encodeAsString(format("%s:%s", USER, PASSWORD).getBytes(StandardCharsets.UTF_8))))})
.setHttpClientConfigCallback(callback -> callback
.setSSLContext(getSSLContext()))
.build())
.setApiCompatibilityMode(true)
.build();

long startTime = System.nanoTime();
for (TpchTable<?> table : tables) {
loadTpchTopic(client, trinoClient, table);
Expand All @@ -112,12 +134,27 @@ public static DistributedQueryRunner createElasticsearchQueryRunner(
}
}

public static SSLContext getSSLContext()
{
try {
return createSSLContext(
Optional.empty(),
Optional.empty(),
Optional.of(new File(getResource("truststore.jks").toURI())),
Optional.of("123456"));
wendigo marked this conversation as resolved.
Show resolved Hide resolved
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void installElasticsearchPlugin(
HostAndPort address,
QueryRunner queryRunner,
String catalogName,
ElasticsearchConnectorFactory factory,
Map<String, String> extraConnectorProperties)
throws URISyntaxException
{
queryRunner.installPlugin(new ElasticsearchPlugin(factory));
Map<String, String> config = ImmutableMap.<String, String>builder()
Expand All @@ -130,6 +167,13 @@ private static void installElasticsearchPlugin(
.put("elasticsearch.scroll-size", "1000")
.put("elasticsearch.scroll-timeout", "1m")
.put("elasticsearch.request-timeout", "2m")
.put("elasticsearch.tls.enabled", "true")
.put("elasticsearch.tls.truststore-path", new File(getResource("truststore.jks").toURI()).getPath())
.put("elasticsearch.tls.truststore-password", "123456")
wendigo marked this conversation as resolved.
Show resolved Hide resolved
.put("elasticsearch.tls.verify-hostnames", "false")
.put("elasticsearch.security", "PASSWORD")
.put("elasticsearch.auth.user", USER)
.put("elasticsearch.auth.password", PASSWORD)
.putAll(extraConnectorProperties)
.buildOrThrow();

Expand All @@ -149,7 +193,7 @@ public static void main(String[] args)
throws Exception
{
DistributedQueryRunner queryRunner = createElasticsearchQueryRunner(
new ElasticsearchServer(ELASTICSEARCH_7_IMAGE, ImmutableMap.of()).getAddress(),
new ElasticsearchServer(ELASTICSEARCH_7_IMAGE).getAddress(),
TpchTable.getTables(),
ImmutableMap.of("http-server.http.port", "8080"),
ImmutableMap.of(),
Expand Down
Loading
Loading