Streamlining Data Processing with AWS Redshift, Python, and Power BI¶

Abhinav_AWS_de_project_1.png

Import necessary libraries¶
In [181]:
import boto3
import pandas as pd
import psycopg2
import json
import configparser

Read AWS and Redshift cluster configuration from file¶

In [182]:
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))
Extract AWS credentials and Redshift cluster details¶
In [183]:
KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')
DWH_CLUSTER_TYPE = config.get('DWH','DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH','DWH_NUM_NODES') 
DWH_NODE_TYPE = config.get('DWH','DWH_NODE_TYPE')  
DWH_CLUSTER_IDENTIFIER = config.get('DWH','DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH','DWH_DB')
DWH_DB_USER = config.get('DWH','DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH','DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH','DWH_PORT')
DWH_IAM_ROLE_NAME = config.get('DWH','DWH_IAM_ROLE_NAME')
Create a DataFrame to store parameter values¶
In [184]:
df = pd.DataFrame({"Param":["key" ,"SECRET" ,"DWH_CLUSTER_TYPE" ,"DWH_NUM_NODES" ,"DWH_NODE_TYPE" ,"DWH_CLUSTER_IDENTIFIER" ,"DWH_DB" ,"DWH_DB_USER" ,"DWH_DB_PASSWORD" ,"DWH_PORT" ,"DWH_IAM_ROLE_NAME"]
              , "Value" : [KEY ,SECRET ,DWH_CLUSTER_TYPE ,DWH_NUM_NODES ,DWH_NODE_TYPE ,DWH_CLUSTER_IDENTIFIER ,DWH_DB,DWH_DB_USER,DWH_DB_PASSWORD,DWH_PORT ,DWH_IAM_ROLE_NAME]
    
})
Create AWS clients using Boto3¶
In [185]:
ec2 = boto3.resource('ec2', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
s3 = boto3.resource('s3', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
iam_client = boto3.resource('iam', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
redshift_client = boto3.client('redshift', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)

Access an S3 bucket and retrieve file names¶

In [186]:
bucket = s3.Bucket("abhinav-de-s3")
log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='')]
log_data_files
Out[186]:
['Customer.csv',
 'Sales_date.csv',
 'markets.csv',
 'products.csv',
 'transactions.csv']
Create Redshift cluster¶
In [187]:
roleArn = 'arn:aws:iam::533267202542:role/redshift-s3-role'
In [188]:
try:
    response = redshift_client.create_cluster(
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        
        # Identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        # Role (for S3 access)
        IamRoles=[roleArn]  # Corrected parameter name to IamRoles
    )
    print("Cluster creation request submitted successfully.")
except Exception as e:
    print(f"Error creating cluster: {e}")
Cluster creation request submitted successfully.
Function to display selected Redshift properties¶
In [191]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth',-1)
    keysToShow = ["ClusterIdentifier","NodeType","ClusterStatus","MasterUsername","DBName","Endpoint","VpcId" ]
    x =[(k,v) for k , v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["key","Value"])


myClusterprops = redshift_client.describe_clusters( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterprops)
C:\Users\user\AppData\Local\Temp\ipykernel_11892\2240008960.py:2: FutureWarning: Passing a negative integer is deprecated in version 1.0 and will not be supported in future version. Instead, use None to not limit the column width.
  pd.set_option('display.max_colwidth',-1)
Out[191]:
key Value
0 ClusterIdentifier my-first-redshift
1 NodeType dc2.large
2 ClusterStatus available
3 MasterUsername awsuser
4 DBName myfirstdb
5 Endpoint {'Address': 'my-first-redshift.c3igl3ulf3d3.ap-south-1.redshift.amazonaws.com', 'Port': 5439}
6 VpcId vpc-0b364b0263dd27367
Extract relevant cluster information¶
In [192]:
DWH_Endpoint = myClusterprops['Endpoint']['Address']
DWH_Role_ARN = myClusterprops['IamRoles'][0]['IamRoleArn']
DWH_DBName = myClusterprops['DBName']
DWH_MasterUsername = myClusterprops['MasterUsername']
Configure security group for Redshift cluster¶
In [194]:
try:
    vpc = ec2.Vpc(id=myClusterprops['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
   
    print(f"Error creating cluster: {e}")
ec2.SecurityGroup(id='sg-0f55abca3022809b9')
Connect to Redshift cluster¶
In [195]:
try:
    conn = psycopg2.connect(host=DWH_Endpoint, dbname=DWH_DBName, user=DWH_DB_USER, password=DWH_DB_PASSWORD, port=5439)
except Exception as e:
    print(f"Error creating cluster: {e}")
Set autocommit for the connection¶
In [196]:
conn.set_session(autocommit=True)
Create cursor for executing SQL commands¶
In [197]:
try:
    cur = conn.cursor()
except Exception as e:
    print(f"Error: {e}")
Function to generate DDL statement for a specific file in S3¶
In [198]:
# Function to generate CREATE TABLE statements for CSV files
def generate_create_table_statements(bucket_name, file_names):
    create_table_statements = []

    for file_name in file_names:
        try:
            s3_object = s3.Object(bucket_name, file_name)
            df = pd.read_csv(s3_object.get()['Body'], sep=',')  # Specify the correct delimiter as per your file/ csv
            
            # Generate CREATE TABLE statement based on DataFrame columns
            table_name = file_name.split('/')[-1].split('.')[0]  # Extract table name from file name
            columns_definition = ", ".join([f'"{col}" varchar' for col in df.columns])
            create_table_statement = f"CREATE TABLE {table_name} ({columns_definition});"
            
            # Append CREATE TABLE statement to the list
            create_table_statements.append(create_table_statement)
            
        except Exception as e:
            print(f"Error reading file {file_name}: {e}")
    
    return create_table_statements

# Call the function to get CREATE TABLE statements for each CSV file in S3
create_table_statements = generate_create_table_statements("abhinav-de-s3", log_data_files)

# Display CREATE TABLE statements
for idx, create_table_statement in enumerate(create_table_statements, start=1):
    print(f"CREATE TABLE Statement {idx}:\n{create_table_statement}\n{'='*50}\n")
CREATE TABLE Statement 1:
CREATE TABLE Customer ("customer_code" varchar, "custmer_name" varchar, "customer_type" varchar);
==================================================

CREATE TABLE Statement 2:
CREATE TABLE Sales_date ("date" varchar, "cy_date" varchar, "year" varchar, "month_name" varchar, "date_yy_mmm" varchar);
==================================================

CREATE TABLE Statement 3:
CREATE TABLE markets ("markets_code" varchar, "markets_name" varchar, "zone" varchar);
==================================================

CREATE TABLE Statement 4:
CREATE TABLE products ("product_code" varchar, "product_type" varchar);
==================================================

CREATE TABLE Statement 5:
CREATE TABLE transactions ("product_code" varchar, "customer_code" varchar, "market_code" varchar, "order_date" varchar, "sales_qty" varchar, "sales_amount" varchar, "currency" varchar, "profit_margin_percentage" varchar, "profit_margin" varchar, "cost_price" varchar);
==================================================

In [199]:
log_data_files
Out[199]:
['Customer.csv',
 'Sales_date.csv',
 'markets.csv',
 'products.csv',
 'transactions.csv']
Execute SQL commands to create a table, copy data, and query the data¶
Customer¶
In [200]:
try:
    cur.execute("""
        CREATE TABLE Customer ("customer_code" varchar, "custmer_name" varchar, "customer_type" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
In [201]:
try:
    cur.execute("""
        copy Customer from 's3://abhinav-de-s3/Customer.csv' 
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
Sales_date.csv¶
In [202]:
try:
    cur.execute("""
        CREATE TABLE Sales_date ("cy_date" varchar, "date" varchar, "date_yy_mmm" varchar, "month_name" varchar, "year" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
In [203]:
try:
    cur.execute("""
        copy Sales_date from 's3://abhinav-de-s3/Sales_date.csv' 
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
markets.csv¶
In [204]:
try:
    cur.execute("""
   CREATE TABLE markets ("markets_code" varchar, "markets_name" varchar, "zone" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
In [205]:
try:
    cur.execute("""
        copy markets from 's3://abhinav-de-s3/markets.csv' 
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
products.csv¶
In [206]:
try:
    cur.execute("""
   CREATE TABLE products ("product_code" varchar, "product_type" varchar);
   """)
except Exception as e:
    print(f"Error: {e}")
In [207]:
try:
    cur.execute("""
        copy products from 's3://abhinav-de-s3/products.csv' 
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
transactions.csv¶
In [208]:
try:
    cur.execute("""
  CREATE TABLE transactions ("product_code" varchar, "customer_code" varchar, "market_code" varchar, "order_date" varchar, "sales_qty" varchar, "sales_amount" varchar, "currency" varchar, "profit_margin_percentage" varchar, "profit_margin" varchar, "cost_price" varchar);
  """)
except Exception as e:
    print(f"Error: {e}")
In [209]:
try:
    cur.execute("""
        copy transactions from 's3://abhinav-de-s3/transactions.csv' 
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
select the data¶
In [210]:
try:
    cur.execute("""
        select * from transactions;
    """)
except Exception as e:
    print(f"Error: {e}")
In [212]:
# Fetch and print the first few rows from the result set
row = cur.fetchone()
i=0
while row:
    print(row)
    i=i+1
    if i>2:
        break 
    
('Prod279', 'Cus020', 'Mark011', '2017-10-18', '1', '102.0', 'INR', '-0.12', '-12.24', '114.24')
('Prod279', 'Cus020', 'Mark011', '2017-10-18', '1', '102.0', 'INR', '-0.12', '-12.24', '114.24')
('Prod279', 'Cus020', 'Mark011', '2017-10-18', '1', '102.0', 'INR', '-0.12', '-12.24', '114.24')
Close the connection to the Redshift cluster¶
In [213]:
try:
    conn.close()
except Exception as e:
    print(f"Error: {e}")
Delete the Redshift cluster if Redshift cluster is not required any more¶
In [180]:
try:
    redshift_client.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
except Exception as e:
    print(f"Error: {e}")
In [214]:
DWH_DBName
Out[214]:
'myfirstdb'
In [ ]: