"""Gradio app that forecasts a daily BTC series with a pretrained Autoformer.

The module builds the GluonTS transformation chain and test-time instance
splitter required by the Hugging Face time-series models, runs
``AutoformerForPrediction.generate`` over the test split, and renders the
result as a Plotly figure.
"""

# Standard library imports
from typing import Optional, Iterable
from functools import lru_cache, partial

# Third-party library imports
from transformers import PretrainedConfig, AutoformerForPrediction
import gradio as gr
import spaces
import torch
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# GluonTS imports
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)
from gluonts.time_feature import time_features_from_frequency_str
from gluonts.transform.sampler import InstanceSampler

# Hugging Face Datasets imports
from datasets import Dataset, Features, Value, Sequence, load_dataset

# GluonTS Loader imports
from gluonts.dataset.loader import as_stacked_batches

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np

# Hub identifiers of the dataset and the pretrained checkpoint used by the app.
DATASET_ID = "thesven/BTC-Daily-Avg-Market-Value"
MODEL_ID = "thesven/BTC-Autoformer-v1"


def convert_to_pandas_period(date, freq):
    """Return ``pd.Period(date, freq)``."""
    return pd.Period(date, freq)


def transform_start_field(batch, freq):
    """Replace every entry of ``batch["start"]`` with a ``pd.Period``.

    The batch is modified in place and also returned, which is the shape
    expected of a ``Dataset.set_transform`` callback.
    """
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch


def create_transformation(
    freq: str, config: PretrainedConfig, prediction_length: int
) -> Transformation:
    """Build the GluonTS transformation chain feeding the model.

    Args:
        freq: Frequency string used to derive the time features.
        config: Model configuration; its ``num_static_real_features``,
            ``num_dynamic_real_features``, ``num_static_categorical_features``
            and ``input_size`` decide which fields are kept and how the
            target is shaped.
        prediction_length: Number of future steps for which time and age
            features are generated.

    Returns:
        A ``Chain`` whose output uses the Hugging Face field names
        (``values``, ``observed_mask``, ``time_features``, ...).
    """
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    # a bit like torchvision.transforms.Compose
    return Chain(
        # step 1: remove static/dynamic fields if not specified
        [RemoveFields(field_names=remove_field_names)]
        # step 2: convert the data to NumPy (potentially not needed)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # we expect an extra dim for the multivariate case:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # step 3: handle the NaN's by filling in the target with zero
            # and return the mask (which is in the observed values)
            # true for observed values, false for nan's
            # the decoder uses this mask (no loss is incurred for unobserved values)
            # see loss_weights inside the xxxForPrediction model
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # step 4: add temporal features based on freq of the dataset
            # and the desired prediction length
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=prediction_length,
            ),
            # step 5: add another temporal feature (just a single number)
            # tells the model where in its life the value of the time series is,
            # sort of a running counter
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=prediction_length,
                log_scale=True,
            ),
            # step 6: vertically stack all the temporal features into the key FEAT_TIME
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # step 7: rename to match HuggingFace names
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )


def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    prediction_length: int,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    """Build the ``InstanceSplitter`` for the given mode.

    Args:
        config: Model configuration; ``context_length`` and ``lags_sequence``
            determine the past window length.
        mode: One of ``"train"``, ``"validation"`` or ``"test"``.
        prediction_length: Length of the future window.
        train_sampler: Optional override for the training sampler.
        validation_sampler: Optional override for the validation sampler.

    Returns:
        An ``InstanceSplitter`` operating on the renamed ``values`` field.
    """
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        # the past window must also cover the largest lag the model looks up
        past_length=config.context_length + max(config.lags_sequence),
        future_length=prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )


def create_test_dataloader(
    config: PretrainedConfig,
    freq: str,
    data: Dataset,
    batch_size: int,
    prediction_length: int,
    **kwargs,
):
    """Return an iterable of stacked ``torch`` batches for inference.

    Args:
        config: Model configuration (selects the optional static fields).
        freq: Frequency string forwarded to ``create_transformation``.
        data: Dataset to transform and split.
        batch_size: Number of instances per batch.
        prediction_length: Number of future steps to prepare features for.
        **kwargs: Accepted for call-site compatibility; not used here.

    Returns:
        The result of ``as_stacked_batches`` restricted to the model's
        prediction input fields.
    """
    prediction_input_names = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        prediction_input_names.append("static_categorical_features")
    if config.num_static_real_features > 0:
        prediction_input_names.append("static_real_features")

    transformation = create_transformation(freq, config, prediction_length)
    transformed_data = transformation.apply(data, is_train=False)

    # we create a Test Instance splitter which will sample the very last
    # context window seen during training only for the encoder.
    instance_splitter = create_instance_splitter(
        config, "test", prediction_length=prediction_length
    )

    # we apply the transformations in test mode
    testing_instances = instance_splitter.apply(transformed_data, is_train=False)

    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=prediction_input_names,
    )


