Skip to content

Commit

Permalink
Fix conversion from arrow column to arrow RecordBatch (#3359)
Browse files Browse the repository at this point in the history
wjsi authored Sep 25, 2023
1 parent 2175678 commit 0a42ba8
Showing 19 changed files with 71 additions and 41 deletions.
6 changes: 6 additions & 0 deletions mars/dataframe/arrays.py
Original file line number Diff line number Diff line change
@@ -301,6 +301,12 @@ def __repr__(self):
def _array(self):
    """
    Return the storage currently backing this object.

    ``_arrow_array`` is returned when ``_use_arrow`` is truthy,
    otherwise ``_ndarray`` is returned.
    """
    if self._use_arrow:
        return self._arrow_array
    return self._ndarray

def __arrow_array__(self, type=None):
    """
    Convert this object into a single (non-chunked) arrow array.

    Parameters
    ----------
    type : optional
        Target arrow type. When truthy, the combined array is cast to it;
        otherwise the combined array is returned unchanged. The name shadows
        the builtin but is kept because callers pass it by keyword.

    Returns
    -------
    The result of ``combine_chunks()`` on ``_arrow_array`` (optionally cast)
    when ``_use_arrow`` is set; otherwise whatever the parent class's
    ``__arrow_array__`` returns.
    """
    if not self._use_arrow:
        # not arrow-backed: defer entirely to the parent implementation
        return super().__arrow_array__(type=type)

    merged = self._arrow_array.combine_chunks()
    if type:
        return merged.cast(type)
    return merged

# Read-only accessor: exposes the dtype object stored on the instance as ``_dtype``.
@property
def dtype(self) -> "Type[ArrowDtype]":
return self._dtype
5 changes: 5 additions & 0 deletions mars/dataframe/core.py
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@
tokenize,
estimate_pandas_size,
calc_nsplits,
is_debugger_repr_thread,
)
from .utils import fetch_corner_data, ReprSeries, parse_index, merge_index_value

@@ -1430,6 +1431,10 @@ def __mars_tensor__(self, dtype=None, order="K"):
return tensor.astype(dtype=dtype, order=order, copy=False)

def iteritems(self, batch_size=10000, session=None):
    """
    Lazily iterate over items, pulling the data batch by batch.

    Each batch produced by ``iterbatch`` is asked for its own ``iteritems()``
    and the results are yielded one after another.

    Because this is a generator, the checks below run (and may raise) when
    the first item is requested, not when the method is called.

    Parameters
    ----------
    batch_size : int
        Forwarded to ``iterbatch``.
    session : optional
        Forwarded to ``iterbatch``.

    Raises
    ------
    NotImplementedError
        If ``is_build_mode()`` is true, or if ``is_debugger_repr_thread()``
        is true while ``_executed_sessions`` is empty.
    """
    if is_build_mode():
        raise NotImplementedError("Not implemented when building dags")

    # The thread check comes first so that ``_executed_sessions`` is only
    # inspected when the debugger check succeeds.
    if is_debugger_repr_thread():
        if len(self._executed_sessions) == 0:
            raise NotImplementedError("Not implemented when not executed under debug")

    batches = self.iterbatch(batch_size=batch_size, session=session)
    for chunk in batches:
        chunk_items = getattr(chunk, "iteritems")
        yield from chunk_items()

4 changes: 4 additions & 0 deletions mars/dataframe/tests/test_arrays.py
Original file line number Diff line number Diff line change
@@ -480,3 +480,7 @@ def test_to_pandas():
s2 = df2["b"].str[:2]
expected = df["b"].astype("string").str[:2]
pd.testing.assert_series_equal(s2, expected)

# test reverse conversion to arrow
arrow_data = pa.RecordBatch.from_pandas(df2)
assert arrow_data.num_rows == len(df2)
2 changes: 1 addition & 1 deletion mars/learn/contrib/joblib/backend.py
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ def configure(self, n_jobs=1, parallel=None, **backend_args):

def effective_n_jobs(self, n_jobs):
    """
    Resolve the number of jobs this backend will actually use.

    The parent backend is consulted first. Its answer is replaced by
    ``self.n_parallel`` when the caller asked for ``n_jobs == -1`` or when
    the parent returned a falsy value (e.g. ``0`` or ``None``).

    Parameters
    ----------
    n_jobs : int
        Requested number of jobs.

    Returns
    -------
    The parent's effective job count, or ``self.n_parallel`` as described
    above.
    """
    eff_n_jobs = super(MarsDistributedBackend, self).effective_n_jobs(n_jobs)
    # Single condition only: the narrower ``if n_jobs == -1:`` header it
    # replaced must not remain as an outer guard, or the falsy-result
    # fallback would never apply for other values of ``n_jobs``.
    if n_jobs == -1 or not eff_n_jobs:
        eff_n_jobs = self.n_parallel
    return eff_n_jobs

5 changes: 3 additions & 2 deletions mars/learn/contrib/lightgbm/classifier.py
Original file line number Diff line number Diff line change
@@ -24,8 +24,9 @@
lightgbm = None


LGBMClassifier = make_import_error_func("lightgbm")
if lightgbm:
if not lightgbm:
LGBMClassifier = make_import_error_func("lightgbm")
else:

class LGBMClassifier(LGBMScikitLearnBase, lightgbm.LGBMClassifier):
def fit(
3 changes: 2 additions & 1 deletion mars/learn/contrib/lightgbm/core.py
Original file line number Diff line number Diff line change
@@ -19,8 +19,9 @@
import numpy as np
import pandas as pd

from ....tensor import tensor as mars_tensor
from ....dataframe import DataFrame as MarsDataFrame, Series as MarsSeries
from ....lib.version import parse as parse_version
from ....tensor import tensor as mars_tensor


class LGBMModelType(enum.Enum):
5 changes: 3 additions & 2 deletions mars/learn/contrib/lightgbm/ranker.py
Original file line number Diff line number Diff line change
@@ -24,8 +24,9 @@
lightgbm = None


LGBMRanker = make_import_error_func("lightgbm")
if lightgbm:
if not lightgbm:
LGBMRanker = make_import_error_func("lightgbm")
else:

class LGBMRanker(LGBMScikitLearnBase, lightgbm.LGBMRanker):
def fit(
5 changes: 3 additions & 2 deletions mars/learn/contrib/lightgbm/regressor.py
Original file line number Diff line number Diff line change
@@ -24,8 +24,9 @@
lightgbm = None


LGBMRegressor = make_import_error_func("lightgbm")
if lightgbm:
if not lightgbm:
LGBMRegressor = make_import_error_func("lightgbm")
else:

class LGBMRegressor(LGBMScikitLearnBase, lightgbm.LGBMRegressor):
def fit(
18 changes: 9 additions & 9 deletions mars/learn/contrib/lightgbm/tests/test_classifier.py
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ async def create_cluster():
def test_local_classifier(create_cluster):
y_data = (y * 10).astype(mt.int32)
classifier = LGBMClassifier(n_estimators=2)
classifier.fit(X, y_data, eval_set=[(X, y_data)], verbose=True)
classifier.fit(X, y_data, eval_set=[(X, y_data)])
prediction = classifier.predict(X)

assert prediction.ndim == 1
@@ -76,7 +76,7 @@ def test_local_classifier(create_cluster):
X_sparse_data = X_sparse
classifier = LGBMClassifier(n_estimators=2)
classifier.fit(
X_sparse_data, y_data, eval_set=[(X_sparse_data, y_data)], verbose=True
X_sparse_data, y_data, eval_set=[(X_sparse_data, y_data)]
)
prediction = classifier.predict(X_sparse_data)

@@ -94,7 +94,7 @@ def test_local_classifier(create_cluster):
# test dataframe
X_df_data = X_df
classifier = LGBMClassifier(n_estimators=2)
classifier.fit(X_df_data, y_data, verbose=True)
classifier.fit(X_df_data, y_data)
prediction = classifier.predict(X_df_data)

assert prediction.ndim == 1
@@ -110,7 +110,7 @@ def test_local_classifier(create_cluster):
y_df = md.DataFrame(y_data)
for weight in weights:
classifier = LGBMClassifier(n_estimators=2)
classifier.fit(X, y_df, sample_weight=weight, verbose=True)
classifier.fit(X, y_df, sample_weight=weight)
prediction = classifier.predict(X)

assert prediction.ndim == 1
@@ -119,13 +119,13 @@ def test_local_classifier(create_cluster):
# should raise error if weight.ndim > 1
with pytest.raises(ValueError):
LGBMClassifier(n_estimators=2).fit(
X, y_df, sample_weight=mt.random.rand(1, 1), verbose=True
X, y_df, sample_weight=mt.random.rand(1, 1)
)

# test binary classifier
new_y = (y_data > 0.5).astype(mt.int32)
classifier = LGBMClassifier(n_estimators=2)
classifier.fit(X, new_y, verbose=True)
classifier.fit(X, new_y)

prediction = classifier.predict(X)
assert prediction.ndim == 1
@@ -139,7 +139,7 @@ def test_local_classifier(create_cluster):
X_np = X.execute().fetch()
new_y_np = new_y.execute().fetch()
raw_classifier = lightgbm.LGBMClassifier(n_estimators=2)
raw_classifier.fit(X_np, new_y_np, verbose=True)
raw_classifier.fit(X_np, new_y_np)

classifier = LGBMClassifier(raw_classifier)
label_result = classifier.predict(X_df)
@@ -162,7 +162,7 @@ def test_local_classifier_from_to_parquet(setup):

# test with existing model
classifier = lightgbm.LGBMClassifier(n_estimators=2)
classifier.fit(X, y, verbose=True)
classifier.fit(X, y)

with tempfile.TemporaryDirectory() as d:
result_dir = os.path.join(d, "result")
@@ -239,7 +239,7 @@ def fit(

y_data = (y * 10).astype(mt.int32)
classifier = MockLGBMClassifier(n_estimators=2)
classifier.fit(X, y_data, eval_set=[(X, y_data)], verbose=True)
classifier.fit(X, y_data, eval_set=[(X, y_data)])
prediction = classifier.predict(X)

assert prediction.ndim == 1
2 changes: 1 addition & 1 deletion mars/learn/contrib/lightgbm/tests/test_ranker.py
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@
def test_local_ranker(setup):
y = (y_raw * 10).astype(mt.int32)
ranker = LGBMRanker(n_estimators=2)
ranker.fit(X_raw, y, group=[X_raw.shape[0]], verbose=True)
ranker.fit(X_raw, y, group=[X_raw.shape[0]])
prediction = ranker.predict(X_raw)

assert prediction.ndim == 1
6 changes: 3 additions & 3 deletions mars/learn/contrib/lightgbm/tests/test_regressor.py
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@
@pytest.mark.skipif(lightgbm is None, reason="LightGBM not installed")
def test_local_regressor(setup):
regressor = LGBMRegressor(n_estimators=2)
regressor.fit(X, y, verbose=True)
regressor.fit(X, y)
prediction = regressor.predict(X)

assert prediction.ndim == 1
@@ -62,7 +62,7 @@ def test_local_regressor(setup):

X_array, y_array = make_classification()
regressor = LGBMRegressor(n_estimators=2)
regressor.fit(X_array, y_array, verbose=True)
regressor.fit(X_array, y_array)
prediction = regressor.predict(X_array)

assert prediction.ndim == 1
@@ -71,7 +71,7 @@ def test_local_regressor(setup):
X_df = pd.DataFrame(X_array)
y_df = pd.Series(y_array)
regressor = LGBMRegressor(n_estimators=2)
regressor.fit(X_df, y_df, verbose=True)
regressor.fit(X_df, y_df)
prediction = regressor.predict(X_df)

assert prediction.ndim == 1
5 changes: 3 additions & 2 deletions mars/learn/contrib/xgboost/classifier.py
Original file line number Diff line number Diff line change
@@ -16,8 +16,9 @@
from .core import xgboost, XGBScikitLearnBase


XGBClassifier = make_import_error_func("xgboost")
if xgboost:
if not xgboost:
XGBClassifier = make_import_error_func("xgboost")
else:
from xgboost.sklearn import XGBClassifierBase

from .... import tensor as mt
5 changes: 3 additions & 2 deletions mars/learn/contrib/xgboost/core.py
Original file line number Diff line number Diff line change
@@ -22,8 +22,9 @@
from .dmatrix import MarsDMatrix


XGBScikitLearnBase = None
if xgboost:
if not xgboost:
XGBScikitLearnBase = None
else:

class XGBScikitLearnBase(xgboost.XGBModel):
"""
5 changes: 3 additions & 2 deletions mars/learn/contrib/xgboost/regressor.py
Original file line number Diff line number Diff line change
@@ -17,8 +17,9 @@
from .core import xgboost, XGBScikitLearnBase


XGBRegressor = make_import_error_func("xgboost")
if xgboost:
if not xgboost:
XGBRegressor = make_import_error_func("xgboost")
else:
from .core import wrap_evaluation_matrices
from .train import train
from .predict import predict
10 changes: 5 additions & 5 deletions mars/lib/mkl_interface.py
Original file line number Diff line number Diff line change
@@ -46,12 +46,12 @@ class MKLVersion(ctypes.Structure):
]


mkl_free_buffers = None
mkl_get_version = None
mkl_mem_stat = None

mkl_rt = _load_mkl_rt("mkl_rt")
if mkl_rt:
if not mkl_rt:
mkl_free_buffers = None
mkl_get_version = None
mkl_mem_stat = None
else:
try:
mkl_free_buffers = mkl_rt.mkl_free_buffers
mkl_free_buffers.argtypes = []
16 changes: 12 additions & 4 deletions mars/utils.py
Original file line number Diff line number Diff line change
@@ -1673,7 +1673,7 @@ def get_func_token(func):


def _get_func_token_values(func):
if hasattr(func, "__code__"):
if hasattr(func, "__code__") and func.__code__.co_code:
tokens = [func.__code__.co_code]
if func.__closure__ is not None:
cvars = tuple([x.cell_contents for x in func.__closure__])
@@ -1684,10 +1684,13 @@ def _get_func_token_values(func):
while isinstance(func, functools.partial):
tokens.extend([func.args, func.keywords])
func = func.func
if hasattr(func, "__code__"):
tokens.extend(_get_func_token_values(func))
elif isinstance(func, types.BuiltinFunctionType):
if (
isinstance(func, types.BuiltinFunctionType)
or "cython" in type(func).__name__
):
tokens.extend([func.__module__, func.__name__])
elif hasattr(func, "__code__"):
tokens.extend(_get_func_token_values(func))
else:
tokens.append(func)
return tokens
@@ -1912,3 +1915,8 @@ def get_node_ip_address(address="8.8.8.8:53"):
s.close()

return node_ip_address


def is_debugger_repr_thread():
    """
    Tell whether the current thread's class name looks like a debugger
    value-fetching thread.

    Returns ``True`` only when the class name of
    ``threading.current_thread()`` contains both ``"GetValue"`` and
    ``"Debug"``; the check is purely name-based.
    """
    cls_name = type(threading.current_thread()).__name__
    return all(marker in cls_name for marker in ("GetValue", "Debug"))
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ install_requires =
pandas>=1.0.0,<2.0.0
scipy>=1.0.0
scikit-learn>=0.20
numexpr>=2.6.4
numexpr>=2.6.4,!=2.8.5
cloudpickle>=1.5.0
pyyaml>=5.1
psutil>=5.9.0
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
@@ -20,11 +20,10 @@
import warnings
from sysconfig import get_config_vars

from pkg_resources import parse_version
from setuptools import setup, Extension, Command

import numpy as np
from Cython.Build import cythonize
from pkg_resources import parse_version
from setuptools import Command, Extension, setup
from setuptools.command.develop import develop
from setuptools.command.install import install
from setuptools.command.sdist import sdist
3 changes: 2 additions & 1 deletion versioneer.py
Original file line number Diff line number Diff line change
@@ -282,13 +282,13 @@

import configparser
import errno
import functools
import json
import os
import re
import subprocess
import sys
from typing import Callable, Dict
import functools


class VersioneerConfig:
@@ -1872,6 +1872,7 @@ def run(self):

if "cx_Freeze" in sys.modules: # cx_freeze enabled?
from cx_Freeze.dist import build_exe as _build_exe

# nczeczulin reports that py2exe won't like the pep440-style string
# as FILEVERSION, but it can be used for PRODUCTVERSION, e.g.
# setup(console=[{

0 comments on commit 0a42ba8

Please sign in to comment.