二、PySpark 与基于开发容器的集群搭建概述¶

0. 相关介绍¶

本教程将介绍如何使用开发容器(Dev Container)环境,快速搭建和体验 PySpark 集群,助力大数据分析与分布式计算的学习与实践。

  1. PySpark 是 Apache Spark 的 Python API 接口,允许用户通过 Python 编写 Spark 程序,具备强大的分布式计算能力,广泛应用于大规模数据处理、机器学习、图计算和流处理等场景。
  2. 为了简化 Spark 的部署和调试过程,可通过 开发容器 技术(如 Docker + Docker Compose)快速搭建一个可用于测试、开发甚至教学的 PySpark 分布式集群环境。

1. Spark的主要流行模式¶

Apache Spark 的主要流行运行模式(部署模式)有以下几种,每种模式都适合不同的应用场景和基础设施环境:

对比维度 Local Standalone YARN Kubernetes (K8s)
资源管理器 无 Spark 自带 Hadoop YARN Kubernetes
部署复杂度 极低(本地即可) 低(部署 Spark 即可) 中(需完整 Hadoop 环境) 高(需容器技术与 K8s 集群)
是否分布式 否(单机) 是(Spark Master + Worker) 是(YARN ResourceManager 管理) 是(基于 K8s Pods 调度)
容错性 低(无恢复机制) 中(Worker 故障可恢复) 高(YARN 提供资源与任务恢复) 高(K8s 自动重启、容器隔离)
动态资源扩展 否 支持有限 支持(需配置) 原生支持自动扩缩容
典型部署方式 本机命令行或 Jupyter 手动启动 Spark Master/Worker Hadoop 集群中部署 Spark Docker 容器运行于 K8s 中
依赖组件 Python/Java Spark Spark + Hadoop/YARN Spark + Docker + Kubernetes
适用场景 本地开发测试 小型集群或教学实验 企业级大数据处理 云原生、大规模弹性调度
资源隔离 无 有限(基于端口和配置) 好(支持队列、容器、用户隔离) 极好(Pod 级别资源隔离)
监控与可视化 Spark UI(本地) Spark UI(默认 4040) Spark UI + YARN Web UI Spark UI + Kubernetes Dashboard

2. 典型开发容器的组成结构¶

文件或组件 描述
Dockerfile 构建容器镜像的基础文件,定义操作系统、语言环境、依赖库、工具链等。
devcontainer.json 配置开发容器行为(启动命令、端口映射、默认终端、VS Code 插件等),VS Code 专属配置文件。
docker-compose.yaml 启动多容器服务(如 Spark Master + Workers + Hadoop),用于模拟集群或复杂系统。
Volume 映射设置 将本地文件或目录(如数据集、脚本)挂载进容器以实现持久化与调试。
VS Code Remote Development 插件(Remote - Containers)支持 IDE 远程连接容器

3. PySpark Local模式部署及测试¶

3.1 PySpark Local模式部署¶

  1. 新建.devcontainer文件夹,目录如下:
my-project/
├── .devcontainer/
│   ├── Dockerfile
│   ├── devcontainer.json
│   └── docker-compose.yaml   
├── notebooks/
└── README.md
  1. 新建devcontaienr.json文件,文件内容如下:
{
    // 开发容器名称
	"name": "PySpark local cluster",
    // 指定Docker Compose配置文件路径
	"dockerComposeFile": ["./docker-compose.yaml"],
    // 指定服务名称(与docker-compose.yaml中的服务名称一致)
    "service": "spark",
    // 指定容器的工作目录
    "workspaceFolder": "/home/jovyan/code",
    // VS Code的自定义配置
    "customizations": {
        "vscode" : {
            "settings": {
                // 配置终端使用bash
                "terminal.integrated.profiles.linux": {
                    "bash": {
                        "path": "/bin/bash"
                    }
                },
                "terminal.integrated.defaultProfile.linux": "bash",

                // 启用Python代码检查和pylint
                "python.linting.enabled": true,
                "python.linting.pylintEnabled": true
            },
            // 预安装的VS Code扩展
            "extensions": [
                "ms-python.python",
		        "ms-toolsai.jupyter"
            ]
        }
    }
}
  1. 新建docker-compose.yaml文件,文件内容如下:
