Skip to content

Commit

Permalink
2020.8.27
Browse files Browse the repository at this point in the history
  • Loading branch information
wangdonghao committed Aug 27, 2020
1 parent ea2efad commit 1cf399a
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 224 deletions.
16 changes: 16 additions & 0 deletions javase/基础/数据类型.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,22 @@ public enum StatusCodeEnum{

`System.out.println(StatusCodeEnum.SUCCESS.getCode());`

---

## 时间

### 数字类型

在日常 Java 开发中,我们最常使用 Long 类型,而不是 Date/Timestamp 类型表示时间。

我们可以通过 System.currentTimeMillis 方法获取当前系统时间,默认为 13 位的数字(精确到 ms)。

```java
Long timestamp1 = System.currentTimeMillis(); // 13 位 精确到 ms
Long timestamp2 = (System.currentTimeMillis()) / 1000; // 10 位 精确到 s
```


---

## 泛型
Expand Down
153 changes: 129 additions & 24 deletions 其它/分布式/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,113 @@
## Spark


Spark 是一种快速、通用、可扩展的大数据分析引擎,已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLib、SparkR等子项目,Spark是基于内存计算的大数据并行计算框架。以下主要涉及 Spark Streaming 流处理部分。

Spark 的流处理是基于所谓微批处理的思想,把流处理看作是批处理的一种特殊形式,每次接收到一个时间间隔的数据才会去处理,所以天生很难在实时性上有所提升。

虽然在 Spark2.3 中提出了连续处理模型( Continuous Processing Model),但是现在只支持很有限的功能,并不能在大的项目中使用。 Spark还需要做出很大的努力才能改进现有的流处理模型想要在流处理的实时性上提升,就不能継续用微批处理的模式,而要想办法实现真正的流处理即每当有一条数据输入就立刻处理,不做等待。

### 数据类型

在内部,每个数据块就是一个 RDD,所以 spark streaming 有 RDD 所有优点,处理速度快,容错性好,支持高度并行计算。

#### DStream 对象

Spark Streaming 提供一个对于流数据的抽象 DStream。DStream 可以由来自 Apache Kafka、Flume 或者 HDFS 中的流数据生成,也可以由别的 DStream 经过各种转换操作得来。

底层 DStream 也是由很多个序列化的 RDD 构成,按时间片(比如一秒)切分成的每个数据单位都是一个 RDD。然后,Spark 核心引擎将对 DStream 的 Transformation 操作变为针对 Spark 中对 RDD 的 Transformation 操作,将 RDD 经过操作变成中间结果保存在内存中。

由于 Spark Streaming 将底层的细节封装起来了,所以对于开发者来说,只需要操作 DStream 就行。接下来,让我们一起学习 DStream 的结构以及它支持的转换操作。

#### StreamingContext 对象

任何 Spark Streaming 的程序都要首先创建一个 StreamingContext 的对象,它是所有 Streaming 操作的入口。StreamingContext 中最重要的参数是批处理的时间间隔,即把流数据细分成数据块的粒度。


### 操作流程

第一,我们将Spark Streaming类名和StreamingContext的一些隐式转换导入到我们的环境中,以便将有用的方法添加到我们需要的其他类(如DStream)中。StreamingContext是所有流功能的主要入口点。我们创建一个带有两个执行线程的本地StreamingContext,批处理间隔为1秒。

```java
// 统计单词出现次数

public static void main(String[] args) throws InterruptedException {
// 创造本地 StreamingContext :两工作线程 and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); // 设计时间间隔
// 创建 DStream 连接到 localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// 数据分离、统计并打印
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();

// 启动流运算
jssc.start();
// 等待流运算结束
jssc.awaitTermination();
}
```

`streamingContext.start()` 来开始接收数据并处理它
`streamingContext.awaitTermination()` 等待处理停止(手动停止或由于任何错误)
`streamingContext.stop()` 可以手动停止


一旦启动上下文,就无法设置新的流计算或将其添加到该流计算中
上下文一旦停止,就无法重新启动
一个JVM中只能同时激活一个StreamingContext
StreamingContext中的stop()也会停止SparkContext。但如果要仅停止StreamingContext的话,设置stop(false)
只要在创建下一个StreamingContext之前停止了上一个StreamingContext(不停止SparkContext),就可以将SparkContext重用于创建多个StreamingContext


## Spark 操作 kafka

Spark Streaming提供了两类内置的streaming源:

Basic sources :直接在StreamingContext API中可用的源。例如,文件系统和socket连接
Advanced sources :像Kafka,Flume,Kinesis等这样的源,可通过额外的程序类获得

### 消费

1. 先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。

0.10以后只保留了direct模式(Reveiver模式不适合生产环境),并且0.10版本API有变化(更加强大)

### 生产

与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。
最直接的做法我们可以想到如下这种方式:

```java
input.foreachRDD(rdd =>
// 不能在这里创建KafkaProducer
rdd.foreachPartition(partition =>
partition.foreach{
case x:String=>{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String,String](props)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
}
}
)
)
```

但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?

首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:

---

Expand Down Expand Up @@ -130,7 +235,7 @@ public class StreamingJob {

在流处理中,一致性分为 3 个级别。

- **at-most-once**这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。
- **at-most-once**:故障发生之后,计数结果可能丢失。

- **at-least-once**:这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。

Expand All @@ -149,7 +254,7 @@ https://zhuanlan.zhihu.com/p/92289771

flink 提供了一个特有的 kafka connector 去读写 kafka topic 的数据。这样在 flink 消费 kafka 数据时,就可以通过 flink 内部去跟踪 offset 和设定 checkpoint 去实现 exactly-once 的语义。

在 Flink 中,我们分别用 Source Connectors 代表连接数据源的连接器, Sink Connector 代表连接数据输出的连接器。
在 Flink 中,我们作为 Consumer 时需要用 Source Connectors 代表连接数据源的连接器,作为 Producer 时需要用 Sink Connector 代表连接数据输出的连接器。


### Source Connector
Expand All @@ -163,21 +268,20 @@ Flink Kafka connector 以并行的方式读入事件流,每个并行的 source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()

// 设定配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

// 设置消费者并添加源
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(
"topic",
new SimpleStringSchema(),
properties))
)
DataStream[String] stream = env.addSource(myConsumer)
);
DataStream[String] stream = env.addSource(myConsumer);
```


初始化 FlinkKafkaConsumer 需要如下参数:
**初始化 FlinkKafkaConsumer 参数**

1. **topic 名字**,用来指定消费一个或者多个topic的数据,也可以是正则表达式。
2. **反序列化器(schema)**,对消费数据进行反序列化,转换成自定义的数据结构。
Expand Down Expand Up @@ -223,34 +327,31 @@ FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
stream.addSink(myProducer)
```

