
I am performing operations on a directed acyclic graph (DAG) of DataFrames using Polars (eager API).

Here’s a simplified example of my workflow:

  1. Read parquet files into df1.
  2. Use df1 to create df2 and df3.
  3. Use df3 to create df4.
  4. Use both df2 and df3 to create df5.

In reality, I have more than 200 such levels of DataFrame transformations to reach the final leaf nodes (output DataFrames).
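
In code, the structure is roughly the following (the column names and transformations here are just placeholders):

import polars as pl

df1 = pl.read_parquet("input/*.parquet")              # root of the DAG

df2 = df1.filter(pl.col("value") > 0)                 # placeholder logic
df3 = df1.group_by("id").agg(pl.col("value").sum())

df4 = df3.with_columns((pl.col("value") * 2).alias("double_value"))
df5 = df2.join(df3, on="id", how="inner")

# ... ~200 further levels, each producing new DataFrames, until the leaves are written out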


Problem

Each DataFrame holds millions of rows and ~100 columns, so memory usage grows rapidly. I want to release memory for intermediate DataFrames as soon as they are no longer needed.

Example:

  • Once df2 and df3 are created, I no longer need df1 → release it from memory.
  • Once df4 and df5 are created, I no longer need df2 and df3.

I have tried:

del df1
import gc
gc.collect()

...but the memory is not being released.

Constraints

  • I am aware I could run the entire DAG execution in a subprocess, so that when the process finishes, memory is released.

  • However, I cannot wait for the entire graph to finish — I must free memory incrementally during execution because I run out of memory before completing the graph.

Question

How can I explicitly release memory of intermediate Polars DataFrames while processing a large DAG of transformations like this?

2 Answers

There are two non-mutually exclusive paths you can try:

  1. Set the following environment variable, which tweaks how jemalloc (the allocator Polars uses) returns freed memory to the OS. It must be set before Polars is imported:
import os
os.environ["_RJEM_MALLOC_CONF"] = "background_thread:true,dirty_decay_ms:500,muzzy_decay_ms:-1"
import polars as pl
# Everything else

According to comments on the Polars issue tracker, this is recommended by Ritchie Vink, the author of Polars.

  2. Have each step (or batch of steps) run in its own process, writing a file or passing raw bytes from process to process. This slows you down because you pay the cost of serialization and deserialization between steps, but you are guaranteed to free memory when a process exits and a new one starts. A minimal sketch of this approach is shown below.
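
For the second option, a sketch could look like this (the file names, column names, and step functions are hypothetical; each step runs in a fresh process and hands its result to the next step via Parquet files):

import multiprocessing as mp
import polars as pl

def step_df2_df3(in_path, out_df2, out_df3):
    # One level of the DAG: everything allocated here dies with the process.
    df1 = pl.read_parquet(in_path)
    df1.filter(pl.col("value") > 10).write_parquet(out_df2)
    df1.select(["id", "value"]).write_parquet(out_df3)

def step_df4(in_df3, out_df4):
    # Next level: reads only what it needs from disk.
    df3 = pl.read_parquet(in_df3)
    df3.with_columns((pl.col("value") * 2).alias("double_value")).write_parquet(out_df4)

def run_step(fn, *args):
    # Run one step in its own process; its memory is returned to the OS when it exits.
    p = mp.Process(target=fn, args=args)
    p.start()
    p.join()
    if p.exitcode != 0:
        raise RuntimeError(f"{fn.__name__} exited with code {p.exitcode}")

if __name__ == "__main__":
    run_step(step_df2_df3, "input.parquet", "df2.parquet", "df3.parquet")
    run_step(step_df4, "df3.parquet", "df4.parquet")

The same pattern extends to batches of steps: group several levels per process to amortize the read/write overhead.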

This is almost the same approach as in Memory Not Released After Each Request Despite Cleanup Attempts, but here we use a Pipe because we want to receive data from each processing step and use it going forward.

  1. Process the first DataFrame df1 (its resources are freed once the values are sent), then send those values through a pipe to our two consumers.

  2. Each consumer processes its own DataFrame(s); once its processing is done, the resources are freed.

import polars as pl
from multiprocessing import Process, Pipe
import gc
import ctypes

def malloc_trim():
    # Ask glibc to return freed memory to the OS; silently skip on non-glibc platforms.
    try:
        ctypes.CDLL("libc.so.6").malloc_trim(0)
    except (OSError, AttributeError):
        pass

# Load df1, produce df2 and df3, send to both consumers
# Producer: Loads input, prepares dataframes, and sends to consumers
def producer_load_and_distribute(pipe_for_both, pipe_for_df3):
    df1 = pl.read_parquet("input.parquet")

    # add processing logic for df2
    df2 = df1.filter(pl.col("value") > 10)
    # add processing logic for df3
    
    df3 = df1.select(["id", "value"]).with_columns(
        (pl.col("value") * 2).alias("double_value")
    )
    
    # serialize to arrow
    df2_arrow = df2.to_arrow()
    df3_arrow = df3.to_arrow()

    # Send to both consumers
    pipe_for_both.send((df2_arrow, df3_arrow))
    pipe_for_df3.send(df3_arrow)

    # Cleanup
    del df1, df2, df3
    gc.collect()
    malloc_trim()

# Needs both df2 and df3
# Consumer: Receives both df2 and df3, processes, and writes output
def consumer_process_both(pipe_conn):
    df2_arrow, df3_arrow = pipe_conn.recv()
    # deserialize from arrow
    df2 = pl.from_arrow(df2_arrow) 
    df3 = pl.from_arrow(df3_arrow)

    # Add processing logic for df2 and df3
    df5 = df2.join(df3, on="id", how="inner")
    df5.write_parquet("df5.parquet")

    del df2, df3, df5
    gc.collect()
    malloc_trim()

# Needs only df3
# Consumer: Receives only df3, processes, and writes output
def consumer_process_df3(pipe_conn):
    df3_arrow = pipe_conn.recv()
    df3 = pl.from_arrow(df3_arrow)

    # Add processing logic for df3
    df4 = df3.with_columns((pl.col("double_value") + 1).alias("incremented"))
    df4.write_parquet("df4.parquet")

    del df3, df4
    gc.collect()
    malloc_trim()


if __name__ == "__main__":
    # Create pipes for each consumer
    parent_both, child_both = Pipe()
    parent_df3, child_df3 = Pipe()

    # Producer process
    p_producer = Process(target=producer_load_and_distribute, args=(child_both, child_df3))
    # consumers
    p_consumer_both = Process(target=consumer_process_both, args=(parent_both,))
    p_consumer_df3 = Process(target=consumer_process_df3, args=(parent_df3,))
    # Start
    p_consumer_both.start()
    p_consumer_df3.start()
    p_producer.start()
    # Wait for producer to finish so df1 is freed
    p_producer.join()
    # Wait for consumers to finish
    p_consumer_both.join()
    p_consumer_df3.join()
