Skip to content

Commit

Permalink
Fix parquet file reader. (#963)
Browse files Browse the repository at this point in the history
  • Loading branch information
hjarraya authored Mar 10, 2020
1 parent 1c60540 commit e04c446
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
11 changes: 4 additions & 7 deletions gordo/machine/dataset/data_provider/file_type.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
 import pandas as pd
-import pyarrow.parquet as pa
 import numpy as np

 from abc import ABCMeta, abstractmethod
@@ -100,10 +99,8 @@ def __init__(self, time_series_columns: TimeSeriesColumns):
     def read_df(self, f: BinaryIO) -> pd.DataFrame:
         columns = self.time_series_columns.columns
         datetime_column = self.time_series_columns.datetime_column
-        pf = pa.ParquetFile(f)
-        table = pf.read(columns)
-        df = table.to_pandas()
-        df[datetime_column] = df[datetime_column].apply(
-            lambda v: pd.to_datetime(v, utc=True)
+        df = pd.read_parquet(f, engine="pyarrow", columns=columns).set_index(
+            datetime_column
         )
-        return df.set_index(datetime_column)
+        df.index = pd.to_datetime(df.index, utc=True)
+        return df
9 changes: 6 additions & 3 deletions gordo/machine/dataset/data_provider/ncs_reader.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
 from collections import OrderedDict
 from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
+import timeit
 from typing import Iterable, List, Optional
 from urllib.parse import quote

@@ -22,7 +23,6 @@

logger = logging.getLogger(__name__)


time_series_columns = TimeSeriesColumns("Time", "Value", "Status")


@@ -346,13 +346,16 @@ def read_tag_files(
if dry_run:
logger.info("Dry run only, returning empty frame early")
return pd.Series()

+before_downloading = timeit.default_timer()
with adls_file_system_client.open(file_path, "rb") as f:
df = file_type.read_df(f)
df = df.rename(columns={"Value": tag.name})
df = df[~df["Status"].isin(remove_status_codes)]
df.sort_index(inplace=True)
all_years.append(df)
-logger.info(f"Done parsing file {file_path}")
+logger.info(
+f"Done in {(timeit.default_timer()-before_downloading):.2f} sec {file_path}"
+)

except FileNotFoundError as e:
logger.debug(f"{file_path} not found, skipping it: {e}")
Expand Down

0 comments on commit e04c446

Please sign in to comment.