百度智能云

All Product Document

          MapReduce

          Hadoop-Streaming

          Hadoop Streaming Introduction

          The Weblog analysis for statistics on daily request volume is taken as an example to introduce how to use Hadoop Streaming on the Baidu AI Cloud platform in this document.

          In the BMR cluster, you can use any programming language you are familiar with (such as python, shell, and C++) to develop the Hadoop Streaming step. As a programming tool provided by Hadoop, Hadoop Streaming allows users to use an executable file or script file as Mapper and Reducer. It encapsulates Mapper and Reducer files to be MapReduce step and submits it for running, allowing for quick realization of MapReduce function without package and meeting complex business needs.

          Program Preparation

          Mapper.py Program:

              #!/usr/bin/env python
              import sys
              import re
              separator = "\\s{1}"
              ipAddr = "(\\S+)"
              remoteUser = "(.*?)"
              timeLocal = "\\[(.*?)\\]"
              request = "(.*?)"
              status = "(\\d{3})"
              bodyBytesSent = "(\\S+)"
              httpReferer = "(.*?)"
              httpCookie = "(.*?)"
              httpUserAgent = "(.*?)"
              requestTime = "(\\S+)"
              host = "(\\S+)"
              msec = "(\\S+)"
              p = ipAddr + separator + "-" + separator + timeLocal\
                  + separator + request + separator + status + separator\
                  + bodyBytesSent + separator + httpReferer + separator + httpCookie\
                  + separator + remoteUser + separator + httpUserAgent + separator\
                  + requestTime + separator + host + separator + msec
              for line in sys.stdin:
                  line = line.strip()
                  m = re.match(p, line)
                  if m is not None:
                      print '%s\t%s' % (m.group(2).split(":")[0], 1)

          Reducer.py Program:

              #!/usr/bin/env python
              import sys
              from operator import itemgetter
              sum_result = {}
              for line in sys.stdin:
                  line = line.strip()
                  key, value = line.split()
                  try:
                      value = int(value)
                      sum_result[key] = sum_result.get(key, 0) + value
                  except ValueError:
                      pass
              sorted_sum_result = sorted(sum_result.items(), key=itemgetter(0))
              for key, value in sorted_sum_result:
                  print '%s\t%s'% (key, value)

          Cluster Preparation

          1. Prepare the data. For more information, please see Data Preparation;
          2. Preparation for Baidu AI Cloud Environment;
          3. Log in to the console, select "Product Service->Baidu MapReduce BMR", and click "Create Cluster" to enter the cluster creation page and configure the following:

            • Set cluster name
            • Set administrator password
            • Disable log
            • Select image version “BMR 1.0.0(hadoop 2.7)”
            • Select the built-in template “hadoop”
          4. Keep other default configurations of the cluster, and click "Finish" to view the created cluster in the cluster list page. The cluster is created successfully when cluster status changes from "Initializing" to "Waiting".

          Run Steps

          1. In "Product Service>MapReduce>Baidu MapReduce-Homework List" page, click "Create Step" to enter the step creation page.
          2. Configure Streaming parameters:

            • Step type: Select “Streaming step”;
            • Step name: Enter the step name with length not exceeding 255 characters;
            • Mapper: If you use your code, please upload program to BOS or your local HDFS, and enter program path there; you can directly use sample program provided by Baidu AI Cloud, and the path is as follows: Sample program path for BMR clusters in North China - Beijing region: bos://bmr-public-bj/sample/streaming-1.0-mapper.py Sample program path for BMR clusters in South China - Guangzhou region: bos://bmr-public-gz/sample/streaming-1.0-mapper.py
            • Reducer: If you use your code, please upload program to BOS or your local HDFS, and enter program path there; you can directly use sample program provided by Baidu AI Cloud, and the path is as follows: Sample program path for BMR clusters in North China - Beijing region: bos://bmr-public-bj/sample/streaming-1.0-reducer.py Sample program path for BMR clusters in South China - Guangzhou region: bos://bmr-public-gz/sample/streaming-1.0-reducer.py
            • BOS input address: bos://bmr-public-bj/data/log/accesslog-1k.log
            • BOS output path: The output path must have write permission and not be repeated, bos://{your-bucket}/output
            • Action after failure: Continue;
            • Application parameters: None.
          3. Select the adaptive cluster in the "Cluster Adaption" section.
          4. Click "Finish" to complete the creation of the step. The status changes from "Waiting" to "Running" when the step is running, and changes to "Completed" when the step is completed. After then, you can view the result.

          View Results

          View the output at the storage system selected by you (BOS or HDFS). You can view the output in BOS as follows:

          You can view the output under the path bos://{your-bucket}/output. If you use input data and program provided by the system, you can see the following when viewing the output:

          03/Oct/2015    139
          04/Oct/2015    375
          05/Oct/2015    372
          06/Oct/2015    114

          Data Import Through Distributed Cache

          As a file cache tool provided by Hadoop, the distributed cache automatically distributes the specified files to the nodes running Map or Reduce tasks, and provides the local cache for user applications to read and use. In the case of Map or Reduce, you can use distributed cache to greatly improve the efficiency of access to general data. The streaming step is taken as an example to introduce how to use the Hadoop distributed cache.

          Application Scenario

          The distributed cache is generally used when MapReduce is dependent on repositories of unique versions or when MapReduce needs to access third-party repositories or files. It is difficult to install the repositories or files on every machine of the cluster. You can use distributed cache to distribute those repositories or files to cluster nodes, and Hadoop automatically deletes the distributed cache after the program operation.

          The distributed cache can be used to solve problems in the following scenarios:

          • Distribution of dictionary files: The distribution of dictionary files (such as blacklist/whitelist and vocabulary) can be used when Map or Reduce needs external dictionaries.
          • Automatic deployment of software: The automated deployment of software can be used when MapReduce is dependent on a repository of a special version. In case of dependency on the PHP interpreter of a particular version, the distributed cache can be used to upload and distribute that interpreter.

          Types of Files Supported

          The distributed cache supports individual files and package files, and also supports the following compression formats:

          • zip
          • tgz
          • tar.gz
          • tar
          • jar

          Operating Steps

          1. You need to upload files to BOS. For more information, please see [BOS Uploads Object](BOS/Console Operation Guide/Manage Objects.md);
          2. You can specify a soft link for cache files by specifying parameters when submitting the step, and you can call the soft link in Mapper and Reducer to access the actual file content;

            • -files: Generally used to upload individual files, and distribute the specified files to the working directory of every Task, without any other processing; the files to upload are separated with commas.
            • -archives: Generally used to upload package files, distribute the specified files to the working directory of every Task, and automatically decompress files with suffixes “.jar”, “.zip”, “.tar.gz” and “.tgz”; by default, the content after decompression is stored in the working directory named the file name before decompression; the package files to upload are separated with commas.
          File Type Action Description
          Add individual files Enter parameters: -files+file location in BOS+pound sign (#)+name of soft link for file in cache -files bos://bucket_name/file_name#file_name
          Add package files Enter parameters: -archives+file location in BOS+pound sign (#)+name of soft link for file in cache -archives bos://bucket_name/archive_name#archive_name

          For example:

          • If you want to upload the file mydata.txt to BOS file system and the path is bos://mybucket/mydata.txt, you need to specify -files bos://mybucket/mydata.txt#data among parameters when submitting the MapReduce Streaming step, and then use ./data to access the file in MapTask and ReduceTask;
          • If you want to pack the file libA.so into the file mylib.tar.gz and upload it to the BOS file system with the path bos://mybucket/mylib.tar.gz, you need to specify -archives bos://mybucket/mylib.tar.gz#libs among parameters when submitting the MapReduce Streaming step, and then use ./libs/libA.so to access the file in MapTask and ReduceTask.
          Previous
          Getting Started
          Next
          Spark