当前位置: 首页 > news >正文

四川宜宾建设局官方网站校园官方网站建设的书籍

四川宜宾建设局官方网站,校园官方网站建设的书籍,网页设计师主要做什么,数据网站排名# PyFlink 数据写入 MySQL 项目文档 ## 1. 项目概述 本项目实现了一个使用 PyFlink 将数据写入 MySQL 的示例。项目使用 Docker 容器化部署#xff0c;包含 Flink 集群#xff08;JobManager 和 TaskManager#xff09;以及 MySQL 数据库。 ## 2. 环境要求 - Docker - Dock…# PyFlink 数据写入 MySQL 项目文档 ## 1. 项目概述 本项目实现了一个使用 PyFlink 将数据写入 MySQL 的示例。项目使用 Docker 容器化部署包含 Flink 集群JobManager 和 TaskManager以及 MySQL 数据库。 ## 2. 环境要求 - Docker - Docker Compose - 操作系统支持 Linux/MacOS/Windows ## 3. 镜像版本及配置 ### 3.1 Flink 镜像 #### 基础镜像 - 官方镜像flink:1.17.0-scala_2.12-java11 - 自定义镜像flink-scala_2.12-java11-py3.8 #### Dockerfile 配置 dockerfile # 使用官方 Flink 基础镜像 FROM flink:1.17.0-scala_2.12-java11 # 设置环境变量 ENV PYTHON_VERSION3.8 \ PYTHON_PIP_VERSION23.2.1 \ FLINK_HOME/opt/flink \ PATH/opt/conda/bin:$PATH # 安装系统依赖 RUN apt-get update \ apt-get install -y --no-install-recommends \ wget \ bzip2 \ ca-certificates \ curl \ openjdk-11-jdk-headless \ build-essential \ python3-dev # 安装 Miniconda RUN wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-py38_23.5.2-0-Linux-x86_64.sh \ /bin/bash Miniconda3-py38_23.5.2-0-Linux-x86_64.sh -b -p /opt/conda \ rm Miniconda3-py38_23.5.2-0-Linux-x86_64.sh # 安装 PyFlink 及依赖 RUN pip install --no-cache-dir \ numpy1.21.4,1.22.0 \ pandas1.3.0,1.4.0 \ apache-flink1.17.0 # 设置环境变量 ENV PYFLINK_PYTHON/opt/conda/bin/python \ PYFLINK_DRIVER_EXECUTABLE/opt/conda/bin/python ### 3.2 MySQL 镜像 - 版本mysql:8.0 - 端口映射3306:3306 - 环境变量 - MYSQL_ROOT_PASSWORDroot - MYSQL_DATABASEtest ## 4. 容器部署 ### 4.1 构建自定义 Flink 镜像 bash # 在 docker_file 目录下执行 docker build -t flink-scala_2.12-java11-py3.8 -f docker_file_flink_1.17_scala_212_java11_py38 . ### 4.2 Docker Compose 配置 yaml version: 3 services: jobmanager: image: flink-scala_2.12-java11-py3.8 ports: - 8081:8081 command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESSjobmanager volumes: - ./flink/custom_jars:/opt/flink/lib networks: - flink-network taskmanager: image: flink-scala_2.12-java11-py3.8 depends_on: - jobmanager command: taskmanager environment: - JOB_MANAGER_RPC_ADDRESSjobmanager volumes: - ./flink/custom_jars:/opt/flink/lib networks: - flink-network networks: flink-network: driver: bridge ### 4.3 启动服务 bash # 启动 Flink 集群 docker-compose up -d # 启动 MySQL需要连接到 Flink 网络 docker run --name mysql-flink \ -e MYSQL_ROOT_PASSWORDroot \ -e MYSQL_DATABASEtest \ -p 3306:3306 \ --network flink-network \ -d mysql:8.0 ### 4.4 网络配置说明 #### 4.4.1 网络架构 - 创建了名为 flink-network 的桥接网络 - JobManager 和 TaskManager 通过该网络通信 - MySQL 容器也连接到该网络确保与 Flink 集群互通 #### 4.4.2 网络连接验证 bash # 查看网络列表 docker network ls # 查看网络详情 docker network inspect flink-network # 测试网络连通性在 JobManager 容器中 docker exec -it docker_utils-jobmanager-1 ping mysql-flink #### 4.4.3 关键网络配置点 1. Flink 集群内部通信 - JobManager 和 TaskManager 通过 flink-network 网络通信 - 使用服务名 jobmanager 作为 RPC 地址 2. MySQL 连接配置 - MySQL 容器通过 --network flink-network 连接到 Flink 网络 - 在 PyFlink 代码中使用 mysql-flink 作为主机名连接 MySQL python .with_url(jdbc:mysql://mysql-flink:3306/test) 3. 端口映射 - Flink Web UI: 8081:8081 - MySQL: 3306:3306 #### 4.4.4 网络故障排查 1. 检查网络连接 bash # 查看容器网络连接状态 docker network inspect flink-network # 检查容器日志 docker logs docker_utils-jobmanager-1 docker logs docker_utils-taskmanager-1 docker logs mysql-flink 2. 常见网络问题 - 容器间无法通信检查是否在同一网络 - 连接超时检查服务名是否正确 - 端口冲突检查端口映射 ### 4.5 JobManager 与 TaskManager 分离部署说明 - **必须分开部署**JobManager 和 TaskManager 必须部署在不同容器中。 - **原因** 1. **资源隔离**JobManager 负责任务调度和资源管理TaskManager 负责实际执行任务合并会导致资源竞争。 2. **高可用性**JobManager 故障时TaskManager 需继续运行合并会导致单点故障。 3. **扩展性**TaskManager 需根据任务负载动态扩展合并会限制扩展能力。 - **部署要求**至少 1 个 JobManager 容器 至少 1 个 TaskManager 容器。 ## 5. MySQL 配置 ### 5.1 创建用户和授权 sql CREATE USER IF NOT EXISTS root% IDENTIFIED BY root; GRANT ALL PRIVILEGES ON *.* TO root% WITH GRANT OPTION; FLUSH PRIVILEGES; ### 5.2 创建数据表 sql CREATE TABLE IF NOT EXISTS test.users ( id INT PRIMARY KEY, name VARCHAR(255), age INT ); ## 6. PyFlink 配置 ### 6.1 依赖配置 - PyFlink 版本1.17.0 - 必需依赖 - numpy1.21.4,1.22.0 - pandas1.3.0,1.4.0 - apache-flink1.17.0 ### 6.2 JDBC 连接器 - 将以下 jar 包放置在 flink/custom_jars 目录 1. Flink JDBC 连接器 - 文件名flink-connector-jdbc-3.1.1-1.17.jar - 作用提供 Flink DataStream API 与 JDBC 数据库的连接能力 - 版本要求与 Flink 1.17.0 兼容 - 下载地址 - Maven 中央仓库https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/ - 直接下载链接https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar - 注意事项 - 确保与 Flink 1.17.0 兼容 - 使用 DataStream API 的连接器版本 - 文件权限确保 jar 包在容器中有正确的读取权限 2. MySQL JDBC 驱动 - 文件名mysql-connector-java-8.0.28.jar - 作用提供 MySQL 数据库连接支持 - 版本要求与 MySQL 8.0 兼容 - 下载地址 - Maven 中央仓库https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/ - 直接下载链接https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar - 下载方式 1. 使用 wget 命令下载 bash # 创建目录 mkdir -p flink/custom_jars cd flink/custom_jars # 下载 Flink JDBC 连接器 wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar # 下载 MySQL JDBC 驱动 wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar 2. 使用 curl 命令下载 bash # 创建目录 mkdir -p flink/custom_jars cd flink/custom_jars # 下载 Flink JDBC 连接器 curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar # 下载 MySQL JDBC 驱动 curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar - 目录结构 flink/ └── custom_jars/ ├── flink-connector-jdbc-3.1.1-1.17.jar └── mysql-connector-java-8.0.28.jar - 部署位置 1. JobManager 容器docker_utils-jobmanager-1 - 路径/opt/flink/lib/ - 包含两个 jar 包都需要 - 用途用于提交和执行 PyFlink 作业 2. TaskManager 容器docker_utils-taskmanager-1 - 路径/opt/flink/lib/ - 包含两个 jar 包都需要 - 用途用于执行具体的任务 3. MySQL 容器mysql-flink - 不需要部署任何 jar 包 - 只提供数据库服务 - 部署方式 yaml # docker-compose.yml 中的配置 volumes: - ./flink/custom_jars:/opt/flink/lib 这个配置会自动将 jar 包挂载到 JobManager 和 TaskManager 容器中。 - 注意事项 1. 两个 jar 包都必须放在 /opt/flink/lib 目录下 2. 确保 jar 包版本与 Flink 和 MySQL 版本兼容 3. 使用 DataStream API 时需要使用对应的 JDBC 连接器 4. 不要使用 Table API 的连接器flink-connector-jdbc-table_2.12-1.17.0.jar 5. 确保 JobManager 和 TaskManager 容器都能访问到这些 jar 包 ## 7. PyFlink 代码示例 python from pyflink.datastream import StreamExecutionEnvironment from pyflink.common.typeinfo import Types from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions def main(): # 创建执行环境 env StreamExecutionEnvironment.get_execution_environment() # 示例数据 sample_data [(1, John, 30), (2, Alice, 25)] # 定义类型信息 type_info Types.ROW([Types.INT(), Types.STRING(), Types.INT()]) # 创建数据流 data_stream env.from_collection(sample_data, type_infotype_info) # SQL 语句 sql INSERT INTO users (id, name, age) VALUES (?, ?, ?) # 配置 JDBC sink jdbc_sink JdbcSink.sink( sql, type_info, JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .with_url(jdbc:mysql://mysql-flink:3306/test) .with_driver_name(com.mysql.cj.jdbc.Driver) .with_user_name(root) .with_password(root) .build(), JdbcExecutionOptions.builder() .with_batch_size(1000) .with_batch_interval_ms(200) .with_max_retries(5) .build() ) # 添加 sink data_stream.add_sink(jdbc_sink) # 执行作业 env.execute(Flink MySQL Output Job) if __name__ __main__: main() ## 8. 执行步骤 1. 复制 PyFlink 脚本到 JobManager 容器 bash docker cp flink/test_case/test_flink_simply_output_to_mysql.py docker_utils-jobmanager-1:/opt/flink/ 2. 进入 JobManager 容器 bash docker exec -it docker_utils-jobmanager-1 bash 3. 执行 PyFlink 脚本 bash flink run -py /opt/flink/test_flink_simply_output_to_mysql.py 4. 验证数据写入 bash docker exec -i mysql-flink mysql -uroot -proot -e SELECT * FROM test.users; ## 9. 注意事项 1. 确保 MySQL JDBC 驱动 jar 包已正确放置在 flink/custom_jars 目录 2. 检查 MySQL 用户权限配置是否正确 3. 确保 Flink 集群和 MySQL 容器网络互通 4. 注意 PyFlink 版本与 Flink 版本匹配 5. 确保 Python 环境变量正确配置 ## 10. 常见问题解决 1. 连接 MySQL 失败 - 检查 MySQL 用户权限 - 验证网络连接 - 确认 MySQL 容器状态 2. PyFlink 执行错误 - 检查 Python 环境 - 验证依赖包版本 - 查看 Flink 日志 3. 数据写入失败 - 检查表结构 - 验证数据类型匹配 - 查看 MySQL 错误日志
http://www.dnsts.com.cn/news/154119.html

