-- production_data
-- One row per machine, calendar date and shift: production counts, downtime
-- totals and the OEE factors written by the scripts.
CREATE TABLE production_data (
    id INT PRIMARY KEY AUTO_INCREMENT,
    machine_id VARCHAR(50),
    shift_date DATE,
    shift_name VARCHAR(20),
    planned_time_minutes INT,
    actual_production_count INT,
    planned_production_count INT,
    good_count INT,
    rejected_count INT,
    total_count INT,
    downtime_minutes INT,
    maintenance_downtime_minutes INT,
    changeover_time_minutes INT,
    minor_stops_minutes INT,
    -- OEE factors (percent). The INSERT/UPDATE/SELECT statements in the
    -- scripts reference these columns, so they must exist in the table.
    -- DOUBLE rather than a fixed DECIMAL because the scripts can produce
    -- values above 100 and unrounded fractions.
    availability_percent DOUBLE,
    performance_percent DOUBLE,
    quality_percent DOUBLE,
    oee_percent DOUBLE,
    created_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- INSERT ... ON DUPLICATE KEY UPDATE only updates when a PRIMARY/UNIQUE
    -- key collides; with just the AUTO_INCREMENT id every call would insert a
    -- new row. Note: MySQL treats NULLs as distinct in a unique index, so rows
    -- inserted without a shift_name are never considered duplicates.
    CONSTRAINT production_data_machine_shift_uq UNIQUE (machine_id, shift_date, shift_name)
);

-- downtime_events
-- One row per downtime occurrence. event_end and duration_minutes stay NULL
-- while an automatically detected event is still open.
CREATE TABLE downtime_events (
    id INT PRIMARY KEY AUTO_INCREMENT,
    machine_id VARCHAR(50),
    event_start TIMESTAMP,
    event_end TIMESTAMP,
    downtime_type VARCHAR(50), -- 'Breakdown', 'Maintenance', 'Changeover', 'Minor_Stop'
    downtime_reason VARCHAR(200),
    duration_minutes INT,
    operator_notes TEXT,
    created_by VARCHAR(50)
);

-- shift_config
-- Shift definitions: name, clock times, planned and break durations.
CREATE TABLE shift_config (
    id INT PRIMARY KEY AUTO_INCREMENT,
    shift_name VARCHAR(20),
    start_time TIME,
    end_time TIME,
    planned_duration_minutes INT,
    break_duration_minutes INT,
    is_active BOOLEAN DEFAULT TRUE
);

OEE/
├── RotoMolding_Machine_01/
│ ├── Current_Shift/
│ │ ├── Availability_Percent (Float)
│ │ ├── Performance_Percent (Float)
│ │ ├── Quality_Percent (Float)
│ │ ├── OEE_Percent (Float)
│ │ ├── Planned_Time_Minutes (Integer)
│ │ ├── Actual_Runtime_Minutes (Integer)
│ │ ├── Downtime_Minutes (Integer)
│ │ └── Maintenance_Downtime_Minutes (Integer)
│ │
│ ├── Daily/
│ │ ├── Daily_Availability (Float)
│ │ ├── Daily_Performance (Float)
│ │ ├── Daily_Quality (Float)
│ │ ├── Daily_OEE (Float)
│ │ ├── Planned_Production_Target (Integer)
│ │ └── Actual_Production_Count (Integer)
│ │
│ └── Configuration/
│ ├── Standard_Cycle_Time_Seconds (Integer)
│ ├── Target_OEE_Percent (Float)
│ ├── Shift_Duration_Hours (Float)
│ └── Planned_Break_Minutes (Integer)

