13.3 流处理计算:Flink,Storm,Spark Streaming
学习知识的重要方法:
解决同一类场景的各种不同的技术,抽象提取共性点:问题共性点,解决方案共性点。
然后寻找解决思路或者解决方法的不同点。可以把技术区分开来,抓住技术关键点,优缺点。
如果一头扎进细节里面,多种不同的概念,就会迷失在细节里面。
架构师训练营学习方法:抽象解决方案的本质,不迷失在技术产品的海洋中。
13.3 流处理计算:Flink,Storm,Spark Streaming
批处理 VS 流处理:
批处理:大批数据作为输入, 特点:数据量比较大,计算时间比较长,以 T 为单位计算处理任务。 典型 MapReduce,Spark,Hive(网页排名,全球网页内容作为输入,对网页排名计算)
流处理:流处理时间比较短,数据量比较小(一个或者一小批),数据源源不断像水流一样溜进来,一边流入,一边计算。流入数据量也比较大,流入的数据不断的计算。
(日志不断产生,订单不断创建,订单不断计算风控模型),实时对大规模流入数据进行计算。-------大数据计算场景
1.Storm 实时的 Hadoop
实时计算系统
低延迟
高性能
分布式
可伸缩
高可用
2.Storm 基本概念
Nimbus:负责资源分配和任务调度
Supervisor:负责接收 Nimbus 分配的任务,启动和停止属于自己管理的 worker 进程
worker:运行具体处理组件逻辑的进程
Task:Worker 中每一个 Spout/Bolt 的线程称为一个 Task
Topology:Storm 中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。
Spout:在一个 Topology 中产生源数据流的组件。通常情况下 Spout 会从外部数据源中读取数据,
然后转换为 Topology 内部的数据源。Spout 是一个主动的角色,其接口中有个 nextTuple()函数,
Storm 框架会不停地调用此函数,用户只要在其中生成源数据即可
Bolt:在一个 Topology 中接受数据然后执行处理的组件。Bolt 可以执行过滤,函数操作,合并,写数据库等任务操作。
Bolt 是一个被动角色,其接口中有个 execute(Tuple input)函数,在接收到消息后调用此函数,用户可以再其中执行自己想要的操作。
Tuple:一次消息传递的基本单元。本来是一个 key-value 的 map,但是由于各个组件间传递的 Tuple 的字段名称事先已经定义好,
所以 tuple 中只要按顺序填入各个 value 就行,所以就是一个 value list。
Stream:源源不断传递的 tuple 就组成了 Stream。
3.example
当车辆超越 80 公里每小时,则记录。
使用一个类型日志,其中包含的车辆数据信息有:车牌号,车辆行驶的速度以及数据获取的位置。
3.1.topology
3.2.Spout
3.3.Bolt
3.4.Storm-Code
public classStormMain{
public static void main(String[] args)throws AlreadyAliveException,InvliadTopologyException,InterruptedException{
ParallelFileSpout parallelFileSpout=new ParallelFileSpout();
ThresholdBolt thresholdBolt=new ThresholdBolt();
DBWriterBolt dbWriterBolt=new DBWriterBolt();
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("spout",parallelFileSpout,1);
builder.setBolt("thresholdBolt",thresholdBolt,1).shuffleGrouping("spout");
builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("ThresholdBolt");
if(this.argsMain!=null && this.argsMain.length>0){
conf.setNumWorkers(1);
StormSubmitter.submitTopology(this.argsMain[0],conf,builder.createTopology);
}else{
Config conf=new Config();
conf.setDebug(true);
conf.setMaxTaskParallelism(3);
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("Threshold_test",conf,builder.createTopology());
}
}
}
4.Spark Streaming
//Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc=new JavaStreamingContext(conf,Durations.seconds(1));
//Create a DataStream that will connect to hostname:port,like localhost:9999
JavaReceiverInputDStream<String> lines=jssc.socketTextStream("localhost",9999);
//Split each line into words
JavaStream<String> words=lines.flatMap(x->Arrays.asList(x.split(" "))).iterator();
5.Flink
Flink 流计算处理
StreamExecutionEnvironment see=StreamExecutionEnviroment.getExecutionEnviroment();
DataStream<WikipediaEditEvent> edits=see.addSource(new WikipediaEditsSource());
Flink 批处理计算
ExecutionEnviroment env=ExecutionEvniroment.getExecutionEnviroment();
DataSet<String> text=env.readTextFile("/path/to/file");
public class WordCountExample{
public static void main(String[] args)throws Exception{
final ExecutionEnvironment env=ExecutionEnviroment.getExecutionEnviroment();
DataSet<String> text=env.fromElements("Who's there?","I think I hear them. Stand,ho! Who's there?");
DataSet<Tuple2<String,Integer>> wordCounts=text.flatMap(new LineSplitter()).groupBy(0).sum(1);
wordCounts.print();
}
}
评论