✏️
blog
  • README
  • 2023 11
    • expect使用
  • 2023 10
    • 通过Appium给iOS应用自动化执行脚本
  • 2023 06
    • 三种ThreadLocal详解
    • 常见限流算法总结
    • 分布式ID生成算法
  • 2023 05
    • 线上机器CLOSE_WAIT连接数异常排查
    • 多数据源引发transactional事务回滚失效
  • 2023 04
    • MySQL中BufferPool
  • 2022 12
    • Linux IO
    • Netty总结
  • 2022 04
    • Thrift
  • 2022 03
    • JVM命令总结
    • 频繁FullGC定位思路
    • Redis总结
    • Spring常见问题总结
    • Kafka总结
  • 2022 02
    • Dubbo柔性服务天池大赛总结
  • 2021 12
    • 泛型中的extends和super
    • 手写一个Spring Boot Starter
  • 2021 11
    • 常用消息队列总结
  • 2021 10
    • swagger2快速使用
    • SpringBoot接口cors跨域访问
  • 2021 08
    • 常用shell命令总结
  • 2021 05
    • 线程cpu飙升排查
    • zookeeper install
  • 2021 04
    • Java虚拟机
    • [Spring Boot](2021-04/2021-04-04-Spring Boot.md)
    • [Spring MVC](2021-04/2021-04-04-Spring MVC.md)
    • 分布式ID
    • 消息队列
    • [Spring AOP](2021-04/2021-04-05-Spring AOP.md)
    • 布隆过滤器
    • Scala内核Spark阻塞排查
  • 2020 12
    • 使用Python优雅实现tail命令
  • 2020 11
    • Spark基础架构
    • 一文搞定Git
    • Spark线上问题引发的思考
  • 2020 04
    • 使用GitBook
  • 2019 05
    • SELinux、Netfilter、iptables、firewall和ufw五者关系
    • 安装npm和nodejs
    • 访问不到云服务器中的项目
  • 2019 04
    • 二叉树中节点与度数
    • 实现会话跟踪的技术有哪些
    • 计算机操作系统-死锁
    • Semaphore Count Down Latch Cyclic Barrier
    • Java内存模型
    • 双重检查锁定
    • synchronized实现底层
    • Lock接口
    • HTTP与HTTPS的区别
    • Java中线程池
    • Java中的阻塞队列
    • 排序算法
  • 2019 03
    • MySQL中索引
    • MySQL存储引擎
    • MySQL锁机制
    • n的阶乘结果后面0的个数
由 GitBook 提供支持
在本页
  • 背景
  • 其他内核实现简单调研
  • Almond (当前我们所选方案)
  • IScala
  • Toree
  • 总结
  • 尝试排查问题
  • 附:编译环境准备

这有帮助吗?

  1. 2021 04

Scala内核Spark阻塞排查

上一页布隆过滤器下一页2020 12

最后更新于4年前

这有帮助吗?

背景

在Jupyter Scala内核(Almond Scala内核),当运行Spark SQL时会偶发的卡主(Jupyter lab还显示running状态,但到Spark ui中查看已经运行结束了)

由于排查内核问题,成本较高,所以想先简单调研下其他的Jupyter Scala内核实现

其他内核实现简单调研

目前开源的Jupyter Scala内核:

Almond (当前我们所选方案)

启动内核后可以根据自己需要随意配置Spark并启动,输出数据比较美观,文档丰富,社区较为活跃

IScala

依赖于IPython,Scala 2.10.2,没有使用Spark相关案例,项目不活跃,最近一次提交已经是七年前

Toree

安装内核的时候就要指定好Spark配置,启动内核就相当于执行spark-submit,用户使用不友好不方便,输出数据展示不好看

安装

pip install toree==0.4.0

jupyter toree install \
--sys-prefix /conda/envs/notebook/ \
--replace \
--debug \
--kernel_name 'Toree Scala Debug' \
--spark_home=/opt/spark-2.2 \
--spark_opts='--master=yarn --deploy-mode=client --name=test --driver-memory=14G --executor-memory=8G --conf=spark.dynamicAllocation.enabled=true --conf=spark.shuffle.service.enabled=true --queue=root.test'

总结

Toree和IScala无法灵活配置Spark,输出样式不美观,社区活跃度不高,所以只能继续使用Almond,开始尝试修复Almond当前bug

尝试排查问题

命令

papermill --log-output --log-level=DEBUG test.ipynb  test_output.ipynb

发现内核卡在某个stage,更新task非常慢,偶尔超时直接内核挂掉,有时一直卡住,因为使用的是papermill,所以直接排除了Jupyter lab端导致的问题

