diff --git a/Scripts/accounts.py b/Scripts/accounts.py
index 53e787b..4e70fa0 100644
--- a/Scripts/accounts.py
+++ b/Scripts/accounts.py
@@ -7,54 +7,49 @@ import pandas as pd
 import boto3
 import io
 from sqlalchemy import create_engine, text
-from urllib.parse import quote_plus
 
 # ---- Setup ----
 fake = Faker()
 load_dotenv()
 
-# ---- S3 Setup ----
-s3 = boto3.resource(
-    "s3",
-    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
-    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
-    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
-)
-
-bucket_name = os.getenv("STORAGE_BUCKET")
-customers_key_csv = "DataLab/customers/customers.csv"
-accounts_s3_key_parquet = "DataLab/accounts/accounts.parquet"
-
-# ---- Postgres Setup (optional) ----
+# ---- Postgres setup ----
 user = os.getenv("PG_USER")
 password = os.getenv("PG_PASSWORD")
 host = os.getenv("PG_HOST")
 port = "5432"
 db = "postgres"
 
-engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")
+engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}", future=True)
 
-# ---- Ensure local data folder exists ----
-os.makedirs("../Data", exist_ok=True)
+# ---- S3 setup (backup only) ----
+s3 = boto3.resource(
+    "s3",
+    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
+    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
+    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
+)
+bucket_name = os.getenv("STORAGE_BUCKET")
+accounts_s3_key_parquet = "DataLab/accounts/accounts.parquet"
 
-# ---- Download customers.csv from S3 ----
-local_customers_file = "../Data/customers.csv"
-try:
-    s3.Bucket(bucket_name).download_file(customers_key_csv, local_customers_file)
-    print("Downloaded customers.csv from S3.")
-except Exception as e:
-    print("ERROR: Could not download customers.csv:", e)
-    raise SystemExit()
+# ---- Load customers from Postgres ----
+with engine.connect() as conn:
+    customers_df = pd.read_sql(
+        sql=text("SELECT customer_id, home_branch_id, customer_since FROM customers;"),
+        con=conn
+    )
 
-# ---- Load customers DataFrame ----
-customers_df = pd.read_csv(local_customers_file)
 customers_df["customer_since"] = pd.to_datetime(customers_df["customer_since"]).dt.date
 
-# ---- Helper Functions ----
+# ---- Unique account ID generator ----
+generated_ids = set()
 def generate_account_id(branch_id):
-    branch_part = str(branch_id).zfill(3)
-    random_part = str(random.randint(10**8, 10**9 - 1))
-    return branch_part + random_part
+    while True:
+        branch_part = str(branch_id).zfill(3)
+        random_part = str(random.randint(10**8, 10**9 - 1))
+        acct_id = branch_part + random_part
+        if acct_id not in generated_ids:
+            generated_ids.add(acct_id)
+            return acct_id
 
 def generate_account_number():
     return str(random.randint(10**10, 10**11 - 1))
@@ -94,40 +89,26 @@ for _, row in customers_df.iterrows():
 
 accounts_df = pd.DataFrame(accounts)
 
-# ---- Save locally as CSV ----
-local_accounts_file = "../Data/accounts.csv"
-accounts_df.to_csv(local_accounts_file, index=False)
-print("Generated accounts.csv locally.")
+# ---- Save to S3 backup ----
+buffer = io.BytesIO()
+accounts_df.to_parquet(buffer, index=False, engine="pyarrow")
+s3.Bucket(bucket_name).put_object(Key=accounts_s3_key_parquet, Body=buffer.getvalue())
+print("Uploaded accounts.parquet to S3 (backup).")
 
-# ---- Upload / append to S3 as Parquet ----
-try:
-    obj = s3.Bucket(bucket_name).Object(accounts_s3_key_parquet).get()
-    existing_df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
-    combined_df = pd.concat([existing_df, accounts_df], ignore_index=True)
-    print(f"Appended {len(accounts_df)} rows to existing S3 Parquet")
-except s3.meta.client.exceptions.NoSuchKey:
-    combined_df = accounts_df
-    print("No existing Parquet on S3, creating new one")
-
-parquet_buffer = io.BytesIO()
-combined_df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
-s3.Bucket(bucket_name).put_object(Key=accounts_s3_key_parquet, Body=parquet_buffer.getvalue())
-print(f"Uploaded accounts.parquet to s3")
-
-# ---- Append to Postgres ----
-try:
-    accounts_df.to_sql(
-        "accounts",
-        engine,
-        if_exists="append",
-        index=False,
-        method='multi'  # faster inserts
-)
-except Exception as e:
-    print("Failed to insert into Postgres:", e)
-
-# ---- Optional: row count check ----
-with engine.connect() as conn:
-    existing_ids = pd.read_sql("SELECT account_id FROM accounts;", conn)
-    accounts_df = accounts_df[~accounts_df['account_id'].isin(existing_ids['account_id'])]
+# ---- Ensure accounts table exists and insert into Postgres ----
+with engine.begin() as conn:
+    conn.execute(text("""
+        CREATE TABLE IF NOT EXISTS accounts (
+            account_id VARCHAR(20) PRIMARY KEY,
+            account_number VARCHAR(20) UNIQUE,
+            customer_id BIGINT REFERENCES customers(customer_id),
+            account_type VARCHAR(50),
+            open_date DATE,
+            balance NUMERIC(12,2),
+            branch_id INT REFERENCES branches(branch_id)
+        );
+    """))
+    # pandas to_sql accepts the SQLAlchemy 2.x connection directly
+    accounts_df.to_sql("accounts", conn, if_exists="append", index=False, method="multi")
+    print(f"Inserted {len(accounts_df)} accounts into Postgres successfully!")
diff --git a/Scripts/branches.py b/Scripts/branches.py
new file mode 100644
index 0000000..33d77cc
--- /dev/null
+++ b/Scripts/branches.py
@@ -0,0 +1,93 @@
+from faker import Faker
+from dotenv import load_dotenv
+import os
+import pandas as pd
+import boto3
+import io
+from sqlalchemy import create_engine, text
+from urllib.parse import quote_plus
+
+# ---- Faker setup ----
+fake = Faker()
+load_dotenv()
+
+# ---- S3 Setup ----
+s3 = boto3.resource(
+    's3',
+    endpoint_url=os.getenv('STORAGE_ENDPOINT'),
+    aws_access_key_id=os.getenv('STORAGE_ACCESS_KEY'),
+    aws_secret_access_key=os.getenv('STORAGE_SECRET_KEY')
+)
+
+bucket_name = os.getenv('STORAGE_BUCKET')
+s3_key_csv = 'DataLab/branches/branches.csv'
+s3_key_parquet = 'DataLab/branches/branches.parquet'
+
+# ---- Postgres Setup ----
+user = os.getenv("PG_USER")
+password = os.getenv("PG_PASSWORD")
+host = os.getenv("PG_HOST")
+port = "5432"
+db = "postgres"
+
+engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")
+
+# ---- Ensure local data folder exists ----
+os.makedirs("../Data", exist_ok=True)
+
+# ---- Generate branch data ----
+branches = []
+for i in range(1, 11):  # 10 branches
+    branches.append({
+        "branch_id": str(i),  # store as string for consistency
+        "branch_name": f"{fake.city()} Branch",
+        "address": fake.street_address(),
+        "city": fake.city(),
+        "state": fake.state_abbr()
+    })
+
+df = pd.DataFrame(branches)
+
+# ---- Save locally as CSV ----
+local_file = "../Data/branches.csv"
+df.to_csv(local_file, index=False)
+print("Generated 10 branches locally.")
+
+# ---- Upload CSV to S3 ----
+s3.Bucket(bucket_name).upload_file(local_file, s3_key_csv)
+print(f"Uploaded branches.csv to s3://{bucket_name}/{s3_key_csv}")
+
+# ---- Upload / append to S3 as Parquet ----
+try:
+    obj = s3.Bucket(bucket_name).Object(s3_key_parquet).get()
+    existing_df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
+    combined_df = pd.concat([existing_df, df], ignore_index=True)
+    print(f"Appended {len(df)} branches to existing Parquet on S3.")
+except s3.meta.client.exceptions.NoSuchKey:
+    combined_df = df
+    print("No existing branches Parquet on S3, creating new one.")
+
+parquet_buffer = io.BytesIO()
+combined_df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
+s3.Bucket(bucket_name).put_object(Key=s3_key_parquet, Body=parquet_buffer.getvalue())
+print(f"Uploaded branches.parquet to s3://{bucket_name}/{s3_key_parquet}")
+
+# ---- Create / Append to Postgres ----
+with engine.connect() as conn:
+    for _, row in df.iterrows():
+        stmt = text("""
+            INSERT INTO branches (branch_id, branch_name, address, city, state)
+            VALUES (:branch_id, :branch_name, :address, :city, :state)
+            ON CONFLICT (branch_id) DO NOTHING
+        """)
+        conn.execute(stmt, {
+            "branch_id": str(row["branch_id"]),
+            "branch_name": row["branch_name"],
+            "address": row["address"],
+            "city": row["city"],
+            "state": row["state"]
+        })
+    conn.commit()
+    # Optional: row count check
+    result = conn.execute(text("SELECT COUNT(*) FROM branches;"))
+    print(f"Rows in branches table: {result.scalar()}")
diff --git a/Scripts/customers.py b/Scripts/customers.py
index bca1a04..3513dc1 100644
--- a/Scripts/customers.py
+++ b/Scripts/customers.py
@@ -8,87 +8,98 @@ import pandas as pd
 import boto3
 import random
 from datetime import datetime
