From fdcbe68f3aeeeccb6ebfed46a98c1206b31747d2 Mon Sep 17 00:00:00 2001 From: Cameron Seamons Date: Sun, 14 Dec 2025 20:38:54 -0700 Subject: [PATCH] updated scripts to use json data --- Scripts/accounts.py | 188 ++++++++++++++++++++++++------------------- Scripts/branches.py | 109 +++++++++---------------- Scripts/customers.py | 184 ++++++++++++++++++++---------------------- Scripts/employees.py | 140 ++++++++++++++++++++++++++++++++ 4 files changed, 370 insertions(+), 251 deletions(-) create mode 100644 Scripts/employees.py diff --git a/Scripts/accounts.py b/Scripts/accounts.py index 4e70fa0..0643396 100644 --- a/Scripts/accounts.py +++ b/Scripts/accounts.py @@ -1,114 +1,132 @@ from faker import Faker from dotenv import load_dotenv -from datetime import datetime +from datetime import datetime, timezone import os -import random -import pandas as pd +import json import boto3 -import io -from sqlalchemy import create_engine, text +import random +import uuid # ---- Setup ---- fake = Faker() load_dotenv() -# ---- Postgres setup ---- -user = os.getenv("PG_USER") -password = os.getenv("PG_PASSWORD") -host = os.getenv("PG_HOST") -port = "5432" -db = "postgres" - -engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}", future=True) - -# ---- S3 setup (backup only) ---- -s3 = boto3.resource( +s3 = boto3.client( "s3", endpoint_url=os.getenv("STORAGE_ENDPOINT"), aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"), - aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY") + aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"), ) + bucket_name = os.getenv("STORAGE_BUCKET") -accounts_s3_key_parquet = "DataLab/accounts/accounts.parquet" -# ---- Load customers from Postgres ---- -with engine.connect() as conn: - customers_df = pd.read_sql( - sql=text("SELECT customer_id, home_branch_id, customer_since FROM customers;"), - con=conn - ) +# Bronze prefixes +accounts_prefix = "bronze/accounts_raw/" +cust_prefix = "bronze/customers_raw/" +branches_prefix = "bronze/branches_raw/" -customers_df["customer_since"] = pd.to_datetime(customers_df["customer_since"]).dt.date +# ---- Helpers ---- +def random_balance(): + return round(random.uniform(-500, 30000), 2) # overdrafts allowed -# ---- Unique account ID generator ---- -generated_ids = set() -def generate_account_id(branch_id): - while True: - branch_part = str(branch_id).zfill(3) - random_part = str(random.randint(10**8, 10**9 - 1)) - acct_id = branch_part + random_part - if acct_id not in generated_ids: - generated_ids.add(acct_id) - return acct_id - -def generate_account_number(): - return str(random.randint(10**10, 10**11 - 1)) - -def assign_account_types(): +def random_account_types(): roll = random.random() - if roll < 0.50: + if roll < 0.55: return ["Checking"] - elif roll < 0.70: + elif roll < 0.80: return ["Savings"] else: return ["Checking", "Savings"] -def balance_for_type(account_type): - if account_type == "Checking": - return round(random.uniform(50, 7000), 2) - return round(random.uniform(200, 25000), 2) +# ---- Load customer IDs from bronze customers ---- +cust_ids = set() -# ---- Generate accounts ---- -accounts = [] -for _, row in customers_df.iterrows(): - customer_id = row["customer_id"] - customer_since = row["customer_since"] - home_branch_id = row["home_branch_id"] - account_types = assign_account_types() +resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=cust_prefix) +for obj in resp.get("Contents", []): + body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read() + for line in 
body.decode("utf-8").splitlines(): + record = json.loads(line) + cust_ids.add(record["customer"]["customer_id"]) - for acct_type in account_types: - accounts.append({ - "account_id": generate_account_id(home_branch_id), - "account_number": generate_account_number(), - "customer_id": customer_id, - "account_type": acct_type, - "open_date": fake.date_between(start_date=customer_since, end_date=datetime.today().date()), - "balance": balance_for_type(acct_type), - "branch_id": home_branch_id - }) +if not cust_ids: + raise ValueError("No customer IDs found in bronze customers data") -accounts_df = pd.DataFrame(accounts) +# ---- Load existing account customer IDs ---- +customers_with_accounts = set() -# ---- Save to S3 backup ---- -buffer = io.BytesIO() -accounts_df.to_parquet(buffer, index=False, engine="pyarrow") -s3.Bucket(bucket_name).put_object(Key=accounts_s3_key_parquet, Body=buffer.getvalue()) -print("Uploaded accounts.parquet to S3 (backup).") +resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=accounts_prefix) +for obj in resp.get("Contents", []): + body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read() + for line in body.decode("utf-8").splitlines(): + record = json.loads(line) + customers_with_accounts.add(record["customer"]["customer_id"]) -# ---- Ensure accounts table exists and insert into Postgres ---- -with engine.begin() as conn: - conn.execute(text(""" - CREATE TABLE IF NOT EXISTS accounts ( - account_id VARCHAR(20) PRIMARY KEY, - account_number VARCHAR(20) UNIQUE, - customer_id BIGINT REFERENCES customers(customer_id), - account_type VARCHAR(50), - open_date DATE, - balance NUMERIC(12,2), - branch_id INT REFERENCES branches(branch_id) - ); - """)) +# ---- Load branch IDs ---- +branch_ids = [] - # Pandas to_sql now uses the connection from SQLAlchemy 2.x - accounts_df.to_sql("accounts", conn, if_exists="append", index=False, method="multi") - print(f"Inserted {len(accounts_df)} accounts into Postgres successfully!") +resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=branches_prefix) +for obj in resp.get("Contents", []): + body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read() + for line in body.decode("utf-8").splitlines(): + record = json.loads(line) + branch_ids.append(record["branch"]["branch_id"]) + +if not branch_ids: + raise ValueError("No branch IDs found in bronze branches data") + +# ---- Determine eligible customers ---- +eligible_customers = cust_ids - customers_with_accounts + +# ---- Generate ONE account per eligible customer ---- +events = [] + +for cust_id in eligible_customers: + event = { + "event_id": str(uuid.uuid4()), + "event_type": "account_opened", + "event_ts": datetime.now(timezone.utc).isoformat(), + + "account": { + "account_id": str(uuid.uuid4()), + "account_number": str(random.randint(10**9, 10**11)), + "account_types": random_account_types(), + "open_date": fake.date_between(start_date="-30d", end_date="today").isoformat(), + "balance": random_balance(), + "currency": random.choice(["USD", "USD", "USD", "EUR"]), + "interest_rate": round(random.uniform(0.01, 4.5), 2), + "status": random.choice(["ACTIVE", "ACTIVE", "FROZEN", "CLOSED"]), + }, + + "customer": { + "customer_id": cust_id, + "segment": random.choice(["Retail", "SMB", "VIP"]), + }, + + "branch": { + "branch_id": random.choice(branch_ids), + "teller_id": random.randint(1000, 9999), + }, + + # intentional noise + "source_system": "account_generator_v1", + "batch_id": str(uuid.uuid4()), + "ingestion_ts": datetime.now(timezone.utc).isoformat(), + } + + 
events.append(event) + +# ---- Write JSONL batch ---- +if events: + key = f"{accounts_prefix}batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" + body = "\n".join(json.dumps(e) for e in events) + + s3.put_object( + Bucket=bucket_name, + Key=key, + Body=body.encode("utf-8"), + ) + +# ---- Logging (IMPORTANT) ---- +print(f"Total customers found: {len(cust_ids)}") +print(f"Customers already with accounts: {len(customers_with_accounts)}") +print(f"New accounts created this run: {len(events)}") diff --git a/Scripts/branches.py b/Scripts/branches.py index 33d77cc..69411a6 100644 --- a/Scripts/branches.py +++ b/Scripts/branches.py @@ -1,18 +1,16 @@ from faker import Faker from dotenv import load_dotenv import os -import pandas as pd +import json import boto3 -import io -from sqlalchemy import create_engine, text -from urllib.parse import quote_plus +from datetime import datetime, timezone +import uuid -# ---- Faker setup ---- +# ---- Setup ---- fake = Faker() load_dotenv() -# ---- S3 Setup ---- -s3 = boto3.resource( +s3 = boto3.client( 's3', endpoint_url=os.getenv('STORAGE_ENDPOINT'), aws_access_key_id=os.getenv('STORAGE_ACCESS_KEY'), @@ -20,74 +18,47 @@ s3 = boto3.resource( ) bucket_name = os.getenv('STORAGE_BUCKET') -s3_key_csv = 'DataLab/branches/branches.csv' -s3_key_parquet = 'DataLab/branches/branches.parquet' -# ---- Postgres Setup ---- -user = os.getenv("PG_USER") -password = os.getenv("PG_PASSWORD") -host = os.getenv("PG_HOST") -port = "5432" -db = "postgres" +# Bronze landing zone (RAW) +branches_prefix = "bronze/branches_raw/" -engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}") +# ---- Generate branch events ---- +events = [] -# ---- Ensure local data folder exists ---- -os.makedirs("../Data", exist_ok=True) +now_utc = datetime.now(timezone.utc) -# ---- Generate branch data ---- -branches = [] -for i in range(1, 11): # 10 branches - branches.append({ - "branch_id": str(i), # store as string for consistency - "branch_name": f"{fake.city()} Branch", - "address": fake.street_address(), - "city": fake.city(), - "state": fake.state_abbr() - }) +for _ in range(3): + event = { + "event_id": str(uuid.uuid4()), + "event_type": "branch_created", + "event_ts": now_utc.isoformat(), + "branch": { + "branch_id": str(uuid.uuid4()), + "branch_name": f"{fake.city()} Branch", + "address": fake.street_address(), + "city": fake.city(), + "state": fake.state_abbr(), + "open_date": fake.date_between(start_date="-30d", end_date="today").isoformat(), # New in the last 30 days + "employee_count": fake.random_int(min=5, max=50), + "branch_manager": fake.name(), + "phone_number": fake.phone_number(), + "timezone": fake.timezone() + }, + "source_system": "branch_generator", + "ingestion_ts": now_utc.isoformat() + } -df = pd.DataFrame(branches) + events.append(event) -# ---- Save locally as CSV ---- -local_file = "../Data/branches.csv" -df.to_csv(local_file, index=False) -print("Generated 10 branches locally.") +# ---- Write events as JSON lines ---- +key = f"{branches_prefix}batch_{now_utc.strftime('%Y%m%d_%H%M%S')}.json" -# ---- Upload CSV to S3 ---- -s3.Bucket(bucket_name).upload_file(local_file, s3_key_csv) -print(f"Uploaded branches.csv to s3://{bucket_name}/{s3_key_csv}") +body = "\n".join(json.dumps(e) for e in events) -# ---- Upload / append to S3 as Parquet ---- -try: - obj = s3.Bucket(bucket_name).Object(s3_key_parquet).get() - existing_df = pd.read_parquet(io.BytesIO(obj['Body'].read())) - combined_df = pd.concat([existing_df, df], ignore_index=True) - 
print(f"Appended {len(df)} branches to existing Parquet on S3.") -except s3.meta.client.exceptions.NoSuchKey: - combined_df = df - print("No existing branches Parquet on S3, creating new one.") +s3.put_object( + Bucket=bucket_name, + Key=key, + Body=body.encode("utf-8") +) -parquet_buffer = io.BytesIO() -combined_df.to_parquet(parquet_buffer, index=False, engine="pyarrow") -s3.Bucket(bucket_name).put_object(Key=s3_key_parquet, Body=parquet_buffer.getvalue()) -print(f"Uploaded branches.parquet to s3://{bucket_name}/{s3_key_parquet}") - -# ---- Create / Append to Postgres ---- -with engine.connect() as conn: - for _, row in df.iterrows(): - stmt = text(""" - INSERT INTO branches (branch_id, branch_name, address, city, state) - VALUES (:branch_id, :branch_name, :address, :city, :state) - ON CONFLICT (branch_id) DO NOTHING - """) - conn.execute(stmt, { - "branch_id": str(row["branch_id"]), - "branch_name": row["branch_name"], - "address": row["address"], - "city": row["city"], - "state": row["state"] - }) - conn.commit() - # Optional: row count check - result = conn.execute(text("SELECT COUNT(*) FROM branches;")) - print(f"Rows in branches table: {result.scalar()}") +print(f"Wrote {len(events)} raw branch events to s3://{bucket_name}/{key}") diff --git a/Scripts/customers.py b/Scripts/customers.py index 3513dc1..c8d4dab 100644 --- a/Scripts/customers.py +++ b/Scripts/customers.py @@ -1,125 +1,115 @@ -from sqlalchemy import create_engine, text -from urllib.parse import quote_plus from faker import Faker from dotenv import load_dotenv import os -import io -import pandas as pd +import json import boto3 +import uuid import random -from datetime import datetime +from datetime import datetime, timezone -# ---- Load env ---- -load_dotenv() +# ---- Setup ---- fake = Faker() +load_dotenv() -# ---- Postgres setup ---- -user = os.getenv("PG_USER") -password = quote_plus(os.getenv("PG_PASSWORD")) -host = os.getenv("PG_HOST") -port = "5432" -db = "postgres" -engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}") - -# ---- Hetzner S3 setup ---- (backup only) ---- -s3 = boto3.resource( +s3 = boto3.client( "s3", endpoint_url=os.getenv("STORAGE_ENDPOINT"), aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"), aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY") ) + bucket_name = os.getenv("STORAGE_BUCKET") -branches_s3_key = "DataLab/branches/branches.csv" -customers_s3_key = "DataLab/customers/customers.parquet" -# ---- Load branches from S3 (still needed for customer assignment) ---- -branches_local = "../Data/branches.csv" -s3.Bucket(bucket_name).download_file(branches_s3_key, branches_local) -branches = pd.read_csv(branches_local) +# Bronze landing zone +cust_prefix = "bronze/customers_raw/" +branches_prefix = "bronze/branches_raw/" -# ---- Load existing customers from Postgres for email uniqueness ---- -with engine.connect() as conn: - table_exists = conn.execute( - text("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='customers');") - ).scalar() +# ---- Helper generators (intentionally imperfect) ---- +def random_credit_score(): + return random.randint(250, 900) # invalid values on purpose - if table_exists: - existing_customers = pd.read_sql( - text("SELECT email FROM customers;"), - con=conn - ) - existing_emails = set(existing_customers["email"]) if not existing_customers.empty else set() - else: - existing_emails = set() +def random_income(): + return random.choice([ + random.randint(15000, 30000), + random.randint(30000, 80000), + 
random.randint(80000, 200000), + None + ]) + +def random_employment(): + return random.choice([ + "Employed", + "Self-Employed", + "Unemployed", + "Student", + "Retired", + "Unknown", + None + ]) + +# ---- Load branch IDs from bronze ---- +branch_ids = [] + +response = s3.list_objects_v2(Bucket=bucket_name, Prefix=branches_prefix) +for obj in response.get("Contents", []): + body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read() + for line in body.decode("utf-8").splitlines(): + record = json.loads(line) + branch_ids.append(record["branch"]["branch_id"]) + +if not branch_ids: + raise ValueError("No branch IDs found in bronze branches data") -# ---- Helper functions ---- -def realistic_credit_score(): - return max(300, min(int(random.gauss(680, 60)), 850)) +# ---- Generate customer events ---- +events = [] -def realistic_income(): - brackets = [(20000,40000),(40000,70000),(70000,120000),(120000,200000)] - low, high = random.choice(brackets) - return random.randint(low, high) +for _ in range(150): + dob = fake.date_between(start_date="-90y", end_date="-16y") -def realistic_employment(): - return random.choices( - ["Employed","Self-Employed","Unemployed","Student","Retired"], - weights=[50,15,10,15,10] - )[0] + event = { + "event_id": str(uuid.uuid4()), + "event_type": random.choice(["customer_created", "customer_updated"]), + "event_ts": datetime.now(timezone.utc).isoformat(), -def realistic_contact(): - return random.choice(["Email","Phone","SMS"]) + "customer": { + "customer_id": random.getrandbits(48), + "first_name": fake.first_name(), + "last_name": fake.last_name(), + "email": fake.email(), # duplicates allowed + "phone": fake.phone_number(), + "date_of_birth": dob.isoformat(), + "gender": random.choice(["Male", "Female", "Other", None]), + "married": random.choice([True, False, "Unknown"]), + "employment_status": random_employment(), + "annual_income": random_income(), + "credit_score": random_credit_score(), + "home_branch_id": random.choice(branch_ids), + "customer_since": fake.date_between(start_date="-30d", end_date="today").isoformat(), # New in the last 30 days + "preferred_contact_method": random.choice( + ["Email", "Phone", "SMS", "Mail", None] + ), + # extra junk fields + "browser": fake.user_agent(), + "ip_address": fake.ipv4_public(), + "marketing_opt_in": random.choice([True, False, None]) + }, -def generate_customer_id(): - return random.getrandbits(48) + "source_system": "customer_generator", + "ingestion_ts": datetime.now(timezone.utc).isoformat() + } -# ---- Generate Customers ---- -customers = [] -for _ in range(50): - first = fake.first_name() - last = fake.last_name() - email = f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}" + events.append(event) - while email in existing_emails: - first = fake.first_name() - last = fake.last_name() - email = f"{first.lower()}.{last.lower()}@{fake.free_email_domain()}" - existing_emails.add(email) +# ---- Write JSON lines to S3 ---- +key = f"{cust_prefix}batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" - dob = fake.date_between(start_date="-80y", end_date="-18y") - age = (datetime.now().date() - dob).days // 365 - income = realistic_income() - credit = realistic_credit_score() +body = "\n".join(json.dumps(e) for e in events) - customers.append({ - "customer_id": generate_customer_id(), - "full_name": f"{first} {last}", - "email": email, - "phone": fake.phone_number(), - "date_of_birth": dob, - "age": age, - "gender": random.choice(["Male","Female","Other"]), - "street_address": 
fake.street_address(), - "city": fake.city(), - "state": fake.state_abbr(), - "zip_code": fake.zipcode(), - "home_branch_id": random.choice(branches["branch_id"]), - "customer_since": fake.date_between(start_date="-10y", end_date="today"), - "employment_status": realistic_employment(), - "annual_income": income, - "credit_score": credit, - "preferred_contact_method": realistic_contact() - }) +s3.put_object( + Bucket=bucket_name, + Key=key, + Body=body.encode("utf-8") +) -df = pd.DataFrame(customers) - -# ---- Save to S3 backup ---- -buffer = io.BytesIO() -df.to_parquet(buffer, index=False, engine="pyarrow") -s3.Bucket(bucket_name).put_object(Key=customers_s3_key, Body=buffer.getvalue()) -print("Uploaded customers.parquet to S3 (backup).") - -# ---- Insert into Postgres ---- -df.to_sql("customers", engine, if_exists="append", index=False, method="multi") -print("Inserted customers into Postgres successfully!") +print(f"Wrote {len(events)} raw customer events to s3://{bucket_name}/{key}") diff --git a/Scripts/employees.py b/Scripts/employees.py new file mode 100644 index 0000000..789843a --- /dev/null +++ b/Scripts/employees.py @@ -0,0 +1,140 @@ +from faker import Faker +from dotenv import load_dotenv +import os +import json +import boto3 +from datetime import datetime, timezone +import uuid +import random + +# ---- Setup ---- +fake = Faker() +load_dotenv() + +s3 = boto3.client( + "s3", + endpoint_url=os.getenv("STORAGE_ENDPOINT"), + aws_access_key_id=os.getenv("STORAGE_ACCESS_KEY"), + aws_secret_access_key=os.getenv("STORAGE_SECRET_KEY"), +) + +bucket_name = os.getenv("STORAGE_BUCKET") + +branches_prefix = "bronze/branches_raw/" +employees_prefix = "bronze/employees_raw/" + +# ------------------------------------------------ +# Load branch IDs +# ------------------------------------------------ +branch_ids = [] + +resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=branches_prefix) +for obj in resp.get("Contents", []): + body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read() + for line in body.decode("utf-8").splitlines(): + record = json.loads(line) + branch_ids.append(record["branch"]["branch_id"]) + +if not branch_ids: + raise ValueError("No branch IDs found") + +# ------------------------------------------------ +# Load existing employees from bronze +# ------------------------------------------------ +existing_employee_ids = [] + +resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=employees_prefix) +for obj in resp.get("Contents", []): + body = s3.get_object(Bucket=bucket_name, Key=obj["Key"])["Body"].read() + for line in body.decode("utf-8").splitlines(): + record = json.loads(line) + if "employee" in record: + existing_employee_ids.append(record["employee"]["employee_id"]) + +existing_employee_ids = list(set(existing_employee_ids)) + +# ------------------------------------------------ +# Event generation config +# ------------------------------------------------ +NEW_EMPLOYEES = 60 +TERMINATIONS = min(len(existing_employee_ids), random.randint(10, 30)) + +events = [] + +# ------------------------------------------------ +# Create new employees +# ------------------------------------------------ +for _ in range(NEW_EMPLOYEES): + birth_date = fake.date_between(start_date="-65y", end_date="-18y") + + event = { + "event_id": str(uuid.uuid4()), + "event_type": "employee_created", + "event_ts": datetime.now(timezone.utc).isoformat(), + + "employee": { + "employee_id": str(uuid.uuid4()), + "first_name": fake.first_name(), + "last_name": fake.last_name(), + "birth_date": 
birth_date.isoformat(), + "email": fake.email(), + "phone_number": fake.phone_number(), + "married": random.choice([True, False, None]), + "job_title": fake.job(), + "salary": random.randint(35000, 140000), + "work_satisfaction": random.randint(1, 5), + "hire_date": fake.date_between(start_date="-30d", end_date="today").isoformat(), + "employment_type": random.choice(["full_time", "part_time", "contract"]), + "remote": fake.boolean(), + "branch_id": random.choice(branch_ids) + }, + + "source_system": "employee_generator", + "ingestion_ts": datetime.now(timezone.utc).isoformat() + } + + events.append(event) + +# ------------------------------------------------ +# Terminate existing employees +# ------------------------------------------------ +terminated_ids = random.sample(existing_employee_ids, TERMINATIONS) + +for emp_id in terminated_ids: + event = { + "event_id": str(uuid.uuid4()), + "event_type": "employee_terminated", + "event_ts": datetime.now(timezone.utc).isoformat(), + + "employee": { + "employee_id": emp_id, + "termination_reason": random.choice( + ["Resigned", "Laid Off", "Retired", "Fired"] + ) + }, + + "source_system": "employee_generator", + "ingestion_ts": datetime.now(timezone.utc).isoformat() + } + + events.append(event) + +# ------------------------------------------------ +# Write to S3 (JSONL) +# ------------------------------------------------ +key = f"{employees_prefix}batch_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" +body = "\n".join(json.dumps(e) for e in events) + +s3.put_object( + Bucket=bucket_name, + Key=key, + Body=body.encode("utf-8") +) + +# ------------------------------------------------ +# Stats output +# ------------------------------------------------ +print(f"Existing employees found: {len(existing_employee_ids)}") +print(f"New employees created: {NEW_EMPLOYEES}") +print(f"Employees terminated this run: {len(terminated_ids)}") +print(f"{len(events)} events written to s3://{bucket_name}/{key}")
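
Each of the four generators above repeats the same bronze read pattern: list the objects under a prefix with list_objects_v2, fetch each body, split it into lines, and json.loads every record. The sketch below is one way that pattern could be factored into a shared helper; the function name read_bronze_events and the shared-module placement are assumptions for illustration, not part of this patch. It also uses a paginator, since list_objects_v2 returns at most 1,000 keys per call, which the per-script loops do not handle.

import json
import boto3

def read_bronze_events(s3, bucket, prefix):
    # Hypothetical helper (not in this patch): yield every event stored
    # as JSON lines under a bronze prefix, paginating past 1,000 keys.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            for line in body.decode("utf-8").splitlines():
                if line.strip():  # skip blank lines defensively
                    yield json.loads(line)

# Hypothetical usage mirroring the customer-ID scan in accounts.py:
# cust_ids = {e["customer"]["customer_id"]
#             for e in read_bronze_events(s3, bucket_name, "bronze/customers_raw/")}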