# Seasonality classification code

import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from scipy import stats
from scipy.signal import find_peaks
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler

warnings.filterwarnings('ignore')


class SeasonalClassifier:
    """Classify products as seasonal or year-round from weekly view counts.

    Two-stage pipeline:
      1. For products with enough history, a Kruskal-Wallis test on weekly
         views grouped by season decides whether seasonality is significant,
         and a peak-detection heuristic labels the pattern (single-season,
         extended-season, bimodal, year-round).
      2. A semi-supervised text model (TF-IDF + Korean seasonal keyword
         counts + random forest) is trained on the high-confidence K-W
         labels and used to classify products with insufficient history.
    """

    def __init__(self, min_weeks=16, min_observations_per_season=2):
        """Initialize the seasonal classifier.

        Args:
            min_weeks: Minimum weeks of data required for the K-W test.
            min_observations_per_season: Minimum observations per season
                required for the test to be considered reliable.
        """
        self.min_weeks = min_weeks
        self.min_observations_per_season = min_observations_per_season
        self.high_confidence_products = []  # K-W results with 'high' pattern confidence
        self.text_model = None              # RandomForestClassifier once trained
        self.tfidf_vectorizer = None        # fitted on first extract_korean_text_features call
        self.scaler = StandardScaler()

        # Korean seasonal keywords for feature engineering
        self.korean_seasonal_keywords = {
            'spring': ['봄', '스프링', '경량', '얇은', '가벼운', '신선한', '파스텔', '벚꽃'],
            'summer': ['여름', '썸머', '시원한', '반팔', '반바지', '린넨', '면', '민소매', '샌들', '수영복'],
            'fall': ['가을', '아우터', '자켓', '긴팔', '브라운', '베이지', '트렌치', '부츠'],
            'winter': ['겨울', '윈터', '따뜻한', '두꺼운', '패딩', '코트', '스카프', '부츠', '울', '플리스']
        }

    def assign_season(self, date):
        """Return the Korean season name for a date (meteorological months)."""
        month = date.month
        if month in (3, 4, 5):
            return 'spring'
        elif month in (6, 7, 8):
            return 'summer'
        elif month in (9, 10, 11):
            return 'fall'
        else:
            return 'winter'

    def prepare_seasonal_data(self, df):
        """Aggregate raw daily rows into weekly totals tagged with a season.

        Args:
            df: DataFrame with columns
                ['product_id', 'date', 'views', 'product_name', 'description'].

        Returns:
            DataFrame with one row per (product_id, ISO year_week, season)
            and summed views; name/description carried through with 'first'.
        """
        df = df.copy()
        df['date'] = pd.to_datetime(df['date'])
        df['season'] = df['date'].apply(self.assign_season)
        df['year'] = df['date'].dt.year
        df['week'] = df['date'].dt.isocalendar().week
        df['year_week'] = df['year'].astype(str) + '_' + df['week'].astype(str)

        # Weekly aggregation by season
        weekly_seasonal = df.groupby(['product_id', 'year_week', 'season']).agg({
            'views': 'sum',
            'product_name': 'first',
            'description': 'first'
        }).reset_index()

        return weekly_seasonal

    def check_data_sufficiency(self, product_data):
        """Check whether one product's weekly data can support a K-W test.

        Returns:
            (bool, str): sufficiency flag and a human-readable reason.
        """
        total_weeks = len(product_data)
        seasons_present = product_data['season'].nunique()

        # Need enough weekly observations overall.
        if total_weeks < self.min_weeks:
            return False, f"Insufficient weeks: {total_weeks} < {self.min_weeks}"

        # K-W needs at least 3 seasonal groups to compare.
        if seasons_present < 3:
            return False, f"Insufficient seasons: {seasons_present} < 3"

        # Each season must contribute enough observations to be reliable.
        season_counts = product_data.groupby('season').size()
        min_obs_per_season = season_counts.min()
        if min_obs_per_season < self.min_observations_per_season:
            return False, f"Insufficient obs per season: {min_obs_per_season} < {self.min_observations_per_season}"

        return True, "Sufficient data"

    def kruskal_wallis_test(self, product_data):
        """Perform a Kruskal-Wallis test for seasonal differences in views.

        Args:
            product_data: weekly seasonal DataFrame for a single product.

        Returns:
            dict with 'significant' (p < 0.05), 'p_value', and on success
            'h_statistic', 'seasonal_averages' and 'season_names'.
        """
        # Collect per-season view samples, skipping empty seasons.
        season_groups = []
        season_names = []
        for season in ['spring', 'summer', 'fall', 'winter']:
            season_data = product_data[product_data['season'] == season]['views'].values
            if len(season_data) > 0:
                season_groups.append(season_data)
                season_names.append(season)

        if len(season_groups) < 3:
            return {'significant': False, 'p_value': 1.0, 'reason': 'insufficient_seasons'}

        try:
            h_stat, p_value = stats.kruskal(*season_groups)

            # Per-season means feed the downstream pattern heuristic.
            seasonal_avgs = {
                season: np.mean(season_groups[i])
                for i, season in enumerate(season_names)
            }

            return {
                'significant': p_value < 0.05,
                'p_value': p_value,
                'h_statistic': h_stat,
                'seasonal_averages': seasonal_avgs,
                'season_names': season_names
            }
        except Exception:
            # stats.kruskal raises ValueError e.g. when all values are identical;
            # treat any failure as "no detectable seasonality".
            return {'significant': False, 'p_value': 1.0, 'reason': 'test_failed'}

    def detect_seasonal_pattern(self, seasonal_avgs):
        """Classify the shape of a seasonal profile from per-season averages.

        Args:
            seasonal_avgs: dict mapping season name -> average views.

        Returns:
            dict with 'pattern' ('single_season', 'extended_season',
            'bimodal', 'year_round' or 'insufficient_data'),
            'primary_seasons' (list) and 'confidence' ('low'/'medium'/'high').
        """
        if not seasonal_avgs:
            return {'pattern': 'year_round', 'primary_seasons': [], 'confidence': 'low'}

        # Order values in calendar order so adjacency checks make sense.
        seasons_order = ['spring', 'summer', 'fall', 'winter']
        values = []
        available_seasons = []
        for season in seasons_order:
            if season in seasonal_avgs:
                values.append(seasonal_avgs[season])
                available_seasons.append(season)

        if len(values) < 3:
            return {'pattern': 'insufficient_data', 'primary_seasons': [], 'confidence': 'low'}

        values = np.array(values)

        # Normalize so "peak" means relative to the best season.
        if values.max() > 0:
            normalized_values = values / values.max()
        else:
            normalized_values = values

        # Seasons at >= 70% of the maximum count as peaks.
        threshold = 0.7
        peak_indices = np.where(normalized_values >= threshold)[0]
        peak_seasons = [available_seasons[i] for i in peak_indices]

        if len(peak_seasons) == 0:
            # No clear peaks - check whether one season still stands out
            # (more than 50% above the overall mean).
            max_idx = np.argmax(values)
            if values[max_idx] > np.mean(values) * 1.5:
                return {
                    'pattern': 'single_season',
                    'primary_seasons': [available_seasons[max_idx]],
                    'confidence': 'medium'
                }
            return {'pattern': 'year_round', 'primary_seasons': [], 'confidence': 'medium'}

        elif len(peak_seasons) == 1:
            return {
                'pattern': 'single_season',
                'primary_seasons': peak_seasons,
                'confidence': 'high'
            }

        elif len(peak_seasons) == 2:
            # Adjacent peak seasons = one extended season; separated = bimodal.
            season_indices = sorted(seasons_order.index(s) for s in peak_seasons)

            if season_indices == [0, 3]:
                # spring + winter wrap around the year boundary but are not
                # treated as adjacent here.
                pattern_type = 'bimodal'
            elif season_indices[1] - season_indices[0] == 1:
                pattern_type = 'extended_season'
            else:
                pattern_type = 'bimodal'

            return {
                'pattern': pattern_type,
                'primary_seasons': peak_seasons,
                'confidence': 'high'
            }

        else:
            # Three or more "peaks" means the profile is essentially flat.
            return {'pattern': 'year_round', 'primary_seasons': peak_seasons, 'confidence': 'medium'}

    def classify_products_kw(self, df):
        """Classify every product via the Kruskal-Wallis route.

        Args:
            df: raw DataFrame (see prepare_seasonal_data).

        Returns:
            DataFrame with one row per product and classification columns.
            Side effect: fills self.high_confidence_products with results
            whose pattern confidence is 'high' (training set for the
            semi-supervised text model).
        """
        seasonal_data = self.prepare_seasonal_data(df)
        results = []

        for product_id in seasonal_data['product_id'].unique():
            product_data = seasonal_data[seasonal_data['product_id'] == product_id].copy()

            # Text metadata (constant per product, take the first row).
            product_name = product_data['product_name'].iloc[0]
            product_desc = product_data['description'].iloc[0] if pd.notna(product_data['description'].iloc[0]) else ""

            sufficient, reason = self.check_data_sufficiency(product_data)

            result = {
                'product_id': product_id,
                'product_name': product_name,
                'description': product_desc,
                'data_sufficient': sufficient,
                'reason': reason
            }

            if sufficient:
                kw_result = self.kruskal_wallis_test(product_data)

                if kw_result['significant']:
                    pattern_result = self.detect_seasonal_pattern(kw_result.get('seasonal_averages', {}))

                    result.update({
                        'kw_significant': True,
                        'kw_p_value': kw_result['p_value'],
                        'pattern': pattern_result['pattern'],
                        'primary_seasons': ','.join(pattern_result['primary_seasons']),
                        # Propagate the pattern confidence so the 'high'
                        # filter in train_semi_supervised_model matches the
                        # products collected in high_confidence_products.
                        'confidence': pattern_result['confidence'],
                        'classification_method': 'kruskal_wallis'
                    })

                    # Store high-confidence products for semi-supervised learning.
                    if pattern_result['confidence'] == 'high':
                        self.high_confidence_products.append(result)

                else:
                    result.update({
                        'kw_significant': False,
                        'kw_p_value': kw_result['p_value'],
                        'pattern': 'year_round',
                        'primary_seasons': '',
                        'confidence': 'medium',
                        'classification_method': 'kruskal_wallis'
                    })

            else:
                result.update({
                    'kw_significant': None,
                    'kw_p_value': None,
                    'pattern': 'insufficient_data',
                    'primary_seasons': '',
                    'confidence': 'low',
                    'classification_method': 'insufficient_data'
                })

            results.append(result)

        return pd.DataFrame(results)

    def extract_korean_text_features(self, texts):
        """Build text features: TF-IDF vectors plus seasonal keyword counts.

        The TF-IDF vectorizer is fitted on the first call and reused
        (transform only) afterwards, so train and predict share a vocabulary.
        """
        if self.tfidf_vectorizer is None:
            self.tfidf_vectorizer = TfidfVectorizer(
                max_features=5000,
                ngram_range=(1, 2),
                min_df=2,
                max_df=0.95,
                stop_words=None,  # Keep Korean stopwords for now
                lowercase=False   # Preserve Korean characters
            )
            tfidf_features = self.tfidf_vectorizer.fit_transform(texts)
        else:
            tfidf_features = self.tfidf_vectorizer.transform(texts)

        # One count feature per season: how many of its keywords appear.
        seasonal_features = []
        for text in texts:
            text_lower = text.lower() if isinstance(text, str) else ""
            features = [
                sum(1 for keyword in keywords if keyword in text_lower)
                for keywords in self.korean_seasonal_keywords.values()
            ]
            seasonal_features.append(features)

        seasonal_features = np.array(seasonal_features)

        # Combine TF-IDF with seasonal keyword features.
        if seasonal_features.shape[0] > 0:
            combined_features = np.hstack([tfidf_features.toarray(), seasonal_features])
        else:
            combined_features = tfidf_features.toarray()

        return combined_features

    def train_semi_supervised_model(self, all_products_df):
        """Train the text model on high-confidence K-W labels.

        Args:
            all_products_df: output of classify_products_kw.

        Returns:
            dict of evaluation metrics, or None if there are fewer than 10
            high-confidence samples to train on.
        """
        # Only products the K-W stage labelled with high confidence.
        high_conf_data = all_products_df[
            (all_products_df['confidence'] == 'high') &
            (all_products_df['pattern'] != 'insufficient_data')
        ].copy()

        if len(high_conf_data) < 10:
            print(f"Warning: Only {len(high_conf_data)} high-confidence samples for training")
            return None

        print(f"Training with {len(high_conf_data)} high-confidence samples")

        # Build (text, label) pairs; label encodes pattern + primary seasons.
        texts = []
        labels = []
        for _, row in high_conf_data.iterrows():
            text = str(row['product_name']) + " " + str(row['description'] if pd.notna(row['description']) else "")
            texts.append(text)

            if row['pattern'] == 'single_season' and row['primary_seasons']:
                labels.append(row['primary_seasons'].split(',')[0])
            elif row['pattern'] == 'bimodal' and row['primary_seasons']:
                # Bimodal products get a combined two-season label.
                seasons = row['primary_seasons'].split(',')
                if len(seasons) == 2:
                    labels.append(f"bimodal_{seasons[0]}_{seasons[1]}")
                else:
                    labels.append('year_round')
            else:
                labels.append('year_round')

        X = self.extract_korean_text_features(texts)
        y = np.array(labels)

        # NOTE(review): stratify requires >= 2 samples per class; with very
        # few labels per class this split can raise - confirm upstream data.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        self.text_model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            min_samples_split=5,
            random_state=42,
            class_weight='balanced'
        )
        self.text_model.fit(X_train_scaled, y_train)

        # Evaluate: holdout accuracy, cross-validation, per-class report.
        train_score = self.text_model.score(X_train_scaled, y_train)
        test_score = self.text_model.score(X_test_scaled, y_test)
        print(f"Training Accuracy: {train_score:.3f}")
        print(f"Test Accuracy: {test_score:.3f}")

        cv_scores = cross_val_score(self.text_model, X_train_scaled, y_train, cv=5)
        print(f"CV Accuracy: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")

        y_pred = self.text_model.predict(X_test_scaled)
        print("\nClassification Report:")
        print(classification_report(y_test, y_pred))

        return {
            'train_accuracy': train_score,
            'test_accuracy': test_score,
            'cv_accuracy': cv_scores.mean(),
            'cv_std': cv_scores.std(),
            'classification_report': classification_report(y_test, y_pred, output_dict=True)
        }

    def predict_seasons_text(self, product_names, descriptions=None):
        """Predict season labels for products using the trained text model.

        Args:
            product_names: list of product names.
            descriptions: optional list of product descriptions.

        Returns:
            (predictions, confidence_scores): predicted labels and the max
            class probability for each product.

        Raises:
            ValueError: if the text model has not been trained yet.
        """
        if self.text_model is None:
            raise ValueError("Model not trained. Call train_semi_supervised_model first.")

        if descriptions is None:
            descriptions = [""] * len(product_names)

        texts = [str(name) + " " + str(desc) for name, desc in zip(product_names, descriptions)]

        X = self.extract_korean_text_features(texts)
        X_scaled = self.scaler.transform(X)

        predictions = self.text_model.predict(X_scaled)
        probabilities = self.text_model.predict_proba(X_scaled)

        # Confidence = probability of the winning class.
        confidence_scores = np.max(probabilities, axis=1)

        return predictions, confidence_scores

    def classify_all_products(self, df):
        """Run the full pipeline: K-W where data suffices, text model otherwise.

        Args:
            df: raw DataFrame with all product data.

        Returns:
            (results_df, model_performance): the classification DataFrame and
            the metrics dict from train_semi_supervised_model (or None).
        """
        print("Step 1: Running Kruskal-Wallis analysis...")
        kw_results = self.classify_products_kw(df)

        print(f"Step 2: Training semi-supervised model on {len(self.high_confidence_products)} high-confidence products...")
        model_performance = self.train_semi_supervised_model(kw_results)

        print("Step 3: Predicting seasons for products with insufficient data...")

        # Products the K-W stage could not classify.
        insufficient_data = kw_results[kw_results['confidence'] == 'low'].copy()

        if len(insufficient_data) > 0 and self.text_model is not None:
            names = insufficient_data['product_name'].fillna("").tolist()
            descriptions = insufficient_data['description'].fillna("").tolist()

            predictions, confidence_scores = self.predict_seasons_text(names, descriptions)

            # Text-model predictions: 'medium' confidence above 0.7, else 'low'.
            insufficient_data['pattern'] = predictions
            insufficient_data['confidence'] = ['medium' if conf > 0.7 else 'low' for conf in confidence_scores]
            insufficient_data['classification_method'] = 'text_model'
            insufficient_data['text_confidence'] = confidence_scores

            # Write the text-model results back into the main result frame.
            for _, row in insufficient_data.iterrows():
                mask = kw_results['product_id'] == row['product_id']
                kw_results.loc[mask, 'pattern'] = row['pattern']
                kw_results.loc[mask, 'confidence'] = row['confidence']
                kw_results.loc[mask, 'classification_method'] = 'text_model'

        return kw_results, model_performance


