"""sklearn-compatible transformers wrapping time_series_transform (time_series_transform.sklearn.transformer)."""

import numpy as np
import collections
import pandas as pd
from sklearn.base import (BaseEstimator, TransformerMixin)
from time_series_transform.io.numpy import (from_numpy,to_numpy)
from time_series_transform.io.pandas import (from_pandas,to_pandas)
from time_series_transform.io.parquet import (from_parquet, to_parquet)
from time_series_transform.stock_transform.stock_transfromer import Stock_Transformer
from time_series_transform.transform_core_api.time_series_transformer import Time_Series_Transformer


class Base_Time_Series_Transformer(BaseEstimator, TransformerMixin):

    def __init__(self, time_col, category_col=None, len_preprocessing='ignore',
                 remove_time=True, remove_category=True, remove_org_data=True,
                 cache_data_path=None):
        """Base class for sklearn transformers implementing Time_Series_Transformer.

        This class prepares the data into Time_Series_Transformer. It can also
        use parquet to save data for future transformation. Since transforming
        time series data is usually associated with past data, this class
        caches the past data and checks whether it will be used during
        transformation.

        Parameters
        ----------
        time_col : str or int
            the index of time series period column
        category_col : str or int, optional
            category column index, by default None
        len_preprocessing : ['ignore','pad','remove'], optional
            data preprocessing for categories, by default 'ignore'
        remove_time : bool, optional
            whether to remove time column for output, by default True
        remove_category : bool, optional
            whether to remove category column for output, by default True
        remove_org_data : bool, optional
            whether to remove origin data for output, by default True
        cache_data_path : str, optional
            the path to cache data, by default None
        """
        self.time_col = time_col
        self.category_col = category_col
        self.time_series_cache = None
        self.len_preprocessing = len_preprocessing
        self.remove_time = remove_time
        self.cache_data_path = cache_data_path
        self.remove_org_data = remove_org_data
        self.remove_category = remove_category
        self.time_series_data = None
        self.category_cache = None

    def _cache_data(self, time_series_data):
        # Persist the fitted time series data to parquet for later transforms.
        return to_parquet(self.cache_data_path, time_series_data, False, False, 'ignore')

    def _to_time_series_data(self, X):
        # Convert a pandas DataFrame or numpy ndarray into the project's
        # time series representation, caching the time (and category) values
        # seen during fit for later membership checks.
        if isinstance(X, pd.DataFrame):
            self.time_series_cache = X[self.time_col].tolist()
            time_series_data = from_pandas(X, self.time_col, self.category_col)
            if self.category_col is not None:
                self.category_cache = X[self.category_col].tolist()
        else:
            self.time_series_cache = list(X[:, self.time_col])
            time_series_data = from_numpy(X, self.time_col, self.category_col)
            if self.category_col is not None:
                self.category_cache = list(X[:, self.category_col])
        return time_series_data

    def _check_time_not_exist(self, timeList, categoryList):
        # Flag which incoming (time[, category]) entries were NOT seen during
        # fit (True means the row is new and must be appended).
        # Sets give O(1) membership instead of scanning the cache list per item.
        if categoryList is None:
            seen = set(self.time_series_cache)
            return [t not in seen for t in timeList]
        seen_by_category = collections.defaultdict(set)
        for c, t in zip(self.category_cache, self.time_series_cache):
            seen_by_category[c].add(t)
        return [t not in seen_by_category[c] for t, c in zip(timeList, categoryList)]

    def fit(self, X, y=None):
        """fit train model

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        self
        """
        time_series_data = self._to_time_series_data(X)
        if self.cache_data_path is not None:
            self._cache_data(time_series_data)
            return self
        self.time_series_data = time_series_data
        return self

    def transform(self, X, y=None):
        """transform prepare the data as Time_Series_Transformer and other helper data

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        tst Time_Series_Transformer
            the output Time_Series_Transformer
        X_time list
            time column list
        X_header list
            column name list
        X_category list
            category name list
        """
        X_category = None
        if self.cache_data_path is not None:
            df = pd.read_parquet(self.cache_data_path)
        else:
            df = to_pandas(self.time_series_data, False, False, 'ignore')
        X_time, X_category, X_header, new_df, check_list = self._prep_transform_data(X, X_category)
        # DataFrame.append was removed in pandas 2.0; pd.concat is the
        # behaviorally equivalent replacement.
        df = pd.concat([df, pd.DataFrame(new_df)], ignore_index=True)
        tst = Time_Series_Transformer.from_pandas(
            df,
            self.time_col,
            self.category_col
        )
        if self.category_col is None:
            return tst, X_time, X_header, None
        return tst, X_time, X_header, X_category

    def _prep_transform_data(self, X, X_category):
        # Extract time list, category list, data header columns, and the
        # subset of rows not already present in the fitted cache.
        if isinstance(X, pd.DataFrame):
            X_time = X[self.time_col].tolist()
            if self.category_col is None:
                X_header = X.drop(self.time_col, axis=1).columns.tolist()
            else:
                X_header = X.drop(self.time_col, axis=1).drop(self.category_col, axis=1).columns.tolist()
                X_category = X[self.category_col].tolist()
            check_list = self._check_time_not_exist(X_time, X_category)
            new_df = X[check_list]
        else:
            X_time = list(X[:, self.time_col])
            if self.category_col is not None:
                X_category = list(X[:, self.category_col])
            # Headers are the positional indices of every non-time,
            # non-category column.
            X_header = []
            for i in range(X.shape[1]):
                if i != int(self.time_col):
                    if self.category_col is not None and i == int(self.category_col):
                        continue
                    X_header.append(i)
            check_list = self._check_time_not_exist(X_time, X_category)
            new_df = pd.DataFrame(X[check_list, :])
        return X_time, X_category, X_header, new_df, check_list

    def _transform_output_wrapper(self, df, X_category, X_time, X_header):
        """Select the requested rows and columns from transformed data.

        Parameters
        ----------
        df : pandas DataFrame
            transformed data
        X_category : list
            category name list
        X_time : list
            time column list
        X_header : list
            column name list

        Returns
        -------
        numpy ndArray
            transformed values with time/category/original columns removed
            according to the instance flags
        """
        if X_category is None:
            df = df[df[self.time_col].isin(X_time)]
        else:
            times_by_category = collections.defaultdict(list)
            for ix, v in zip(X_time, X_category):
                times_by_category[v].append(ix)
            # Combine both conditions into a single boolean mask. The original
            # chained indexing (df[mask1][mask2]) built mask2 against the
            # unfiltered frame, forcing pandas to reindex the boolean key.
            parts = [
                df[(df[self.category_col] == cat) & (df[self.time_col].isin(times))]
                for cat, times in times_by_category.items()
            ]
            df = pd.concat(parts)
        if self.remove_category and self.category_col is not None:
            df = df.drop(self.category_col, axis=1)
        if self.remove_time:
            df = df.drop(self.time_col, axis=1)
        if self.remove_org_data:
            df = df.drop(X_header, axis=1)
        return df.values

    def get_time_series_index_cache(self):
        """get_time_series_index_cache the fitted time series index

        Helps to see when the latest timestamp of the model is.

        Returns
        -------
        list
            cached time series index
        """
        return self.time_series_cache