-import pyarrow.parquet as pq
-
-user = os.getenv("PG_USER")
-password = os.getenv("PG_PASSWORD")
-host = os.getenv("PG_HOST")
-port = "5432"
-db = "postgres"
-
-engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")
-
-fake = Faker()
 
 # ---- Load env ----
 load_dotenv()
+fake = Faker()
 
-# ---- Hetzner S3 setup ----
+# ---- Postgres setup ----
+user = os.getenv("PG_USER")
+password = quote_plus(os.getenv("PG_PASSWORD"))
+host = os.getenv("PG_HOST")
+port = "5432"
+db = "postgres"
+engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")
+
+# ---- Hetzner S3 setup (backup only) ----
 s3 = boto3.resource(
     "s3",
     endpoint_url=os.getenv("STORAGE_ENDPOINT"),
     aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
     aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
 )
-
 bucket_name = os.getenv("STORAGE_BUCKET")
-customers_s3_key = "DataLab/customers/customers.csv"
 branches_s3_key = "DataLab/branches/branches.csv"
+customers_s3_key = "DataLab/customers/customers.parquet"
 
-# ---- Load branches from S3 ----
+# ---- Load branches from S3 (still needed for customer assignment) ----
 branches_local = "../Data/branches.csv"
 s3.Bucket(bucket_name).download_file(branches_s3_key, branches_local)
 branches = pd.read_csv(branches_local)
 
+# ---- Load existing customers from Postgres for email uniqueness ----
+with engine.connect() as conn:
+    table_exists = conn.execute(
+        text("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='customers');")
+    ).scalar()
+
+    if table_exists:
+        existing_customers = pd.read_sql(
+            text("SELECT email FROM customers;"),
+            con=conn
+        )
+        existing_emails = set(existing_customers["email"]) if not existing_customers.empty else set()
+    else:
+        existing_emails = set()
+
+
 # ---- Helper functions ----
 def realistic_credit_score():
