Inserting data (100,000 rows) into a Business Critical database took 14 seconds. This time I was able to reproduce it with a single thread using a table with 20 columns.
To improve this Python code, I suggested running the bulk insert in parallel with a batch size of 10,000 rows and, in addition, I followed the best practices below to reduce the execution time of the process:
At the client virtual machine level:
Provision one CPU/vCore per parallel process that I need to create, in this case 10 vCores.
Place the virtual machine in the same region as the database.
At the database level:
Create a table with 20 columns.
Since the PK is a sequential key, I included the option OPTIMIZE_FOR_SEQUENTIAL_KEY = ON in the clustered index definition.
Configure the same number of CPUs/vCores as the maximum number of parallel processes that I want, in this case 10 vCores.
Depending on the volume of data, use Business Critical to reduce the storage latency.
At the Python code level:
Use the executemany method to reduce network round trips, sending only the parameter values (see the sketch after this list).
Run the inserts in batches (1,000 or 10,000 rows) instead of as a single process.
Use SET NOCOUNT ON to reduce the responses/rowsets returned about the number of rows inserted.
Use autocommit=False in the connection string.
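Condensed into a minimal sketch, these Python-level settings look like the lines below. The connection string values, the table name and the sample rows are placeholders that simply mirror the full script further down:

import pyodbc

conn_str = ("DRIVER={ODBC Driver 17 for SQL Server};SERVER=tcp:servername.database.windows.net,1433;"
            "DATABASE=databasename;UID=username;PWD=password")
rows = [(1, 1, 'AAAAAA'), (2, 2, 'BBBBBB')]  # Placeholder data: one tuple per row.

cnxn = pyodbc.connect(conn_str, autocommit=False)  # Explicit transaction per batch.
cursor = cnxn.cursor()
cursor.fast_executemany = True  # Send only the parameter values, fewer network round trips.
cursor.executemany(
    "SET NOCOUNT ON;"  # Suppress the rows-affected responses.
    "INSERT INTO [test_data] ([Key], Num_TEST, TEST_01) VALUES (?, ?, ?)",
    rows)
cnxn.commit()  # One commit per batch instead of one per row.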
You can find the Python code example below. The script reads a CSV file and, using a pool of threads, executes a bulk insert for every 10,000 rows.
import csv
import pyodbc
import threading
import os
import datetime


class ThreadsOrder:  # Class to run the insert process in parallel.
    def ExecuteSQL(self, a, s, n):
        TExecutor = threading.Thread(target=ExecuteSQL, args=(a, s, n,))
        TExecutor.start()


def SaveResults(Message, bSaveFile):  # Print a message and optionally append it to the log file.
    try:
        print(Message)
        if bSaveFile == True:
            file_object = open(filelog, "a")
            file_object.write(
                datetime.datetime.strftime(datetime.datetime.now(), '%d/%m/%y %H:%M:%S') + '-' + Message + '\n')
            file_object.close()
    except BaseException as e:
        print('An error occurred - ', format(e))


def ExecuteSQLcc(sTableName):  # Drop and recreate the target table and its clustered index.
    try:
        cnxn1 = pyodbc.connect(
            "DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ';PWD=' + SQL_password,
            autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.execute("DROP TABLE IF EXISTS " + sTableName)
        cursor.commit()
        cursor.execute("CREATE TABLE " + sTableName + " ("
                       " [Key] [int] NOT NULL,"
                       " [Num_TEST] [int] NULL,"
                       " [TEST_01] [varchar](6) NULL,"
                       " [TEST_02] [varchar](6) NULL,"
                       " [TEST_03] [varchar](6) NULL,"
                       " [TEST_04] [varchar](6) NULL,"
                       " [TEST_05] [varchar](6) NULL,"
                       " [TEST_06] [varchar](6) NULL,"
                       " [TEST_07] [varchar](6) NULL,"
                       " [TEST_08] [varchar](6) NULL,"
                       " [TEST_09] [varchar](6) NULL,"
                       " [TEST_10] [varchar](6) NULL,"
                       " [TEST_11] [varchar](6) NULL,"
                       " [TEST_12] [varchar](6) NULL,"
                       " [TEST_13] [varchar](6) NULL,"
                       " [TEST_14] [varchar](6) NULL,"
                       " [TEST_15] [varchar](6) NULL,"
                       " [TEST_16] [varchar](6) NULL,"
                       " [TEST_17] [varchar](6) NULL,"
                       " [TEST_18] [varchar](6) NULL,"
                       " [TEST_19] [varchar](6) NULL,"
                       " [TEST_20] [varchar](6) NULL)")
        cursor.commit()
        cursor.execute(
            "CREATE CLUSTERED INDEX [ix_ms_example] ON " + sTableName + " ([Key] ASC) WITH (STATISTICS_NORECOMPUTE = OFF, DROP_EXISTING = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = ON) ON [PRIMARY]")
        cursor.commit()
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)


def ExecuteSQL(a, sTableName, n):  # Insert one batch of rows in a single transaction.
    try:
        Before = datetime.datetime.now()
        if n == -1:
            sTypeProcess = "NoAsync"
        else:
            sTypeProcess = "Async - Thread:" + str(n)
        SaveResults('Executing at ' + str(Before) + " Process Type: " + sTypeProcess, True)
        cnxn1 = pyodbc.connect(
            "DRIVER={ODBC Driver 17 for SQL Server};APP=Bulk Insert Test;SERVER=" + SQL_server + ";DATABASE=" + SQL_database + ";UID=" + SQL_user + ';PWD=' + SQL_password,
            autocommit=False, timeout=3600)
        cursor = cnxn1.cursor()
        cursor.fast_executemany = True
        cursor.executemany(
            "SET NOCOUNT ON;INSERT INTO " + sTableName + " ([Key], Num_TEST, TEST_01, TEST_02, TEST_03, TEST_04, TEST_05, TEST_06, TEST_07, TEST_08, TEST_09, TEST_10, TEST_11, TEST_12, TEST_13, TEST_14, TEST_15, TEST_16, TEST_17, TEST_18, TEST_19, TEST_20) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
            a)
        cursor.commit()
        SaveResults('Time Difference INSERT process ' + str(datetime.datetime.now() - Before) + " " + sTypeProcess,
                    True)
    except BaseException as e:
        SaveResults('Executing SQL - an error occurred - ' + format(e), True)


# Connectivity details.
SQL_server = 'tcp:servername.database.windows.net,1433'
SQL_database = 'databasename'
SQL_user = 'username'
SQL_password = 'password'

# File details to read.
filepath = 'c:\\k\\'  # Folder that contains the demo CSV file.
filelog = filepath + 'Error.log'  # Save the log.
chunksize = 10000  # Transaction batch rows.
sTableName = "[test_data]"  # Table name (dummy).
pThreadOrder = ThreadsOrder()
nThread = 0  # Number of threads -- right now, the number of threads is not limited.

ExecuteSQLcc(sTableName)
Before = datetime.datetime.now()
line_count = 0
for directory, subdirectories, files in os.walk(filepath):
    for file in files:
        name, ext = os.path.splitext(file)
        if ext == '.csv':
            a = []
            SaveResults('Reading the file ' + name, True)
            BeforeFile = datetime.datetime.now()
            with open(os.path.join(directory, file), mode='r') as csv_file:
                csv_reader = csv.reader(csv_file, delimiter=',')
                for row in csv_reader:
                    line_count += 1
                    if line_count > 1:  # Skip the header row.
                        a.append(row)
                    if (line_count % chunksize) == 0:
                        deltaFile = datetime.datetime.now() - BeforeFile
                        nThread = nThread + 1
                        SaveResults(
                            'Time Difference Reading file is ' + str(deltaFile) + ' for ' + str(line_count) + ' rows',
                            True)
                        pThreadOrder.ExecuteSQL(a, sTableName, nThread)  # Open a new thread per transaction batch.
                        # ExecuteSQL(a, sTableName, -1)  # Single-threaded alternative.
                        a = []
                        BeforeFile = datetime.datetime.now()
            # Note: any trailing rows smaller than chunksize are left in 'a' and not inserted by this demo.
            SaveResults('Total Time Difference Reading file is ' + str(datetime.datetime.now() - Before) + ' for ' + str(
                line_count) + ' rows for the file: ' + name, True)
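The script above opens one new thread per batch without any upper bound (see the nThread comment). If you want to cap the parallelism at the 10 vCores mentioned earlier, a minimal sketch based on concurrent.futures could replace the ThreadsOrder class; it assumes the ExecuteSQL function and sTableName variable from the script above, plus a hypothetical batches list holding one list of rows per chunk:

from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 10  # Match the vCores available on the client VM and on the database.

def insert_batches(batches):
    # Submit every batch to a bounded pool so that at most MAX_WORKERS inserts run at once.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = [pool.submit(ExecuteSQL, batch, sTableName, n)
                   for n, batch in enumerate(batches, start=1)]
        for f in futures:
            f.result()  # Re-raise any exception that happened inside a worker thread.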
If, during the execution, you need to know the number of connections, the number of rows, and the impact on resources, see the following T-SQL:
SELECT substring(REPLACE(REPLACE(SUBSTRING(ST.text, (req.statement_start_offset/2) + 1,
           ((CASE statement_end_offset WHEN -1 THEN DATALENGTH(ST.text)
             ELSE req.statement_end_offset END - req.statement_start_offset)/2) + 1),
           CHAR(10), ' '), CHAR(13), ' '), 1, 512) AS statement_text,
       req.database_id,
       program_name,
       req.session_id,
       req.cpu_time 'cpu_time_ms',
       req.status,
       wait_time,
       wait_resource,
       wait_type,
       last_wait_type,
       req.total_elapsed_time,
       total_scheduled_time,
       req.row_count AS [Row Count],
       command,
       scheduler_id,
       memory_usage,
       req.writes,
       req.reads,
       req.logical_reads,
       blocking_session_id
FROM sys.dm_exec_requests AS req
INNER JOIN sys.dm_exec_sessions AS sess ON sess.session_id = req.session_id
CROSS APPLY sys.dm_exec_sql_text(req.sql_handle) AS ST
WHERE req.session_id <> @@SPID

SELECT count(*) FROM test_data

SELECT * FROM sys.dm_db_resource_stats ORDER BY end_time DESC
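If you prefer to take the same resource snapshot from Python while the load is running, a small sketch along these lines could be used (the connection string values are placeholders; sys.dm_db_resource_stats records one row roughly every 15 seconds):

import pyodbc

conn_str = ("DRIVER={ODBC Driver 17 for SQL Server};SERVER=tcp:servername.database.windows.net,1433;"
            "DATABASE=databasename;UID=username;PWD=password")

cnxn = pyodbc.connect(conn_str, autocommit=True)
cursor = cnxn.cursor()
cursor.execute("SELECT TOP 5 end_time, avg_cpu_percent, avg_data_io_percent, avg_log_write_percent "
               "FROM sys.dm_db_resource_stats ORDER BY end_time DESC")
for row in cursor.fetchall():
    # Most recent resource usage samples, newest first.
    print(row.end_time, row.avg_cpu_percent, row.avg_data_io_percent, row.avg_log_write_percent)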