updated scripts to use json data

Cameron Seamons 2025-12-14 20:38:54 -07:00
parent a6cce866d3
commit fdcbe68f3a
4 changed files with 370 additions and 251 deletions
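For orientation: with this commit every generator stops writing to Postgres/Parquet and instead appends newline-delimited JSON (one event object per line) to a bronze/<entity>_raw/batch_<timestamp>.json key in S3-compatible storage. A minimal sketch of the record shape, with illustrative values only; the exact per-entity fields are in the diffs below:

import json
import uuid
from datetime import datetime, timezone

# Illustrative event in the bronze JSONL shape used by the generators in this
# commit; field names follow the diffs, the values here are made up.
event = {
    "event_id": str(uuid.uuid4()),
    "event_type": "account_opened",
    "event_ts": datetime.now(timezone.utc).isoformat(),
    "account": {"account_id": str(uuid.uuid4()), "balance": 1234.56},
    "customer": {"customer_id": 123456789},
    "branch": {"branch_id": str(uuid.uuid4())},
    "source_system": "account_generator_v1",
    "ingestion_ts": datetime.now(timezone.utc).isoformat(),
}

# A batch file is simply many such events joined with newlines (JSON Lines).
print(json.dumps(event))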

View file

@@ -1,114 +1,132 @@
 from faker import Faker
 from dotenv import load_dotenv
-from datetime import datetime
+from datetime import datetime, timezone
 import os
-import random
-import pandas as pd
+import json
 import boto3
-import io
-from sqlalchemy import create_engine, text
+import random
+import uuid
 
 # ---- Setup ----
 fake = Faker()
 load_dotenv()
 
-# ---- Postgres setup ----
-user = os.getenv("PG_USER")
-password = os.getenv("PG_PASSWORD")
-host = os.getenv("PG_HOST")
-port = "5432"
-db = "postgres"
-engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}", future=True)
-
-# ---- S3 setup (backup only) ----
-s3 = boto3.resource(
+s3 = boto3.client(
     "s3",
     endpoint_url=os.getenv("STORAGE_ENDPOINT"),
    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
-    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
+    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"),
 )
 
 bucket_name = os.getenv("STORAGE_BUCKET")
-accounts_s3_key_parquet = "DataLab/accounts/accounts.parquet"
 
-# ---- Load customers from Postgres ----
-with engine.connect() as conn:
-    customers_df = pd.read_sql(
-        sql=text("SELECT customer_id, home_branch_id, customer_since FROM customers;"),
-        con=conn
-    )
-customers_df["customer_since"] = pd.to_datetime(customers_df["customer_since"]).dt.date
+# Bronze prefixes
+accounts_prefix = "bronze/accounts_raw/"
+cust_prefix = "bronze/customers_raw/"
+branches_prefix = "bronze/branches_raw/"
 
-# ---- Unique account ID generator ----
-generated_ids = set()
-
-def generate_account_id(branch_id):
-    while True:
-        branch_part = str(branch_id).zfill(3)
-        random_part = str(random.randint(10**8, 10**9 - 1))
-        acct_id = branch_part + random_part
-        if acct_id not in generated_ids:
-            generated_ids.add(acct_id)
-            return acct_id
-
-def generate_account_number():
-    return str(random.randint(10**10, 10**11 - 1))
-
-def assign_account_types():
+# ---- Helpers ----
+def random_balance():
+    return round(random.uniform(-500, 30000), 2)  # overdrafts allowed
+
+def random_account_types():
     roll = random.random()
-    if roll < 0.50:
+    if roll < 0.55:
         return ["Checking"]
-    elif roll < 0.70:
+    elif roll < 0.80:
         return ["Savings"]
     else:
         return ["Checking", "Savings"]
 
-def balance_for_type(account_type):
-    if account_type == "Checking":
-        return round(random.uniform(50, 7000), 2)
-    return round(random.uniform(200, 25000), 2)
-
-# ---- Generate accounts ----
-accounts = []
-for _, row in customers_df.iterrows():
-    customer_id = row["customer_id"]
-    customer_since = row["customer_since"]
-    home_branch_id = row["home_branch_id"]
-    account_types = assign_account_types()
-    for acct_type in account_types:
-        accounts.append({
-            "account_id": generate_account_id(home_branch_id),
-            "account_number": generate_account_number(),
-            "customer_id": customer_id,
-            "account_type": acct_type,
-            "open_date": fake.date_between(start_date=customer_since, end_date=datetime.today().date()),
-            "balance": balance_for_type(acct_type),
-            "branch_id": home_branch_id
-        })
-
-accounts_df = pd.DataFrame(accounts)
-
-# ---- Save to S3 backup ----
-buffer = io.BytesIO()
-accounts_df.to_parquet(buffer, index=False, engine="pyarrow")
-s3.Bucket(bucket_name).put_object(Key=accounts_s3_key_parquet, Body=buffer.getvalue())
-print("Uploaded accounts.parquet to S3 (backup).")
-
-# ---- Ensure accounts table exists and insert into Postgres ----
-with engine.begin() as conn:
-    conn.execute(text("""
-        CREATE TABLE IF NOT EXISTS accounts (
-            account_id VARCHAR(20) PRIMARY KEY,
-            account_number VARCHAR(20) UNIQUE,
-            customer_id BIGINT REFERENCES customers(customer_id),
-            account_type VARCHAR(50),
-            open_date DATE,
-            balance NUMERIC(12,2),
-            branch_id INT REFERENCES branches(branch_id)
-        );
-    """))
-
-    # Pandas to_sql now uses the connection from SQLAlchemy 2.x
-    accounts_df.to_sql("accounts", conn, if_exists="append", index=False, method="multi")
-    print(f"Inserted {len(accounts_df)} accounts into Postgres successfully!")
+# ---- Load customer IDs from bronze customers ----
+cust_ids = set()
+
+resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=cust_prefix)
+for obj in resp.get("Contents", []):
+    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
+    for line in body.decode("utf-8").splitlines():
+        record = json.loads(line)
+        cust_ids.add(record["customer"]["customer_id"])
+
+if not cust_ids:
+    raise ValueError("No customer IDs found in bronze customers data")
+
+# ---- Load existing account customer IDs ----
+customers_with_accounts = set()
+
+resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=accounts_prefix)
+for obj in resp.get("Contents", []):
+    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
+    for line in body.decode("utf-8").splitlines():
+        record = json.loads(line)
+        customers_with_accounts.add(record["customer"]["customer_id"])
+
+# ---- Load branch IDs ----
+branch_ids = []
+
+resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=branches_prefix)
+for obj in resp.get("Contents", []):
+    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
+    for line in body.decode("utf-8").splitlines():
+        record = json.loads(line)
+        branch_ids.append(record["branch"]["branch_id"])
+
+if not branch_ids:
+    raise ValueError("No branch IDs found in bronze branches data")
+
+# ---- Determine eligible customers ----
+eligible_customers = cust_ids - customers_with_accounts
+
+# ---- Generate ONE account per eligible customer ----
+events = []
+
+for cust_id in eligible_customers:
+    event = {
+        "event_id": str(uuid.uuid4()),
+        "event_type": "account_opened",
+        "event_ts": datetime.now(timezone.utc).isoformat(),
+        "account": {
+            "account_id": str(uuid.uuid4()),
+            "account_number": str(random.randint(10**9, 10**11)),
+            "account_types": random_account_types(),
+            "open_date": fake.date_between(start_date="-30d", end_date="today").isoformat(),
+            "balance": random_balance(),
+            "currency": random.choice(["USD", "USD", "USD", "EUR"]),
+            "interest_rate": round(random.uniform(0.01, 4.5), 2),
+            "status": random.choice(["ACTIVE", "ACTIVE", "FROZEN", "CLOSED"]),
+        },
+        "customer": {
+            "customer_id": cust_id,
+            "segment": random.choice(["Retail", "SMB", "VIP"]),
+        },
+        "branch": {
+            "branch_id": random.choice(branch_ids),
+            "teller_id": random.randint(1000, 9999),
+        },
+        # intentional noise
+        "source_system": "account_generator_v1",
+        "batch_id": str(uuid.uuid4()),
+        "ingestion_ts": datetime.now(timezone.utc).isoformat(),
+    }
+    events.append(event)
+
+# ---- Write JSONL batch ----
+if events:
+    key = f"{accounts_prefix}batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
+    body = "\n".join(json.dumps(e) for e in events)
+    s3.put_object(
+        Bucket=bucket_name,
+        Key=key,
+        Body=body.encode("utf-8"),
+    )
+
+# ---- Logging (IMPORTANT) ----
+print(f"Total customers found: {len(cust_ids)}")
+print(f"Customers already with accounts: {len(customers_with_accounts)}")
+print(f"New accounts created this run: {len(events)}")

View file

@@ -1,18 +1,16 @@
 from faker import Faker
 from dotenv import load_dotenv
 import os
-import pandas as pd
+import json
 import boto3
-import io
-from sqlalchemy import create_engine, text
-from urllib.parse import quote_plus
+from datetime import datetime, timezone
+import uuid
 
-# ---- Faker setup ----
+# ---- Setup ----
 fake = Faker()
 load_dotenv()
 
-# ---- S3 Setup ----
-s3 = boto3.resource(
+s3 = boto3.client(
     's3',
     endpoint_url=os.getenv('STORAGE_ENDPOINT'),
     aws_access_key_id=os.getenv('STORAGE_ACCESS_KEY'),
@@ -20,74 +18,47 @@
 )
 
 bucket_name = os.getenv('STORAGE_BUCKET')
-s3_key_csv = 'DataLab/branches/branches.csv'
-s3_key_parquet = 'DataLab/branches/branches.parquet'
 
-# ---- Postgres Setup ----
-user = os.getenv("PG_USER")
-password = os.getenv("PG_PASSWORD")
-host = os.getenv("PG_HOST")
-port = "5432"
-db = "postgres"
-engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")
-
-# ---- Ensure local data folder exists ----
-os.makedirs("../Data", exist_ok=True)
-
-# ---- Generate branch data ----
-branches = []
-for i in range(1, 11):  # 10 branches
-    branches.append({
-        "branch_id": str(i),  # store as string for consistency
-        "branch_name": f"{fake.city()} Branch",
-        "address": fake.street_address(),
-        "city": fake.city(),
-        "state": fake.state_abbr()
-    })
-
-df = pd.DataFrame(branches)
-
-# ---- Save locally as CSV ----
-local_file = "../Data/branches.csv"
-df.to_csv(local_file, index=False)
-print("Generated 10 branches locally.")
-
-# ---- Upload CSV to S3 ----
-s3.Bucket(bucket_name).upload_file(local_file, s3_key_csv)
-print(f"Uploaded branches.csv to s3://{bucket_name}/{s3_key_csv}")
-
-# ---- Upload / append to S3 as Parquet ----
-try:
-    obj = s3.Bucket(bucket_name).Object(s3_key_parquet).get()
-    existing_df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
-    combined_df = pd.concat([existing_df, df], ignore_index=True)
-    print(f"Appended {len(df)} branches to existing Parquet on S3.")
-except s3.meta.client.exceptions.NoSuchKey:
-    combined_df = df
-    print("No existing branches Parquet on S3, creating new one.")
-
-parquet_buffer = io.BytesIO()
-combined_df.to_parquet(parquet_buffer, index=False, engine="pyarrow")
-s3.Bucket(bucket_name).put_object(Key=s3_key_parquet, Body=parquet_buffer.getvalue())
-print(f"Uploaded branches.parquet to s3://{bucket_name}/{s3_key_parquet}")
-
-# ---- Create / Append to Postgres ----
-with engine.connect() as conn:
-    for _, row in df.iterrows():
-        stmt = text("""
-            INSERT INTO branches (branch_id, branch_name, address, city, state)
-            VALUES (:branch_id, :branch_name, :address, :city, :state)
-            ON CONFLICT (branch_id) DO NOTHING
-        """)
-        conn.execute(stmt, {
-            "branch_id": str(row["branch_id"]),
-            "branch_name": row["branch_name"],
-            "address": row["address"],
-            "city": row["city"],
-            "state": row["state"]
-        })
-    conn.commit()
-
-    # Optional: row count check
-    result = conn.execute(text("SELECT COUNT(*) FROM branches;"))
-    print(f"Rows in branches table: {result.scalar()}")
+# Bronze landing zone (RAW)
+branches_prefix = "bronze/branches_raw/"
+
+# ---- Generate branch events ----
+events = []
+now_utc = datetime.now(timezone.utc)
+
+for _ in range(3):
+    event = {
+        "event_id": str(uuid.uuid4()),
+        "event_type": "branch_created",
+        "event_ts": now_utc.isoformat(),
+        "branch": {
+            "branch_id": str(uuid.uuid4()),
+            "branch_name": f"{fake.city()} Branch",
+            "address": fake.street_address(),
+            "city": fake.city(),
+            "state": fake.state_abbr(),
+            "open_date": fake.date_between(start_date="-30d", end_date="today").isoformat(),  # New in the last 30 days
+            "employee_count": fake.random_int(min=5, max=50),
+            "branch_manager": fake.name(),
+            "phone_number": fake.phone_number(),
+            "timezone": fake.timezone()
+        },
+        "source_system": "branch_generator",
+        "ingestion_ts": now_utc.isoformat()
+    }
+    events.append(event)
+
+# ---- Write events as JSON lines ----
+key = f"{branches_prefix}batch_{now_utc.strftime('%Y%m%d_%H%M%S')}.json"
+
+body = "\n".join(json.dumps(e) for e in events)
+
+s3.put_object(
+    Bucket=bucket_name,
+    Key=key,
+    Body=body.encode("utf-8")
+)
+
+print(f"Wrote {len(events)} raw branch events to s3://{bucket_name}/{key}")

View file

@@ -1,125 +1,115 @@
-from sqlalchemy import create_engine, text
-from urllib.parse import quote_plus
 from faker import Faker
 from dotenv import load_dotenv
 import os
-import io
-import pandas as pd
+import json
 import boto3
+import uuid
 import random
-from datetime import datetime
+from datetime import datetime, timezone
 
-# ---- Load env ----
-load_dotenv()
+# ---- Setup ----
 fake = Faker()
+load_dotenv()
 
-# ---- Postgres setup ----
-user = os.getenv("PG_USER")
-password = quote_plus(os.getenv("PG_PASSWORD"))
-host = os.getenv("PG_HOST")
-port = "5432"
-db = "postgres"
-engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}")
-
-# ---- Hetzner S3 setup (backup only) ----
-s3 = boto3.resource(
+s3 = boto3.client(
     "s3",
     endpoint_url=os.getenv("STORAGE_ENDPOINT"),
     aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
     aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY")
 )
 bucket_name = os.getenv("STORAGE_BUCKET")
-branches_s3_key = "DataLab/branches/branches.csv"
-customers_s3_key = "DataLab/customers/customers.parquet"
 
-# ---- Load branches from S3 (still needed for customer assignment) ----
-branches_local = "../Data/branches.csv"
-s3.Bucket(bucket_name).download_file(branches_s3_key, branches_local)
-branches = pd.read_csv(branches_local)
+# Bronze landing zone
+cust_prefix = "bronze/customers_raw/"
+branches_prefix = "bronze/branches_raw/"
 
-# ---- Load existing customers from Postgres for email uniqueness ----
-with engine.connect() as conn:
-    table_exists = conn.execute(
-        text("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='customers');")
-    ).scalar()
-
-    if table_exists:
-        existing_customers = pd.read_sql(
-            text("SELECT email FROM customers;"),
-            con=conn
-        )
-        existing_emails = set(existing_customers["email"]) if not existing_customers.empty else set()
-    else:
-        existing_emails = set()
-
-# ---- Helper functions ----
-def realistic_credit_score():
-    return max(300, min(int(random.gauss(680, 60)), 850))
-
-def realistic_income():
-    brackets = [(20000,40000),(40000,70000),(70000,120000),(120000,200000)]
-    low, high = random.choice(brackets)
-    return random.randint(low, high)
-
-def realistic_employment():
-    return random.choices(
-        ["Employed","Self-Employed","Unemployed","Student","Retired"],
-        weights=[50,15,10,15,10]
-    )[0]
-
-def realistic_contact():
-    return random.choice(["Email","Phone","SMS"])
-
-def generate_customer_id():
-    return random.getrandbits(48)
-
-# ---- Generate Customers ----
-customers = []
-for _ in range(50):
-    first = fake.first_name()
-    last = fake.last_name()
-    email = f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}"
-
-    while email in existing_emails:
-        first = fake.first_name()
-        last = fake.last_name()
-        email = f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}"
-    existing_emails.add(email)
-
-    dob = fake.date_between(start_date="-80y", end_date="-18y")
-    age = (datetime.now().date() - dob).days // 365
-    income = realistic_income()
-    credit = realistic_credit_score()
-
-    customers.append({
-        "customer_id": generate_customer_id(),
-        "full_name": f"{first} {last}",
-        "email": email,
-        "phone": fake.phone_number(),
-        "date_of_birth": dob,
-        "age": age,
-        "gender": random.choice(["Male","Female","Other"]),
-        "street_address": fake.street_address(),
-        "city": fake.city(),
-        "state": fake.state_abbr(),
-        "zip_code": fake.zipcode(),
-        "home_branch_id": random.choice(branches["branch_id"]),
-        "customer_since": fake.date_between(start_date="-10y", end_date="today"),
-        "employment_status": realistic_employment(),
-        "annual_income": income,
-        "credit_score": credit,
-        "preferred_contact_method": realistic_contact()
-    })
-
-df = pd.DataFrame(customers)
-
-# ---- Save to S3 backup ----
-buffer = io.BytesIO()
-df.to_parquet(buffer, index=False, engine="pyarrow")
-s3.Bucket(bucket_name).put_object(Key=customers_s3_key, Body=buffer.getvalue())
-print("Uploaded customers.parquet to S3 (backup).")
-
-# ---- Insert into Postgres ----
-df.to_sql("customers", engine, if_exists="append", index=False, method="multi")
-print("Inserted customers into Postgres successfully!")
+# ---- Helper generators (intentionally imperfect) ----
+def random_credit_score():
+    return random.randint(250, 900)  # invalid values on purpose
+
+def random_income():
+    return random.choice([
+        random.randint(15000, 30000),
+        random.randint(30000, 80000),
+        random.randint(80000, 200000),
+        None
+    ])
+
+def random_employment():
+    return random.choice([
+        "Employed",
+        "Self-Employed",
+        "Unemployed",
+        "Student",
+        "Retired",
+        "Unknown",
+        None
+    ])
+
+# ---- Load branch IDs from bronze ----
+branch_ids = []
+
+response = s3.list_objects_v2(Bucket=bucket_name, Prefix=branches_prefix)
+for obj in response.get("Contents", []):
+    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
+    for line in body.decode("utf-8").splitlines():
+        record = json.loads(line)
+        branch_ids.append(record["branch"]["branch_id"])
+
+if not branch_ids:
+    raise ValueError("No branch IDs found in bronze branches data")
+
+# ---- Generate customer events ----
+events = []
+
+for _ in range(150):
+    dob = fake.date_between(start_date="-90y", end_date="-16y")
+
+    event = {
+        "event_id": str(uuid.uuid4()),
+        "event_type": random.choice(["customer_created", "customer_updated"]),
+        "event_ts": datetime.now(timezone.utc).isoformat(),
+        "customer": {
+            "customer_id": random.getrandbits(48),
+            "first_name": fake.first_name(),
+            "last_name": fake.last_name(),
+            "email": fake.email(),  # duplicates allowed
+            "phone": fake.phone_number(),
+            "date_of_birth": dob.isoformat(),
+            "gender": random.choice(["Male", "Female", "Other", None]),
+            "married": random.choice([True, False, "Unknown"]),
+            "employment_status": random_employment(),
+            "annual_income": random_income(),
+            "credit_score": random_credit_score(),
+            "home_branch_id": random.choice(branch_ids),
+            "customer_since": fake.date_between(start_date="-30d", end_date="today").isoformat(),  # New in the last 30 days
+            "preferred_contact_method": random.choice(
+                ["Email", "Phone", "SMS", "Mail", None]
+            ),
+            # extra junk fields
+            "browser": fake.user_agent(),
+            "ip_address": fake.ipv4_public(),
+            "marketing_opt_in": random.choice([True, False, None])
+        },
+        "source_system": "customer_generator",
+        "ingestion_ts": datetime.now(timezone.utc).isoformat()
+    }
+    events.append(event)
+
+# ---- Write JSON lines to S3 ----
+key = f"{cust_prefix}batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
+
+body = "\n".join(json.dumps(e) for e in events)
+
+s3.put_object(
+    Bucket=bucket_name,
+    Key=key,
+    Body=body.encode("utf-8")
+)
+
+print(f"Wrote {len(events)} raw customer events to s3://{bucket_name}/{key}")

Scripts/employees.py (new file, +140 lines)
View file

@@ -0,0 +1,140 @@
from faker import Faker
from dotenv import load_dotenv
import os
import json
import boto3
from datetime import datetime, timezone
import uuid
import random

# ---- Setup ----
fake = Faker()
load_dotenv()

s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"),
)

bucket_name = os.getenv("STORAGE_BUCKET")

branches_prefix = "bronze/branches_raw/"
employees_prefix = "bronze/employees_raw/"

# ------------------------------------------------
# Load branch IDs
# ------------------------------------------------
branch_ids = []

resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=branches_prefix)
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
    for line in body.decode("utf-8").splitlines():
        record = json.loads(line)
        branch_ids.append(record["branch"]["branch_id"])

if not branch_ids:
    raise ValueError("No branch IDs found")

# ------------------------------------------------
# Load existing employees from bronze
# ------------------------------------------------
existing_employee_ids = []

resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=employees_prefix)
for obj in resp.get("Contents", []):
    body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read()
    for line in body.decode("utf-8").splitlines():
        record = json.loads(line)
        if "employee" in record:
            existing_employee_ids.append(record["employee"]["employee_id"])

existing_employee_ids = list(set(existing_employee_ids))

# ------------------------------------------------
# Event generation config
# ------------------------------------------------
NEW_EMPLOYEES = 60
TERMINATIONS = min(len(existing_employee_ids), random.randint(10, 30))

events = []

# ------------------------------------------------
# Create new employees
# ------------------------------------------------
for _ in range(NEW_EMPLOYEES):
    birth_date = fake.date_between(start_date="-65y", end_date="-18y")

    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": "employee_created",
        "event_ts": datetime.now(timezone.utc).isoformat(),
        "employee": {
            "employee_id": str(uuid.uuid4()),
            "first_name": fake.first_name(),
            "last_name": fake.last_name(),
            "birth_date": birth_date.isoformat(),
            "email": fake.email(),
            "phone_number": fake.phone_number(),
            "married": random.choice([True, False, None]),
            "job_title": fake.job(),
            "salary": random.randint(35000, 140000),
            "work_satisfaction": random.randint(1, 5),
            "hire_date": fake.date_between(start_date="-30d", end_date="today").isoformat(),
            "employment_type": random.choice(["full_time", "part_time", "contract"]),
            "remote": fake.boolean(),
            "branch_id": random.choice(branch_ids)
        },
        "source_system": "employee_generator",
        "ingestion_ts": datetime.now(timezone.utc).isoformat()
    }
    events.append(event)

# ------------------------------------------------
# Terminate existing employees
# ------------------------------------------------
terminated_ids = random.sample(existing_employee_ids, TERMINATIONS)

for emp_id in terminated_ids:
    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": "employee_terminated",
        "event_ts": datetime.now(timezone.utc).isoformat(),
        "employee": {
            "employee_id": emp_id,
            "termination_reason": random.choice(
                ["Resigned", "Laid Off", "Retired", "Fired"]
            )
        },
        "source_system": "employee_generator",
        "ingestion_ts": datetime.now(timezone.utc).isoformat()
    }
    events.append(event)

# ------------------------------------------------
# Write to S3 (JSONL)
# ------------------------------------------------
key = f"{employees_prefix}batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"

body = "\n".join(json.dumps(e) for e in events)

s3.put_object(
    Bucket=bucket_name,
    Key=key,
    Body=body.encode("utf-8")
)

# ------------------------------------------------
# Stats output
# ------------------------------------------------
print(f"Existing employees found: {len(existing_employee_ids)}")
print(f"New employees created: {NEW_EMPLOYEES}")
print(f"Employees terminated this run: {len(terminated_ids)}")
print(f"{len(events)} events written to s3://{bucket_name}/{key}")
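
Run-order note and a read-back sketch: the customer, account, and employee generators all bootstrap their IDs by scanning the bronze prefixes written by earlier runs (and raise ValueError when nothing is there), so branches must be generated before customers, and customers before accounts. The read pattern is the same in every script; a small helper like the one below (hypothetical, assuming the same STORAGE_* environment variables as the scripts above) is how a downstream job could consume these batches:

import os
import json
import boto3
from collections import Counter
from dotenv import load_dotenv

load_dotenv()

s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("STORAGE_ENDPOINT"),
    aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"),
)
bucket = os.getenv("STORAGE_BUCKET")

def read_bronze_events(prefix):
    # Yield one dict per JSONL line under the prefix. Like the generators,
    # this only reads the first page of list_objects_v2 (up to 1,000 keys).
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    for obj in resp.get("Contents", []):
        body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
        for line in body.decode("utf-8").splitlines():
            yield json.loads(line)

# Example: event counts per type in the employees prefix
print(Counter(e["event_type"] for e in read_bronze_events("bronze/employees_raw/")))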