A streaming data pipeline that captures live Wikipedia edits from Wikimedia EventStreams, processes them through a medallion architecture, and surfaces insights for content monitoring and risk detection.
Tech Stack: Amazon MSK (KRaft 3.9.x) · EMR Serverless (Spark 3.5) · AWS Step Functions · Amazon S3 Tables (Iceberg 1.10.0) · ECS Fargate · AWS Deequ
Deployment: AWS Cloud (us-east-1) · Terraform 1.6+
📐 For detailed architecture diagrams and component breakdown, see docs/ARCHITECTURE.md
- Business Problem
- Solution Overview
- Architecture
- Data Model
- Technology Choices
- Cost Estimation
- Quick Start
- Project Structure
Wikimedia operates one of the largest collaborative platforms globally:
| Metric | Value |
|---|---|
| Wikipedia editions | 300+ languages |
| Edits per minute | ~500-700 (peak: 1,500+) |
| Monthly active editors | 280,000+ |
| Bot edits | ~20-30% of total |
Wikimedia EventStreams provides real-time Server-Sent Events (SSE) of all edits across all Wikimedia projects.
| Use Case | Description | Value |
|---|---|---|
| Content Monitoring | Track edit velocity, top contributors, trending pages | Understand community activity |
| Vandalism Detection | Identify suspicious edit patterns (rapid edits, large deletions) | Protect content quality |
| Regional Analysis | Compare activity across language editions | Geographic insights |
| Bot Activity Tracking | Monitor automated vs human contributions | Community health |
We filter events from high-activity Wikipedia editions:
HIGH_ACTIVITY: en, de, ja, fr, zh, es, ru
REGIONAL: asia_pacific, europe, americas, middle_east
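As an illustration, the domain filter can be sketched in plain Python. The helper names and the regional grouping below are hypothetical; the actual configuration lives in config/settings.py and may differ:

```python
from typing import Optional

# Hypothetical sketch of the domain filter described above; the actual
# implementation in config/settings.py may differ.
HIGH_ACTIVITY = {"en", "de", "ja", "fr", "zh", "es", "ru"}

def language_of(domain: str) -> Optional[str]:
    """Extract the language code from a *.wikipedia.org domain."""
    parts = domain.split(".")
    if len(parts) == 3 and parts[1] == "wikipedia":
        return parts[0]
    return None  # e.g. commons.wikimedia.org has no language edition

def keep_event(event: dict) -> bool:
    """Keep only edits from high-activity Wikipedia editions."""
    lang = language_of(event.get("meta", {}).get("domain", ""))
    return lang in HIGH_ACTIVITY
```

For example, `keep_event({"meta": {"domain": "en.wikipedia.org"}})` is kept, while an event from a low-activity edition or a non-Wikipedia project is dropped before it ever reaches Kafka.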
WikiStream implements the Medallion Architecture (Bronze → Silver → Gold) for progressive data refinement using Apache Iceberg format on S3 Tables:
flowchart LR
subgraph Sources["📡 Source"]
WM["Wikipedia<br/>EventStreams"]
end
subgraph Ingestion["📥 Ingestion"]
ECS["ECS Fargate<br/>Producer"]
MSK["Amazon MSK<br/>Kafka 3.9.x"]
end
subgraph Processing["⚡ EMR Serverless"]
BRONZE["🥉 Bronze<br/>Streaming"]
BDQ["🔍 Bronze DQ"]
SILVER["🥈 Silver<br/>Batch"]
SDQ["🔍 Silver DQ"]
GOLD["🥇 Gold<br/>Batch"]
GDQ["🔍 Gold DQ"]
end
subgraph Storage["💾 S3 Tables"]
ICE["Apache Iceberg<br/>Tables"]
end
subgraph Orchestration["🔄 Step Functions"]
SF["Self-Looping<br/>Batch Pipeline"]
end
WM -->|SSE| ECS
ECS -->|Produce| MSK
MSK -->|3 min micro-batch| BRONZE
BRONZE --> ICE
ICE --> BDQ
BDQ --> SILVER
SILVER --> ICE
ICE --> SDQ
SDQ --> GOLD
GOLD --> ICE
ICE --> GDQ
SF -.->|Orchestrates| BDQ
SF -.->|Orchestrates| SILVER
SF -.->|Orchestrates| SDQ
SF -.->|Orchestrates| GOLD
SF -.->|Orchestrates| GDQ
| Layer | Job Type | Trigger | Description |
|---|---|---|---|
| Bronze | Streaming | 3 min micro-batches | Kafka → Iceberg with exactly-once semantics |
| Bronze DQ Gate | Batch | Before Silver | Completeness, timeliness (95% ≤3min), validity checks |
| Silver | Batch | After Bronze DQ passes | Deduplication, normalization, enrichment |
| Silver DQ Gate | Batch | After Silver | Accuracy, consistency, uniqueness, drift detection |
| Gold | Batch | After Silver DQ passes | Aggregations: hourly stats, entity trends, risk scores |
| Gold DQ Gate | Batch | After Gold | Upstream validation + aggregation consistency |
The batch pipeline is orchestrated by AWS Step Functions with a self-looping continuous execution pattern:
┌─────────────────────────────────────────────────────────────────────────────┐
│ BATCH PIPELINE STATE MACHINE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ │
│ │ RecordPipelineStart │ ← Initial trigger (after 15 min of Bronze data) │
│ └──────────┬──────────┘ │
│ ↓ │
│ ┌──────────────────────┐ ❌ CATCH ┌──────────────────────┐ │
│ │ BronzeDQGate │ ────────────────→ │ NotifyFailure │ │
│ │ (EMR Serverless) │ │ → RecordMetric │ │
│ └──────────┬───────────┘ │ → FAIL (stop loop) │ │
│ ↓ ✅ └──────────────────────┘ │
│ ┌──────────────────────┐ │
│ │ StartSilverJob │ ──────────❌────→ Same failure pattern │
│ └──────────┬───────────┘ │
│ ↓ ✅ │
│ ┌──────────────────────┐ │
│ │ SilverDQGate │ ──────────❌────→ Same failure pattern │
│ └──────────┬───────────┘ │
│ ↓ ✅ │
│ ┌──────────────────────┐ │
│ │ StartGoldJob │ ──────────❌────→ Same failure pattern │
│ └──────────┬───────────┘ │
│ ↓ ✅ │
│ ┌──────────────────────┐ │
│ │ GoldDQGate │ ──────────❌────→ Same failure pattern │
│ └──────────┬───────────┘ │
│ ↓ ✅ ALL GATES PASSED │
│ ┌────────────────────────┐ │
│ │ RecordPipelineComplete │ │
│ └──────────┬─────────────┘ │
│ ↓ │
│ ┌────────────────────────┐ │
│ │ WaitBeforeNextCycle │ ⏱️ Wait 10 minutes │
│ └──────────┬─────────────┘ │
│ ↓ │
│ ┌────────────────────────┐ │
│ │ TriggerNextCycle │ ───→ StartExecution (self) ───→ END │
│ └────────────────────────┘ ↑ │
│ │ │
│ (New execution starts) │
│ │
├────────────────────────────────────────────────────────────────────────────┤
│ TERMINAL STATES: │
│ • ✅ SUCCESS: TriggerNextCycle → starts new execution → continuous loop │
│ • ❌ FAILURE: Fail state → stops loop → requires manual restart │
└────────────────────────────────────────────────────────────────────────────┘
| Phase | Duration | Notes |
|---|---|---|
| Initial wait (Bronze data collection) | 15 min | Ensures Bronze has enough data |
| EMR job startup (cold start) | ~5-8 min | JARs download, Spark init |
| Actual DQ/batch processing | ~2-3 min | Per job |
| Full batch pipeline execution | ~15-25 min | 5 jobs × (startup + processing) |
| Wait between successful cycles | 10 min | Prevents resource contention |
| Total cycle time | ~25-35 min | End-to-end |
Note: Step Function job timeouts are set to 15 minutes (900s) to accommodate EMR Serverless cold starts and JAR downloads.
DQ gates block downstream processing when checks fail:
Bronze → [Bronze DQ Gate] → Silver → [Silver DQ Gate] → Gold → [Gold DQ Gate]
↓ ↓ ↓
❌ FAIL ❌ FAIL ❌ FAIL
(stops pipeline) (stops pipeline) (stops pipeline)
| Gate | Check Types | Failure Action |
|---|---|---|
| Bronze DQ | Completeness (100% for IDs, 95% for optional), Timeliness (95% ≤3min) | Stops pipeline, SNS alert |
| Silver DQ | Accuracy, Consistency, Uniqueness, Drift detection | Stops pipeline, SNS alert |
| Gold DQ | Upstream verification, Aggregation consistency | Stops pipeline, SNS alert |
All DQ results are persisted to dq_audit.quality_results for audit and trend analysis.
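Stripped of the Deequ machinery, each gate reduces to comparing measured metrics against thresholds and failing closed. The check names and structure below are illustrative only; the real gates are AWS Deequ checks whose evidence lands in dq_audit.quality_results:

```python
# Illustrative gate logic only; the real gates run AWS Deequ checks
# (completeness, timeliness, validity) on EMR Serverless.
THRESHOLDS = {
    "event_id_completeness": 1.00,   # 100% required for IDs
    "comment_completeness": 0.95,    # 95% for optional fields
    "timeliness_within_3min": 0.95,  # 95% of rows ingested within 3 minutes
}

def evaluate_gate(metrics: dict) -> dict:
    """Return per-check status plus an overall gate verdict (fail-closed)."""
    checks = {
        name: "PASS" if metrics.get(name, 0.0) >= minimum else "FAIL"
        for name, minimum in THRESHOLDS.items()
    }
    # The gate passes only if every individual check passed; a missing
    # metric counts as a failure, so the pipeline stops rather than
    # propagating unverified data downstream.
    checks["gate"] = "PASS" if all(s == "PASS" for s in checks.values()) else "FAIL"
    return checks
```

The fail-closed default (a missing metric fails the gate) mirrors the pipeline's behavior of stopping on any gate failure rather than letting bad data cascade.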
- ≤3 minute Bronze ingestion latency (Spark Structured Streaming)
- ≤35 minute end-to-end SLA for dashboard freshness (continuous batch cycles)
- Exactly-once semantics via Spark checkpointing and idempotent MERGE
- Self-looping batch pipeline with automatic 10-minute intervals between cycles
- 15-minute job timeout to accommodate EMR Serverless cold starts
- Fail-fast on DQ failure - pipeline stops on any gate failure (prevents cascading bad data)
- Auto-recovery Lambda restarts Bronze streaming job on health check failure
- Data quality gates that block downstream processing on failures
- DQ audit trail with full evidence in Iceberg tables
- SNS alerts for DQ gate failures and pipeline errors
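Conceptually, the idempotent MERGE behaves like an upsert keyed on event_id: replaying a micro-batch after a checkpoint recovery cannot create duplicate rows. A simplified in-memory sketch of that semantics (the real Bronze job issues an Iceberg MERGE INTO from Spark):

```python
# In-memory illustration of idempotent MERGE semantics; the real Bronze job
# merges into bronze.raw_events on event_id via Spark SQL against Iceberg.
def merge_batch(table: dict, batch: list) -> dict:
    """Upsert each event by its unique event_id (matched rows are replaced,
    unmatched rows are inserted), so replays are harmless."""
    for event in batch:
        table[event["event_id"]] = event
    return table

bronze: dict = {}
batch = [{"event_id": "a1b2", "title": "Example_Article"}]
merge_batch(bronze, batch)
merge_batch(bronze, batch)  # replayed micro-batch after failure: still one row
```

Because the key is the event's unique ID, at-least-once delivery from Kafka plus this MERGE yields exactly-once results in the table.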
📐 See docs/ARCHITECTURE.md for detailed architecture diagrams, component breakdown, and data flow visualizations.
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ WikiStream Pipeline │
└─────────────────────────────────────────────────────────────────────────────────────┘
INGESTION STREAMING BATCH PROCESSING STORAGE
┌────────────┐ ┌────────────┐ ┌─────────────────────────┐ ┌───────────┐
│ Wikipedia │ │ Amazon │ │ EMR Serverless │ │ S3 Tables │
│ SSE Feed │────▶│ MSK │────▶│ │───▶│ (Iceberg) │
└────────────┘ │ (Kafka) │ │ Bronze (streaming) │ │ │
│ └────────────┘ │ ↓ │ │ bronze │
▼ │ Bronze DQ Gate │ │ silver │
┌────────────┐ │ ↓ │ │ gold │
│ECS Fargate │ │ Silver → Silver DQ Gate │ │ dq_audit │
│ Producer │ │ ↓ │ └───────────┘
└────────────┘ │ Gold → Gold DQ Gate │
└─────────────────────────┘
│
ORCHESTRATION │ MONITORING
┌──────────────────────┐ │ ┌─────────────┐
│ Step Functions │◀────────────────────────┘ │ CloudWatch │
│ (Self-Looping Batch │ │ Dashboard │
│ Pipeline) │─────────────────────────────────────▶│ + Alarms │
└──────────────────────┘ │ + SNS │
└─────────────┘
| Component | Service | Description |
|---|---|---|
| Data Source | Wikipedia EventStreams | Real-time SSE feed |
| Producer | ECS Fargate | Python Kafka producer with IAM auth |
| Message Broker | Amazon MSK (KRaft 3.9.x) | 2 brokers, topics: raw-events, dlq-events |
| Processing | EMR Serverless (Spark 3.5) | Bronze streaming + Silver/Gold batch |
| Storage | S3 Tables (Iceberg 1.10.0) | Medallion architecture tables |
| Initial Trigger | EventBridge Scheduler | One-time serverless trigger (auto-deletes after use) |
| Orchestration | Step Functions (self-looping) | Continuous batch pipeline with 10-min intervals |
| Auto-Recovery | Lambda + CloudWatch | Bronze job health monitoring |
| Monitoring | CloudWatch + SNS | Dashboard, metrics, alerts |
Sample event from stream.wikimedia.org/v2/stream/recentchange:
{
"meta": {
"id": "c3b60285-58c0-493e-ad60-ddab7732fcc4",
"domain": "en.wikipedia.org",
"dt": "2025-01-01T10:00:00Z"
},
"id": 1234567890,
"type": "edit",
"namespace": 0,
"title": "Example_Article",
"title_url": "https://en.wikipedia.org/wiki/Example_Article",
"user": "Editor123",
"bot": false,
"comment": "Fixed typo in introduction",
"wiki": "enwiki",
"server_name": "en.wikipedia.org",
"timestamp": 1704103200,
"length": {"old": 1000, "new": 1050},
"revision": {"old": 123456, "new": 123457}
}

erDiagram
BRONZE_RAW_EVENTS {
string event_id PK
string kafka_topic
int kafka_partition
bigint kafka_offset
timestamp kafka_timestamp
bigint rc_id
string event_type
int namespace
string domain
string title
string title_url
string user
boolean is_bot
string comment
string wiki
string server_name
int length_old
int length_new
int length_delta
bigint revision_old
bigint revision_new
timestamp event_timestamp
timestamp producer_ingested_at
timestamp bronze_processed_at
string event_date "PARTITION"
int event_hour "PARTITION"
string schema_version
}
SILVER_CLEANED_EVENTS {
string event_id PK
string event_type
string domain
string region
string language
string user_normalized
boolean is_bot
boolean is_anonymous
int length_delta
boolean is_large_deletion
boolean is_large_addition
timestamp event_timestamp
timestamp silver_processed_at
string event_date "PARTITION"
string schema_version
}
GOLD_HOURLY_STATS {
string stat_date "PARTITION"
int stat_hour
string domain
string region "PARTITION"
bigint total_events
bigint unique_users
bigint unique_pages
bigint bytes_added
bigint bytes_removed
double avg_edit_size
bigint bot_edits
bigint human_edits
double bot_percentage
bigint anonymous_edits
bigint type_edit
bigint type_new
bigint type_categorize
bigint type_log
bigint large_deletions
bigint large_additions
timestamp gold_processed_at
string schema_version
}
GOLD_RISK_SCORES {
string stat_date "PARTITION"
string entity_id
string entity_type
bigint total_edits
double edits_per_hour_avg
bigint large_deletions
bigint domains_edited
double risk_score
string risk_level
string evidence
boolean alert_triggered
timestamp gold_processed_at
string schema_version
}
GOLD_DAILY_ANALYTICS_SUMMARY {
string summary_date "PARTITION"
bigint total_events
bigint unique_users
bigint active_domains
bigint unique_pages_edited
double bot_percentage
double anonymous_percentage
double registered_user_percentage
bigint total_bytes_added
bigint total_bytes_removed
bigint net_content_change
double avg_edit_size_bytes
bigint new_pages_created
bigint large_deletions_count
double large_deletion_rate
bigint high_risk_user_count
bigint medium_risk_user_count
bigint low_risk_user_count
double platform_avg_risk_score
double platform_max_risk_score
bigint total_alerts_triggered
double europe_percentage
double americas_percentage
double asia_pacific_percentage
bigint peak_hour_events
double avg_events_per_hour
double platform_health_score
timestamp gold_processed_at
string schema_version
}
BRONZE_RAW_EVENTS ||--o{ SILVER_CLEANED_EVENTS : "transforms to"
SILVER_CLEANED_EVENTS ||--o{ GOLD_HOURLY_STATS : "aggregates to"
SILVER_CLEANED_EVENTS ||--o{ GOLD_RISK_SCORES : "generates"
GOLD_HOURLY_STATS ||--o{ GOLD_DAILY_ANALYTICS_SUMMARY : "summarizes"
GOLD_RISK_SCORES ||--o{ GOLD_DAILY_ANALYTICS_SUMMARY : "summarizes"
| Table | Purpose | Key Metrics |
|---|---|---|
| hourly_stats | Hourly activity by domain/region | Events, users, bytes, bot %, edit types |
| risk_scores | User-level risk assessment | Risk score (0-100), risk level, evidence |
| daily_analytics_summary | Executive KPI dashboard | Platform health score, risk overview, trends |
The gold.risk_scores table identifies potentially problematic edit patterns:
| Risk Factor | Threshold | Points |
|---|---|---|
| High edit velocity | >50 edits/hour | 40 |
| Large deletions | >3 large deletions | 30 |
| Anonymous activity | >50% anonymous | 20 |
| Cross-domain activity | >5 domains + high velocity | 10 |
Risk Levels:
- HIGH (70-100): Immediate attention needed
- MEDIUM (40-69): Monitor closely
- LOW (0-39): Normal activity
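Putting the factor table and the level bands together, the scoring can be sketched as a simple additive function. The parameter names are illustrative; see gold.risk_scores for the actual columns:

```python
def risk_score(edits_per_hour: float, large_deletions: int,
               anonymous_ratio: float, domains_edited: int) -> int:
    """Additive risk score (0-100) from the factor table above."""
    score = 0
    if edits_per_hour > 50:
        score += 40  # high edit velocity
    if large_deletions > 3:
        score += 30  # repeated large deletions
    if anonymous_ratio > 0.5:
        score += 20  # mostly anonymous activity
    if domains_edited > 5 and edits_per_hour > 50:
        score += 10  # cross-domain activity combined with high velocity
    return score

def risk_level(score: int) -> str:
    """Map a 0-100 score onto the HIGH / MEDIUM / LOW bands."""
    if score >= 70:
        return "HIGH"
    if score >= 40:
        return "MEDIUM"
    return "LOW"
```

A user making 60 edits/hour with 4 large deletions, mostly anonymously, across 6 domains scores the maximum 100 (HIGH), while ordinary editing activity stays in the LOW band.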
The gold.daily_analytics_summary table provides a single-row-per-day executive dashboard with KPIs:
| Category | Metrics | Purpose |
|---|---|---|
| Volume KPIs | total_events, unique_users, active_domains, unique_pages_edited | Activity overview |
| User Mix | bot_percentage, anonymous_percentage, registered_user_percentage | Community composition |
| Content Health | net_content_change, avg_edit_size_bytes, new_pages_created | Content growth |
| Quality | large_deletions_count, large_deletion_rate | Vandalism indicator |
| Risk Overview | high/medium/low_risk_user_count, platform_avg_risk_score, alerts_triggered | Security posture |
| Regional | europe/americas/asia_pacific_percentage | Geographic distribution |
| Platform Health Score | 0-100 composite score | Overall platform wellness |
Platform Health Score (0-100):
| Factor | Points | Calculation |
|---|---|---|
| Low risk users | 0-40 | 40 × (low_risk_users / total_scored_users) |
| Registered users | 0-30 | 30 × (registered_events / total_events) |
| Content growth | 0-20 | 20 if bytes_added > bytes_removed |
| Low deletion rate | 0-10 | 10 if large_deletions < 1% of events |
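The four factors above combine into the composite score as follows; the function signature is illustrative, and the guards against zero denominators are an assumption about edge-case handling:

```python
def platform_health_score(low_risk_users: int, total_scored_users: int,
                          registered_events: int, total_events: int,
                          bytes_added: int, bytes_removed: int,
                          large_deletions: int) -> float:
    """Composite 0-100 platform wellness score from the factor table above."""
    score = 0.0
    if total_scored_users > 0:
        score += 40 * (low_risk_users / total_scored_users)   # low-risk user share
    if total_events > 0:
        score += 30 * (registered_events / total_events)      # registered-user share
    if bytes_added > bytes_removed:
        score += 20                                           # net content growth
    if total_events > 0 and large_deletions < 0.01 * total_events:
        score += 10                                           # large deletions under 1%
    return round(score, 1)
```

A day where every scored user is low-risk, all edits are registered, content grows, and large deletions stay under 1% of events scores a full 100.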
| Table | Partition Columns | Rationale |
|---|---|---|
| bronze.raw_events | (event_date, event_hour) | Streaming micro-batches need time-based partitioning |
| silver.cleaned_events | (event_date, region) | Date + region for efficient regional time-range queries |
| gold.hourly_stats | (stat_date, region) | Date + region for dashboard drill-downs |
| gold.risk_scores | (stat_date) | Daily user risk assessments |
| gold.daily_analytics_summary | (summary_date) | One row per day for KPI trends |
In addition to the medallion architecture tables, the pipeline maintains DQ audit tables in a separate dq_audit namespace for data quality tracking and compliance:
| Table | Purpose | Key Fields |
|---|---|---|
| quality_results | Records all DQ gate check results | run_id, layer, check_name, status, metric_value, evidence |
| profile_metrics | Data profiling statistics for drift detection | column_name, null_rate, distinct_count, mean_value, percentiles |
These tables enable:
- Audit trail for compliance and debugging
- Trend analysis of data quality over time
- Drift detection by comparing current profiles to historical baselines
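Drift detection against dq_audit.profile_metrics can be sketched as comparing the current profile to a historical baseline within a tolerance. Field names follow the table above; the 0.10 tolerance and the function name are assumptions for illustration:

```python
def detect_drift(current: dict, baseline: dict, tolerance: float = 0.10) -> list:
    """Flag columns whose null_rate moved more than `tolerance` (absolute)
    away from the historical baseline recorded in dq_audit.profile_metrics."""
    drifted = []
    for column, stats in current.items():
        base = baseline.get(column)
        if base is None:
            continue  # new column: no baseline to compare against yet
        if abs(stats["null_rate"] - base["null_rate"]) > tolerance:
            drifted.append(column)
    return drifted
```

The same comparison generalizes to the other profiled statistics (distinct_count, mean_value, percentiles), each with its own tolerance.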
| Component | Service | Why This Choice |
|---|---|---|
| Message Queue | Amazon MSK (KRaft 3.9.x) | Kafka without Zookeeper complexity. Native AWS IAM auth. |
| Stream Producer | ECS Fargate | Long-running SSE consumer. Pay-per-second, no server management. |
| Bronze Processing | EMR Serverless (Streaming) | Spark Structured Streaming with exactly-once semantics. Auto-restart via Lambda. |
| Silver/Gold Processing | EMR Serverless (Batch) | Zero idle cost. Pay only during job execution. Auto-scaling Spark. |
| Table Format | S3 Tables (Iceberg 1.10.0) | AWS-managed Iceberg with ACID transactions, time-travel, automatic compaction. |
| Data Quality | AWS Deequ 2.0.7 | Native Spark integration. Completeness, validity, uniqueness checks. |
| Orchestration | Step Functions (self-looping) | Serverless. Continuous batch pipeline with 10-min intervals. Fail-fast on errors. |
| Auto-Recovery | Lambda | Restarts Bronze job on CloudWatch alarm trigger |
| Alternative | Reason Not Used |
|---|---|
| MSK Serverless | Less control over configuration; provisioned is more predictable |
| MWAA (Airflow) | $250+/month minimum; Step Functions is 90% cheaper |
| Lambda for Producer | 15-minute timeout; SSE stream requires continuous connection |
| Kinesis | MSK provides more flexibility with Kafka ecosystem |
| All Streaming | Silver/Gold transformations don't benefit from streaming; batch is more cost-effective |
| dbt | Adds operational complexity. Native Spark SQL + Step Functions provides equivalent functionality |
| Service | Configuration | Est. Cost |
|---|---|---|
| MSK | 2× kafka.t3.small, 50GB each | ~$90 |
| ECS Fargate | 0.25 vCPU, 0.5GB, 24/7 (Producer) | ~$10 |
| EMR Serverless - Bronze | Streaming job, pre-warmed workers | ~$100 |
| EMR Serverless - Silver/Gold | Batch jobs via Step Functions | ~$15 |
| S3 Tables + S3 | ~10 GB storage | ~$3 |
| NAT Gateway | Single AZ | ~$35 |
| Step Functions + Lambda | Orchestration + auto-restart | ~$2 |
| CloudWatch | Logs + Metrics | ~$5 |
| Total | 24/7 operation | ~$260/month |
For development, use the provided scripts to destroy/recreate costly infrastructure:
# End of day - full teardown with verification (idempotent, safe to run multiple times)
AWS_PROFILE=YOUR_AWS_PROFILE AWS_REGION=YOUR_AWS_REGION ./scripts/destroy_all.sh
# Start of day - recreates infrastructure (~25-35 min)
# Safe to close terminal after completion - uses EventBridge Scheduler
AWS_PROFILE=YOUR_AWS_PROFILE AWS_REGION=YOUR_AWS_REGION ./scripts/create_infra.sh

destroy_all.sh features:
- 13-step teardown with status tracking
- Handles S3 Tables async deletion gracefully
- Final verification step confirms all resources deleted
- Clear summary report at completion
create_infra.sh features:
- Idempotent (safe to re-run)
- Uses EventBridge Scheduler for batch pipeline (serverless, survives terminal close)
- Waits for MSK readiness before starting jobs
aws --version # AWS CLI v2.x required
terraform --version # Terraform >= 1.6.0
docker --version     # Docker for building images

# 1. Setup Terraform backend (first time only)
./scripts/setup_terraform_backend.sh
# 2. Deploy infrastructure
cd infrastructure/terraform
terraform init
terraform apply
# 3. Build and push producer image
ECR_URL=$(terraform output -raw ecr_repository_url)
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $(echo $ECR_URL | cut -d'/' -f1)
cd ../../producer
docker build --platform linux/amd64 -t ${ECR_URL}:latest .
docker push ${ECR_URL}:latest
# 4. Upload Spark jobs
DATA_BUCKET=$(terraform -chdir=../infrastructure/terraform output -raw data_bucket)
aws s3 sync ../spark/jobs/ s3://${DATA_BUCKET}/spark/jobs/
# 5. Start services
./scripts/create_infra.sh   # Starts ECS producer + Bronze streaming job + schedules batch pipeline

Note: The batch pipeline is scheduled via EventBridge Scheduler (serverless) to start 15 minutes after create_infra.sh. You can safely close the terminal immediately - the scheduler runs entirely in AWS. After the initial trigger, the pipeline self-loops with 10-minute intervals between cycles.
# Check EMR jobs
EMR_APP_ID=$(terraform -chdir=infrastructure/terraform output -raw emr_serverless_app_id)
aws emr-serverless list-job-runs --application-id ${EMR_APP_ID} --output table
# Check producer logs
aws logs tail /ecs/wikistream-dev-producer --follow --since 5m
# Check Step Functions executions
aws stepfunctions list-executions \
--state-machine-arn $(terraform -chdir=infrastructure/terraform output -raw batch_pipeline_state_machine_arn) \
  --max-results 5

WikiStream provides two types of analytics:
Local dockerized Grafana connected to CloudWatch for pipeline health:
# Start Grafana
cd monitoring/docker
docker-compose up -d
# Access at http://localhost:3000 (admin/wikistream)

Dashboard includes:
- DQ Gate status (Bronze/Silver/Gold pass/fail)
- Processing latency and throughput
- Pipeline failures and trends
- Infrastructure metrics (MSK, ECS, EMR)
AWS QuickSight is fully provisioned via Terraform with datasets connected to the Gold and Silver layers:
Terraform-provisioned resources:
- QuickSight Athena Data Source
- Datasets: hourly_stats, risk_scores, daily_analytics_summary, silver_cleaned_events
- LakeFormation permissions for S3 Tables access
- Glue catalog integration for S3 Tables
Available insights:
- Edit activity trends and volume KPIs
- Regional and language analysis
- Risk score monitoring (HIGH/MEDIUM/LOW: 0-100 scale)
- Bot vs human contributor patterns
- Platform health score trends
wikistream/
├── README.md # This file
├── .github/
│ └── workflows/
│ └── ci.yml # CI/CD pipeline (Python, Terraform, Docker, Secrets)
├── .ruff.toml # Python linting configuration
├── .bandit # Python security scanning configuration
├── .hadolint.yaml # Dockerfile linting configuration
├── mypy.ini # Python static type checking configuration
├── .gitignore
├── config/
│ ├── __init__.py
│ └── settings.py # Domain filters, regions, SLAs
├── docs/
│ ├── architecture_diagram.html
│ ├── ARCHITECTURE.md # Detailed architecture documentation
│ ├── presentation.html
│ └── RUNBOOK.md # Operations runbook
├── infrastructure/
│ └── terraform/
│ ├── .tflint.hcl # Terraform linting configuration
│ ├── main.tf # Root module orchestration
│ ├── variables.tf # All configurable parameters
│ ├── outputs.tf # Outputs for scripts
│ ├── versions.tf # Terraform & provider versions
│ ├── backend.tf # S3 backend configuration
│ ├── providers.tf # AWS provider config
│ ├── locals.tf # Local values
│ ├── data.tf # Data sources
│ ├── terraform.tfvars.example # Example variables file
│ ├── bronze_restart_lambda.zip # Lambda deployment package
│ └── modules/ # Terraform modules
│ ├── networking/ # VPC, subnets, security groups
│ │ ├── main.tf
│ │ ├── outputs.tf
│ │ └── variables.tf
│ ├── storage/ # S3 buckets, S3 Tables
│ │ ├── main.tf
│ │ ├── outputs.tf
│ │ └── variables.tf
│ ├── streaming/ # MSK Kafka cluster
│ │ ├── main.tf
│ │ ├── outputs.tf
│ │ └── variables.tf
│ ├── compute/ # ECS, EMR, ECR, Lambda
│ │ ├── main.tf
│ │ ├── outputs.tf
│ │ ├── variables.tf
│ │ └── templates/
│ │ └── bronze_restart_lambda.py.tftpl
│ ├── orchestration/ # Step Functions, EventBridge
│ │ ├── main.tf
│ │ ├── outputs.tf
│ │ ├── variables.tf
│ │ └── templates/
│ │ ├── batch_pipeline.json.tftpl
│ │ ├── data_quality.json.tftpl
│ │ ├── gold_processing.json.tftpl
│ │ └── silver_processing.json.tftpl
│ └── monitoring/ # CloudWatch, SNS, alarms
│ ├── main.tf
│ ├── outputs.tf
│ ├── variables.tf
│ └── templates/
│ └── dashboard.json.tftpl
├── producer/
│ ├── kafka_producer.py # SSE consumer → Kafka producer
│ ├── requirements.txt # Python dependencies
│ └── Dockerfile # Container image for ECS
├── spark/
│ ├── requirements.txt # Spark job dependencies
│ ├── jobs/
│ │ ├── __init__.py
│ │ ├── bronze_streaming_job.py # Kafka → Bronze (Spark Streaming)
│ │ ├── bronze_dq_gate.py # Bronze DQ gate
│ │ ├── silver_batch_job.py # Bronze → Silver (Batch)
│ │ ├── silver_dq_gate.py # Silver DQ gate
│ │ ├── gold_batch_job.py # Silver → Gold (Batch)
│ │ ├── gold_dq_gate.py # Gold DQ gate
│ │ └── dq/ # DQ module (PyDeequ)
│ │ ├── __init__.py
│ │ ├── deduplicate_bronze.py
│ │ ├── dq_checks.py
│ │ ├── dq_utils.py
│ │ └── requirements.txt
│ └── schemas/ # PySpark schemas
│ ├── __init__.py
│ ├── bronze_schema.py
│ ├── silver_schema.py
│ └── gold_schema.py
├── scripts/
│ ├── setup_terraform_backend.sh # Setup S3 + DynamoDB for state
│ ├── create_infra.sh # Start infrastructure (supports environments)
│ ├── destroy_all.sh # Full teardown (supports environments)
│ ├── destroy_infra_only.sh # Stop costly resources
│ ├── test_kafka_messages.py # Test Kafka message consumption
│ └── verify_emr_resources.sh # Verify EMR Serverless resources
# Development (default)
./scripts/create_infra.sh
# Staging
./scripts/create_infra.sh staging
# Production
./scripts/create_infra.sh prod

WikiStream includes a comprehensive CI pipeline that runs automatically on every push to any branch. The workflow validates code quality, infrastructure integrity, and security posture before changes can be deployed.
| Job | Purpose | Tools | Behavior |
|---|---|---|---|
| Python & Dockerfile Linting | Code quality and security | ruff, bandit, mypy, hadolint | Warnings only (won't fail pipeline) |
| Terraform CI | Infrastructure validation | terraform fmt/validate, tflint, tfsec | Fails on issues |
| Docker Security Scan | Container vulnerability scanning | Trivy | Fails on CRITICAL, warns on HIGH |
| Secrets Scanning | Detect leaked credentials | Trivy secret scanner | Fails on CRITICAL/HIGH |
- .github/workflows/ci.yml - Main CI pipeline definition
- .ruff.toml - Python linting rules
- .bandit - Security scanning configuration
- mypy.ini - Static type checking configuration
- .tflint.hcl - Terraform linting rules
- .hadolint.yaml - Dockerfile linting rules
- Python: Code style, unused imports, security vulnerabilities, type hints
- Terraform: Syntax, formatting, AWS best practices, security issues
- Docker: Security vulnerabilities in images, Dockerfile best practices
- Secrets: API keys, passwords, tokens across all files
The CI ensures code quality and security standards are maintained throughout development.
| Component | Status | Notes |
|---|---|---|
| ECS Kafka Producer | ✅ Implemented | Python, IAM auth, DLQ support |
| MSK Cluster (KRaft) | ✅ Implemented | Kafka 3.9.x, 2 brokers |
| Bronze Streaming Job | ✅ Implemented | 3 min micro-batches, MERGE INTO |
| Silver Batch Job | ✅ Implemented | Deduplication, normalization |
| Gold Batch Job | ✅ Implemented | Aggregations, risk scores, daily analytics summary |
| Bronze DQ Gate | ✅ Implemented | Completeness, timeliness (95% ≤3min), validity |
| Silver DQ Gate | ✅ Implemented | Accuracy, consistency, uniqueness, drift detection |
| Gold DQ Gate | ✅ Implemented | Upstream validation, aggregation consistency |
| DQ Audit Tables | ✅ Implemented | dq_audit.quality_results, dq_audit.profile_metrics |
| Step Functions Pipeline | ✅ Implemented | Self-looping: Bronze DQ → Silver → Silver DQ → Gold → Gold DQ → Wait 10min → Repeat |
| Auto-Restart Lambda | ✅ Implemented | CloudWatch alarm trigger |
| CloudWatch Dashboard | ✅ Implemented | Pipeline health + DQ gate metrics |
| Grafana (Docker) | ✅ Implemented | Local operational monitoring |
| SNS Alerts | ✅ Implemented | Failure notifications to mrshihabullah@gmail.com |
| S3 Tables (Iceberg) | ✅ Implemented | bronze, silver, gold, dq_audit namespaces |
| Cost Optimization Scripts | ✅ Implemented | destroy/create for dev workflow |
Built as a portfolio project demonstrating modern data engineering on AWS.
Skills Demonstrated:
- Real-time streaming with Kafka (MSK Kafka 3.9.x)
- Apache Iceberg 1.10.0 on S3 Tables
- Serverless Spark (EMR Serverless 7.12.0, Spark 3.5)
- Infrastructure as Code (Terraform)
- Medallion Architecture (Bronze streaming, Silver/Gold batch)
- Data Quality Gates (AWS Deequ 2.0.7 with audit trail)
- Pipeline Orchestration (self-looping batch pipeline via Step Functions)
- Observability (CloudWatch Dashboard, Alarms, SNS Alerts)
- Self-healing infrastructure (Lambda auto-restart on health check failure)
- Cost-optimized development workflow (destroy/create scripts)