"""Generate synthetic raw transaction events and land them in bronze storage.

The script reads account IDs from the JSONL objects stored under
``bronze/accounts_raw/``, fabricates ``NUM_TRANSACTIONS`` transaction events
for randomly chosen accounts, and writes them as a single JSONL object under
``bronze/transactions_raw/``.

Connection settings are read from the environment (optionally through a
``.env`` file): ``STORAGE_ENDPOINT``, ``STORAGE_ACCESS_KEY``,
``STORAGE_SECRET_KEY`` and ``STORAGE_BUCKET``.
"""
# Post-patch content of Scripts/transactions.py (commit 844bb1d,
# "updated transaction logic"), reformatted from a whitespace-collapsed patch.

from datetime import datetime, timedelta, timezone
import json
import os
import random
import uuid

import boto3
from dotenv import load_dotenv
from faker import Faker

# ---- Setup ----
fake = Faker()
load_dotenv()

s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"),
)

bucket_name = os.getenv("STORAGE_BUCKET")

# Bronze locations
transactions_prefix = "bronze/transactions_raw/"
accounts_prefix = "bronze/accounts_raw/"

MERCHANTS_BY_CATEGORY = {
    "Grocery": [
        "Walmart", "Target", "Costco", "Kroger", "Safeway",
        "Publix", "Whole Foods", "Trader Joe's", "Aldi",
    ],
    "Coffee": ["Starbucks", "Dunkin'"],
    "Fast Food": [
        "McDonald's", "Burger King", "Wendy's",
        "Chick-fil-A", "Taco Bell", "KFC",
    ],
    "Dining": [
        "Chipotle", "Panera Bread", "Olive Garden",
        "Applebee's", "Chili's",
    ],
    "Online Shopping": ["Amazon"],
    "Electronics": ["Best Buy"],
    "Home Improvement": ["Home Depot", "Lowe's"],
    "Furniture": ["IKEA"],
    "Fuel": ["Shell", "Exxon", "Chevron", "BP"],
    "Convenience Store": ["7-Eleven", "Circle K", "Wawa"],
    "Ride Share": ["Uber", "Lyft"],
    "Public Transit": ["Amtrak"],
    "Car Rental": ["Enterprise Rent-A-Car", "Hertz"],
    "Airfare": [
        "Delta Airlines", "United Airlines",
        "American Airlines", "Southwest Airlines",
    ],
    "Streaming Services": [
        "Netflix", "Hulu", "Disney+",
        "Spotify", "Apple Music",
    ],
    "Movies": ["AMC Theatres", "Regal Cinemas"],
    "Pharmacy": ["CVS", "Walgreens", "Rite Aid"],
    "Healthcare": ["Kaiser Permanente"],
    "Internet": ["Comcast", "Spectrum"],
    "Mobile Phone": ["AT&T", "Verizon", "T-Mobile"],
    "Bank Fee": ["Chase", "Bank of America"],
}

# Flatten for fast lookup
MERCHANT_CATEGORY_MAP = {
    merchant: category
    for category, merchants in MERCHANTS_BY_CATEGORY.items()
    for merchant in merchants
}

VENDORS = list(MERCHANT_CATEGORY_MAP.keys())
CATEGORIES = list(MERCHANTS_BY_CATEGORY.keys())

# Merchants whose purchases are flagged as recurring.
RECURRING_MERCHANTS = frozenset({"Netflix", "Spotify", "Comcast", "AT&T"})

# -------------------------------------------------------------------
# Static enums
# -------------------------------------------------------------------

channels = ["POS", "ONLINE", "ATM"]
statuses = ["POSTED", "PENDING", "DECLINED"]
# USD is listed three times so it is drawn three times as often as EUR or CAD.
currencies = ["USD", "USD", "USD", "EUR", "CAD"]

NUM_TRANSACTIONS = 2000

# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------


def get_category_for_vendor(vendor: str, noise_rate: float = 0.05) -> str:
    """Return the category for ``vendor``, occasionally replaced by noise.

    With probability ``noise_rate`` a category is drawn uniformly from
    ``CATEGORIES`` (which may or may not match the vendor); otherwise the
    category mapped to the vendor in ``MERCHANT_CATEGORY_MAP`` is returned.

    Raises:
        KeyError: if ``vendor`` is not a known merchant and no noise was drawn.
    """
    if random.random() < noise_rate:
        return random.choice(CATEGORIES)
    return MERCHANT_CATEGORY_MAP[vendor]


