#!/usr/bin/env python3
import requests
import json
import time
import os
from datetime import datetime
from typing import Set, Dict, List, Optional
class DropMonitor:
def __init__(self, api_url: str, storage_file: str = "known_slugs.json"):
self.api_url = api_url
self.storage_file = storage_file
self.known_slugs: Set[str] = self.load_known_slugs()
def load_known_slugs(self) -> Set[str]:
"""Load previously seen slugs from storage file."""
if os.path.exists(self.storage_file):
try:
with open(self.storage_file, 'r') as f:
data = json.load(f)
return set(data.get('slugs', []))
except (json.JSONDecodeError, IOError) as e:
print(f"Warning: Could not load known slugs: {e}")
return set()
def save_known_slugs(self) -> None:
"""Save current known slugs to storage file."""
try:
with open(self.storage_file, 'w') as f:
json.dump({
'slugs': list(self.known_slugs),
'last_updated': datetime.now().isoformat()
}, f, indent=2)
except IOError as e:
print(f"Warning: Could not save known slugs: {e}")
def fetch_data(self) -> Optional[Dict]:
"""Fetch data from the API endpoint."""
try:
response = requests.get(self.api_url, timeout=30)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error fetching data: {e}")
return None
except json.JSONDecodeError as e:
print(f"Error parsing JSON response: {e}")
return None
def extract_pack_templates(self, data: Dict) -> List[Dict]:
"""Extract packTemplates from API response."""
pack_templates = data.get('packTemplates', [])
if not isinstance(pack_templates, list):
print("Warning: packTemplates is not a list")
return []
return pack_templates
def check_for_new_drops(self) -> List[Dict]:
"""Check for new drops and return details of any new ones found."""
data = self.fetch_data()
if not data:
return []
pack_templates = self.extract_pack_templates(data)
new_drops = []
current_slugs = set()
for template in pack_templates:
slug = template.get('slug')
if not slug:
print("Warning: Found packTemplate without slug")
continue
current_slugs.add(slug)
# Check if this is a new drop
if slug not in self.known_slugs:
new_drops.append(template)
print(f"🆕 NEW DROP DETECTED: {slug}")
self.print_drop_details(template)
# Update known slugs with current ones
self.known_slugs.update(current_slugs)
self.save_known_slugs()
return new_drops
def print_drop_details(self, drop: Dict) -> None:
"""Print detailed information about a drop."""
print("=" * 60)
print("NEW DROP DETAILS:")
print("=" * 60)
# Print key information
slug = drop.get('slug', 'Unknown')
name = drop.get('name', 'Unknown')
description = drop.get('description', 'No description')
print(f"🏷️ Slug: {slug}")
print(f"📦 Name: {name}")
print(f"📝 Description: {description}")
# Print other available fields
for key, value in drop.items():
if key not in ['slug', 'name', 'description']:
print(f"🔹 {key}: {value}")
print("=" * 60)
print()
def monitor_continuously(self, interval: int = 300) -> None:
"""Continuously monitor for new drops at specified interval (seconds)."""
print(f"🔍 Starting drop monitor...")
print(f"📡 API URL: {self.api_url}")
print(f"⏱️ Check interval: {interval} seconds")
print(f"💾 Storage file: {self.storage_file}")
print(f"📊 Currently tracking {len(self.known_slugs)} known drops")
print("-" * 60)
while True:
try:
print(f"🔄 Checking for new drops at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
new_drops = self.check_for_new_drops()
if not new_drops:
print("✅ No new drops found")
else:
print(f"🎉 Found {len(new_drops)} new drop(s)!")
print(f"😴 Sleeping for {interval} seconds...")
print("-" * 40)
time.sleep(interval)
except KeyboardInterrupt:
print("\n🛑 Monitoring stopped by user")
break
except Exception as e:
print(f"❌ Unexpected error: {e}")
print(f"🔄 Retrying in {interval} seconds...")
time.sleep(interval)
def main():
# Configuration
API_URL = "https://api.prod.rock-palisade-352518.com/collectibles/burn-stats"
CHECK_INTERVAL = 300 # 5 minutes
# Create monitor instance
monitor = DropMonitor(API_URL)
# Option 1: Single check
print("Choose an option:")
print("1. Single check for new drops")
print("2. Continuous monitoring")
choice = input("Enter your choice (1 or 2): ").strip()
if choice == "1":
print("🔍 Performing single check...")
new_drops = monitor.check_for_new_drops()
if new_drops:
print(f"✨ Found {len(new_drops)} new drop(s)")
else:
print("✅ No new drops found")
elif choice == "2":
# Option 2: Continuous monitoring
try:
custom_interval = input(f"Enter check interval in seconds (default: {CHECK_INTERVAL}): ").strip()
if custom_interval:
CHECK_INTERVAL = int(custom_interval)
except ValueError:
print(f"Invalid input, using default interval: {CHECK_INTERVAL} seconds")
monitor.monitor_continuously(CHECK_INTERVAL)
else:
print("Invalid choice. Exiting.")
if __name__ == "__main__":
main()