Layer 1 is the sensory nervous system of the Factory Intelligence System. It transforms three chaotic data streams (PLC, ERP, and Human inputs) into a unified, intelligent data platform ready for gap discovery and value creation.
Factories generate three types of data, each with its own problems: machine data from PLCs arrives too fast to store raw, business data in the ERP is too slow to query in real time, and human knowledge lives unstructured in heads and spreadsheets.
Layer 1 solves this by creating an intelligent collection system that processes data at the source, unifies storage, and maintains business context - all using free tools.
What It Is:
The Core Problem: Imagine a temperature sensor sending a reading every second - that is 86,400 values per day from a single sensor.
This creates three problems:
Our Strategic Solution - Edge Intelligence:
Instead of collecting everything and processing later (traditional approach), we implement intelligence at the edge:
Traditional: Sensor → Database → Process → Insights (fails at scale)
Our Way: Sensor → Edge Processing → Intelligent Data → Database → Immediate Insights

This means:
Why This Philosophy Works:
What It Is:
The Core Problem: Real-time KPIs need ERP context for every calculation:
If we query ERP directly:
Our Strategic Solution - Intelligent Caching:
We create a synchronized cache of only the active data needed for real-time decisions:
What We Cache:
- Active production orders (next 24 hours)
- Product specifications for active products
- Quality standards for current production
- Current bill of materials
What We Don't Cache:
- Historical orders (query when needed)
- Inactive products
- Financial data
- Archived records
Refresh Strategy:
- Orders: Every 5 minutes (they rarely change during a shift)
- Products: Daily (specifications are stable)
- Quality: Hourly (may be updated during production)
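A minimal sketch of this cadence, assuming hypothetical sync_orders(), sync_quality(), and sync_products() jobs that wrap the incremental queries shown later:

```python
import threading
import time

def run_every(interval_seconds, job):
    """Run job forever at a fixed interval on its own daemon thread."""
    def loop():
        while True:
            job()
            time.sleep(interval_seconds)
    threading.Thread(target=loop, daemon=True).start()

# Placeholder jobs - in practice each wraps one incremental ERP query
def sync_orders():   print("refreshing active orders")
def sync_quality():  print("refreshing quality standards")
def sync_products(): print("refreshing product specs")

run_every(5 * 60, sync_orders)            # orders: every 5 minutes
run_every(60 * 60, sync_quality)          # quality: hourly
run_every(24 * 60 * 60, sync_products)    # products: daily

while True:   # keep the main thread alive
    time.sleep(60)
```

Why This Philosophy Works: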
What It Is:
The Core Problem:
Our Strategic Solution - Structured Collection:
Transform chaos into structure through:
Why This Philosophy Works:
Starting Point: Raw electrical signals in PLC registers
Ending Point: Business-ready metrics in TimescaleDB
Transformation: From meaningless numbers to actionable intelligence
What Happens:
PLC Register DB100.DBD0 = 3247 (meaningless number)
↓
Node-RED S7 Connection reads every 1 second
↓
Node-RED knows: DB100.DBD0 = Temperature Sensor Zone 1
↓
Applies scaling: (3247 * 0.1) - 50 = 274.7°C

Critical Details:
Data Enrichment at Source:
// In Node-RED Function Node
msg.payload = {
    timestamp: new Date().toISOString(),
    equipment_id: "LINE_01_OVEN_01",
    sensor_type: "temperature",
    tag_name: "zone1_temp",
    raw_value: 3247,                      // as read from DB100.DBD0
    scaled_value: 274.7,                  // (3247 * 0.1) - 50
    unit: "celsius",
    quality: 192,                         // OPC quality code (192 = GOOD)
    alarm_limits: {high: 300, low: 250}
};
return msg;

For Sensor Data:
Raw: 274.7, 274.8, 274.6, 274.9... (60 readings/minute)
↓
Edge Processing: Buffer 60 values
↓
Calculate: avg=274.75, min=274.6, max=274.9, std_dev=0.13
↓
Output: One enriched message per minute

For Counter Data:
Raw: Counter shows 10547
(100ms later) Counter shows 10548
↓
Edge Processing: Delta = 1 piece in 100ms
↓
Calculate: Rate = 10 pieces/second = 600 pieces/minute
↓
Output: Instantaneous production rate

For State Data:
Previous State: "RUNNING" (for last 45 minutes)
New State: "STOPPED"
↓
Edge Processing: State changed!
↓
Calculate: Duration in RUNNING = 2700 seconds
↓
Output: State transition event with duration
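The three patterns above share one shape: keep a little state at the edge, emit only the derived value. A minimal Python sketch of all three (class and method names are illustrative, not the actual Node-RED flow):

```python
import statistics

class EdgeProcessor:
    """Keeps minimal per-signal state and emits derived values only."""

    def __init__(self):
        self.buffer = []            # sensor readings for the current minute
        self.last_count = None      # previous counter value
        self.last_count_ts = None
        self.state = None           # current machine state
        self.state_since = None

    def on_sensor(self, value):
        """Buffer raw readings; emit one stats message per 60 samples."""
        self.buffer.append(value)
        if len(self.buffer) >= 60:
            msg = {
                "avg_1min": round(statistics.mean(self.buffer), 2),
                "min_1min": min(self.buffer),
                "max_1min": max(self.buffer),
                "std_dev": round(statistics.stdev(self.buffer), 2),
                "sample_count": len(self.buffer),
            }
            self.buffer = []
            return msg

    def on_counter(self, count, ts):
        """Convert an absolute counter into an instantaneous rate."""
        msg = None
        if self.last_count is not None and ts > self.last_count_ts:
            delta = count - self.last_count
            rate = delta / (ts - self.last_count_ts)   # pieces/second
            msg = {"pieces_delta": delta, "rate_per_minute": rate * 60}
        self.last_count, self.last_count_ts = count, ts
        return msg

    def on_state(self, new_state, ts):
        """Emit an event only when the state actually changes."""
        msg = None
        if self.state is not None and new_state != self.state:
            msg = {"from": self.state, "to": new_state,
                   "duration_seconds": ts - self.state_since}
        if new_state != self.state:
            self.state, self.state_since = new_state, ts
        return msg

edge = EdgeProcessor()
print(edge.on_counter(10547, 0.0))   # None (no previous value yet)
print(edge.on_counter(10548, 0.1))   # {'pieces_delta': 1, 'rate_per_minute': 600.0}
```

Message Structure: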
Topic: factory/plc/line01/sensor/zone1_temp
Payload: {
"timestamp": "2024-01-15T10:30:00Z",
"equipment_id": "LINE_01_OVEN_01",
"avg_1min": 274.75,
"min_1min": 274.6,
"max_1min": 274.9,
"std_dev": 0.13,
"sample_count": 60,
"quality": "GOOD"
}

Why This Structure:
Python Subscriber Logic:
# Receives MQTT message (already parsed from JSON into a dict)
avg_temp = message['avg_1min']

# Validates against business rules
if avg_temp > 350 or avg_temp < 200:
    flag_as_suspicious(message)

# Enriches with context
message['shift'] = get_current_shift()
message['product'] = get_current_product(message['equipment_id'])

# Batches for efficiency
batch.append(message)
if len(batch) >= 1000 or time_elapsed() > 10:
    bulk_insert_to_db(batch)
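A more complete, runnable sketch of the subscriber - assuming the paho-mqtt (1.x callback API) and psycopg2 packages, the topic scheme shown above, and a hypothetical plc_data.sensor_data table; shift/product enrichment is omitted for brevity:

```python
import json
import time

import paho.mqtt.client as mqtt
import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("dbname=factory user=collector")
batch, last_flush = [], time.time()

def flush():
    """Bulk-insert the buffered messages in one database round trip."""
    global batch, last_flush
    if batch:
        with conn, conn.cursor() as cur:
            execute_values(cur,
                """INSERT INTO plc_data.sensor_data
                   (time, equipment_id, sensor_name, avg_1min, min_1min, max_1min)
                   VALUES %s""",
                batch)
        batch = []
    last_flush = time.time()

def on_message(client, userdata, msg):
    m = json.loads(msg.payload)
    # Business-rule validation: flag values outside the plausible range
    if not (200 <= m['avg_1min'] <= 350):
        m['quality'] = 'SUSPICIOUS'
    batch.append((m['timestamp'], m['equipment_id'],
                  msg.topic.split('/')[-1],        # sensor name from topic
                  m['avg_1min'], m['min_1min'], m['max_1min']))
    # Flush on size or age, whichever comes first
    if len(batch) >= 1000 or time.time() - last_flush > 10:
        flush()

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("factory/plc/+/sensor/+")
client.loop_forever()
```

Final Storage Structure: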
sensor_data table:
time | 2024-01-15 10:30:00
equipment_id | LINE_01_OVEN_01
sensor_name | zone1_temp
avg_1min | 274.75
min_1min | 274.6
max_1min | 274.9
std_dev | 0.13
shift | SHIFT_A
product_id | PROD_12345

Starting Point: Business transactions in ERP
Ending Point: Cached context ready for real-time joins
Transformation: From slow queries to instant context
What We Need from ERP:
Production Orders:
- Order ID, Product ID, Target Quantity
- Standard Rate (pieces/hour)
- Planned Start/End times
- Current Status
Product Specifications:
- Product ID, Name, Category
- Cycle Time (seconds/piece)
- Temperature Requirements
- Quality Specifications
Bill of Materials:
- Product ID → Component IDs
- Component Quantities
- Scrap Allowances

What We DON'T Need:
Instead of: "SELECT * FROM orders" We Do: Incremental queries with business logic
-- Production Orders Query (runs every 5 minutes)
SELECT
order_no,
product_code,
target_quantity,
standard_rate_per_hour,
planned_start_time,
planned_end_time,
order_status,
last_modified_timestamp
FROM production_orders
WHERE
-- Only active or upcoming orders
order_status IN ('RELEASED', 'IN_PROCESS', 'SCHEDULED')
-- Only if changed since last sync
AND last_modified_timestamp > :last_sync_time
-- Only for next 24 hours
AND planned_start_time < NOW() + INTERVAL '24 hours'
ORDER BY planned_start_time;

ERP Returns: Raw business data
We Transform: Into operational context
def process_erp_order(erp_record):
return {
'order_id': erp_record['order_no'],
'product_id': erp_record['product_code'],
'target_quantity': erp_record['target_quantity'],
'standard_rate_per_hour': erp_record['standard_rate_per_hour'],
'standard_rate_per_minute': erp_record['standard_rate_per_hour'] / 60,
'cycle_time_seconds': 3600 / erp_record['standard_rate_per_hour'],
'planned_duration_minutes': calculate_duration(
erp_record['target_quantity'],
erp_record['standard_rate_per_hour']
),
'efficiency_target': 0.85, # Business rule
'status': erp_record['order_status'],
'sync_timestamp': datetime.now()
}
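A sketch of the 5-minute sync job that ties the query and the transform together, assuming psycopg2 on both connections and the erp_cache.active_orders table defined next (the query is abbreviated; the full version appears above):

```python
import psycopg2
from psycopg2.extras import execute_values

ORDERS_QUERY = """
    SELECT order_no, product_code, target_quantity,
           standard_rate_per_hour, order_status
    FROM production_orders
    WHERE order_status IN ('RELEASED', 'IN_PROCESS', 'SCHEDULED')
      AND last_modified_timestamp > %(last_sync_time)s
"""

UPSERT = """
    INSERT INTO erp_cache.active_orders
        (order_id, product_id, target_quantity,
         standard_rate_per_hour, cycle_time_seconds, last_sync)
    VALUES %s
    ON CONFLICT (order_id) DO UPDATE SET
        target_quantity        = EXCLUDED.target_quantity,
        standard_rate_per_hour = EXCLUDED.standard_rate_per_hour,
        cycle_time_seconds     = EXCLUDED.cycle_time_seconds,
        last_sync              = EXCLUDED.last_sync
"""

def sync_orders(erp_conn, cache_conn, last_sync_time):
    """Pull changed orders from the ERP, transform, and upsert into the cache."""
    with erp_conn.cursor() as cur:
        cur.execute(ORDERS_QUERY, {'last_sync_time': last_sync_time})
        columns = [c[0] for c in cur.description]
        orders = [process_erp_order(dict(zip(columns, row)))
                  for row in cur.fetchall()]
    rows = [(o['order_id'], o['product_id'], o['target_quantity'],
             o['standard_rate_per_hour'], o['cycle_time_seconds'],
             o['sync_timestamp']) for o in orders]
    if rows:
        with cache_conn, cache_conn.cursor() as cur:
            execute_values(cur, UPSERT, rows)
```

Cache Storage Strategy: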
-- In same PostgreSQL instance as TimescaleDB
CREATE SCHEMA erp_cache;
-- Current orders (refreshed every 5 min)
CREATE TABLE erp_cache.active_orders (
order_id TEXT PRIMARY KEY,
product_id TEXT NOT NULL,
target_quantity INTEGER,
standard_rate_per_hour NUMERIC,
cycle_time_seconds NUMERIC,
planned_start TIMESTAMPTZ,
planned_end TIMESTAMPTZ,
last_sync TIMESTAMPTZ DEFAULT NOW()
);
-- Product specs (refreshed daily)
CREATE TABLE erp_cache.product_specs (
product_id TEXT PRIMARY KEY,
product_name TEXT,
temp_min NUMERIC,
temp_max NUMERIC,
pressure_min NUMERIC,
pressure_max NUMERIC,
quality_specs JSONB,
last_sync TIMESTAMPTZ DEFAULT NOW()
);
-- Create indexes for join performance
CREATE INDEX idx_orders_product ON erp_cache.active_orders(product_id);
CREATE INDEX idx_orders_time ON erp_cache.active_orders(planned_start);

Starting Point: Knowledge in people's heads or Excel files
Ending Point: Validated data linked to automated systems
Transformation: From unstructured to structured intelligence
Method 1: Standardized Excel Templates
Template Structure:
- Protected headers (no renaming)
- Dropdown lists for equipment IDs
- Data validation for ranges
- Required timestamp column
- Auto-generated upload ID
Example: Quality Test Results
| Timestamp | Equipment_ID | Product_ID | Test_Type | Result | Pass_Fail |
|-----------|--------------|------------|-----------|--------|-----------|
| 10:30:00  | LINE_01      | PROD_123   | Thickness | 2.54   | PASS      |
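A minimal ingestion sketch for these templates, assuming pandas (with openpyxl installed for .xlsx files) and the column names from the table above; the equipment whitelist is a stand-in for a database lookup:

```python
import pandas as pd

REQUIRED = ['Timestamp', 'Equipment_ID', 'Product_ID',
            'Test_Type', 'Result', 'Pass_Fail']
VALID_EQUIPMENT = {'LINE_01', 'LINE_02'}   # normally loaded from the database

def load_template(path):
    """Read a quality-test template and reject structural violations early."""
    df = pd.read_excel(path)
    missing = set(REQUIRED) - set(df.columns)
    if missing:
        raise ValueError(f"Template missing columns: {missing}")
    # Enforce the same rules the spreadsheet dropdowns enforce
    bad = ~df['Equipment_ID'].isin(VALID_EQUIPMENT)
    if bad.any():
        raise ValueError(f"Unknown equipment IDs: "
                         f"{df.loc[bad, 'Equipment_ID'].unique()}")
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])
    return df
```

Method 2: Web Forms (Mobile-Friendly)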
# Flask web form for downtime entry
from datetime import datetime

from flask import Flask, request, session

app = Flask(__name__)

@app.route('/downtime', methods=['POST'])
def record_downtime():
    data = {
        'timestamp': request.form['timestamp'],
        'equipment_id': request.form['equipment_id'],
        'duration_minutes': int(request.form['duration']),
        'reason_category': request.form['category'],
        'reason_detail': request.form['detail'],
        'operator_id': session['operator_id'],
        'entry_timestamp': datetime.now()
    }
    validate_and_store(data)
    return 'OK', 201

Method 3: Direct Database Entry
Multi-Level Validation:
def validate_user_input(data):
# Level 1: Format validation
if not is_valid_timestamp(data['timestamp']):
raise ValidationError("Invalid timestamp format")
# Level 2: Business rule validation
if data['equipment_id'] not in get_valid_equipment_ids():
raise ValidationError(f"Unknown equipment: {data['equipment_id']}")
    # Level 3: Sanity checks (timestamp was parsed to datetime in Level 1)
    time_diff = datetime.now() - data['timestamp']
if time_diff > timedelta(hours=24):
raise ValidationError("Data too old (>24 hours)")
# Level 4: Cross-reference with automated data
if not equipment_was_running(data['equipment_id'], data['timestamp']):
flag_for_review("Test performed while equipment stopped")
    return data

Matching Process:
def reconcile_manual_with_automated(manual_entry):
# Find corresponding automated data
window_start = manual_entry['timestamp'] - timedelta(minutes=5)
window_end = manual_entry['timestamp'] + timedelta(minutes=5)
automated_data = query_plc_data(
equipment_id=manual_entry['equipment_id'],
time_range=(window_start, window_end)
)
# Link the data
manual_entry['linked_production_count'] = automated_data['production_count']
manual_entry['linked_machine_state'] = automated_data['machine_state']
manual_entry['linked_sensor_values'] = automated_data['sensor_values']
# Calculate confidence score
if automated_data['machine_state'] == 'RUNNING':
manual_entry['confidence_score'] = 0.95
else:
manual_entry['confidence_score'] = 0.70 # Lower confidence
    return manual_entry

Final Storage Structure:
CREATE TABLE user_input.quality_tests (
entry_id UUID PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
equipment_id TEXT NOT NULL,
product_id TEXT NOT NULL,
test_type TEXT NOT NULL,
test_result NUMERIC,
pass_fail BOOLEAN,
-- Linked automated data
linked_production_count INTEGER,
linked_machine_state TEXT,
linked_temp_avg NUMERIC,
-- Metadata
entered_by TEXT,
entry_timestamp TIMESTAMPTZ,
confidence_score NUMERIC,
validation_flags TEXT[]
);

In Layer 2, these three data streams merge to create complete operational intelligence:
-- Example: Real-time OEE Calculation combining all three sources
WITH production_data AS (
-- From PLC data
SELECT
time_bucket('1 hour', time) as hour,
equipment_id,
SUM(pieces_delta) as actual_production,
SUM(CASE WHEN state = 'RUNNING' THEN duration ELSE 0 END) as run_time
FROM plc_data.production_data
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY hour, equipment_id
),
order_context AS (
-- From ERP cache
SELECT
equipment_id,
standard_rate_per_hour,
target_quantity,
product_id
FROM erp_cache.active_orders
WHERE NOW() BETWEEN planned_start AND planned_end
),
quality_data AS (
-- From user input
SELECT
equipment_id,
COUNT(CASE WHEN pass_fail = true THEN 1 END) as passed,
COUNT(*) as total_tested
FROM user_input.quality_tests
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY equipment_id
)
-- Combine all three for complete OEE
SELECT
p.equipment_id,
p.hour,
-- Availability (PLC data)
(p.run_time / 3600.0) as availability,
-- Performance (PLC + ERP data)
(p.actual_production / NULLIF(o.standard_rate_per_hour * p.run_time / 3600.0, 0)) as performance,
-- Quality (PLC + User data): pass rate of tests, assume 100% if nothing tested
(COALESCE(q.passed::numeric / NULLIF(q.total_tested, 0), 1.0)) as quality,
-- Combined OEE
(p.run_time / 3600.0) *
(p.actual_production / NULLIF(o.standard_rate_per_hour * p.run_time / 3600.0, 0)) *
(COALESCE(q.passed::numeric / NULLIF(q.total_tested, 0), 1.0)) as oee,
-- Context for gap analysis
o.product_id,
o.standard_rate_per_hour,
p.actual_production
FROM production_data p
JOIN order_context o ON p.equipment_id = o.equipment_id
LEFT JOIN quality_data q ON p.equipment_id = q.equipment_id;
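A worked pass through the same arithmetic, with illustrative numbers:

```python
# One hour on a single line - numbers are illustrative only
run_time_s   = 3000      # machine ran 50 of the last 60 minutes
actual       = 450       # pieces counted by the PLC
standard_rph = 600       # pieces/hour from the cached ERP order
passed, tested = 18, 20  # operator quality tests this hour

availability = run_time_s / 3600.0                             # 0.833
performance  = actual / (standard_rph * run_time_s / 3600.0)   # 450/500 = 0.900
quality      = passed / tested                                 # 0.900
oee          = availability * performance * quality            # ~0.675

print(f"OEE = {oee:.1%}")   # OEE = 67.5%
```

Layer 2 uses these combined streams to: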
This convergence transforms three separate data streams into unified intelligence, enabling the Factory Intelligence System to identify and quantify gaps for systematic improvement.
We chose each technology for specific strategic reasons:
Node-RED (PLC Connectivity)
MQTT/Mosquitto (Message Transport)
TimescaleDB (Time-Series Storage)
PostgreSQL (ERP Cache)
Python (Orchestration)
Options Considered:
Why Hybrid:
Options Considered:
Why All TimescaleDB:
Options Considered:
Why MQTT:
Options Considered:
Why Smart Caching:
At Collection:
At Processing:
At Storage:
Edge Level:
Transport Level:
Database Level:
Key Metrics:
Alert Triggers:
Symptom: Collecting raw data thinking you'll aggregate later
Result: Database explosion, slow queries
Solution: Process at edge from day one

Symptom: Direct ERP queries for real-time data
Result: ERP overload, system failures
Solution: Implement intelligent caching

Symptom: No retention policy
Result: Queries become impossibly slow
Solution: Define lifecycle from start

Symptom: Ignoring human inputs
Result: Missing critical context
Solution: Structured collection strategy

Symptom: Assuming network is reliable
Result: Data loss during outages
Solution: Buffer at every level
When Layer 1 is complete, you have:
This isn't just data collection - it's the foundation for the entire Factory Intelligence System. Every subsequent layer depends on the quality, completeness, and timeliness of Layer 1 data.
Layer 1 is successful when:
Layer 2 inherits a rich, unified data platform with three synchronized streams:
Layer 2 will run continuous queries that join all three streams:
-- Example: Real-time efficiency with full context
CREATE MATERIALIZED VIEW realtime_efficiency AS
WITH current_production AS (
SELECT
p.equipment_id,
p.rate_per_hour as actual_rate,
s.state_value as current_state,
s.duration_seconds as state_duration
FROM plc_data.current_production p
JOIN plc_data.current_state s ON p.equipment_id = s.equipment_id
),
active_orders AS (
SELECT
equipment_id,
product_id,
standard_rate_per_hour as target_rate,
cycle_time_seconds
FROM erp_cache.active_orders
WHERE NOW() BETWEEN planned_start AND planned_end
),
recent_quality AS (
SELECT
equipment_id,
AVG(CASE WHEN pass_fail THEN 1 ELSE 0 END) as quality_rate
FROM user_input.quality_tests
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY equipment_id
)
SELECT
cp.equipment_id,
cp.actual_rate,
ao.target_rate,
(cp.actual_rate / NULLIF(ao.target_rate, 0)) as performance_ratio,
cp.current_state,
cp.state_duration,
rq.quality_rate,
ao.product_id,
NOW() as calculation_time
FROM current_production cp
JOIN active_orders ao ON cp.equipment_id = ao.equipment_id
LEFT JOIN recent_quality rq ON cp.equipment_id = rq.equipment_id;

Layer 2 will identify gaps by comparing:
The combined data enables pattern discovery:
# Layer 2 will correlate across streams
def identify_patterns():
# Pattern 1: Temperature impact on quality
correlate(
plc_data.sensor_data.temperature,
user_input.quality_tests.pass_fail
)
# Pattern 2: Order changes impact on efficiency
correlate(
erp_cache.order_changes,
plc_data.production_rates
)
# Pattern 3: Operator actions impact on performance
correlate(
user_input.operator_adjustments,
plc_data.efficiency_metrics
    )
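One concrete way to run the first correlation - assuming pandas and that both streams have been pulled into DataFrames with per-equipment timestamp columns - is a tolerance-based time join:

```python
import pandas as pd

def correlate_temp_quality(sensor_df, quality_df):
    """Join each quality test to the nearest 1-minute temperature
    aggregate (within 5 minutes), then compare pass vs fail groups."""
    merged = pd.merge_asof(
        quality_df.sort_values('timestamp'),
        sensor_df.sort_values('time'),
        left_on='timestamp', right_on='time',
        by='equipment_id',
        tolerance=pd.Timedelta('5min'), direction='nearest')
    # Does the temperature distribution differ between pass and fail?
    return merged.groupby('pass_fail')['avg_1min'].describe()
```

Layer 1 ensures Layer 2 receives: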
When Layer 2 processes this unified data:
Single Source Insight: "Machine stopped for 10 minutes"
Combined Intelligence: "Machine stopped for 10 minutes during order changeover from Product A to Product B, operator noted material shortage, resulting in 150 pieces of lost production worth $450 based on current order margins"
This is the power of Layer 1's unified data platform - it transforms isolated data points into comprehensive business intelligence.
Layer 1 creates the foundation for the entire Factory Intelligence System by:
The success of the entire Factory Intelligence System depends on Layer 1's ability to deliver complete, accurate, timely, and contextualized data. With this foundation in place, Layer 2 can begin the real work of gap discovery and value creation.