# Seasonality code: classify products' seasonal demand patterns from view counts
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from scipy import stats
from scipy.signal import find_peaks
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import StandardScaler

# Suppress library warning chatter so the classification reports stay readable.
warnings.filterwarnings('ignore')
class SeasonalClassifier:
    """Classify products' seasonal-demand patterns from weekly view counts.

    Products with enough history are labeled via a Kruskal-Wallis test on
    per-season weekly views; the resulting high-confidence labels then train
    a text model (TF-IDF + Korean seasonal keywords) which classifies the
    products whose history is too short for the statistical test.
    """

    def __init__(self, min_weeks=16, min_observations_per_season=2):
        """
        Initialize the seasonal classifier.

        Args:
            min_weeks: Minimum weeks of data required for the K-W test.
            min_observations_per_season: Minimum observations per season
                required for the test to be considered reliable.
        """
        self.min_weeks = min_weeks
        self.min_observations_per_season = min_observations_per_season
        self.high_confidence_products = []  # K-W results reused as pseudo-labels
        self.text_model = None              # fitted in train_semi_supervised_model
        self.tfidf_vectorizer = None        # lazily fitted on first feature pass
        self.scaler = StandardScaler()

        # Korean seasonal keywords for feature engineering
        self.korean_seasonal_keywords = {
            'spring': ['봄', '스프링', '경량', '얇은', '가벼운', '신선한', '파스텔', '벚꽃'],
            'summer': ['여름', '썸머', '시원한', '반팔', '반바지', '린넨', '면', '민소매', '샌들', '수영복'],
            'fall': ['가을', '아우터', '자켓', '긴팔', '브라운', '베이지', '트렌치', '부츠'],
            'winter': ['겨울', '윈터', '따뜻한', '두꺼운', '패딩', '코트', '스카프', '부츠', '울', '플리스']
        }

    def assign_season(self, date):
        """Assign Korean season based on date (months grouped 3-4-5, 6-7-8, ...)."""
        month = date.month
        if month in [3, 4, 5]:
            return 'spring'
        elif month in [6, 7, 8]:
            return 'summer'
        elif month in [9, 10, 11]:
            return 'fall'
        else:
            return 'winter'

    def prepare_seasonal_data(self, df):
        """
        Prepare data for seasonal analysis.

        Args:
            df: DataFrame with columns
                ['product_id', 'date', 'views', 'product_name', 'description'].

        Returns:
            DataFrame aggregated to (product_id, ISO week, season) with summed
            views and the first-seen name/description per group.
        """
        df = df.copy()
        df['date'] = pd.to_datetime(df['date'])
        df['season'] = df['date'].apply(self.assign_season)
        iso_cal = df['date'].dt.isocalendar()
        df['year'] = df['date'].dt.year
        df['week'] = iso_cal.week
        # Key weeks by the ISO year, not the calendar year: around New Year the
        # two disagree (e.g. Dec 31 can belong to ISO week 1 of the NEXT year),
        # and mixing them would collapse distinct weeks into one bucket.
        df['year_week'] = iso_cal.year.astype(str) + '_' + df['week'].astype(str)

        # Weekly aggregation by season
        weekly_seasonal = df.groupby(['product_id', 'year_week', 'season']).agg({
            'views': 'sum',
            'product_name': 'first',
            'description': 'first'
        }).reset_index()

        return weekly_seasonal

    def check_data_sufficiency(self, product_data):
        """Check if a product has sufficient data for the K-W test.

        Returns:
            (bool, str): sufficiency flag and a human-readable reason.
        """
        total_weeks = len(product_data)
        seasons_present = product_data['season'].nunique()

        # Check minimum weeks
        if total_weeks < self.min_weeks:
            return False, f"Insufficient weeks: {total_weeks} < {self.min_weeks}"

        # Need at least 3 of the 4 seasons represented for a meaningful test
        if seasons_present < 3:
            return False, f"Insufficient seasons: {seasons_present} < 3"

        # Check observations per season
        season_counts = product_data.groupby('season').size()
        min_obs_per_season = season_counts.min()
        if min_obs_per_season < self.min_observations_per_season:
            return False, f"Insufficient obs per season: {min_obs_per_season} < {self.min_observations_per_season}"

        return True, "Sufficient data"

    def kruskal_wallis_test(self, product_data):
        """
        Perform Kruskal-Wallis test for seasonal differences in weekly views.

        Args:
            product_data: DataFrame with seasonal data for one product.

        Returns:
            dict with 'significant', 'p_value' and, on success,
            'h_statistic', 'seasonal_averages', 'season_names'.
        """
        # Group weekly views by season, skipping seasons with no data
        season_groups = []
        season_names = []
        for season in ['spring', 'summer', 'fall', 'winter']:
            season_data = product_data[product_data['season'] == season]['views'].values
            if len(season_data) > 0:
                season_groups.append(season_data)
                season_names.append(season)

        if len(season_groups) < 3:
            return {'significant': False, 'p_value': 1.0, 'reason': 'insufficient_seasons'}

        try:
            h_stat, p_value = stats.kruskal(*season_groups)

            # Per-season mean views, used later for pattern detection
            seasonal_avgs = {}
            for i, season in enumerate(season_names):
                seasonal_avgs[season] = np.mean(season_groups[i])

            return {
                'significant': p_value < 0.05,
                'p_value': p_value,
                'h_statistic': h_stat,
                'seasonal_averages': seasonal_avgs,
                'season_names': season_names
            }
        except ValueError:
            # stats.kruskal raises ValueError e.g. when all observations are
            # identical; treat that as "no detectable seasonal difference"
            # rather than letting it propagate (was a bare except before).
            return {'significant': False, 'p_value': 1.0, 'reason': 'test_failed'}

    def detect_seasonal_pattern(self, seasonal_avgs):
        """
        Detect whether a product shows a single-season, extended-season,
        bimodal, or year-round pattern.

        Args:
            seasonal_avgs: Dictionary of season -> average views.

        Returns:
            dict with 'pattern', 'primary_seasons', 'confidence'.
        """
        if not seasonal_avgs:
            return {'pattern': 'year_round', 'primary_seasons': [], 'confidence': 'low'}

        # Keep values in fixed seasonal order so index distances are meaningful
        seasons_order = ['spring', 'summer', 'fall', 'winter']
        values = []
        available_seasons = []
        for season in seasons_order:
            if season in seasonal_avgs:
                values.append(seasonal_avgs[season])
                available_seasons.append(season)

        if len(values) < 3:
            return {'pattern': 'insufficient_data', 'primary_seasons': [], 'confidence': 'low'}

        values = np.array(values)

        # Normalize to the max for relative peak detection
        if values.max() > 0:
            normalized_values = values / values.max()
        else:
            normalized_values = values

        # "Peaks" are seasons reaching >= 70% of the best season.
        # NOTE: whenever max > 0 the best season itself always qualifies, so
        # the zero-peak branch below only triggers for all-zero inputs.
        threshold = 0.7
        peak_indices = np.where(normalized_values >= threshold)[0]
        peak_seasons = [available_seasons[i] for i in peak_indices]

        # Classify pattern by peak count
        if len(peak_seasons) == 0:
            # No clear peaks - check if one season is notably higher
            max_idx = np.argmax(values)
            if values[max_idx] > np.mean(values) * 1.5:  # 50% higher than average
                return {
                    'pattern': 'single_season',
                    'primary_seasons': [available_seasons[max_idx]],
                    'confidence': 'medium'
                }
            else:
                return {'pattern': 'year_round', 'primary_seasons': [], 'confidence': 'medium'}
        elif len(peak_seasons) == 1:
            return {
                'pattern': 'single_season',
                'primary_seasons': peak_seasons,
                'confidence': 'high'
            }
        elif len(peak_seasons) == 2:
            # Adjacent peak seasons = one extended season; separated = bimodal
            season_indices = [seasons_order.index(s) for s in peak_seasons]
            season_indices.sort()

            # Handle wrap-around (spring index 0, winter index 3 are adjacent
            # on the calendar, but the original treats that pair as bimodal)
            if season_indices == [0, 3]:  # spring and winter
                pattern_type = 'bimodal'
            elif abs(season_indices[1] - season_indices[0]) == 1:
                pattern_type = 'extended_season'
            else:
                pattern_type = 'bimodal'

            return {
                'pattern': pattern_type,
                'primary_seasons': peak_seasons,
                'confidence': 'high'
            }
        else:
            return {'pattern': 'year_round', 'primary_seasons': peak_seasons, 'confidence': 'medium'}

    def classify_products_kw(self, df):
        """
        Classify products using the Kruskal-Wallis test.

        Args:
            df: DataFrame with raw product view data.

        Returns:
            DataFrame with one classification row per product.
        """
        seasonal_data = self.prepare_seasonal_data(df)
        results = []

        for product_id in seasonal_data['product_id'].unique():
            product_data = seasonal_data[seasonal_data['product_id'] == product_id].copy()

            # Get product text data (constant per product; take the first row)
            product_name = product_data['product_name'].iloc[0]
            product_desc = product_data['description'].iloc[0] if pd.notna(product_data['description'].iloc[0]) else ""

            # Check data sufficiency
            sufficient, reason = self.check_data_sufficiency(product_data)

            result = {
                'product_id': product_id,
                'product_name': product_name,
                'description': product_desc,
                'data_sufficient': sufficient,
                'reason': reason
            }

            if sufficient:
                # Perform K-W test
                kw_result = self.kruskal_wallis_test(product_data)

                if kw_result['significant']:
                    # Detect seasonal pattern
                    pattern_result = self.detect_seasonal_pattern(kw_result.get('seasonal_averages', {}))

                    result.update({
                        'kw_significant': True,
                        'kw_p_value': kw_result['p_value'],
                        'pattern': pattern_result['pattern'],
                        'primary_seasons': ','.join(pattern_result['primary_seasons']),
                        # Use the detector's own confidence: this was previously
                        # hard-coded to 'high', which mislabeled medium-confidence
                        # patterns and let them leak into the training filter.
                        'confidence': pattern_result['confidence'],
                        'classification_method': 'kruskal_wallis'
                    })

                    # Store high confidence products for semi-supervised learning
                    if pattern_result['confidence'] == 'high':
                        self.high_confidence_products.append(result)
                else:
                    result.update({
                        'kw_significant': False,
                        'kw_p_value': kw_result['p_value'],
                        'pattern': 'year_round',
                        'primary_seasons': '',
                        'confidence': 'medium',
                        'classification_method': 'kruskal_wallis'
                    })
            else:
                result.update({
                    'kw_significant': None,
                    'kw_p_value': None,
                    'pattern': 'insufficient_data',
                    'primary_seasons': '',
                    'confidence': 'low',
                    'classification_method': 'insufficient_data'
                })

            results.append(result)

        return pd.DataFrame(results)

    def extract_korean_text_features(self, texts):
        """Extract text features: TF-IDF n-grams plus seasonal keyword counts.

        Fits the TF-IDF vectorizer on the first call; subsequent calls reuse it.
        """
        if self.tfidf_vectorizer is None:
            self.tfidf_vectorizer = TfidfVectorizer(
                max_features=5000,
                ngram_range=(1, 2),
                min_df=2,
                max_df=0.95,
                stop_words=None,  # Keep Korean stopwords for now
                lowercase=False   # Preserve Korean characters
            )
            # Fit on training texts
            tfidf_features = self.tfidf_vectorizer.fit_transform(texts)
        else:
            tfidf_features = self.tfidf_vectorizer.transform(texts)

        # One count-of-matching-keywords feature per season
        # NOTE(review): single-char keywords like '면' may match as substrings
        # of unrelated words — acceptable for coarse features, but worth review.
        seasonal_features = []
        for text in texts:
            text_lower = text.lower() if isinstance(text, str) else ""
            features = []
            for season, keywords in self.korean_seasonal_keywords.items():
                keyword_count = sum(1 for keyword in keywords if keyword in text_lower)
                features.append(keyword_count)
            seasonal_features.append(features)

        seasonal_features = np.array(seasonal_features)

        # Combine TF-IDF with seasonal keyword features
        if seasonal_features.shape[0] > 0:
            combined_features = np.hstack([tfidf_features.toarray(), seasonal_features])
        else:
            combined_features = tfidf_features.toarray()

        return combined_features

    def train_semi_supervised_model(self, all_products_df):
        """
        Train a text classifier on the high-confidence K-W labels.

        Args:
            all_products_df: DataFrame with all products and their
                K-W classifications.

        Returns:
            dict of accuracy metrics, or None when there are too few
            high-confidence samples to train on.
        """
        # Get high-confidence labeled data
        high_conf_data = all_products_df[
            (all_products_df['confidence'] == 'high') &
            (all_products_df['pattern'] != 'insufficient_data')
        ].copy()

        if len(high_conf_data) < 10:
            print(f"Warning: Only {len(high_conf_data)} high-confidence samples for training")
            return None

        print(f"Training with {len(high_conf_data)} high-confidence samples")

        # Prepare text (name + description) and derive one label per product
        texts = []
        labels = []
        for _, row in high_conf_data.iterrows():
            text = str(row['product_name']) + " " + str(row['description'] if pd.notna(row['description']) else "")
            texts.append(text)

            # Create label from pattern and primary seasons
            if row['pattern'] == 'single_season' and row['primary_seasons']:
                labels.append(row['primary_seasons'].split(',')[0])
            elif row['pattern'] == 'bimodal' and row['primary_seasons']:
                # For bimodal, create combined label
                seasons = row['primary_seasons'].split(',')
                if len(seasons) == 2:
                    labels.append(f"bimodal_{seasons[0]}_{seasons[1]}")
                else:
                    labels.append('year_round')
            else:
                labels.append('year_round')

        # Extract features
        X = self.extract_korean_text_features(texts)
        y = np.array(labels)

        # Stratify only when every class has at least 2 members; otherwise
        # train_test_split raises ValueError on the rare label.
        _, class_counts = np.unique(y, return_counts=True)
        stratify = y if class_counts.min() >= 2 else None
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=stratify
        )

        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Train Random Forest model
        self.text_model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            min_samples_split=5,
            random_state=42,
            class_weight='balanced'
        )
        self.text_model.fit(X_train_scaled, y_train)

        # Evaluate model
        train_score = self.text_model.score(X_train_scaled, y_train)
        test_score = self.text_model.score(X_test_scaled, y_test)
        print(f"Training Accuracy: {train_score:.3f}")
        print(f"Test Accuracy: {test_score:.3f}")

        # Cross-validation
        cv_scores = cross_val_score(self.text_model, X_train_scaled, y_train, cv=5)
        print(f"CV Accuracy: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")

        # Classification report
        y_pred = self.text_model.predict(X_test_scaled)
        print("\nClassification Report:")
        print(classification_report(y_test, y_pred))

        return {
            'train_accuracy': train_score,
            'test_accuracy': test_score,
            'cv_accuracy': cv_scores.mean(),
            'cv_std': cv_scores.std(),
            'classification_report': classification_report(y_test, y_pred, output_dict=True)
        }

    def predict_seasons_text(self, product_names, descriptions=None):
        """
        Predict season labels for products using the trained text model.

        Args:
            product_names: List of product names.
            descriptions: List of product descriptions (optional).

        Returns:
            (predictions, confidence_scores) where confidence is the max
            class probability per product.

        Raises:
            ValueError: if the text model has not been trained yet.
        """
        if self.text_model is None:
            raise ValueError("Model not trained. Call train_semi_supervised_model first.")

        # Prepare texts
        if descriptions is None:
            descriptions = [""] * len(product_names)
        texts = [str(name) + " " + str(desc) for name, desc in zip(product_names, descriptions)]

        # Extract features
        X = self.extract_korean_text_features(texts)
        X_scaled = self.scaler.transform(X)

        # Predict
        predictions = self.text_model.predict(X_scaled)
        probabilities = self.text_model.predict_proba(X_scaled)

        # Get confidence scores (max probability)
        confidence_scores = np.max(probabilities, axis=1)

        return predictions, confidence_scores

    def classify_all_products(self, df):
        """
        Complete pipeline: K-W for sufficient data, text model for the rest.

        Args:
            df: DataFrame with all product data.

        Returns:
            (results_df, model_performance) — classifications per product and
            the text model's metrics dict (None if the model wasn't trained).
        """
        print("Step 1: Running Kruskal-Wallis analysis...")
        kw_results = self.classify_products_kw(df)

        print(f"Step 2: Training semi-supervised model on {len(self.high_confidence_products)} high-confidence products...")
        model_performance = self.train_semi_supervised_model(kw_results)

        print("Step 3: Predicting seasons for products with insufficient data...")
        # Get products with insufficient data
        insufficient_data = kw_results[kw_results['confidence'] == 'low'].copy()

        if len(insufficient_data) > 0 and self.text_model is not None:
            # Predict using text model
            names = insufficient_data['product_name'].fillna("").tolist()
            descriptions = insufficient_data['description'].fillna("").tolist()

            predictions, confidence_scores = self.predict_seasons_text(names, descriptions)

            # NOTE: the text model emits season labels (e.g. 'summer',
            # 'bimodal_spring_fall', 'year_round'); these are stored in
            # 'pattern' for products that lacked enough data for K-W.
            insufficient_data['pattern'] = predictions
            insufficient_data['confidence'] = ['medium' if conf > 0.7 else 'low' for conf in confidence_scores]
            insufficient_data['classification_method'] = 'text_model'
            insufficient_data['text_confidence'] = confidence_scores

            # Write the text-model results back into the main results frame
            for idx, row in insufficient_data.iterrows():
                kw_results.loc[kw_results['product_id'] == row['product_id'], 'pattern'] = row['pattern']
                kw_results.loc[kw_results['product_id'] == row['product_id'], 'confidence'] = row['confidence']
                kw_results.loc[kw_results['product_id'] == row['product_id'], 'classification_method'] = 'text_model'

        return kw_results, model_performance
