Here is an example of how some of the information you have learned can be put together to develop a solution to a real-world ETL problem.
Examples
Example #1: Absence Report Generation
We have been given the task of downloading a zip file containing student performance information. We are then required to use that information to create a report of student absences rolled up by sex and age.
We need to download the zip file, extract the one file we need, load it into a database, do the number crunching, and spit out an Excel file.
For this example to work, you need to create some objects in SQL Server. Open the SQL folder in the example directory and run “create stage table.sql” in SSMS.
# Absence report ETL example.
#
# Steps, in order:
#   1. Check that the source site is reachable and download the zip file.
#   2. Extract the one CSV we need and delete the zip.
#   3. Re-write the semicolon-delimited CSV as a pipe-delimited file.
#   4. Truncate the stage table and BULK INSERT the pipe-delimited file.
#   5. Archive the pipe-delimited file into a month-stamped zip.
#   6. Query absences grouped by sex and age and write them to Excel.
#
# Most steps are best-effort: a failure is printed and the script moves on,
# so later steps may fail in turn if an earlier one did not complete.
import os
import sys
import urllib.request  # "import urllib" alone does not guarantee the request submodule is loaded
import datetime
import pandas as pd
import pyodbc as db
import zipfile as zf
from zipfile import ZipFile

# Allow a caller/notebook to pre-define ScriptFolder; otherwise use the cwd.
if 'ScriptFolder' not in globals():
    ScriptFolder = os.getcwd()

ExampleFolder = 'BringingItAllTogether\\'
DataFolder = 'data\\'
InFolder = 'In\\'
OutFolder = 'Out\\'
ArchiveFolder = 'Archive\\'

DownloadZipFileName = 'student.zip'
DownloadFileName = 'student-por.csv'
DownloadFileNamePipe = 'student_performance.csv'
# Year+month stamp, so each month's run gets its own archive file name.
DownloadTimeStamp = datetime.datetime.today().strftime('%Y%m')
ArchiveZipFileName = 'student_performance_' + DownloadTimeStamp + '.zip'
OutputFileName = 'absence_report.xlsx'

SiteURL = 'https://archive.ics.uci.edu/ml/index.php'
FileURL = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00320/student.zip'  # URL of File

InFolderPathZip = os.path.join(ScriptFolder, DataFolder, ExampleFolder, InFolder, DownloadZipFileName)
InFolderPathSemiColon = os.path.join(ScriptFolder, DataFolder, ExampleFolder, InFolder, DownloadFileName)
InFolderPathPipe = os.path.join(ScriptFolder, DataFolder, ExampleFolder, InFolder, DownloadFileNamePipe)
InFolderDirectory = os.path.join(ScriptFolder, DataFolder, ExampleFolder, InFolder)
ArchivePath = os.path.join(ScriptFolder, DataFolder, ExampleFolder, ArchiveFolder, ArchiveZipFileName)
OutFolderPathPipe = os.path.join(ScriptFolder, DataFolder, ExampleFolder, OutFolder, OutputFileName)

print("Starting: Processing Data")

# verify that the site is available and the internet connection is working
try:
    print("Validating status of site and internet connection")
    urllib.request.urlopen(SiteURL)
except Exception as e:
    print(e)

# download the file
try:
    print("Downloading file to:", InFolderDirectory)
    urllib.request.urlretrieve(FileURL, InFolderPathZip)
except Exception as e:
    print(e)

# Extract a single file from zip.
# Not wrapped in try/except: if the download failed, this raises and stops
# the script (same as the original).
with ZipFile(InFolderPathZip, 'r') as zipObject:
    listOfFileNames = zipObject.namelist()
    for fileName in listOfFileNames:
        if fileName == DownloadFileName:
            zipObject.extract(fileName, InFolderDirectory)
            # Message text kept as-is; only DownloadFileName is extracted.
            print('All the python files are extracted')

# delete zip file
try:
    print("Deleting file: {}".format(DownloadZipFileName))
    if os.path.isfile(InFolderPathZip):
        os.remove(InFolderPathZip)
except Exception as e:
    print(e)

# Read csv data into pandas and write | delimited txt file
try:
    print("Reading csv file: {}".format(DownloadFileName))
    df = pd.read_csv(InFolderPathSemiColon, index_col=False, sep=";")
    print("Writing txt file to: {}".format(InFolderDirectory))
    df.to_csv(InFolderPathPipe, sep="|", index=False)
    print("Deleting csv file: {}".format(DownloadFileName))
    os.remove(InFolderPathSemiColon)
except Exception as e:
    print(e)

# bulk load txt file to SQL Server
try:
    print("Connecting to SQL Server database")
    connection_string = 'DSN=ETL;'
    conn = db.connect(connection_string)
    try:
        print("Preparing database for update")
        csr = conn.cursor()
        csr.execute("TRUNCATE TABLE uci.StudentPerformance")
        # The file path is embedded in a T-SQL string literal, so double any
        # single quotes in it. The target is schema-qualified to match the
        # TRUNCATE above and the report query below.
        # NOTE(review): ROWTERMINATOR is LF (0x0a); confirm the file written by
        # to_csv uses LF line endings on the platform this runs on.
        sql = (
            "BULK INSERT uci.StudentPerformance FROM '"
            + InFolderPathPipe.replace("'", "''")
            + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"
        )
        print("Update database with {} file data.".format(InFolderPathPipe))
        csr.execute(sql)
        print("Completing SQL Server update")
        conn.commit()
        csr.close()
    finally:
        # Always release the connection, even when a statement fails.
        conn.close()
except Exception as e:
    print(e)

# zip txt file to archive
try:
    print("Creating zip file for txt file archive")
    # arcname stores the file under its bare name without changing the
    # process working directory; the context manager closes the archive
    # even if the write fails.
    with zf.ZipFile(ArchivePath, "w") as archive:
        archive.write(InFolderPathPipe, arcname=DownloadFileNamePipe)
    os.remove(InFolderPathPipe)
except Exception as e:
    print(e)

# alter the below for your file.
try:
    print("Connecting to SQL Server database")
    connection_string = 'DSN=ETL;'
    conn = db.connect(connection_string)
    try:
        # Total absences per (sex, age), smallest totals first.
        sql = (
            'SELECT sex AS SEX, age AS AGE, SUM(CAST(absences AS INT)) AS Absences '
            'FROM [ODS].[uci].[StudentPerformance] '
            'GROUP BY sex, age '
            'ORDER BY Absences'
        )
        df = pd.read_sql(sql, conn)
    finally:
        conn.close()
    print("Writing report.")
    df.to_excel(OutFolderPathPipe, index=False)
except Exception as e:
    print(e)