后来发现当一个stage中task过多的时候就基本会稳定复现这个问题,比如下面这段代码

代码

import $ivy.`org.apache.spark::spark-sql:2.2.1` // Or use any other 2.x version here
import $ivy.`sh.almond::almond-spark:0.6.1-SNAPSHOT`

import org.apache.spark.sql._

val spark = {
  NotebookSparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()
}

spark.range(10000000).repartition(5000).count

直接本地调试(进入到调试状态鬼知道经历了什么!!)打日志发现,是发送的消息超过限制了

所以当zero sub端接收消息超出限制(jupyter server端配置1000msgs/sec),就会丢弃后面的消息,所以很明显问题就是出在了丢弃消息上,也就是执行状态的消息被丢失了,解决这个问题有几种思路

  1. 调大jupyter server端的1000msgs/sec配置

  2. 降低消息输出速率

  3. 1 Jupyter官方设置这个默认值肯定有他的深意,不想随意调整默认值

  4. 2 找到产出消息的源头,我们是否可以降低这个消息的频率,避免最终执行状态的消息被丢弃

到Almond代码中找到对应的Spark监听类

ProgressSparkListener.scala

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    if (progress)
      Try {
        val elem = newStageElem(
          stageSubmitted.stageInfo.stageId,
          stageSubmitted.stageInfo.numTasks,
          stageSubmitted.stageInfo.name,
          stageSubmitted.stageInfo.details
        )
        elem.init(commTargetName, !sentInitCode)
        sentInitCode = true
      }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    if (progress)
      Try {
        val elem = stageElem(stageCompleted.stageInfo.stageId)
        elem.allDone()
        // 更新stage进度
        elem.update()
      }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
    if (progress)
      Try {
        val elem = stageElem(taskStart.stageId)
        elem.taskStart()
        // 更新task进度,相当于给notebook client发送消息,然后client端通过websocket发给前端
        elem.update()
      }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    if (progress)
      Try {
        val elem = stageElem(taskEnd.stageId)
        elem.taskDone()
        // 更新task进度,相当于给notebook client发送消息,然后client端通过websocket发给前端
        elem.update()
      }

所以Almond-Spark进度条当一个stage中的task开始或者结束都会执行elem.update(),调用publish.html("spark progress info..."),更新当前的进度信息,并发送消息到Jupyter Client,在回头看我们之前执行的代码

spark.range(10000000).repartition(5000).count

这个会产生一个有5000个task的stage,也就是说会发送一万条的消息,当spark执行task比较快的时候,比如说每秒有1000个以上的任务有启停动作,就会导致jupyter丢弃相关的消息,导致最终执行状态的消息也有可能被丢弃,所以Jupyter Lab显示一直running状态

对方案2(降低产出消息速率)进行两种尝试:

  1. 直接不发送进度信息,直接注释掉

  2. 降低发送进度信息速率,定时更新进度,每秒更新一次

附:编译环境准备

# clone 内核代码
cd ~/work/code
git clone git@github.com:almond-sh/almond.git

# 因为集团使用的是Spark2.2 对应的Scala是2.11,所以我们需要切到Scala2.11最新的commit (当前Almond Scala2.11已经没有维护)
git reset --hard b711f116

# 配置编译环境(坑点非常多!!)
curl -sL https://github.com/sbt/sbt/releases/download/v1.2.8/sbt-1.2.8.zip | tar -xf - -C ~/software
# 会报下图1的错,需要切到1.3.2版本以上再切回,如果直接用1.3.2也不行,必须切一下版本
export PATH=~/software/sbt/bin:$PATH

vim ~/.sbt/repositories
[repositories]
local
aliyun: https://maven.aliyun.com/repository/public/
huaweicloud-maven: https://repo.huaweicloud.com/repository/maven/
maven-central: https://repo1.maven.org/maven2/
sbt-plugin-repo: https://repo.scala-sbt.org/scalasbt/sbt-plugin-releases, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]


echo "-Dsbt.override.build.repos=true" >> ~/software/sbt/conf/sbtconfig.txt

cd ~/word/code/almond
# 编译并启动本地Jupyter Lab进行调试
sbt ++2.11.12 jupyterStart shell
# 可以在另一个terminal进行热点部署
sbt ~scala-kernel/pack
# 发布到本地仓库
sbt ++2.11.12 publishLocal

图1

依赖冲突,必须exclude一下依赖,再添加合适的依赖版本

exclude依赖

添加合适版本的依赖

sbt ++2.11.12 publishLocal

