Keeping track of the ES client and bulk processor configs I am using for bulk indexing on ES v2.0.0. I started a standalone Java app that listens to a RabbitMQ queue, parses the JSON messages, creates the index requests for ES, and adds them to the bulk processor. The snippets below show how I create the connection and set up the bulk processor.

Creates the connection

    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class ESConnectionFactory {

        public static Client newConnection() throws UnknownHostException {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "<your-cluster-name>")
                    .build();

        Client connection = TransportClient.builder()
                .settings(settings)
                .build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("<the-host-name>"), 9300)); // 9300 is the default transport port
        return connection;
        }

    }

Creates the bulk processor

    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;

    public class BulkProcessorFactory {

        public static BulkProcessor create(Client connection) {
            return BulkProcessor.builder(connection,
                    new BulkProcessor.Listener() {
                        // No-op callbacks for now; hook in logging/metrics as needed
                        @Override
                        public void beforeBulk(long executionId, BulkRequest request) { }

                        @Override
                        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

                        @Override
                        public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
                    })
            .setBulkActions(X) // Max number of requests buffered before a flush
            .setBulkSize(new ByteSizeValue(Y, ByteSizeUnit.MB)) // Max total size of buffered requests, in MB
            .setConcurrentRequests(Z) // Max number of concurrent bulk requests
            .build();
        }
        }

    }

To add requests to the processor, you just call:

    bulkProcessor.add(request);
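For example, assuming the RabbitMQ consumer hands over the raw JSON payload as a String, the index request can be built and passed to the processor like this (the index and type names here are illustrative, not from the original app):

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;

public class MessageIndexer {

    private final BulkProcessor bulkProcessor;

    public MessageIndexer(BulkProcessor bulkProcessor) {
        this.bulkProcessor = bulkProcessor;
    }

    // jsonPayload: the raw JSON body taken from the RabbitMQ message
    public void index(String jsonPayload) {
        // "events"/"message" are placeholder index and type names
        bulkProcessor.add(new IndexRequest("events", "message").source(jsonPayload));
    }
}
```

The add is non-blocking: the request just sits in the buffer until one of the flush thresholds below is hit.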

When the bulk processor reaches the configured number of actions (number of buffered requests), it fires the bulk request to Elasticsearch. Likewise, if the bulk size limit is reached before the number of actions, the bulk request is also sent. You can also trigger a manual flush with: bulkProcessor.flush(). The recommended number of concurrent requests is 4 * num_available_cores.
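Rather than hard-coding that value, it can be derived at startup; a minimal sketch of the rule of thumb:

```java
public class RecommendedSettings {

    // 4 * number of available cores, per the rule of thumb above
    public static int recommendedConcurrency() {
        return 4 * Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        System.out.println("setConcurrentRequests -> " + recommendedConcurrency());
    }
}
```

The result can then be passed straight to setConcurrentRequests instead of a fixed Z.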





Felipe Forbeck
Senior Developer, Java, Scala, Akka, NoSQL dbs, Brazilian Jiu-Jitsu.