使用 Flink 解救多线程 Scala 应用
问题
从 Zipkin Zipkin 是 twitter 用的分布式系统 tracing 收集服务 https://zipkin.io/ 读取 Traces 信息然后计算 pXX pecentile XX https://en.wikipedia.org/wiki/Percentile 数据
Scala
看似很简单, 拿所有traces算就好了啊?
比如这样的过程
- 拿到数据源
- 按数据源从 zipkin 抓 traces
- 计算 pXX
- 结果写入某个数据存储
val datasources = CollectorService.datasources
def singleProcess(datasource) = for {
traces <- ZipkinService.getTraces(source)
metrics = MetricService.calculateMetrics(traces, source)
_ <- CollectorService.writeMetrics(metrics, source)
} yield ()
val program = datasources.map(_ map singleProcess)
datasoures
和 singleProcess
都是 Free[Program, ?]
Free Monad, 免费获得自定义数据类型的 Monad, 好将副作用与业务逻辑分开 https://typelevel.org/cats/datatypes/freemonad.html
由于是函数式, 很简单的可以多线程跑起来.
这里我用 Natural Transformation Program ~> IO
将代码翻译成 Cats Effect的 IO, 然后再
parSequence
多线程并发执行计算.
并行化代码基本上只需要这么一行
(program unsafeRunSync ()).parSequence
但是当拿超级大流量的 api 的一天时间的 traces 时, 会发现内存短缺, 也就是说, 对于大数据量计算 显然这样是不 scalable 的.
Flink Scala
要想应用能处理一天的大流量 api 的 traces, 自然直接的想法当然是找一个 MapReduce 的 Cluster 将一天的数据 map 成多份来进行处理, 最后 reduce 到一起来算 pXX.
1: lazy val env = ExecutionEnvironment.getExecutionEnvironment // <= 1
2:
3: lazy val rawSources = (ZipkinClient.datasources foldMap interp) unsafeRunSync ()
4:
5: lazy val datasources =
6: env.fromCollection(rawSources.toList) // <= 2
7:
8: lazy val program: DataSet[Vector[String]] = datasources
9: .faltMap(splitByHour _) // <= 3
10: .rebalance() // <= 4
11: .map(source =>
12: (ZipkinClient.fetchTraces(source) foldMap interp) unsafeRunSync ()) // <= 5
13: .groupBy(_._1.endpoint.id) // <= 6
14: .reduce((s1, s2) => s1 |+| s2) // <= 7
15: .map((traces: (DataSource, Vector[Trace])) =>
16: (ZipkinClient.aggregate(traces) foldMap interp) unsafeRunSync ()) // <= 8
17: .map((metrics: (Vector[LatencyMetrics], DataSource)) =>
18: (CollectorService
19: .writeMetrics(metrics._1, Vector(metrics._2)) foldMap interp) unsafeRunSync ())
20:
- 当前执行环境, 例如跑在 cluster 上, 还是 jvm 里
- 初始化DataSet, DataSet是 flink batch job 的数据结果, DataSet 中的数据意味着可以并行跑在 cluster 中
- 拆分数据源, 类似大数据的 helloworld 计算字符个数例子, 你得先把句子拆成字符
- 负载均衡
- 合并, 类似于把同样的字符都归到一组, 方便下来计算
- reduce, 把每一组数据合并成一条数据
- 计算pXX
有意思的是, 你是可以本地直接 sbt run
的, 此时 flink 的 cluster 会跑在 jvm 中.
但是 跑cluster 在单个 jvm 中会耗费大量资源, 所以最佳实践其实是根据运行环境, 选择不同的 ExecutionEnvironment
lazy val env = envOrNone("EXECUTION_IN_LOCAL")
.map(_ => ExecutionEnvironment.createCollectionsEnvironment)
.getOrElse(ExecutionEnvironment.getExecutionEnvironment)
若是在本地运行, 比如跑功能测试或集成测试, 使用 CollectionsEnvironment
会省不少资源
Flink Cluster in Docker
若是想要把 cluster 运行在 docker 中, 也是十分简易的事情
flink:
image: flink
volumes:
- ./target:/target
working_dir: /target
command: flink run -m jobmanager:8081 -p 4 /target/scala-2.11/app.jar
depends_on:
- taskmanager
jobmanager:
image: flink:1.6.1-scala_2.11
expose:
- "6123"
ports:
- "8082:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.6.1-scala_2.11
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
其中 flink 是启动 flink job 的服务, 里面跑 flink run
命令
jobmanager 相当于 zookeeper, taskmanager 是真正跑任务的 slave
Flink Cluster on K8s
当然可以在docker中跑起来意味着部署到 k8s 也就是一个配置文件的事情
基本上, 照着 官方的 yaml 部署就好了
通常部署一个jobmanager, 两个 taskmanager 这样就有 8 个 slots
Footnotes:
Zipkin 是 twitter 用的分布式系统 tracing 收集服务 https://zipkin.io/
pecentile XX https://en.wikipedia.org/wiki/Percentile
Free Monad, 免费获得自定义数据类型的 Monad, 好将副作用与业务逻辑分开 https://typelevel.org/cats/datatypes/freemonad.html
并行化代码基本上只需要这么一行
(program unsafeRunSync ()).parSequence