write_parquet followed by read_parquet may lead to a deadlock #19380

Open
MTandHJ opened this issue Oct 22, 2024 · 5 comments
Labels
bug (Something isn't working) · invalid (A bug report that is not actually a bug) · needs repro (Bug does not yet have a reproducible example) · needs triage (Awaiting prioritization by a maintainer) · python (Related to Python Polars)

Comments


MTandHJ commented Oct 22, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

# Excerpt from the chunked preprocessing routine; the enclosing class,
# imports, and the write loop over the chunks are omitted here.
                    chunk.write_parquet(
                        os.path.join(
                            path,
                            self.DEFAULT_PARQUET_FILE.format(chunk=k)
                        )
                    )
        # ... later, read the chunks back and yield them one by one:
        for k in range(num_chunks):
            parquet_file = path.format(chunk=k)
            yield pl.read_parquet(
                parquet_file, columns=columns
            )

Log output

None.

Issue description

I use Polars to preprocess .csv data and write it into parquet files. However, I found that if I immediately read such a file (using read_parquet), the process hangs when multiprocess sampling (num_workers > 0) is used in a torch DataLoader.
Moreover, when I restart the script (so the data no longer needs to be preprocessed), data loading works normally.
Hence, maybe something started by write_parquet is not completely terminated?

Thanks!

Expected behavior

None.

Installed versions

--------Version info---------
Polars:              1.9.0
Index type:          UInt32
Platform:            Linux-5.15.0-118-generic-x86_64-with-glibc2.31
Python:              3.9.18 (main, Sep 11 2023, 13:41:44) 
[GCC 11.2.0]

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               <not installed>
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            <not installed>
fastexcel            <not installed>
fsspec               2023.10.0
gevent               <not installed>
great_tables         <not installed>
matplotlib           3.8.3
nest_asyncio         1.6.0
numpy                1.22.4
openpyxl             <not installed>
pandas               2.2.1
pyarrow              <not installed>
pydantic             <not installed>
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                2.2.2
xlsx2csv             <not installed>
xlsxwriter           <not installed>
MTandHJ added the bug, needs triage, and python labels on Oct 22, 2024
coastalwhite (Collaborator) commented Oct 22, 2024

Can you provide a somewhat reproducible example of this happening?

coastalwhite added the needs repro label on Oct 22, 2024
MTandHJ (Author) commented Oct 23, 2024

The following code hangs:

import torchdata.datapipes as dp
import polars as pl
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService


class DataSet(dp.iter.IterDataPipe):

    def __init__(self):
        super().__init__()

        df = pl.DataFrame(
            {
                'a': range(10),
                'b': range(10),
                'c': range(10),
            }
        )

        df.write_parquet("test.parquet")
        print(">>> Save parquet")
    
    def __iter__(self):
        df = pl.read_parquet("test.parquet")
        print(">>> Load parquet")
        for row in df.iter_rows():
            print(">>> Yield row")
            yield row

dataloader = DataLoader2(
    DataSet(),
    reading_service=MultiProcessingReadingService(num_workers=4)
)

for row in dataloader:
    print(row)

Outputs:

>>> Save parquet

The following code runs normally:

import torchdata.datapipes as dp
import polars as pl
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService


class DataSet(dp.iter.IterDataPipe):

    def __init__(self):
        super().__init__()

        df = pl.DataFrame(
            {
                'a': range(10),
                'b': range(10),
                'c': range(10),
            }
        )

        # test.parquet is already saved
        # df.write_parquet("test.parquet")
        # print(">>> Save parquet")
    
    def __iter__(self):
        df = pl.read_parquet("test.parquet")
        print(">>> Load parquet")
        for row in df.iter_rows():
            print(">>> Yield row")
            yield row

dataloader = DataLoader2(
    DataSet(),
    reading_service=MultiProcessingReadingService(num_workers=4)
)

for row in dataloader:
    print(row)

Outputs:

>>> Load parquet
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Load parquet
>>> Yield row
>>> Yield row
>>> Yield row
>>> Load parquet
>>> Load parquet
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
>>> Yield row
(0, 0, 0)
(0, 0, 0)
(0, 0, 0)
(0, 0, 0)
(1, 1, 1)
(1, 1, 1)
(1, 1, 1)
(1, 1, 1)
(2, 2, 2)
(2, 2, 2)
(2, 2, 2)
(2, 2, 2)
(3, 3, 3)
(3, 3, 3)
(3, 3, 3)
(3, 3, 3)
(4, 4, 4)
(4, 4, 4)
(4, 4, 4)
(4, 4, 4)
(5, 5, 5)
(5, 5, 5)
(5, 5, 5)
(5, 5, 5)
(6, 6, 6)
(6, 6, 6)
(6, 6, 6)
(6, 6, 6)
(7, 7, 7)
(7, 7, 7)
(7, 7, 7)
(7, 7, 7)
(8, 8, 8)
(8, 8, 8)
(8, 8, 8)
(8, 8, 8)
(9, 9, 9)
(9, 9, 9)
(9, 9, 9)
(9, 9, 9)

MTandHJ (Author) commented Oct 23, 2024

> Can you provide a somewhat reproducible example of this happening?

Besides, if num_workers is set to 0:

dataloader = DataLoader2(
    DataSet(),
    reading_service=MultiProcessingReadingService(num_workers=0)
)

Outputs:

>>> Load parquet
>>> Yield row
(0, 0, 0)
>>> Yield row
(1, 1, 1)
>>> Yield row
(2, 2, 2)
>>> Yield row
(3, 3, 3)
>>> Yield row
(4, 4, 4)
>>> Yield row
(5, 5, 5)
>>> Yield row
(6, 6, 6)
>>> Yield row
(7, 7, 7)
>>> Yield row
(8, 8, 8)
>>> Yield row
(9, 9, 9)

ritchie46 (Member) commented Oct 23, 2024

You are using multiprocessing. That is likely the source of your deadlocks:

https://docs.pola.rs/user-guide/misc/multiprocessing/
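
A minimal sketch of the pattern that page recommends (not taken verbatim from the docs, and assuming the "spawn" start method is available on your platform): start worker processes with "spawn" so each one begins from a fresh interpreter instead of a fork()ed copy of the parent's lock state.

import multiprocessing

import polars as pl


def read_chunk(path: str) -> None:
    # Runs in the child process; with "spawn" the child does not inherit
    # the parent's mutex/file-lock state, so read_parquet cannot block on
    # a lock copied from the parent.
    df = pl.read_parquet(path)
    print(df.shape)


if __name__ == "__main__":
    pl.DataFrame({"a": range(10), "b": range(10)}).write_parquet("test.parquet")

    ctx = multiprocessing.get_context("spawn")
    workers = [ctx.Process(target=read_chunk, args=("test.parquet",)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()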

ritchie46 added the invalid label on Oct 23, 2024
MTandHJ (Author) commented Oct 23, 2024

> You are using multiprocessing. That is likely the source of your deadlocks:
> https://docs.pola.rs/user-guide/misc/multiprocessing/

Thanks! This explains where the deadlock comes from:

> For reading the file with pl.read_parquet the file has to be locked. Then os.fork() is called, copying the state of the parent process, including mutexes. Thus all child processes will copy the file lock in an acquired state, leaving them hanging indefinitely waiting for the file lock to be released, which never happens.
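
To see the mechanism in isolation, here is a small POSIX-only toy (no Polars involved) showing how a lock acquired before os.fork() leaves the child blocked forever:

import os
import threading
import time

lock = threading.Lock()
lock.acquire()              # the parent holds the lock, like a write in progress

pid = os.fork()             # the child gets a copy of the lock in its acquired state
if pid == 0:
    # Child: blocks forever -- the copied lock is "held" by a thread that does
    # not exist in this process, so nothing can ever release it here.
    lock.acquire()
    print("child acquired the lock")    # never reached
    os._exit(0)
else:
    lock.release()          # releasing the parent's copy does not unblock the child
    time.sleep(1)
    print("child still blocked; terminating it")
    os.kill(pid, 9)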

Somewhat differently, in my case the file lock is caused by write_parquet rather than read_parquet. It would be helpful if the lock state could be released manually after saving (is there any risk in doing so?).
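
In the meantime, a workaround sketch that sidesteps the inherited-lock issue entirely: start the workers with "spawn" instead of "fork". This assumes the installed torchdata version exposes a multiprocessing_context argument on MultiProcessingReadingService, which is worth verifying locally.

import torchdata.datapipes as dp
import polars as pl
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService


class DataSet(dp.iter.IterDataPipe):

    def __init__(self):
        super().__init__()
        df = pl.DataFrame({"a": range(10), "b": range(10), "c": range(10)})
        df.write_parquet("test.parquet")   # runs once, in the parent process

    def __iter__(self):
        # Runs in each worker; with "spawn" the workers start from a fresh
        # interpreter and inherit no lock state from write_parquet above.
        df = pl.read_parquet("test.parquet")
        yield from df.iter_rows()


if __name__ == "__main__":   # required when the start method is "spawn"
    dataloader = DataLoader2(
        DataSet(),
        reading_service=MultiProcessingReadingService(
            num_workers=4,
            multiprocessing_context="spawn",   # assumption: supported by this torchdata version
        ),
    )
    for row in dataloader:
        print(row)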

Thanks again!
