Lesson 28. Bringing It All Together

Here is an example of how some of what you have learned can be put together to develop a solution to a real-world ETL problem.

Examples

Example #1: Absence Report Generation

We have been given the task of downloading a zip file containing student performance information. We then are required to use that information to create a report of student absences rolled up by sex and age.

We need to download the zip file, extract the one file we need, load it into a database, do the number crunching, and spit out an Excel file.

For this example to work, you need to create some objects in SQL Server. Open the SQL folder in the example directory and run “create stage table.sql” in SSMS (SQL Server Management Studio).

import datetime
import os
import sys
import urllib
import urllib.request
import zipfile as zf
from zipfile import ZipFile

import pandas as pd
import pyodbc as db


# Use the current working directory as the script root unless a ScriptFolder
# name has already been defined in this module's globals.
if 'ScriptFolder' not in globals():
    ScriptFolder = os.getcwd()

# Folder names (Windows-style, each ending in a backslash).
DataFolder = 'data\\'
ExampleFolder = 'BringingItAllTogether\\'
InFolder = 'In\\'
OutFolder = 'Out\\'
ArchiveFolder = 'Archive\\'

# File names used at each stage of the pipeline.
DownloadZipFileName = 'student.zip'
DownloadFileName = 'student-por.csv'
DownloadFileNamePipe = 'student_performance.csv'
OutputFileName = 'absence_report.xlsx'

# The archive name carries the year and month of the run (YYYYMM).
_run_started = datetime.datetime.today()
DownloadTimeStamp = _run_started.strftime('%Y%m')
ArchiveZipFileName = ''.join(('student_performance_', DownloadTimeStamp, '.zip'))

# Remote locations: the site used for the availability check and the zip to download.
SiteURL = 'https://archive.ics.uci.edu/ml/index.php'
FileURL = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00320/student.zip' #URL of File

# Every working path hangs off <ScriptFolder>/<DataFolder>/<ExampleFolder>.
_example_root = os.path.join(ScriptFolder, DataFolder, ExampleFolder)
InFolderDirectory = os.path.join(_example_root, InFolder)
InFolderPathZip = os.path.join(InFolderDirectory, DownloadZipFileName)
InFolderPathSemiColon = os.path.join(InFolderDirectory, DownloadFileName)
InFolderPathPipe = os.path.join(InFolderDirectory, DownloadFileNamePipe)
ArchivePath = os.path.join(_example_root, ArchiveFolder, ArchiveZipFileName)
OutFolderPathPipe = os.path.join(_example_root, OutFolder, OutputFileName)

print("Starting: Processing Data")

# Verify that the site is available and the internet connection is working.
# Failures are only reported, not raised, so the script carries on regardless.
try:
    print("Validating status of site and internet connection")
    # Only reachability matters here; the context manager closes the HTTP
    # response immediately instead of leaving the connection open.
    with urllib.request.urlopen(SiteURL):
        pass
except Exception as e:
    print(e)

# Download the zip file into the In folder.
try:
    print("Downloading file to:", InFolderDirectory)
    urllib.request.urlretrieve(FileURL, InFolderPathZip)
except Exception as e:
    print(e)

# Extract a single file (DownloadFileName) from the downloaded zip into the
# In folder; every other member of the archive is ignored.
with ZipFile(InFolderPathZip, 'r') as zipObject:
    if DownloadFileName in zipObject.namelist():
        zipObject.extract(DownloadFileName, InFolderDirectory)
        print('Extracted {} from {}'.format(DownloadFileName, DownloadZipFileName))
    else:
        # Report a missing member here rather than leaving the later
        # read step to fail without any hint of the cause.
        print('{} was not found in {}'.format(DownloadFileName, DownloadZipFileName))

# Remove the downloaded zip file if it is present; any error is printed
# and the script continues.
try:
    print("Deleting file: {}".format(DownloadZipFileName))
    zip_is_present = os.path.isfile(InFolderPathZip)
    if zip_is_present:
        os.remove(InFolderPathZip)
except Exception as err:
    print(err)

# Convert the semicolon-delimited csv into a pipe-delimited file, then
# delete the semicolon-delimited original. Errors are printed, not raised.
try:
    print("Reading csv file: {}".format(DownloadFileName))
    df = pd.read_csv(
        InFolderPathSemiColon,
        sep=";",
        index_col=False,
    )

    print("Writing txt file to: {}".format(InFolderDirectory))
    df.to_csv(
        InFolderPathPipe,
        index=False,
        sep="|",
    )

    print("Deleting csv file: {}".format(DownloadFileName))
    os.remove(InFolderPathSemiColon)
except Exception as err:
    print(err)

# Bulk load the pipe-delimited file into SQL Server.
# The staging table is emptied first, then refilled with BULK INSERT; both
# statements are committed together. Errors are printed, not raised.
try:
    print("Connecting to SQL Server database")
    connection_string = 'DSN=ETL;'
    conn = db.connect(connection_string)
    try:
        print("Preparing database for update")
        csr = conn.cursor()
        csr.execute("TRUNCATE TABLE uci.StudentPerformance")
        # BULK INSERT takes the file name as a literal, so the path is embedded
        # in the statement; single quotes are doubled to keep the literal valid.
        # The table is schema-qualified so it is the same table the TRUNCATE
        # above empties, whatever the login's default schema is.
        sql = (
            "BULK INSERT uci.StudentPerformance FROM '"
            + InFolderPathPipe.replace("'", "''")
            + "' WITH (FIELDTERMINATOR = '|', ROWTERMINATOR = '0x0a', FIRSTROW = 2)"
        )
        print("Update database with {} file data.".format(InFolderPathPipe))
        csr.execute(sql)
        print("Completing SQL Server update")
        conn.commit()
        csr.close()
    finally:
        # Always release the connection, including when a statement fails.
        conn.close()
except Exception as e:
    print(e)

# Zip the pipe-delimited file into the archive folder, then delete it
# from the In folder. Errors are printed, not raised.
try:
    print("Creating zip file for txt file archive")
    # arcname stores the file under its bare name without changing the working
    # directory, and the context manager closes the archive even if the write
    # fails, so a failure cannot leave the process in another directory.
    with zf.ZipFile(ArchivePath, "w") as archive:
        archive.write(InFolderPathPipe, arcname=DownloadFileNamePipe)
    # Only reached once the archive has been written and closed.
    os.remove(InFolderPathPipe)
except Exception as e:
    print(e)
    
# Query total absences grouped by sex and age and write the result to Excel.
# Alter the query below for your file.
try:
    print("Connecting to SQL Server database")
    connection_string = 'DSN=ETL;'
    conn = db.connect(connection_string)
    try:
        sql = (
            'SELECT sex AS SEX, age AS AGE, SUM(CAST(absences AS INT)) AS Absences '
            'FROM [ODS].[uci].[StudentPerformance] '
            'GROUP BY sex, age '
            'ORDER BY Absences'
        )
        # Read-only query: no cursor or commit is needed.
        df = pd.read_sql(sql, conn)
    finally:
        # Release the connection whether or not the query succeeded.
        conn.close()
    print("Writing report.")
    df.to_excel(OutFolderPathPipe, index=False)
except Exception as e:
    print(e)

Now you try it!

Don't copy and paste. Type the code yourself!

Last updated