def pick_transaction_context():
    """Pick a transaction type with a matching merchant name and category.

    Returns:
        A ``(transaction_type, merchant_name, category)`` tuple. Non-purchase
        types use fixed placeholder merchant/category labels; purchases pick
        a random vendor and look up its (possibly noisy) category.
    """
    txn_type = random.choice(["PURCHASE", "WITHDRAWAL", "DEPOSIT", "TRANSFER"])

    if txn_type == "WITHDRAWAL":
        return txn_type, "ATM Withdrawal", "Cash Withdrawal"
    if txn_type == "DEPOSIT":
        return txn_type, "Direct Deposit", "Income"
    if txn_type == "TRANSFER":
        return txn_type, "Account Transfer", "Transfer"

    merchant = random.choice(VENDORS)
    return txn_type, merchant, get_category_for_vendor(merchant)


def _load_account_ids(client, bucket, prefix) -> list:
    """Collect every ``account.account_id`` from the JSONL objects under ``prefix``."""
    account_ids = []

    # A single list_objects_v2 call returns only one page of keys; paginate so
    # accounts stored beyond the first page are not silently dropped.
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            payload = client.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            for line in payload.decode("utf-8").splitlines():
                if not line.strip():
                    # Blank lines are not JSON documents and would make
                    # json.loads raise.
                    continue
                record = json.loads(line)
                account_ids.append(record["account"]["account_id"])

    return account_ids


def _build_event(account_id, batch_id: str, ingestion_ts: str) -> dict:
    """Build one ``transaction_posted`` event for ``account_id``."""
    txn_type, merchant, category = pick_transaction_context()

    # Spread event timestamps over roughly the past year.
    txn_ts = datetime.now(timezone.utc) - timedelta(
        days=random.randint(0, 365),
        minutes=random.randint(0, 1440),
    )

    amount = round(random.uniform(1.00, 2500.00), 2)

    return {
        "event_id": str(uuid.uuid4()),
        "event_type": "transaction_posted",
        "event_ts": txn_ts.isoformat(),

        "transaction": {
            "transaction_id": str(uuid.uuid4()),
            "account_id": account_id,
            "amount": amount,
            "currency": random.choice(currencies),
            "transaction_type": txn_type,
            "merchant_name": merchant,
            "category": category,
            "merchant_id": str(uuid.uuid4()),
            "merchant_city": fake.city(),
            "merchant_state": fake.state_abbr(),
            "merchant_country": "US",
            "merchant_mcc": random.randint(1000, 9999),
            "channel": random.choice(channels),
            "status": random.choice(statuses),
            "is_international": random.choice([True, False]),
            "is_recurring": merchant in RECURRING_MERCHANTS,
            "auth_code": random.randint(100000, 999999),
            "device_id": str(uuid.uuid4()),
            "ip_address": fake.ipv4_public(),
        },

        "source_system": "transaction_generator_v1",
        "batch_id": batch_id,
        "ingestion_ts": ingestion_ts,
    }


def _generate_events(account_ids, count: int = NUM_TRANSACTIONS) -> list:
    """Generate ``count`` events for accounts drawn at random from ``account_ids``."""
    # Every event of a run is written to the same object, so they share one
    # batch id and one ingestion timestamp instead of a fresh value per event.
    batch_id = str(uuid.uuid4())
    ingestion_ts = datetime.now(timezone.utc).isoformat()

    return [
        _build_event(random.choice(account_ids), batch_id, ingestion_ts)
        for _ in range(count)
    ]


def _write_events(client, bucket, prefix, events) -> str:
    """Upload ``events`` as one JSONL object under ``prefix`` and return its key."""
    key = (
        f"{prefix}"
        f"batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
    )

    body = "\n".join(json.dumps(e) for e in events)

    client.put_object(
        Bucket=bucket,
        Key=key,
        Body=body.encode("utf-8"),
    )
    return key


def main() -> None:
    """Load account IDs, generate transaction events and write them to storage.

    Raises:
        ValueError: if ``STORAGE_BUCKET`` is unset, or if no account IDs are
            found under the bronze accounts prefix.
    """
    if not bucket_name:
        # Fail with a clear message instead of a parameter-validation error
        # from inside the storage client.
        raise ValueError("STORAGE_BUCKET environment variable is not set")

    # ---- Load account IDs from bronze ----
    account_ids = _load_account_ids(s3, bucket_name, accounts_prefix)
    if not account_ids:
        raise ValueError("No account IDs found in bronze accounts data")

    # ---- Generate transaction events ----
    events = _generate_events(account_ids)

    # ---- Write JSONL to S3 ----
    key = _write_events(s3, bucket_name, transactions_prefix, events)

    print(f"Wrote {len(events)} raw transaction events to s3://{bucket_name}/{key}")


if __name__ == "__main__":
    main()