PLC_Data/
├── Machine_Status (Boolean) -- On/Off from PLC
├── Production_Count (Integer) -- Total production
├── Good_Count (Integer) -- Good parts
├── Rejected_Count (Integer) -- Rejected parts
├── Cycle_Active (Boolean) -- Current cycle status
├── Fault_Code (Integer) -- Machine fault code
└── Current_Cycle_Time (Integer) -- Current cycle durationdef calculate_availability():
machine_id = "RotoMolding_Machine_01"
# Get current shift information
current_shift = get_current_shift()
shift_start = current_shift['start_time']
shift_end = current_shift['end_time']
planned_time = current_shift['planned_duration_minutes']
# Calculate elapsed time in current shift
now = system.date.now()
elapsed_minutes = get_elapsed_shift_minutes(shift_start, now)
# Get machine status from PLC
machine_status = system.tag.readBlocking("[default]PLC_Data/Machine_Status")[0].value
# Calculate downtime
downtime_minutes = calculate_downtime(machine_id, shift_start, now)
maintenance_downtime = get_maintenance_downtime(machine_id, shift_start, now)
# Calculate availability
actual_runtime = elapsed_minutes - downtime_minutes - maintenance_downtime
availability = (actual_runtime / planned_time) * 100 if planned_time > 0 else 0
# Update availability tag
system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Current_Shift/Availability_Percent", availability)
system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Current_Shift/Actual_Runtime_Minutes", actual_runtime)
system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Current_Shift/Downtime_Minutes", downtime_minutes)
def calculate_downtime(machine_id, shift_start, current_time):
    """Return total non-maintenance downtime minutes for a machine.

    Sums downtime_events rows whose event_start lies between shift_start and
    current_time (inclusive) and whose downtime_type is not 'Maintenance'.
    An event that is still open (duration_minutes IS NULL, as inserted by
    start_downtime_event) is counted as the minutes from its event_start up
    to current_time, so an ongoing stop is reflected immediately.

    Args:
        machine_id: value matched against downtime_events.machine_id.
        shift_start: lower bound for event_start.
        current_time: upper bound for event_start and the end point used for
            events that are still open.

    Returns:
        The summed minutes, or 0 when there are no matching events.
    """
    # Query downtime events from database
    query = """
SELECT COALESCE(SUM(COALESCE(duration_minutes, TIMESTAMPDIFF(MINUTE, event_start, ?))), 0) AS total_downtime
FROM downtime_events
WHERE machine_id = ?
AND event_start >= ?
AND event_start <= ?
AND downtime_type != 'Maintenance'
"""
    # current_time is bound twice: once for TIMESTAMPDIFF, once for the range.
    result = system.db.runPrepQuery(query, [current_time, machine_id, shift_start, current_time])
    total_downtime = result[0]['total_downtime']
    return total_downtime if total_downtime else 0
def get_maintenance_downtime(machine_id, shift_start, current_time):
    """Return total 'Maintenance' downtime minutes for a machine.

    Sums duration_minutes of downtime_events rows of type 'Maintenance' whose
    event_start lies between shift_start and current_time (inclusive).

    Returns:
        The summed minutes, or 0 when the sum is NULL or zero.
    """
    # Query maintenance downtime separately
    query = """
SELECT SUM(duration_minutes) as maintenance_downtime
FROM downtime_events
WHERE machine_id = ?
AND event_start >= ?
AND event_start <= ?
AND downtime_type = 'Maintenance'
"""
    result = system.db.runPrepQuery(query, [machine_id, shift_start, current_time])
    maintenance_downtime = result[0]['maintenance_downtime']
    return maintenance_downtime if maintenance_downtime else 0


def calculate_performance():
    """Compute performance as actual vs. planned production and publish it.

    Reads the manually entered Daily/Planned_Production_Target tag and the PLC
    Production_Count tag. Performance is actual / planned * 100, or 0 when the
    target is missing or not positive, or the PLC count is unavailable.

    Writes Current_Shift/Performance_Percent and Daily/Actual_Production_Count.
    Returns nothing.
    """
    # Get planned production (manually entered daily)
    planned_production = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Daily/Planned_Production_Target")[0].value
    # Get actual production from PLC
    actual_production = system.tag.readBlocking("[default]PLC_Data/Production_Count")[0].value

    # Calculate performance based on actual vs planned
    if planned_production and planned_production > 0 and actual_production is not None:
        # float() avoids integer truncation of count / count under Jython 2.7.
        performance = (float(actual_production) / planned_production) * 100
    else:
        performance = 0

    # Alternative calculation: Based on ideal cycle time
    # standard_cycle_time = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Configuration/Standard_Cycle_Time_Seconds")[0].value
    # actual_runtime_minutes = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Actual_Runtime_Minutes")[0].value
    # ideal_production = (actual_runtime_minutes * 60) / standard_cycle_time
    # performance = (actual_production / ideal_production) * 100 if ideal_production > 0 else 0

    # Update performance tag
    system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Current_Shift/Performance_Percent", performance)
    system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Daily/Actual_Production_Count", actual_production)
# Schedule calculate_performance() (above) to run every 5 minutes


