          FAQ About Spark Accessing ES

          Introduction

          This document explains the common configuration items used when accessing ES from Spark through elasticsearch-hadoop. "es-spark" in this document refers to the Spark-related package inside elasticsearch-hadoop. Users read from and write to the ES cluster through their own Spark cluster. elasticsearch-hadoop is largely compatible with all ES versions.

          Version number detection exception

          The "es-spark" detects the version number of the ES cluster automatically during running. The version number obtained is mainly used for API compatibility processing of the version of different clusters.

          Generally, you do not need to care about the ES version number. However, automatic detection of the cluster version sometimes fails on Baidu AI Cloud with hard-to-diagnose errors. You can work around this with the following configuration:

          Configuration items:

          es.internal.es.version: "6.5.3"

          In some newer versions of the es-spark package, you also need to configure:

          es.internal.es.cluster.name: "Your Cluster Name"
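
          For reference, these two settings can be passed through SparkConf like any other es-spark option. The sketch below is illustrative only; the app name, BLB address, and port are placeholders to replace with your own values:

          SparkConf sparkConf = new SparkConf()
                  .setAppName("EsVersionOverride")                      // placeholder app name
                  .set("es.nodes", "your-blb-address")                  // placeholder: BLB address of your cluster
                  .set("es.port", "9200")                               // placeholder: HTTP port
                  // Skip automatic discovery of the cluster version and use these values directly.
                  .set("es.internal.es.version", "6.5.3")
                  .set("es.internal.es.cluster.name", "Your Cluster Name");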

          Implementation principle:

          With this configuration in place, es-spark no longer requests the / endpoint to parse the version; it uses the configured values directly:

          INTERNAL_ES_VERSION = "es.internal.es.version"
          INTERNAL_ES_CLUSTER_NAME = "es.internal.es.cluster.name"
          
          public static EsMajorVersion discoverEsVersion(Settings settings, Log log) {
                return discoverClusterInfo(settings, log).getMajorVersion();
          }
          
          // It may vary in the elasticsearch-hadoop of different versions.
          public static ClusterInfo discoverClusterInfo(Settings settings, Log log) {
                  ClusterName remoteClusterName = null;
                  EsMajorVersion remoteVersion = null;
                  // Try to acquire the cluster name from the configuration
                  String clusterName = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_NAME);
                  // Try to acquire the cluster UUID from the configuration
                  String clusterUUID = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_UUID);
                  // Try to acquire the ES version from the configuration
                  String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
                  // Only if the cluster name and version cannot both be obtained from the configuration does es-spark fall back to a network request (to the root directory).
                  if (StringUtils.hasText(clusterName) && StringUtils.hasText(version)) {
                      if (log.isDebugEnabled()) {
                          log.debug(String.format("Elasticsearch cluster [NAME:%s][UUID:%s][VERSION:%s] already present in configuration; skipping discovery",
                                  clusterName, clusterUUID, version));
                      }
                      remoteClusterName = new ClusterName(clusterName, clusterUUID);
                      remoteVersion = EsMajorVersion.parse(version);
                      return new ClusterInfo(remoteClusterName, remoteVersion);
                  }
                ....
          }

          If automatic detection of the cluster name and version number is left enabled, you must ensure that the user es-spark connects with has the privilege to issue a GET request against the root directory /.

          GET /

          Discover the data nodes

          Configuration items:

          es.nodes.wan.only: false   (false by default)
          es.nodes.discovery: true   (true by default)

          The Baidu AI Cloud ES cluster sits behind a BLB (load balancer). Use the BLB address when configuring es.nodes, and make sure es-spark can reach that address.

          • es.nodes.wan.only: false, es.nodes.discovery: true: Spark contacts the host(s) specified in es.nodes to obtain the IP and port of every HTTP-enabled node in the ES cluster, and then reads and writes shard data by accessing those nodes directly. Make sure the Spark cluster can reach every node of the ES cluster.
          • es.nodes.wan.only: true, es.nodes.discovery: false or not set: every request sent by Spark is forwarded to ES through the configured node(s) only, so efficiency is lower (a configuration sketch follows below).
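
          For illustration, a minimal sketch of the BLB-only access pattern described above (the address and app name are placeholders); with node discovery you would instead leave es.nodes.discovery at true and ensure network access to every data node:

          // Route every request through the BLB instead of discovering data nodes.
          SparkConf sparkConf = new SparkConf()
                  .setAppName("EsWanOnlyAccess")                  // placeholder app name
                  .set("es.nodes", "your-blb-address:9200")       // placeholder BLB endpoint
                  .set("es.nodes.wan.only", "true")               // treat es.nodes as the only entry point
                  .set("es.nodes.discovery", "false");            // skip the /_nodes/http lookup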

          The specific code logic is as follows:

          ES_NODES_DISCOVERY = "es.nodes.discovery"
          ES_NODES_WAN_ONLY = "es.nodes.wan.only"
          ES_NODES_WAN_ONLY_DEFAULT = "false"
          
          InitializationUtils#discoverNodesIfNeeded
              public static List<NodeInfo> discoverNodesIfNeeded(Settings settings, Log log) {
                  if (settings.getNodesDiscovery()) { // The configuration items that need to be read. 
                      RestClient bootstrap = new RestClient(settings);
          
                      try {
                          List<NodeInfo> discoveredNodes = bootstrap.getHttpNodes(false);
                          if (log.isDebugEnabled()) {
                              log.debug(String.format ("Nodes discovery enabled - found %s", discoveredNodes));
                          }
          
                          SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
                          return discoveredNodes;
                      } finally {
                          bootstrap.close();
                      }
                  }
          
                  return null;
              }
           
          public boolean getNodesDiscovery() {
                  // by default, if not set, return a value compatible with the WAN setting
                  // otherwise return the user value.
                  // this helps validate the configuration
                  return Booleans.parseBoolean(getProperty(ES_NODES_DISCOVERY), !getNodesWANOnly()); // The default value is !getNodesWANOnly().
              }
              
          public boolean getNodesWANOnly() {
                  return Booleans.parseBoolean(getProperty(ES_NODES_WAN_ONLY, ES_NODES_WAN_ONLY_DEFAULT));
              }

          User privilege problem

          To enable the node discovery feature, make sure the user es-spark connects with has the privileges to call:

          GET /_nodes/http
          GET /{index}/_search_shards

          Otherwise, the whole job may fail.

          How to handle bulk import errors through configuration

          By default, when an error occurs while Spark writes to ES, the job is aborted after several retries, causing the whole task to fail. If you would rather log the documents that failed to import, use the following configuration:

          Configure the error handling mechanism:

          es.write.rest.error.handlers = log 
          es.write.rest.error.handler.log.logger.name: es_error_handler

          When a "bulk" write error occurs after setting, the "Job" should bot be interrupted and output to the log in the form of a log with the prefix es_error_handler.

          How to get document metadata other than "_source"

          Normally, each hit returned by the ES _search API looks like this:

                   {
                      "_index": "d_index",
                      "_type": "doc",
                      "_id": "51rrB2sBaX4YjyPY-2EG",
                      "_score": 1,
                      "_source": {
                         "A": "field A value",
                         "B": "field B value"
                      }
                   }

          By default, when Spark reads from ES, only the _source field is returned; metadata fields such as _id and _version are not. If your business needs _id, use the following configuration:

          es.read.metadata: true              // false by default; also return document metadata such as _id
          es.read.metadata.field: "_metadata" // "_metadata" by default; the field under which the metadata map is placed
          es.read.metadata.version: true      // false by default; also return the document version in the metadata

          The document's metadata is returned under the _metadata field.
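
          As an example, a hedged sketch of reading documents together with their metadata through the Java API; the index name d_index/doc reuses the sample above, and the connection values are placeholders:

          import java.util.Map;
          import org.apache.spark.SparkConf;
          import org.apache.spark.api.java.JavaPairRDD;
          import org.apache.spark.api.java.JavaSparkContext;
          import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

          SparkConf sparkConf = new SparkConf()
                  .setAppName("EsReadMetadata")                   // placeholder app name
                  .set("es.nodes", "your-blb-address:9200")       // placeholder
                  .set("es.read.metadata", "true");               // also return _id, _type, etc.
          JavaSparkContext jsc = new JavaSparkContext(sparkConf);

          // Each pair is (document _id, source fields plus a "_metadata" map).
          JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(jsc, "d_index/doc");
          for (scala.Tuple2<String, Map<String, Object>> hit : rdd.take(1)) {
              System.out.println(hit._2().get("_metadata"));      // e.g. {_index=d_index, _type=doc, _id=...}
          }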

          How to specify the "id" and "version" during import

          During data migration, for example from a lower-version ES cluster to a higher-version one, es-spark can read and write at the same time. To carry over information such as _id, _routing and _version, you can set:

          es.mapping.id: "_meta._id"            Specifies the path of the document id in the JSON.
          es.mapping.routing: "_meta._routing"  Specifies the path of the routing value in the JSON.
          es.mapping.version: "_meta._version"  Specifies the path of the version in the JSON.

          _meta.xxx is the path of the corresponding field in the JSON document.
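
          A minimal migration-style sketch, assuming each JSON document carries its original metadata under a _meta object as in the paths above; the index name, target address, and sample document are placeholders:

          import java.util.Arrays;
          import org.apache.spark.SparkConf;
          import org.apache.spark.api.java.JavaRDD;
          import org.apache.spark.api.java.JavaSparkContext;
          import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

          SparkConf sparkConf = new SparkConf()
                  .setAppName("EsMigrateWithIds")                 // placeholder app name
                  .set("es.nodes", "your-target-blb:9200")        // placeholder: target cluster
                  .set("es.mapping.id", "_meta._id");             // take the document id from this JSON path
          JavaSparkContext jsc = new JavaSparkContext(sparkConf);

          // One sample document whose id is preserved from the source cluster.
          String doc = "{\"_meta\":{\"_id\":\"51rrB2sBaX4YjyPY-2EG\"},\"A\":\"field A value\"}";
          JavaRDD<String> docs = jsc.parallelize(Arrays.asList(doc));
          JavaEsSpark.saveJsonToEs(docs, "d_index/doc");          // placeholder index/type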

          By default, the "refresh" is called at the end of each "bulk" when the spark writes the ES

          We recommend setting it to false and letting refresh be driven by the index's refresh_interval inside ES. Otherwise, frequent refreshes may cause many threads in the ES cluster to generate high CPU and disk pressure.

          es.batch.write.refresh: false   (true by default)

          Control the amount of data in each bulk write

          es.batch.size.bytes: 1mb     The maximum amount of data per bulk request; 1 MB by default.
          es.batch.size.entries: 1000  The maximum number of documents per bulk request; 1,000 by default.

          Set these values according to the specification of your ES cluster.
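
          For instance, a write-tuning sketch combining the batch settings above with the refresh setting from the previous section (values are illustrative, not a recommendation for every cluster; the address and app name are placeholders):

          SparkConf sparkConf = new SparkConf()
                  .setAppName("EsBulkTuning")                     // placeholder app name
                  .set("es.nodes", "your-blb-address:9200")       // placeholder
                  .set("es.batch.size.bytes", "1mb")              // data volume per bulk request
                  .set("es.batch.size.entries", "1000")           // documents per bulk request
                  .set("es.batch.write.refresh", "false");        // rely on the index refresh_interval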

          For reads, the following scroll-related settings matter:

          es.scroll.size: 50 by default. This value is relatively small and can be increased to 1,000-10,000.
          es.input.use.sliced.partitions: true by default. To improve concurrency, ES splits each read into scroll slices.
          es.input.max.docs.per.partition: 100,000 by default. Slicing is computed from this value.

          When the "es-spark" reads the ES cluster data, it performs slicing for scroll-slice handling according to the total number of shards.

          int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition); 

          The "numDocs" means the total number of files for a single shard. If there are 50 million files, have it divided into 500 slices, which causes a considerable CPU pressure on the online ES clusters at the backend. Thus, recommend you to disable the scroll-slice to avoid affecting the online business.

          Recommended parameters:

          es.scroll.size: 2000   // Choose this value according to your document size.
          es.input.use.sliced.partitions: false
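
          Correspondingly, a read-tuning sketch with the recommended scroll settings (the address and app name are placeholders):

          SparkConf sparkConf = new SparkConf()
                  .setAppName("EsScrollTuning")                        // placeholder app name
                  .set("es.nodes", "your-blb-address:9200")            // placeholder
                  .set("es.scroll.size", "2000")                       // documents fetched per scroll request
                  .set("es.input.use.sliced.partitions", "false");     // one partition per shard, no scroll-slice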

          Control the document fields to be written

          Sometimes, when importing business data, you may not want to write certain fields into ES. You can configure this as follows:

          es.mapping.exclude: none by default.
          // Multiple fields are separated by commas; dot paths and "*" wildcards are supported.
          es.mapping.exclude = *.description
          es.mapping.include = u*, foo.*
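
          A brief sketch applying the include/exclude patterns above when writing (the patterns are the ones from the example; the address and app name are placeholders):

          SparkConf sparkConf = new SparkConf()
                  .setAppName("EsFieldFiltering")                 // placeholder app name
                  .set("es.nodes", "your-blb-address:9200")       // placeholder
                  .set("es.mapping.exclude", "*.description")     // drop any description sub-field on write
                  .set("es.mapping.include", "u*, foo.*");        // keep only fields matching these patterns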

          Execute the "upsert" operation

          Example:

          String update_params = "update_time:update_time"; // param name : document field whose value is passed to the script
          String update_script = "ctx._source.update_time = params.update_time";
          // Set sparkConfig
          SparkConf sparkConf = new SparkConf()
                  .setAppName("YourAppName")
                  .set("es.net.http.auth.user", user)
                  .set("es.net.http.auth.pass", pwd)
                  .set("es.nodes", nodes)
                  .set("es.port", port)
                  .set("es.batch.size.entries", "50")
                  .set("es.http.timeout","5m")
                  .set("es.read.metadata.field", "_id")
                  .set("es.write.operation","upsert")
                  .set("es.update.script.params", update_params)
                  .set("es.update.script.inline", update_script)
                  .set("es.nodes.wan.only", "true");

          Notice:

          es.update.script.params is the parameter list required for the update.
          es.update.script.inline is the script used for the update.
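
          As a follow-up, a hedged sketch of how the SparkConf above might be used to run the upsert; the document content and index name are placeholders, and es.mapping.id is assumed to have been added to point at the id field of your data:

          import java.util.Arrays;
          import org.apache.spark.api.java.JavaRDD;
          import org.apache.spark.api.java.JavaSparkContext;
          import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

          JavaSparkContext jsc = new JavaSparkContext(sparkConf);

          // Each document carries the id used for the upsert (assumed field name "id")
          // and the update_time field referenced by the script parameters.
          String doc = "{\"id\":\"51rrB2sBaX4YjyPY-2EG\",\"update_time\":\"2023-01-01 00:00:00\"}";
          JavaRDD<String> docs = jsc.parallelize(Arrays.asList(doc));

          // Requires something like .set("es.mapping.id", "id") in the SparkConf above (assumption).
          JavaEsSpark.saveJsonToEs(docs, "d_index/doc");          // placeholder index/type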