# Example usage and testing
def create_sample_data():
    """Create sample data for testing.

    Builds two years of weekly view counts for five products whose Korean
    names drive a synthetic seasonal pattern (summer / winter / spring /
    fall peaks plus one year-round item), with Gaussian noise added.

    Returns:
        DataFrame with columns
        ['product_id', 'product_name', 'description', 'date', 'views'].
    """
    np.random.seed(42)  # deterministic noise for reproducible tests

    # Sample product data
    products = [
        {'id': 1, 'name': '여름 반팔 티셔츠', 'desc': '시원한 면 소재 반팔'},
        {'id': 2, 'name': '겨울 패딩 점퍼', 'desc': '따뜻한 다운 패딩 코트'},
        {'id': 3, 'name': '봄 가디건', 'desc': '가벼운 니트 가디건'},
        {'id': 4, 'name': '가을 자켓', 'desc': '트렌치 스타일 아우터'},
        {'id': 5, 'name': '사계절 청바지', 'desc': '기본 데님 팬츠'},
    ]

    # Generate sample data with seasonal patterns
    data = []
    start_date = datetime(2023, 1, 1)

    for product in products:
        for week in range(52 * 2):  # 2 years of data
            current_date = start_date + timedelta(weeks=week)

            # Seasonal baseline keyed off keywords in the product name
            if '여름' in product['name'] or '반팔' in product['name']:
                # Summer peak
                base_views = 100 if current_date.month in [6, 7, 8] else 20
            elif '겨울' in product['name'] or '패딩' in product['name']:
                # Winter peak
                base_views = 100 if current_date.month in [12, 1, 2] else 25
            elif '봄' in product['name']:
                # Spring peak
                base_views = 100 if current_date.month in [3, 4, 5] else 30
            elif '가을' in product['name']:
                # Fall peak
                base_views = 100 if current_date.month in [9, 10, 11] else 30
            else:
                # Year-round
                base_views = 50

            # Add noise, clamped so views never go negative
            views = max(0, int(base_views + np.random.normal(0, 10)))

            data.append({
                'product_id': product['id'],
                'product_name': product['name'],
                'description': product['desc'],
                'date': current_date,
                'views': views
            })

    return pd.DataFrame(data)
# Test the classifier end-to-end on the synthetic data
if __name__ == "__main__":
    # Create sample data
    sample_df = create_sample_data()
    print("Sample data created with", len(sample_df), "records")

    # Initialize classifier
    classifier = SeasonalClassifier(min_weeks=16, min_observations_per_season=2)

    # Run complete classification
    results, performance = classifier.classify_all_products(sample_df)

    print("\n=== CLASSIFICATION RESULTS ===")
    print(results[['product_id', 'product_name', 'pattern', 'confidence', 'classification_method']].to_string())

    # performance is None when too few high-confidence samples existed
    if performance:
        print(f"\n=== MODEL PERFORMANCE ===")
        print(f"Test Accuracy: {performance['test_accuracy']:.3f}")
        print(f"CV Accuracy: {performance['cv_accuracy']:.3f}")