WikiStream: Real-Time Wikipedia Edit Analytics Platform

A streaming data pipeline built entirely on AWS that captures live Wikipedia edits from Wikimedia EventStreams, processes them through a medallion architecture, and surfaces insights via dashboards for content monitoring and risk detection, with built-in pipeline observability.

Tech Stack: Amazon MSK (KRaft 3.9.x) · EMR Serverless (Spark 3.5) · AWS Step Functions · Amazon S3 Tables (Iceberg 1.10.0) · ECS Fargate · AWS Deequ
Deployment: AWS Cloud (us-east-1) · Terraform 1.6+

📐 For detailed architecture diagrams and component breakdown, see docs/ARCHITECTURE.md


Table of Contents

  1. Business Problem
  2. Solution Overview
  3. Architecture
  4. Data Model
  5. Technology Choices
  6. Cost Estimation
  7. Quick Start
  8. Monitoring & Analytics
  9. Project Structure
  10. Continuous Integration
  11. Implementation Status

Business Problem

The Data Source

Wikimedia operates one of the largest collaborative platforms globally:

| Metric | Value |
|---|---|
| Wikipedia editions | 300+ languages |
| Edits per minute | ~500-700 (peak: 1,500+) |
| Monthly active editors | 280,000+ |
| Bot edits | ~20-30% of total |

Wikimedia EventStreams provides a real-time Server-Sent Events (SSE) feed of edits across all Wikimedia projects.
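
Each SSE message carries the change event as JSON on a `data:` line. A minimal parsing sketch (the repo's actual producer lives in producer/kafka_producer.py; the function name here is illustrative):

```python
import json
from typing import Optional

def parse_sse_data(line: str) -> Optional[dict]:
    """Parse one SSE 'data:' line into a change-event dict (illustrative)."""
    if not line.startswith("data:"):
        return None  # skip 'event:', 'id:' and heartbeat/comment lines
    return json.loads(line[len("data:"):].strip())

# Simulated line, shaped like the /v2/stream/recentchange feed
sample = 'data: {"meta": {"domain": "en.wikipedia.org"}, "type": "edit", "bot": false}'
event = parse_sse_data(sample)
print(event["meta"]["domain"])  # en.wikipedia.org
```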

The Use Cases

| Use Case | Description | Value |
|---|---|---|
| Content Monitoring | Track edit velocity, top contributors, trending pages | Understand community activity |
| Vandalism Detection | Identify suspicious edit patterns (rapid edits, large deletions) | Protect content quality |
| Regional Analysis | Compare activity across language editions | Geographic insights |
| Bot Activity Tracking | Monitor automated vs human contributions | Community health |

Target Domains

We filter events from high-activity Wikipedia editions:

HIGH_ACTIVITY: en, de, ja, fr, zh, es, ru
REGIONAL: asia_pacific, europe, americas, middle_east
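
The domain filter itself is simple; a sketch using the high-activity language codes listed above (helper name hypothetical):

```python
HIGH_ACTIVITY = {"en", "de", "ja", "fr", "zh", "es", "ru"}

def is_target_domain(domain: str) -> bool:
    """True for <lang>.wikipedia.org domains in a high-activity edition."""
    parts = domain.split(".")
    return parts[1:] == ["wikipedia", "org"] and parts[0] in HIGH_ACTIVITY

print(is_target_domain("en.wikipedia.org"))      # True
print(is_target_domain("commons.wikimedia.org")) # False
```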

Solution Overview

WikiStream implements the Medallion Architecture (Bronze → Silver → Gold) for progressive data refinement, storing each layer in the Apache Iceberg table format on Amazon S3 Tables:

flowchart LR
    subgraph Sources["📡 Source"]
        WM["Wikipedia<br/>EventStreams"]
    end

    subgraph Ingestion["📥 Ingestion"]
        ECS["ECS Fargate<br/>Producer"]
        MSK["Amazon MSK<br/>Kafka 3.9.x"]
    end

    subgraph Processing["⚡ EMR Serverless"]
        BRONZE["🥉 Bronze<br/>Streaming"]
        BDQ["🔍 Bronze DQ"]
        SILVER["🥈 Silver<br/>Batch"]
        SDQ["🔍 Silver DQ"]
        GOLD["🥇 Gold<br/>Batch"]
        GDQ["🔍 Gold DQ"]
    end

    subgraph Storage["💾 S3 Tables"]
        ICE["Apache Iceberg<br/>Tables"]
    end

    subgraph Orchestration["🔄 Step Functions"]
        SF["Self-Looping<br/>Batch Pipeline"]
    end

    WM -->|SSE| ECS
    ECS -->|Produce| MSK
    MSK -->|3 min micro-batch| BRONZE
    BRONZE --> ICE
    ICE --> BDQ
    BDQ --> SILVER
    SILVER --> ICE
    ICE --> SDQ
    SDQ --> GOLD
    GOLD --> ICE
    ICE --> GDQ
    SF -.->|Orchestrates| BDQ
    SF -.->|Orchestrates| SILVER
    SF -.->|Orchestrates| SDQ
    SF -.->|Orchestrates| GOLD
    SF -.->|Orchestrates| GDQ

Processing Flow with DQ Gates

| Layer | Job Type | Trigger | Description |
|---|---|---|---|
| Bronze | Streaming | 3 min micro-batches | Kafka → Iceberg with exactly-once semantics |
| Bronze DQ Gate | Batch | Before Silver | Completeness, timeliness (95% ≤3 min), validity checks |
| Silver | Batch | After Bronze DQ passes | Deduplication, normalization, enrichment |
| Silver DQ Gate | Batch | After Silver | Accuracy, consistency, uniqueness, drift detection |
| Gold | Batch | After Silver DQ passes | Aggregations: hourly stats, entity trends, risk scores |
| Gold DQ Gate | Batch | After Gold | Upstream validation + aggregation consistency |

Step Function Pipeline Flow

The batch pipeline is orchestrated by AWS Step Functions with a self-looping continuous execution pattern:

┌─────────────────────────────────────────────────────────────────────────────┐
│                    BATCH PIPELINE STATE MACHINE                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌─────────────────────┐                                                    │
│  │ RecordPipelineStart │ ← Initial trigger (after 15 min of Bronze data)   │
│  └──────────┬──────────┘                                                    │
│             ↓                                                               │
│  ┌──────────────────────┐      ❌ CATCH     ┌──────────────────────┐       │
│  │    BronzeDQGate      │ ────────────────→ │   NotifyFailure      │       │
│  │  (EMR Serverless)    │                   │   → RecordMetric     │       │
│  └──────────┬───────────┘                   │   → FAIL (stop loop) │       │
│             ↓ ✅                             └──────────────────────┘       │
│  ┌──────────────────────┐                                                   │
│  │   StartSilverJob     │ ──────────❌────→ Same failure pattern            │
│  └──────────┬───────────┘                                                   │
│             ↓ ✅                                                             │
│  ┌──────────────────────┐                                                   │
│  │    SilverDQGate      │ ──────────❌────→ Same failure pattern            │
│  └──────────┬───────────┘                                                   │
│             ↓ ✅                                                             │
│  ┌──────────────────────┐                                                   │
│  │    StartGoldJob      │ ──────────❌────→ Same failure pattern            │
│  └──────────┬───────────┘                                                   │
│             ↓ ✅                                                             │
│  ┌──────────────────────┐                                                   │
│  │     GoldDQGate       │ ──────────❌────→ Same failure pattern            │
│  └──────────┬───────────┘                                                   │
│             ↓ ✅ ALL GATES PASSED                                           │
│  ┌────────────────────────┐                                                 │
│  │ RecordPipelineComplete │                                                 │
│  └──────────┬─────────────┘                                                 │
│             ↓                                                               │
│  ┌────────────────────────┐                                                 │
│  │ WaitBeforeNextCycle    │  ⏱️ Wait 10 minutes                             │
│  └──────────┬─────────────┘                                                 │
│             ↓                                                               │
│  ┌────────────────────────┐                                                 │
│  │   TriggerNextCycle     │ ───→ StartExecution (self) ───→ END            │
│  └────────────────────────┘           ↑                                    │
│                                       │                                    │
│                              (New execution starts)                        │
│                                                                            │
├────────────────────────────────────────────────────────────────────────────┤
│  TERMINAL STATES:                                                          │
│  • ✅ SUCCESS: TriggerNextCycle → starts new execution → continuous loop   │
│  • ❌ FAILURE: Fail state → stops loop → requires manual restart           │
└────────────────────────────────────────────────────────────────────────────┘

Pipeline Timing

| Phase | Duration | Notes |
|---|---|---|
| Initial wait (Bronze data collection) | 15 min | Ensures Bronze has enough data |
| EMR job startup (cold start) | ~5-8 min | JARs download, Spark init |
| Actual DQ/batch processing | ~2-3 min | Per job |
| Full batch pipeline execution | ~15-25 min | 5 jobs × (startup + processing) |
| Wait between successful cycles | 10 min | Prevents resource contention |
| Total cycle time | ~25-35 min | End-to-end |

Note: Step Function job timeouts are set to 15 minutes (900s) to accommodate EMR Serverless cold starts and JAR downloads.

Data Quality Gates

DQ gates block downstream processing when checks fail:

Bronze → [Bronze DQ Gate] → Silver → [Silver DQ Gate] → Gold → [Gold DQ Gate]
               ↓                            ↓                       ↓
           ❌ FAIL                      ❌ FAIL                  ❌ FAIL
      (stops pipeline)             (stops pipeline)         (stops pipeline)

| Gate | Check Types | Failure Action |
|---|---|---|
| Bronze DQ | Completeness (100% for IDs, 95% for optional), Timeliness (95% ≤3 min) | Stops pipeline, SNS alert |
| Silver DQ | Accuracy, Consistency, Uniqueness, Drift detection | Stops pipeline, SNS alert |
| Gold DQ | Upstream verification, Aggregation consistency | Stops pipeline, SNS alert |

All DQ results are persisted to dq_audit.quality_results for audit and trend analysis.
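
In production these checks run via AWS Deequ on Spark; the pass/fail logic of the Bronze gate can be sketched in plain Python using the thresholds stated above (100% ID completeness, 95% of events ingested within 3 minutes):

```python
def bronze_dq_gate(events: list) -> bool:
    """Illustrative Bronze gate: ID completeness and ingestion timeliness."""
    total = len(events)
    if total == 0:
        return False
    # Completeness: every event must carry an event_id
    with_id = sum(1 for e in events if e.get("event_id"))
    if with_id < total:
        return False
    # Timeliness: >=95% of events ingested within 180 seconds of the edit
    on_time = sum(1 for e in events if e["ingest_lag_s"] <= 180)
    return on_time / total >= 0.95

sample = [{"event_id": f"e{i}", "ingest_lag_s": 60} for i in range(19)]
sample.append({"event_id": "e19", "ingest_lag_s": 600})  # one late event
print(bronze_dq_gate(sample))  # True: 19/20 = 95% on time
```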

Key Features

  • ≤3 minute Bronze ingestion latency (Spark Structured Streaming)
  • ≤35 minute end-to-end SLA for dashboard freshness (continuous batch cycles)
  • Exactly-once semantics via Spark checkpointing and idempotent MERGE
  • Self-looping batch pipeline with automatic 10-minute intervals between cycles
  • 15-minute job timeout to accommodate EMR Serverless cold starts
  • Fail-fast on DQ failure - pipeline stops on any gate failure (prevents cascading bad data)
  • Auto-recovery Lambda restarts Bronze streaming job on health check failure
  • Data quality gates that block downstream processing on failures
  • DQ audit trail with full evidence in Iceberg tables
  • SNS alerts for DQ gate failures and pipeline errors
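
The exactly-once behavior rests on the MERGE being idempotent: replayed micro-batches match on `event_id` and overwrite rather than duplicate. A sketch of the statement the streaming job would issue per micro-batch (table and key names from the data model section; the builder function is hypothetical):

```python
def build_bronze_merge_sql(source_view: str) -> str:
    """Build an idempotent Iceberg MERGE keyed on event_id (illustrative)."""
    return f"""
    MERGE INTO bronze.raw_events AS t
    USING {source_view} AS s
    ON t.event_id = s.event_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """

# Inside the streaming job this would run in foreachBatch, after the
# micro-batch DataFrame is registered as a temp view named 'batch_updates'.
sql = build_bronze_merge_sql("batch_updates")
print("WHEN NOT MATCHED" in sql)  # True
```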

Architecture

📐 See docs/ARCHITECTURE.md for detailed architecture diagrams, component breakdown, and data flow visualizations.

High-Level Components

┌─────────────────────────────────────────────────────────────────────────────────────┐
│                              WikiStream Pipeline                                     │
└─────────────────────────────────────────────────────────────────────────────────────┘

  INGESTION           STREAMING           BATCH PROCESSING              STORAGE
┌────────────┐     ┌────────────┐     ┌─────────────────────────┐    ┌───────────┐
│ Wikipedia  │     │  Amazon    │     │    EMR Serverless       │    │ S3 Tables │
│ SSE Feed   │────▶│   MSK      │────▶│                         │───▶│ (Iceberg) │
└────────────┘     │  (Kafka)   │     │ Bronze (streaming)      │    │           │
      │            └────────────┘     │   ↓                     │    │ bronze    │
      ▼                               │ Bronze DQ Gate          │    │ silver    │
┌────────────┐                        │   ↓                     │    │ gold      │
│ECS Fargate │                        │ Silver → Silver DQ Gate │    │ dq_audit  │
│ Producer   │                        │   ↓                     │    └───────────┘
└────────────┘                        │ Gold → Gold DQ Gate     │
                                      └─────────────────────────┘
                                                  │
  ORCHESTRATION                                   │              MONITORING
┌──────────────────────┐                         │            ┌─────────────┐
│   Step Functions     │◀────────────────────────┘            │ CloudWatch  │
│ (Self-Looping Batch  │                                      │ Dashboard   │
│    Pipeline)         │─────────────────────────────────────▶│ + Alarms    │
└──────────────────────┘                                      │ + SNS       │
                                                              └─────────────┘

System Components

| Component | Service | Description |
|---|---|---|
| Data Source | Wikipedia EventStreams | Real-time SSE feed |
| Producer | ECS Fargate | Python Kafka producer with IAM auth |
| Message Broker | Amazon MSK (KRaft 3.9.x) | 2 brokers, topics: raw-events, dlq-events |
| Processing | EMR Serverless (Spark 3.5) | Bronze streaming + Silver/Gold batch |
| Storage | S3 Tables (Iceberg 1.10.0) | Medallion architecture tables |
| Initial Trigger | EventBridge Scheduler | One-time serverless trigger (auto-deletes after use) |
| Orchestration | Step Functions (self-looping) | Continuous batch pipeline with 10-min intervals |
| Auto-Recovery | Lambda + CloudWatch | Bronze job health monitoring |
| Monitoring | CloudWatch + SNS | Dashboard, metrics, alerts |

Data Model

Wikimedia Event Schema

Sample event from stream.wikimedia.org/v2/stream/recentchange:

{
  "meta": {
    "id": "c3b60285-58c0-493e-ad60-ddab7732fcc4",
    "domain": "en.wikipedia.org",
    "dt": "2025-01-01T10:00:00Z"
  },
  "id": 1234567890,
  "type": "edit",
  "namespace": 0,
  "title": "Example_Article",
  "title_url": "https://en.wikipedia.org/wiki/Example_Article",
  "user": "Editor123",
  "bot": false,
  "comment": "Fixed typo in introduction",
  "wiki": "enwiki",
  "server_name": "en.wikipedia.org",
  "timestamp": 1704103200,
  "length": {"old": 1000, "new": 1050},
  "revision": {"old": 123456, "new": 123457}
}
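
Bronze flattens this nested payload into columns; `length_delta`, for instance, is derived as `length.new - length.old`. A minimal flattening sketch (covers only a subset of the Bronze columns; helper name hypothetical):

```python
def flatten_event(raw: dict) -> dict:
    """Map a recentchange payload onto (a subset of) Bronze columns."""
    length = raw.get("length") or {}
    old, new = length.get("old", 0), length.get("new", 0)
    return {
        "event_id": raw["meta"]["id"],
        "domain": raw["meta"]["domain"],
        "event_type": raw["type"],
        "user": raw.get("user"),
        "is_bot": raw.get("bot", False),
        "length_old": old,
        "length_new": new,
        "length_delta": new - old,
    }

raw = {"meta": {"id": "c3b60285", "domain": "en.wikipedia.org"},
       "type": "edit", "user": "Editor123", "bot": False,
       "length": {"old": 1000, "new": 1050}}
print(flatten_event(raw)["length_delta"])  # 50
```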

Medallion Tables

erDiagram
    BRONZE_RAW_EVENTS {
        string event_id PK
        string kafka_topic
        int kafka_partition
        bigint kafka_offset
        timestamp kafka_timestamp
        bigint rc_id
        string event_type
        int namespace
        string domain
        string title
        string title_url
        string user
        boolean is_bot
        string comment
        string wiki
        string server_name
        int length_old
        int length_new
        int length_delta
        bigint revision_old
        bigint revision_new
        timestamp event_timestamp
        timestamp producer_ingested_at
        timestamp bronze_processed_at
        string event_date "PARTITION"
        int event_hour "PARTITION"
        string schema_version
    }

    SILVER_CLEANED_EVENTS {
        string event_id PK
        string event_type
        string domain
        string region
        string language
        string user_normalized
        boolean is_bot
        boolean is_anonymous
        int length_delta
        boolean is_large_deletion
        boolean is_large_addition
        timestamp event_timestamp
        timestamp silver_processed_at
        string event_date "PARTITION"
        string schema_version
    }

    GOLD_HOURLY_STATS {
        string stat_date "PARTITION"
        int stat_hour
        string domain
        string region "PARTITION"
        bigint total_events
        bigint unique_users
        bigint unique_pages
        bigint bytes_added
        bigint bytes_removed
        double avg_edit_size
        bigint bot_edits
        bigint human_edits
        double bot_percentage
        bigint anonymous_edits
        bigint type_edit
        bigint type_new
        bigint type_categorize
        bigint type_log
        bigint large_deletions
        bigint large_additions
        timestamp gold_processed_at
        string schema_version
    }

    GOLD_RISK_SCORES {
        string stat_date "PARTITION"
        string entity_id
        string entity_type
        bigint total_edits
        double edits_per_hour_avg
        bigint large_deletions
        bigint domains_edited
        double risk_score
        string risk_level
        string evidence
        boolean alert_triggered
        timestamp gold_processed_at
        string schema_version
    }

    GOLD_DAILY_ANALYTICS_SUMMARY {
        string summary_date "PARTITION"
        bigint total_events
        bigint unique_users
        bigint active_domains
        bigint unique_pages_edited
        double bot_percentage
        double anonymous_percentage
        double registered_user_percentage
        bigint total_bytes_added
        bigint total_bytes_removed
        bigint net_content_change
        double avg_edit_size_bytes
        bigint new_pages_created
        bigint large_deletions_count
        double large_deletion_rate
        bigint high_risk_user_count
        bigint medium_risk_user_count
        bigint low_risk_user_count
        double platform_avg_risk_score
        double platform_max_risk_score
        bigint total_alerts_triggered
        double europe_percentage
        double americas_percentage
        double asia_pacific_percentage
        bigint peak_hour_events
        double avg_events_per_hour
        double platform_health_score
        timestamp gold_processed_at
        string schema_version
    }

    BRONZE_RAW_EVENTS ||--o{ SILVER_CLEANED_EVENTS : "transforms to"
    SILVER_CLEANED_EVENTS ||--o{ GOLD_HOURLY_STATS : "aggregates to"
    SILVER_CLEANED_EVENTS ||--o{ GOLD_RISK_SCORES : "generates"
    GOLD_HOURLY_STATS ||--o{ GOLD_DAILY_ANALYTICS_SUMMARY : "summarizes"
    GOLD_RISK_SCORES ||--o{ GOLD_DAILY_ANALYTICS_SUMMARY : "summarizes"

Gold Tables Overview

| Table | Purpose | Key Metrics |
|---|---|---|
| hourly_stats | Hourly activity by domain/region | Events, users, bytes, bot %, edit types |
| risk_scores | User-level risk assessment | Risk score (0-100), risk level, evidence |
| daily_analytics_summary | Executive KPI dashboard | Platform health score, risk overview, trends |

Risk Score Explained

The gold.risk_scores table identifies potentially problematic edit patterns:

| Risk Factor | Threshold | Points |
|---|---|---|
| High edit velocity | >50 edits/hour | 40 |
| Large deletions | >3 large deletions | 30 |
| Anonymous activity | >50% anonymous | 20 |
| Cross-domain activity | >5 domains + high velocity | 10 |

Risk Levels:

  • HIGH (70-100): Immediate attention needed
  • MEDIUM (40-69): Monitor closely
  • LOW (0-39): Normal activity
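
The factor table maps onto a simple additive rule; a sketch (thresholds and points taken from the table above, function name hypothetical):

```python
def risk_score(edits_per_hour: float, large_deletions: int,
               anonymous_ratio: float, domains_edited: int):
    """Additive risk score per the factor table (illustrative)."""
    score = 0
    high_velocity = edits_per_hour > 50
    if high_velocity:
        score += 40
    if large_deletions > 3:
        score += 30
    if anonymous_ratio > 0.5:
        score += 20
    if domains_edited > 5 and high_velocity:
        score += 10
    level = "HIGH" if score >= 70 else "MEDIUM" if score >= 40 else "LOW"
    return score, level

print(risk_score(80, 5, 0.6, 2))  # (90, 'HIGH')
```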

Daily Analytics Summary Explained

The gold.daily_analytics_summary table provides a single-row-per-day executive dashboard with KPIs:

| Category | Metrics | Purpose |
|---|---|---|
| Volume KPIs | total_events, unique_users, active_domains, unique_pages_edited | Activity overview |
| User Mix | bot_percentage, anonymous_percentage, registered_user_percentage | Community composition |
| Content Health | net_content_change, avg_edit_size_bytes, new_pages_created | Content growth |
| Quality | large_deletions_count, large_deletion_rate | Vandalism indicator |
| Risk Overview | high/medium/low_risk_user_count, platform_avg_risk_score, alerts_triggered | Security posture |
| Regional | europe/americas/asia_pacific_percentage | Geographic distribution |
| Platform Health | platform_health_score (0-100 composite) | Overall platform wellness |

Platform Health Score (0-100):

| Factor | Points | Calculation |
|---|---|---|
| Low risk users | 0-40 | 40 × (low_risk_users / total_scored_users) |
| Registered users | 0-30 | 30 × (registered_events / total_events) |
| Content growth | 0-20 | 20 if bytes_added > bytes_removed |
| Low deletion rate | 0-10 | 10 if large_deletions < 1% of events |
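
Putting the four factors together, the composite score can be sketched as (formulas from the factor table; function name hypothetical):

```python
def platform_health_score(low_risk_users: int, total_scored_users: int,
                          registered_events: int, total_events: int,
                          bytes_added: int, bytes_removed: int,
                          large_deletions: int) -> float:
    """Composite 0-100 health score per the factor table (illustrative)."""
    score = 40 * (low_risk_users / total_scored_users)
    score += 30 * (registered_events / total_events)
    if bytes_added > bytes_removed:
        score += 20  # net content growth
    if large_deletions < 0.01 * total_events:
        score += 10  # deletion rate under 1%
    return round(score, 1)

# e.g. 90% low-risk users, 80% registered edits, net growth, few deletions
print(platform_health_score(90, 100, 8000, 10000, 5_000_000, 3_000_000, 12))
# 90.0  (36 + 24 + 20 + 10)
```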

Partitioning Strategy

| Table | Partition Columns | Rationale |
|---|---|---|
| bronze.raw_events | (event_date, event_hour) | Streaming micro-batches need time-based partitioning |
| silver.cleaned_events | (event_date, region) | Date + region for efficient regional time-range queries |
| gold.hourly_stats | (stat_date, region) | Date + region for dashboard drill-downs |
| gold.risk_scores | (stat_date) | Daily user risk assessments |
| gold.daily_analytics_summary | (summary_date) | One row per day for KPI trends |

Operational Tables (DQ Audit)

In addition to the medallion architecture tables, the pipeline maintains DQ audit tables in a separate dq_audit namespace for data quality tracking and compliance:

| Table | Purpose | Key Fields |
|---|---|---|
| quality_results | Records all DQ gate check results | run_id, layer, check_name, status, metric_value, evidence |
| profile_metrics | Data profiling statistics for drift detection | column_name, null_rate, distinct_count, mean_value, percentiles |

These tables enable:

  • Audit trail for compliance and debugging
  • Trend analysis of data quality over time
  • Drift detection by comparing current profiles to historical baselines
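
A minimal drift check compares each current profile metric against its historical baseline; a sketch follows (the 20% relative-change tolerance is an assumption for illustration, not taken from the repo):

```python
def has_drift(current: dict, baseline: dict, tolerance: float = 0.20) -> bool:
    """Flag drift if any shared metric moved more than `tolerance` relative
    to its baseline value (illustrative; real checks read profile_metrics)."""
    for metric, base in baseline.items():
        cur = current.get(metric)
        if cur is None or base == 0:
            continue  # metric missing or baseline unusable
        if abs(cur - base) / abs(base) > tolerance:
            return True
    return False

baseline = {"null_rate": 0.02, "distinct_count": 5000, "mean_value": 42.0}
print(has_drift({"null_rate": 0.021, "distinct_count": 5100}, baseline))  # False
print(has_drift({"null_rate": 0.10, "distinct_count": 5000}, baseline))   # True
```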

Technology Choices

| Component | Service | Why This Choice |
|---|---|---|
| Message Queue | Amazon MSK (KRaft 3.9.x) | Kafka without ZooKeeper complexity. Native AWS IAM auth. |
| Stream Producer | ECS Fargate | Long-running SSE consumer. Pay-per-second, no server management. |
| Bronze Processing | EMR Serverless (Streaming) | Spark Structured Streaming with exactly-once semantics. Auto-restart via Lambda. |
| Silver/Gold Processing | EMR Serverless (Batch) | Zero idle cost. Pay only during job execution. Auto-scaling Spark. |
| Table Format | S3 Tables (Iceberg 1.10.0) | AWS-managed Iceberg with ACID transactions, time travel, automatic compaction. |
| Data Quality | AWS Deequ 2.0.7 | Native Spark integration. Completeness, validity, uniqueness checks. |
| Orchestration | Step Functions (self-looping) | Serverless. Continuous batch pipeline with 10-min intervals. Fail-fast on errors. |
| Auto-Recovery | Lambda | Restarts the Bronze job on a CloudWatch alarm trigger. |

Why Not...?

| Alternative | Reason Not Used |
|---|---|
| MSK Serverless | Less control over configuration; provisioned is more predictable |
| MWAA (Airflow) | $250+/month minimum; Step Functions is ~90% cheaper |
| Lambda for Producer | 15-minute timeout; the SSE stream requires a continuous connection |
| Kinesis | MSK provides more flexibility within the Kafka ecosystem |
| All Streaming | Silver/Gold transformations don't benefit from streaming; batch is more cost-effective |
| dbt | Adds operational complexity; native Spark SQL + Step Functions provides equivalent functionality |

Cost Estimation

Monthly Cost (Dev/Portfolio)

| Service | Configuration | Est. Cost |
|---|---|---|
| MSK | 2× kafka.t3.small, 50 GB each | ~$90 |
| ECS Fargate | 0.25 vCPU, 0.5 GB, 24/7 (producer) | ~$10 |
| EMR Serverless (Bronze) | Streaming job, pre-warmed workers | ~$100 |
| EMR Serverless (Silver/Gold) | Batch jobs via Step Functions | ~$15 |
| S3 Tables + S3 | ~10 GB storage | ~$3 |
| NAT Gateway | Single AZ | ~$35 |
| Step Functions + Lambda | Orchestration + auto-restart | ~$2 |
| CloudWatch | Logs + metrics | ~$5 |
| Total (24/7) | | ~$260/month |

Cost Optimization Scripts

For development, use the provided scripts to destroy/recreate costly infrastructure:

# End of day - full teardown with verification (idempotent, safe to run multiple times)
AWS_PROFILE=YOUR_AWS_PROFILE AWS_REGION=YOUR_AWS_REGION ./scripts/destroy_all.sh

# Start of day - recreates infrastructure (~25-35 min)
# Safe to close terminal after completion - uses EventBridge Scheduler
AWS_PROFILE=YOUR_AWS_PROFILE AWS_REGION=YOUR_AWS_REGION ./scripts/create_infra.sh

destroy_all.sh features:

  • 13-step teardown with status tracking
  • Handles S3 Tables async deletion gracefully
  • Final verification step confirms all resources deleted
  • Clear summary report at completion

create_infra.sh features:

  • Idempotent (safe to re-run)
  • Uses EventBridge Scheduler for batch pipeline (serverless, survives terminal close)
  • Waits for MSK readiness before starting jobs

Quick Start

Prerequisites

aws --version          # AWS CLI v2.x required
terraform --version    # Terraform >= 1.6.0
docker --version       # Docker for building images

Deploy

# 1. Setup Terraform backend (first time only)
./scripts/setup_terraform_backend.sh

# 2. Deploy infrastructure
cd infrastructure/terraform
terraform init
terraform apply

# 3. Build and push producer image
ECR_URL=$(terraform output -raw ecr_repository_url)
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $(echo $ECR_URL | cut -d'/' -f1)
cd ../../producer
docker build --platform linux/amd64 -t ${ECR_URL}:latest .
docker push ${ECR_URL}:latest

# 4. Upload Spark jobs
DATA_BUCKET=$(terraform -chdir=../infrastructure/terraform output -raw data_bucket)
aws s3 sync ../spark/jobs/ s3://${DATA_BUCKET}/spark/jobs/

# 5. Start services
./scripts/create_infra.sh  # Starts ECS producer + Bronze streaming job + schedules batch pipeline

Note: The batch pipeline is scheduled via EventBridge Scheduler (serverless) to start 15 minutes after create_infra.sh completes. You can safely close the terminal immediately; the scheduler runs entirely in AWS. After the initial trigger, the pipeline self-loops with 10-minute intervals between cycles.

Verify Pipeline

# Check EMR jobs
EMR_APP_ID=$(terraform -chdir=infrastructure/terraform output -raw emr_serverless_app_id)
aws emr-serverless list-job-runs --application-id ${EMR_APP_ID} --output table

# Check producer logs
aws logs tail /ecs/wikistream-dev-producer --follow --since 5m

# Check Step Functions executions
aws stepfunctions list-executions \
  --state-machine-arn $(terraform -chdir=infrastructure/terraform output -raw batch_pipeline_state_machine_arn) \
  --max-results 5

Monitoring & Analytics

WikiStream provides two types of analytics:

Operational Monitoring (Grafana)

Local dockerized Grafana connected to CloudWatch for pipeline health:

# Start Grafana
cd monitoring/docker
docker-compose up -d

# Access at http://localhost:3000 (admin/wikistream)

Dashboard includes:

  • DQ Gate status (Bronze/Silver/Gold pass/fail)
  • Processing latency and throughput
  • Pipeline failures and trends
  • Infrastructure metrics (MSK, ECS, EMR)

Business Analytics (QuickSight - Terraform Provisioned)

AWS QuickSight is fully provisioned via Terraform with datasets connected to the Gold and Silver layers:

Terraform-provisioned resources:

  • QuickSight Athena Data Source
  • Datasets: hourly_stats, risk_scores, daily_analytics_summary, silver_cleaned_events
  • LakeFormation permissions for S3 Tables access
  • Glue catalog integration for S3 Tables

Available insights:

  • Edit activity trends and volume KPIs
  • Regional and language analysis
  • Risk score monitoring (HIGH/MEDIUM/LOW: 0-100 scale)
  • Bot vs human contributor patterns
  • Platform health score trends

Project Structure

wikistream/
├── README.md                          # This file
├── .github/
│   └── workflows/
│       └── ci.yml                    # CI/CD pipeline (Python, Terraform, Docker, Secrets)
├── .ruff.toml                        # Python linting configuration
├── .bandit                           # Python security scanning configuration
├── .hadolint.yaml                     # Dockerfile linting configuration
├── mypy.ini                          # Python static type checking configuration
├── .gitignore
├── config/
│   ├── __init__.py
│   └── settings.py                   # Domain filters, regions, SLAs
├── docs/
│   ├── architecture_diagram.html
│   ├── ARCHITECTURE.md                # Detailed architecture documentation
│   ├── presentation.html
│   └── RUNBOOK.md                     # Operations runbook
├── infrastructure/
│   └── terraform/
│       ├── .tflint.hcl                # Terraform linting configuration
│       ├── main.tf                     # Root module orchestration
│       ├── variables.tf                # All configurable parameters
│       ├── outputs.tf                  # Outputs for scripts
│       ├── versions.tf                 # Terraform & provider versions
│       ├── backend.tf                 # S3 backend configuration
│       ├── providers.tf                # AWS provider config
│       ├── locals.tf                  # Local values
│       ├── data.tf                    # Data sources
│       ├── terraform.tfvars.example     # Example variables file
│       ├── bronze_restart_lambda.zip    # Lambda deployment package
│       └── modules/                   # Terraform modules
│           ├── networking/            # VPC, subnets, security groups
│           │   ├── main.tf
│           │   ├── outputs.tf
│           │   └── variables.tf
│           ├── storage/               # S3 buckets, S3 Tables
│           │   ├── main.tf
│           │   ├── outputs.tf
│           │   └── variables.tf
│           ├── streaming/             # MSK Kafka cluster
│           │   ├── main.tf
│           │   ├── outputs.tf
│           │   └── variables.tf
│           ├── compute/               # ECS, EMR, ECR, Lambda
│           │   ├── main.tf
│           │   ├── outputs.tf
│           │   ├── variables.tf
│           │   └── templates/
│           │       └── bronze_restart_lambda.py.tftpl
│           ├── orchestration/         # Step Functions, EventBridge
│           │   ├── main.tf
│           │   ├── outputs.tf
│           │   ├── variables.tf
│           │   └── templates/
│           │       ├── batch_pipeline.json.tftpl
│           │       ├── data_quality.json.tftpl
│           │       ├── gold_processing.json.tftpl
│           │       └── silver_processing.json.tftpl
│           └── monitoring/            # CloudWatch, SNS, alarms
│               ├── main.tf
│               ├── outputs.tf
│               ├── variables.tf
│               └── templates/
│                   └── dashboard.json.tftpl
├── producer/
│   ├── kafka_producer.py              # SSE consumer → Kafka producer
│   ├── requirements.txt               # Python dependencies
│   └── Dockerfile                     # Container image for ECS
├── spark/
│   ├── requirements.txt               # Spark job dependencies
│   ├── jobs/
│   │   ├── __init__.py
│   │   ├── bronze_streaming_job.py    # Kafka → Bronze (Spark Streaming)
│   │   ├── bronze_dq_gate.py          # Bronze DQ gate
│   │   ├── silver_batch_job.py        # Bronze → Silver (Batch)
│   │   ├── silver_dq_gate.py          # Silver DQ gate
│   │   ├── gold_batch_job.py          # Silver → Gold (Batch)
│   │   ├── gold_dq_gate.py            # Gold DQ gate
│   │   └── dq/                        # DQ module (PyDeequ)
│   │       ├── __init__.py
│   │       ├── deduplicate_bronze.py
│   │       ├── dq_checks.py
│   │       ├── dq_utils.py
│   │       └── requirements.txt
│   └── schemas/                       # PySpark schemas
│       ├── __init__.py
│       ├── bronze_schema.py
│       ├── silver_schema.py
│       └── gold_schema.py
├── scripts/
│   ├── setup_terraform_backend.sh     # Setup S3 + DynamoDB for state
│   ├── create_infra.sh                # Start infrastructure (supports environments)
│   ├── destroy_all.sh                 # Full teardown (supports environments)
│   ├── destroy_infra_only.sh          # Stop costly resources
│   ├── test_kafka_messages.py          # Test Kafka message consumption
│   └── verify_emr_resources.sh       # Verify EMR Serverless resources

Multi-Environment Support

# Development (default)
./scripts/create_infra.sh

# Staging
./scripts/create_infra.sh staging

# Production
./scripts/create_infra.sh prod

Continuous Integration (CI)

WikiStream includes a comprehensive CI pipeline that automatically runs on every push to any branch. The workflow validates code quality, infrastructure integrity, and security posture before changes can be deployed.

CI Workflow

| Job | Purpose | Tools | Behavior |
|---|---|---|---|
| Python & Dockerfile Linting | Code quality and security | ruff, bandit, mypy, hadolint | Warnings only (won't fail pipeline) |
| Terraform CI | Infrastructure validation | terraform fmt/validate, tflint, tfsec | Fails on issues |
| Docker Security Scan | Container vulnerability scanning | Trivy | Fails on CRITICAL, warns on HIGH |
| Secrets Scanning | Detect leaked credentials | Trivy secret scanner | Fails on CRITICAL/HIGH |

CI Configuration Files

  • .github/workflows/ci.yml - Main CI pipeline definition
  • .ruff.toml - Python linting rules
  • .bandit - Security scanning configuration
  • mypy.ini - Static type checking configuration
  • .tflint.hcl - Terraform linting rules
  • .hadolint.yaml - Dockerfile linting rules

What Gets Checked

  • Python: Code style, unused imports, security vulnerabilities, type hints
  • Terraform: Syntax, formatting, AWS best practices, security issues
  • Docker: Security vulnerabilities in images, Dockerfile best practices
  • Secrets: API keys, passwords, tokens across all files

The CI ensures code quality and security standards are maintained throughout development.


Implementation Status

| Component | Status | Notes |
|---|---|---|
| ECS Kafka Producer | ✅ Implemented | Python, IAM auth, DLQ support |
| MSK Cluster (KRaft) | ✅ Implemented | Kafka 3.9.x, 2 brokers |
| Bronze Streaming Job | ✅ Implemented | 3 min micro-batches, MERGE INTO |
| Silver Batch Job | ✅ Implemented | Deduplication, normalization |
| Gold Batch Job | ✅ Implemented | Aggregations, risk scores, daily analytics summary |
| Bronze DQ Gate | ✅ Implemented | Completeness, timeliness (95% ≤3 min), validity |
| Silver DQ Gate | ✅ Implemented | Accuracy, consistency, uniqueness, drift detection |
| Gold DQ Gate | ✅ Implemented | Upstream validation, aggregation consistency |
| DQ Audit Tables | ✅ Implemented | dq_audit.quality_results, dq_audit.profile_metrics |
| Step Functions Pipeline | ✅ Implemented | Self-looping: Bronze DQ → Silver → Silver DQ → Gold → Gold DQ → Wait 10 min → Repeat |
| Auto-Restart Lambda | ✅ Implemented | CloudWatch alarm trigger |
| CloudWatch Dashboard | ✅ Implemented | Pipeline health + DQ gate metrics |
| Grafana (Docker) | ✅ Implemented | Local operational monitoring |
| SNS Alerts | ✅ Implemented | Failure notifications to mrshihabullah@gmail.com |
| S3 Tables (Iceberg) | ✅ Implemented | bronze, silver, gold, dq_audit namespaces |
| Cost Optimization Scripts | ✅ Implemented | destroy/create for dev workflow |

Author

Built as a portfolio project demonstrating modern data engineering on AWS.

Skills Demonstrated:

  • Real-time streaming with Kafka (MSK Kafka 3.9.x)
  • Apache Iceberg 1.10.0 on S3 Tables
  • Serverless Spark (EMR Serverless 7.12.0, Spark 3.5)
  • Infrastructure as Code (Terraform)
  • Medallion Architecture (Bronze streaming, Silver/Gold batch)
  • Data Quality Gates (AWS Deequ 2.0.7 with audit trail)
  • Pipeline Orchestration (self-looping batch pipeline via Step Functions)
  • Observability (CloudWatch Dashboard, Alarms, SNS Alerts)
  • Self-healing infrastructure (Lambda auto-restart on health check failure)
  • Cost-optimized development workflow (destroy/create scripts)
