Beyond Seasonality: Integrating External Events in Time Series Analysis

In today's data-driven business environment, understanding the patterns in your time series data is essential for making informed decisions. While seasonal decomposition techniques like STL (Seasonal-Trend decomposition using Loess) are powerful tools for separating seasonal patterns from underlying trends, they often miss a critical factor: the impact of external events.


When analyzing sales, web traffic, or consumer behavior, external events like holidays, promotions, or cultural celebrations can cause significant spikes that traditional time series analysis might misattribute to noise or seasonality. This is particularly true in markets with strong cultural events that dramatically affect consumer behavior.


In this article, I'll share a comprehensive approach that extends STL decomposition to:


1. Normalize for price impacts using elasticity models

2. Account for external events using your own events calendar

3. Handle additional factors like promotions and out-of-stock situations

4. Automatically detect and quantify event-related spikes


## The Limitations of Standard STL Decomposition


Standard STL decomposition breaks down a time series into three components:


- **Trend**: The long-term progression of the series

- **Seasonality**: Regular, predictable patterns that repeat at fixed intervals

- **Residual**: The irregular remainder after removing trend and seasonality


However, this approach falls short when:


- Price fluctuations distort the underlying patterns

- Special events cause significant but irregular spikes

- Promotions or inventory issues create temporary anomalies


Let's explore how we can overcome these limitations.


## The Enhanced Approach: Integrating External Events


Our enhanced approach starts with a table of events (like holidays, marketing campaigns, or cultural celebrations) and integrates it with STL decomposition. Here's how it works:
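
To make the expected input concrete, here is a minimal events calendar in the shape the snippets below assume: a `date` column and an `events` column naming each occasion (the dates and names here are purely illustrative):

```python
import pandas as pd

# Illustrative events calendar; the code below expects 'date' and 'events' columns
events_data = pd.DataFrame({
    'date': ['2024-02-14', '2024-11-29', '2024-12-25'],
    'events': ["Valentine's Day", 'Black Friday', 'Christmas'],
})
```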


### Step 1: Data Preparation and Price Normalization


First, we prepare our data and normalize for price impacts:


```python
import numpy as np
import pandas as pd
import statsmodels.api as sm

# Set datetime indexes
df[time_col] = pd.to_datetime(df[time_col])
df = df.set_index(pd.DatetimeIndex(df[time_col]))

events_df['date'] = pd.to_datetime(events_df['date'])
events_df = events_df.set_index('date')

# Normalize for price impact
if price_col is not None and price_col in df.columns:
    # Create a price elasticity model using log-log regression
    valid_mask = (df[value_col] > 0) & (df[price_col] > 0)

    if valid_mask.sum() > 10:
        model = sm.OLS(
            np.log(df.loc[valid_mask, value_col]),
            sm.add_constant(np.log(df.loc[valid_mask, price_col]))
        ).fit()

        elasticity = model.params.iloc[1]
        print(f"Price elasticity: {elasticity:.4f}")

        # Normalize the value using the estimated price impact
        baseline_price = df[price_col].median()
        df['normalized_value'] = df[value_col] * (baseline_price / df[price_col]) ** elasticity
        target_col = 'normalized_value'
```


This price normalization is crucial for isolating true seasonal and event-driven patterns from price-induced fluctuations. We use a log-log regression model to estimate price elasticity, then adjust our values to a baseline price.
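
To see what the adjustment does, here is a small numeric illustration with assumed values (an elasticity of -1.5 and a baseline price of 10; in practice both come from the regression and the data):

```python
# Assumed values for illustration only
elasticity = -1.5        # estimated slope from the log-log regression
baseline_price = 10.0    # median price in the data
price = 8.0              # discounted price on this day
observed_units = 120.0   # units sold at the discounted price

# Scale the observation toward what it would have been at the baseline price
normalized = observed_units * (baseline_price / price) ** elasticity
print(f"{normalized:.1f} units at baseline price")  # ~85.9
```

Because demand here is elastic, part of the 120 units is attributed to the discount, and the normalized series keeps only the portion we'd expect at the baseline price.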


### Step 2: Accounting for Promotions and Inventory Issues


Next, we adjust for promotions and out-of-stock situations, which can significantly impact sales or impressions data:


```python
# Account for promotions if provided
if promotion_col is not None and promotion_col in df.columns:
    # Create a flag for promotional periods
    df['has_promotion'] = df[promotion_col].notna() & (df[promotion_col] != 0)

    # If we have enough data, adjust for promotion effect
    if df['has_promotion'].sum() > 10:
        # Simple model to estimate promotion effect
        X = sm.add_constant(df['has_promotion'].astype(int))
        model = sm.OLS(df[target_col], X).fit()
        promotion_effect = model.params.iloc[1]

        # Adjust the target value for promotion effect
        if abs(promotion_effect) > 0:
            df['promotion_adjusted'] = df[target_col] - (df['has_promotion'].astype(int) * promotion_effect)
            target_col = 'promotion_adjusted'
            print(f"Promotion effect: {promotion_effect:.4f}")

# Account for out-of-stock impact if provided
if oos_col is not None and oos_col in df.columns:
    # Normalize OOS hours (0-24 scale)
    df['oos_ratio'] = df[oos_col] / 24.0

    # If we have enough data with OOS, adjust for OOS effect
    if (df['oos_ratio'] > 0).sum() > 10:
        # Simple model to estimate OOS effect
        X = sm.add_constant(df['oos_ratio'])
        model = sm.OLS(df[target_col], X).fit()
        oos_effect = model.params.iloc[1]

        # Adjust the target value for OOS effect
        if abs(oos_effect) > 0:
            df['oos_adjusted'] = df[target_col] - (df['oos_ratio'] * oos_effect)
            target_col = 'oos_adjusted'
            print(f"Out-of-stock effect: {oos_effect:.4f}")
```


By building simple statistical models, we estimate and remove the effects of promotions and out-of-stock situations, leading to a cleaner signal for our STL decomposition.
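
As a quick sanity check, you can re-run the promotion regression on the adjusted series; by construction the residual slope should be essentially zero (this assumes the promotion branch above actually ran and created `promotion_adjusted`):

```python
import statsmodels.api as sm

# Re-estimate the promotion effect on the adjusted series; should be ~0
check = sm.OLS(df['promotion_adjusted'],
               sm.add_constant(df['has_promotion'].astype(int))).fit()
print(f"Residual promotion effect: {check.params.iloc[1]:.4f}")
```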


### Step 3: STL Decomposition and Event Integration


Now we apply STL decomposition and integrate our events data:


```python
from statsmodels.tsa.seasonal import STL

# Apply STL decomposition
stl = STL(df[target_col], period=period, robust=True)
result = stl.fit()

# Extract components
trend = result.trend
seasonal = result.seasonal
residual = result.resid

# Create result DataFrame
result_df = pd.DataFrame({
    'original': df[value_col],
    'trend': trend,
    'seasonal': seasonal,
    'residual': residual
}, index=df.index)

# Add the normalized/adjusted values used for decomposition
result_df['adjusted_value'] = df[target_col]

# Integrate events data
result_df['event_name'] = None

# Map events to the result dataframe
for idx, row in events_df.iterrows():
    if idx in result_df.index:
        result_df.loc[idx, 'event_name'] = row['events']
```


After performing STL decomposition, we map our events data to the decomposition results. This allows us to identify which residuals might be attributable to specific events.


### Step 4: Spike Detection and Event Association


Next, we detect spikes in the residual component and associate them with nearby events:


```python
# Detect spikes in the residual component
result_df['is_spike'] = False

# Calculate z-scores of residuals
result_df['residual_zscore'] = (result_df['residual'] - result_df['residual'].mean()) / result_df['residual'].std()

# Mark significant positive spikes (adjust threshold as needed)
spike_threshold = 2.0  # Default threshold at 2 standard deviations
result_df.loc[result_df['residual_zscore'] > spike_threshold, 'is_spike'] = True

# Analyze spikes around events (include days before and after)
event_window = 7  # Days before and after the event; adjust based on your business context
result_df['event_relation'] = None

for idx, row in result_df[result_df['event_name'].notna()].iterrows():
    event_name = row['event_name']

    # Create a window around the event date
    window_start = idx - pd.Timedelta(days=event_window)
    window_end = idx + pd.Timedelta(days=event_window)

    # Find rows in the window
    mask = (result_df.index >= window_start) & (result_df.index <= window_end)

    if mask.any():
        # If any day in the window is a spike, mark it as event-related
        window_df = result_df.loc[mask]
        if window_df['is_spike'].any():
            # Associate spikes with the event
            spike_days = window_df.loc[window_df['is_spike']].index

            for spike_day in spike_days:
                days_from_event = (spike_day - idx).days
                if days_from_event < 0:
                    relation = f"{abs(days_from_event)} day(s) before"
                elif days_from_event > 0:
                    relation = f"{days_from_event} day(s) after"
                else:
                    relation = "day of"

                result_df.loc[result_df.index == spike_day, 'event_relation'] = \
                    f"{relation} {event_name}"
```


This detection system identifies statistically significant spikes in our data and correlates them with events in our calendar. 
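
Once the loop has run, a quick way to review the associations is to list every spike that was tied to an event (column names as created above):

```python
# Show event-related spikes, strongest first
event_spikes = result_df.loc[result_df['event_relation'].notna(),
                             ['original', 'residual_zscore', 'event_relation']]
print(event_spikes.sort_values('residual_zscore', ascending=False).to_string())
```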


#### The Importance of Lead and Lag Effects


One of the most valuable aspects of this approach is capturing what we call "lead and lag effects", spikes that occur before, during, or after the actual event date:


1. **Lead Effects (Before the Event)**: Many events create significant activity before they occur:

   - Shopping spikes in the days leading up to holidays (e.g., the week before Christmas)

   - Travel bookings weeks before major events

   - Preparation activities (like buying ingredients days before Thanksgiving)


2. **Event-Day Effects**: The actual day of the event often shows distinctive patterns:

   - Some holidays show decreased activity (e.g., closed businesses)

   - Others show peak activity (e.g., restaurant traffic on Valentine's Day)


3. **Lag Effects (After the Event)**: Many events continue to impact metrics after they're over:

   - Post-holiday sales or returns

   - Extended travel periods after major holidays

   - Social media engagement continuing days after an event


By establishing an appropriate "event window" (we use 7 days before and after by default, but you should adjust based on your specific business context), we can capture the full lifecycle of an event's impact on your time series.
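
If different events in your calendar have very different footprints, one option is to map each event type to its own window instead of using a single global value. A minimal sketch (the mapping and day counts here are hypothetical, not recommendations):

```python
# Hypothetical per-event windows; tune these to your own calendar
event_windows = {
    'Christmas': 14,    # long shopping lead-up
    'Black Friday': 3,  # short, concentrated impact
}
DEFAULT_WINDOW = 7

def window_for(event_name):
    """Return the analysis window (days before/after) for an event."""
    for key, days in event_windows.items():
        if key.lower() in str(event_name).lower():
            return days
    return DEFAULT_WINDOW
```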


### Step 5: Analyzing Event Impact


Finally, we can quantify the impact of each event on our time series:


```python
def analyze_event_impact(result_df, value_col='original', event_window=7):
    """Analyze the impact of events on the time series"""
    # Get all unique events
    events = result_df[result_df['event_name'].notna()].copy()

    if events.empty:
        print("No events found in the data")
        return pd.DataFrame()

    # Prepare results container
    event_impact = []

    for idx, row in events.iterrows():
        event_name = row['event_name']
        event_date = idx

        # Define window around event
        window_start = event_date - pd.Timedelta(days=event_window)
        window_end = event_date + pd.Timedelta(days=event_window)

        # Get window data
        window_df = result_df[(result_df.index >= window_start) &
                              (result_df.index <= window_end)].copy()

        if not window_df.empty:
            # Get baseline period (same length, immediately before the event window)
            baseline_start = window_start - pd.Timedelta(days=event_window * 2 + 1)
            baseline_end = window_end - pd.Timedelta(days=event_window * 2 + 1)

            baseline_df = result_df[(result_df.index >= baseline_start) &
                                    (result_df.index <= baseline_end)].copy()

            # Calculate overall window metrics
            event_avg = window_df[value_col].mean()
            event_spike = window_df['is_spike'].sum()

            # Calculate pre-event metrics (lead effects)
            pre_event_df = window_df[window_df.index < event_date]
            pre_event_avg = pre_event_df[value_col].mean() if not pre_event_df.empty else None
            pre_event_spikes = pre_event_df['is_spike'].sum() if not pre_event_df.empty else 0

            # Calculate post-event metrics (lag effects)
            post_event_df = window_df[window_df.index > event_date]
            post_event_avg = post_event_df[value_col].mean() if not post_event_df.empty else None
            post_event_spikes = post_event_df['is_spike'].sum() if not post_event_df.empty else 0

            # Calculate event day metrics
            event_day_value = row[value_col]
            event_day_spike = 1 if row['is_spike'] else 0

            if not baseline_df.empty:
                baseline_avg = baseline_df[value_col].mean()
                lift_pct = ((event_avg / baseline_avg) - 1) * 100 if baseline_avg > 0 else 0

                # Calculate separate lift for pre-, day-of, and post-event periods
                pre_event_lift = ((pre_event_avg / baseline_avg) - 1) * 100 \
                    if (pre_event_avg is not None and baseline_avg > 0) else None
                post_event_lift = ((post_event_avg / baseline_avg) - 1) * 100 \
                    if (post_event_avg is not None and baseline_avg > 0) else None
                event_day_lift = ((event_day_value / baseline_avg) - 1) * 100 if baseline_avg > 0 else None
            else:
                baseline_avg = None
                lift_pct = None
                pre_event_lift = None
                post_event_lift = None
                event_day_lift = None

            # Add to results
            event_impact.append({
                'event_name': event_name,
                'event_date': event_date,
                'event_day_value': event_day_value,
                'event_window_avg': event_avg,
                'baseline_avg': baseline_avg,
                'total_lift_percentage': lift_pct,

                # Lead, day-of, and lag effect metrics
                'pre_event_avg': pre_event_avg,
                'pre_event_lift': pre_event_lift,
                'pre_event_spikes': pre_event_spikes,

                'event_day_lift': event_day_lift,
                'event_day_spike': event_day_spike,

                'post_event_avg': post_event_avg,
                'post_event_lift': post_event_lift,
                'post_event_spikes': post_event_spikes,

                'total_spikes_in_window': event_spike,
                'residual_zscore': row['residual_zscore']
            })

    # Create DataFrame and sort by impact
    if event_impact:
        impact_df = pd.DataFrame(event_impact)

        # Add a dominant effect column to identify where the biggest impact occurs
        impact_df['dominant_effect_period'] = 'day of event'  # Default

        # Determine where the biggest effect occurs (pre, during, or post event)
        for idx, row in impact_df.iterrows():
            lifts = [
                ('pre-event', row['pre_event_lift'] if pd.notna(row['pre_event_lift']) else -float('inf')),
                ('day of event', row['event_day_lift'] if pd.notna(row['event_day_lift']) else -float('inf')),
                ('post-event', row['post_event_lift'] if pd.notna(row['post_event_lift']) else -float('inf'))
            ]
            max_period = max(lifts, key=lambda x: x[1])[0]
            impact_df.at[idx, 'dominant_effect_period'] = max_period

        # Sort by total impact
        impact_df = impact_df.sort_values('total_lift_percentage', ascending=False)
        return impact_df
    else:
        return pd.DataFrame()
```


This enhanced analysis breaks down the event impact into three distinct periods:


1. **Pre-Event Impact (Lead Effects)** - Captures changes in behavior before the event occurs

2. **Event Day Impact** - Measures the direct impact on the day of the event

3. **Post-Event Impact (Lag Effects)** - Identifies lasting effects after the event is over


The function also identifies the "dominant effect period" for each event: whether it primarily affects behavior before, during, or after the event. This is crucial information for planning and optimization, as it tells you when to focus your resources.


For example:

- If an event shows strong pre-event effects, you might need to ensure adequate inventory and staffing in advance

- If post-event effects dominate, you might extend promotions or customer service resources after the event

- If the event day itself shows the largest impact, you might focus maximum resources on that specific day


By comparing each period with a baseline period, we can isolate the true impact of the event across its entire lifecycle. This comprehensive view goes far beyond just looking at the event day itself.
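
Assuming `event_analysis` holds the DataFrame returned by `analyze_event_impact`, a couple of lines summarize where impact concentrates across your calendar:

```python
# How many events peak before, on, or after the event day, and their average lift
summary = event_analysis.groupby('dominant_effect_period')['total_lift_percentage'] \
                        .agg(['count', 'mean'])
print(summary)

# Events that need advance preparation (lead effects dominate)
lead_driven = event_analysis[event_analysis['dominant_effect_period'] == 'pre-event']
```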


## Visualizing the Results


To make our analysis more interpretable, we can visualize the time series with event markers and detected spikes:


```python
import matplotlib.pyplot as plt

def plot_events_impact(result_df, event_analysis=None, value_col='original'):
    """Plot the time series with events and detected spikes"""
    plt.figure(figsize=(15, 8))
    plt.plot(result_df.index, result_df[value_col], label='Value', color='blue', alpha=0.7)

    # Plot event markers
    event_mask = result_df['event_name'].notna()
    if event_mask.any():
        plt.scatter(result_df[event_mask].index,
                    result_df[event_mask][value_col],
                    color='red', s=100, label='Events', marker='*')

        # Add event labels if not too many
        if event_mask.sum() <= 15:  # Limit labels to avoid clutter
            for idx, row in result_df[event_mask].iterrows():
                plt.annotate(row['event_name'],
                             (idx, row[value_col]),
                             textcoords="offset points",
                             xytext=(0, 10),
                             ha='center')

    # Plot spike markers
    spike_mask = result_df['is_spike']
    if spike_mask.any():
        plt.scatter(result_df[spike_mask].index,
                    result_df[spike_mask][value_col],
                    color='green', s=80, label='Significant Spikes', marker='^')

    # Highlight top events if analysis is provided
    if event_analysis is not None and not event_analysis.empty:
        top_events = event_analysis.head(5)  # Top 5 events by impact
        for _, row in top_events.iterrows():
            event_date = row['event_date']
            if event_date in result_df.index:
                plt.axvline(x=event_date, color='orange', linestyle='--', alpha=0.5)

    plt.title('Time Series with Events and Detected Spikes')
    plt.xlabel('Date')
    plt.ylabel(value_col)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()

    return plt
```


This visualization makes it easy to spot the relationship between events and spikes in your data, helping to validate your analysis and communicate insights.


## Real-World Applications


This enhanced time series analysis has numerous practical applications:


### 1. Marketing Optimization


By quantifying the impact of different events, marketers can:

- Allocate budget more effectively toward high-impact events

- Time promotions to coincide with natural demand patterns

- Understand lead and lag effects to optimize campaign timing


### 2. Inventory Planning


Better event impact analysis helps with:

- Accurate inventory forecasting for specific events

- Avoiding out-of-stock situations during high-demand periods

- Optimizing inventory levels based on quantified event impacts


### 3. Business Strategy


At a strategic level, this analysis enables:

- Market-specific planning based on local events and holidays

- Resource allocation that aligns with event-driven demand peaks

- Better forecasting by incorporating event calendars into predictions


### 4. Product Development


Product teams can use these insights to:

- Time product launches around beneficial events

- Develop event-specific product variants

- Understand which products have the strongest event-driven demand


## Bringing It All Together: A Complete Example


Here's how to implement this analysis end-to-end:


```python
# Load data
sales_data = pd.read_csv("sales_data.csv")
events_data = pd.read_csv("events.csv")

# Extract STL with events integration
result_df = extract_stl_with_events(
    df=sales_data,
    events_df=events_data,
    time_col='date',
    value_col='sales',
    price_col='price',
    product_id_col='productid',
    promotion_col='promotion',
    oos_col='out_of_stock_hours',
    period=52  # Weekly data with yearly seasonality
)

# Analyze event impact
event_analysis = analyze_event_impact(result_df)
print("\nEvent Impact Analysis:")
print(event_analysis.head(10).to_string())

# Plot results
plot = plot_events_impact(result_df, event_analysis)
plt.savefig('event_impact_analysis.png')
plt.show()

# Export results
result_df.to_csv('time_series_with_events_analysis.csv')
event_analysis.to_csv('event_impact_analysis.csv')
```


This code takes your sales data and events calendar, performs STL decomposition with price normalization and event integration, analyzes the impact of each event, visualizes the results, and exports the analysis for further use.
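
For reference, the `extract_stl_with_events` call above assumes the code from Steps 1-4 is wrapped in a single function. A minimal skeleton, with the signature inferred from the call site (the defaults are illustrative):

```python
def extract_stl_with_events(df, events_df, time_col='date', value_col='sales',
                            price_col=None, product_id_col=None,
                            promotion_col=None, oos_col=None, period=7):
    """Run Steps 1-4 and return the decomposition result DataFrame."""
    # Step 1: datetime indexing and price normalization
    # Step 2: promotion and out-of-stock adjustments
    # Step 3: STL decomposition and event mapping
    # Step 4: spike detection and event association
    ...
```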


## Conclusion


Traditional time series decomposition is a powerful tool, but it often falls short when dealing with event-driven spikes and external factors like price changes, promotions, and inventory issues. By extending STL decomposition to incorporate these factors, we can uncover deeper insights and make more informed business decisions.


This integrated approach:

- Normalizes for price effects before performing decomposition

- Incorporates external events from your own calendar

- Accounts for promotions and inventory fluctuations

- Quantifies the impact of each event on your time series

- Identifies lead and lag effects around events


Whether you're analyzing retail sales, web traffic, or any other time series data influenced by external events, this enhanced approach provides a more complete picture of the factors driving your data.


Give it a try with your own data and discover the hidden patterns and event correlations that standard time series analysis might miss!


---


*Note: This implementation uses the statsmodels package for STL decomposition and various statistical models. Make sure you have pandas, numpy, statsmodels, and matplotlib installed in your Python environment.*
