          Step

          Overview

          A step is a resource that belongs to a cluster, so every step operation requires the ID of that cluster.

          Add Steps

          BMR supports several step types, each with its own configuration. The following code adds Custom Jar, Streaming, Hive, Pig, and Spark steps to the specified Hadoop-type cluster.

          When adding steps, you can make the creation request idempotent by setting the clientToken property of the AddStepsRequest object. The clientToken is an ASCII string of at most 64 characters and is set with addStepsRequest.withClientToken(clientToken).

          The returned AddStepsResponse object contains the list of new step IDs (List<String>), which is obtained via response.getStepIds().

          public void addSteps(BmrClient bmrClient, String clusterId) {
              List<StepConfig> steps = new ArrayList<StepConfig>();
              // Custom Jar step
              steps.add(
                      new JavaStepConfig()
                              .withName("java-step")
                              .withActionOnFailure("Continue")
                              .withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
                              .withMainClass("org.apache.hadoop.examples.WordCount")
                              .withArguments("bos://path/to/input bos://path/to/java_output")
              );
          
              // Streaming step
              steps.add(
                      new StreamingStepConfig()
                              .withName("streaming-step")
                              .withActionOnFailure("Continue")
                              .withInput("bos://path/to/input_streaming")
                              .withMapper("cat")
                              .withOutput("bos://path/to/output_streaming")
              );
          
              // Streaming step with the use of attached files
              steps.add(
                  new StreamingStepConfig()
                          .withName("streaming-step2")
                          .withActionOnFailure("Continue")
                          .withInput("bos://path/to/input_streaming2")
                          .withMapper("cat")
                          .withReducer("cat")
                          .withOutput("bos://path/to/output_streaming2")
                          .withAdditionalFile("bos://path/to/testA.jar", "testB.jar")
                          .withArguments("-libjars testB.jar")
              );
          
              // Hive step
              steps.add(
                      new HiveStepConfig()
                              .withName("hive-step")
                              .withActionOnFailure("Continue")
                              .withScript("bos://path/to/hive/hql/hive_src.hql")
                              .withInput("bos://path/to/hive/data/hive_src.data")
                              .withOutput("bos://path/to/output_hive")
                              .withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
              );
          
              // Pig step
              steps.add(
                      new PigStepConfig()
                              .withName("pig-step")
                              .withActionOnFailure("Continue")
                              .withScript("bos://path/to/pig/script/pig_grep.pig")
                              .withInput("bos://path/to/pig/data/pig_grep.data")
                              .withOutput("bos://path/to/output_pig")
              );
              
              // Spark step
              steps.add(
                      new SparkStepConfig()
                              .withName("spark-step")
                              .withActionOnFailure("Continue")
                              .withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
                              .withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
                              .withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
              );
          
              try {
                  AddStepsResponse response = bmrClient.addSteps(
                          new AddStepsRequest().withClusterId(clusterId)
                                  .withSteps(steps)
                  );
                  // print the IDs of the added steps
                  for (String stepId : response.getStepIds()) {
                      System.out.println(stepId);
                  }
              } catch (BceServiceException e) {
                  System.out.println("Add steps failed: " + e.getErrorMessage());
              } catch (BceClientException e) {
                  System.out.println(e.getMessage());
              }
          }
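
The clientToken rule described above (an ASCII string of at most 64 characters) can be checked before a request is sent. The following is a minimal sketch, not part of the BMR SDK: the class name ClientTokenSketch and its two helper methods are hypothetical, and using a random UUID as the token is only one possible choice. The token would then be passed to addStepsRequest.withClientToken(clientToken), and the same token reused when retrying the same request.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper, not part of the BMR SDK.
final class ClientTokenSketch {
    // Upper bound on the token length stated in the text above.
    static final int MAX_TOKEN_LENGTH = 64;

    // A random UUID string is 36 characters of hex digits and hyphens,
    // so it is ASCII and well under the 64-character limit.
    static String newClientToken() {
        return UUID.randomUUID().toString();
    }

    // Returns true if the token is pure ASCII and no longer than 64 characters.
    static boolean isValidClientToken(String token) {
        return token != null
                && token.length() <= MAX_TOKEN_LENGTH
                && StandardCharsets.US_ASCII.newEncoder().canEncode(token);
    }

    public static void main(String[] args) {
        String clientToken = newClientToken();
        System.out.println(clientToken + " valid: " + isValidClientToken(clientToken));
        // Usage with the request from the example above (requires the BMR SDK):
        // new AddStepsRequest().withClusterId(clusterId).withSteps(steps)
        //         .withClientToken(clientToken);
    }
}
```

Generating the token once and keeping it for the lifetime of one logical request is what makes a retry safe: a new token on every attempt would defeat the idempotence check.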

          List All Steps

          The following code lists all steps on the specified cluster. The query parameter maxKeys limits the number of steps returned per request, and the query parameter marker specifies the position at which the query starts. The marker value is generated and returned by the BMR system: it is not set in the initial request, and is passed back in subsequent requests when paging through the results in a loop.

          public void listSteps(BmrClient bmrClient, String clusterId) {
              int maxKeys = 10;
              try {
                  // method 1. list the steps of the specified cluster ID
                  ListStepsResponse response1 = bmrClient.listSteps(clusterId);
          
                  // method 2. single query request with maxKeys set
                  ListStepsResponse response2 = bmrClient.listSteps(clusterId, maxKeys);
          
                  // method 3. paged query requests in a loop, using maxKeys and marker
                  boolean isTruncated = true;
                  String marker = null;
                  int page = 0;
                  while (isTruncated) {
                      ListStepsResponse response3 = bmrClient.listSteps(clusterId, marker, maxKeys);
                      page++;
                      System.out.format("Page %d: Step count: %d\n", page, response3.getSteps().size());
                      isTruncated = response3.isTruncated();
                      marker = response3.getNextMarker();
                  }
          
                  // method 4. query request with a custom ListStepsRequest object
                  ListStepsResponse response4 = bmrClient.listSteps(
                          new ListStepsRequest().withClusterId(clusterId).withMaxKeys(maxKeys)
                  );
                  // print the state of each step
                  for (Step step : response4.getSteps()) {
                      System.out.println(step.getStatus().getState());
                  }
              } catch (BceServiceException e) {
                  System.out.println("List steps failed: " + e.getErrorMessage());
              }
          }
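
The marker loop in method 3 can be tried without a live cluster. The sketch below is an illustration only: Page and PageSource are hypothetical stand-ins for ListStepsResponse and bmrClient.listSteps(clusterId, marker, maxKeys), and the in-memory source assumes that a marker is the ID of the first item on the next page, which may differ from how the BMR service encodes its markers. The loop itself follows the same pattern as the code above: start with a null marker, then feed each returned marker into the next request until the response is no longer truncated.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of marker-based paging; not part of the BMR SDK.
final class MarkerPagingSketch {
    // Stand-in for one ListStepsResponse page.
    static final class Page {
        final List<String> stepIds;
        final boolean truncated;
        final String nextMarker;

        Page(List<String> stepIds, boolean truncated, String nextMarker) {
            this.stepIds = stepIds;
            this.truncated = truncated;
            this.nextMarker = nextMarker;
        }
    }

    // Stand-in for bmrClient.listSteps(clusterId, marker, maxKeys).
    interface PageSource {
        Page list(String marker, int maxKeys);
    }

    // Collects every item by passing the returned marker to the next request.
    static List<String> collectAll(PageSource source, int maxKeys) {
        List<String> all = new ArrayList<>();
        String marker = null;      // not set in the initial request
        boolean truncated = true;
        while (truncated) {
            Page page = source.list(marker, maxKeys);
            all.addAll(page.stepIds);
            truncated = page.truncated;
            marker = page.nextMarker;
        }
        return all;
    }

    // In-memory source; assumes the marker is the ID of the next page's first item.
    static PageSource inMemory(List<String> ids) {
        return (marker, maxKeys) -> {
            int start = (marker == null) ? 0 : ids.indexOf(marker);
            if (start < 0) {
                throw new IllegalArgumentException("unknown marker: " + marker);
            }
            int end = Math.min(start + maxKeys, ids.size());
            boolean truncated = end < ids.size();
            String next = truncated ? ids.get(end) : null;
            return new Page(new ArrayList<>(ids.subList(start, end)), truncated, next);
        };
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            ids.add("step-" + i);
        }
        List<String> all = collectAll(inMemory(ids), 10);
        System.out.println("Collected " + all.size() + " step IDs");
    }
}
```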

          The returned ListStepsResponse object contains the list of step objects (List<Step>), which is obtained via response.getSteps(). The properties of the step object (Step) hold the step configuration information, and every property has a getter method.

          public class Step {
              private String id;                               // step ID
              private String actionOnFailure;                  // step failure policy
              private String type;                             // step type
              private Map<String, String> properties;          // step description
              private String name;                             // step name
              private StepStatus status;                       // step status
          }
          
          public class StepStatus {
              private String createDateTime;                   // step submission time
              private String endDateTime;                      // step end time
              private String startDateTime;                    // step start time
              private String state;                            // step state
          }
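
As an illustration of reading these properties through their getters, the sketch below counts steps by state using step.getStatus().getState(), the same accessor chain used in the listing code above. It is a self-contained approximation: the Step and StepStatus classes here are cut-down stand-ins for the SDK classes, and the state strings "Running" and "Completed" are hypothetical sample values, not a list of the states that BMR actually returns.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; Step and StepStatus are cut-down stand-ins for the SDK classes.
final class StepStateSummarySketch {
    static final class StepStatus {
        private final String state;

        StepStatus(String state) {
            this.state = state;
        }

        String getState() {
            return state;
        }
    }

    static final class Step {
        private final String id;
        private final String name;
        private final StepStatus status;

        Step(String id, String name, StepStatus status) {
            this.id = id;
            this.name = name;
            this.status = status;
        }

        String getId() {
            return id;
        }

        String getName() {
            return name;
        }

        StepStatus getStatus() {
            return status;
        }
    }

    // Counts how many steps are in each state, keyed by the state string.
    static Map<String, Integer> countByState(List<Step> steps) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Step step : steps) {
            counts.merge(step.getStatus().getState(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The state values below are sample strings chosen for the demo.
        List<Step> steps = Arrays.asList(
                new Step("s-1", "java-step", new StepStatus("Completed")),
                new Step("s-2", "hive-step", new StepStatus("Running")),
                new Step("s-3", "pig-step", new StepStatus("Completed")));
        System.out.println(countByState(steps));
    }
}
```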

          Query Specified Steps

          The following code queries the information on a specified step. The returned GetStepResponse object exposes getter methods for the step's properties, so the target step's information can be read directly from the response.

          public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
              try {
                  // method 1. query the step by cluster ID and step ID
                  GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);
          
                  // method 2. query request with a custom GetStepRequest object
                  GetStepResponse response2 = bmrClient.getStep(
                          new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
                  );
                  // print the step state
                  System.out.println(response1.getStatus().getState());
              } catch (BceServiceException e) {
                  System.out.println("Get step failed: " + e.getErrorMessage());
              }
          }