Source code for time_series_transform.io.parquet

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow import parquet as pq
from time_series_transform.io.base import io_base
from time_series_transform.io.pandas import (
    from_pandas,
    to_pandas
)
from time_series_transform.io.arrow import (
    to_arrow_table,
    from_arrow_table
)
from time_series_transform.transform_core_api.base import (
    Time_Series_Data,
    Time_Series_Data_Collection
)

class Parquet_IO(io_base):
    def __init__(self,dirPaths,time_series,timeSeriesCol,mainCategoryCol,version="1.0"):
        """
        Parquet_IO IO class for apache parquet
        
        Parameters
        ----------
        dirPaths : str
            directory to parquet file
        time_series : Time_Series_Data or Time_Series_Data_Collection
            input data
        timeSeriesCol : str or int
            index of time period column
        mainCategoryCol : str of int
            index of category column
        version : str, optional
            parquet version, by default "1.0"
        """
        super().__init__(time_series, timeSeriesCol, mainCategoryCol)
        # io_base derives dictList from the input; when it did, keep the
        # original time series object rather than the derived dict list
        if self.dictList is not None:
            self.dictList = time_series
        self.dirPaths = dirPaths
        self.version = version

    def from_parquet(self,columns,partitioning,filters,filesystem):
        """
        from_parquet transform parquet into Time_Series_Data or Time_Series_Data_Collection
        
        Parameters
        ----------
        columns : list
           apache arrow implmentation
        partitioning : list
            apache arrow implmentation
        filters : str
            apache arrow implmentation
        filesystem : str
            apache arrow implmentation 
        
        Returns
        -------
        Time_Series_Data or Time_Series_Data_Collection
        """
        table = pq.read_table(
            source = self.dirPaths,
            columns = columns,
            partitioning = partitioning,
            filters = filters,
            filesystem = filesystem
        )
        return from_arrow_table(table,self.timeSeriesCol,self.mainCategoryCol)

    def to_parquet(self,expandCategory,expandTime,preprocessType,seperateLabels,partition_cols,isDataset):
        """
        to_parquet transform Time_Series_Data or Time_Series_Data_Collection
        to parquet
        
        Parameters
        ----------
        expandCategory : bool
            whether to expand category
        expandTime : bool
            whether to expand time
        preprocessType : ['ignore','pad','remove']
            preprocess data time across categories
        seperateLabels : bool
            whether to seperate labels and data
        partition_cols : list
            partition columns
        isDataset : bool
            whether output as parquet dataset
        """
        if not seperateLabels:
            table = to_arrow_table(
                time_series = self.time_series,
                expandCategory = expandCategory,
                expandTime = expandTime,
                preprocessType = preprocessType,
                seperateLabels = seperateLabels
            )
            if isDataset:
                pq.write_to_dataset(
                    table,
                    root_path = self.dirPaths,
                    partition_cols = partition_cols,
                    version = self.version
                )
                return
            pq.write_table(
                table,
                self.dirPaths,
                version = self.version
            )
            return
        table, label_table = to_arrow_table(
            time_series = self.time_series,
            expandCategory = expandCategory,
            expandTime = expandTime,
            preprocessType = preprocessType,
            seperateLabels = seperateLabels
        )
        if isDataset:
            pq.write_to_dataset(
                table,
                root_path = self.dirPaths[0],
                partition_cols = partition_cols,
                version = self.version
            )
            pq.write_to_dataset(
                label_table,
                root_path = self.dirPaths[1],
                partition_cols = partition_cols,
                version = self.version
            )
            return
        pq.write_table(
            table,
            self.dirPaths[0],
            version = self.version
        )
        pq.write_table(
            label_table,
            self.dirPaths[1],
            version = self.version
        )
        return
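
# The module-level helpers below wrap Parquet_IO. A minimal direct-use
# sketch (the path 'data.parquet' and the columns 'Date'/'symbol' are
# placeholders, not part of the library):
#
#     pio = Parquet_IO('data.parquet', None, 'Date', 'symbol')
#     tsc = pio.from_parquet(columns=None, partitioning='hive', filters=None, filesystem=None)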


def from_parquet(dirPath, timeSeriesCol, mainCategoryCol, columns=None, partitioning='hive', filters=None, filesystem=None):
    """
    from_parquet transforms a parquet file into Time_Series_Data or Time_Series_Data_Collection

    Parameters
    ----------
    dirPath : str
        path to the parquet file
    timeSeriesCol : str or int
        index of time period column
    mainCategoryCol : str or int
        index of category column
    columns : list, optional
        columns to fetch, by default None
    partitioning : str, optional
        partition type, by default 'hive'
    filters : list, optional
        pyarrow filters, by default None
    filesystem : str, optional
        filesystem, by default None

    Returns
    -------
    Time_Series_Data or Time_Series_Data_Collection
    """
    pio = Parquet_IO(dirPath, None, timeSeriesCol, mainCategoryCol)
    return pio.from_parquet(columns, partitioning, filters, filesystem)
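
# A minimal read sketch for from_parquet. The file path, column names, and
# filter value below are assumptions for illustration; filters use
# pyarrow's list-of-tuples predicate format.
#
#     tsc = from_parquet(
#         dirPath='stock.parquet',
#         timeSeriesCol='Date',
#         mainCategoryCol='symbol',
#         columns=['Date', 'symbol', 'Close'],    # prune to three columns
#         filters=[('symbol', '=', 'AAPL')]       # keep a single category
#     )
#     # with mainCategoryCol set, the result is a Time_Series_Data_Collection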

def to_parquet(dirPaths, time_series_data, expandCategory, expandTime, preprocessType, seperateLabels=False, version='1.0', isDataset=False, partition_cols=None):
    """
    to_parquet transforms Time_Series_Data or Time_Series_Data_Collection
    into a parquet file

    Parameters
    ----------
    dirPaths : str or list of str
        directory of the output parquet file; a [data, label] pair of
        paths when seperateLabels is True
    time_series_data : Time_Series_Data or Time_Series_Data_Collection
        input data
    expandCategory : bool
        whether to expand category
    expandTime : bool
        whether to expand time
    preprocessType : {'ignore', 'pad', 'remove'}
        preprocess data time across categories
    seperateLabels : bool, optional
        whether to separate labels and data, by default False
    version : str, optional
        parquet version, by default '1.0'
    isDataset : bool, optional
        whether to output as dataset, by default False
    partition_cols : list, optional
        partition columns, by default None
    """
    pio = Parquet_IO(dirPaths, time_series_data, None, None, version)
    return pio.to_parquet(
        expandCategory=expandCategory,
        expandTime=expandTime,
        preprocessType=preprocessType,
        seperateLabels=seperateLabels,
        partition_cols=partition_cols,
        isDataset=isDataset
    )
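
# A write sketch with separated labels. When seperateLabels is True the
# underlying Parquet_IO indexes dirPaths, so a [data, label] pair of paths
# is required; the paths and the input object here are placeholders.
#
#     to_parquet(
#         dirPaths=['data.parquet', 'labels.parquet'],
#         time_series_data=tsd,                   # a Time_Series_Data object
#         expandCategory=False,
#         expandTime=False,
#         preprocessType='ignore',
#         seperateLabels=True
#     )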

__all__ = ['from_parquet', 'to_parquet']
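
if __name__ == "__main__":
    # A minimal round-trip sketch. The frame layout and the output path
    # 'demo.parquet' are placeholders; from_pandas is the converter already
    # imported above (its (frame, timeSeriesCol, mainCategoryCol) signature
    # is assumed here).
    df = pd.DataFrame({'time': [1, 2, 3], 'value': [10.0, 11.5, 9.8]})
    tsd = from_pandas(df, 'time', None)
    to_parquet(
        dirPaths='demo.parquet',
        time_series_data=tsd,
        expandCategory=False,
        expandTime=False,
        preprocessType='ignore'
    )
    round_trip = from_parquet('demo.parquet', timeSeriesCol='time', mainCategoryCol=None)
    print(round_trip)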