def calculate_quality():
    """Compute the quality factor from the PLC counters and publish it.

    Reads Good_Count, Production_Count and Rejected_Count from PLC_Data, logs
    a warning when the total does not equal good + rejected, and writes
    good / total * 100 to Current_Shift/Quality_Percent. With no production
    yet (total not above 0) the value written is 100. Returns nothing.
    """
    # Get counts from PLC
    good_count = system.tag.readBlocking("[default]PLC_Data/Good_Count")[0].value
    total_count = system.tag.readBlocking("[default]PLC_Data/Production_Count")[0].value
    rejected_count = system.tag.readBlocking("[default]PLC_Data/Rejected_Count")[0].value

    # Validate data consistency
    if total_count != (good_count + rejected_count):
        # Log data inconsistency
        logger.warn("Data inconsistency detected: Total count doesn't match good + rejected")

    # Calculate quality
    if total_count > 0:
        # float() keeps the ratio fractional; integer counters would otherwise
        # truncate to 0 or 1 under Jython 2.7.
        quality = (float(good_count) / total_count) * 100
    else:
        quality = 100  # No production yet, assume 100%

    # Update quality tag
    system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Current_Shift/Quality_Percent", quality)
# calculate_quality() (above) runs on PLC tag change events


def calculate_oee():
    """Combine the three current-shift factors into OEE and persist it.

    Reads Availability_Percent, Performance_Percent and Quality_Percent,
    writes their product (scaled back to a percentage) to OEE_Percent, logs
    an alert when OEE is below 50, and passes all four values to
    store_oee_data(). Returns nothing.
    """
    # Read the three factors
    availability = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Availability_Percent")[0].value
    performance = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Performance_Percent")[0].value
    quality = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Quality_Percent")[0].value

    # Calculate OEE: three percentages multiplied, so divide by 100 * 100.
    # The float divisor prevents truncation if the tags return integers.
    oee = (availability * performance * quality) / 10000.0

    # Update OEE tag
    system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Current_Shift/OEE_Percent", oee)

    # Log significant changes
    if oee < 50:  # Below target threshold
        # %-formatting instead of an f-string: f-strings are not valid syntax
        # in the Jython 2.7 interpreter used for Ignition scripting.
        logger.info("OEE Alert: Current OEE is %.1f%% (A:%.1f%%, P:%.1f%%, Q:%.1f%%)" % (oee, availability, performance, quality))

    # Store in database for historical tracking
    store_oee_data(availability, performance, quality, oee)
def store_oee_data(availability, performance, quality, oee):
    """Upsert the given OEE factors into production_data.

    The row is identified by the fixed machine id, today's date
    (yyyy-MM-dd) and the name returned by get_current_shift_name(). On a
    key collision the four percentage columns and updated_timestamp are
    overwritten. Returns nothing.
    """
    upsert_sql = """
INSERT INTO production_data
(machine_id, shift_date, shift_name, availability_percent, performance_percent, quality_percent, oee_percent)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
availability_percent = VALUES(availability_percent),
performance_percent = VALUES(performance_percent),
quality_percent = VALUES(quality_percent),
oee_percent = VALUES(oee_percent),
updated_timestamp = CURRENT_TIMESTAMP
"""
    row_key = [
        "RotoMolding_Machine_01",
        system.date.format(system.date.now(), "yyyy-MM-dd"),
        get_current_shift_name(),
    ]
    factors = [availability, performance, quality, oee]
    system.db.runPrepUpdate(upsert_sql, row_key + factors)


def on_machine_status_change(tag, tagpath, previousValue, currentValue, initialChange, missedEvents):
    """
    Tag-change handler for the PLC machine status.

    Ignores the initial change. A True -> False transition starts a downtime
    event; a False -> True transition ends one. Any other combination of
    values does nothing.
    """
    if initialChange:
        return

    machine_id = "RotoMolding_Machine_01"
    timestamp = system.date.now()

    if currentValue.value == False and previousValue.value == True:
        # Machine went down - start downtime event
        handler = start_downtime_event
    elif currentValue.value == True and previousValue.value == False:
        # Machine came back up - end downtime event
        handler = end_downtime_event
    else:
        return

    handler(machine_id, timestamp)
def start_downtime_event(machine_id, timestamp):
    """Insert a new 'Breakdown' row into downtime_events.

    Only machine_id, event_start, downtime_type and downtime_reason are set;
    the remaining columns are left to their defaults. Returns nothing.
    """
    bind_values = [machine_id, timestamp]
    insert_sql = """
INSERT INTO downtime_events (machine_id, event_start, downtime_type, downtime_reason)
VALUES (?, ?, 'Breakdown', 'Automatic Detection - Machine Status Off')
"""
    system.db.runPrepUpdate(insert_sql, bind_values)
