Merge code (#3330)
* Implements ColumnPruning optimization rule (#30)

Co-authored-by: ChengjieLi <[email protected]>
Co-authored-by: aresnow <[email protected]>

* Fix execution hang on optimization error (#33)

* enh: Optimize groupby nunique implementation (#38)

* enh: Support passing mars objects when call df.map_chunk (#41)

* enh: Support `read_parquet` for GPU (#45)

* enh: Merge small data on shuffle mapper side (#49)

* Fix: `series.drop_duplicates()` failed (#53)

* Fix: dataframe.isna on scalar args (#55)

* enh: Support df or series type for apply function (#56)

* enh: Support running TPC-H queries on GPU (#60)

* Avoid creating too many thread pools (#62)

* enh: Optimize DataFrame.isin (#67)

* tst: Fix gpu CI (#42)

* enh: Refactor `read_buffers` & `write_buffers` for GPU (#68)

* Fix: ensure dataframe.all acts identically to pandas (#79)

* fix: Fix wrong results of `DataFrame.replace` (#86)

* enh: Add `skip_infer` parameter for all user-defined functions (#76)

* fix: Modify signature of `df.map_chunk` (#87)

* BUG: DataFrame.agg with built-in functions (#91)

Co-authored-by: UranusSeven <[email protected]>

* Fix dataframe.ewm (#97)

* BUG: Fix dataframe.sample() (#99)

* BUG: df.apply with list input returns wrong answer when axis=1 (#100)

* BUG: Fix md.unique (#102)

* Fix `ordered` implementation in md.cut (#104)

Co-authored-by: UranusSeven <[email protected]>

* Fix: map_chunk output type inference (#111)

* Enh: support groupby plot (#113)

* ENH: Clear default context after execution (#115)

* BUG: fix supervisor start method (#116)

* BUG: Fix `date_range` and pin sphinx<6.0.0 (#118)

* BUG: handle missing merge key (#124)

* BUG: Fix `read_csv` with specific names and header (#130)

* BUG: needed cols being pruned (#134)

* BUG: Fix `read_parquet` with latest pyarrow (#135)

* BUG: Suppress `FutureWarning` (#108)

* Fix merge

* Add `_repr_mimebundle_`

... as `_repr_svg_` is deprecated in the `Source` class of `graphviz`.

---------

Co-authored-by: UranusSeven <[email protected]>
Co-authored-by: ChengjieLi <[email protected]>
Co-authored-by: aresnow <[email protected]>
Co-authored-by: Chengjie Li <[email protected]>
Co-authored-by: aresnow1 <[email protected]>
Co-authored-by: qianduoduo0904 <[email protected]>
Co-authored-by: 黄浩杰 <[email protected]>
Co-authored-by: 刘宝 <[email protected]>
Co-authored-by: Wenjun Si <[email protected]>
10 people authored Mar 14, 2023
1 parent 1863382 commit 9b08d4a
Showing 133 changed files with 5,567 additions and 1,203 deletions.
3 changes: 1 addition & 2 deletions azure-pipelines.yml
@@ -94,8 +94,7 @@ jobs:
       # do compatibility test for earliest supported pandas release
       if [[ "$(mars.test.module)" == "dataframe" ]]; then
-        pip install -i https://pkgs.dev.azure.com/mars-project/mars/_packaging/pandas/pypi/simple/ pandas==1.0.5
-        pip install numpy\<1.24.0
+        pip install numpy\<1.24.0 sqlalchemy\<2.0
         pytest $PYTEST_CONFIG -m pd_compat mars/dataframe
         mv .coverage build/.coverage.pd_compat.file
       fi
46 changes: 24 additions & 22 deletions benchmarks/asv_bench/benchmarks/storage.py
@@ -24,47 +24,49 @@
 from mars.utils import Timer, readable_size


-def send_1_to_1(n: int = None):
+def send_1_to_1(n: int = None, cpu: bool = True):
     ctx = get_context()
-    workers = ctx.get_worker_addresses()

-    worker_to_gen_data = {
-        w: mr.spawn(_gen_data, kwargs=dict(n=n, worker=w), expect_worker=w)
-        for i, w in enumerate(workers)
+    bands = [
+        b
+        for b in ctx.get_worker_bands()
+        if (cpu and b[1].startswith("numa-")) or (not cpu and b[1].startswith("gpu-"))
+    ]
+
+    band_to_gen_data = {
+        b: mr.spawn(_gen_data, kwargs=dict(n=n, band=b), expect_band=b)
+        for i, b in enumerate(bands)
     }
-    all_data = mars.execute(list(worker_to_gen_data.values()))
+    all_data = mars.execute(list(band_to_gen_data.values()))
     progress = 0.1
     ctx.set_progress(progress)
     infos = [d._fetch_infos(fields=["data_key", "store_size"]) for d in all_data]
     data_size = infos[0]["store_size"][0]
-    worker_to_data_keys = dict(zip(workers, [info["data_key"][0] for info in infos]))
+    band_to_data_keys = dict(zip(bands, [info["data_key"][0] for info in infos]))

-    workers_to_durations = dict()
-    size = len(workers) * (len(workers) - 1)
-    for worker1, worker2 in itertools.permutations(workers, 2):
+    bands_to_durations = dict()
+    size = len(bands) * (len(bands) - 1)
+    for band1, band2 in itertools.permutations(bands, 2):
         fetch_data = mr.spawn(
             _fetch_data,
-            args=(worker_to_data_keys[worker1],),
-            kwargs=dict(worker=worker2),
-            expect_worker=worker2,
+            args=(band_to_data_keys[band1],),
+            kwargs=dict(band=band2),
+            expect_band=band2,
         )
         fetch_time = fetch_data.execute().fetch()
         rate = readable_size(data_size / fetch_time)
-        workers_to_durations[worker1, worker2] = (
+        bands_to_durations[band1, band2] = (
             readable_size(data_size),
             f"{rate}B/s",
         )
         progress += 0.9 / size
         ctx.set_progress(min(progress, 1.0))
-    return workers_to_durations
+    return bands_to_durations


-def _gen_data(
-    n: int = None, worker: str = None, check_addr: bool = True
-) -> pd.DataFrame:
+def _gen_data(n: int = None, band: str = None, check_addr: bool = True) -> pd.DataFrame:
     if check_addr:
         ctx = get_context()
-        assert ctx.worker_address == worker
+        assert ctx.band == band
     n = n if n is not None else 5_000_000
     rs = np.random.RandomState(123)
     data = {
@@ -75,10 +77,10 @@ def _gen_data(
     return pd.DataFrame(data)


-def _fetch_data(data_key: str, worker: str = None):
+def _fetch_data(data_key: str, band: str = None):
     # do nothing actually
     ctx = get_context()
-    assert ctx.worker_address == worker
+    assert ctx.band == band
     with Timer() as timer:
         ctx.get_chunks_result([data_key], fetch_only=True)
     return timer.duration
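The rewritten benchmark enumerates storage bands rather than bare worker addresses; a band is addressed as an (address, band_name) pair, which is why the `cpu` flag filters on the band-name prefix. Below is a standalone sketch of that filtering logic, assuming the (address, band_name) tuple shape implied by `b[1].startswith("numa-")` above (the sample addresses are made up):

    # Sketch of the band filter used in send_1_to_1; bands are assumed to be
    # (worker_address, band_name) tuples such as ("127.0.0.1:1234", "numa-0").
    def filter_bands(bands, cpu=True):
        prefix = "numa-" if cpu else "gpu-"
        return [b for b in bands if b[1].startswith(prefix)]

    bands = [("w1:1234", "numa-0"), ("w1:1234", "gpu-0"), ("w2:1234", "numa-0")]
    assert filter_bands(bands) == [("w1:1234", "numa-0"), ("w2:1234", "numa-0")]
    assert filter_bands(bands, cpu=False) == [("w1:1234", "gpu-0")]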
88 changes: 60 additions & 28 deletions benchmarks/tpch/run_queries.py
@@ -25,55 +25,71 @@
 queries: Optional[Union[Set[str], List[str]]] = None


-def load_lineitem(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_lineitem(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/lineitem.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     df["L_SHIPDATE"] = md.to_datetime(df.L_SHIPDATE, format="%Y-%m-%d")
     df["L_RECEIPTDATE"] = md.to_datetime(df.L_RECEIPTDATE, format="%Y-%m-%d")
     df["L_COMMITDATE"] = md.to_datetime(df.L_COMMITDATE, format="%Y-%m-%d")
     return df


-def load_part(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_part(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/part.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df


-def load_orders(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_orders(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/orders.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     df["O_ORDERDATE"] = md.to_datetime(df.O_ORDERDATE, format="%Y-%m-%d")
     return df


-def load_customer(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_customer(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/customer.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df


-def load_nation(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_nation(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/nation.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df


-def load_region(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_region(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/region.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df


-def load_supplier(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_supplier(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/supplier.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df


-def load_partsupp(data_folder: str, use_arrow_dtype: bool = None) -> md.DataFrame:
+def load_partsupp(
+    data_folder: str, use_arrow_dtype: bool = None, gpu: bool = False
+) -> md.DataFrame:
     data_path = data_folder + "/partsupp.pq"
-    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype)
+    df = md.read_parquet(data_path, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     return df


@@ -967,22 +983,25 @@ def q22(customer, orders):


 def run_queries(
-    data_folder: str, select: List[str] = None, use_arrow_dtype: bool = None
+    data_folder: str,
+    select: List[str] = None,
+    use_arrow_dtype: bool = None,
+    gpu: bool = False,
 ):
     if select:
         global queries
         queries = select

     # Load the data
     t1 = time.time()
-    lineitem = load_lineitem(data_folder, use_arrow_dtype=use_arrow_dtype)
-    orders = load_orders(data_folder, use_arrow_dtype=use_arrow_dtype)
-    customer = load_customer(data_folder, use_arrow_dtype=use_arrow_dtype)
-    nation = load_nation(data_folder, use_arrow_dtype=use_arrow_dtype)
-    region = load_region(data_folder, use_arrow_dtype=use_arrow_dtype)
-    supplier = load_supplier(data_folder, use_arrow_dtype=use_arrow_dtype)
-    part = load_part(data_folder, use_arrow_dtype=use_arrow_dtype)
-    partsupp = load_partsupp(data_folder, use_arrow_dtype=use_arrow_dtype)
+    lineitem = load_lineitem(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    orders = load_orders(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    customer = load_customer(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    nation = load_nation(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    region = load_region(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    supplier = load_supplier(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    part = load_part(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
+    partsupp = load_partsupp(data_folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     mars.execute([lineitem, orders, customer, nation, region, supplier, part, partsupp])
     print("Reading time (s): ", time.time() - t1)

@@ -1038,7 +1057,15 @@ def main():
         "--use-arrow-dtype",
         type=str,
         choices=["true", "false"],
-        help=("Use arrow dtype to read parquet"),
+        help="Use arrow dtype to read parquet",
     )
+    parser.add_argument(
+        "--gpu", "-g", action="store_true", help="Use GPU to read parquet"
+    )
+    parser.add_argument(
+        "--cuda-devices",
+        type=str,
+        help="GPU devices to use, use comma to split, only available when using GPU",
+    )
     args = parser.parse_args()
     folder = args.folder
@@ -1051,9 +1078,14 @@ def main():
     queries = (
         set(x.lower().strip() for x in args.query.split(",")) if args.query else None
    )
-    sess = mars.new_session(endpoint)
+    gpu = args.gpu
+    new_session_kwargs = dict()
+    if gpu and args.cuda_devices:
+        cuda_devices = args.cuda_devices.split(",")
+        new_session_kwargs["cuda_devices"] = [int(d) for d in cuda_devices]
+    sess = mars.new_session(endpoint, **new_session_kwargs)
     try:
-        run_queries(folder, use_arrow_dtype=use_arrow_dtype)
+        run_queries(folder, use_arrow_dtype=use_arrow_dtype, gpu=gpu)
     finally:
         if endpoint is None:
             sess.stop_server()
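With the new flags wired through, a GPU run of the benchmark could be launched roughly as follows (the data path and device ids are illustrative, and the exact spelling of the data-folder argument follows parser definitions elided from this diff):

    python benchmarks/tpch/run_queries.py /path/to/tpch-data --gpu --cuda-devices 0,1

This loads every table via md.read_parquet(..., gpu=True) and creates the session with cuda_devices=[0, 1].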
9 changes: 8 additions & 1 deletion mars/_utils.pyx
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import collections
 import importlib
 import itertools
@@ -21,6 +22,7 @@ import pkgutil
 import time
 import types
 import uuid
+import warnings
 from datetime import date, datetime, timedelta, tzinfo
 from enum import Enum
 from functools import lru_cache, partial
@@ -138,7 +140,12 @@ cdef class TypeDispatcher:
     cdef _reload_lazy_handlers(self):
         for k, v in self._lazy_handlers.items():
             mod_name, obj_name = k.rsplit('.', 1)
-            mod = importlib.import_module(mod_name, __name__)
+            with warnings.catch_warnings():
+                # the lazily imported cudf warns that no device is found
+                # when we set the visible device to -1 for CPU processes;
+                # ignore the warning so it does not distract users
+                warnings.simplefilter("ignore")
+                mod = importlib.import_module(mod_name, __name__)
             self.register(getattr(mod, obj_name), v)
         self._lazy_handlers = dict()

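The catch_warnings block above follows a general pattern: suppress warnings emitted at import time without permanently changing the process-wide warning filters. A minimal plain-Python sketch of the same pattern (not Mars code):

    import importlib
    import warnings

    def quiet_import(mod_name):
        # filters modified inside the block are restored on exit
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return importlib.import_module(mod_name)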
26 changes: 19 additions & 7 deletions mars/core/context.py
@@ -18,6 +18,7 @@

 from ..typing import BandType, SessionType
 from ..storage.base import StorageLevel
+from ..utils import classproperty


 class Context(ABC):
@@ -26,8 +27,7 @@ class Context(ABC):
     used inside `tile` and `execute`.
     """

-    prev = None
-    current = None
+    all_contexts = []

     def __init__(
         self,
@@ -96,6 +96,16 @@ def get_worker_addresses(self) -> List[str]:
         worker_addresses : list
         """

+    @abstractmethod
+    def get_worker_bands(self) -> List[BandType]:
+        """
+        Get worker bands.
+
+        Returns
+        -------
+        worker_bands : list
+        """
+
     @abstractmethod
     def get_total_n_cpu(self) -> int:
         """
@@ -276,16 +286,18 @@ def set_progress(self, progress: float):
         """

     def __enter__(self):
-        Context.prev = Context.current
-        Context.current = self
+        Context.all_contexts.append(self)

     def __exit__(self, *_):
-        Context.current = Context.prev
-        Context.prev = None
+        Context.all_contexts.pop()

+    @classproperty
+    def current(cls):
+        return cls.all_contexts[-1] if cls.all_contexts else None


 def set_context(context: Context):
-    Context.current = context
+    Context.all_contexts.append(context)


 def get_context() -> Context:
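Replacing the prev/current pair with a stack makes nested contexts behave correctly: a single saved prev can only undo one level of nesting, while a stack unwinds any depth. A self-contained sketch of the pattern (not Mars code; the real Context is abstract):

    class Ctx:
        all_contexts = []

        def __enter__(self):
            Ctx.all_contexts.append(self)
            return self

        def __exit__(self, *_):
            Ctx.all_contexts.pop()

    a, b = Ctx(), Ctx()
    with a:
        with b:
            assert Ctx.all_contexts[-1] is b  # innermost context wins
        assert Ctx.all_contexts[-1] is a      # popping restores the outer one
    assert not Ctx.all_contexts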
2 changes: 1 addition & 1 deletion mars/core/entity/chunks.py
@@ -23,7 +23,7 @@ class ChunkData(EntityData):
     is_broadcaster = BoolField("is_broadcaster", default=False)
     # If the operand is a shuffle mapper, this flag indicates whether the current chunk is a mapper chunk when
     # the operand produces multiple chunks, such as TensorUnique.
-    is_mapper = BoolField("is_mapper", default=False)
+    is_mapper = BoolField("is_mapper", default=None)
     # optional fields
     _index = TupleField("index", FieldTypes.uint32)

13 changes: 13 additions & 0 deletions mars/core/entity/executable.py
@@ -83,6 +83,7 @@ def put(self, key: str, session_ref: ref):

 class _TileableSession:
     def __init__(self, tileable: TileableType, session: SessionType):
+        self._sess_id = id(session)
         key = tileable.key

         def cb(_, sess=ref(session)):
@@ -106,6 +107,9 @@ def cb(_, sess=ref(session)):

         self.tileable = ref(tileable, cb)

+    def __eq__(self, other: "_TileableSession"):
+        return self._sess_id == other._sess_id
+

 class _TileableDataCleaner:
     def __init__(self):
@@ -189,6 +193,15 @@ def _attach_session(self, session: SessionType):
             _cleaner.register(self, session)
             self._executed_sessions.append(session)

+    def _detach_session(self, session: SessionType):
+        if session in self._executed_sessions:
+            sessions = _cleaner._tileable_to_sessions.get(self, [])
+            if sessions:
+                sessions.remove(_TileableSession(self, session))
+                if len(sessions) == 0:
+                    del _cleaner._tileable_to_sessions[self]
+            self._executed_sessions.remove(session)
+

 class _ExecuteAndFetchMixin:
     __slots__ = ()
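The new __eq__ compares sessions by id(), so a freshly constructed _TileableSession can stand in for the one stored at registration time; that is what lets sessions.remove(_TileableSession(self, session)) in _detach_session find and drop the right entry. A standalone illustration of the equality trick (hypothetical names, not Mars code):

    class SessionToken:
        def __init__(self, session):
            self._sess_id = id(session)

        def __eq__(self, other):
            return self._sess_id == other._sess_id

    class Session:  # stand-in for a real session object
        pass

    sess = Session()
    registered = [SessionToken(sess)]
    registered.remove(SessionToken(sess))  # matched via __eq__, not identity
    assert not registered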
1 change: 1 addition & 0 deletions mars/core/entity/output_types.py
@@ -29,6 +29,7 @@ class OutputType(Enum):
     categorical = 7
     dataframe_groupby = 8
     series_groupby = 9
+    df_or_series = 10

     @classmethod
     def serialize_list(cls, output_types):
2 changes: 1 addition & 1 deletion mars/core/entity/tileables.py
@@ -270,7 +270,7 @@ class TileableData(EntityData, _ExecutableMixin):
     # `nsplits` means the sizes of chunks for each dimension
     _nsplits = TupleField(
         "nsplits",
-        FieldTypes.tuple(FieldTypes.uint64),
+        FieldTypes.tuple(FieldTypes.tuple(FieldTypes.uint64)),
         on_serialize=on_serialize_nsplits,
     )
     # cache tileable data, if true, this data will be materialized
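The field-type fix reflects the actual shape of nsplits: one inner tuple of chunk sizes per dimension, hence a tuple of tuples of unsigned integers. A small illustration (values are made up):

    # a 2-D tileable of shape (100, 60) cut into 50x30 chunks
    nsplits = ((50, 50), (30, 30))
    shape = tuple(sum(dim_splits) for dim_splits in nsplits)
    assert shape == (100, 60)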