Please enable Javascript to view the contents

使用 Iceberg 和 Spark 在 Kubernetes 上处理数据

 ·  ☕ 10 分钟

1. 数据处理架构

主要分为四层:

  • 处理能力层,Spark on Kubernetes 提供流式的数据处理能力
  • 数据管理层,Iceberg 提供 ACID、table 等数据集访问操作能力
  • 存储层,Hive MetaStore 管理 Iceberg 表元数据,Postgresql 作为 Hive MetaStore 存储后端,S3 作为数据存储后端
  • 资源层,Kubernetes 管理集群的计算、存储和网络资源,对上层提供统一的资源管理能力

1.1 Spark

Apache Spark 是一个开源集群运算框架,最初是由加州大学柏克莱分校 AMPLab 所开发。相对于 Hadoop 的 MapReduce 会在执行完工作后将中介资料存放到磁盘中,Spark 使用了内存运算技术,能在资料尚未写入硬盘时即在内存分析运算。

可以完成的任务包括:

  • 批处理,提取、转换、加载、处理等
  • 流处理,实时分析、事件处理
  • SQL 查询,与 Hive、Iceberg 等数据源集成,提供强大的查询优化功能
  • 机器学习,Spark 中的 MLlib 支持常见的机器学习算法,如回归、分类、聚类等
  • 图计算,Spark 中的 GraphX 支持常见的图计算算法,如 PageRank 算法

Spark on Kubernetes 项目是一个在 Kubernetes 上运行 Apache Spark 应用程序的解决方案,提供了更加云原生的运行方式。

1.2 Iceberg

Iceberg 是一种表格式(tableformat),我们可以把它定义成一种数据组织格式。

Iceberg 是由 Netflix 开发并开源的、用于庞大分析数据集的开放表格式。Iceberg 在 Presto 和 Spark 中添加了使用高性能格式的表(Hudi 也支持 Presto 和 Spark 集成),该格式的工作方式类似于 SQL 表。

与底层的存储格式(比如 ORC、Parquet 之类的列式存储格式)最大的区别是,它并不定义数据存储方式,而是定义了数据、元数据的组织方式,向上提供统一的表的语义。

常见的使用方式是,在 Hive Metastore 建立一个 iceberg 格式的表。用 fink 或者 spark 写入 iceberg,然后再通过其他方式来读取这个表,比如 spark、flink、presto 等。本篇主要也是采用的这种技术路线。

1.3 Hive Metastore

Hive Metastore 是 Apache Hive 的核心组件之一,它主要用作元数据管理服务,为 Hive、Spark、Presto、Trino 等大数据处理工具提供了一个统一的元数据存储和访问层。

简单来说,Hive Metastore 负责管理和存储表的结构、数据库信息以及存储位置等元数据,方便分布式计算框架快速访问和处理大规模数据。

2. 部署 Hive Metastore

2.1 部署 PG

参考 https://github.com/shaowenchen/demo/tree/master/spark-3.5-iceberg 目录中的 postgres15.yaml 文件部署。

需要注意的是,示例中 PGSQL 存储使用的是本地主机 hostPath 存储。如果是生产环境,需要更换为更可靠的存储服务。

2.2 PG 数据库初始化

  • 设置环境变量
1
2
3
4
5
export POSTGRES_USER=postgresadmin
export POSTGRES_PASSWORD=postgrespassword
export POSTGRES_IP= 主机 IP
export POSTGRES_PORT= 集群 postgres svc 的 nodePort
export POSTGRES_DB=postgresdb
  • 创建数据库
1
psql -U $POSTGRES_USER -h $POSTGRES_IP -p $POSTGRES_PORT -d postgres -c "CREATE DATABASE $POSTGRES_DB;"

2.3 部署 Hive Metastore

参考 https://github.com/shaowenchen/demo/tree/master/spark-3.5-iceberg 目录中的 hive-metastore.yaml 文件部署。

需要替换一下 S3 相关的存储桶配置,BUCKETACCESS_KEYSECRET_KEYACCESS_KEYSECRET_KEY 等。

Hive Metastore 使用 PGSQL 作为数据库后端,在部署的配置文件中有写 PGSQL 的信息。如果采用外部数据库,需要更新部署文件中的相关配置。