def end_downtime_event(machine_id, timestamp):
    """Close the most recently started open downtime event for a machine.

    Looks up the latest downtime_events row for machine_id whose event_end is
    NULL and, if one exists, sets its event_end to timestamp and its
    duration_minutes to the minutes between event_start and timestamp.
    Does nothing when no open event is found.
    """
    # Find the latest open downtime event
    query = """
SELECT id, event_start FROM downtime_events
WHERE machine_id = ? AND event_end IS NULL
ORDER BY event_start DESC LIMIT 1
"""
    result = system.db.runPrepQuery(query, [machine_id])
    if result:
        event_id = result[0]['id']
        event_start = result[0]['event_start']
        # getTime() is in milliseconds; convert the difference to minutes.
        duration_minutes = (timestamp.getTime() - event_start.getTime()) / (1000 * 60)
        update_query = """
UPDATE downtime_events
SET event_end = ?, duration_minutes = ?
WHERE id = ?
"""
        system.db.runPrepUpdate(update_query, [timestamp, duration_minutes, event_id])


def update_planned_production(new_target):
    """
    Called from HMI when operator enters planned production.

    Writes new_target to the Daily/Planned_Production_Target tag and upserts
    it into production_data.planned_production_count for today's date.
    """
    machine_id = "RotoMolding_Machine_01"
    shift_date = system.date.format(system.date.now(), "yyyy-MM-dd")
    # Update memory tag
    system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Daily/Planned_Production_Target", new_target)
    # Store in database
    # NOTE(review): no shift_name is supplied here, unlike the other writers
    # of production_data - confirm which row this target is meant to land in.
    query = """
INSERT INTO production_data (machine_id, shift_date, planned_production_count)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE planned_production_count = VALUES(planned_production_count)
"""
    system.db.runPrepUpdate(query, [machine_id, shift_date, new_target])


def add_maintenance_downtime(start_time, end_time, reason, operator_notes):
    """
    Called from HMI when operator logs maintenance downtime.

    Inserts a closed 'Maintenance' row into downtime_events with the duration
    derived from start_time and end_time and the current username as
    created_by.

    Raises:
        ValueError: if end_time is earlier than start_time. A negative
            duration would be subtracted from the summed maintenance
            downtime and overstate runtime.
    """
    machine_id = "RotoMolding_Machine_01"
    elapsed_ms = end_time.getTime() - start_time.getTime()
    if elapsed_ms < 0:
        raise ValueError("Maintenance end_time is earlier than start_time")
    duration_minutes = elapsed_ms / (1000 * 60)
    query = """
INSERT INTO downtime_events
(machine_id, event_start, event_end, downtime_type, downtime_reason, duration_minutes, operator_notes, created_by)
VALUES (?, ?, ?, 'Maintenance', ?, ?, ?, ?)
"""
    operator = system.security.getUsername()
    system.db.runPrepUpdate(query, [machine_id, start_time, end_time, reason, duration_minutes, operator_notes, operator])


def process_shift_completion():
    """
    Called at end of each shift to calculate final OEE values.

    Reads the four Current_Shift percentage tags, stores them on the
    production_data row for today's date and the current shift name, then
    calls reset_shift_counters().
    """
    machine_id = "RotoMolding_Machine_01"
    shift_date = system.date.format(system.date.now(), "yyyy-MM-dd")
    current_shift = get_current_shift_name()
    # Get final values
    availability = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Availability_Percent")[0].value
    performance = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Performance_Percent")[0].value
    quality = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Quality_Percent")[0].value
    oee = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/OEE_Percent")[0].value
    # Store final shift data
    query = """
UPDATE production_data
SET availability_percent = ?, performance_percent = ?, quality_percent = ?, oee_percent = ?
WHERE machine_id = ? AND shift_date = ? AND shift_name = ?
"""
    system.db.runPrepUpdate(query, [availability, performance, quality, oee, machine_id, shift_date, current_shift])
    # Reset counters for next shift
    reset_shift_counters()