class Lag_Transformer(Base_Time_Series_Transformer):

    def __init__(self, lag_nums, time_col, category_col=None, remove_time=True,
                 remove_category=True, remove_org_data=True, cache_data_path=None):
        """Transform input data into series of lag data.

        Parameters
        ----------
        lag_nums : int or list of int
            lag period numbers
        time_col : str or int
            the index of time series period column
        category_col : str or int, optional
            category column index, by default None
        remove_time : bool, optional
            whether to remove time column for output, by default True
        remove_category : bool, optional
            whether to remove category column for output, by default True
        remove_org_data : bool, optional
            whether to remove origin data for output, by default True
        cache_data_path : str, optional
            the path to cache data, by default None
        """
        super().__init__(time_col, category_col, 'ignore', remove_time,
                         remove_category, remove_org_data, cache_data_path)
        # Normalize a single lag number into a one-element list.
        self.lag_nums = lag_nums if isinstance(lag_nums, list) else [lag_nums]

    def fit(self, X, y=None):
        """fit train model

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        self
        """
        super().fit(X)
        return self

    def transform(self, X, y=None):
        """transform transforming lag data

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        numpy ndArray
            transformed data
        """
        tst, X_time, X_header, X_category = super().transform(X, y)
        for lag in self.lag_nums:
            tst = tst.make_lag(X_header, lagNum=lag, suffix=None)
        lag_frame = tst.to_pandas()
        return self._transform_output_wrapper(lag_frame, X_category, X_time, X_header)
class Function_Transformer(Base_Time_Series_Transformer):

    def __init__(self, func, inputLabels, time_col, category_col=None,
                 remove_time=True, remove_category=True, remove_org_data=True,
                 cache_data_path=None, parameterDict={}):
        """Function Transformer provides an implementation for custom functions.

        Parameters
        ----------
        func : function
            the data manipulation function
        inputLabels : str, numeric data or list of data or numeric data
            the input data columns passing to function
        time_col : str or int
            the index of time series period column
        category_col : str or int, optional
            category column index, by default None
        remove_time : bool, optional
            whether to remove time column for output, by default True
        remove_category : bool, optional
            whether to remove category column for output, by default True
        remove_org_data : bool, optional
            whether to remove origin data for output, by default True
        cache_data_path : str, optional
            the path to cache data, by default None
        parameterDict : dict, optional
            input parameters, by default {}
        """
        super().__init__(time_col, category_col, 'ignore', remove_time,
                         remove_category, remove_org_data, cache_data_path)
        # Copy the dict before mutating it: the original code wrote 'func',
        # 'inputLabels' and 'newName' straight into the shared mutable default
        # argument (and into any caller-provided dict), leaking state across
        # instances.
        self.parameterDict = dict(parameterDict)
        self.parameterDict['func'] = func
        self.parameterDict['inputLabels'] = inputLabels
        self.parameterDict['newName'] = 'newName'

    def fit(self, X, y=None):
        """fit train model

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        self
        """
        super().fit(X)
        return self

    def transform(self, X, y=None):
        """transform transforming data

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        numpy ndArray
            transformed data
        """
        tst, X_time, X_header, X_category = super().transform(X, y)
        tst = tst.transform(**self.parameterDict)
        df = tst.to_pandas()
        return self._transform_output_wrapper(df, X_category, X_time, X_header)
