From 78d5615e431b1e3eaa1232a399614fd697d3b085 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Mon, 25 Sep 2023 16:48:00 +0800 Subject: [PATCH] Implements accessor for categorical types --- docs/source/conf.py | 4 +- docs/source/reference/dataframe/series.rst | 25 +++ mars/core/__init__.py | 1 + mars/dataframe/__init__.py | 22 +-- mars/dataframe/base/__init__.py | 15 +- mars/dataframe/base/accessor.py | 53 +++++- mars/dataframe/base/astype.py | 86 ++++------ mars/dataframe/base/categorical.py | 152 ++++++++++++++++++ .../base/tests/test_base_execution.py | 36 +++++ mars/learn/contrib/lightgbm/core.py | 1 - .../contrib/lightgbm/tests/test_classifier.py | 8 +- mars/opcodes.py | 1 + 12 files changed, 328 insertions(+), 76 deletions(-) create mode 100644 mars/dataframe/base/categorical.py diff --git a/docs/source/conf.py b/docs/source/conf.py index 41a531c7c4..58287412c0 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -183,13 +183,13 @@ # Example configuration for intersphinx: refer to the Python standard library. intersphinx_mapping = { "dateutil": ("https://dateutil.readthedocs.io/en/latest/", None), - "matplotlib": ("https://matplotlib.org/", None), + "matplotlib": ("https://matplotlib.org/stable/", None), "numpy": ("https://numpy.org/doc/stable/", None), "pandas": ("https://pandas.pydata.org/docs/", None), "pandas-gbq": ("https://pandas-gbq.readthedocs.io/en/latest/", None), "py": ("https://pylib.readthedocs.io/en/latest/", None), "python": ("https://docs.python.org/3/", None), - "scipy": ("https://docs.scipy.org/doc/scipy/reference/", None), + "scipy": ("https://docs.scipy.org/doc/scipy/", None), "statsmodels": ("https://www.statsmodels.org/devel/", None), "pyarrow": ("https://arrow.apache.org/docs/", None), } diff --git a/docs/source/reference/dataframe/series.rst b/docs/source/reference/dataframe/series.rst index 3b939b665b..a9be3363fb 100644 --- a/docs/source/reference/dataframe/series.rst +++ b/docs/source/reference/dataframe/series.rst @@ -386,6 +386,31 @@ strings and apply several methods to it. These can be accessed like Series.str Series.dt + +.. _generated.series.cat: + +Categorical accessor +~~~~~~~~~~~~~~~ + +Categorical-dtype specific methods and attributes are available under +the ``Series.cat`` accessor. + +.. autosummary:: + :toctree: generated/ + :template: accessor_method.rst + + Series.cat.categories + Series.cat.ordered + Series.cat.codes + Series.cat.rename_categories + Series.cat.reorder_categories + Series.cat.add_categories + Series.cat.remove_categories + Series.cat.set_categories + Series.cat.as_ordered + Series.cat.as_unordered + + Plotting -------- ``Series.plot`` is both a callable method and a namespace attribute for diff --git a/mars/core/__init__.py b/mars/core/__init__.py index abc0385114..3ae5248572 100644 --- a/mars/core/__init__.py +++ b/mars/core/__init__.py @@ -15,6 +15,7 @@ # noinspection PyUnresolvedReferences from ..typing import ChunkType, TileableType, EntityType, OperandType from .base import ExecutionError +from .context import get_context from .entity import ( Entity, EntityData, diff --git a/mars/dataframe/__init__.py b/mars/dataframe/__init__.py index 5ae63f1633..85849a9ed0 100644 --- a/mars/dataframe/__init__.py +++ b/mars/dataframe/__init__.py @@ -45,32 +45,32 @@ from . import arithmetic from . import base +from . import datastore +from . import groupby from . import indexing from . import merge as merge_ from . import missing +from . import plotting from . import reduction -from . import statistics from . import sort -from . import groupby +from . import statistics from . import ufunc -from . import datastore from . import window -from . import plotting del ( - reduction, - statistics, arithmetic, - indexing, - merge_, base, + datastore, groupby, + indexing, + merge_, missing, - ufunc, - datastore, + plotting, + reduction, sort, + statistics, + ufunc, window, - plotting, ) del DataFrameFetch, DataFrameFetchShuffle diff --git a/mars/dataframe/base/__init__.py b/mars/dataframe/base/__init__.py index 4105b5afba..fdfb3173b1 100644 --- a/mars/dataframe/base/__init__.py +++ b/mars/dataframe/base/__init__.py @@ -55,9 +55,15 @@ def _install(): from ..core import DATAFRAME_TYPE, SERIES_TYPE, INDEX_TYPE from .standardize_range_index import ChunkStandardizeRangeIndex + from .categorical import _categorical_method_to_handlers from .string_ import _string_method_to_handlers from .datetimes import _datetime_method_to_handlers - from .accessor import StringAccessor, DatetimeAccessor, CachedAccessor + from .accessor import ( + CachedAccessor, + CategoricalAccessor, + DatetimeAccessor, + StringAccessor, + ) for t in DATAFRAME_TYPE: setattr(t, "to_gpu", to_gpu) @@ -134,6 +140,10 @@ def _install(): setattr(t, "is_monotonic_increasing", property(fget=is_monotonic_increasing)) setattr(t, "is_monotonic_decreasing", property(fget=is_monotonic_decreasing)) + for method in _categorical_method_to_handlers: + if not hasattr(CategoricalAccessor, method): + CategoricalAccessor._register(method) + for method in _string_method_to_handlers: if not hasattr(StringAccessor, method): StringAccessor._register(method) @@ -143,8 +153,9 @@ def _install(): DatetimeAccessor._register(method) for series in SERIES_TYPE: - series.str = CachedAccessor("str", StringAccessor) + series.cat = CachedAccessor("cat", CategoricalAccessor) series.dt = CachedAccessor("dt", DatetimeAccessor) + series.str = CachedAccessor("str", StringAccessor) _install() diff --git a/mars/dataframe/base/accessor.py b/mars/dataframe/base/accessor.py index 2751d5c2e7..b43a48af72 100644 --- a/mars/dataframe/base/accessor.py +++ b/mars/dataframe/base/accessor.py @@ -17,15 +17,17 @@ import pandas as pd from pandas.api.types import ( + is_categorical_dtype, is_datetime64_dtype, is_datetime64tz_dtype, - is_timedelta64_dtype, is_period_dtype, + is_timedelta64_dtype, ) from ...utils import adapt_mars_docstring -from .string_ import _string_method_to_handlers, SeriesStringMethod +from .categorical import _categorical_method_to_handlers, SeriesCategoricalMethod from .datetimes import _datetime_method_to_handlers, SeriesDatetimeMethod +from .string_ import _string_method_to_handlers, SeriesStringMethod class StringAccessor: @@ -262,6 +264,53 @@ def __dir__(self) -> Iterable[str]: return list(s) +class CategoricalAccessor: + def __init__(self, series): + if not is_categorical_dtype(series.dtype): + raise AttributeError("Can only use .cat accessor with categorical values") + self._series = series + + @property + def ordered(self): + return self._series.dtype.ordered + + @property + def categories(self): + return getattr(self, "_get_categories")() + + @classmethod + def _gen_func(cls, method, is_property): + def _inner(self, *args, **kwargs): + op = SeriesCategoricalMethod( + method=method, + is_property=is_property, + method_args=args, + method_kwargs=kwargs, + ) + return op(self._series) + + if hasattr(pd.Series.cat, method): + _inner = wraps(getattr(pd.Series.cat, method))(_inner) + _inner.__doc__ = adapt_mars_docstring( + getattr(pd.Series.cat, method).__doc__ + ) + return _inner + + @classmethod + def _register(cls, method): + # non-existing members are considered methods by default + is_property = not callable(getattr(pd.Series.cat, method, lambda: None)) + func = cls._gen_func(method, is_property) + if is_property: + func = property(func) + setattr(cls, method, func) + + def __dir__(self) -> Iterable[str]: + s = set(super().__dir__()) + s.update(_categorical_method_to_handlers.keys()) + return list(s) + + class CachedAccessor: def __init__(self, name: str, accessor) -> None: self._name = name diff --git a/mars/dataframe/base/astype.py b/mars/dataframe/base/astype.py index 4b50e1c55c..a8692af8ef 100644 --- a/mars/dataframe/base/astype.py +++ b/mars/dataframe/base/astype.py @@ -16,9 +16,9 @@ import pandas as pd from pandas.api.types import CategoricalDtype -from ... import opcodes as OperandDef +from ... import opcodes from ...core import recursive_tile -from ...serialization.serializables import AnyField, StringField, ListField +from ...serialization.serializables import AnyField, ListField, StringField from ...tensor.base import sort from ...utils import pd_release_version from ..core import DATAFRAME_TYPE, SERIES_TYPE @@ -29,42 +29,23 @@ class DataFrameAstype(DataFrameOperand, DataFrameOperandMixin): - _op_type_ = OperandDef.ASTYPE - - _dtype_values = AnyField("dtype_values") - _errors = StringField("errors") - _category_cols = ListField("category_cols") - - def __init__( - self, - dtype_values=None, - errors=None, - category_cols=None, - output_types=None, - **kw - ): - super().__init__( - _dtype_values=dtype_values, - _errors=errors, - _category_cols=category_cols, - _output_types=output_types, - **kw - ) + _op_type_ = opcodes.ASTYPE - @property - def dtype_values(self): - return self._dtype_values + dtype_values = AnyField("dtype_values", default=None) + errors = StringField("errors", default=None) + category_cols = ListField("category_cols", default=None) - @property - def errors(self): - return self._errors + def __init__(self, output_types=None, **kw): + super().__init__(_output_types=output_types, **kw) - @property - def category_cols(self): - return self._category_cols + @classmethod + def _is_categories_missing(cls, dtype): + return (isinstance(dtype, str) and dtype == "category") or ( + isinstance(dtype, pd.CategoricalDtype) and dtype.categories is None + ) @classmethod - def _tile_one_chunk(cls, op): + def _tile_one_chunk(cls, op: "DataFrameAstype"): c = op.inputs[0].chunks[0] chunk_op = op.copy().reset_key() chunk_params = op.outputs[0].params.copy() @@ -80,15 +61,14 @@ def _tile_one_chunk(cls, op): ) @classmethod - def _tile_series_index(cls, op): + def _tile_series_index(cls, op: "DataFrameAstype"): in_series = op.inputs[0] out = op.outputs[0] unique_chunk = None - if op.dtype_values == "category" and isinstance(op.dtype_values, str): - unique_chunk = (yield from recursive_tile(sort(in_series.unique()))).chunks[ - 0 - ] + if cls._is_categories_missing(op.dtype_values): + unique = yield from recursive_tile(sort(in_series.unique())) + unique_chunk = unique.chunks[0] chunks = [] for c in in_series.chunks: @@ -96,7 +76,7 @@ def _tile_series_index(cls, op): params = c.params.copy() params["dtype"] = out.dtype if unique_chunk is not None: - chunk_op._category_cols = [in_series.name] + chunk_op.category_cols = [in_series.name] new_chunk = chunk_op.new_chunk([c, unique_chunk], **params) else: new_chunk = chunk_op.new_chunk([c], **params) @@ -108,13 +88,13 @@ def _tile_series_index(cls, op): ) @classmethod - def _tile_dataframe(cls, op): + def _tile_dataframe(cls, op: "DataFrameAstype"): in_df = op.inputs[0] out = op.outputs[0] cum_nsplits = np.cumsum((0,) + in_df.nsplits[1]) out_chunks = [] - if op.dtype_values == "category": + if cls._is_categories_missing(op.dtype_values): # all columns need unique values for c in in_df.chunks: chunk_op = op.copy().reset_key() @@ -123,21 +103,19 @@ def _tile_dataframe(cls, op): cum_nsplits[c.index[1]] : cum_nsplits[c.index[1] + 1] ] params["dtypes"] = dtypes - chunk_op._category_cols = list(c.columns_value.to_pandas()) + chunk_op.category_cols = list(c.columns_value.to_pandas()) unique_chunks = [] for col in c.columns_value.to_pandas(): unique = yield from recursive_tile(sort(in_df[col].unique())) unique_chunks.append(unique.chunks[0]) new_chunk = chunk_op.new_chunk([c] + unique_chunks, **params) out_chunks.append(new_chunk) - elif ( - isinstance(op.dtype_values, dict) and "category" in op.dtype_values.values() + elif isinstance(op.dtype_values, dict) and any( + cls._is_categories_missing(t) for t in op.dtype_values.values() ): # some columns' types are category category_cols = [ - c - for c, v in op.dtype_values.items() - if isinstance(v, str) and v == "category" + c for c, v in op.dtype_values.items() if cls._is_categories_missing(v) ] unique_chunks = dict() for col in category_cols: @@ -156,7 +134,7 @@ def _tile_dataframe(cls, op): if col in category_cols: chunk_category_cols.append(col) chunk_unique_chunks.append(unique_chunks[col]) - chunk_op._category_cols = chunk_category_cols + chunk_op.category_cols = chunk_category_cols new_chunk = chunk_op.new_chunk([c] + chunk_unique_chunks, **params) out_chunks.append(new_chunk) else: @@ -176,7 +154,7 @@ def _tile_dataframe(cls, op): ) @classmethod - def tile(cls, op): + def tile(cls, op: "DataFrameAstype"): if len(op.inputs[0].chunks) == 1: return cls._tile_one_chunk(op) elif isinstance(op.inputs[0], DATAFRAME_TYPE): @@ -185,13 +163,14 @@ def tile(cls, op): return (yield from cls._tile_series_index(op)) @classmethod - def execute(cls, ctx, op): + def execute(cls, ctx, op: "DataFrameAstype"): in_data = ctx[op.inputs[0].key] if not isinstance(op.dtype_values, dict): if op.category_cols is not None: uniques = [ctx[c.key] for c in op.inputs[1:]] + ordered = getattr(op.dtype_values, "ordered", None) dtype = dict( - (col, CategoricalDtype(unique_values)) + (col, CategoricalDtype(unique_values, ordered=ordered)) for col, unique_values in zip(op.category_cols, uniques) ) ctx[op.outputs[0].key] = in_data.astype(dtype, errors=op.errors) @@ -212,7 +191,10 @@ def execute(cls, ctx, op): if op.category_cols is not None: uniques = [ctx[c.key] for c in op.inputs[1:]] for col, unique_values in zip(op.category_cols, uniques): - selected_dtype[col] = CategoricalDtype(unique_values) + ordered = getattr(selected_dtype[col], "ordered", None) + selected_dtype[col] = CategoricalDtype( + unique_values, ordered=ordered + ) ctx[op.outputs[0].key] = in_data.astype(selected_dtype, errors=op.errors) def __call__(self, df): diff --git a/mars/dataframe/base/categorical.py b/mars/dataframe/base/categorical.py new file mode 100644 index 0000000000..343f745cba --- /dev/null +++ b/mars/dataframe/base/categorical.py @@ -0,0 +1,152 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect + +import numpy as np +import pandas as pd + +from ... import opcodes +from ...core import OutputType +from ...serialization.serializables import BoolField, DictField, StringField, TupleField +from ..operands import DataFrameOperand, DataFrameOperandMixin +from ..utils import build_empty_series, parse_index + + +class SeriesCategoricalMethod(DataFrameOperand, DataFrameOperandMixin): + _op_type_ = opcodes.CATEGORICAL_METHOD + + method = StringField("method") + method_args = TupleField("method_args") + method_kwargs = DictField("method_kwargs") + is_property = BoolField("is_property") + + def __init__(self, output_types=None, **kw): + super().__init__(_output_types=output_types, **kw) + if not self.output_types: + self.output_types = [OutputType.series] + + def __call__(self, inp): + return _categorical_method_to_handlers[self.method].call(self, inp) + + @classmethod + def tile(cls, op: "SeriesCategoricalMethod"): + tiled = _categorical_method_to_handlers[op.method].tile(op) + if inspect.isgenerator(tiled): + return (yield from tiled) + else: + return tiled + + @classmethod + def execute(cls, ctx, op): + return _categorical_method_to_handlers[op.method].execute(ctx, op) + + +class SeriesCategoricalMethodBaseHandler: + @classmethod + def call(cls, op: "SeriesCategoricalMethod", inp): + empty_series = build_empty_series(inp.dtype) + rseries = getattr(empty_series.cat, op.method) + if not op.is_property: + rseries = rseries(*op.method_args, **op.method_kwargs) + dtype = rseries.dtype + return op.new_series( + [inp], + shape=inp.shape, + dtype=dtype, + index_value=inp.index_value, + name=inp.name, + ) + + @classmethod + def tile(cls, op: "SeriesCategoricalMethod"): + out = op.outputs[0] + out_chunks = [] + for series_chunk in op.inputs[0].chunks: + chunk_op = op.copy().reset_key() + out_chunk = chunk_op.new_chunk( + [series_chunk], + shape=series_chunk.shape, + dtype=out.dtype, + index=series_chunk.index, + index_value=series_chunk.index_value, + name=series_chunk.name, + ) + out_chunks.append(out_chunk) + + params = out.params + params["chunks"] = out_chunks + params["nsplits"] = op.inputs[0].nsplits + new_op = op.copy() + return new_op.new_tileables([op.inputs[0]], kws=[params]) + + @classmethod + def execute(cls, ctx, op: "SeriesCategoricalMethod"): + inp = ctx[op.inputs[0].key] + result = getattr(inp.cat, op.method) + if not op.is_property: + result = result(*op.method_args, **op.method_kwargs) + ctx[op.outputs[0].key] = result + + +class GetCategoriesHandler(SeriesCategoricalMethodBaseHandler): + @classmethod + def call(cls, op: "SeriesCategoricalMethod", inp): + dtype = inp.dtype.categories.dtype + return op.new_index( + [inp], + shape=(np.nan,), + dtype=dtype, + index_value=parse_index(pd.Index([], dtype=dtype)), + ) + + @classmethod + def tile(cls, op: "SeriesCategoricalMethod"): + out = op.outputs[0] + + chunk_op = op.copy().reset_key() + out_chunk = chunk_op.new_chunk( + [op.inputs[0].chunks[0]], + index=(0,), + shape=out.shape, + dtype=out.dtype, + index_value=out.index_value, + ) + + params = out.params + params["chunks"] = [out_chunk] + params["nsplits"] = ((np.nan,),) + new_op = op.copy() + return new_op.new_tileables([op.inputs[0]], kws=[params]) + + @classmethod + def execute(cls, ctx, op: "SeriesCategoricalMethod"): + inp = ctx[op.inputs[0].key] + ctx[op.outputs[0].key] = inp.cat.categories + + +_categorical_method_to_handlers = {} +_not_implements = ["categories", "ordered", "remove_unused_categories"] +# start to register handlers for string methods +# register special methods first +_categorical_method_to_handlers["_get_categories"] = GetCategoriesHandler +# then come to the normal methods +for method in dir(pd.Series.cat): + if method.startswith("_") and method != "__getitem__": + continue + if method in _not_implements: + continue + if method in _categorical_method_to_handlers: + continue + _categorical_method_to_handlers[method] = SeriesCategoricalMethodBaseHandler diff --git a/mars/dataframe/base/tests/test_base_execution.py b/mars/dataframe/base/tests/test_base_execution.py index eb2d3994fa..0910bd6465 100644 --- a/mars/dataframe/base/tests/test_base_execution.py +++ b/mars/dataframe/base/tests/test_base_execution.py @@ -700,6 +700,37 @@ def test_transform_with_arrow_dtype_execution(setup): pd.testing.assert_series_equal(result, expected) +def test_categorical_method_execution(setup): + with pytest.raises(AttributeError): + series = from_pandas_series(pd.Series([0, 1])) + series.cat.add_categories(["err"]) + + s = pd.Series(["a", "c", "d", "b"], dtype="category") + s2 = pd.concat([s, s, s]).astype(pd.CategoricalDtype(ordered=True)) + + series = from_pandas_series(s, chunk_size=2) + series2 = from_pandas_series(s2, chunk_size=2) + + assert not series.cat.ordered + assert series2.cat.ordered + + pd.testing.assert_index_equal( + series.cat.categories.execute().fetch(), + pd.Index(["a", "b", "c", "d"]), + ) + pd.testing.assert_series_equal(series.cat.codes.execute().fetch(), s.cat.codes) + pd.testing.assert_series_equal( + series.cat.add_categories("e").execute().fetch(), + s.cat.add_categories("e"), + ) + + pd.testing.assert_series_equal(series2.cat.codes.execute().fetch(), s2.cat.codes) + pd.testing.assert_series_equal( + series2.cat.add_categories("e").execute().fetch(), + s2.cat.add_categories("e"), + ) + + def test_string_method_execution(setup): s = pd.Series(["s1,s2", "ef,", "dd", np.nan]) s2 = pd.concat([s, s, s]) @@ -1569,6 +1600,11 @@ def test_astype(setup): expected = raw.astype(pd.CategoricalDtype(["a", "c", "b", "d"])) pd.testing.assert_series_equal(expected, result) + series = from_pandas_series(raw, chunk_size=6) + result = series.astype(pd.CategoricalDtype(ordered=True)).execute().fetch() + expected = raw.astype(pd.CategoricalDtype(ordered=True)) + pd.testing.assert_series_equal(expected, result) + def test_drop(setup): # test dataframe drop diff --git a/mars/learn/contrib/lightgbm/core.py b/mars/learn/contrib/lightgbm/core.py index ff050cdbb0..7da06cfa56 100644 --- a/mars/learn/contrib/lightgbm/core.py +++ b/mars/learn/contrib/lightgbm/core.py @@ -20,7 +20,6 @@ import pandas as pd from ....dataframe import DataFrame as MarsDataFrame, Series as MarsSeries -from ....lib.version import parse as parse_version from ....tensor import tensor as mars_tensor diff --git a/mars/learn/contrib/lightgbm/tests/test_classifier.py b/mars/learn/contrib/lightgbm/tests/test_classifier.py index db7425fc86..28fd623421 100644 --- a/mars/learn/contrib/lightgbm/tests/test_classifier.py +++ b/mars/learn/contrib/lightgbm/tests/test_classifier.py @@ -75,9 +75,7 @@ def test_local_classifier(create_cluster): # test sparse tensor X_sparse_data = X_sparse classifier = LGBMClassifier(n_estimators=2) - classifier.fit( - X_sparse_data, y_data, eval_set=[(X_sparse_data, y_data)] - ) + classifier.fit(X_sparse_data, y_data, eval_set=[(X_sparse_data, y_data)]) prediction = classifier.predict(X_sparse_data) assert prediction.ndim == 1 @@ -118,9 +116,7 @@ def test_local_classifier(create_cluster): # should raise error if weight.ndim > 1 with pytest.raises(ValueError): - LGBMClassifier(n_estimators=2).fit( - X, y_df, sample_weight=mt.random.rand(1, 1) - ) + LGBMClassifier(n_estimators=2).fit(X, y_df, sample_weight=mt.random.rand(1, 1)) # test binary classifier new_y = (y_data > 0.5).astype(mt.int32) diff --git a/mars/opcodes.py b/mars/opcodes.py index 6036585977..5ec13f70a5 100644 --- a/mars/opcodes.py +++ b/mars/opcodes.py @@ -386,6 +386,7 @@ DUPLICATED = 739 DELETE = 740 ALIGN = 741 +CATEGORICAL_METHOD = 742 FUSE = 801