def calculate_daily_oee():
    """
    Aggregate OEE across all shifts for daily reporting.

    Averages the four percentage columns of today's production_data rows for
    the machine and writes each average to its Daily/* tag. An average that
    comes back NULL (no rows, or no values stored yet) is skipped so the tag
    keeps its previous value instead of being overwritten with None.
    """
    machine_id = "RotoMolding_Machine_01"
    today = system.date.format(system.date.now(), "yyyy-MM-dd")
    query = """
SELECT
AVG(availability_percent) as avg_availability,
AVG(performance_percent) as avg_performance,
AVG(quality_percent) as avg_quality,
AVG(oee_percent) as avg_oee
FROM production_data
WHERE machine_id = ? AND shift_date = ?
"""
    result = system.db.runPrepQuery(query, [machine_id, today])
    if result:
        daily_data = result[0]
        # Update daily OEE tags (column alias -> tag name)
        daily_tags = [
            ('avg_availability', "Daily_Availability"),
            ('avg_performance', "Daily_Performance"),
            ('avg_quality', "Daily_Quality"),
            ('avg_oee', "Daily_OEE"),
        ]
        for column, tag_name in daily_tags:
            value = daily_data[column]
            # An aggregate query returns one row even with no matches; its
            # AVG columns are then NULL and must not be pushed to the tags.
            if value is not None:
                system.tag.writeAsync("[default]OEE/RotoMolding_Machine_01/Daily/" + tag_name, value)


def validate_oee_data():
    """
    Periodic validation of OEE calculation data.

    Logs an error and raises the data-inconsistency flag when the PLC total
    count differs from good + rejected, then re-runs calculate_availability()
    when the published availability exceeds 100%.
    """
    # Check for data inconsistencies
    good_count = system.tag.readBlocking("[default]PLC_Data/Good_Count")[0].value
    rejected_count = system.tag.readBlocking("[default]PLC_Data/Rejected_Count")[0].value
    total_count = system.tag.readBlocking("[default]PLC_Data/Production_Count")[0].value

    # Validate production counts
    if total_count != (good_count + rejected_count):
        # Log error (%-formatting: f-strings are not valid in Jython 2.7)
        logger.error("Production count mismatch: Total=%s, Good=%s, Rejected=%s" % (total_count, good_count, rejected_count))
        # Trigger alarm. system.alarm.acknowledge() acknowledges existing alarm
        # events by id and requires notes, so it cannot raise an alarm and the
        # one-argument call aborted this function. Alarms are raised by tags,
        # so set a flag tag instead.
        # NOTE(review): assumes a Boolean tag with an alarm configured exists
        # at this path - TODO confirm / create it.
        system.tag.writeAsync("[default]OEE/Data_Inconsistency", True)

    # Validate time calculations
    availability = system.tag.readBlocking("[default]OEE/RotoMolding_Machine_01/Current_Shift/Availability_Percent")[0].value
    if availability is not None and availability > 100:
        logger.warn("Availability calculation error: %s%% > 100%%" % availability)
        # Trigger recalculation
        calculate_availability()
# Schedule validate_oee_data() (above) to run every 15 minutes


def get_oee_dashboard_data(start_date, end_date, machine_id):
    """
    Fetch per-shift OEE rows for the dashboard.

    Returns whatever system.db.runPrepQuery yields for production_data rows
    of machine_id whose shift_date lies between start_date and end_date
    (inclusive), newest date first and then by shift name.
    """
    dashboard_sql = """
SELECT
shift_date,
shift_name,
availability_percent,
performance_percent,
quality_percent,
oee_percent,
actual_production_count,
planned_production_count,
good_count,
rejected_count
FROM production_data
WHERE machine_id = ?
AND shift_date BETWEEN ? AND ?
ORDER BY shift_date DESC, shift_name
"""
    bind_values = [machine_id, start_date, end_date]
    rows = system.db.runPrepQuery(dashboard_sql, bind_values)
    return rows
def calculate_oee_losses():
"""
Calculate the six big losses for OEE analysis
"""
# Availability Losses
availability_losses = {
'breakdowns': get_breakdown_time(),
'setup_adjustments': get_setup_time()
}
# Performance Losses
performance_losses = {
'minor_stops': get_minor_stops_time(),
'reduced_speed': get_reduced_speed_losses()
}
# Quality Losses
quality_losses = {
'defects': get_defect_count(),
'startup_rejects': get_startup_rejects()
}
return {
'availability': availability_losses,
'performance': performance_losses,
'quality': quality_losses
}This comprehensive OEE calculation flow provides:
The system handles edge cases like shift changes, maintenance periods, and data inconsistencies while providing accurate OEE calculations for operational decision-making.