-    """Normal distribution around 680."""
-    score = int(random.gauss(680, 60))
-    return max(300, min(score, 850))
+    return max(300, min(int(random.gauss(680, 60)), 850))
 
 def realistic_income():
-    brackets = [
-        (20000, 40000),
-        (40000, 70000),
-        (70000, 120000),
-        (120000, 200000)
-    ]
+    brackets = [(20000,40000),(40000,70000),(70000,120000),(120000,200000)]
     low, high = random.choice(brackets)
     return random.randint(low, high)
 
 def realistic_employment():
     return random.choices(
-        ["Employed", "Self-Employed", "Unemployed", "Student", "Retired"],
-        weights=[50, 15, 10, 15, 10]
+        ["Employed","Self-Employed","Unemployed","Student","Retired"],
+        weights=[50,15,10,15,10]
     )[0]
 
 def realistic_contact():
-    return random.choice(["Email", "Phone", "SMS"])
+    return random.choice(["Email","Phone","SMS"])
+
+def generate_customer_id():
+    return random.getrandbits(48)
 
 # ---- Generate Customers ----
 customers = []
-start_id = 100000  # Realistic banking customer IDs
-
-for i in range(50):
+for _ in range(50):
     first = fake.first_name()
     last = fake.last_name()
-
+    email = f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}"
+
+    while email in existing_emails:
+        first = fake.first_name()
+        last = fake.last_name()
+        email = f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}"
+    existing_emails.add(email)
+
     dob = fake.date_between(start_date="-80y", end_date="-18y")
     age = (datetime.now().date() - dob).days // 365
-
     income = realistic_income()
     credit = realistic_credit_score()
 
     customers.append({
-        "customer_id": start_id + i,
-        "first_name": first,
-        "last_name": last,
+        "customer_id": generate_customer_id(),
         "full_name": f"{first} {last}",
-        "email": f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}",
+        "email": email,
         "phone": fake.phone_number(),
         "date_of_birth": dob,
         "age": age,
-        "gender": random.choice(["Male", "Female", "Other"]),
+        "gender": random.choice(["Male","Female","Other"]),
         "street_address": fake.street_address(),
         "city": fake.city(),
         "state": fake.state_abbr(),
@@ -103,46 +114,12 @@ for i in range(50):
 
 df = pd.DataFrame(customers)
 
-# ---- Save locally ----
-local_file = f"../Data/customers_{datetime.now():%Y%m%d_%H%M%S}.csv"
-df.to_csv(local_file, index=False)
-print("Generated customers.")
+# ---- Save to S3 backup ----
+buffer = io.BytesIO()
+df.to_parquet(buffer, index=False, engine="pyarrow")
+s3.Bucket(bucket_name).put_object(Key=customers_s3_key, Body=buffer.getvalue())
+print("Uploaded customers.parquet to S3 (backup).")
 
-
-# ---- Upload / append to S3 as Parquet ----
-customers_s3_key = "DataLab/customers/customers.parquet"
-
-try:
-    # Check if Parquet exists
-    obj = s3.Bucket(bucket_name).Object(customers_s3_key).get()
-    existing_df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
-    df_s3 = pd.concat([existing_df, df], ignore_index=True)
-    print(f"Appended {len(df_s3)} rows to existing S3 Parquet")
-except s3.meta.client.exceptions.NoSuchKey:
-    # No existing file
-    df_s3 = df
-    print("No existing Parquet on S3, creating new one")
-
-# Convert to Parquet buffer
-parquet_buffer = io.BytesIO()
-df_s3.to_parquet(parquet_buffer, index=False, engine="pyarrow")
-
-# Upload to S3
-s3.Bucket(bucket_name).put_object(
-    Key=customers_s3_key,
-    Body=parquet_buffer.getvalue()
-)
-print(f"Uploaded customers.parquet to s3")
-
-
-# ---- Write customers to Postgres ----
-try:
-    df.to_sql("customers", engine, if_exists="append", index=False)
-    print("Inserted customers into Postgres successfully!")
-except Exception as e:
-    print("Failed to insert into Postgres:", e)
-
-
-with engine.connect() as conn:
-    result = conn.execute(text("SELECT COUNT(*) FROM customers;"))
-    print(f"Rows in customers table: {result.scalar()}")
+# ---- Insert into Postgres ----
+df.to_sql("customers", engine, if_exists="append", index=False, method="multi")
+print("Inserted customers into Postgres successfully!")
diff --git a/Scripts/transactions.py b/Scripts/transactions.py
index 87997bc..8dcd9b5 100644
--- a/Scripts/transactions.py
+++ b/Scripts/transactions.py
@@ -7,50 +7,42 @@ import pandas as pd
 import boto3
 import io
 from sqlalchemy import create_engine, text
-from urllib.parse import quote_plus
 
 # ---- Setup ----
 fake = Faker()
 load_dotenv()
 
-# ---- S3 Setup ----
+# ---- Postgres setup ----
+user = os.getenv("PG_USER")
+password = os.getenv("PG_PASSWORD")
+host = os.getenv("PG_HOST")
+port = "5432"
+db = "postgres"
+engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}", future=True)
+
+# ---- S3 setup (backup only) ----
 s3 = boto3.resource(
     "s3",
     endpoint_url=os.getenv("STORAGE_ENDPOINT"),
    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
     aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
 )