class Base_Stock_Time_Series_Transform(Base_Time_Series_Transformer):

    def __init__(self, time_col, category_col=None, len_preprocessing='ignore',
                 remove_time=True, remove_category=True, remove_org_data=True,
                 cache_data_path=None, High='High', Low='Low', Close='Close',
                 Open='Open', Volume='Volume'):
        """The base class implementing Stock_Transformer.

        Parameters
        ----------
        time_col : str or int
            the index of time series period column
        category_col : str or int, optional
            category column index, by default None
        len_preprocessing : ['ignore','pad','remove'], optional
            data preprocessing for categories, by default 'ignore'
        remove_time : bool, optional
            whether to remove time column for output, by default True
        remove_category : bool, optional
            whether to remove category column for output, by default True
        remove_org_data : bool, optional
            whether to remove origin data for output, by default True
        cache_data_path : str, optional
            the path to cache data, by default None
        High : str or int, optional
            the index or name for High, by default 'High'
        Low : str or int, optional
            the index or name for Low, by default 'Low'
        Close : str or int, optional
            the index or name for Close, by default 'Close'
        Open : str or int, optional
            the index or name for Open, by default 'Open'
        Volume : str or int, optional
            the index or name for Volume, by default 'Volume'
        """
        super().__init__(time_col, category_col, len_preprocessing, remove_time,
                         remove_category, remove_org_data, cache_data_path)
        self.high = High
        self.low = Low
        self.open = Open
        self.close = Close
        self.volume = Volume

    def transform(self, X, y=None):
        """transform prepare data as Stock_Transformer and helper data

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        tst Stock_Transformer
            the output Stock_Transformer
        X_time list
            time column list
        X_header list
            column name list
        X_category list
            category name list
        """
        tst, X_time, X_header, X_category = super().transform(X, y)
        stock_tst = Stock_Transformer.from_time_series_transformer(
            tst,
            High=self.high,
            Low=self.low,
            Close=self.close,
            Open=self.open,
            Volume=self.volume,
        )
        return stock_tst, X_time, X_header, X_category
class Stock_Technical_Indicator_Transformer(Base_Stock_Time_Series_Transform):

    def __init__(self, strategy, time_col, symbol_col=None, remove_time=True,
                 remove_category=True, remove_org_data=True, cache_data_path=None,
                 High='High', Low='Low', Close='Close', Open='Open',
                 Volume='Volume', n_jobs=1, verbose=0, backend='loky'):
        """Transform data into technical indicators through the pandas-ta api.

        Note: when using this transformer in a pipeline, it is important to
        understand whether the input data is numpy or a pandas DataFrame.
        Open, Close, High, Low, and Volume column indices must match the
        input sources.

        Parameters
        ----------
        strategy : Strategy
            pandas-ta strategy
        time_col : str
            the name of time_col
        symbol_col : str or int, optional
            the symbol column index of the data, by default None
        remove_time : bool, optional
            whether to remove time column for output, by default True
        remove_category : bool, optional
            whether to remove category column for output, by default True
        remove_org_data : bool, optional
            whether to remove origin data for output, by default True
        cache_data_path : str, optional
            the path to cache data, by default None
        High : str or int, optional
            the index or name for High, by default 'High'
        Low : str or int, optional
            the index or name for Low, by default 'Low'
        Close : str or int, optional
            the index or name for Close, by default 'Close'
        Open : str or int, optional
            the index or name for Open, by default 'Open'
        Volume : str or int, optional
            the index or name for Volume, by default 'Volume'
        n_jobs : int, optional
            number of processes (joblib), by default 1
        verbose : int, optional
            log level (joblib), by default 0
        backend : str, optional
            backend type (joblib), by default 'loky'
        """
        super().__init__(time_col, symbol_col, 'ignore', remove_time,
                         remove_category, remove_org_data, cache_data_path,
                         High, Low, Close, Open, Volume)
        self.strategy = strategy
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.backend = backend

    def fit(self, X, y=None):
        """fit train model

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        self
        """
        super().fit(X, y)
        return self

    def transform(self, X, y=None):
        """transform transforming data according to the strategy

        Parameters
        ----------
        X : pandas DataFrame or numpy ndArray
            input values
        y : depreciated not used, optional
            following sklearn convention (not used), by default None

        Returns
        -------
        numpy ndArray
            transformed data
        """
        stock_tst, X_time, X_header, X_category = super().transform(X, y)
        # NOTE: 'get_technial_indicator' is the project's API spelling.
        stock_tst = stock_tst.get_technial_indicator(
            self.strategy, self.n_jobs, self.verbose, self.backend)
        indicator_frame = stock_tst.to_pandas()
        return self._transform_output_wrapper(indicator_frame, X_category, X_time, X_header)