四川宜宾建设局官方网站,校园官方网站建设的书籍,网页设计师主要做什么,数据网站排名# PyFlink 数据写入 MySQL 项目文档 ## 1. 项目概述 本项目实现了一个使用 PyFlink 将数据写入 MySQL 的示例。项目使用 Docker 容器化部署#xff0c;包含 Flink 集群#xff08;JobManager 和 TaskManager#xff09;以及 MySQL 数据库。 ## 2. 环境要求 - Docker
- Dock…# PyFlink 数据写入 MySQL 项目文档 ## 1. 项目概述 本项目实现了一个使用 PyFlink 将数据写入 MySQL 的示例。项目使用 Docker 容器化部署包含 Flink 集群JobManager 和 TaskManager以及 MySQL 数据库。 ## 2. 环境要求 - Docker
- Docker Compose
- 操作系统支持 Linux/MacOS/Windows ## 3. 镜像版本及配置 ### 3.1 Flink 镜像 #### 基础镜像
- 官方镜像flink:1.17.0-scala_2.12-java11
- 自定义镜像flink-scala_2.12-java11-py3.8 #### Dockerfile 配置
dockerfile
# 使用官方 Flink 基础镜像
FROM flink:1.17.0-scala_2.12-java11 # 设置环境变量
ENV PYTHON_VERSION3.8 \
PYTHON_PIP_VERSION23.2.1 \
FLINK_HOME/opt/flink \
PATH/opt/conda/bin:$PATH # 安装系统依赖
RUN apt-get update \
apt-get install -y --no-install-recommends \
wget \
bzip2 \
ca-certificates \
curl \
openjdk-11-jdk-headless \
build-essential \
python3-dev # 安装 Miniconda
RUN wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-py38_23.5.2-0-Linux-x86_64.sh \
/bin/bash Miniconda3-py38_23.5.2-0-Linux-x86_64.sh -b -p /opt/conda \
rm Miniconda3-py38_23.5.2-0-Linux-x86_64.sh # 安装 PyFlink 及依赖
RUN pip install --no-cache-dir \
numpy1.21.4,1.22.0 \
pandas1.3.0,1.4.0 \
apache-flink1.17.0 # 设置环境变量
ENV PYFLINK_PYTHON/opt/conda/bin/python \
PYFLINK_DRIVER_EXECUTABLE/opt/conda/bin/python ### 3.2 MySQL 镜像
- 版本mysql:8.0
- 端口映射3306:3306
- 环境变量
- MYSQL_ROOT_PASSWORDroot
- MYSQL_DATABASEtest ## 4. 容器部署 ### 4.1 构建自定义 Flink 镜像
bash
# 在 docker_file 目录下执行
docker build -t flink-scala_2.12-java11-py3.8 -f docker_file_flink_1.17_scala_212_java11_py38 . ### 4.2 Docker Compose 配置
yaml
version: 3
services:
jobmanager:
image: flink-scala_2.12-java11-py3.8
ports:
- 8081:8081
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESSjobmanager
volumes:
- ./flink/custom_jars:/opt/flink/lib
networks:
- flink-network taskmanager:
image: flink-scala_2.12-java11-py3.8
depends_on:
- jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESSjobmanager
volumes:
- ./flink/custom_jars:/opt/flink/lib
networks:
- flink-network networks:
flink-network:
driver: bridge ### 4.3 启动服务
bash
# 启动 Flink 集群
docker-compose up -d # 启动 MySQL需要连接到 Flink 网络
docker run --name mysql-flink \
-e MYSQL_ROOT_PASSWORDroot \
-e MYSQL_DATABASEtest \
-p 3306:3306 \
--network flink-network \
-d mysql:8.0 ### 4.4 网络配置说明 #### 4.4.1 网络架构
- 创建了名为 flink-network 的桥接网络
- JobManager 和 TaskManager 通过该网络通信
- MySQL 容器也连接到该网络确保与 Flink 集群互通 #### 4.4.2 网络连接验证
bash
# 查看网络列表
docker network ls # 查看网络详情
docker network inspect flink-network # 测试网络连通性在 JobManager 容器中
docker exec -it docker_utils-jobmanager-1 ping mysql-flink #### 4.4.3 关键网络配置点
1. Flink 集群内部通信
- JobManager 和 TaskManager 通过 flink-network 网络通信
- 使用服务名 jobmanager 作为 RPC 地址 2. MySQL 连接配置
- MySQL 容器通过 --network flink-network 连接到 Flink 网络
- 在 PyFlink 代码中使用 mysql-flink 作为主机名连接 MySQL
python
.with_url(jdbc:mysql://mysql-flink:3306/test) 3. 端口映射
- Flink Web UI: 8081:8081
- MySQL: 3306:3306 #### 4.4.4 网络故障排查
1. 检查网络连接
bash
# 查看容器网络连接状态
docker network inspect flink-network # 检查容器日志
docker logs docker_utils-jobmanager-1
docker logs docker_utils-taskmanager-1
docker logs mysql-flink 2. 常见网络问题
- 容器间无法通信检查是否在同一网络
- 连接超时检查服务名是否正确
- 端口冲突检查端口映射 ### 4.5 JobManager 与 TaskManager 分离部署说明
- **必须分开部署**JobManager 和 TaskManager 必须部署在不同容器中。
- **原因**
1. **资源隔离**JobManager 负责任务调度和资源管理TaskManager 负责实际执行任务合并会导致资源竞争。
2. **高可用性**JobManager 故障时TaskManager 需继续运行合并会导致单点故障。
3. **扩展性**TaskManager 需根据任务负载动态扩展合并会限制扩展能力。
- **部署要求**至少 1 个 JobManager 容器 至少 1 个 TaskManager 容器。 ## 5. MySQL 配置 ### 5.1 创建用户和授权
sql
CREATE USER IF NOT EXISTS root% IDENTIFIED BY root;
GRANT ALL PRIVILEGES ON *.* TO root% WITH GRANT OPTION;
FLUSH PRIVILEGES; ### 5.2 创建数据表
sql
CREATE TABLE IF NOT EXISTS test.users (
id INT PRIMARY KEY,
name VARCHAR(255),
age INT
); ## 6. PyFlink 配置 ### 6.1 依赖配置
- PyFlink 版本1.17.0
- 必需依赖
- numpy1.21.4,1.22.0
- pandas1.3.0,1.4.0
- apache-flink1.17.0 ### 6.2 JDBC 连接器
- 将以下 jar 包放置在 flink/custom_jars 目录
1. Flink JDBC 连接器
- 文件名flink-connector-jdbc-3.1.1-1.17.jar
- 作用提供 Flink DataStream API 与 JDBC 数据库的连接能力
- 版本要求与 Flink 1.17.0 兼容
- 下载地址
- Maven 中央仓库https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/
- 直接下载链接https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
- 注意事项
- 确保与 Flink 1.17.0 兼容
- 使用 DataStream API 的连接器版本
- 文件权限确保 jar 包在容器中有正确的读取权限 2. MySQL JDBC 驱动
- 文件名mysql-connector-java-8.0.28.jar
- 作用提供 MySQL 数据库连接支持
- 版本要求与 MySQL 8.0 兼容
- 下载地址
- Maven 中央仓库https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/
- 直接下载链接https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar - 下载方式
1. 使用 wget 命令下载
bash
# 创建目录
mkdir -p flink/custom_jars
cd flink/custom_jars
# 下载 Flink JDBC 连接器
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
# 下载 MySQL JDBC 驱动
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar 2. 使用 curl 命令下载
bash
# 创建目录
mkdir -p flink/custom_jars
cd flink/custom_jars
# 下载 Flink JDBC 连接器
curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
# 下载 MySQL JDBC 驱动
curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar - 目录结构 flink/
└── custom_jars/
├── flink-connector-jdbc-3.1.1-1.17.jar
└── mysql-connector-java-8.0.28.jar - 部署位置
1. JobManager 容器docker_utils-jobmanager-1
- 路径/opt/flink/lib/
- 包含两个 jar 包都需要
- 用途用于提交和执行 PyFlink 作业 2. TaskManager 容器docker_utils-taskmanager-1
- 路径/opt/flink/lib/
- 包含两个 jar 包都需要
- 用途用于执行具体的任务 3. MySQL 容器mysql-flink
- 不需要部署任何 jar 包
- 只提供数据库服务 - 部署方式
yaml
# docker-compose.yml 中的配置
volumes:
- ./flink/custom_jars:/opt/flink/lib 这个配置会自动将 jar 包挂载到 JobManager 和 TaskManager 容器中。 - 注意事项
1. 两个 jar 包都必须放在 /opt/flink/lib 目录下
2. 确保 jar 包版本与 Flink 和 MySQL 版本兼容
3. 使用 DataStream API 时需要使用对应的 JDBC 连接器
4. 不要使用 Table API 的连接器flink-connector-jdbc-table_2.12-1.17.0.jar
5. 确保 JobManager 和 TaskManager 容器都能访问到这些 jar 包 ## 7. PyFlink 代码示例 python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions def main():
# 创建执行环境
env StreamExecutionEnvironment.get_execution_environment() # 示例数据
sample_data [(1, John, 30), (2, Alice, 25)] # 定义类型信息
type_info Types.ROW([Types.INT(), Types.STRING(), Types.INT()]) # 创建数据流
data_stream env.from_collection(sample_data, type_infotype_info) # SQL 语句
sql INSERT INTO users (id, name, age) VALUES (?, ?, ?) # 配置 JDBC sink
jdbc_sink JdbcSink.sink(
sql,
type_info,
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.with_url(jdbc:mysql://mysql-flink:3306/test)
.with_driver_name(com.mysql.cj.jdbc.Driver)
.with_user_name(root)
.with_password(root)
.build(),
JdbcExecutionOptions.builder()
.with_batch_size(1000)
.with_batch_interval_ms(200)
.with_max_retries(5)
.build()
) # 添加 sink
data_stream.add_sink(jdbc_sink)
# 执行作业
env.execute(Flink MySQL Output Job) if __name__ __main__:
main() ## 8. 执行步骤 1. 复制 PyFlink 脚本到 JobManager 容器
bash
docker cp flink/test_case/test_flink_simply_output_to_mysql.py docker_utils-jobmanager-1:/opt/flink/ 2. 进入 JobManager 容器
bash
docker exec -it docker_utils-jobmanager-1 bash 3. 执行 PyFlink 脚本
bash
flink run -py /opt/flink/test_flink_simply_output_to_mysql.py 4. 验证数据写入
bash
docker exec -i mysql-flink mysql -uroot -proot -e SELECT * FROM test.users; ## 9. 注意事项 1. 确保 MySQL JDBC 驱动 jar 包已正确放置在 flink/custom_jars 目录
2. 检查 MySQL 用户权限配置是否正确
3. 确保 Flink 集群和 MySQL 容器网络互通
4. 注意 PyFlink 版本与 Flink 版本匹配
5. 确保 Python 环境变量正确配置 ## 10. 常见问题解决 1. 连接 MySQL 失败
- 检查 MySQL 用户权限
- 验证网络连接
- 确认 MySQL 容器状态 2. PyFlink 执行错误
- 检查 Python 环境
- 验证依赖包版本
- 查看 Flink 日志 3. 数据写入失败
- 检查表结构
- 验证数据类型匹配
- 查看 MySQL 错误日志