import boto3
import pandas as pd
import psycopg2
import json
import configparser
config = configparser.ConfigParser()
config.read_file(open('cluster.config'))
KEY = config.get('AWS','KEY')
SECRET = config.get('AWS','SECRET')
DWH_CLUSTER_TYPE = config.get('DWH','DWH_CLUSTER_TYPE')
DWH_NUM_NODES = config.get('DWH','DWH_NUM_NODES')
DWH_NODE_TYPE = config.get('DWH','DWH_NODE_TYPE')
DWH_CLUSTER_IDENTIFIER = config.get('DWH','DWH_CLUSTER_IDENTIFIER')
DWH_DB = config.get('DWH','DWH_DB')
DWH_DB_USER = config.get('DWH','DWH_DB_USER')
DWH_DB_PASSWORD = config.get('DWH','DWH_DB_PASSWORD')
DWH_PORT = config.get('DWH','DWH_PORT')
DWH_IAM_ROLE_NAME = config.get('DWH','DWH_IAM_ROLE_NAME')
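The script expects a cluster.config INI file with [AWS] and [DWH] sections matching the keys read above. A minimal sketch, with secrets left as placeholders (never commit real credentials to source control); the non-secret values mirror the cluster properties shown later in this walkthrough:

[AWS]
KEY=<your-access-key-id>
SECRET=<your-secret-access-key>

[DWH]
DWH_CLUSTER_TYPE=single-node
DWH_NUM_NODES=1
DWH_NODE_TYPE=dc2.large
DWH_CLUSTER_IDENTIFIER=my-first-redshift
DWH_DB=myfirstdb
DWH_DB_USER=awsuser
DWH_DB_PASSWORD=<your-password>
DWH_PORT=5439
DWH_IAM_ROLE_NAME=redshift-s3-role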
df = pd.DataFrame({
    "Param": ["KEY", "SECRET", "DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
    "Value": [KEY, SECRET, DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, DWH_DB_PASSWORD, DWH_PORT, DWH_IAM_ROLE_NAME]
})
ec2 = boto3.resource('ec2', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
s3 = boto3.resource('s3', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
iam_client = boto3.client('iam', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
redshift_client = boto3.client('redshift', region_name="ap-south-1", aws_access_key_id=KEY, aws_secret_access_key=SECRET)
bucket = s3.Bucket("abhinav-de-s3")
log_data_files = [filename.key for filename in bucket.objects.filter(Prefix='')]
log_data_files
['Customer.csv', 'Sales_date.csv', 'markets.csv', 'products.csv', 'transactions.csv']
roleArn = 'arn:aws:iam::533267202542:role/redshift-s3-role'
try:
    response = redshift_client.create_cluster(
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        # NumberOfNodes=int(DWH_NUM_NODES),  # required only when ClusterType is 'multi-node'
        # Identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        # IAM role that grants the cluster read access to S3
        IamRoles=[roleArn]
    )
    print("Cluster creation request submitted successfully.")
except Exception as e:
    print(f"Error creating cluster: {e}")
Cluster creation request submitted successfully.
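Provisioning takes a few minutes, so the cluster will report a status of 'creating' at first. Instead of re-running describe_clusters by hand until it flips to 'available', boto3's built-in waiter can block until then; a minimal sketch:

# Poll describe_clusters until the cluster status becomes 'available'
waiter = redshift_client.get_waiter('cluster_available')
waiter.wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
print("Cluster is available.")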
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', None)  # None = never truncate column contents
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "VpcId"]
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])
myClusterprops = redshift_client.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterprops)
|   | Key | Value |
|---|-----|-------|
| 0 | ClusterIdentifier | my-first-redshift |
| 1 | NodeType | dc2.large |
| 2 | ClusterStatus | available |
| 3 | MasterUsername | awsuser |
| 4 | DBName | myfirstdb |
| 5 | Endpoint | {'Address': 'my-first-redshift.c3igl3ulf3d3.ap-south-1.redshift.amazonaws.com', 'Port': 5439} |
| 6 | VpcId | vpc-0b364b0263dd27367 |
DWH_Endpoint = myClusterprops['Endpoint']['Address']
DWH_Role_ARN = myClusterprops['IamRoles'][0]['IamRoleArn']
DWH_DBName = myClusterprops['DBName']
DWH_MasterUsername = myClusterprops['MasterUsername']
try:
    vpc = ec2.Vpc(id=myClusterprops['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',  # open to all IPs; acceptable for a demo, not for production
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    print(f"Error opening TCP port on the security group: {e}")
ec2.SecurityGroup(id='sg-0f55abca3022809b9')
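Opening the port to 0.0.0.0/0 lets any host on the internet reach the cluster. For anything beyond a short-lived demo, the rule can be scoped to a single address; MY_IP below is a placeholder for your own public IP, not a value from this project:

MY_IP = '203.0.113.7'  # placeholder; substitute your actual public IP
defaultSg.authorize_ingress(
    GroupName=defaultSg.group_name,
    CidrIp=f'{MY_IP}/32',  # /32 = exactly one address
    IpProtocol='TCP',
    FromPort=int(DWH_PORT),
    ToPort=int(DWH_PORT)
)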
try:
    conn = psycopg2.connect(host=DWH_Endpoint, dbname=DWH_DBName, user=DWH_DB_USER, password=DWH_DB_PASSWORD, port=int(DWH_PORT))
except Exception as e:
    print(f"Error connecting to the cluster: {e}")
conn.set_session(autocommit=True)
try:
    cur = conn.cursor()
except Exception as e:
    print(f"Error: {e}")
# Function to generate CREATE TABLE statements for CSV files
def generate_create_table_statements(bucket_name, file_names):
    create_table_statements = []
    for file_name in file_names:
        try:
            s3_object = s3.Object(bucket_name, file_name)
            df = pd.read_csv(s3_object.get()['Body'], sep=',')  # adjust the delimiter to match your files
            # Generate a CREATE TABLE statement based on the DataFrame columns
            table_name = file_name.split('/')[-1].split('.')[0]  # derive the table name from the file name
            columns_definition = ", ".join([f'"{col}" varchar' for col in df.columns])
            create_table_statement = f"CREATE TABLE {table_name} ({columns_definition});"
            # Append the CREATE TABLE statement to the list
            create_table_statements.append(create_table_statement)
        except Exception as e:
            print(f"Error reading file {file_name}: {e}")
    return create_table_statements
# Call the function to get CREATE TABLE statements for each CSV file in S3
create_table_statements = generate_create_table_statements("abhinav-de-s3", log_data_files)
# Display CREATE TABLE statements
for idx, create_table_statement in enumerate(create_table_statements, start=1):
    print(f"CREATE TABLE Statement {idx}:\n{create_table_statement}\n{'='*50}\n")
CREATE TABLE Statement 1:
CREATE TABLE Customer ("customer_code" varchar, "custmer_name" varchar, "customer_type" varchar);
==================================================
CREATE TABLE Statement 2:
CREATE TABLE Sales_date ("date" varchar, "cy_date" varchar, "year" varchar, "month_name" varchar, "date_yy_mmm" varchar);
==================================================
CREATE TABLE Statement 3:
CREATE TABLE markets ("markets_code" varchar, "markets_name" varchar, "zone" varchar);
==================================================
CREATE TABLE Statement 4:
CREATE TABLE products ("product_code" varchar, "product_type" varchar);
==================================================
CREATE TABLE Statement 5:
CREATE TABLE transactions ("product_code" varchar, "customer_code" varchar, "market_code" varchar, "order_date" varchar, "sales_qty" varchar, "sales_amount" varchar, "currency" varchar, "profit_margin_percentage" varchar, "profit_margin" varchar, "cost_price" varchar);
==================================================
log_data_files
['Customer.csv', 'Sales_date.csv', 'markets.csv', 'products.csv', 'transactions.csv']
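The generated statements could also be executed in a single loop instead of cell by cell; a minimal sketch, reusing the cur cursor from above:

for stmt in create_table_statements:
    try:
        cur.execute(stmt)
    except Exception as e:
        print(f"Error executing '{stmt}': {e}")

The walkthrough below runs each CREATE TABLE and COPY individually so that any error surfaces next to the step that caused it.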
try:
    cur.execute("""
        CREATE TABLE Customer ("customer_code" varchar, "custmer_name" varchar, "customer_type" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        copy Customer from 's3://abhinav-de-s3/Customer.csv'
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        ignoreheader 1
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
# Column order matches the CSV header so that COPY maps fields correctly by position
try:
    cur.execute("""
        CREATE TABLE Sales_date ("date" varchar, "cy_date" varchar, "year" varchar, "month_name" varchar, "date_yy_mmm" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        copy Sales_date from 's3://abhinav-de-s3/Sales_date.csv'
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        ignoreheader 1
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        CREATE TABLE markets ("markets_code" varchar, "markets_name" varchar, "zone" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        copy markets from 's3://abhinav-de-s3/markets.csv'
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        ignoreheader 1
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        CREATE TABLE products ("product_code" varchar, "product_type" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        copy products from 's3://abhinav-de-s3/products.csv'
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        ignoreheader 1
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        CREATE TABLE transactions ("product_code" varchar, "customer_code" varchar, "market_code" varchar, "order_date" varchar, "sales_qty" varchar, "sales_amount" varchar, "currency" varchar, "profit_margin_percentage" varchar, "profit_margin" varchar, "cost_price" varchar);
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        copy transactions from 's3://abhinav-de-s3/transactions.csv'
        credentials 'aws_iam_role=arn:aws:iam::533267202542:role/redshift-s3-role'
        delimiter ','
        ignoreheader 1
        region 'ap-south-1'
    """)
except Exception as e:
    print(f"Error: {e}")
try:
    cur.execute("""
        select * from transactions;
    """)
except Exception as e:
    print(f"Error: {e}")
# Fetch and print the first few rows from the result set
row = cur.fetchone()
i = 0
while row:
    print(row)
    i = i + 1
    if i > 2:
        break
    row = cur.fetchone()  # advance to the next row; without this the loop reprints the same row
('Prod279', 'Cus020', 'Mark011', '2017-10-18', '1', '102.0', 'INR', '-0.12', '-12.24', '114.24')
('Prod279', 'Cus020', 'Mark011', '2017-10-18', '1', '102.0', 'INR', '-0.12', '-12.24', '114.24')
('Prod279', 'Cus020', 'Mark011', '2017-10-18', '1', '102.0', 'INR', '-0.12', '-12.24', '114.24')
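A simple sanity check after all the loads is a row count per table; a sketch, with the table names taken from the CREATE TABLE statements above:

for table in ["Customer", "Sales_date", "markets", "products", "transactions"]:
    try:
        cur.execute(f"select count(*) from {table};")
        print(table, cur.fetchone()[0])
    except Exception as e:
        print(f"Error counting rows in {table}: {e}")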
try:
    conn.close()
except Exception as e:
    print(f"Error: {e}")
try:
    redshift_client.delete_cluster(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)
except Exception as e:
    print(f"Error: {e}")
DWH_DBName
'myfirstdb'