diff --git a/Scripts/Generate_customers.py b/Scripts/Generate_customers.py
index a1fa2d6..74ef84e 100644
--- a/Scripts/Generate_customers.py
+++ b/Scripts/Generate_customers.py
@@ -86,15 +86,7 @@ for i in range(50):
         "employment_status": realistic_employment(),
         "annual_income": income,
         "credit_score": credit,
-        "preferred_contact_method": realistic_contact(),
-        "is_high_value_customer": income > 120000 or credit > 750,
-        "age_group": (
-            "18-25" if age < 26 else
-            "26-35" if age < 36 else
-            "36-50" if age < 51 else
-            "51-65" if age < 66 else
-            "66+"
-        )
+        "preferred_contact_method": realistic_contact()
     })
 
 df = pd.DataFrame(customers)
diff --git a/Scripts/accounts.py b/Scripts/accounts.py
new file mode 100644
index 0000000..53e787b
--- /dev/null
+++ b/Scripts/accounts.py
@@ -0,0 +1,133 @@
+from faker import Faker
+from dotenv import load_dotenv
+from datetime import datetime
+import os
+import random
+import pandas as pd
+import boto3
+import io
+from sqlalchemy import create_engine, text
+from urllib.parse import quote_plus
+
+# ---- Setup ----
+fake = Faker()
+load_dotenv()
+
+# ---- S3 Setup ----
+s3 = boto3.resource(
+    "s3",
+    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
+    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
+    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
+)
+
+bucket_name = os.getenv("STORAGE_BUCKET")
+customers_key_csv = "DataLab/customers/customers.csv"
+accounts_s3_key_parquet = "DataLab/accounts/accounts.parquet"
+
+# ---- Postgres Setup (optional) ----
+user = os.getenv("PG_USER")
+password = os.getenv("PG_PASSWORD")
+host = os.getenv("PG_HOST")
+port = "5432"
+db = "postgres"
+
+engine = create_engine(f"postgresql+psycopg2://{user}:{quote_plus(password)}@{host}:{port}/{db}")
+
+# ---- Ensure local data folder exists ----
+os.makedirs("../Data", exist_ok=True)
+
+# ---- Download customers.csv from S3 ----
+local_customers_file = "../Data/customers.csv"
+try:
+    s3.Bucket(bucket_name).download_file(customers_key_csv, local_customers_file)
+    print("Downloaded customers.csv from S3.")
+except Exception as e:
+    print("ERROR: Could not download customers.csv:", e)
+    raise SystemExit(1)
+
+# ---- Load customers DataFrame ----
+customers_df = pd.read_csv(local_customers_file)
+customers_df["customer_since"] = pd.to_datetime(customers_df["customer_since"]).dt.date
+
+# ---- Helper Functions ----
+def generate_account_id(branch_id):
+    """Branch id (zero-padded to 3 digits) followed by a 9-digit random part."""
+    branch_part = str(branch_id).zfill(3)
+    random_part = str(random.randint(10**8, 10**9 - 1))
+    return branch_part + random_part
+
+def generate_account_number():
+    """Random 11-digit account number."""
+    return str(random.randint(10**10, 10**11 - 1))
+
+def assign_account_types():
+    """50% checking only, 20% savings only, 30% both."""
+    roll = random.random()
+    if roll < 0.50:
+        return ["Checking"]
+    elif roll < 0.70:
+        return ["Savings"]
+    else:
+        return ["Checking", "Savings"]
+
+def balance_for_type(account_type):
+    if account_type == "Checking":
+        return round(random.uniform(50, 7000), 2)
+    return round(random.uniform(200, 25000), 2)
+
+# ---- Generate accounts ----
+accounts = []
+for _, row in customers_df.iterrows():
+    customer_id = row["customer_id"]
+    customer_since = row["customer_since"]
+    home_branch_id = row["home_branch_id"]
+    account_types = assign_account_types()
+
+    for acct_type in account_types:
+        accounts.append({
+            "account_id": generate_account_id(home_branch_id),
+            "account_number": generate_account_number(),
+            "customer_id": customer_id,
+            "account_type": acct_type,
+            "open_date": fake.date_between(start_date=customer_since, end_date=datetime.today().date()),
+            "balance": balance_for_type(acct_type),
+            "branch_id": home_branch_id
+        })
+
+accounts_df = pd.DataFrame(accounts)
+
+# ---- Save locally as CSV ----
+local_accounts_file = "../Data/accounts.csv"
+accounts_df.to_csv(local_accounts_file, index=False)
+print("Generated accounts.csv locally.")
+
+# ---- Upload / append to S3 as Parquet ----
+try:
+    obj = s3.Bucket(bucket_name).Object(accounts_s3_key_parquet).get()
+    existing_df = pd.read_parquet(io.BytesIO(obj["Body"].read()))
+    combined_df = pd.concat([existing_df, accounts_df], ignore_index=True)
+    print(f"Appended {len(accounts_df)} rows to existing S3 Parquet")
+except s3.meta.client.exceptions.NoSuchKey:
+    combined_df = accounts_df
+    print("No existing Parquet on S3, creating new one")
+
+parquet_buffer = io.BytesIO()
+combined_df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
+s3.Bucket(bucket_name).put_object(Key=accounts_s3_key_parquet, Body=parquet_buffer.getvalue())
+print("Uploaded accounts.parquet to S3.")
+
+# ---- Skip account_ids that already exist in Postgres ----
+try:
+    with engine.connect() as conn:
+        existing_ids = pd.read_sql("SELECT account_id FROM accounts;", conn)
+    accounts_df = accounts_df[~accounts_df["account_id"].isin(existing_ids["account_id"])]
+except Exception:
+    pass  # table does not exist yet, nothing to filter
+
+# ---- Append to Postgres ----
+try:
+    accounts_df.to_sql(
+        "accounts",
+        engine,
+        if_exists="append",
+        index=False,
+        method="multi"  # faster inserts
+    )
+    print(f"Inserted {len(accounts_df)} accounts into Postgres.")
+except Exception as e:
+    print("Failed to insert into Postgres:", e)
+
+# ---- Optional: row count check ----
+with engine.connect() as conn:
+    result = conn.execute(text("SELECT COUNT(*) FROM accounts;"))
+    print(f"Rows in accounts table: {result.scalar()}")
+
diff --git a/Scripts/customers.py b/Scripts/customers.py
new file mode 100644
index 0000000..bca1a04
--- /dev/null
+++ b/Scripts/customers.py
@@ -0,0 +1,148 @@
+from sqlalchemy import create_engine, text
+from urllib.parse import quote_plus
+from faker import Faker
+from dotenv import load_dotenv
+import os
+import io
+import pandas as pd
+import boto3
+import random
+from datetime import datetime
+import pyarrow.parquet as pq
+
+# ---- Load env ----
+load_dotenv()
+
+# ---- Postgres setup ----
+user = os.getenv("PG_USER")
+password = os.getenv("PG_PASSWORD")
+host = os.getenv("PG_HOST")
+port = "5432"
+db = "postgres"
+
+engine = create_engine(f"postgresql+psycopg2://{user}:{quote_plus(password)}@{host}:{port}/{db}")
+
+fake = Faker()
+
+# ---- Hetzner S3 setup ----
+s3 = boto3.resource(
+    "s3",
+    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
+    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
+    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
+)
+
+bucket_name = os.getenv("STORAGE_BUCKET")
+customers_s3_key = "DataLab/customers/customers.csv"
+branches_s3_key = "DataLab/branches/branches.csv"
+
+# ---- Ensure local data folder exists ----
+os.makedirs("../Data", exist_ok=True)
+
+# ---- Load branches from S3 ----
+branches_local = "../Data/branches.csv"
+s3.Bucket(bucket_name).download_file(branches_s3_key, branches_local)
+branches = pd.read_csv(branches_local)
+
+# ---- Helper functions ----
+def realistic_credit_score():
+    """Normal distribution around 680, clamped to the 300-850 range."""
+    score = int(random.gauss(680, 60))
+    return max(300, min(score, 850))
+
+def realistic_income():
+    """Pick an income bracket, then a value within it."""
+    brackets = [
+        (20000, 40000),
+        (40000, 70000),
+        (70000, 120000),
+        (120000, 200000)
+    ]
+    low, high = random.choice(brackets)
+    return random.randint(low, high)
+
+def realistic_employment():
+    return random.choices(
+        ["Employed", "Self-Employed", "Unemployed", "Student", "Retired"],
+        weights=[50, 15, 10, 15, 10]
+    )[0]
+
+def realistic_contact():
+    return random.choice(["Email", "Phone", "SMS"])
+
+# ---- Generate Customers ----
+customers = []
+start_id = 100000  # Realistic banking customer IDs
+
+for i in range(50):
+    first = fake.first_name()
+    last = fake.last_name()
+
+    dob = fake.date_between(start_date="-80y", end_date="-18y")
+    age = (datetime.now().date() - dob).days // 365
+
+    income = realistic_income()
+    credit = realistic_credit_score()
+
+    customers.append({
+        "customer_id": start_id + i,
+        "first_name": first,
+        "last_name": last,
+        "full_name": f"{first} {last}",
+        "email": f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}",
+        "phone": fake.phone_number(),
+        "date_of_birth": dob,
+        "age": age,
+        "gender": random.choice(["Male", "Female", "Other"]),
+        "street_address": fake.street_address(),
+        "city": fake.city(),
+        "state": fake.state_abbr(),
+        "zip_code": fake.zipcode(),
+        "home_branch_id": random.choice(branches["branch_id"].tolist()),
+        "customer_since": fake.date_between(start_date="-10y", end_date="today"),
+        "employment_status": realistic_employment(),
+        "annual_income": income,
+        "credit_score": credit,
+        "preferred_contact_method": realistic_contact()
+    })
+
+df = pd.DataFrame(customers)
+
+# ---- Save locally ----
+local_file = f"../Data/customers_{datetime.now():%Y%m%d_%H%M%S}.csv"
+df.to_csv(local_file, index=False)
+print("Generated customers.")
+
+# ---- Upload / append to S3 as Parquet ----
+customers_s3_key = "DataLab/customers/customers.parquet"
+
+try:
+    # Check if a Parquet file already exists
+    obj = s3.Bucket(bucket_name).Object(customers_s3_key).get()
+    existing_df = pd.read_parquet(io.BytesIO(obj["Body"].read()))
+    df_s3 = pd.concat([existing_df, df], ignore_index=True)
+    print(f"Appended {len(df)} rows to existing S3 Parquet")
+except s3.meta.client.exceptions.NoSuchKey:
+    # No existing file
+    df_s3 = df
+    print("No existing Parquet on S3, creating new one")
+
+# Convert to Parquet buffer
+parquet_buffer = io.BytesIO()
+df_s3.to_parquet(parquet_buffer, index=False, engine="pyarrow")
+
+# Upload to S3
+s3.Bucket(bucket_name).put_object(
+    Key=customers_s3_key,
+    Body=parquet_buffer.getvalue()
+)
+print("Uploaded customers.parquet to S3.")
+
+# ---- Write customers to Postgres ----
+try:
+    df.to_sql("customers", engine, if_exists="append", index=False)
+    print("Inserted customers into Postgres successfully!")
+except Exception as e:
+    print("Failed to insert into Postgres:", e)
+
+# ---- Row count check ----
+with engine.connect() as conn:
+    result = conn.execute(text("SELECT COUNT(*) FROM customers;"))
+    print(f"Rows in customers table: {result.scalar()}")