def plot(ts_index, test_dataset, forecasts, prediction_length, freq="1D"):
    """Build a Plotly figure of the actual series followed by its forecast.

    Args:
        ts_index: Index of the series in ``test_dataset`` and ``forecasts``.
        test_dataset: Indexable dataset whose rows expose ``"target"`` and
            ``"start"``.
        forecasts: Indexed as ``forecasts[ts_index][0]``; only that first
            entry is plotted (presumably the first sampled trajectory --
            confirm against the model's output layout).
        prediction_length: Number of forecast steps to draw.
        freq: Frequency of the time axis. Defaults to ``"1D"``, the value
            that was previously hard-coded.

    Returns:
        A figure with four traces: actual values, the forecast, a flat line
        at the forecast's median, and a flat line at median + one standard
        deviation (labelled "Prediction Std").
    """
    # Fetch the row once: every lookup re-runs the dataset's transform.
    series = test_dataset[ts_index]
    target = series['target']
    target_length = len(target)

    # Time axis covering the whole history plus the forecast horizon
    index = pd.period_range(
        start=series['start'],
        periods=target_length + prediction_length,
        freq=freq,
    ).to_timestamp()
    history_index = index[:target_length]
    forecast_index = index[target_length:]

    forecast = forecasts[ts_index][0][:prediction_length]
    forecast_median = np.median(forecast)
    forecast_std = np.std(forecast)

    actual_data = go.Scatter(
        x=history_index,
        y=target,
        name="Actual",
        mode='lines',
    )
    forecast_data = go.Scatter(
        x=forecast_index,
        y=forecast,
        name="Prediction",
        mode='lines',
    )
    forecast_median_data = go.Scatter(
        x=forecast_index,
        y=[forecast_median] * prediction_length,
        name="Prediction Median",
        mode='lines',
    )
    forecast_std_data = go.Scatter(
        x=forecast_index,
        y=[forecast_median + forecast_std] * prediction_length,
        name="Prediction Std",
        mode='lines',
    )

    # Create the figure
    fig = make_subplots(rows=1, cols=1)
    for trace in (actual_data, forecast_data, forecast_median_data, forecast_std_data):
        fig.add_trace(trace, row=1, col=1)

    # Set layout and title
    fig.update_layout(
        xaxis_title="Date",
        yaxis_title="Value",
        title="Actual vs. Predicted Values",
        xaxis_rangeslider_visible=True,
    )
    return fig


@lru_cache(maxsize=1)
def _load_test_split(freq):
    """Load the test split once and attach the ``start`` -> Period transform."""
    test_split = load_dataset(DATASET_ID)['test']
    test_split.set_transform(partial(transform_start_field, freq=freq))
    return test_split


@lru_cache(maxsize=1)
def _load_model():
    """Load the pretrained Autoformer once and reuse it across requests."""
    return AutoformerForPrediction.from_pretrained(MODEL_ID)


def do_prediction(days_to_predict: int):
    """Forecast ``days_to_predict`` days and return the Plotly figure.

    Args:
        days_to_predict: Forecast horizon in days, as supplied by the slider.

    Returns:
        The figure produced by ``plot`` for the first series of the test
        split.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # The value is later used as a list-repeat count and a window length,
    # so make sure it is an int even if the UI hands over a float.
    prediction_length = int(days_to_predict)
    freq = "1D"  # Daily frequency

    # Cached: avoids re-reading the dataset and checkpoint on every request.
    test_split = _load_test_split(freq)
    model = _load_model()
    config = model.config
    print(f"Config: {config}")

    test_dataloader = create_test_dataloader(
        config=config,
        freq=freq,
        data=test_split,
        batch_size=64,
        prediction_length=prediction_length,
    )

    model.to(device)
    model.eval()

    has_static_cat = config.num_static_categorical_features > 0
    has_static_real = config.num_static_real_features > 0

    forecasts = []
    for batch in test_dataloader:
        outputs = model.generate(
            static_categorical_features=batch["static_categorical_features"].to(device)
            if has_static_cat
            else None,
            static_real_features=batch["static_real_features"].to(device)
            if has_static_real
            else None,
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
        )
        forecasts.append(outputs.sequences.cpu().numpy())

    forecasts = np.vstack(forecasts)
    print(forecasts.shape)

    return plot(0, test_split, forecasts, prediction_length, freq=freq)


interface = gr.Interface(
    fn=do_prediction,
    inputs=gr.Slider(minimum=1, maximum=30, step=1, label="Days to Predict"),
    outputs="plot",
    title="Prediction Plot",
    description="Adjust the slider to set the number of days to predict.",
    # Gradio documents string values here; the boolean form is deprecated.
    allow_flagging="never",  # Disable flagging for simplicity
)

if __name__ == "__main__":
    interface.launch()