FAQ About Spark Accessing ES
Introduction
This document introduces the meaning of the common configuration items used when accessing ES through Spark with elasticsearch-hadoop. The es-spark in this document refers to the Spark-related package inside elasticsearch-hadoop. Users read from and write to the ES cluster through their own Spark cluster. elasticsearch-hadoop is largely compatible with all versions of ES.
Version number detection exception
es-spark automatically detects the version number of the ES cluster while running. The detected version is mainly used for API compatibility handling across clusters of different versions.
Generally, you do not need to care about the ES version number. However, automatic detection of the cluster version sometimes fails with hard-to-diagnose errors on Baidu AI Cloud. You can work around these errors with the following configuration:
Configuration items:
es.internal.es.version: "6.5.3"
In some newer versions of the es-spark package, you also need to configure:
es.internal.es.cluster.name: "Your Cluster Name"
Implementation principle:
After this configuration is in place, es-spark no longer requests the root endpoint / to parse the version; it directly uses the user-configured version:
INTERNAL_ES_VERSION = "es.internal.es.version"
INTERNAL_ES_CLUSTER_NAME = "es.internal.es.cluster.name"
public static EsMajorVersion discoverEsVersion(Settings settings, Log log) {
return discoverClusterInfo(settings, log).getMajorVersion();
}
// This may vary across versions of elasticsearch-hadoop.
public static ClusterInfo discoverClusterInfo(Settings settings, Log log) {
ClusterName remoteClusterName = null;
EsMajorVersion remoteVersion = null;
// Try to acquire the cluster name from the configuration
String clusterName = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_NAME);
// Try to acquire the cluster UUID from the configuration
String clusterUUID = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_UUID);
// Try to acquire the ES version from the configuration
String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
// If both the cluster name and version are configured, discovery is skipped; otherwise a network request (to the root endpoint) is made in the elided code below.
if (StringUtils.hasText(clusterName) && StringUtils.hasText(version)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Elasticsearch cluster [NAME:%s][UUID:%s][VERSION:%s] already present in configuration; skipping discovery",
clusterName, clusterUUID, version));
}
remoteClusterName = new ClusterName(clusterName, clusterUUID);
remoteVersion = EsMajorVersion.parse(version);
return new ClusterInfo(remoteClusterName, remoteVersion);
}
....
}
With automatic detection of the cluster name and version number enabled, you need to guarantee that the user assigned to es-spark has GET privileges on the root endpoint /:
GET /
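Putting these items together, a minimal configuration sketch for pinning the version looks like this (the version, cluster name, and address are placeholders for your own cluster's values):

```
es.nodes: "your-blb-address"
es.port: "9200"
es.internal.es.version: "6.5.3"
es.internal.es.cluster.name: "your-cluster-name"
```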
Data node discovery
Configuration items:
es.nodes.wan.only: false // false by default
es.nodes.discovery: true // true by default
There is a BLB (load balancer) in front of a Baidu AI Cloud ES cluster. Write the BLB address into es.nodes, and make sure that es-spark can access the BLB address.
es.nodes.wan.only: false, es.nodes.discovery: true: Spark accesses the host (or hosts) specified in es.nodes to obtain the IP and port of every node in the ES cluster that has the HTTP service enabled. Subsequent data access then goes directly to the nodes holding the shard data. Make sure the Spark cluster can access all nodes of the ES cluster.
es.nodes.wan.only: true, es.nodes.discovery: false or not set: All requests sent by Spark are forwarded to ES through the configured node, so the efficiency is lower.
The specific code logic is as follows:
ES_NODES_DISCOVERY = "es.nodes.discovery"
ES_NODES_WAN_ONLY = "es.nodes.wan.only"
ES_NODES_WAN_ONLY_DEFAULT = "false"
InitializationUtils#discoverNodesIfNeeded
public static List<NodeInfo> discoverNodesIfNeeded(Settings settings, Log log) {
if (settings.getNodesDiscovery()) { // Reads the es.nodes.discovery setting.
RestClient bootstrap = new RestClient(settings);
try {
List<NodeInfo> discoveredNodes = bootstrap.getHttpNodes(false);
if (log.isDebugEnabled()) {
log.debug(String.format("Nodes discovery enabled - found %s", discoveredNodes));
}
SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
return discoveredNodes;
} finally {
bootstrap.close();
}
}
return null;
}
public boolean getNodesDiscovery() {
// by default, if not set, return a value compatible with the WAN setting
// otherwise return the user value.
// this helps validate the configuration
return Booleans.parseBoolean(getProperty(ES_NODES_DISCOVERY), !getNodesWANOnly()); // The default value is !getNodesWANOnly().
}
public boolean getNodesWANOnly() {
return Booleans.parseBoolean(getProperty(ES_NODES_WAN_ONLY, ES_NODES_WAN_ONLY_DEFAULT));
}
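The net effect of the two settings: when es.nodes.discovery is not set explicitly, its effective value is the negation of es.nodes.wan.only. A minimal, self-contained sketch of that resolution logic (the helper is hypothetical, not part of elasticsearch-hadoop):

```java
public class DiscoveryDefault {
    // Hypothetical helper mirroring getNodesDiscovery() above:
    // an unset es.nodes.discovery defaults to !es.nodes.wan.only.
    static boolean effectiveDiscovery(String discoveryProp, boolean wanOnly) {
        if (discoveryProp == null) {
            return !wanOnly; // default: discovery is on unless WAN-only mode is set
        }
        return Boolean.parseBoolean(discoveryProp);
    }

    public static void main(String[] args) {
        System.out.println(effectiveDiscovery(null, true));   // false
        System.out.println(effectiveDiscovery(null, false));  // true
        System.out.println(effectiveDiscovery("true", true)); // true
    }
}
```

So a WAN-only cluster behind a load balancer skips discovery unless you turn it on explicitly, which is usually undesirable anyway since the nodes are not directly reachable.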
User privilege problem
To enable the node discovery feature, you need to guarantee that the user running es-spark has privileges to call:
GET /_nodes/http
GET /{index}/_search_shards
Otherwise, the whole Job may fail.
How to handle bulk import errors via configuration
When Spark writes to ES and an error occurs, the Job is interrupted after several retries. By default, the current Job is aborted directly, which fails the whole task. If you want the documents that failed to import to be printed to the log instead, use the following configuration:
Configure the error handling mechanism:
es.write.rest.error.handlers: log
es.write.rest.error.handler.log.logger.name: es_error_handler
With this setting, a bulk write error no longer interrupts the Job; the failed documents are output to the log through the logger named es_error_handler.
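For the failed documents to land somewhere useful, the logger named above must be routed in your logging configuration. A sketch assuming a log4j 1.x setup (the appender name and file path are placeholders):

```
log4j.logger.es_error_handler=INFO, esErrors
log4j.appender.esErrors=org.apache.log4j.FileAppender
log4j.appender.esErrors.File=/var/log/spark/es-bulk-errors.log
log4j.appender.esErrors.layout=org.apache.log4j.PatternLayout
log4j.appender.esErrors.layout.ConversionPattern=%d{ISO8601} %m%n
```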
How to get document metadata other than _source
Normally, each document returned by calling the ES _search API looks like this:
{
"_index": "d_index",
"_type": "doc",
"_id": "51rrB2sBaX4YjyPY-2EG",
"_score": 1,
"_source": {
"A": "field A value",
"B": "field B value"
}
}
When Spark reads from ES, fields other than _source, such as _id and _version, are not read by default. If your business needs _id, you can obtain it through the following configuration:
es.read.metadata: true // false by default
es.read.metadata.field: "_id" // configure the metadata field you need to read
es.read.metadata.version: false // false by default; whether to read the document's version number
The document's metadata is returned under a _metadata field.
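With es.read.metadata enabled, each document that es-spark returns is a map of its source fields plus an extra _metadata map. A minimal sketch of pulling _id out of such a map (the sample data and helper are illustrative, not part of the es-spark API):

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataExtract {
    // With es.read.metadata: true, each returned document is a Map of its
    // fields plus an extra "_metadata" map holding _id, _index, etc.
    static String extractId(Map<String, Object> doc) {
        @SuppressWarnings("unchecked")
        Map<String, Object> meta = (Map<String, Object>) doc.get("_metadata");
        return meta == null ? null : (String) meta.get("_id");
    }

    public static void main(String[] args) {
        Map<String, Object> meta = new HashMap<>();
        meta.put("_id", "51rrB2sBaX4YjyPY-2EG");
        Map<String, Object> doc = new HashMap<>();
        doc.put("A", "field A value");
        doc.put("_metadata", meta);
        System.out.println(extractId(doc)); // 51rrB2sBaX4YjyPY-2EG
    }
}
```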
How to specify the id and version during import
During data migration, for example from a lower-version ES cluster to a higher-version ES cluster, we can use es-spark to read and write at the same time. To preserve information such as _id, _routing, and _version, you can set:
es.mapping.id: "_meta._id" // the path of the id in the JSON document
es.mapping.routing: "_meta._routing" // the path of the routing in the JSON document
es.mapping.version: "_meta._version" // the path of the version in the JSON document
Here, _meta.xxx is the path of the corresponding field in the JSON document.
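For example, with the settings above, a source document shaped as follows would have its id, routing, and version taken from its _meta object (the field values are illustrative):

```json
{
  "_meta": { "_id": "doc-1", "_routing": "user-42", "_version": 3 },
  "A": "field A value",
  "B": "field B value"
}
```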
By default, refresh is called at the end of each bulk when Spark writes to ES
We recommend setting it to false and letting refresh be controlled by the refresh_interval of the index inside ES. Otherwise, many refresh threads in the ES cluster may cause high CPU and disk pressure.
es.batch.write.refresh: false // true by default
Controlling the amount of data in each bulk write
es.batch.size.bytes: 1mb // the size of each bulk write; 1 MB by default
es.batch.size.entries: 1000 // the number of documents in each bulk write; 1,000 by default
You can set these reasonably according to your ES cluster's specification. Note that both limits apply per task instance, so the total write load on the cluster scales with the number of concurrently writing Spark tasks.
Read-related settings
es.scroll.size: 50 by default. This value is relatively small and can be increased to 1,000-10,000.
es.input.use.sliced.partitions: true by default. To improve concurrency, ES performs scroll-slice slicing.
es.input.max.docs.per.partition: 100,000 by default; slicing is performed based on this value.
When es-spark reads ES cluster data, it slices each shard for scroll-slice handling according to its document count:
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
numDocs is the total number of documents in a single shard. With 50 million documents, a shard is divided into 500 slices, which puts considerable CPU pressure on the backend online ES cluster. We therefore recommend disabling scroll-slice to avoid affecting online business.
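A quick, self-contained sketch of that partition arithmetic (the numbers are illustrative):

```java
public class SliceMath {
    // Mirrors numPartitions = max(1, numDocs / maxDocsPerPartition) above.
    static int numPartitions(long numDocs, long maxDocsPerPartition) {
        return (int) Math.max(1, numDocs / maxDocsPerPartition);
    }

    public static void main(String[] args) {
        // A shard of 50 million docs with the default 100,000 docs per
        // partition is split into 500 scroll slices.
        System.out.println(numPartitions(50_000_000L, 100_000L)); // 500
        // A small shard always yields at least one partition.
        System.out.println(numPartitions(10L, 100_000L)); // 1
    }
}
```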
Recommended parameters:
es.scroll.size: 2000 // choose this value according to the document size
es.input.use.sliced.partitions: false
Controlling which document fields are written
Sometimes, when importing data for a business, you may not want certain fields written to ES. You can set this as follows:
es.mapping.exclude: none by default
// Multiple fields can be separated with commas; "." paths and "*" wildcards are supported.
es.mapping.exclude = *.description
es.mapping.include = u*, foo.*
Execute the "upsert" operation
Example:
String update_params = "update_time:update_time"; // "<script param>:<document field>"
String update_script = "ctx._source.update_time = params.update_time";
// Set sparkConfig
SparkConf sparkConf = new SparkConf()
.setAppName("YourAppName")
.set("es.net.http.auth.user", user)
.set("es.net.http.auth.pass", pwd)
.set("es.nodes", nodes)
.set("es.port", port)
.set("es.batch.size.entries", "50")
.set("es.http.timeout","5m")
.set("es.read.metadata.field", "_id")
.set("es.write.operation","upsert")
.set("es.update.script.params", update_params)
.set("es.update.script.inline", update_script)
.set("es.nodes.wan.only", "true");
Notice:
es.update.script.params is the parameter list required for the update.
es.update.script.inline is the script used for the update.