Source code for time_series_transform.transform_core_api.time_series_transformer
import gc
import uuid
import warnings
import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow import parquet as pq
from collections import defaultdict
from time_series_transform import io
from time_series_transform.transform_core_api.base import (Time_Series_Data,Time_Series_Data_Collection)
from time_series_transform.plot import *
[docs]class Time_Series_Transformer(object):
def __init__(self,data,timeSeriesCol,mainCategoryCol=None):
"""
__init__ the class for time series data manipulation
it can perform different data manipulation: making lag data,
lead data, lag sequence data, or do a customize data manipulation.
It also built in native plot and io functions. IO function currently
support pandas DataFrame, numpy ndArray, apache arrow table , apache feather,
and apache parquet
Parameters
----------
data : dict of list, Time_Series_Data, or Time_Series_Collection
the value of data.
timeSeriesCol : str
the time series period column of the data. For example, time or date
mainCategoryCol : str or None
the main category column of the time series data
for example, symbol ticker for stock data. Or, the product segment for inventory
"""
super().__init__()
if isinstance(data,(Time_Series_Data,Time_Series_Data_Collection)):
self.time_series_data = data
else:
self.time_series_data = self._setup_time_series_data(data,timeSeriesCol,mainCategoryCol)
self.timeSeriesCol = timeSeriesCol
self._isCollection = [True if mainCategoryCol is not None else False][0]
self.mainCategoryCol = mainCategoryCol
self.plot = TimeSeriesPlot(self.time_series_data)
def _setup_time_series_data(self,data,timeSeriesCol,mainCategoryCol):
if timeSeriesCol is None:
raise KeyError("time series index is required")
tsd = Time_Series_Data(data,timeSeriesCol)
if mainCategoryCol is None:
return tsd
tsc = Time_Series_Data_Collection(tsd,timeSeriesCol,mainCategoryCol)
return tsc
[docs] def transform(self,inputLabels,newName,func,n_jobs =1,verbose = 0,backend='loky',*args,**kwargs):
"""
transform the wrapper of functions performing data manipulation
This function provides a way to do different data manipulation.
The output data should be either pandas dataFrame, numpy ndArray, or list of dict.
Also, the data should have the same time length as the original data.
Parameters
----------
inputLabels : str, numeric data or list of data or numeric data
the input data columns passing to function
newName : str
the output data name or prefix
if the out function provides the new name, it will automatically become prefix
func : function
the data manipulation function
n_jobs : int, optional
joblib implemention, only used when mainCategoryCol is given, by default 1
verbose : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 0
backend : str, optional
joblib implmentation only used when mainCategoryCol is given, by default 'loky'
Returns
-------
self
"""
if isinstance(self.time_series_data,Time_Series_Data_Collection):
self.time_series_data = self.time_series_data.transform(inputLabels,newName,func,n_jobs =1,verbose = 0,backend='loky',*args,**kwargs)
else:
self.time_series_data = self.time_series_data.transform(inputLabels,newName,func,*args,**kwargs)
return self
def _transform_wrapper(self,inputLabels,newName,func,suffix,suffixNum,inputAsList,n_jobs,verbose,*args,**kwargs):
if isinstance(inputLabels,list) == False:
inputLabels = [inputLabels]
if self._isCollection:
if inputAsList == False:
for i in inputLabels:
labelName = [f'{i}{suffix}{str(suffixNum)}' if suffix is not None else f"{i}{str(suffixNum)}"][0]
self.time_series_data.transform(i,labelName,func,n_jobs =n_jobs,verbose = verbose,*args,**kwargs)
return
labelName = newName
self.time_series_data.transform(inputLabels,labelName,func,n_jobs =n_jobs,verbose = verbose,*args,**kwargs)
else:
if inputAsList == False:
for i in inputLabels:
labelName = [f'{i}{suffix}{str(suffixNum)}' if suffix is not None else f"{i}{str(suffixNum)}"][0]
self.time_series_data.transform(i,labelName,func,*args,**kwargs)
return
labelName = newName
self.time_series_data.transform(inputLabels,labelName,func,*args,**kwargs)
[docs] def make_lag(self,inputLabels,lagNum,suffix=None,fillMissing=np.nan,verbose=0,n_jobs=1):
"""
make_lag making lag data for a given list of data
Parameters
----------
inputLabels : str, numeric or list of str, or numeric
the name of input data
lagNum : int
the target lag period to make
suffix : str, optional
the suffix of new data, by default None
fillMissing : object, optional
the data for filling missing data, by default np.nan
verbose : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 0
n_jobs : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 1
Returns
-------
self
"""
self._transform_wrapper(
inputLabels,
None,
make_lag,
suffix,
lagNum,
False,
n_jobs,
verbose,
lagNum=lagNum,
fillMissing=fillMissing
)
return self
[docs] def make_lead(self,inputLabels,leadNum,suffix=None,fillMissing=np.nan,verbose=0,n_jobs=1):
"""
make_lead make_lead making lead data for a given list of data
Parameters
----------
inputLabels : str, numeric or list of str, or numeric
the name of input data
leadNum : int
the target lead period to make
suffix : str, optional
the suffix of new data, by default None
fillMissing : object, optional
the data for filling missing data, by default np.nan
verbose : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 0
n_jobs : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 1
Returns
-------
self
"""
self._transform_wrapper(
inputLabels,
None,
make_lead,
suffix,
leadNum,
False,
n_jobs,
verbose,
leadNum=leadNum,
fillMissing=fillMissing
)
return self
[docs] def make_lag_sequence(self,inputLabels,windowSize,lagNum,suffix=None,fillMissing=np.nan,verbose=0,n_jobs=1):
"""
make_lag_sequence making lag sequence data
this function could be useful for deep learning.
Parameters
----------
inputLabels : str, numeric or list of str, or numeric
the name of input data
windowSize : int
the length of sequence
lagNum : int
the lag period of sequence
suffix : str, optional
the suffix of new data, by default None
fillMissing : object, optional
the data for filling missing data, by default np.nan
verbose : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 0
n_jobs : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 1
Returns
-------
self
"""
self._transform_wrapper(
inputLabels,
None,
make_lag_sequnece,
suffix,
windowSize,
False,
n_jobs,
verbose,
windowSize=windowSize,
lagNum = lagNum,
fillMissing=fillMissing
)
return self
[docs] def make_lead_sequence(self,inputLabels,windowSize,leadNum,suffix=None,fillMissing=np.nan,verbose=0,n_jobs=1):
"""
make_lead_sequence making lead sequence data
this function could be useful for deep learning.
Parameters
----------
inputLabels : str, numeric or list of str, or numeric
the name of input data
windowSize : int
the length of sequence
leadNum : int
the lead period of sequence
suffix : str, optional
the suffix of new data, by default None
fillMissing : object, optional
the data for filling missing data, by default np.nan
verbose : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 0
n_jobs : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 1
Returns
-------
self
"""
self._transform_wrapper(
inputLabels,
None,
lead_sequence,
suffix,
windowSize,
False,
n_jobs,
verbose,
windowSize=windowSize,
leadNum=leadNum,
fillMissing=fillMissing
)
return self
[docs] def make_identical_sequence(self,inputLabels,windowSize,suffix=None,verbose=0,n_jobs=1):
"""
make_identical_sequence making sequences having same data
this function will make same data for a givne sequence.
it could be useful for category data in deep learning.
Parameters
----------
inputLabels : str, numeric or list of str, or numeric
the name of input data
windowSize : int
the length of sequence
suffix : str, optional
the suffix of new data, by default None
verbose : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 0
n_jobs : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 1
Returns
-------
self
"""
self._transform_wrapper(
inputLabels,
None,
identity_window,
suffix,
windowSize,
False,
n_jobs,
verbose,
windowSize=windowSize
)
return self
[docs] def make_stack_sequence(self,inputLabels,newName,axis =-1,verbose=0,n_jobs=1):
"""
make_stack_sequence stacking sequences data
making multiple seqeunce data into one on the given axis
Parameters
----------
inputLabels : str, numeric or list of str, or numeric
the name of input data
newName : str
new name for the stacking data
axis : int, optional
the axis for stacking (numpy stack implmentation), by default -1
verbose : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 0
n_jobs : int, optional
joblib implmentation only used when mainCategoryCol is given, by default 1
Returns
-------
[type]
[description]
"""
self._transform_wrapper(
inputLabels,
newName,
stack_sequence,
None,
'',
True,
n_jobs,
verbose,
axis =axis
)
return self
[docs] def make_label(self,key,collectionKey=None):
"""
make_label make label data
it will turn the data into label.
when using io functions, specifing sepLabel parameter can seperate label and data.
Parameters
----------
key : str or numeric data
the target data name
collectionKey : str or numeric data, optional
the target collection, if None, all collection is selected, by default None
Returns
-------
self
"""
if self._isCollection:
if collectionKey is None:
for i in self.time_series_data:
data = self.time_series_data[i][:,[key]][key]
self.time_series_data[i].set_labels(data,key)
self.time_series_data[i].remove(key,'data')
else:
data = self.time_series_data[collectionKey][:,[key]][key]
self.time_series_data[collectionKey].set_labels(data,key)
self.time_series_data[collectionKey].remove(key,'data')
else:
data = self.time_series_data[:,[key]][key]
self.time_series_data.set_labels(data,key)
self.time_series_data.remove(key,'data')
return self
[docs] def remove_different_category_time(self):
"""
remove_different_category_time
remove different time index for category
if mainCategoryCol is not specified, this function has no function.
Returns
-------
self
"""
if self._isCollection:
self.time_series_data.remove_different_time_index()
else:
warnings.warn('Setup mainCategoryCol is necessary for this function')
return self
[docs] def pad_different_category_time(self,fillMissing= np.nan):
"""
pad_different_category_time
pad time length
if mainCategoryCol is not specified, this function has no function.
Parameters
----------
fillMissing : object, optional
data for filling paded data, by default np.nan
Returns
-------
self
"""
if self._isCollection:
self.time_series_data.pad_time_index(fillMissing)
else:
warnings.warn('Setup mainCategoryCol is necessary for this function')
return self
[docs] def remove_category(self,categoryName):
"""
remove_category remove a specific category data
Parameters
----------
categoryName : str or numeric data
the target category to be removed
Returns
-------
self
"""
if self._isCollection:
self.time_series_data.remove(categoryName)
return self
[docs] def remove_feature(self,colName):
"""
remove_feature remove certain data or labels
Parameters
----------
colName : str or numeric
target column or data to be removed
Returns
-------
self
"""
if isinstance(self.time_series_data,Time_Series_Data_Collection):
for i in self.time_series_data:
self.time_series_data[i].remove(colName)
return self
self.time_series_data.remove(colName)
return self
[docs] def dropna(self,categoryKey=None):
"""
dropna drop null values
remove null values for all or a specific category
Parameters
----------
categoryKey : str or numeric, optional
if None all category will be chosen, by default None
Returns
-------
self
"""
if isinstance(self.time_series_data,Time_Series_Data):
self.time_series_data = self.time_series_data.dropna()
return self
self.time_series_data = self.time_series_data.dropna(categoryKey)
return self
[docs] @classmethod
def from_pandas(cls, pandasFrame,timeSeriesCol,mainCategoryCol):
"""
from_pandas import data from pandas dataFrame
Parameters
----------
pandasFrame : pandas DataFrame
input data
timeSeriesCol : str or numeric
time series column name
mainCategoryCol : str or numeric
main category name
Returns
-------
Time_Series_Transformer
"""
data = io.from_pandas(pandasFrame,timeSeriesCol,mainCategoryCol)
return cls(data,timeSeriesCol,mainCategoryCol)
[docs] @classmethod
def from_numpy(cls,numpyData,timeSeriesCol,mainCategoryCol):
"""
from_numpy import data from numpy
Parameters
----------
numpyData : numpy ndArray
input data
timeSeriesCol : int
index of time series column
mainCategoryCol : int
index of main category column
Returns
-------
Time_Series_Transformer
"""
data = io.from_numpy(numpyData,timeSeriesCol,mainCategoryCol)
return cls(data,timeSeriesCol,mainCategoryCol)
[docs] @classmethod
def from_feather(cls,feather_dir,timeSeriesCol,mainCategoryCol,columns=None):
"""
from_feather import data from feather
Parameters
----------
feather_dir : str
directory of feather file
timeSeriesCol : str or numeric
time series column name
mainCategoryCol : str or numeric
main category name
columns : str or numeric, optional
target columns (apache arrow implmentation), by default None
Returns
-------
Time_Series_Transformer
"""
data = io.from_feather(
feather_dir,
timeSeriesCol,
mainCategoryCol,
columns
)
return cls(data,timeSeriesCol,mainCategoryCol)
[docs] @classmethod
def from_parquet(cls,parquet_dir,timeSeriesCol,mainCategoryCol,columns = None,partitioning='hive',filters=None,filesystem=None):
"""
from_parquet import data from parquet file
Parameters
----------
parquet_dir : str
directory of parquet file
timeSeriesCol : str or numeric
time series column name
mainCategoryCol : str or numeric
main category name
columns : str or numeric, optional
target columns (apache arrow implmentation), by default None
partitioning : str, optional
type of partitioning, by default 'hive'
filters : str, optional
filter (apache arrow implmentation), by default None
filesystem : str, optional
filesystem (apache arrow implmentation), by default None
Returns
-------
Time_Series_Transformer
"""
data = io.from_parquet(
parquet_dir,
timeSeriesCol,
mainCategoryCol,
columns,
partitioning,
filters,
filesystem
)
return cls(data,timeSeriesCol,mainCategoryCol)
[docs] @classmethod
def from_arrow_table(cls,arrow_table,timeSeriesCol,mainCategoryCol):
"""
from_arrow_table import data from apache arrow table
Parameters
----------
arrow_table : arrow table
input data
timeSeriesCol : str or numeric
time series column name
mainCategoryCol : str or numeric
main category name
Returns
-------
Time_Series_Transformer
"""
data = io.from_arrow_table(arrow_table,timeSeriesCol,mainCategoryCol)
return cls(data,timeSeriesCol,mainCategoryCol)
[docs] def to_feather(self,dirPaths,expandCategory=False,expandTime=False,preprocessType='ignore',sepLabel = False,version = 1,chunksize=None):
"""
to_feather output data into feather format
Parameters
----------
dirPaths : str
directory of output data
expandCategory : bool, optional
whether to expand category, by default False
expandTime : bool, optional
whether to expand time index column, by default False
preprocessType : {'ignore','pad','remove'}, optional
the preprocessing type before out data, by default 'ignore'
sepLabel : bool, optional
whether to seperate label data, by default False
version : int, optional
fether version (apache arrow implmentation), by default 1
chunksize : int, optional
chunksize for output (apache arrow implmentation), by default None
"""
return io.to_feather(
dirPaths= dirPaths,
time_series_data= self.time_series_data,
expandCategory = expandCategory,
expandTime = expandTime,
preprocessType = preprocessType,
seperateLabels= sepLabel,
version= version,
chunksize= chunksize
)
[docs] def to_parquet(self,dirPaths,expandCategory=False,expandTime=False,preprocessType='ignore',sepLabel = False,version = '1.0',isDataset=False,partition_cols= None):
"""
to_parquet output data into parquet format
Parameters
----------
dirPaths : str
directory of output data
expandCategory : bool, optional
whether to expand category, by default False
expandTime : bool, optional
whether to expand time index column, by default False
preprocessType : {'ignore','pad','remove'}, optional
the preprocessing type before out data, by default 'ignore'
sepLabel : bool, optional
whether to seperate label data, by default False
version : str, optional
parquet version (apache arrow implmentation), by default '1.0'
isDataset : bool, optional
whether to output data as dataset format (apache arrow implmentation), by default False
partition_cols : str, optional
whether to partition data (apache arrow implmentation), by default None
"""
return io.to_parquet(
dirPaths= dirPaths,
time_series_data= self.time_series_data,
expandCategory=expandCategory,
expandTime =expandTime,
preprocessType= preprocessType,
seperateLabels= sepLabel,
version = version,
isDataset = isDataset,
partition_cols = partition_cols
)
[docs] def to_arrow_table(self,expandCategory=False,expandTime=False,preprocessType='ignore',sepLabel = False):
"""
to_arrow_table output data as apache arrow table format
Parameters
----------
expandCategory : bool, optional
whether to expand category, by default False
expandTime : bool, optional
whether to expand time index column, by default False
preprocessType : {'ignore','pad','remove'}, optional
the preprocessing type before out data, by default 'ignore'
sepLabel : bool, optional
whether to seperate label data, by default False
Returns
-------
arrow table
"""
return io.to_arrow_table(
time_series= self.time_series_data,
expandCategory= expandCategory,
expandTime= expandTime,
preprocessType = preprocessType,
seperateLabels= sepLabel
)
[docs] def to_pandas(self,expandCategory=False,expandTime=False,preprocessType='ignore',sepLabel = False):
"""
to_pandas output data into pandas dataFrame
Parameters
----------
expandCategory : bool, optional
whether to expand category, by default False
expandTime : bool, optional
whether to expand time index column, by default False
preprocessType : {'ignore','pad','remove'}, optional
the preprocessing type before out data, by default 'ignore'
sepLabel : bool, optional
whether to seperate label data, by default False
Returns
-------
pandas dataFrame
"""
return io.to_pandas(
self.time_series_data,
expandCategory = expandCategory,
expandTime = expandTime,
preprocessType=preprocessType,
seperateLabels = sepLabel
)
[docs] def to_numpy(self,expandCategory=False,expandTime=False,preprocessType='ignore',sepLabel = False):
"""
to_numpy output data into numpy format
Parameters
----------
expandCategory : bool, optional
whether to expand category, by default False
expandTime : bool, optional
whether to expand time index column, by default False
preprocessType : {'ignore','pad','remove'}, optional
the preprocessing type before out data, by default 'ignore'
sepLabel : bool, optional
whether to seperate label data, by default False
Returns
-------
numpy ndArray
"""
return io.to_numpy(self.time_series_data,expandCategory,expandTime,preprocessType,sepLabel)
[docs] def to_dict(self):
"""
to_dict output data as dictionary list
Returns
-------
dict of list
"""
return self.time_series_data[:]
def __eq__(self,other):
if isinstance(other,Time_Series_Transformer):
return self.time_series_data == other.time_series_data
return False
def _statement_maker(self,tsd,mainCategory):
dataCol = list(tsd._get_all_info().keys())
timeLength = tsd.time_length
statement = "data column\n-----------\n"
for i in dataCol:
statement += f"{i}\n"
statement += f"time length: {str(timeLength)}\n"
statement += f"category: {str(mainCategory)}\n\n"
return statement
def __repr__(self):
if isinstance(self.time_series_data,Time_Series_Data):
return self._statement_maker(self.time_series_data,self.mainCategoryCol)
statement = ""
for i in self.time_series_data:
statement+= self._statement_maker(self.time_series_data[i],i)
statement += f"main category column: {self.mainCategoryCol}"
return statement
def make_sequence(arr, window,fillMissing=np.nan):
"""
rolling_window create an rolling window tensor
this function create a rolling window numpy tensor given its original sequence and window size
Parameters
----------
arr : numpy 1D array
the original data sequence
window : int
aggregation window size
Returns
-------
numpy 2d array
the rolling window array
"""
shape = arr.shape[:-1] + (arr.shape[-1] - window + 1, window)
strides = arr.strides + (arr.strides[-1],)
seq = np.lib.stride_tricks.as_strided(arr, shape=shape, strides=strides)
empty = np.empty(((len(arr)-seq.shape[0],seq.shape[1])))
empty[:] = fillMissing
res = np.vstack([empty,seq])
return res
def make_lag_sequnece(data,windowSize,lagNum,fillMissing):
lagdata = np.array(make_lag(data,lagNum,fillMissing))
return make_sequence(lagdata,windowSize,fillMissing)
def identity_window(arr,windowSize):
return np.repeat(arr,windowSize).reshape((-1,windowSize))
def make_lead(data,leadNum,fillMissing):
res = np.empty((leadNum))
res[:] = fillMissing
res = res.tolist()
leadValues = data[leadNum:].tolist()
leadValues.extend(res)
return leadValues
def make_lag(data,lagNum,fillMissing):
res = np.empty((lagNum))
res[:] = fillMissing
res = res.tolist()
lagValues = data[:-lagNum]
res.extend(lagValues)
return res
def lead_sequence(arr,leadNum,windowSize,fillMissing=np.nan):
shape = arr.shape[:-1] + (arr.shape[-1] - windowSize + 1, windowSize)
strides = arr.strides + (arr.strides[-1],)
seq = np.lib.stride_tricks.as_strided(arr, shape=shape, strides=strides)
seq = seq[leadNum:]
empty = np.empty(((len(arr)-seq.shape[0],seq.shape[1])))
empty[:] = fillMissing
res = np.vstack([seq,empty])
return res
def stack_sequence(arrDict, axis = -1):
res = []
for ix, v in enumerate(arrDict):
data = np.array(arrDict[v])
res.append(data)
res = np.stack(res,axis = axis )
return res
__all__=[
"Time_Series_Transformer"
]