Lesson 33. Load Large CSVs Into Data Warehouse Staging Tables

BULK INSERT is a handy SQL statement for loading large amounts of data from a flat file into a database table. The problem with BULK INSERT is that, by default, the columns in the file have to line up with the columns in the table. That is a problem when the table you are loading has audit columns that do not exist in the file.

The solution is to create a view on the table that exposes only the columns you want to load. You give the view and the table the same name. Put the table in its own schema (euro in this lesson) and the view in the dbo schema. That way, both can share a name, and the schema qualifier tells SQL Server whether you mean the view or the table.

When you write your BULK INSERT statement, you reference the view. That will load the actual table.
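To make the idea concrete, here is a minimal sketch that builds the view definition from a list of file columns. The column names are hypothetical placeholders, not the real Eurostat columns, and build_view_sql is a helper I made up for illustration. Any column left out of the view has to be able to fill itself in during the load, for example an identity, default, or nullable column.

```python
def build_view_sql(table_schema, table_name, load_columns, view_schema='dbo'):
    """Return a CREATE VIEW statement exposing only the columns present in the file."""
    column_list = ', '.join('[{}]'.format(c) for c in load_columns)
    return ('CREATE VIEW [{vs}].[{t}] AS SELECT {cols} FROM [{ts}].[{t}]'
            .format(vs=view_schema, t=table_name, cols=column_list, ts=table_schema))

# Hypothetical file columns; the audit columns are simply left out of the view.
file_columns = ['PERIOD', 'DECLARANT', 'VALUE']
view_sql = build_view_sql('euro', 'EurostatData', file_columns)
print(view_sql)
```

The view and the table share the name EurostatData; only the schema differs, so a BULK INSERT into the dbo object lands in the euro table.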


Example #1: Load A Large CSV

In order to run this example, you will need to set up SQL Server by creating the staging table and the view.

In the source code, under SQL\33 Load Large CSVs Into Staging, you will find two sequentially named files. Open them in SSMS and run them in the order given by their file names: first the table, then the view.

Now we are good to go. We are going to take the large Eurostat data that we compiled and shoehorn it into a table.

In this example, I am going to introduce some feedback mechanisms in the form of timestamps. You can use these to understand how long the process is taking and where it is in its execution.
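As a rough sketch of the timestamp idea, the snippet below appends the current time to each message and also reports elapsed time. The stamp and format_elapsed helpers are names I made up for illustration; the lesson's script simply concatenates time.strftime onto each print call.

```python
import time

def stamp(message):
    """Return the message with the current wall-clock time appended."""
    return message + time.strftime(' %H:%M:%S')

def format_elapsed(seconds):
    """Format a number of seconds as HH:MM:SS."""
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    return '{:02d}:{:02d}:{:02d}'.format(hours, minutes, secs)

start = time.perf_counter()
print(stamp('Begin processing'))
# ... the long-running work would go here ...
elapsed = time.perf_counter() - start
print(stamp('Processing complete') + ' (elapsed ' + format_elapsed(elapsed) + ')')
```

The wall-clock stamp tells you where the process is; the elapsed figure tells you how long a step took without doing the subtraction in your head.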

Indexes slow the load of database tables. For small amounts of data, that is no big deal. When loading large files, you need to drop any indexes first. Once the file is loaded, you then rebuild any indexes that you dropped.

This, of course, is a long running process. However, it is much faster than other methods.
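The drop, load, rebuild order can be sketched as below. This assumes a DB-API style cursor with an execute method, such as the one pyodbc returns. RecordingCursor is a stand-in I wrote so the sketch runs without a database, and the SQL strings are abbreviated placeholders rather than real statements.

```python
class RecordingCursor:
    """Stand-in for a database cursor; it records statements instead of running them."""
    def __init__(self):
        self.statements = []

    def execute(self, statement):
        self.statements.append(statement)

def load_without_index(cursor, drop_index_sql, load_sql, add_index_sql):
    """Drop the index, run the bulk load, then rebuild the index, in that order."""
    for statement in (drop_index_sql, load_sql, add_index_sql):
        cursor.execute(statement)

# Placeholder statements for illustration only.
cursor = RecordingCursor()
load_without_index(cursor,
                   'ALTER TABLE t DROP CONSTRAINT pk',
                   'BULK INSERT v FROM file',
                   'ALTER TABLE t ADD CONSTRAINT pk PRIMARY KEY (id)')
print(cursor.statements)
```

With a real connection you would pass the cursor from conn.cursor() and commit once all three statements have succeeded.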

Script Performance

This is not a toy example and it is entirely possible you could choke your machine. On my box, the script took about 30 minutes to load the data. My machine specs are below.

Processor: Intel® Core™ i5-4590 CPU @ 3.30GHz
RAM: 16.0 GB
System type: 64-bit Operating System, x64-based processor

If your system has less RAM, expect the process to take longer. If your system is significantly less powerful than the one described above, do not attempt to run this script with the sample file. Instead, copy a file from the FileLoopExample folder and use that. Either rename the file or change the value of the file_name variable.
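Before you commit to a long load, it can help to confirm which file you are pointing at and how big it is. The sketch below is one way to do that. describe_source_file is a hypothetical helper, not part of the lesson's script.

```python
import os

def describe_source_file(path):
    """Return the file size in megabytes, or raise if the file is missing."""
    if not os.path.isfile(path):
        raise FileNotFoundError('Source file not found: ' + path)
    return os.path.getsize(path) / (1024 * 1024)
```

You could call describe_source_file(source_path) right after building the path and print the result, so a wrong file_name or an unexpectedly large file shows up before the load starts.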


Before you run this example, you may want to review Lesson 56. Troubleshooting Long Running Queries from the Practical T-SQL Pocket Guide For Beginners.

Viewing Results

When the process has completed, you can view the results by running the SQL queries found after the Python script.

#import what you need and set up your file variables.
import os
import pandas as pd
import pyodbc as db
import time

script_dir = os.getcwd()
data_directory = 'data\\'
source_directory = 'CombineCSVExample\\'
file_name = 'EurostatDataCombined.csv'
source_path = os.path.join(script_dir, data_directory,source_directory, file_name)

#Build SQL Statements
drop_index_sql = 'ALTER TABLE [euro].[EurostatData] DROP CONSTRAINT [PK_EurostatData] WITH ( ONLINE = OFF )'

add_index_sql = 'ALTER TABLE [euro].[EurostatData] ADD  CONSTRAINT [PK_EurostatData] PRIMARY KEY CLUSTERED'
add_index_sql = add_index_sql + '([ETLKey] ASC) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,' 
add_index_sql = add_index_sql + 'SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON,'
add_index_sql = add_index_sql + 'ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]'

sql = "BULK INSERT EurostatData FROM '" + source_path + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2, TABLOCK, BATCHSIZE = 100000)"

#Set up the connection.
print('Connecting to SQL Server database' + time.strftime(' %H:%M:%S'))
connection_string = 'DSN=ETL;'
conn = db.connect(connection_string)
print('Preparing database for update' + time.strftime(' %H:%M:%S'))
csr = conn.cursor()

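#now let's load the file
print('Begin processing {}.'.format(file_name) + time.strftime(' %H:%M:%S'))
csr.execute('TRUNCATE TABLE euro.EurostatData')
#drop the index so the load does not have to maintain it
csr.execute(drop_index_sql)
print('Updating staging' + time.strftime(' %H:%M:%S'))
csr.execute(sql)
#rebuild the index now that the data is in place
print('Rebuilding index' + time.strftime(' %H:%M:%S'))
csr.execute(add_index_sql)
conn.commit()
print('Processing file {} complete.'.format(file_name) + time.strftime(' %H:%M:%S'))

#release the database resources
csr.close()
conn.close()
print('Complete: Processing Data'  + time.strftime(' %H:%M:%S'))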

SELECT COUNT(*) FROM euro.EurostatData --31,599,999

SELECT TOP 100 * FROM euro.EurostatData