version: '3' # Docker Compose文件版本
services:
  spark: # 服务名称,需与Dockerfile中的服务名称一致
    build:
      context: .  # 构建上下文目录,即.devcontainer目录
      dockerfile: Dockerfile # 指定Dockerfile文件
    #image: bitnami/spark:3.5 # 使用Bitnami的Spark镜像
    #image: jupyter/pyspark-notebook:latest # 使用Jupyter的PySpark Notebook镜像
    volumes:
      - ..:/home/jovyan/code # 挂载上级目录到容器内的/home/jovyan/code
    ports:
      - "6666:6666" # 端口映射
      - "4040:4040" # 端口映射
    command: start.sh jupyter notebook --NotebookApp.token='' --NotebookApp.disable_check_xsrf=true --NotebookApp.allow_origin='*' --NotebookApp.ip='0.0.0.0'
    # 启动容器时运行的命令,启动 Jupyter Notebook 并允许远程访问
  1. 新建Dockerfile文件,文件内容如下:
# 选择你想要的基础镜像
FROM jupyter/pyspark-notebook:latest

# conda环境名称和Python版本
ARG conda_env=vscode_pyspark
ARG py_ver=3.11

# 使用 mamba 创建新的 conda 环境,并安装 ipython 和 ipykernel
RUN mamba create --yes -p "${CONDA_DIR}/envs/${conda_env}" python=${py_ver} ipython ipykernel && \
    mamba clean --all -f -y

# pip从阿里云镜像在conda环境中安装 PySpark 和其他常用库
RUN "${CONDA_DIR}/envs/${conda_env}/bin/pip" install pyspark pandas findspark --no-cache-dir -i https://mirrors.aliyun.com/pypi/simple

# 让新建的 conda 环境成为默认环境
RUN echo "conda activate ${conda_env}" >> "${HOME}/.bashrc"
  1. 注意事项:
    6.1 如果在查看 Docker 日志时遇到镜像拉取失败等问题,可通过配置 Docker 代理或使用 VPN 进行解决。建议及时查看日志以定位具体错误原因。
    6.2 若运行 PySpark 测试代码时出现 JavaPackage' object is not callable 错误,且基础镜像已包含 Java,可借助 findspark 正确初始化 Spark 环境变量和依赖(相关代码见下一节),以确保环境配置无误。
    6.3 若 pip 安装 PySpark 时下载缓慢或失败,可在 Dockerfile 中配置阿里云 PyPI 镜像以加速安装。

3.2 PySpark Local模式测试¶

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
# 创建了一个 SparkSession 对象,并指定 master 为 "local"
# 表示在本地模式下运行(即只使用当前这台机器,不连接集群)。
# SparkSession 是与 Spark 进行交互的入口
spark = SparkSession.builder.master("local").getOrCreate()
# SparkContext 是 Spark 的核心,
# 用于连接到 Spark 集群并负责与集群管理器进行通信
# 可以用来执行分布式任务
# 在终端中不需要显示创建 SparkContext
sc = spark.sparkContext
sc
Out[1]:

SparkContext

Spark UI

