updated transaction logic

This commit is contained in:
Cameron Seamons 2025-12-15 01:07:33 -07:00
parent b3eb473ebf
commit 844bb1daa1

View file

@ -1,130 +1,197 @@
from faker import Faker from faker import Faker
from dotenv import load_dotenv from dotenv import load_dotenv
from datetime import datetime from datetime import datetime, timezone, timedelta
import os import os
import random import json
import pandas as pd
import boto3 import boto3
import io import random
from sqlalchemy import create_engine, text import uuid
# ---- Setup ---- # ---- Setup ----
fake = Faker() fake = Faker()
load_dotenv() load_dotenv()
# ---- Postgres setup ---- s3 = boto3.client(
user = os.getenv("PG_USER")
password = os.getenv("PG_PASSWORD")
host = os.getenv("PG_HOST")
port = "5432"
db = "postgres"
engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}", future=True)
# ---- S3 setup (backup only) ----
s3 = boto3.resource(
"s3", "s3",
endpoint_url=os.getenv("STORAGE_ENDPOINT"), endpoint_url=os.getenv("STORAGE_ENDPOINT"),
aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"), aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY") aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"),
) )
bucket_name = os.getenv("STORAGE_BUCKET")
transactions_s3_key_parquet = "DataLab/transactions/transactions.parquet"
# ---- Load accounts from Postgres ---- bucket_name = os.getenv("STORAGE_BUCKET")
with engine.connect() as conn:
accounts_df = pd.read_sql( # Bronze locations
sql=text("SELECT account_id, customer_id, branch_id, account_type, open_date, balance FROM accounts;"), transactions_prefix = "bronze/transactions_raw/"
con=conn accounts_prefix = "bronze/accounts_raw/"
# ---- Load account IDs from bronze ----
account_ids = []
resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=accounts_prefix)
for obj in resp.get("Contents", []):
body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
for line in body.decode("utf-8").splitlines():
record = json.loads(line)
account_ids.append(record["account"]["account_id"])
if not account_ids:
raise ValueError("No account IDs found in bronze accounts data")
MERCHANTS_BY_CATEGORY = {
"Grocery": [
"Walmart", "Target", "Costco", "Kroger", "Safeway",
"Publix", "Whole Foods", "Trader Joe's", "Aldi"
],
"Coffee": ["Starbucks", "Dunkin'"],
"Fast Food": [
"McDonald's", "Burger King", "Wendy's",
"Chick-fil-A", "Taco Bell", "KFC"
],
"Dining": [
"Chipotle", "Panera Bread", "Olive Garden",
"Applebee's", "Chili's"
],
"Online Shopping": ["Amazon"],
"Electronics": ["Best Buy"],
"Home Improvement": ["Home Depot", "Lowe's"],
"Furniture": ["IKEA"],
"Fuel": ["Shell", "Exxon", "Chevron", "BP"],
"Convenience Store": ["7-Eleven", "Circle K", "Wawa"],
"Ride Share": ["Uber", "Lyft"],
"Public Transit": ["Amtrak"],
"Car Rental": ["Enterprise Rent-A-Car", "Hertz"],
"Airfare": [
"Delta Airlines", "United Airlines",
"American Airlines", "Southwest Airlines"
],
"Streaming Services": [
"Netflix", "Hulu", "Disney+",
"Spotify", "Apple Music"
],
"Movies": ["AMC Theatres", "Regal Cinemas"],
"Pharmacy": ["CVS", "Walgreens", "Rite Aid"],
"Healthcare": ["Kaiser Permanente"],
"Internet": ["Comcast", "Spectrum"],
"Mobile Phone": ["AT&T", "Verizon", "T-Mobile"],
"Bank Fee": ["Chase", "Bank of America"],
}
# Flatten for fast lookup
MERCHANT_CATEGORY_MAP = {
merchant: category
for category, merchants in MERCHANTS_BY_CATEGORY.items()
for merchant in merchants
}
VENDORS = list(MERCHANT_CATEGORY_MAP.keys())
CATEGORIES = list(MERCHANTS_BY_CATEGORY.keys())
# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------
def get_category_for_vendor(vendor: str, noise_rate: float = 0.05) -> str:
"""
Deterministic category lookup with optional noise.
"""
if random.random() < noise_rate:
return random.choice(CATEGORIES)
return MERCHANT_CATEGORY_MAP[vendor]
def pick_transaction_context():
"""
Realistic transaction type merchant pairing
"""
txn_type = random.choice(["PURCHASE", "WITHDRAWAL", "DEPOSIT", "TRANSFER"])
if txn_type == "WITHDRAWAL":
return txn_type, "ATM Withdrawal", "Cash Withdrawal"
if txn_type == "DEPOSIT":
return txn_type, "Direct Deposit", "Income"
if txn_type == "TRANSFER":
return txn_type, "Account Transfer", "Transfer"
merchant = random.choice(VENDORS)
return txn_type, merchant, get_category_for_vendor(merchant)
# -------------------------------------------------------------------
# Static enums
# -------------------------------------------------------------------
channels = ["POS", "ONLINE", "ATM"]
statuses = ["POSTED", "PENDING", "DECLINED"]
currencies = ["USD", "USD", "USD", "EUR", "CAD"]
# -------------------------------------------------------------------
# Generate transaction events
# -------------------------------------------------------------------
events = []
NUM_TRANSACTIONS = 2000
for _ in range(NUM_TRANSACTIONS):
account_id = random.choice(account_ids)
txn_type, merchant, category = pick_transaction_context()
txn_ts = datetime.now(timezone.utc) - timedelta(
days=random.randint(0, 365),
minutes=random.randint(0, 1440),
) )
accounts_df["open_date"] = pd.to_datetime(accounts_df["open_date"]).dt.date amount = round(random.uniform(1.00, 2500.00), 2)
# ---- Sample vendors ---- event = {
vendors = ["Amazon", "Walmart", "Target", "Starbucks", "Apple", "Netflix", "Uber", "Lyft", "BestBuy", "Costco"] "event_id": str(uuid.uuid4()),
"event_type": "transaction_posted",
"event_ts": txn_ts.isoformat(),
# ---- Helper functions ---- "transaction": {
def generate_transaction_id(account_id, idx): "transaction_id": str(uuid.uuid4()),
return f"{account_id}{str(idx).zfill(5)}" "account_id": account_id,
"amount": amount,
"currency": random.choice(currencies),
"transaction_type": txn_type,
"merchant_name": merchant,
"category": category,
"merchant_id": str(uuid.uuid4()),
"merchant_city": fake.city(),
"merchant_state": fake.state_abbr(),
"merchant_country": "US",
"merchant_mcc": random.randint(1000, 9999),
"channel": random.choice(channels),
"status": random.choice(statuses),
"is_international": random.choice([True, False]),
"is_recurring": merchant in {
"Netflix", "Spotify", "Comcast", "AT&T"
},
"auth_code": random.randint(100000, 999999),
"device_id": str(uuid.uuid4()),
"ip_address": fake.ipv4_public(),
},
def generate_transaction(account): "source_system": "transaction_generator_v1",
t_type = random.choices( "batch_id": str(uuid.uuid4()),
["Deposit", "Withdrawal", "Payment", "Transfer"], "ingestion_ts": datetime.now(timezone.utc).isoformat(),
weights=[0.4, 0.3, 0.2, 0.1], k=1
)[0]
txn = {
"transaction_id": None, # to be filled later
"account_id": str(account['account_id']),
"branch_id": account['branch_id'] if t_type in ["Deposit", "Withdrawal"] else None,
"transaction_type": t_type,
"amount": 0,
"date": fake.date_between(start_date=pd.to_datetime(account['open_date']), end_date=datetime.today()),
"balance_after": 0,
"vendor": random.choice(vendors) if t_type in ["Payment", "Transfer"] else None,
"transaction_location": None
} }
if t_type == "Deposit": events.append(event)
amount = round(random.uniform(20, 10000), 2)
account['balance'] += amount
txn["amount"] = amount
txn["balance_after"] = round(account['balance'], 2)
txn["transaction_location"] = f"Branch {txn['branch_id']}"
elif t_type == "Withdrawal":
amount = round(random.uniform(50, 7000), 2)
amount = min(amount, account['balance'])
account['balance'] -= amount
txn["amount"] = amount
txn["balance_after"] = round(account['balance'], 2)
txn["transaction_location"] = f"Branch {txn['branch_id']}"
else: # Payment / Transfer
amount = round(random.uniform(5, 1000), 2)
account['balance'] = max(account['balance'] - amount, 0)
txn["amount"] = amount
txn["balance_after"] = round(account['balance'], 2)
txn["transaction_location"] = "POS / Online"
return txn # -------------------------------------------------------------------
# Write JSONL to S3
# -------------------------------------------------------------------
# ---- Generate transactions ---- key = (
transactions = [] f"{transactions_prefix}"
idx = 1 f"batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
for _, account in accounts_df.iterrows(): )
account_transactions_count = random.randint(5, 20)
for _ in range(account_transactions_count):
txn = generate_transaction(account)
txn['transaction_id'] = generate_transaction_id(account['account_id'], idx)
transactions.append(txn)
idx += 1
transactions_df = pd.DataFrame(transactions) body = "\n".join(json.dumps(e) for e in events)
# ---- Save to S3 backup ---- s3.put_object(
buffer = io.BytesIO() Bucket=bucket_name,
transactions_df.to_parquet(buffer, index=False, engine="pyarrow") Key=key,
s3.Bucket(bucket_name).put_object(Key=transactions_s3_key_parquet, Body=buffer.getvalue()) Body=body.encode("utf-8"),
print("Uploaded transactions.parquet to S3 (backup).") )
# ---- Insert into Postgres ---- print(f"Wrote {len(events)} raw transaction events to s3://{bucket_name}/{key}")
with engine.begin() as conn:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS transactions (
transaction_id VARCHAR(30) PRIMARY KEY,
account_id VARCHAR(20) REFERENCES accounts(account_id),
branch_id INT,
transaction_type VARCHAR(20),
amount NUMERIC(12,2),
date DATE,
balance_after NUMERIC(12,2),
vendor VARCHAR(50),
transaction_location VARCHAR(50)
);
"""))
transactions_df.to_sql("transactions", conn, if_exists="append", index=False, method="multi")
print(f"Inserted {len(transactions_df)} transactions into Postgres successfully!")
# ---- Optional: row count check ----
with engine.connect() as conn:
result = conn.execute(text("SELECT COUNT(*) FROM transactions;"))
print(f"Rows in transactions table: {result.scalar()}")