Compare commits

...

3 commits

Author            SHA1         Message                       Date
Cameron Seamons   40980241dd   Got Spark working locally     2025-12-15 01:07:48 -07:00
Cameron Seamons   844bb1daa1   updated transaction logic     2025-12-15 01:07:33 -07:00
Cameron Seamons   b3eb473ebf   added spark tests             2025-12-15 01:07:24 -07:00
3 changed files with 242 additions and 104 deletions


@@ -0,0 +1,58 @@
import os
from pyspark.sql import SparkSession
from dotenv import load_dotenv
load_dotenv()
# ---- WINDOWS FIX ----
os.environ.setdefault("HADOOP_HOME", "C:\\hadoop")
os.environ.setdefault("hadoop.home.dir", "C:\\hadoop")
os.environ["PATH"] += ";C:\\hadoop\\bin"
spark = (
    SparkSession.builder
    .appName("bronze-to-silver-batch")
    # ---- ALL JARS IN ONE PLACE ----
    .config(
        "spark.jars.packages",
        ",".join([
            # Delta
            "io.delta:delta-core_2.12:2.3.0",
            # S3A
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "com.amazonaws:aws-java-sdk-bundle:1.12.262"
        ])
    )
    # ---- DELTA ----
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # ---- S3 ----
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("STORAGE_ENDPOINT"))
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("STORAGE_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("STORAGE_SECRET_KEY"))
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)
print("Spark created OK")
# Confirm the S3A filesystem implementation is configured on the session
print("fs.s3a.impl =", spark.sparkContext._jsc.hadoopConfiguration().get("fs.s3a.impl"))
# Force a real read/action from S3
df = spark.read.json("s3a://camdoesdata/bronze/transactions_raw/")
print("About to show() ...")
df.limit(5).show(truncate=False)
print("Done.")
spark.stop()
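The job reads and shows the bronze events but stops before writing anything to the silver layer. A minimal sketch of the follow-on Delta write (it would sit before spark.stop(); the silver prefix below is an assumption, not part of this change):

# Sketch only: flatten the nested "transaction" struct and append it as a Delta table.
silver_path = "s3a://camdoesdata/silver/transactions/"  # assumed layout
flat = df.selectExpr("event_id", "event_type", "event_ts", "transaction.*")
flat.write.format("delta").mode("append").save(silver_path)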

Scripts/spark_test.py  Normal file  (+13 lines)

@@ -0,0 +1,13 @@
import os
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
os.environ.setdefault("HADOOP_HOME", "C:\\hadoop")
os.environ.setdefault("hadoop.home.dir", "C:\\hadoop")
os.environ["PATH"] += ";C:\\hadoop\\bin"
builder = SparkSession.builder.appName("spark-test")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
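As committed, spark_test.py only confirms that a Delta-enabled session starts; it never exercises Delta itself. A minimal smoke test, assuming a throwaway local path, could round-trip a tiny table:

# Sketch only: write and re-read a small Delta table to confirm the Delta jars resolved.
spark.range(5).write.format("delta").mode("overwrite").save("tmp/delta_smoke")
print("Delta round-trip rows:", spark.read.format("delta").load("tmp/delta_smoke").count())
spark.stop()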


@@ -1,130 +1,197 @@
from faker import Faker
from dotenv import load_dotenv
from datetime import datetime
from datetime import datetime, timezone, timedelta
import os
import random
import pandas as pd
import json
import boto3
import io
from sqlalchemy import create_engine, text
import random
import uuid
# ---- Setup ----
fake = Faker()
load_dotenv()
# ---- Postgres setup ----
user = os.getenv("PG_USER")
password = os.getenv("PG_PASSWORD")
host = os.getenv("PG_HOST")
port = "5432"
db = "postgres"
engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}", future=True)
# ---- S3 setup (backup only) ----
s3 = boto3.resource(
s3 = boto3.client(
"s3",
endpoint_url=os.getenv("STORAGE_ENDPOINT"),
aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"),
)
bucket_name = os.getenv("STORAGE_BUCKET")
transactions_s3_key_parquet = "DataLab/transactions/transactions.parquet"
# ---- Load accounts from Postgres ----
with engine.connect() as conn:
    accounts_df = pd.read_sql(
        sql=text("SELECT account_id, customer_id, branch_id, account_type, open_date, balance FROM accounts;"),
        con=conn
    )
# Bronze locations
transactions_prefix = "bronze/transactions_raw/"
accounts_prefix = "bronze/accounts_raw/"
accounts_df["open_date"] = pd.to_datetime(accounts_df["open_date"]).dt.date
# ---- Load account IDs from bronze ----
account_ids = []
# ---- Sample vendors ----
vendors = ["Amazon", "Walmart", "Target", "Starbucks", "Apple", "Netflix", "Uber", "Lyft", "BestBuy", "Costco"]
resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=accounts_prefix)
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
    for line in body.decode("utf-8").splitlines():
        record = json.loads(line)
        account_ids.append(record["account"]["account_id"])
# ---- Helper functions ----
def generate_transaction_id(account_id, idx):
    return f"{account_id}{str(idx).zfill(5)}"
if not account_ids:
    raise ValueError("No account IDs found in bronze accounts data")
def generate_transaction(account):
    t_type = random.choices(
        ["Deposit", "Withdrawal", "Payment", "Transfer"],
        weights=[0.4, 0.3, 0.2, 0.1], k=1
    )[0]
    txn = {
        "transaction_id": None, # to be filled later
        "account_id": str(account['account_id']),
        "branch_id": account['branch_id'] if t_type in ["Deposit", "Withdrawal"] else None,
        "transaction_type": t_type,
        "amount": 0,
        "date": fake.date_between(start_date=pd.to_datetime(account['open_date']), end_date=datetime.today()),
        "balance_after": 0,
        "vendor": random.choice(vendors) if t_type in ["Payment", "Transfer"] else None,
        "transaction_location": None
MERCHANTS_BY_CATEGORY = {
"Grocery": [
"Walmart", "Target", "Costco", "Kroger", "Safeway",
"Publix", "Whole Foods", "Trader Joe's", "Aldi"
],
"Coffee": ["Starbucks", "Dunkin'"],
"Fast Food": [
"McDonald's", "Burger King", "Wendy's",
"Chick-fil-A", "Taco Bell", "KFC"
],
"Dining": [
"Chipotle", "Panera Bread", "Olive Garden",
"Applebee's", "Chili's"
],
"Online Shopping": ["Amazon"],
"Electronics": ["Best Buy"],
"Home Improvement": ["Home Depot", "Lowe's"],
"Furniture": ["IKEA"],
"Fuel": ["Shell", "Exxon", "Chevron", "BP"],
"Convenience Store": ["7-Eleven", "Circle K", "Wawa"],
"Ride Share": ["Uber", "Lyft"],
"Public Transit": ["Amtrak"],
"Car Rental": ["Enterprise Rent-A-Car", "Hertz"],
"Airfare": [
"Delta Airlines", "United Airlines",
"American Airlines", "Southwest Airlines"
],
"Streaming Services": [
"Netflix", "Hulu", "Disney+",
"Spotify", "Apple Music"
],
"Movies": ["AMC Theatres", "Regal Cinemas"],
"Pharmacy": ["CVS", "Walgreens", "Rite Aid"],
"Healthcare": ["Kaiser Permanente"],
"Internet": ["Comcast", "Spectrum"],
"Mobile Phone": ["AT&T", "Verizon", "T-Mobile"],
"Bank Fee": ["Chase", "Bank of America"],
}
if t_type == "Deposit":
amount = round(random.uniform(20, 10000), 2)
account['balance'] += amount
txn["amount"] = amount
txn["balance_after"] = round(account['balance'], 2)
txn["transaction_location"] = f"Branch {txn['branch_id']}"
elif t_type == "Withdrawal":
amount = round(random.uniform(50, 7000), 2)
amount = min(amount, account['balance'])
account['balance'] -= amount
txn["amount"] = amount
txn["balance_after"] = round(account['balance'], 2)
txn["transaction_location"] = f"Branch {txn['branch_id']}"
else: # Payment / Transfer
amount = round(random.uniform(5, 1000), 2)
account['balance'] = max(account['balance'] - amount, 0)
txn["amount"] = amount
txn["balance_after"] = round(account['balance'], 2)
txn["transaction_location"] = "POS / Online"
# Flatten for fast lookup
MERCHANT_CATEGORY_MAP = {
    merchant: category
    for category, merchants in MERCHANTS_BY_CATEGORY.items()
    for merchant in merchants
}
return txn
VENDORS = list(MERCHANT_CATEGORY_MAP.keys())
CATEGORIES = list(MERCHANTS_BY_CATEGORY.keys())
# ---- Generate transactions ----
transactions = []
idx = 1
for _, account in accounts_df.iterrows():
    account_transactions_count = random.randint(5, 20)
    for _ in range(account_transactions_count):
        txn = generate_transaction(account)
        txn['transaction_id'] = generate_transaction_id(account['account_id'], idx)
        transactions.append(txn)
        idx += 1
# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------
transactions_df = pd.DataFrame(transactions)
def get_category_for_vendor(vendor: str, noise_rate: float = 0.05) -> str:
"""
Deterministic category lookup with optional noise.
"""
if random.random() < noise_rate:
return random.choice(CATEGORIES)
return MERCHANT_CATEGORY_MAP[vendor]
# ---- Save to S3 backup ----
buffer = io.BytesIO()
transactions_df.to_parquet(buffer, index=False, engine="pyarrow")
s3.Bucket(bucket_name).put_object(Key=transactions_s3_key_parquet, Body=buffer.getvalue())
print("Uploaded transactions.parquet to S3 (backup).")
def pick_transaction_context():
"""
Realistic transaction type merchant pairing
"""
txn_type = random.choice(["PURCHASE", "WITHDRAWAL", "DEPOSIT", "TRANSFER"])
# ---- Insert into Postgres ----
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id VARCHAR(30) PRIMARY KEY,
            account_id VARCHAR(20) REFERENCES accounts(account_id),
            branch_id INT,
            transaction_type VARCHAR(20),
            amount NUMERIC(12,2),
            date DATE,
            balance_after NUMERIC(12,2),
            vendor VARCHAR(50),
            transaction_location VARCHAR(50)
        );
    """))
if txn_type == "WITHDRAWAL":
return txn_type, "ATM Withdrawal", "Cash Withdrawal"
if txn_type == "DEPOSIT":
return txn_type, "Direct Deposit", "Income"
if txn_type == "TRANSFER":
return txn_type, "Account Transfer", "Transfer"
transactions_df.to_sql("transactions", conn, if_exists="append", index=False, method="multi")
print(f"Inserted {len(transactions_df)} transactions into Postgres successfully!")
    merchant = random.choice(VENDORS)
    return txn_type, merchant, get_category_for_vendor(merchant)
# ---- Optional: row count check ----
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM transactions;"))
    print(f"Rows in transactions table: {result.scalar()}")
# -------------------------------------------------------------------
# Static enums
# -------------------------------------------------------------------
channels = ["POS", "ONLINE", "ATM"]
statuses = ["POSTED", "PENDING", "DECLINED"]
currencies = ["USD", "USD", "USD", "EUR", "CAD"]
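# Listing "USD" three times weights random.choice toward USD (roughly 3 in 5 events).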
# -------------------------------------------------------------------
# Generate transaction events
# -------------------------------------------------------------------
events = []
NUM_TRANSACTIONS = 2000
for _ in range(NUM_TRANSACTIONS):
    account_id = random.choice(account_ids)
    txn_type, merchant, category = pick_transaction_context()
    txn_ts = datetime.now(timezone.utc) - timedelta(
        days=random.randint(0, 365),
        minutes=random.randint(0, 1440),
    )
    amount = round(random.uniform(1.00, 2500.00), 2)
    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": "transaction_posted",
        "event_ts": txn_ts.isoformat(),
        "transaction": {
            "transaction_id": str(uuid.uuid4()),
            "account_id": account_id,
            "amount": amount,
            "currency": random.choice(currencies),
            "transaction_type": txn_type,
            "merchant_name": merchant,
            "category": category,
            "merchant_id": str(uuid.uuid4()),
            "merchant_city": fake.city(),
            "merchant_state": fake.state_abbr(),
            "merchant_country": "US",
            "merchant_mcc": random.randint(1000, 9999),
            "channel": random.choice(channels),
            "status": random.choice(statuses),
            "is_international": random.choice([True, False]),
            "is_recurring": merchant in {
                "Netflix", "Spotify", "Comcast", "AT&T"
            },
            "auth_code": random.randint(100000, 999999),
            "device_id": str(uuid.uuid4()),
            "ip_address": fake.ipv4_public(),
        },
        "source_system": "transaction_generator_v1",
        "batch_id": str(uuid.uuid4()),
        "ingestion_ts": datetime.now(timezone.utc).isoformat(),
    }
    events.append(event)
# -------------------------------------------------------------------
# Write JSONL to S3
# -------------------------------------------------------------------
key = (
f"{transactions_prefix}"
f"batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
)
body = "\n".join(json.dumps(e) for e in events)
s3.put_object(
    Bucket=bucket_name,
    Key=key,
    Body=body.encode("utf-8"),
)
print(f"Wrote {len(events)} raw transaction events to s3://{bucket_name}/{key}")