3. 部署 Spark Operator

3.1 Spark Operator 简介

上图是 Spark Operator 的架构,展示了各个组件的关系。其工作流程如下:

  1. spark-submit 提交 Spark 作业到 Kubernetes 集群(可以通过 sparkapplications 对象来提交)
  2. Kubernetes 集群创建 Driver Pod
  3. Driver 启动若干 Executor Pods
  4. Executor 执行具体的 Task 任务
  5. 执行完成后,Driver 清理 Executor

3.2 安装 Spark Operator

  • 添加 repo
1
2
helm repo add spark-operator https://kubeflow.github.io/spark-operator
helm repo update
  • 安装 spark-operator
1
2
3
4
5
6
7
8
helm install spark-operator spark-operator/spark-operator \
    --version 2.0.0-rc.0 \
    --namespace spark-operator \
    --set 'spark.jobNamespaces={default,spark,spark-operator,spark}' \
    --create-namespace \
    --set webhook.enable=true \
    --set image.repository=kubeflow/spark-operator \
    --set image.tag=2.0.0-rc.0

spark-operator 仅会处理 jobNamespaces 中指定的命名空间的 Spark 作业。

  • 卸载 spark-operator
1
helm -n spark-operator uninstall spark-operator
  • 查看负载
1
2
3
4
5
kubectl -n spark-operator get pod

NAME                                         READY   STATUS    RESTARTS   AGE
spark-operator-controller-679bcc59c9-lsljx   1/1     Running   0          2d3h
spark-operator-webhook-676c675cdd-t8p95      1/1     Running   0          2d3h
  • 查看 CRD
1
2
3
4
kubectl get crd |grep spark

scheduledsparkapplications.sparkoperator.k8s.io        2024-05-23T06:53:32Z
sparkapplications.sparkoperator.k8s.io                 2024-05-23T06:53:32Z

sparkapplications.sparkoperator.k8s.io 定义的是 Spark 作业,scheduledsparkapplications.sparkoperator.k8s.io 定义的是定时执行的 Spark 作业。

4. 使用 Standalone Spark 和 Iceberg 处理数据

Standalone 模式下的 Spark 会在本地启动完整的依赖。

4.1 部署一个 standalone 的 Spark 实例

参考 https://github.com/shaowenchen/demo/tree/master/spark-3.5-iceberg 目录中的 spark-iceberg.yaml 文件部署。

4.2 进入 spark Pod 交互操作

1
kubectl -n spark exec -it spark-iceberg-749cf599dd-xdzlg bash

一共有三种可用的 spark 交互终端:

  • spark-shell
  • spark-sql
  • spark-python

4.3 创建 Iceberg 表

执行 spark-sql 命令进入 spark-sql (default)> 终端

  • 创建命名空间
1
CREATE NAMESPACE ns1;
  • 创建表
1
2
3
4
5
CREATE TABLE demo.ns1.table1 (
    id BIGINT,
    data STRING,
    category STRING
) USING iceberg;

上面命令创建的表会被保存到默认配置文件设置的 spark-warehouse 中,如果想要指定存储位置,可以使用 LOCATION 关键字指定。

1
2
3
4
5
6
CREATE TABLE demo.ns1.table2 (
    id BIGINT,
    data STRING,
    category STRING
) USING iceberg
LOCATION 's3a://mybucket/datalake/spark-warehouse/mytable/';

4.4 使用 spark-python 处理

退出 spark-shell 终端,将下面的代码保存到 Pod 中,使 python spark-example.py 执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# spark-example.py
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length

