bug: UDFs not present on pyspark workers #10273

Open
NickCrews opened this issue Oct 5, 2024 · 0 comments
Labels: bug (Incorrect behavior inside of ibis)

NickCrews (Contributor) commented Oct 5, 2024

What happened?

Discovered in NickCrews/mismo#64. CC @jstammers. Here is a more minimal reproducer.

Run it with uv run script.py so that uv installs the dependencies automatically, or install them manually and then run python script.py.

```python
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "ibis-framework[duckdb]",
#     "pyspark",
# ]
# ///
from __future__ import annotations

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import ibis

spark = SparkSession.builder.master("local").appName("My App").getOrCreate()


@ibis.udf.scalar.python  # module-level UDF: calling it inside spark_udf fails
def ibis_udf_global(x: int) -> int:
    return x + 1


@F.pandas_udf(returnType="int")
def spark_udf(inp: pd.Series) -> pd.Series:
    @ibis.udf.scalar.python  # UDF defined inside spark_udf: calling it works
    def ibis_udf_local(x: int) -> int:
        return x + 1

    t = ibis.memtable({"inp": inp})
    t = t.mutate(out=ibis_udf_global(t.inp))
    return t.out.to_pandas()


df = spark.createDataFrame(pd.DataFrame({"inp": (1, 2, 3)}))
print(df.withColumn("prediction", spark_udf(F.col("inp"))).toPandas())
```

This raises the AttributeError below. If I replace the call to ibis_udf_global with a call to ibis_udf_local, the script works.

Traceback

```
  File "/Users/nc/code/ibis/.scratch/bug_udf_pyspark.py", line 36, in <module>
    print(df.withColumn("prediction", spark_udf(F.col("inp"))).toPandas())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py", line 202, in toPandas
    rows = self.collect()
           ^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1263, in collect
    sock_info = self._jdf.collectToPython()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/nc/code/ibis/.scratch/bug_udf_pyspark.py", line 31, in spark_udf
    t = t.mutate(out=ibis_udf_global(t.inp))
                     ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/ibis/common/deferred.py", line 613, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/ibis/expr/operations/udf.py", line 165, in construct
    return node(*args, **kwargs).to_expr()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/bases.py", line 72, in __call__
    return cls.__create__(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/grounds.py", line 120, in __create__
    return super().__create__(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/grounds.py", line 199, in __init__
    object.__setattr__(self, name, value)
AttributeError: 'ibis_udf_global_0' object has no attribute 'x'
```
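The last frame fails in object.__setattr__(self, name, value) inside ibis/common/grounds.py. That wording is what Python reports when an instance without a __dict__ (every class in its hierarchy declares __slots__) is assigned a name that is not one of its slots. The stdlib-only sketch below reproduces the message. It assumes, without confirmation from the traceback, that the UDF node class rebuilt on the worker lacks a slot for the argument x; NodeWithSlot, NodeMissingSlot and construct are hypothetical stand-ins, not ibis internals.

```python
"""Stdlib-only sketch of the AttributeError raised in grounds.py.

Hypothetical stand-ins: neither class nor `construct` is an ibis API.
"""


class NodeWithSlot:
    # Instances have no __dict__; only names listed in __slots__ can be set.
    __slots__ = ("x",)


class NodeMissingSlot:
    # Assumed worker-side state: the slot for the UDF argument `x` is missing.
    __slots__ = ()


def construct(cls, **kwargs):
    """Create an instance and assign fields the way the failing frame does."""
    obj = object.__new__(cls)
    for name, value in kwargs.items():
        # Same call as the last line of the traceback.
        object.__setattr__(obj, name, value)
    return obj


if __name__ == "__main__":
    print(construct(NodeWithSlot, x=1).x)  # prints 1
    try:
        construct(NodeMissingSlot, x=1)
    except AttributeError as exc:
        print(f"AttributeError: {exc}")
```

With NodeWithSlot the assignment succeeds; with NodeMissingSlot it fails with the same kind of message as the traceback. Whether the worker-side class really ends up without its slots (for example while the module-level UDF is serialized for the worker) is not established by this sketch.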

What version of ibis are you using?

main

What backend(s) are you using, if any?

pyspark

Relevant log output

No response

Code of Conduct

  • I agree to follow this project's Code of Conduct
NickCrews added the bug label on Oct 5, 2024.
Project status: backlog
Development: no branches or pull requests
1 participant