Compare commits
3 commits
fdcbe68f3a ... 40980241dd
| Author | SHA1 | Date |
|---|---|---|
| | 40980241dd | |
| | 844bb1daa1 | |
| | b3eb473ebf | |
3 changed files with 242 additions and 104 deletions
Scripts/bronze_to_silver.py (Normal file, 58 additions)

@@ -0,0 +1,58 @@
import os
from pyspark.sql import SparkSession
from dotenv import load_dotenv

load_dotenv()

# ---- WINDOWS FIX ----
os.environ.setdefault("HADOOP_HOME", "C:\\hadoop")
os.environ.setdefault("hadoop.home.dir", "C:\\hadoop")
os.environ["PATH"] += ";C:\\hadoop\\bin"


spark = (
    SparkSession.builder
    .appName("bronze-to-silver-batch")

    # ---- ALL JARS IN ONE PLACE ----
    .config(
        "spark.jars.packages",
        ",".join([
            # Delta
            "io.delta:delta-core_2.12:2.3.0",

            # S3A
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "com.amazonaws:aws-java-sdk-bundle:1.12.262"
        ])
    )

    # ---- DELTA ----
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    # ---- S3 ----
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("STORAGE_ENDPOINT"))
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("STORAGE_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("STORAGE_SECRET_KEY"))
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    .getOrCreate()
)

print("Spark created OK")

# Prove S3A filesystem class is on the classpath
print("fs.s3a.impl =", spark.sparkContext._jsc.hadoopConfiguration().get("fs.s3a.impl"))

# Force a real read/action from S3
df = spark.read.json("s3a://camdoesdata/bronze/transactions_raw/")
print("About to show() ...")
df.limit(5).show(truncate=False)

print("Done.")
spark.stop()
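Note: the new bronze_to_silver.py stops at a smoke-test read of the bronze JSON. A minimal sketch of the silver write the script appears to be building toward, assuming a silver/transactions/ prefix in the same bucket (the output path and write mode are assumptions, not part of this diff):

    # hypothetical next step: persist the frame as a Delta table in the silver layer
    (
        df.write.format("delta")
        .mode("overwrite")                               # assumed write mode
        .save("s3a://camdoesdata/silver/transactions/")  # assumed silver path
    )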
Scripts/spark_test.py (Normal file, 13 additions)

@@ -0,0 +1,13 @@
import os
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip


os.environ.setdefault("HADOOP_HOME", "C:\\hadoop")
os.environ.setdefault("hadoop.home.dir", "C:\\hadoop")
os.environ["PATH"] += ";C:\\hadoop\\bin"


builder = SparkSession.builder.appName("spark-test")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
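Note: spark_test.py only proves the session can be built. A short way to exercise Delta end to end, assuming the delta-spark pip package is installed and a local temp path is writable (both assumptions, not part of this diff):

    # hypothetical smoke test: write and read back a tiny local Delta table
    spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta_smoke")
    print(spark.read.format("delta").load("/tmp/delta_smoke").count())  # expect 5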
@@ -1,130 +1,197 @@
from faker import Faker
from dotenv import load_dotenv
from datetime import datetime
from datetime import datetime, timezone, timedelta
import os
import random
import pandas as pd
import json
import boto3
import io
from sqlalchemy import create_engine, text
import random
import uuid

# ---- Setup ----
fake = Faker()
load_dotenv()

# ---- Postgres setup ----
user = os.getenv("PG_USER")
password = os.getenv("PG_PASSWORD")
host = os.getenv("PG_HOST")
port = "5432"
db = "postgres"
engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}", future=True)

# ---- S3 setup (backup only) ----
s3 = boto3.resource(
s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"),
)
bucket_name = os.getenv("STORAGE_BUCKET")
transactions_s3_key_parquet = "DataLab/transactions/transactions.parquet"

# ---- Load accounts from Postgres ----
with engine.connect() as conn:
    accounts_df = pd.read_sql(
        sql=text("SELECT account_id, customer_id, branch_id, account_type, open_date, balance FROM accounts;"),
        con=conn
bucket_name = os.getenv("STORAGE_BUCKET")

# Bronze locations
transactions_prefix = "bronze/transactions_raw/"
accounts_prefix = "bronze/accounts_raw/"

# ---- Load account IDs from bronze ----
account_ids = []

resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=accounts_prefix)
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
    for line in body.decode("utf-8").splitlines():
        record = json.loads(line)
        account_ids.append(record["account"]["account_id"])

if not account_ids:
    raise ValueError("No account IDs found in bronze accounts data")

MERCHANTS_BY_CATEGORY = {
    "Grocery": [
        "Walmart", "Target", "Costco", "Kroger", "Safeway",
        "Publix", "Whole Foods", "Trader Joe's", "Aldi"
    ],
    "Coffee": ["Starbucks", "Dunkin'"],
    "Fast Food": [
        "McDonald's", "Burger King", "Wendy's",
        "Chick-fil-A", "Taco Bell", "KFC"
    ],
    "Dining": [
        "Chipotle", "Panera Bread", "Olive Garden",
        "Applebee's", "Chili's"
    ],
    "Online Shopping": ["Amazon"],
    "Electronics": ["Best Buy"],
    "Home Improvement": ["Home Depot", "Lowe's"],
    "Furniture": ["IKEA"],
    "Fuel": ["Shell", "Exxon", "Chevron", "BP"],
    "Convenience Store": ["7-Eleven", "Circle K", "Wawa"],
    "Ride Share": ["Uber", "Lyft"],
    "Public Transit": ["Amtrak"],
    "Car Rental": ["Enterprise Rent-A-Car", "Hertz"],
    "Airfare": [
        "Delta Airlines", "United Airlines",
        "American Airlines", "Southwest Airlines"
    ],
    "Streaming Services": [
        "Netflix", "Hulu", "Disney+",
        "Spotify", "Apple Music"
    ],
    "Movies": ["AMC Theatres", "Regal Cinemas"],
    "Pharmacy": ["CVS", "Walgreens", "Rite Aid"],
    "Healthcare": ["Kaiser Permanente"],
    "Internet": ["Comcast", "Spectrum"],
    "Mobile Phone": ["AT&T", "Verizon", "T-Mobile"],
    "Bank Fee": ["Chase", "Bank of America"],
}

# Flatten for fast lookup
MERCHANT_CATEGORY_MAP = {
    merchant: category
    for category, merchants in MERCHANTS_BY_CATEGORY.items()
    for merchant in merchants
}

VENDORS = list(MERCHANT_CATEGORY_MAP.keys())
CATEGORIES = list(MERCHANTS_BY_CATEGORY.keys())

# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------

def get_category_for_vendor(vendor: str, noise_rate: float = 0.05) -> str:
    """
    Deterministic category lookup with optional noise.
    """
    if random.random() < noise_rate:
        return random.choice(CATEGORIES)
    return MERCHANT_CATEGORY_MAP[vendor]

def pick_transaction_context():
    """
    Realistic transaction type → merchant pairing
    """
    txn_type = random.choice(["PURCHASE", "WITHDRAWAL", "DEPOSIT", "TRANSFER"])

    if txn_type == "WITHDRAWAL":
        return txn_type, "ATM Withdrawal", "Cash Withdrawal"
    if txn_type == "DEPOSIT":
        return txn_type, "Direct Deposit", "Income"
    if txn_type == "TRANSFER":
        return txn_type, "Account Transfer", "Transfer"

    merchant = random.choice(VENDORS)
    return txn_type, merchant, get_category_for_vendor(merchant)

# -------------------------------------------------------------------
# Static enums
# -------------------------------------------------------------------

channels = ["POS", "ONLINE", "ATM"]
statuses = ["POSTED", "PENDING", "DECLINED"]
currencies = ["USD", "USD", "USD", "EUR", "CAD"]

# -------------------------------------------------------------------
# Generate transaction events
# -------------------------------------------------------------------

events = []
NUM_TRANSACTIONS = 2000

for _ in range(NUM_TRANSACTIONS):
    account_id = random.choice(account_ids)
    txn_type, merchant, category = pick_transaction_context()

    txn_ts = datetime.now(timezone.utc) - timedelta(
        days=random.randint(0, 365),
        minutes=random.randint(0, 1440),
    )

accounts_df["open_date"] = pd.to_datetime(accounts_df["open_date"]).dt.date
    amount = round(random.uniform(1.00, 2500.00), 2)

# ---- Sample vendors ----
vendors = ["Amazon", "Walmart", "Target", "Starbucks", "Apple", "Netflix", "Uber", "Lyft", "BestBuy", "Costco"]
    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": "transaction_posted",
        "event_ts": txn_ts.isoformat(),

# ---- Helper functions ----
def generate_transaction_id(account_id, idx):
    return f"{account_id}{str(idx).zfill(5)}"
        "transaction": {
            "transaction_id": str(uuid.uuid4()),
            "account_id": account_id,
            "amount": amount,
            "currency": random.choice(currencies),
            "transaction_type": txn_type,
            "merchant_name": merchant,
            "category": category,
            "merchant_id": str(uuid.uuid4()),
            "merchant_city": fake.city(),
            "merchant_state": fake.state_abbr(),
            "merchant_country": "US",
            "merchant_mcc": random.randint(1000, 9999),
            "channel": random.choice(channels),
            "status": random.choice(statuses),
            "is_international": random.choice([True, False]),
            "is_recurring": merchant in {
                "Netflix", "Spotify", "Comcast", "AT&T"
            },
            "auth_code": random.randint(100000, 999999),
            "device_id": str(uuid.uuid4()),
            "ip_address": fake.ipv4_public(),
        },

def generate_transaction(account):
    t_type = random.choices(
        ["Deposit", "Withdrawal", "Payment", "Transfer"],
        weights=[0.4, 0.3, 0.2, 0.1], k=1
    )[0]

    txn = {
        "transaction_id": None,  # to be filled later
        "account_id": str(account['account_id']),
        "branch_id": account['branch_id'] if t_type in ["Deposit", "Withdrawal"] else None,
        "transaction_type": t_type,
        "amount": 0,
        "date": fake.date_between(start_date=pd.to_datetime(account['open_date']), end_date=datetime.today()),
        "balance_after": 0,
        "vendor": random.choice(vendors) if t_type in ["Payment", "Transfer"] else None,
        "transaction_location": None
        "source_system": "transaction_generator_v1",
        "batch_id": str(uuid.uuid4()),
        "ingestion_ts": datetime.now(timezone.utc).isoformat(),
    }

    if t_type == "Deposit":
        amount = round(random.uniform(20, 10000), 2)
        account['balance'] += amount
        txn["amount"] = amount
        txn["balance_after"] = round(account['balance'], 2)
        txn["transaction_location"] = f"Branch {txn['branch_id']}"
    elif t_type == "Withdrawal":
        amount = round(random.uniform(50, 7000), 2)
        amount = min(amount, account['balance'])
        account['balance'] -= amount
        txn["amount"] = amount
        txn["balance_after"] = round(account['balance'], 2)
        txn["transaction_location"] = f"Branch {txn['branch_id']}"
    else:  # Payment / Transfer
        amount = round(random.uniform(5, 1000), 2)
        account['balance'] = max(account['balance'] - amount, 0)
        txn["amount"] = amount
        txn["balance_after"] = round(account['balance'], 2)
        txn["transaction_location"] = "POS / Online"
    events.append(event)

    return txn
# -------------------------------------------------------------------
# Write JSONL to S3
# -------------------------------------------------------------------

# ---- Generate transactions ----
transactions = []
idx = 1
for _, account in accounts_df.iterrows():
    account_transactions_count = random.randint(5, 20)
    for _ in range(account_transactions_count):
        txn = generate_transaction(account)
        txn['transaction_id'] = generate_transaction_id(account['account_id'], idx)
        transactions.append(txn)
        idx += 1
key = (
    f"{transactions_prefix}"
    f"batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
)

transactions_df = pd.DataFrame(transactions)
body = "\n".join(json.dumps(e) for e in events)

# ---- Save to S3 backup ----
buffer = io.BytesIO()
transactions_df.to_parquet(buffer, index=False, engine="pyarrow")
s3.Bucket(bucket_name).put_object(Key=transactions_s3_key_parquet, Body=buffer.getvalue())
print("Uploaded transactions.parquet to S3 (backup).")
s3.put_object(
    Bucket=bucket_name,
    Key=key,
    Body=body.encode("utf-8"),
)

# ---- Insert into Postgres ----
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id VARCHAR(30) PRIMARY KEY,
            account_id VARCHAR(20) REFERENCES accounts(account_id),
            branch_id INT,
            transaction_type VARCHAR(20),
            amount NUMERIC(12,2),
            date DATE,
            balance_after NUMERIC(12,2),
            vendor VARCHAR(50),
            transaction_location VARCHAR(50)
        );
    """))

    transactions_df.to_sql("transactions", conn, if_exists="append", index=False, method="multi")
    print(f"Inserted {len(transactions_df)} transactions into Postgres successfully!")

# ---- Optional: row count check ----
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM transactions;"))
    print(f"Rows in transactions table: {result.scalar()}")
print(f"Wrote {len(events)} raw transaction events to s3://{bucket_name}/{key}")
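Note on the new bronze account loader: s3.list_objects_v2 returns at most 1,000 keys per call, so a large accounts_raw/ prefix would only be partially scanned. A hedged sketch of the same loop using boto3's paginator (not part of this commit):

    # hypothetical variant: walk every page under the accounts prefix
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=accounts_prefix):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
            for line in body.decode("utf-8").splitlines():
                account_ids.append(json.loads(line)["account"]["account_id"])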