Flink
Flink Introduction
Apache Flink is an open-source stream-processing framework developed by the Apache Software Foundation. Its core is a distributed dataflow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner, and the pipelined runtime lets the system execute both batch-processing and stream-processing programs. Flink also natively supports the execution of iterative algorithms.
Create Clusters
Log in to the Baidu AI Cloud console, select "Product Service > Baidu MapReduce BMR", and click "Create Cluster" to enter the cluster creation page. BMR versions 2.0.0 and above support the Flink component. You can select the Flink component when purchasing the cluster, as shown in the following figure:
Usage Introduction
- Remotely log in to the created cluster:

  ssh hdfs@$public_ip

  Use the password entered during cluster creation.

- Run the WordCount step (Kerberos authentication not enabled):

  - Upload a file to HDFS:

    hdfs dfs -put /etc/hadoop/conf/core-site.xml /tmp

  - Execute the following command to submit the step to YARN:

    flink run --jobmanager yarn-cluster \
      -yn 1 \
      -ytm 1024 \
      -yjm 1024 \
      /opt/bmr/flink/examples/batch/WordCount.jar \
      --input hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/core-site.xml \
      --output hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/out

  - After the step is submitted successfully, the final running outcome is as follows:
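The core of the bundled WordCount example is a tokenize-and-count pass over the input file. The same logic in plain Scala looks like the sketch below (an illustration of the transformation, not the exact code shipped in WordCount.jar):

```scala
// Word-count core: lowercase the input, split on non-word characters,
// drop empty tokens, group equal tokens, and count each group.
def wordCount(lines: Seq[String]): Map[String, Int] =
  lines
    .flatMap(_.toLowerCase.split("\\W+"))
    .filter(_.nonEmpty)
    .groupBy(identity)
    .map { case (word, occurrences) => word -> occurrences.size }
```

Flink runs this transformation in parallel across the cluster and writes the resulting (word, count) pairs to the `--output` path.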
Real-Time Stream Computing (Scala)
This example uses Flink on BMR to consume data from BMS (Baidu Message Service) for stream computing. It is written in Scala against Flink 1.8.2, with the online Kafka version 2.1. The specific steps are as follows:
Step 1 Create Topic and Download BMS Certificate
(For more information, please see Spark Streaming Application Scenario)
Download the certificate.
Step 2 Write Business Codes
package com.baidu.inf.flink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory

object CloudFlinkConsumeKafkaDemo {

  private val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    logger.info("************ Flink Consume Kafka Demo start **************")
    if (args.length < 7) {
      logger.error("Parameters are missing, " +
        "needs: <topic> " +
        "<groupId> " +
        "<brokerHosts> " +
        "<truststore_location> " +
        "<truststore_pass> " +
        "<keystore_location> " +
        "<keystore_pass>")
      System.exit(-1)
    }
    // Bind the seven positional arguments; `_*` ignores any extras.
    val Array(topic, groupId, brokerHosts,
      truststore_location, truststore_pass,
      keystore_location, keystore_pass, _*) = args

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    env.getConfig.disableSysoutLogging

    // Kafka consumer configuration; SSL uses the BMS certificates from Step 1.
    val kafkaProperties = new Properties()
    kafkaProperties.setProperty("bootstrap.servers", brokerHosts)
    kafkaProperties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    kafkaProperties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    kafkaProperties.setProperty("group.id", groupId)
    kafkaProperties.setProperty("auto.offset.reset", "latest")
    kafkaProperties.setProperty("security.protocol", "SSL")
    kafkaProperties.setProperty("ssl.truststore.location", truststore_location)
    kafkaProperties.setProperty("ssl.truststore.password", truststore_pass)
    kafkaProperties.setProperty("ssl.keystore.location", keystore_location)
    kafkaProperties.setProperty("ssl.keystore.password", keystore_pass)
    kafkaProperties.setProperty("enable.auto.commit", "true")

    // Read the topic as a string stream and print it to stdout
    // (visible under "TaskManagers > Stdout" in the Flink web UI).
    val ds = env.addSource(
      new FlinkKafkaConsumer[String](topic,
        new SimpleStringSchema(),
        kafkaProperties))
    ds.print()
    env.execute()
  }
}
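The `val Array(..., _*) = args` line above uses Scala's array extractor to bind the seven positional parameters in a single pattern match. A standalone illustration with hypothetical values:

```scala
// The Array extractor binds the first seven elements positionally;
// `_*` matches any remaining elements instead of failing the match.
// (Values below are hypothetical, for illustration only.)
val sampleArgs = Array("demo-topic", "group1", "host:9091",
  "/tmp/client.truststore.jks", "tsPass",
  "/tmp/client.keystore.jks", "ksPass")
val Array(topic, groupId, brokerHosts,
  truststoreLocation, truststorePass,
  keystoreLocation, keystorePass, _*) = sampleArgs
```

If the array has fewer than seven elements the match throws a MatchError, which is why the demo checks `args.length < 7` before destructuring.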
Step 3 Compile the Code, Package an Executable Jar, and Upload It to the Server
(Note: Make sure the certificate downloaded in Step 1 exists at the same path on every node of the cluster.)
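Compiling the code above requires the Flink Scala streaming API and the universal Kafka connector on the build classpath. A hedged build.sbt sketch follows (artifact versions are assumptions matching Flink 1.8.2; the jar name in the run example suggests a Maven assembly build, but the dependency set is the same):

```scala
// build.sbt sketch (assumed coordinates; verify against your cluster).
// Flink core dependencies are "provided": the cluster supplies them at runtime.
// The Kafka connector must be bundled into the fat jar.
name := "flink-streaming"
version := "1.0-SNAPSHOT"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.8.2" % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % "1.8.2"
)
```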
Example of running the step:

flink run --jobmanager yarn-cluster -yn 1 -ytm 1024 -yjm 1024 \
  /root/flink-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar \
  "676c4bb9b72c49c7bd3b089c181af9ec__demo02" "group1" \
  "kafka.fsh.baidubce.com:9091" \
  "/tmp/client.truststore.jks" "kafka" \
  "/tmp/client.keystore.jks" "0yw0ckrt"
Step 4 Produce Some Messages to the Message Queue and View the Output on the Flink Step Monitoring Page
Use a tunnel to log in to the cluster's YARN page (see Use SSH-Tunnel to Access Cluster).
Find the step's application in the YARN console, and click the application name to enter the step details page:
(On Flink's native web UI, click "TaskManagers > Stdout" to view the step's running output.)