Baidu AI Cloud

MapReduce

Step

Overview

A step is a resource that belongs to a cluster, so every step operation requires the ID of that cluster.

Add Steps

BMR supports several kinds of steps, and each kind has its own configuration. The following code adds Custom Jar, Streaming, Hive, Pig, and Spark steps to a specified cluster of the Hadoop type.

When adding steps, you can make the request idempotent by setting the clientToken property of the AddStepsRequest object. The clientToken is an ASCII string no longer than 64 characters, and it is set with addStepsRequest.withClientToken(clientToken).
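One way to produce such a token is sketched below. The ClientTokens class and its methods are hypothetical helpers, not part of the BMR SDK, and the sketch reads the length limit as 64 characters and assumes that an empty token is not useful. A random UUID string is 36 ASCII characters, so it fits within that limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper for illustration; not part of the BMR SDK.
class ClientTokens {
    // Assumed limit: the token must not exceed 64 characters.
    static final int MAX_LENGTH = 64;

    // Returns a random token made of 36 ASCII characters (hex digits and hyphens).
    static String newToken() {
        return UUID.randomUUID().toString();
    }

    // Checks the assumed constraints: non-empty, ASCII only, at most 64 characters.
    static boolean isValid(String token) {
        return token != null
                && !token.isEmpty()
                && token.length() <= MAX_LENGTH
                && StandardCharsets.US_ASCII.newEncoder().canEncode(token);
    }

    public static void main(String[] args) {
        String token = newToken();
        System.out.println(token + " valid=" + isValid(token));
    }
}
```

To retry a request safely, generate the token once, keep it, and pass the same value to withClientToken on every retry of that request.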

The AddStepsResponse object returned by the request contains the IDs of the new steps as a List<String>, which is obtained via response.getStepIds().

public void addSteps(BmrClient bmrClient, String clusterId) {
    List<StepConfig> steps = new ArrayList<StepConfig>();
    // Custom Jar step
    steps.add(
            new JavaStepConfig()
                    .withName("java-step")
                    .withActionOnFailure("Continue")
                    .withJar("bos://benchmark/hadoop/hadoop-mapreduce-examples.jar")
                    .withMainClass("org.apache.hadoop.examples.WordCount")
                    .withArguments("bos://path/to/input bos://path/to/java_output")
    );

    // Streaming step
    steps.add(
            new StreamingStepConfig()
                    .withName("streaming-step")
                    .withActionOnFailure("Continue")
                    .withInput("bos://path/to/input_streaming")
                    .withMapper("cat")
                    .withOutput("bos://path/to/output_streaming")
    );

    // Streaming step that uses attached files
    steps.add(
            new StreamingStepConfig()
                    .withName("streaming-step2")
                    .withActionOnFailure("Continue")
                    .withInput("bos://path/to/input_streaming2")
                    .withMapper("cat")
                    .withReducer("cat")
                    .withOutput("bos://path/to/output_streaming2")
                    .withAdditionalFile("bos://path/to/testA.jar", "testB.jar")
                    .withArguments("-libjars testB.jar")
    );

    // Hive step
    steps.add(
            new HiveStepConfig()
                    .withName("hive-step")
                    .withActionOnFailure("Continue")
                    .withScript("bos://path/to/hive/hql/hive_src.hql")
                    .withInput("bos://path/to/hive/data/hive_src.data")
                    .withOutput("bos://path/to/output_hive")
                    .withArguments("--hivevar LOCAT=bos://chy3/hive/tables/src")
    );

    // Pig step
    steps.add(
            new PigStepConfig()
                    .withName("pig-step")
                    .withActionOnFailure("Continue")
                    .withScript("bos://path/to/pig/script/pig_grep.pig")
                    .withInput("bos://path/to/pig/data/pig_grep.data")
                    .withOutput("bos://path/to/output_pig")
    );

    // Spark step
    steps.add(
            new SparkStepConfig()
                    .withName("spark-step")
                    .withActionOnFailure("Continue")
                    .withJar("bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar")
                    .withSubmitOptions("--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer")
                    .withArguments("bos://bmr-public-bj/data/log/accesslog-1k.log bos://tester01/sdk/output/out")
    );

    try {
        AddStepsResponse response = bmrClient.addSteps(
                new AddStepsRequest().withClusterId(clusterId)
                        .withSteps(steps)
        );
        // print the IDs of the added steps
        for (String stepId : response.getStepIds()) {
            System.out.println(stepId);
        }
    } catch (BceServiceException e) {
        System.out.println("Add steps failed: " + e.getErrorMessage());
    } catch (BceClientException e) {
        System.out.println(e.getMessage());
    }
}

List All Steps

The following code lists all steps on a specified cluster. The query parameter maxKeys limits the number of steps returned per request, and the query parameter marker specifies the position at which the listing starts. The marker value is generated and returned by the BMR system: leave it unset in the first request, then pass the returned value in each subsequent request of a paged query.

public void listSteps(BmrClient bmrClient, String clusterId) {
    int maxKeys = 10;
    try {
        // method 1. list the steps of the specified cluster
        ListStepsResponse response1 = bmrClient.listSteps(clusterId);

        // method 2. single request that limits the number of results with maxKeys
        ListStepsResponse response2 = bmrClient.listSteps(clusterId, maxKeys);

        // method 3. paged requests that use maxKeys and marker
        boolean isTruncated = true;
        String marker = null;
        int page = 0;
        while (isTruncated) {
            ListStepsResponse response3 = bmrClient.listSteps(clusterId, marker, maxKeys);
            page++;
            System.out.format("Page %d: Step count: %d\n", page, response3.getSteps().size());
            isTruncated = response3.isTruncated();
            marker = response3.getNextMarker();
        }

        // method 4. request built from a custom ListStepsRequest object
        ListStepsResponse response4 = bmrClient.listSteps(
                new ListStepsRequest().withClusterId(clusterId).withMaxKeys(maxKeys)
        );
        // print the state of each step
        for (Step step : response4.getSteps()) {
            System.out.println(step.getStatus().getState());
        }
    } catch (BceServiceException e) {
        System.out.println("List steps failed: " + e.getErrorMessage());
    }
}
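The marker loop of method 3 only prints a count per page. The sketch below shows the same loop collecting every item into one list, and it runs without a cluster because it uses an in-memory stand-in. MarkerPaging, Page, collectAll, and fakeSource are all hypothetical names introduced for illustration; they are not BMR SDK types. Page only mirrors the three accessors used above (getSteps(), isTruncated(), getNextMarker()).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; not BMR SDK types.
class MarkerPaging {
    // One page of results: the items, a truncation flag, and the marker of the next page.
    static final class Page<T> {
        final List<T> items;
        final boolean truncated;
        final String nextMarker;

        Page(List<T> items, boolean truncated, String nextMarker) {
            this.items = items;
            this.truncated = truncated;
            this.nextMarker = nextMarker;
        }
    }

    // Calls fetch with a null marker first, then with each returned marker,
    // until a page reports that it is not truncated.
    static <T> List<T> collectAll(Function<String, Page<T>> fetch) {
        List<T> all = new ArrayList<>();
        String marker = null;
        boolean truncated = true;
        while (truncated) {
            Page<T> page = fetch.apply(marker);
            all.addAll(page.items);
            truncated = page.truncated;
            marker = page.nextMarker;
        }
        return all;
    }

    // In-memory fake service: serves data in pages of pageSize and uses
    // the index of the next item as the marker.
    static Function<String, Page<String>> fakeSource(List<String> data, int pageSize) {
        return marker -> {
            int start = marker == null ? 0 : Integer.parseInt(marker);
            int end = Math.min(start + pageSize, data.size());
            boolean more = end < data.size();
            return new Page<>(new ArrayList<>(data.subList(start, end)),
                    more, more ? String.valueOf(end) : null);
        };
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("s-1", "s-2", "s-3", "s-4", "s-5");
        // Three pages of sizes 2, 2, and 1 are merged into one list.
        System.out.println(collectAll(fakeSource(ids, 2)));
    }
}
```

With the SDK, the fetch function would presumably wrap bmrClient.listSteps(clusterId, marker, maxKeys) and copy the three values out of the ListStepsResponse.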

The ListStepsResponse object returned by the request contains the steps as a List<Step>, which is obtained via response.getSteps(). The properties of the step object (i.e., Step) hold the step configuration information, and every property has a getter method.

public class Step {
    private String id;                               // step ID
    private String actionOnFailure;                  // step failure policy
    private String type;                             // step type
    private Map<String, String> properties;          // step description
    private String name;                             // step name
    private StepStatus status;                       // step status
}

public class StepStatus {
    private String createDateTime;                   // step submission time
    private String endDateTime;                      // step end time
    private String startDateTime;                    // step start time
    private String state;                            // step status field
}

Query Specified Steps

The GetStepResponse object returned by the request exposes getter methods for the step's properties, so the properties of the target step can be read directly from the response.

The following code queries the information of a specified step:

public void getStep(BmrClient bmrClient, String clusterId, String stepId) {
    try {
        // method 1. query a step by cluster ID and step ID
        GetStepResponse response1 = bmrClient.getStep(clusterId, stepId);

        // method 2. request built from a custom GetStepRequest object
        GetStepResponse response2 = bmrClient.getStep(
                new GetStepRequest().withClusterId(clusterId).withStepId(stepId)
        );
        // print the state of the step
        System.out.println(response1.getStatus().getState());
    } catch (BceServiceException e) {
        System.out.println("Get step failed: " + e.getErrorMessage());
    }
}