
cached dataframe makes join slower? #19480

Open
jackxxu opened this issue Oct 27, 2024 · 0 comments
Labels
bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars

Comments

jackxxu commented Oct 27, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from typing import List

BINS_COUNT = 10
BINS_COUNT2 = 5

def process_file(file_paths):
    # print(f"Processing file pair: {file_paths}")
    return (
        pl.scan_parquet(file_paths[0])  # Lazily read the Parquet file
        .join(pl.scan_parquet(file_paths[1]), on='id', how='left')
        .with_columns(
            pl.col('a').qcut(BINS_COUNT, labels=[str(i) for i in range(BINS_COUNT)]).cast(pl.Int8).alias('bins'),
            pl.col('a').qcut(BINS_COUNT2, labels=[str(i) for i in range(BINS_COUNT2)]).cast(pl.Int8).alias('bins_2')
        )
    )

def stitch_files(file_paths: List[List]):
    with ThreadPoolExecutor() as executor:
        # Execute the processing function for each file concurrently
        lazy_frames = list(executor.map(process_file, file_paths))

    return pl.concat(
        lazy_frames,
        rechunk=True  # Rechunking for better memory usage
    )

non_repeated_paths = list(zip(
    Path("data").glob("*.parquet"),
    Path("data2").glob("*.parquet")))

repeated_paths = list(zip(
    Path("data").glob("*.parquet"),
    ['data2/2025-01-22.parquet'] * len(list(Path("data").glob("*.parquet"))))) # 1 file repeated

%%timeit

stitch_files(non_repeated_paths).collect()  # ==>  around 2 seconds

print(stitch_files(non_repeated_paths).explain())
%%timeit

stitch_files(repeated_paths).collect()         # ==>  around 2.8 seconds
print(stitch_files(repeated_paths).explain())

Log output

This is partial output of the explain() method:

    ## uncached dataframe (one of the plans) 

    Parquet SCAN [data2/2025-09-05.parquet]
    PROJECT */4 COLUMNS
    ## cached dataframe (one of the plans)

    CACHE[id: 0, cache_hits: 499]
      simple π 4/4 ["id", "a", "cat", "date"]
        Parquet SCAN [data2/2025-01-22.parquet]
        PROJECT 4/4 COLUMNS

Issue description

I am trying to stitch multiple Parquet files both horizontally (2 files per layer) and vertically (hundreds of layers) into one large dataframe. I first join each pair horizontally, then vertically concatenate the results.

I tried two variants. In the first, every dataframe is unique; in the second, one of the two dataframes is the same file for every layer. As expected, Polars caches the repeated dataframe, as the explain() output shows.

However, the cached version is about 40% slower than the uncached version in my experiments. CPU utilization is also noticeably lower in the cached version, which may indicate some kind of locking.

The code can be found here: https://github.com/jackxxu/polars_merge/tree/main.

Expected behavior

I expected the cached version to be faster, but the opposite is true.

The CPU usage of the cached version is also much lower, which explains why it is slower.

Installed versions

Polars:              1.10.0
Index type:          UInt32
Platform:            macOS-14.6.1-arm64-arm-64bit
Python:              3.10.1 (main, Apr  5 2024, 20:49:47) [Clang 15.0.0 (clang-1500.3.9.4)]
LTS CPU:             False

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               <not installed>
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            <not installed>
fastexcel            <not installed>
fsspec               <not installed>
gevent               <not installed>
great_tables         <not installed>
matplotlib           <not installed>
nest_asyncio         1.6.0
numpy                2.1.2
openpyxl             <not installed>
pandas               <not installed>
pyarrow              <not installed>
pydantic             <not installed>
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>
@jackxxu jackxxu added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels Oct 27, 2024