-
 bucket_name = os.getenv("STORAGE_BUCKET")
-accounts_s3_key_parquet = "DataLab/accounts/accounts.parquet"
 transactions_s3_key_parquet = "DataLab/transactions/transactions.parquet"
 
-# ---- Postgres Setup ----
-PG_USER = os.getenv("PG_USER")
-PG_PASSWORD = quote_plus(os.getenv("PG_PASSWORD"))
-PG_HOST = os.getenv("PG_HOST")
-PG_PORT = "5432"
-PG_DB = "postgres"
+# ---- Load accounts from Postgres ----
+with engine.connect() as conn:
+    accounts_df = pd.read_sql(
+        sql=text("SELECT account_id, customer_id, branch_id, account_type, open_date, balance FROM accounts;"),
+        con=conn
+    )
 
-engine = create_engine(f"postgresql+psycopg2://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DB}")
-
-# ---- Ensure local data folder exists ----
-os.makedirs("../Data", exist_ok=True)
-
-# ---- Download accounts parquet from S3 ----
-accounts_df = pd.DataFrame()
-try:
-    obj = s3.Bucket(bucket_name).Object(accounts_s3_key_parquet).get()
-    accounts_df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
-    print("Downloaded accounts.parquet from S3.")
-except s3.meta.client.exceptions.NoSuchKey:
-    print("ERROR: Accounts Parquet not found on S3.")
-    raise SystemExit()
+accounts_df["open_date"] = pd.to_datetime(accounts_df["open_date"]).dt.date
 
 # ---- Sample vendors ----
 vendors = ["Amazon", "Walmart", "Target", "Starbucks", "Apple", "Netflix", "Uber", "Lyft", "BestBuy", "Costco"]
 
-# ---- Helper Functions ----
+# ---- Helper functions ----
 def generate_transaction_id(account_id, idx):
     return f"{account_id}{str(idx).zfill(5)}"
@@ -63,28 +55,29 @@ def generate_transaction(account):
     txn = {
         "transaction_id": None,  # to be filled later
         "account_id": str(account['account_id']),
-        "branch_id": None,
+        "branch_id": account['branch_id'] if t_type in ["Deposit", "Withdrawal"] else None,
         "transaction_type": t_type,
         "amount": 0,
         "date": fake.date_between(start_date=pd.to_datetime(account['open_date']), end_date=datetime.today()),
         "balance_after": 0,
-        "vendor": None,
+        "vendor": random.choice(vendors) if t_type in ["Payment", "Transfer"] else None,
         "transaction_location": None
     }
 