# Example usage and testing


def create_sample_data():
    """Create a synthetic two-year dataset with known seasonal patterns.

    Five Korean fashion products get weekly view counts: four with a clear
    single-season peak (driven by keywords in their names) and one flat
    year-round item. Deterministic via np.random.seed(42).

    Returns:
        DataFrame with columns
        ['product_id', 'product_name', 'description', 'date', 'views'].
    """
    np.random.seed(42)

    # Sample product data
    products = [
        {'id': 1, 'name': '여름 반팔 티셔츠', 'desc': '시원한 면 소재 반팔'},
        {'id': 2, 'name': '겨울 패딩 점퍼', 'desc': '따뜻한 다운 패딩 코트'},
        {'id': 3, 'name': '봄 가디건', 'desc': '가벼운 니트 가디건'},
        {'id': 4, 'name': '가을 자켓', 'desc': '트렌치 스타일 아우터'},
        {'id': 5, 'name': '사계절 청바지', 'desc': '기본 데님 팬츠'},
    ]

    data = []
    start_date = datetime(2023, 1, 1)

    for product in products:
        for week in range(52 * 2):  # 2 years of data
            current_date = start_date + timedelta(weeks=week)

            # Seasonal base level keyed off the product name.
            if '여름' in product['name'] or '반팔' in product['name']:
                # Summer peak
                base_views = 100 if current_date.month in [6, 7, 8] else 20
            elif '겨울' in product['name'] or '패딩' in product['name']:
                # Winter peak
                base_views = 100 if current_date.month in [12, 1, 2] else 25
            elif '봄' in product['name']:
                # Spring peak
                base_views = 100 if current_date.month in [3, 4, 5] else 30
            elif '가을' in product['name']:
                # Fall peak
                base_views = 100 if current_date.month in [9, 10, 11] else 30
            else:
                # Year-round
                base_views = 50

            # Gaussian noise, clipped so views never go negative.
            views = max(0, int(base_views + np.random.normal(0, 10)))

            data.append({
                'product_id': product['id'],
                'product_name': product['name'],
                'description': product['desc'],
                'date': current_date,
                'views': views
            })

    return pd.DataFrame(data)


# Test the classifier


if __name__ == "__main__":
    # Build the deterministic sample dataset.
    sample_df = create_sample_data()
    print("Sample data created with", len(sample_df), "records")

    # Initialize classifier
    classifier = SeasonalClassifier(min_weeks=16, min_observations_per_season=2)

    # Run complete classification (K-W stage + text-model fallback).
    results, performance = classifier.classify_all_products(sample_df)

    print("\n=== CLASSIFICATION RESULTS ===")
    print(results[['product_id', 'product_name', 'pattern', 'confidence', 'classification_method']].to_string())

    # performance is None when too few high-confidence samples were found.
    if performance:
        print("\n=== MODEL PERFORMANCE ===")
        print(f"Test Accuracy: {performance['test_accuracy']:.3f}")
        print(f"CV Accuracy: {performance['cv_accuracy']:.3f}")

# Related posts:
# - Beyond Seasonality: Integrating External Events in Time Series Analysis
# - Improving Time Series Analysis: STL Decomposition with Price Normalization
# - Building a Smart Keyword Seasonality Analyzer: From Expert Intuition to AI-Powered Insights