import os
import time
from pathlib import Path
from PIL import Image
import argparse
class PNGToJPGMonitor:
def __init__(self, watch_dir, output_dir, max_size_mb=2):
self.watch_dir = Path(watch_dir)
self.output_dir = Path(output_dir)
self.max_size_bytes = max_size_mb * 1024 * 1024
self.processed_files = set()
# Create output directory if it doesn't exist
self.output_dir.mkdir(parents=True, exist_ok=True)
# Track existing PNG files to ignore them
self._scan_existing_files()
def _scan_existing_files(self):
"""Scan directory for existing PNG files to ignore them"""
if self.watch_dir.exists():
for file_path in self.watch_dir.glob('*.png'):
self.processed_files.add(file_path.name)
print(f"Found {len(self.processed_files)} existing PNG files to ignore")
def _convert_png_to_jpg(self, png_path):
"""Convert PNG to JPG with size optimization"""
try:
# Open the PNG image
with Image.open(png_path) as img:
# Convert RGBA to RGB if necessary (PNG supports transparency, JPG doesn't)
if img.mode in ('RGBA', 'LA', 'P'):
# Create white background
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
elif img.mode != 'RGB':
img = img.convert('RGB')
# Generate output filename
output_filename = png_path.stem + '.jpg'
output_path = self.output_dir / output_filename
# Start with high quality and reduce if needed
quality = 95
while quality > 10:
# Save to a temporary path first
temp_path = output_path.with_suffix('.tmp.jpg')
img.save(temp_path, 'JPEG', quality=quality, optimize=True)
# Check file size
if temp_path.stat().st_size <= self.max_size_bytes:
# Size is acceptable, move to final location
temp_path.rename(output_path)
print(f"✓ Converted {png_path.name} → {output_filename} (quality: {quality}%, size: {output_path.stat().st_size / 1024:.1f} KB)")
return True
else:
# File too large, reduce quality
temp_path.unlink()
quality -= 10
# If we get here, even lowest quality is too large
# Try reducing image dimensions
return self._resize_and_convert(img, output_path, png_path.name)
except Exception as e:
print(f"✗ Error converting {png_path.name}: {e}")
return False
def _resize_and_convert(self, img, output_path, original_name):
"""Resize image and convert if quality reduction wasn't enough"""
try:
width, height = img.size
scale_factor = 0.9
while scale_factor > 0.3: # Don't make it too small
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Try with medium quality
temp_path = output_path.with_suffix('.tmp.jpg')
resized_img.save(temp_path, 'JPEG', quality=85, optimize=True)
if temp_path.stat().st_size <= self.max_size_bytes:
temp_path.rename(output_path)
print(f"✓ Converted {original_name} → {output_path.name} (resized to {new_width}x{new_height}, size: {output_path.stat().st_size / 1024:.1f} KB)")
return True
else:
temp_path.unlink()
scale_factor -= 0.1
print(f"✗ Could not reduce {original_name} to under {self.max_size_bytes / 1024 / 1024} MB")
return False
except Exception as e:
print(f"✗ Error resizing {original_name}: {e}")
return False
def _check_for_new_files(self):
"""Check for new PNG files in the watch directory"""
if not self.watch_dir.exists():
return
current_files = set()
for file_path in self.watch_dir.glob('*.png'):
current_files.add(file_path.name)
# Find new files
new_files = current_files - self.processed_files
for filename in new_files:
file_path = self.watch_dir / filename
print(f"📁 New PNG detected: {filename}")
if self._convert_png_to_jpg(file_path):
self.processed_files.add(filename)
else:
# Still mark as processed to avoid repeated attempts
self.processed_files.add(filename)
def run(self, check_interval=2):
"""Main monitoring loop"""
print(f"🔍 Monitoring directory: {self.watch_dir}")
print(f"📤 Output directory: {self.output_dir}")
print(f"📏 Max file size: {self.max_size_bytes / 1024 / 1024} MB")
print(f"⏱️ Check interval: {check_interval} seconds")
print("Press Ctrl+C to stop monitoring...\n")
try:
while True:
self._check_for_new_files()
time.sleep(check_interval)
except KeyboardInterrupt:
print("\n🛑 Monitoring stopped by user")
def main():
parser = argparse.ArgumentParser(description='Monitor directory for PNG files and convert to JPG')
parser.add_argument('watch_dir', help='Directory to watch for PNG files')
parser.add_argument('output_dir', help='Directory to save converted JPG files')
parser.add_argument('--max-size', type=float, default=2.0,
help='Maximum JPG file size in MB (default: 2.0)')
parser.add_argument('--interval', type=int, default=2,
help='Check interval in seconds (default: 2)')
args = parser.parse_args()
# Validate directories
watch_path = Path(args.watch_dir)
if not watch_path.exists():
print(f"Error: Watch directory '{args.watch_dir}' does not exist")
return
if not watch_path.is_dir():
print(f"Error: '{args.watch_dir}' is not a directory")
return
# Create and run monitor
monitor = PNGToJPGMonitor(args.watch_dir, args.output_dir, args.max_size)
monitor.run(args.interval)
if __name__ == "__main__":
main()