初始化 FlinkKafkaProducer 需要如下参数:
**初始化 FlinkKafkaProducer 参数**

1. **broker 列表**,要发往的 brokers , 用逗号分割。
2. **topic 名字**,用来指定生产一个或者多个 topic 的数据,也可以是正则表达式。
3. **序列化器(schema)**,对消费数据进行序列化,将目标类型转换成字节数组。

序列化器类比于反序列化器实现:

- 转化为 String 类型 `SimpleStringSchema`
- 转化为其它类型 `TypeInformationSerializationSchema<T>`
- 转化为键值对类型 `TypeInformationKeyValueSerializationSchema<K, V>`
- 转化为 JSON 类型 `JSONKeyValueDeserializationSchema`

### Kakfa 容错机制

- **CheckPoint**

1. Checkpointing 启用时(默认)

Flink Kafka Consumer 将消费来自 Topic 的记录,并以一致的方式周期性地 Checkpoint 其所有 Kafka Offset 以及其它操作的状态。万一作业失败,Flink 将把流失程序恢复到最新 Checkpoint的状态,并且重新消费 Kafka 中的数据。这确保了在 Kafka Broker 中所提交的 OffsetCheckpointed State 中的 Offset 是一致的。

此时 FlinkKafkaProducer 可以提供 exactly-once 的投递语义。

2. Checkpointing 没有启用
### Kakfa 容错机制

Kafka Consumer 将会周期性的提交 OffsetZookeeper 中去。
Kafka 0.9 之前不提供任何机制去保证 at-least-once 或 exactly-once 的语义。 但后续版本的 Kafka 可以通过以下方式来实现出错后恢复且不丢失数据:

1. **启用 Checkpoint**

*Flink Kafka Consumer 支持发现动态创建的 Kafka Partition,并且以 exactly-once(仅消费一次)语义保证来消费其中的数据。默认情况下,分区发现是禁用的。要启用该特性在提供的属性配置中为参数 flink.partition-discovery.interval-millis 设置一个非负数的值,表示发现间隔(以毫秒为单位)。*
在默认启用 Checkpoint 的状况下, FlinkKafkaConsumer 将消费来自 Topic 的记录,并以一致的方式周期性地 Checkpoint 其所有 Kafka Offset 以及其它操作的状态。万一作业失败,Flink 将把流失程序恢复到最新 Checkpoint的状态,并且重新消费 Kafka 中的数据。这确保了在 Kafka Broker 中所提交的 OffsetCheckpointed State 中的 Offset 是一致的。此时 FlinkKafkaProducer 可以提供 exactly-once 的投递语义。

*Kafka 0.9 之前不提供任何机制去保证 at-least-once(至少处理一次)或 exactly-once(仅处理一次)的语义。*
如果 Checkpointing 没有启用,KafkaFlinkConsumer 将会周期性的提交 OffsetZookeeper 中去。

- **Semantic 参数**
- **配置 Semantic 参数**

除了启用 FlinkCheckpointing,还可以通过传递恰当的 semantic 参数给 FlinkKafkaProducer 选择 3 种不同的操作模式:

Expand All @@ -260,3 +361,7 @@ Kafka Consumer 将会周期性的提交 Offset 到 Zookeeper 中去。

Semantic.EXACTLY_ONCE 模式依赖于提交事务的能力,这些事务是在 taking a checkpoint 之前,从该 Checkpoint 恢复之后启动的。如果 Flink 应用崩溃且完成重启的时间比 Kafka 事务超时的时间大,则数据将会丢失(Kafka 将自动的终止超过超时时间的事务)。请务必根据预期的故障时间来配置你的事务超时。

### kafka 分区发现

FlinkKafkaConsumer 支持发现动态创建的 Kafka Partition,并且以 exactly-once 语义保证来消费其中的数据。默认情况下分区发现是禁用的,要启用该特性在提供的属性配置中为参数 flink.partition-discovery.interval-millis 设置一个非负数的值,表示发现间隔(以毫秒为单位)。

2 changes: 2 additions & 0 deletions 其它/前端/css.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ div { letter-spacing:0.5em; } /* 简单选择器 */

- `margin-top:30px;`

- `border:`

---

## 元素放置
Expand Down
Loading

0 comments on commit 1cf399a

Please sign in to comment.