相关文章:

  • 网站动态链接做Seo怎么办山西省城乡建设厅网站
  • 网站维护需要开封建设教育协会网站
  • 建筑企业网站模板阳江seo网站推广
  • 绵阳网站建站铜陵做网站
  • 做汽车养护的网站中国十大购物网站
  • 国外装饰公司网站恩施seo搜索引擎优化
  • 网站接广告平台wordpress二维码制作
  • 微信公众号 网站开发 2016做网站建设公司crm在线的提升服务
  • 上海专业的网站建网页游戏服务器搭建
  • 电子工程网站有哪些wordpress自动提交
  • 邢台网站建设报价多少钱网站建设时间及简介
  • 网店代运营合同自己如何做网站优化
  • 行业网站模版wordpress 首页 分类
  • 沈阳城市建设学院网站为什么别的电脑能打开的网站我的电脑打不开
  • 美工个人网站个人做电影网站违法吗
  • 用dw做网站结构图wordpress添加 下载文件
  • 最新章节 62.一起来做网站吧ui网页设计公司
  • 网站开发获取报价友情链接怎么做
  • 网站建设ssc源码最新中国室内设计网欧式
  • 网站代做正规品牌网站设计图片
  • 网站制作 佛山哪个外包公司比较好
  • 怎么给网站做优化编程猫官网
  • 网站背景音乐网站欢迎界面源码
  • 网站开发外文参考文献wordpress流行漏洞
  • 淘宝app网站建设编程猫加盟条件和费用
  • 邢台网站关键词优化建筑最吃香的专业
  • 做网站怎样设置搜索引擎做图表用的网站
  • 建网站的公司哪家好网络广告人社区官网
  • 大丰做网站找哪家好可以做公众号背景图的网站
  • iis 多网站安全设置建设工程信息网一体化平台