# 本地
SCALA_VERSION=2.11.12 ALMOND_VERSION=0.6.1-SNAPSHOT
./coursier.sh bootstrap \
  -r https://maven.aliyun.com/repository/public/ \
  -r central \
  sh.almond:scala-kernel_$SCALA_VERSION:$ALMOND_VERSION \
  --sources --default=true \
  -o almond-snapshot --standalone

 SCALA_VERSION=2.11.12 ALMOND_VERSION=0.6.1-SNAPSHOT
./coursier.sh bootstrap \
  sh.almond:scala-kernel_$SCALA_VERSION:$ALMOND_VERSION \
  --sources --default=true \
  -o almond-snapshot --standalone

 # docker
./almond-snapshot --install --force --jupyter-path "$CONDA_DIR/envs/notebook/share/jupyter/kernels" \
  --id scala211 \
  --display-name "Scala (2.11)" \
  --predef-code "java.nio.file.Files.list(java.nio.file.Paths.get(\"/opt/scala/lib\")).toArray.map(_.toString).foreach(jar => interp.load.cp(ammonite.ops.Path(jar)));java.nio.file.Files.list(java.nio.file.Paths.get(\"/opt/spark-2.2/jars\")).toArray.map(_.toString).foreach(jar => interp.load.cp(ammonite.ops.Path(jar)));interp.load.cp(ammonite.ops.Path(\"/opt/hadoop/share/hadoop/yarn/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar\"));System.setProperty(\"java.security.krb5.conf\",\"/opt/hadoop/etc/hadoop/krb5.conf\");import \$ivy.{\`sh.almond::almond-spark:0.6.1-SNAPSHOT\`}" \
  --extra-repository http://maven.aliyun.com/repository/central \
  --extra-repository http://mvnrepository.com \
  --extra-repository https://repo1.maven.org/maven2

  # 本地
  ./almond-snapshot --install --force --jupyter-path "~/conda/envs/python3.6.7/share/jupyter/kernels" \
  --id scala-local-211 \
  --display-name "Scala local (2.11)" \
  --extra-repository http://maven.aliyun.com/repository/central \
  --extra-repository http://mvnrepository.com \
  --extra-repository https://repo1.maven.org/maven2

 ./coursier.sh fetch -p --no-default \
    -r http://maven.aliyun.com/repository/central \
     com.github.jupyter:jvm-repr:0.4.0

 ./coursier fetch -p --no-default \
     -r http://maven.aliyun.com/repository/central \
     com.fasterxml.jackson.core:jackson-databind:2.6.5 

 Deps.metabrowseServer exclude("com.fasterxml.jackson.core", "jackson-databind"),
{
  "language" : "scala",
  "display_name" : "Scala (2.11.12)",
  "argv" : [
    "/conda/envs/notebook/share/jupyter/kernels/scala211/pack/bin/scala-kernel",
    "--id",
    "scala211",
    "--predef-code",
    "java.nio.file.Files.list(java.nio.file.Paths.get(\"/opt/scala/lib\")).toArray.map(_.toString).foreach(jar => interp.load.cp(ammonite.ops.Path(jar)));java.nio.file.Files.list(java.nio.file.Paths.get(\"/opt/spark-2.2/jars\")).toArray.map(_.toString).foreach(jar => interp.load.cp(ammonite.ops.Path(jar)));interp.load.cp(ammonite.ops.Path(\"/opt/hadoop/share/hadoop/yarn/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar\"));System.setProperty(\"java.security.krb5.conf\",\"/opt/hadoop/etc/hadoop/krb5.conf\");import $ivy.{`sh.almond::almond-spark:0.6.1-SNAPSHOT`}",
    "--extra-repository",
    "http://maven.aliyun.com/repository/central",
    "--connection-file",
    "{connection_file}"
  ]
}
project scala-interpreter
whatDependsOn com.fasterxml.jackson.core jackson-databind 2.6.5

# 两个%%的坑 https://www.scala-sbt.org/1.x/docs/zh-cn/Library-Dependencies.html
def jackson = "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.5"

使用 执行打debug日志

由于jupyter client和内核层消息框架用的是zeromq,所以简单了解了一下zeromq的消息机制,由于使用的pub/sub模式,直接找到对应的

再次实验,两种方案都可以完美解决,

参考链接
GitHub
官方文档
GitHub
GitHub
papermill
介绍文档
Git地址
coursier文档
Almond文档
http://dblab.xmu.edu.cn/blog/maven-network-problem/
scala-kernel-debug
zeromq-pub/sub
sbt-build-error
dependencies-conflict
exclude-dependencies
add-dependencies