二、PySpark 与基于开发容器的集群搭建概述¶
0. 相关介绍¶
本教程将介绍如何使用开发容器(Dev Container)环境,快速搭建和体验 PySpark 集群,助力大数据分析与分布式计算的学习与实践。
- PySpark 是 Apache Spark 的 Python API 接口,允许用户通过 Python 编写 Spark 程序,具备强大的分布式计算能力,广泛应用于大规模数据处理、机器学习、图计算和流处理等场景。
- 为了简化 Spark 的部署和调试过程,可通过 开发容器 技术(如 Docker + Docker Compose)快速搭建一个可用于测试、开发甚至教学的 PySpark 分布式集群环境。
1. Spark的主要流行模式¶
Apache Spark 的主要流行运行模式(部署模式)有以下几种,每种模式都适合不同的应用场景和基础设施环境:
对比维度 | Local | Standalone | YARN | Kubernetes (K8s) |
---|---|---|---|---|
资源管理器 | 无 | Spark 自带 | Hadoop YARN | Kubernetes |
部署复杂度 | 极低(本地即可) | 低(部署 Spark 即可) | 中(需完整 Hadoop 环境) | 高(需容器技术与 K8s 集群) |
是否分布式 | 否(单机) | 是(Spark Master + Worker) | 是(YARN ResourceManager 管理) | 是(基于 K8s Pods 调度) |
容错性 | 低(无恢复机制) | 中(Worker 故障可恢复) | 高(YARN 提供资源与任务恢复) | 高(K8s 自动重启、容器隔离) |
动态资源扩展 | 否 | 支持有限 | 支持(需配置) | 原生支持自动扩缩容 |
典型部署方式 | 本机命令行或 Jupyter | 手动启动 Spark Master/Worker | Hadoop 集群中部署 Spark | Docker 容器运行于 K8s 中 |
依赖组件 | Python/Java | Spark | Spark + Hadoop/YARN | Spark + Docker + Kubernetes |
适用场景 | 本地开发测试 | 小型集群或教学实验 | 企业级大数据处理 | 云原生、大规模弹性调度 |
资源隔离 | 无 | 有限(基于端口和配置) | 好(支持队列、容器、用户隔离) | 极好(Pod 级别资源隔离) |
监控与可视化 | Spark UI(本地) | Spark UI(默认 4040) | Spark UI + YARN Web UI | Spark UI + Kubernetes Dashboard |
2. 典型开发容器的组成结构¶
文件或组件 | 描述 |
---|---|
Dockerfile | 构建容器镜像的基础文件,定义操作系统、语言环境、依赖库、工具链等。 |
devcontainer.json | 配置开发容器行为(启动命令、端口映射、默认终端、VS Code 插件等),VS Code 专属配置文件。 |
docker-compose.yaml | 启动多容器服务(如 Spark Master + Workers + Hadoop),用于模拟集群或复杂系统。 |
Volume 映射设置 | 将本地文件或目录(如数据集、脚本)挂载进容器以实现持久化与调试。 |
VS Code Remote Development | 插件(Remote - Containers)支持 IDE 远程连接容器 |
3. PySpark Local模式部署及测试¶
3.1 PySpark Local模式部署¶
- 新建
.devcontainer
文件夹,目录如下:
my-project/
├── .devcontainer/
│ ├── Dockerfile
│ ├── devcontainer.json
│ └── docker-compose.yaml
├── notebooks/
└── README.md
- 新建
devcontaienr.json
文件,文件内容如下:
{
// 开发容器名称
"name": "PySpark local cluster",
// 指定Docker Compose配置文件路径
"dockerComposeFile": ["./docker-compose.yaml"],
// 指定服务名称(与docker-compose.yaml中的服务名称一致)
"service": "spark",
// 指定容器的工作目录
"workspaceFolder": "/home/jovyan/code",
// VS Code的自定义配置
"customizations": {
"vscode" : {
"settings": {
// 配置终端使用bash
"terminal.integrated.profiles.linux": {
"bash": {
"path": "/bin/bash"
}
},
"terminal.integrated.defaultProfile.linux": "bash",
// 启用Python代码检查和pylint
"python.linting.enabled": true,
"python.linting.pylintEnabled": true
},
// 预安装的VS Code扩展
"extensions": [
"ms-python.python",
"ms-toolsai.jupyter"
]
}
}
}
- 新建
docker-compose.yaml
文件,文件内容如下:
version: '3' # Docker Compose文件版本
services:
spark: # 服务名称,需与Dockerfile中的服务名称一致
build:
context: . # 构建上下文目录,即.devcontainer目录
dockerfile: Dockerfile # 指定Dockerfile文件
#image: bitnami/spark:3.5 # 使用Bitnami的Spark镜像
#image: jupyter/pyspark-notebook:latest # 使用Jupyter的PySpark Notebook镜像
volumes:
- ..:/home/jovyan/code # 挂载上级目录到容器内的/home/jovyan/code
ports:
- "6666:6666" # 端口映射
- "4040:4040" # 端口映射
command: start.sh jupyter notebook --NotebookApp.token='' --NotebookApp.disable_check_xsrf=true --NotebookApp.allow_origin='*' --NotebookApp.ip='0.0.0.0'
# 启动容器时运行的命令,启动 Jupyter Notebook 并允许远程访问
- 新建
Dockerfile
文件,文件内容如下:
# 选择你想要的基础镜像
FROM jupyter/pyspark-notebook:latest
# conda环境名称和Python版本
ARG conda_env=vscode_pyspark
ARG py_ver=3.11
# 使用 mamba 创建新的 conda 环境,并安装 ipython 和 ipykernel
RUN mamba create --yes -p "${CONDA_DIR}/envs/${conda_env}" python=${py_ver} ipython ipykernel && \
mamba clean --all -f -y
# pip从阿里云镜像在conda环境中安装 PySpark 和其他常用库
RUN "${CONDA_DIR}/envs/${conda_env}/bin/pip" install pyspark pandas findspark --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple
# 让新建的 conda 环境成为默认环境
RUN echo "conda activate ${conda_env}" >> "${HOME}/.bashrc"
- 注意事项:
6.1 如果在查看 Docker 日志时遇到镜像拉取失败等问题,可通过配置 Docker 代理或使用 VPN 进行解决。建议及时查看日志以定位具体错误原因。
6.2 若运行 PySpark 测试代码时出现JavaPackage' object is not callable
错误,且基础镜像已包含 Java,可借助 findspark 正确初始化 Spark 环境变量和依赖(相关代码见下一节),以确保环境配置无误。
6.3 若 pip 安装 PySpark 时下载缓慢或失败,可在 Dockerfile 中配置阿里云 PyPI 镜像以加速安装。
3.2 PySpark Local模式测试¶
In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# 创建了一个 SparkSession 对象,并指定 master 为 "local"
# 表示在本地模式下运行(即只使用当前这台机器,不连接集群)。
# SparkSession 是与 Spark 进行交互的入口
spark = SparkSession.builder.master("local").getOrCreate()
# SparkContext 是 Spark 的核心,
# 用于连接到 Spark 集群并负责与集群管理器进行通信
# 可以用来执行分布式任务
# 在终端中不需要显示创建 SparkContext
sc = spark.sparkContext
sc
Out[1]:
In [2]:
# 从指定的文件创建一个 RDD。
data = sc.textFile('demo_file.txt')
print(type(data))
# collect() 会将 RDD 中的元素收集到一个列表中。
data.collect()
<class 'pyspark.rdd.RDD'>
Out[2]:
['This is just a demo file. ', 'Normally, a file this small would have no reaon to be on HDFS.']
- 端口转发:容器端口转发(Port Forwarding)是实现容器与外部系统通信的重要机制。
- Spark UI 访问:可通过浏览器访问 Spark UI(http://localhost:4040/executors/ ),查看任务执行与资源分配情况。
- Docker desktop容器界面
- 新建
devcontainer.json
文件,内容如下:
{
"name": "PySpark cluster",
"dockerComposeFile": ["./docker-compose.yaml"],
"service": "spark-master",
"workspaceFolder": "/home/jovyan/code",
"customizations": {
"vscode" : {
"settings": {
"terminal.integrated.profiles.linux": {
"bash": {
"path": "/bin/bash"
}
},
"terminal.integrated.defaultProfile.linux": "bash",
"python.linting.enabled": true,
"python.linting.pylintEnabled": true
},
"extensions": [
"ms-python.python",
"ms-toolsai.jupyter"
]
}
}
}
- 新建
docker-compose.yaml
文件(一个master两个worker节点),内容如下:
version: '3'
networks:
sparknet:
name: sparknet
driver: bridge
services:
spark-master:
build:
context: .
dockerfile: Dockerfile
user: root
container_name: spark-master
hostname: spark-master
ports:
- "8888:8888" # Jupyter Notebook
- "7077:7077" # Spark Master
- "8080:8080" # Spark Master Web UI
- "8081:8081"
volumes:
- ..:/home/jovyan/code
networks:
- sparknet
command: bash -c "/usr/local/spark/sbin/start-master.sh && start.sh jupyter notebook --NotebookApp.token='' --NotebookApp.disable_check_xsrf=true --NotebookApp.allow_origin='*' --NotebookApp.ip='0.0.0.0'"
spark-worker-1:
build:
context: .
dockerfile: Dockerfile
user: root
container_name: spark-worker-1
hostname: spark-worker-1
depends_on:
- spark-master
networks:
- sparknet
environment:
- SPARK_MASTER=spark://spark-master:7077
- SPARK_WORKER_WEBUI_PORT=8081
- SPARK_NO_DAEMONIZE=1 # 添加这个环境变量
- SPARK_WORKER_CORES=2
- SPARK_WORKER_MEMORY=2G
volumes:
- ..:/home/jovyan/code
command: start.sh /usr/local/spark/sbin/start-worker.sh spark://spark-master:7077
spark-worker-2:
build:
context: .
dockerfile: Dockerfile
user: root
container_name: spark-worker-2
hostname: spark-worker-2
depends_on:
- spark-master
networks:
- sparknet
environment:
- SPARK_MASTER=spark://spark-master:7077
- SPARK_NO_DAEMONIZE=1 # 添加这个环境变量
volumes:
- ..:/home/jovyan/code
command: start.sh /usr/local/spark/sbin/start-worker.sh spark://spark-master:7077
- 新建
Dockerfile
文件,文件内容如下:
# Choose your desired base image
FROM jupyter/pyspark-notebook:latest
# Switch to root to install system packages
USER root
# Set environment variables
ENV SPARK_HOME=/usr/local/spark \
PYSPARK_PYTHON=/usr/bin/python3 \
DEBIAN_FRONTEND=noninteractive
# Create conda environment variables
ARG conda_env=vscode_pyspark
ARG py_ver=3.11
# Switch to Aliyun mirrors and install packages
RUN sed -i 's/archive.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list && \
sed -i 's/security.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list && \
apt-get clean && \
apt-get update && \
apt-get install -y --no-install-recommends \
iputils-ping \
net-tools \
&& rm -rf /var/lib/apt/lists/*
# Create necessary directories and set permissions
RUN mkdir -p /usr/local/spark/logs /usr/local/spark/work /home/jovyan/spark-logs && \
chown -R 1000:100 /usr/local/spark /home/jovyan/spark-logs && \
chmod -R 777 /usr/local/spark/work
# Setup conda environment with mamba
RUN mamba create --yes -p "${CONDA_DIR}/envs/${conda_env}" \
python=${py_ver} \
ipython \
ipykernel \
&& mamba clean --all -f -y \
&& echo "conda activate ${conda_env}" >> "${HOME}/.bashrc"
# Install Python packages
RUN "${CONDA_DIR}/envs/${conda_env}/bin/pip" install \
--no-cache-dir \
-i https://mirrors.aliyun.com/pypi/simple \
pyspark \
pandas \
findspark \
py4j \
requests
# Switch back to notebook user
USER jovyan
EXPOSE 8888 7077 8080
# Set working directory
WORKDIR /home/jovyan/code
4.2 PySpark Standalone模式测试¶
In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://spark-master:7077").config("spark.ui.port", "8080").config("spark.acls.enable", "false").config("spark.ui.view.acls", "*").config("spark.modify.acls", "*").getOrCreate()
sc = spark.sparkContext
sc
Out[2]:
- 常用测试集群搭建是否成功的命令
查看spark-worker-1节点是否启动成功
docker logs spark-worker-1
查看网络
docker network inspect sparknet
Ping网络
docker exec spark-worker-1 ping -c 3 spark-master
In [ ]:
# This script retrieves cluster information from a Spark application using the Spark REST API.
import requests
import json
def get_cluster_info():
"""Get cluster information using Spark REST API"""
try:
# Get UI URL from SparkContext
ui_url = sc.uiWebUrl
# Get application ID
app_id = sc.applicationId
# Make REST API request
response = requests.get(f"{ui_url}/api/v1/applications/{app_id}/executors")
if response.status_code == 200:
executors = response.json()
# Print cluster information
print(f"Cluster Information:")
print(f"- Number of executors: {len(executors)}")
print(f"- Total nodes (including driver): {len(executors) + 1}")
# Print detail for each executor
for idx, executor in enumerate(executors):
print(f"\nExecutor {idx}:")
print(f"- ID: {executor.get('id')}")
print(f"- Host: {executor.get('hostPort')}")
print(f"- Active: {executor.get('isActive')}")
print(f"- Memory Used: {executor.get('memoryUsed')} / {executor.get('maxMemory')}")
print(f"- Cores: {executor.get('totalCores')}")
else:
print(f"Failed to get executor info. Status code: {response.status_code}")
except Exception as e:
print(f"Error getting cluster info: {str(e)}")
# Run the function
get_cluster_info()
Cluster Information: - Number of executors: 3 - Total nodes (including driver): 4 Executor 0: - ID: driver - Host: spark-master:41775 - Active: True - Memory Used: 0 / 455501414 - Cores: 0 Executor 1: - ID: 1 - Host: 172.22.0.3:43697 - Active: True - Memory Used: 0 / 455501414 - Cores: 2 Executor 2: - ID: 0 - Host: 172.22.0.4:40765 - Active: True - Memory Used: 0 / 455501414 - Cores: 16
- 成功部署PySpark Standalone模式docker desktop界面
- Spark Master Web UI (http://localhost:8080/ )
- Spark Worker Web UI (http://localhost:8081/executors/ )
In [3]:
# 从指定的文件创建一个 RDD。
data = sc.textFile('demo_file.txt')
print(type(data))
# collect() 会将 RDD 中的元素收集到一个列表中。
data.collect()
<class 'pyspark.rdd.RDD'>
Out[3]:
['This is just a demo file. ', 'Normally, a file this small would have no reaon to be on HDFS.']