-    if t_type in ["Deposit", "Withdrawal"]:
-        txn["branch_id"] = account.get('branch_id', None)
-        amount = round(random.uniform(50, 7000), 2) if t_type == "Withdrawal" else round(random.uniform(20, 10000), 2)
-        if t_type == "Withdrawal":
-            amount = min(amount, account['balance'])
-            account['balance'] -= amount
-        else:
-            account['balance'] += amount
+    if t_type == "Deposit":
+        amount = round(random.uniform(20, 10000), 2)
+        account['balance'] += amount
+        txn["amount"] = amount
+        txn["balance_after"] = round(account['balance'], 2)
+        txn["transaction_location"] = f"Branch {txn['branch_id']}"
+    elif t_type == "Withdrawal":
+        amount = round(random.uniform(50, 7000), 2)
+        amount = min(amount, account['balance'])
+        account['balance'] -= amount
         txn["amount"] = amount
         txn["balance_after"] = round(account['balance'], 2)
         txn["transaction_location"] = f"Branch {txn['branch_id']}"
     else:  # Payment / Transfer
-        txn["vendor"] = random.choice(vendors)
         amount = round(random.uniform(5, 1000), 2)
         account['balance'] = max(account['balance'] - amount, 0)
         txn["amount"] = amount
@@ -96,7 +89,6 @@ def generate_transaction(account):
 # ---- Generate transactions ----
 transactions = []
 idx = 1
-
 for _, account in accounts_df.iterrows():
     account_transactions_count = random.randint(5, 20)
     for _ in range(account_transactions_count):
@@ -107,28 +99,30 @@ for _, account in accounts_df.iterrows():
 
 transactions_df = pd.DataFrame(transactions)
 
-# ---- Upload / append to S3 as Parquet ----
-try:
-    obj = s3.Bucket(bucket_name).Object(transactions_s3_key_parquet).get()
-    existing_df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
-    combined_df = pd.concat([existing_df, transactions_df], ignore_index=True)
-    print(f"Appended {len(transactions_df)} transactions to existing Parquet on S3.")
-except s3.meta.client.exceptions.NoSuchKey:
-    combined_df = transactions_df
-    print("No existing transactions Parquet on S3, creating new one.")
+# ---- Save to S3 backup ----
+buffer = io.BytesIO()
+transactions_df.to_parquet(buffer, index=False, engine="pyarrow")
+s3.Bucket(bucket_name).put_object(Key=transactions_s3_key_parquet, Body=buffer.getvalue())
+print("Uploaded transactions.parquet to S3 (backup).")
 
-# Convert to Parquet and upload
-parquet_buffer = io.BytesIO()
-combined_df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
-s3.Bucket(bucket_name).put_object(Key=transactions_s3_key_parquet, Body=parquet_buffer.getvalue())
-print(f"Uploaded combined transactions.parquet to s3://{bucket_name}/{transactions_s3_key_parquet}")
+# ---- Insert into Postgres ----
+with engine.begin() as conn:
+    conn.execute(text("""
+        CREATE TABLE IF NOT EXISTS transactions (
+            transaction_id VARCHAR(30) PRIMARY KEY,
+            account_id VARCHAR(20) REFERENCES accounts(account_id),
+            branch_id INT,
+            transaction_type VARCHAR(20),
+            amount NUMERIC(12,2),
+            date DATE,
+            balance_after NUMERIC(12,2),
+            vendor VARCHAR(50),
+            transaction_location VARCHAR(50)
+        );
+    """))
 
-# ---- Append to Postgres ----
-try:
-    combined_df.to_sql("transactions", engine, if_exists="append", index=False)
-    print("Inserted transactions into Postgres successfully!")
-except Exception as e:
-    print("Failed to insert into Postgres:", e)
+    transactions_df.to_sql("transactions", conn, if_exists="append", index=False, method="multi")
+    print(f"Inserted {len(transactions_df)} transactions into Postgres successfully!")
 
 # ---- Optional: row count check ----
 with engine.connect() as conn: