From f5eca2baeebfbbb1931c9aeb7a06cd81203892d7 Mon Sep 17 00:00:00 2001
From: Cameron Seamons
Date: Mon, 15 Dec 2025 20:41:45 -0700
Subject: [PATCH] Add read_trx.py exploration script; stop Spark session in bronze_to_silver.py

---
 Scripts/bronze_to_silver.py |   2 +
 Scripts/read_trx.py         | 148 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 150 insertions(+)
 create mode 100644 Scripts/read_trx.py

diff --git a/Scripts/bronze_to_silver.py b/Scripts/bronze_to_silver.py
index 514b702..bdb768b 100644
--- a/Scripts/bronze_to_silver.py
+++ b/Scripts/bronze_to_silver.py
@@ -54,5 +54,7 @@
 print("About to show() ...")
 df.limit(5).show(truncate=False)
 
 print("Done.")
+
+spark.stop()
 
diff --git a/Scripts/read_trx.py b/Scripts/read_trx.py
new file mode 100644
index 0000000..e6e5a1b
--- /dev/null
+++ b/Scripts/read_trx.py
@@ -0,0 +1,148 @@
+import os
+
+from dotenv import load_dotenv
+from pyspark.sql import SparkSession
+from pyspark.sql import functions as F
+
+load_dotenv()
+
+# ---- WINDOWS FIX ----
+# Spark on Windows needs winutils; point Hadoop at the local install.
+os.environ.setdefault("HADOOP_HOME", "C:\\hadoop")
+os.environ.setdefault("hadoop.home.dir", "C:\\hadoop")
+os.environ["PATH"] += ";C:\\hadoop\\bin"
+
+spark = (
+    SparkSession.builder
+    .appName("read-trx")
+
+    # ---- ALL JARS IN ONE PLACE ----
+    .config(
+        "spark.jars.packages",
+        ",".join([
+            # Delta
+            "io.delta:delta-core_2.12:2.3.0",
+
+            # S3A
+            "org.apache.hadoop:hadoop-aws:3.3.4",
+            "com.amazonaws:aws-java-sdk-bundle:1.12.262",
+        ])
+    )
+
+    # ---- DELTA ----
+    # (Unused below; kept so this session is configured like bronze_to_silver.py.)
+    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+
+    # ---- S3 ----
+    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("STORAGE_ENDPOINT"))
+    .config("spark.hadoop.fs.s3a.access.key", os.getenv("STORAGE_ACCESS_KEY"))
+    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("STORAGE_SECRET_KEY"))
+    .config("spark.hadoop.fs.s3a.path.style.access", "true")
+    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+
+    .getOrCreate()
+)
+
+print("Spark created OK")
+
+# Prove the S3A filesystem class is on the classpath
+print("fs.s3a.impl =", spark.sparkContext._jsc.hadoopConfiguration().get("fs.s3a.impl"))
+
+# Force a real read/action from S3
+df = spark.read.json("s3a://camdoesdata/bronze/transactions_raw/")
+print("About to show() ...")
+df.limit(5).show(truncate=False)
+
+print("Done.")
+
+# ---- READ TRANSACTIONS ----
+# Reuse the DataFrame loaded above rather than re-reading the same S3 prefix.
+transactions_df = df
+
+# ---- METHOD 1: 5 random transaction records ----
+random_txns = (
+    transactions_df
+    .select(
+        F.col("transaction.account_id").alias("account_id"),
+        F.col("transaction.amount").alias("amount"),
+        F.col("transaction.merchant_name").alias("merchant"),
+        F.col("transaction.category").alias("category"),
+    )
+    .sample(fraction=0.1)  # unseeded, so a different sample on every run
+    .limit(5)
+)
+
+print("5 Random Transactions:")
+random_txns.show(truncate=False)
+
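+# ---- SKETCH: repeatable variant of METHOD 1 ----
+# Illustration only, not part of the pipeline: sample() also accepts a seed,
+# so the same rows come back on every run. The fraction and seed values here
+# are assumptions for the example, not tuned numbers.
+repeatable_txns = (
+    transactions_df
+    .select(
+        F.col("transaction.account_id").alias("account_id"),
+        F.col("transaction.amount").alias("amount"),
+    )
+    .sample(fraction=0.1, seed=42)
+    .limit(5)
+)
+
+print("5 Repeatable Random Transactions:")
+repeatable_txns.show(truncate=False)
+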
on="account_id", how="inner") + .orderBy("account_id", "timestamp") +) + +print("\nAll Transactions for 5 Random Accounts:") +all_txns_for_random_accounts.show(50, truncate=False) + +# ---- METHOD 3: Summary per account (cleaner view) ---- +summary = ( + transactions_df + .select( + F.col("transaction.account_id").alias("account_id"), + F.col("transaction.amount").alias("amount") + ) + .join(random_account_ids, on="account_id", how="inner") + .groupBy("account_id") + .agg( + F.count("*").alias("num_transactions"), + F.collect_list("amount").alias("amounts"), + F.sum("amount").alias("total_amount"), + F.avg("amount").alias("avg_amount") + ) +) + +print("\nSummary of 5 Random Accounts:") +summary.show(truncate=False) + +spark.stop() +