Version
v3.5.0
Master
local
AppName
pyspark-shell
In [2]:
# 从指定的文件创建一个 RDD。
data = sc.textFile('demo_file.txt')
print(type(data))
# collect() 会将 RDD 中的元素收集到一个列表中。
data.collect()
<class 'pyspark.rdd.RDD'>
Out[2]:
['This is just a demo file. ',
 'Normally, a file this small would have no reaon to be on HDFS.']
  • 端口转发:容器端口转发(Port Forwarding)是实现容器与外部系统通信的重要机制。
  • Spark UI 访问:可通过浏览器访问 Spark UI(http://localhost:4040/executors/ ),查看任务执行与资源分配情况。

image.png

  • Docker desktop容器界面

image-2.png

4. PySpark Standalone模式部署与测试¶

4.1 PySpark Standalone模式部署¶

  1. 新建.devcontainer文件夹,目录如下:
my-project/
├── .devcontainer/
│   ├── Dockerfile
│   ├── devcontainer.json
│   └── docker-compose.yaml   
├── notebooks/
└── README.md
  1. 新建devcontainer.json文件,内容如下:
{
	"name": "PySpark cluster",
	"dockerComposeFile": ["./docker-compose.yaml"],
    "service": "spark-master",
    "workspaceFolder": "/home/jovyan/code",

    "customizations": {
        "vscode" : {
            "settings": {
                "terminal.integrated.profiles.linux": {
                    "bash": {
                        "path": "/bin/bash"
                    }
                },
                "terminal.integrated.defaultProfile.linux": "bash",
                "python.linting.enabled": true,
                "python.linting.pylintEnabled": true
            },
            "extensions": [
                "ms-python.python",
		        "ms-toolsai.jupyter"
            ]
        }
    }
}
  1. 新建docker-compose.yaml文件(一个master两个worker节点),内容如下:
version: '3'

networks:
  sparknet:
    name: sparknet
    driver: bridge

services:
  spark-master:
    build:
      context: .
      dockerfile: Dockerfile
    user: root
    container_name: spark-master
    hostname: spark-master
    ports:
      - "8888:8888" # Jupyter Notebook
      - "7077:7077" # Spark Master
      - "8080:8080" # Spark Master Web UI
      - "8081:8081"
    volumes:
      - ..:/home/jovyan/code
    networks:
      - sparknet
    command: bash -c "/usr/local/spark/sbin/start-master.sh && start.sh jupyter notebook --NotebookApp.token='' --NotebookApp.disable_check_xsrf=true --NotebookApp.allow_origin='*' --NotebookApp.ip='0.0.0.0'"

  spark-worker-1:
    build:
      context: .
      dockerfile: Dockerfile
    user: root
    container_name: spark-worker-1
    hostname: spark-worker-1
    depends_on:
      - spark-master
    networks:
      - sparknet
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_WEBUI_PORT=8081
      - SPARK_NO_DAEMONIZE=1  # 添加这个环境变量
      - SPARK_WORKER_CORES=2
      - SPARK_WORKER_MEMORY=2G
    volumes:
      - ..:/home/jovyan/code
    command: start.sh /usr/local/spark/sbin/start-worker.sh spark://spark-master:7077

  spark-worker-2:
    build:
      context: .
      dockerfile: Dockerfile
    user: root
    container_name: spark-worker-2
    hostname: spark-worker-2
    depends_on:
      - spark-master
    networks:
      - sparknet
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_NO_DAEMONIZE=1  # 添加这个环境变量
    volumes:
      - ..:/home/jovyan/code
    command: start.sh /usr/local/spark/sbin/start-worker.sh spark://spark-master:7077
  1. 新建Dockerfile文件,文件内容如下:
# Choose your desired base image
FROM jupyter/pyspark-notebook:latest

# Switch to root to install system packages
USER root

# Set environment variables
ENV SPARK_HOME=/usr/local/spark \
    PYSPARK_PYTHON=/usr/bin/python3 \
    DEBIAN_FRONTEND=noninteractive

# Create conda environment variables
ARG conda_env=vscode_pyspark
ARG py_ver=3.11

# Switch to Aliyun mirrors and install packages
RUN sed -i 's/archive.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list && \
    sed -i 's/security.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list && \
    apt-get clean && \
    apt-get update && \
    apt-get install -y --no-install-recommends \
    iputils-ping \
    net-tools \
    && rm -rf /var/lib/apt/lists/*

# Create necessary directories and set permissions
RUN mkdir -p /usr/local/spark/logs /usr/local/spark/work /home/jovyan/spark-logs && \
    chown -R 1000:100 /usr/local/spark /home/jovyan/spark-logs && \
    chmod -R 777 /usr/local/spark/work


# Setup conda environment with mamba
RUN mamba create --yes -p "${CONDA_DIR}/envs/${conda_env}" \
    python=${py_ver} \
    ipython \
    ipykernel \
    && mamba clean --all -f -y \
    && echo "conda activate ${conda_env}" >> "${HOME}/.bashrc"

# Install Python packages
RUN "${CONDA_DIR}/envs/${conda_env}/bin/pip" install \
    --no-cache-dir \
    -i https://mirrors.aliyun.com/pypi/simple \
    pyspark \
    pandas \
    findspark \
    py4j \
    requests

# Switch back to notebook user
USER jovyan

EXPOSE 8888 7077 8080

# Set working directory
WORKDIR /home/jovyan/code

4.2 PySpark Standalone模式测试¶

In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("spark://spark-master:7077").config("spark.ui.port", "8080").config("spark.acls.enable", "false").config("spark.ui.view.acls", "*").config("spark.modify.acls", "*").getOrCreate()
sc = spark.sparkContext
sc
Out[2]:

SparkContext

Spark UI

Version
v3.5.0
Master
spark://spark-master:7077
AppName
pyspark-shell
  • 常用测试集群搭建是否成功的命令
    • 查看spark-worker-1节点是否启动成功

      • docker logs spark-worker-1

      image-3.png

    • 查看网络

      • docker network inspect sparknet

      image.png

    • Ping网络

      • docker exec spark-worker-1 ping -c 3 spark-master

      image-2.png

In [ ]:
# This script retrieves cluster information from a Spark application using the Spark REST API.
import requests
import json

def get_cluster_info():
    """Get cluster information using Spark REST API"""
    try:
        # Get UI URL from SparkContext
        ui_url = sc.uiWebUrl
        
        # Get application ID
        app_id = sc.applicationId
        
        # Make REST API request
        response = requests.get(f"{ui_url}/api/v1/applications/{app_id}/executors")
        
        if response.status_code == 200:
            executors = response.json()
            
            # Print cluster information
            print(f"Cluster Information:")
            print(f"- Number of executors: {len(executors)}")
            print(f"- Total nodes (including driver): {len(executors) + 1}")
            
            # Print detail for each executor
            for idx, executor in enumerate(executors):
                print(f"\nExecutor {idx}:")
                print(f"- ID: {executor.get('id')}")
                print(f"- Host: {executor.get('hostPort')}")
                print(f"- Active: {executor.get('isActive')}")
                print(f"- Memory Used: {executor.get('memoryUsed')} / {executor.get('maxMemory')}")
                print(f"- Cores: {executor.get('totalCores')}")
        else:
            print(f"Failed to get executor info. Status code: {response.status_code}")
            
    except Exception as e:
        print(f"Error getting cluster info: {str(e)}")

# Run the function
get_cluster_info()
Cluster Information:
- Number of executors: 3
- Total nodes (including driver): 4

Executor 0:
- ID: driver
- Host: spark-master:41775
- Active: True
- Memory Used: 0 / 455501414
- Cores: 0

Executor 1:
- ID: 1
- Host: 172.22.0.3:43697
- Active: True
- Memory Used: 0 / 455501414
- Cores: 2

Executor 2:
- ID: 0
- Host: 172.22.0.4:40765
- Active: True
- Memory Used: 0 / 455501414
- Cores: 16
  • 成功部署PySpark Standalone模式docker desktop界面

image.png

  • Spark Master Web UI (http://localhost:8080/ )

image-2.png

  • Spark Worker Web UI (http://localhost:8081/executors/ )

image-3.png

In [3]:
# 从指定的文件创建一个 RDD。
data = sc.textFile('demo_file.txt')
print(type(data))
# collect() 会将 RDD 中的元素收集到一个列表中。
data.collect()
<class 'pyspark.rdd.RDD'>
Out[3]:
['This is just a demo file. ',
 'Normally, a file this small would have no reaon to be on HDFS.']