
@vinodkc (Contributor) commented on Nov 26, 2025

What changes were proposed in this pull request?

This PR adds a new SQL function time_bucket() that buckets TIME values into fixed-width intervals, returning the start time of each bucket. This enables histogram generation and time-of-day pattern analysis for TIME columns.

Why are the changes needed?

The TIME type currently lacks a bucketing function for aggregation and analysis. Users cannot easily group TIME values by arbitrary intervals (e.g., 15-minute or 1-hour buckets) without complex manual calculations.

Current Gap:

Existing functions don't support TIME bucketing:

  • window(): only works with TIMESTAMP, not TIME, and returns a struct rather than a scalar
  • date_trunc(): does not support the TIME type
  • time_trunc(): only supports fixed truncation units (such as HOUR or MINUTE), not arbitrary intervals like "15 minutes" or "90 minutes"

Current workarounds are error-prone and hard to maintain:

-- Manual calculation of 15-minute buckets (900 seconds)
SELECT TIME(FLOOR(TIME_TO_SECONDS(event_time) / 900) * 900) AS bucket FROM events;

Proposed solution:

SELECT time_bucket(INTERVAL '15' MINUTE, event_time) AS bucket FROM events;

Use Cases:

This function addresses common real-world analytics needs:

  1. Retail Analytics: Analyze customer traffic by 30-minute slots to optimize staffing
  2. Healthcare: Group appointments by 15-minute intervals for scheduling optimization
  3. Manufacturing: Aggregate sensor readings by hourly buckets to detect production patterns
  4. DevOps: Bucket system events by 5-minute intervals for performance monitoring
  5. Business Intelligence: Create time-of-day histograms for reporting

Industry Precedent:

  • SQL Server 2022: DATE_BUCKET() supports bucketing for the TIME type
  • TimescaleDB: time_bucket() is one of its most popular functions for time-series analytics

Adding time_bucket() fills this gap in Spark's TIME type functionality and brings it on par with these databases.

Does this PR introduce any user-facing change?

Yes. This PR adds a new SQL function time_bucket() available in SQL, Scala, Python, and Spark Connect.

Function Signature

time_bucket(bucket_width, time) -> TIME

Parameters:

  • bucket_width: A day-time interval expression (e.g., INTERVAL '15' MINUTE)
  • time: A TIME value to bucket (a sketch of the corresponding Scala signatures follows this list)
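
For reference, a plausible shape of the corresponding Scala API, inferred from the usage examples later in this description (the exact overloads added by this PR may differ):

// Hypothetical signatures in org.apache.spark.sql.functions; not confirmed against the patch.
def time_bucket(bucketWidth: Column, time: Column): Column  // e.g. time_bucket(expr("INTERVAL '15' MINUTE"), col("t"))
def time_bucket(bucketWidth: String, time: Column): Column  // e.g. time_bucket("30 minutes", col("t"))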

Behavior:

  • Returns the start of the time bucket containing the input time (see the arithmetic sketch after this list)
  • Buckets are aligned to midnight (00:00:00)
  • Buckets cannot span across midnight
  • Returns the same precision as the input TIME type
  • Returns NULL if either input is NULL
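
To make the bucketing rule concrete, here is a minimal sketch of the arithmetic the rules above imply, assuming both the TIME value and the bucket width are expressed as microseconds since midnight (a hypothetical helper, not the implementation in this PR):

// Hypothetical helper illustrating the floor-to-bucket-start behavior.
def timeBucketStartMicros(bucketWidthMicros: Long, timeMicros: Long): Long = {
  require(bucketWidthMicros > 0, "bucket width must be a positive day-time interval")
  // Bucket boundaries are aligned to midnight (0 microseconds), so a plain floor works.
  (timeMicros / bucketWidthMicros) * bucketWidthMicros
}

// Example: TIME'09:37:22' with a 15-minute bucket
//   timeMicros        = (9 * 3600 + 37 * 60 + 22) * 1000000L   // 34,642 seconds
//   bucketWidthMicros = 15 * 60 * 1000000L                      // 900 seconds
//   result            = 34,200 seconds -> 09:30:00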

Examples

Example 1: Basic Bucketing

-- 15-minute buckets
SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22');
-- Result: 09:30:00

-- 30-minute buckets
SELECT time_bucket(INTERVAL '30' MINUTE, TIME'14:47:00');
-- Result: 14:30:00

-- 1-hour buckets
SELECT time_bucket(INTERVAL '1' HOUR, TIME'16:35:00');
-- Result: 16:00:00

-- 2-hour buckets
SELECT time_bucket(INTERVAL '2' HOUR, TIME'15:20:00');
-- Result: 14:00:00

Example 2: Retail Analytics - Peak Shopping Hours

-- Find busiest 30-minute slots in a store
SELECT time_bucket(INTERVAL '30' MINUTE, purchase_time) AS time_slot,
       COUNT(*) AS customer_count,
       SUM(total_amount) AS revenue
FROM sales
WHERE date = '2024-01-15'
GROUP BY time_slot
ORDER BY customer_count DESC
LIMIT 10;

-- Sample Output:
-- +----------+---------------+---------+
-- |time_slot |customer_count |revenue  |
-- +----------+---------------+---------+
-- |14:00:00  |           245 | 12450.50|
-- |14:30:00  |           231 | 11890.25|
-- |12:00:00  |           198 |  9875.00|
-- +----------+---------------+---------+

Example 3: Healthcare - Appointment Scheduling

-- Analyze appointment distribution by 15-minute slots
SELECT time_bucket(INTERVAL '15' MINUTE, appointment_time) AS slot,
       COUNT(*) AS appointments,
       AVG(duration_minutes) AS avg_duration,
       SUM(CASE WHEN status = 'no_show' THEN 1 ELSE 0 END) AS no_shows
FROM appointments
WHERE appointment_date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY slot
ORDER BY slot;

-- Sample Output:
-- +----------+-------------+-------------+---------+
-- |slot      |appointments |avg_duration |no_shows |
-- +----------+-------------+-------------+---------+
-- |08:00:00  |          45 |        22.3 |       2 |
-- |08:15:00  |          48 |        24.1 |       3 |
-- |08:30:00  |          52 |        21.8 |       1 |
-- +----------+-------------+-------------+---------+

Example 4: Edge Cases

-- Midnight (start of day)
SELECT time_bucket(INTERVAL '1' HOUR, TIME'00:00:00');
-- Result: 00:00:00

-- Just before midnight
SELECT time_bucket(INTERVAL '1' HOUR, TIME'23:59:59.999999');
-- Result: 23:00:00

-- Microsecond precision
SELECT time_bucket(INTERVAL '1' MICROSECOND, TIME'12:34:56.123456');
-- Result: 12:34:56.123456

-- Millisecond buckets
SELECT time_bucket(INTERVAL '100' MILLISECOND, TIME'12:34:56.789123');
-- Result: 12:34:56.700000

-- Null handling
SELECT time_bucket(INTERVAL '15' MINUTE, NULL);
-- Result: NULL

SELECT time_bucket(NULL, TIME'12:34:56');
-- Result: NULL

Scala API

import org.apache.spark.sql.functions._
import java.time.LocalTime
import spark.implicits._  // needed for Seq(...).toDF(...) when `spark` is an existing SparkSession

val events = Seq(
  (1, LocalTime.of(9, 5, 30), 45, 150.0),
  (2, LocalTime.of(9, 37, 45), 67, 175.0),
  (3, LocalTime.of(10, 12, 0), 28, 225.0)
).toDF("event_id", "event_time", "duration", "value")

events.createOrReplaceTempView("events")
val df = spark.table("events")

// Test Example 1
df.groupBy(time_bucket(expr("INTERVAL '15' MINUTE"), col("event_time")).as("bucket"))
  .agg(count("*").as("count"))
  .orderBy("bucket")
  .show()

// Test Example 2
df.groupBy(time_bucket("30 minutes", col("event_time")).as("bucket"))
  .count()
  .show()

// Test Example 3
df.groupBy(time_bucket("1 hour", col("event_time")).as("hour"))
  .agg(
    count("*").as("total_events"),
    avg("duration").as("avg_duration"),
    max("value").as("max_value")
  )
  .show()

Python API

from pyspark.sql import functions as F

# Example 1: Basic bucketing
df = spark.table("events")
df.groupBy(F.time_bucket(F.expr("INTERVAL '15' MINUTE"), "event_time").alias("bucket")) \
  .count() \
  .show()

# Example 2: Histogram generation
df.groupBy(F.time_bucket(F.expr("INTERVAL '30' MINUTE"), "event_time").alias("slot")) \
  .agg(
      F.count("*").alias("count"),
      F.avg("value").alias("avg_value"),
      F.stddev("value").alias("stddev_value")
  ) \
  .orderBy("slot") \
  .show()

# Example 3: Peak detection (assumes a sales-like table with purchase_time and amount columns)
sales_df = spark.table("sales")
peak_hours = sales_df.groupBy(
    F.time_bucket(F.expr("INTERVAL '1' HOUR"), "purchase_time").alias("hour")
).agg(
    F.sum("amount").alias("revenue")
).filter(
    F.col("revenue") > 10000
).orderBy(F.desc("revenue"))

peak_hours.show()

How was this patch tested?

Added tests in TimeFunctionsSuiteBase and sql-tests/inputs/time.sql
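
A rough sketch of what one such test might look like, following standard Spark QueryTest conventions (hypothetical; not necessarily the exact assertions added by this patch):

// Inside a suite extending QueryTest, e.g. TimeFunctionsSuiteBase (hypothetical test body).
test("time_bucket: 15-minute buckets") {
  checkAnswer(
    sql("SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22')"),
    Row(java.time.LocalTime.of(9, 30)))
}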

Was this patch authored or co-authored using generative AI tooling?

No

@vinodkc (Contributor, Author) commented on Nov 27, 2025

@dongjoon-hyun, could you please review this (4.2.0) PR?

@dongjoon-hyun (Member) commented

Sorry, I'm a bit low on bandwidth due to the Apache Spark 4.1.0 release.