sparkSessionConf = SparkConf().setAppName(f"DacnSpark Application [1]"spark =    SparkSession.builder.config(conf=sparkSessionConf).enableHiveSupport().getOrCreate()
if __name__ == "__main__":
    # 数据加载
    df = spark.read.json(
        "s3a://mybucket/datalake/spark-warehouse/source/ccnet-4000.json"
    # 数据处理
    df_cleaned = df.na.drop()
    # 使用 iceberg 表管理,方便下一次处理
    df_cleaned.writeTo("demo.ns1.table3").createOrReplace()
    # 数据输出
    spark.read.table("demo.ns1.table3").write.json(
        "s3a://mybucket/datalake/spark-warehouse/result/ccnet-4000-example.json", "overwrite"

这个处理流程分为三部分:

  1. 数据加载,将外部原生 JSON 数据,也可以直接读取 Iceberg 表的数据。
  2. 数据处理,利用 Spark 的计算能力,对数据进行处理。
  3. 数据输出,将处理后的数据输出到外部存储 S3 中,提供给业务方使用。

此时,在对象存储中可以看到,Iceberg 表的数据

Iceberg 表的元数据

处理后导出的数据

5. 使用 Spark 在集群处理数据

5.1 使用 yaml 提交 Spark 作业

  • 给 Driver 设置权限
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-driver
rules:
  - apiGroups: [""]
    resources:
      [
        "pods",
        "configmaps",
        "secrets",
        "services",
        "persistentvolumeclaims",
        "events",
      ]
    verbs: ["*"]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: spark-driver-binding
subjects:
  - kind: ServiceAccount
    name: default
    namespace: spark
roleRef:
  kind: ClusterRole
  name: spark-driver
  apiGroup: rbac.authorization.k8s.io
  • 提交 Spark 作业
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: spark
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: spark:3.5.1-python3
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.5.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.5.1
    serviceAccount: default
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.5.1

这里使用的是 Spark 官方镜像,内置有 examples 示例。

  • 跟踪运行状态
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
kubectl -n spark get pod -w

NAME                             READY   STATUS              RESTARTS   AGE
pyspark-pi-driver                0/1     ContainerCreating   0          0s
spark-iceberg-749cf599dd-xdzlg   1/1     Running             0          20h
pyspark-pi-driver                1/1     Running             0          2s
pythonpi-5a8ac191caddfc56-exec-1   0/1     Pending             0          0s
pythonpi-5a8ac191caddfc56-exec-1   0/1     Pending             0          0s
pythonpi-5a8ac191caddfc56-exec-1   0/1     ContainerCreating   0          0s
pythonpi-5a8ac191caddfc56-exec-1   1/1     Running             0          2s
pythonpi-5a8ac191caddfc56-exec-1   1/1     Terminating         0          6s
pythonpi-5a8ac191caddfc56-exec-1   0/1     Terminating         0          7s
pythonpi-5a8ac191caddfc56-exec-1   0/1     Terminating         0          7s
pythonpi-5a8ac191caddfc56-exec-1   0/1     Terminating         0          7s
pyspark-pi-driver                  0/1     Completed           0          16s
pyspark-pi-driver                  0/1     Completed           0          18s

当 Completed 时,Spark 作业已完成;同时,executor pod 已经被清理掉,只留下 driver pod。

  • 查看运行结果
1
2
3
kubectl -n spark logs pyspark-pi-driver -f |grep roughly

Pi is roughly 3.144760

Driver 从 Executor 获取执行结果,然后输出到本地。

  • 清理
1
kubectl -n spark delete sparkapplication pyspark-pi

5.2 spark-submit 相关参数

1
spark-submit --help
  • 基本用法
spark-submit [选项] <应用程序 JAR 文件 | Python 文件 | R 文件> [应用程序参数]
spark-submit --kill [提交 ID] --master [spark://...]
spark-submit --status [提交 ID] --master [spark://...]
spark-submit run-example [选项] 示例类名 [示例参数]
  • 选项(Options)

–master MASTER_URL:Spark 集群的主节点 URL(如 spark://host:portmesos://host:portyarnk8s://https://host:port 或本地模式 local[*]),默认为 local[*](即使用本地所有 CPU 内核运行)。

–deploy-mode DEPLOY_MODE:指定驱动程序的部署模式,“client” 表示在本地运行驱动程序,“cluster” 表示在集群中运行驱动程序(默认为 client 模式)。

–class CLASS_NAME:你的应用程序的主类(针对 Java / Scala 应用程序)。

–name NAME:应用程序的名称。

–jars JARS:逗号分隔的 JAR 文件列表,包含在驱动程序和执行程序的类路径中。

–packages:逗号分隔的 Maven 坐标列表,指定要包含在驱动程序和执行程序类路径中的 JAR 包。首先会在本地 Maven 仓库中搜索,然后是 Maven Central 以及通过 --repositories 指定的其他远程仓库。

–exclude-packages:逗号分隔的 groupId:artifactId 列表,用于排除指定的依赖包,避免与 --packages 提供的依赖发生冲突。

–repositories:逗号分隔的远程仓库列表,用于搜索 --packages 指定的 Maven 坐标。

–py-files PY_FILES:逗号分隔的 .zip.egg.py 文件列表,这些文件会被放在 Python 应用的 PYTHONPATH 中。

–files FILES:逗号分隔的文件列表,文件会被放置在每个执行器的工作目录中,可通过 SparkFiles.get(fileName) 访问这些文件。

–archives ARCHIVES:逗号分隔的压缩文件列表,这些压缩文件会被解压到每个执行器的工作目录中。

–conf, -c PROP=VALUE:设置任意 Spark 配置属性。

–properties-file FILE:指定一个文件路径,从该文件中加载额外的配置属性。如果未指定,Spark 将默认查找 conf/spark-defaults.conf 文件。

–driver-memory MEM:指定驱动程序的内存大小(如 1000M2G),默认为 1024M

–driver-java-options:向驱动程序传递额外的 Java 选项。

–driver-library-path:为驱动程序传递额外的库路径。

–driver-class-path:为驱动程序传递额外的类路径。注意,使用 --jars 添加的 JAR 文件会自动包含在类路径中。

–executor-memory MEM:指定每个执行器的内存大小(如 1000M2G),默认为 1G

–proxy-user NAME:指定要提交应用时模拟的用户。此选项与 --principal--keytab 不兼容。

–help, -h:显示帮助信息并退出。

–verbose, -v:打印额外的调试信息。

–version:打印当前 Spark 的版本信息。

  • Spark Connect(仅限 Spark Connect 模式)

–remote CONNECT_URL:指定连接 Spark Connect 服务的 URL,例如 sc://host:port。此选项无法与 --master--deploy-mode 一起设置。该选项为实验性质,可能会在小版本更新中发生变化。

  • 集群部署模式(Cluster Deploy Mode Only)

–driver-cores NUM:指定驱动程序使用的核心数(仅在集群模式下使用,默认为 1)。

  • Spark Standalone 或 Mesos(仅限集群模式)

–supervise:如果指定此选项,驱动程序在失败时会自动重启。

  • Spark Standalone、Mesos 或 K8S(仅限集群模式)

–kill SUBMISSION_ID:指定后,杀死具有该 ID 的驱动程序。

–status SUBMISSION_ID:指定后,获取具有该 ID 的驱动程序状态。

  • Spark Standalone 和 Mesos(仅限 Standalone 和 Mesos 模式)

–total-executor-cores NUM:为所有执行器指定使用的总核心数。

  • Spark Standalone、YARN 和 Kubernetes(仅限 Standalone、YARN 和 K8S 模式)

–executor-cores NUM:每个执行器使用的核心数(在 YARN 和 Kubernetes 模式下默认为 1,在 Standalone 模式下默认为工作节点上所有可用的核心)。

  • Spark on YARN 和 Kubernetes(仅限 YARN 和 K8S 模式)

–num-executors NUM:启动的执行器数量(默认为 2)。如果启用了动态资源分配,初始的执行器数量至少为指定的值。

–principal PRINCIPAL:指定用于登录到 KDC 的主体。

–keytab KEYTAB:指定主体对应的 keytab 文件的完整路径。

  • Spark on YARN(仅限 YARN 模式)

–queue QUEUE_NAME:指定 YARN 的队列名称,默认为 “default”。

5.3 使用 spark-submit 提交 Spark 作业

  • 进入 Pod 终端
1
kubectl -n spark exec -it spark-iceberg-749cf599dd-xdzlg bash
  • 提交 Spark 作业

在上面的步骤中,已经给 default ServiceAccount 设置了足够权限,这里直接指定 master 地址即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
/opt/spark/bin/spark-submit \
  --master k8s://https://x.x.x.x:6443 \
  --deploy-mode cluster \
  --name spark-iceberg-example \
  --class org.apache.spark.deploy.python.PythonRunner \
  --conf spark.executor.instances=2 \
  --conf spark.executor.memory=16G \
  --conf spark.executor.cores=8 \
  --conf spark.driver.memory=16g \
  --conf spark.pyspark.python=/usr/bin/python3 \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.container.image=shaowenchen/spark:3.5.1-python3-s3 \
  --conf spark.eventLog.enabled=false \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=default \
  --conf spark.hadoop.fs.s3a.bucket.name=mybucket \
  --conf spark.hadoop.fs.s3a.access.key=xxx \
  --conf spark.hadoop.fs.s3a.secret.key=xxx \
  --conf spark.kubernetes.file.upload.path=s3a://mybucket/datalake/spark-warehouse/upload \
  /opt/spark/examples/src/main/python/pi.py

其中的 –conf 包含大量配置参数,详见 https://spark.apache.org/docs/latest/running-on-kubernetes.html

执行时,会输出与 Kubernetes 交互的信息,主要是 Pod 的状态

  • 查看相关的 Pod
1
2
3
spark-iceberg-example-e611cd91cb54a5fc-driver   0/1     Completed   0          50s
pythonpi-e80edc91cb54c2eb-exec-1                0/1     Terminating         0          5s
pythonpi-e80edc91cb54c2eb-exec-2                0/1     Terminating         0          5s

与上面一样,driver 执行完成之后会保留下来,executor 会被删除掉。

  • 查看相关日志
1
2
3
kubectl -n spark logs spark-iceberg-example-e611cd91cb54a5fc-driver -f |grep roughly

Pi is roughly 3.144840

6. 使用 Spark 和 Iceberg 在集群处理数据

由于在生产环境下,Spark 处理的脚本有时会发生调整,为了便于更新和管理脚本,在 Spark 处理数据的脚本需要通过 PVC 挂载到 Driver 、Executor 中。

参考 https://github.com/shaowenchen/demo/blob/master/spark-3.5-iceberg/sparkapp.yaml

7. 使用 Argo Webhook 对外提供 Spark 处理 API

  • 创建 Sensor

https://github.com/shaowenchen/demo/blob/master/spark-3.5-iceberg/argo-sensor.yaml

  • 创建一个 EventSource
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: spark-webhook
  namespace: argo
spec:
  service:
    ports:
      - port: 13000
        targetPort: 13000
  webhook:
    sparkapp-start:
      port: "13000"
      endpoint: /sparkapp/start
      method: POST
      url: ""
  • 检测 Sensor Pod 状态
1
2
3
kubectl -n argo get pod  |grep spark

sparkapp-start-sensor-sensor-gkxzn-58d7648fd7-fs8np               1/1     Running     0             22s
  • 查看 Webhook 的服务端口
1
2
3
4
5
kubectl -n argo get svc

NAME                                  TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
eventbus-default-stan-svc             ClusterIP   None           <none>        4222/TCP,6222/TCP,8222/TCP   291d
spark-webhook-eventsource-svc         ClusterIP   10.103.215.1   <none>        13000:30001/TCP                    291d
  • 调用 API 接口触发任务
1
curl -d '{"script":"spark-example.py", "pvc":"mypvc", "path":"spark", "executor":2, "ak":"xxx", "sk":"xxx", "endpoint":"ks3-cn-beijing-internal.ksyuncs.com", "bucket": "mybucket"}' -H "Content-Type: application/json" -X POST http://localhost:31300/sparkapp/start -v

通过 script 可以指定启动脚本,path 可以用来隔离 pvc 中的目录。

8. 总结

本篇记录了部署基于 Spark 、Iceberg、Hive Metastore 的数据处理软件栈的流程。主要内容如下:

  1. 介绍了 Spark 、Iceberg、Hive Metastore 的基本概念
  2. 部署 Hive Metastore、Spark Operator
  3. 测试使用 Standalone 模式运行 Spark 作业
  4. 测试使用 spark-submit、yaml 方式运行 Spark 作业
  5. 通过 Argo Webhook 对外提供 Spark 数据处理 API

9. 参考


微信公众号
作者
微信公众号