个人网站备案资料,做网站在哪里添加关键词,成都网站推广哪家专业,建设工程质量监督竣工备案网站用C实现高性能数据处理#xff0c;Python实现操作Redshift导入数据文件。 在Visual Studio 2022中用C和ODBC API导出SQL Server数据库中张表中的所有表的数据为CSV文件格式的数据流#xff0c;用逗号作为分隔符#xff0c;用双引号包裹每个数据#xff0c;字符串类型的数据…用C实现高性能数据处理Python实现操作Redshift导入数据文件。 在Visual Studio 2022中用C和ODBC API导出SQL Server数据库中张表中的所有表的数据为CSV文件格式的数据流用逗号作为分隔符用双引号包裹每个数据字符串类型的数据去掉前后的空格数据中如果包含双引号则将一个双引号替换为两个双引号创建gzip压缩文件输出数据流写入到gzip压缩文件包中的唯一的csv文件中一张表对应一个gzip压缩文件。用异步编程的方法优化数据库表的读取和写入文件的过程每个gzip压缩文件包中的csv文件的第一行都是用逗号分割的表头对应数据库表的字段程序需要异常处理和日志文件输出当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导出数据行数、成功或者失败的状态日志文件不存在则创建。
以下是一个使用C和ODBC API在Visual Studio 2022中实现将SQL Server数据库表数据导出为CSV文件并进行gzip压缩的示例代码。代码使用异步编程优化数据库读取和文件写入过程并包含异常处理和日志记录功能。
#include iostream
#include fstream
#include sstream
#include string
#include vector
#include future
#include sql.h
#include sqlext.h
#include zlib.h
#include ctime
#include iomanip// 日志记录函数
void logMessage(const std::string message) {std::ofstream logFile(export_log.txt, std::ios::app);if (logFile.is_open()) {auto now std::chrono::system_clock::now();std::time_t now_c std::chrono::system_clock::to_time_t(now);std::tm* now_tm std::localtime(now_c);std::ostringstream oss;oss std::put_time(now_tm, %Y-%m-%d %H:%M:%S) message std::endl;logFile oss.str();logFile.close();}
}// 处理字符串中的双引号
std::string escapeDoubleQuotes(const std::string str) {std::string result str;size_t pos 0;while ((pos result.find(, pos))! std::string::npos) {result.replace(pos, 1, 2, );pos 2;}return result;
}// 从数据库读取表数据
std::vectorstd::vectorstd::string readTableData(SQLHSTMT hstmt) {std::vectorstd::vectorstd::string data;SQLSMALLINT columnCount 0;SQLNumResultCols(hstmt, columnCount);std::vectorSQLCHAR* columns(columnCount);std::vectorSQLINTEGER lengths(columnCount);for (SQLSMALLINT i 0; i columnCount; i) {columns[i] new SQLCHAR[SQL_MAX_MESSAGE_LENGTH];SQLBindCol(hstmt, i 1, SQL_C_CHAR, columns[i], SQL_MAX_MESSAGE_LENGTH, lengths[i]);}while (SQLFetch(hstmt) SQL_SUCCESS) {std::vectorstd::string row;for (SQLSMALLINT i 0; i columnCount; i) {std::string value(reinterpret_castconst char*(columns[i]));value escapeDoubleQuotes(value);row.push_back(value);}data.push_back(row);}for (SQLSMALLINT i 0; i columnCount; i) {delete[] columns[i];}return data;
}// 将数据写入CSV文件
void writeToCSV(const std::vectorstd::vectorstd::string data, const std::vectorstd::string headers, const std::string filename) {std::ofstream csvFile(filename);if (csvFile.is_open()) {// 写入表头for (size_t i 0; i headers.size(); i) {csvFile headers[i] ;if (i headers.size() - 1) csvFile ,;}csvFile std::endl;// 写入数据for (const auto row : data) {for (size_t i 0; i row.size(); i) {csvFile row[i] ;if (i row.size() - 1) csvFile ,;}csvFile std::endl;}csvFile.close();} else {throw std::runtime_error(Failed to open CSV file for writing);}
}// 压缩CSV文件为gzip
void compressCSV(const std::string csvFilename, const std::string gzipFilename) {std::ifstream csvFile(csvFilename, std::ios::binary);std::ofstream gzipFile(gzipFilename, std::ios::binary);if (csvFile.is_open() gzipFile.is_open()) {gzFile gzOut gzopen(gzipFilename.c_str(), wb);if (gzOut) {char buffer[1024];while (csvFile.read(buffer, sizeof(buffer))) {gzwrite(gzOut, buffer, sizeof(buffer));}gzwrite(gzOut, buffer, csvFile.gcount());gzclose(gzOut);} else {throw std::runtime_error(Failed to open gzip file for writing);}csvFile.close();gzipFile.close();std::remove(csvFilename.c_str());} else {throw std::runtime_error(Failed to open files for compression);}
}// 导出单个表
void exportTable(const std::string server, const std::string database, const std::string schema, const std::string table) {SQLHENV henv nullptr;SQLHDBC hdbc nullptr;SQLHSTMT hstmt nullptr;try {SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, henv);SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);SQLAllocHandle(SQL_HANDLE_DBC, henv, hdbc);std::string connectionString DRIVER{ODBC Driver 17 for SQL Server};SERVER server ;DATABASE database ;UIDyour_username;PWDyour_password;SQLRETURN ret SQLDriverConnect(hdbc, nullptr, (SQLCHAR*)connectionString.c_str(), SQL_NTS, nullptr, 0, nullptr, SQL_DRIVER_NOPROMPT);if (ret! SQL_SUCCESS ret! SQL_SUCCESS_WITH_INFO) {throw std::runtime_error(Failed to connect to database);}std::string query SELECT * FROM schema . table;SQLAllocHandle(SQL_HANDLE_STMT, hdbc, hstmt);ret SQLExecDirect(hstmt, (SQLCHAR*)query.c_str(), SQL_NTS);if (ret! SQL_SUCCESS ret! SQL_SUCCESS_WITH_INFO) {throw std::runtime_error(Failed to execute query);}std::vectorstd::vectorstd::string data readTableData(hstmt);std::vectorstd::string headers;SQLSMALLINT columnCount 0;SQLNumResultCols(hstmt, columnCount);for (SQLSMALLINT i 0; i columnCount; i) {SQLCHAR columnName[SQL_MAX_COLUMN_NAME_LEN];SQLSMALLINT nameLen;SQLDescribeCol(hstmt, i 1, columnName, SQL_MAX_COLUMN_NAME_LEN, nameLen, nullptr, nullptr, nullptr, nullptr);headers.push_back(reinterpret_castconst char*(columnName));}std::string csvFilename table .csv;writeToCSV(data, headers, csvFilename);std::string gzipFilename table .gz;compressCSV(csvFilename, gzipFilename);std::ostringstream oss;oss Database: database , Schema: schema , Table: table , Rows Exported: data.size() , Status: Success;logMessage(oss.str());} catch (const std::exception e) {std::ostringstream oss;oss Database: database , Schema: schema , Table: table , Status: Failed, Error: e.what();logMessage(oss.str());} finally {if (hstmt) SQLFreeHandle(SQL_HANDLE_STMT, hstmt);if (hdbc) SQLDisconnect(hdbc); SQLFreeHandle(SQL_HANDLE_DBC, hdbc);if (henv) SQLFreeHandle(SQL_HANDLE_ENV, henv);}
}int main() {std::string server 实例名;std::string database 数据库名;std::string schema Schema名;std::vectorstd::string tables {表名1, 表名2, 表名3};std::vectorstd::futurevoid futures;for (const auto table : tables) {futures.push_back(std::async(std::launch::async, exportTable, server, database, schema, table));}for (auto fut : futures) {fut.get();}return 0;
}代码说明
日志记录 logMessage 函数用于记录操作日志包括时间戳、数据库名、schema名、表名、导出数据行数和操作状态。 字符串处理 escapeDoubleQuotes 函数用于处理字符串中的双引号将其替换为两个双引号。 数据库读取 readTableData 函数使用ODBC API从数据库中读取表数据并将其存储在二维向量中。 CSV写入 writeToCSV 函数将数据写入CSV文件包括表头和数据行并用双引号包裹每个数据使用逗号作为分隔符。 文件压缩 compressCSV 函数将生成的CSV文件压缩为gzip格式并删除原始CSV文件。 表导出 exportTable 函数负责连接数据库、执行查询、读取数据、写入CSV文件并压缩。 主函数 main 函数定义了数据库服务器、数据库名、schema名和表名并使用异步任务并行导出每个表的数据。
用Python删除当前目录下所有功能扩展名为gz文件接着运行export_sqlserver.exe程序输出该程序的输出内容并等待它运行完成然后连接SQL Server数据库和Amazon Redshift数据仓库从数据库中获取所有表和它们的字段名然后在Redshift中创建字段名全部相同的同名表字段长度全部为最长的varchar类型如果表已经存在则不创建表自动上传当前目录下所有功能扩展名为gz文件到S3默认覆盖同名的文件然后使用COPY INTO将S3上包含csv文件的gz压缩包导入对应创建的Redshift表中文件数据的第一行是表头导入所有上传的文件到Redshift表程序需要异常处理和日志文件输出当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导入数据行数、成功或者失败的状态日志文件不存在则创建。
import os
import subprocess
import pyodbc
import redshift_connector
import boto3
import logging
from datetime import datetime# 配置日志记录
logging.basicConfig(filenameoperation_log.log, levellogging.INFO,format%(asctime)s - %(message)s, datefmt%Y-%m-%d %H:%M:%S)def delete_gz_files():try:for file in os.listdir(.):if file.endswith(.gz):os.remove(file)logging.info(所有.gz文件已删除)except Exception as e:logging.error(f删除.gz文件时出错: {e})def run_export_sqlserver():try:result subprocess.run([export_sqlserver.exe], capture_outputTrue, textTrue)print(result.stdout)logging.info(export_sqlserver.exe运行成功)except Exception as e:logging.error(f运行export_sqlserver.exe时出错: {e})def create_redshift_tables():# SQL Server 连接配置sqlserver_conn_str DRIVER{ODBC Driver 17 for SQL Server};SERVERyour_sqlserver_server;DATABASEyour_database;UIDyour_username;PWDyour_passwordtry:sqlserver_conn pyodbc.connect(sqlserver_conn_str)sqlserver_cursor sqlserver_conn.cursor()# Redshift 连接配置redshift_conn redshift_connector.connect(hostyour_redshift_host,databaseyour_redshift_database,useryour_redshift_user,passwordyour_redshift_password,port5439)redshift_cursor redshift_conn.cursor()sqlserver_cursor.execute(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE BASE TABLE)tables sqlserver_cursor.fetchall()for table in tables:table_name table[0]sqlserver_cursor.execute(fSELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME {table_name})columns sqlserver_cursor.fetchall()column_definitions , .join([f{column[0]} VARCHAR(MAX) for column in columns])try:redshift_cursor.execute(fCREATE TABLE IF NOT EXISTS {table_name} ({column_definitions}))redshift_conn.commit()logging.info(f在Redshift中成功创建表 {table_name})except Exception as e:logging.error(f在Redshift中创建表 {table_name} 时出错: {e})sqlserver_conn.close()redshift_conn.close()except Exception as e:logging.error(f连接数据库或创建表时出错: {e})def upload_gz_files_to_s3():s3 boto3.client(s3)bucket_name your_bucket_nametry:for file in os.listdir(.):if file.endswith(.gz):s3.upload_file(file, bucket_name, file)logging.info(f成功上传文件 {file} 到S3)except Exception as e:logging.error(f上传文件到S3时出错: {e})def copy_data_to_redshift():redshift_conn redshift_connector.connect(hostyour_redshift_host,databaseyour_redshift_database,useryour_redshift_user,passwordyour_redshift_password,port5439)redshift_cursor redshift_conn.cursor()bucket_name your_bucket_nametry:for file in os.listdir(.):if file.endswith(.gz) and file.endswith(.csv.gz):table_name file.split(.)[0]s3_path fs3://{bucket_name}/{file}sql fCOPY {table_name} FROM {s3_path} IAM_ROLE your_iam_role CSV HEADERtry:redshift_cursor.execute(sql)redshift_conn.commit()row_count redshift_cursor.rowcountlogging.info(f成功将数据导入表 {table_name}导入行数: {row_count})except Exception as e:logging.error(f将数据导入表 {table_name} 时出错: {e})except Exception as e:logging.error(f连接Redshift或导入数据时出错: {e})finally:redshift_conn.close()if __name__ __main__:delete_gz_files()run_export_sqlserver()create_redshift_tables()upload_gz_files_to_s3()copy_data_to_redshift()代码说明
日志记录使用 logging 模块配置日志记录记录操作的时间戳和操作信息到 operation_log.log 文件。 删除.gz文件 delete_gz_files 函数删除当前目录下所有扩展名为 .gz 的文件。 运行export_sqlserver.exe run_export_sqlserver 函数运行 export_sqlserver.exe 程序并输出其内容。 创建Redshift表 create_redshift_tables 函数连接SQL Server和Redshift数据库获取SQL Server中所有表和字段名在Redshift中创建同名表字段类型为 VARCHAR(MAX) 。 上传.gz文件到S3 upload_gz_files_to_s3 函数上传当前目录下所有扩展名为 .gz 的文件到S3。 将数据从S3导入Redshift copy_data_to_redshift 函数使用 COPY INTO 语句将S3上的CSV压缩包数据导入对应的Redshift表中。
请根据实际的数据库配置、S3桶名和IAM角色等信息修改代码中的相关参数。