Learn by example. Copy-paste ready code snippets for common SchoBase use cases, from basic queries to industrial protocol integration.
Use Zod validators for compile-time and runtime type safety.
import { z } from 'zod'
// Define your time series measurement schema.
// Each reading has a string sensor id, a Date timestamp, a string location and a
// status restricted to 'online' | 'offline' | 'warning'.
// The numeric bounds (.min/.max) are Zod checks: they run when a value is parsed
// against the schema, not in the TypeScript compiler.
const TemperatureSensorSchema = z.object({
sensorId: z.string(),
timestamp: z.date(),
temperature: z.number().min(-40).max(125), // °C
humidity: z.number().min(0).max(100), // %
location: z.string(),
deviceStatus: z.enum(['online', 'offline', 'warning']),
})
// Static type derived from the schema; the range limits above are not part of it
// (temperature and humidity are plain `number` here).
type TemperatureSensor = z.infer<typeof TemperatureSensorSchema>
// Create a time series table
const schema = defineSchema({
tables: {
sensorReadings: {
validator: TemperatureSensorSchema,
timeIndex: 'timestamp',
indexes: ['sensorId', 'location'],
},
},
})

Capture joint positions, velocities, torques, and Cartesian coordinates.
import { z } from 'zod'
// Building blocks for the telemetry schema below.

// One number per joint; exactly six entries are required.
const sixJointValues = z.array(z.number()).length(6)

// Cartesian position of the robot.
const cartesianPosition = z.object({
  x: z.number(), // mm
  y: z.number(),
  z: z.number(),
})

// Orientation expressed as roll / pitch / yaw.
const rollPitchYaw = z.object({
  roll: z.number(), // radians
  pitch: z.number(),
  yaw: z.number(),
})

/**
 * One telemetry sample for a robot: identity and timestamp, per-joint state
 * (6 axes), Cartesian pose, and operating status.
 */
const RobotTelemetrySchema = z.object({
  robotId: z.string(),
  timestamp: z.date(),

  // Joint space (6 axes)
  jointPositions: sixJointValues, // radians
  jointVelocities: sixJointValues, // rad/s
  jointTorques: sixJointValues, // N·m

  // Cartesian space
  position: cartesianPosition,
  orientation: rollPitchYaw,

  // Status
  operationMode: z.enum(['auto', 'manual', 'maintenance']),
  payload: z.number().min(0).max(50), // kg
  tcpSpeed: z.number().min(0), // mm/s
  powerConsumption: z.number(), // watts
})
type RobotTelemetry = z.infer<typeof RobotTelemetrySchema>

Type-safe write with automatic validation.
import { schobase } from './client'
// Assemble a single reading, then hand it to the table's writer
const currentReading = {
  sensorId: 'temp-sensor-01',
  timestamp: new Date(),
  temperature: 23.5,
  humidity: 45.2,
  location: 'warehouse-a',
  // `as const` keeps the literal type so it still matches the status enum
  deviceStatus: 'online' as const,
}

await schobase.sensorReadings.write(currentReading)
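// TypeScript catches missing fields and wrong field types at compile time;
// range limits such as .max(125) are enforced by the Zod validator at runtime:
// await schobase.sensorReadings.write({
// temperature: 200, // ❌ Validation error at runtime: max 125
// })

Optimized for ingesting thousands of data points.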
import { schobase } from './client'
// Synthetic batch for high-throughput ingestion: 1000 readings spread over
// 10 sensor ids, spaced one second apart going back in time from "now".
const READING_COUNT = 1000
const SENSOR_COUNT = 10
const SPACING_MS = 1000

const buildReading = (index: number) => {
  const ageMs = index * SPACING_MS
  return {
    sensorId: `sensor-${index % SENSOR_COUNT}`,
    timestamp: new Date(Date.now() - ageMs),
    temperature: 20 + Math.random() * 10,
    humidity: 40 + Math.random() * 20,
    location: 'factory-floor',
    deviceStatus: 'online' as const,
  }
}

const readings = Array.from({ length: READING_COUNT }, (_unused, index) => buildReading(index))
await schobase.sensorReadings.writeBatch(readings)
// SchoBase handles batching, compression, and deduplication
console.log('Ingested 1,000 readings successfully')

Type-safe filtering and time range selection.
import { schobase } from './client'
// Query last 24 hours of data
// Filters on a single sensor id, bounds the range from "now minus 24 h"
// (24 * 60 * 60 * 1000 ms) to "now", orders by timestamp descending and
// requests at most 100 rows.
const recentReadings = await schobase.sensorReadings
.query()
.where('sensorId', '==', 'temp-sensor-01')
.timeRange({
start: new Date(Date.now() - 24 * 60 * 60 * 1000),
end: new Date(),
})
.orderBy('timestamp', 'desc')
.limit(100)
.execute()
// Results are fully typed
recentReadings.forEach((reading) => {
console.log(`${reading.timestamp}: ${reading.temperature}°C`)
})

Calculate averages, min/max over time windows.
import { schobase } from './client'
// Calculate hourly averages for the last week
// Range start is "now minus 7 days" in milliseconds (7 * 24 * 60 * 60 * 1000).
// Each key passed to aggregate() names an output column; avg/max/min all read
// the `temperature` field, while `count` names no field.
const hourlyAvgs = await schobase.sensorReadings
.query()
.where('location', '==', 'warehouse-a')
.timeRange({
start: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
end: new Date(),
})
.groupBy('1h') // 1-hour buckets
.aggregate({
avgTemp: { field: 'temperature', fn: 'avg' },
maxTemp: { field: 'temperature', fn: 'max' },
minTemp: { field: 'temperature', fn: 'min' },
count: { fn: 'count' },
})
.execute()
// Result: Array<{ bucket: Date; avgTemp: number; maxTemp: number; ... }>
console.log(hourlyAvgs)

Stream live data updates with automatic reconnection.
import { schobase } from './client'
// Subscribe to real-time updates
// Registers an update handler and an error handler for one sensor id. The value
// returned by the chain is kept as `unsubscribe`.
// NOTE(review): presumably calling unsubscribe() stops delivery — confirm against the client API.
const unsubscribe = schobase.sensorReadings
.subscribe()
.where('sensorId', '==', 'temp-sensor-01')
.onUpdate((reading) => {
console.log('New reading:', reading)
// Alert if temperature exceeds threshold
// NOTE(review): `alert` is a browser global; if this snippet runs in a Node.js
// process it is not defined there — confirm the intended runtime.
if (reading.temperature > 30) {
alert(`High temperature: ${reading.temperature}°C`)
}
})
.onError((error) => {
console.error('Subscription error:', error)
})
// Cleanup when done
// unsubscribe()

Connect to OPC UA servers and ingest data automatically.
import { OpcUaAdapter, schobase } from 'schobase'
// Configure OPC UA connection
// Endpoint, security mode and security policy are fixed here; the username and
// password are read from the OPCUA_USER / OPCUA_PASSWORD environment variables.
// NOTE(review): process.env values are undefined when the variable is not set —
// confirm the adapter rejects missing credentials instead of connecting without them.
const opcua = new OpcUaAdapter({
endpoint: 'opc.tcp://plc-01.factory.local:4840',
securityMode: 'SignAndEncrypt',
securityPolicy: 'Basic256Sha256',
credentials: {
username: process.env.OPCUA_USER,
password: process.env.OPCUA_PASSWORD,
},
})
// Subscribe to OPC UA nodes
// Registers one node with a change callback that writes each value to the
// processData table.
await opcua.subscribe([
{
nodeId: 'ns=2;s=Temperature',
samplingInterval: 1000, // 1 second
onChange: async (value) => {
// The node id literal is repeated here; keep it in sync with the subscription above.
// The stored timestamp is new Date(), i.e. the moment this callback runs.
// NOTE(review): `quality` stores value.statusCode.isGood — presumably a boolean; confirm.
await schobase.processData.write({
nodeId: 'ns=2;s=Temperature',
timestamp: new Date(),
value: value.value,
quality: value.statusCode.isGood,
})
},
},
])
console.log('OPC UA adapter connected and streaming data')

Poll Modbus registers and store time series data.
import { ModbusAdapter, schobase } from 'schobase'
// Configure Modbus TCP connection
// Target device is addressed by IP, TCP port 502 and unit id 1.
const modbus = new ModbusAdapter({
host: '192.168.1.100',
port: 502,
unitId: 1,
})
// Poll holding registers every 5 seconds
// interval is given in milliseconds (5000). Three named registers are read and
// every poll result is written to the modbusReadings table.
// NOTE(review): the FLOAT32 entries are spaced two addresses apart, presumably
// because each spans two 16-bit registers — confirm against the device map.
modbus.poll({
interval: 5000,
registers: [
{ address: 40001, type: 'FLOAT32', name: 'pressure' },
{ address: 40003, type: 'FLOAT32', name: 'flowRate' },
{ address: 40005, type: 'UINT16', name: 'valvePosition' },
],
onData: async (data) => {
// `data` is spread into the row as-is.
// NOTE(review): presumably keyed by the register names above — confirm.
// The timestamp is new Date(), i.e. when this callback runs.
await schobase.modbusReadings.write({
deviceId: 'modbus-device-01',
timestamp: new Date(),
...data,
})
},
})
console.log('Modbus adapter polling every 5 seconds')

Subscribe to MQTT topics from IoT sensors.
import { MqttAdapter, schobase } from 'schobase'
// Connect to MQTT broker
// Credentials are read from the MQTT_USER / MQTT_PASSWORD environment variables;
// the client id is fixed.
// NOTE(review): process.env values are undefined when the variable is not set — confirm
// how the adapter treats missing credentials.
// NOTE(review): the URL uses the plain mqtt:// scheme on port 1883, which is normally
// unencrypted — confirm whether credentials should go over a TLS (mqtts://) endpoint.
const mqtt = new MqttAdapter({
brokerUrl: 'mqtt://mqtt.factory.local:1883',
username: process.env.MQTT_USER,
password: process.env.MQTT_PASSWORD,
clientId: 'schobase-collector',
})
// Subscribe to sensor topics
// For every message on sensors/<sensorId>/temperature, parse the JSON payload
// and write one reading. Malformed payloads are logged and skipped so a single
// bad message cannot throw out of the handler.
await mqtt.subscribe('sensors/+/temperature', async (topic, payload) => {
  // Topic layout is sensors/<sensorId>/temperature, so segment 1 is the sensor id
  const sensorId = topic.split('/')[1]

  // The payload comes from the network: JSON.parse throws on invalid JSON
  let data
  try {
    data = JSON.parse(payload.toString())
  } catch (error) {
    console.error(`Skipping malformed MQTT payload on ${topic}:`, error)
    return
  }

  // JSON.parse also accepts null and bare primitives, which carry no fields
  if (data === null || typeof data !== 'object') {
    console.error(`Skipping non-object MQTT payload on ${topic}`)
    return
  }

  await schobase.sensorReadings.write({
    sensorId,
    timestamp: new Date(data.timestamp),
    temperature: data.value,
    // ?? substitutes 0 only when humidity is missing or null; any other value
    // is passed through unchanged
    humidity: data.humidity ?? 0,
    location: data.location,
    deviceStatus: 'online',
  })
})
console.log('MQTT adapter subscribed to sensors/+/temperature')

Detect anomalies using statistical methods or ML models.
import { schobase } from './client'
// Calculate moving average and detect anomalies
// Looks at one sensor's last 24 hours, using a window of size '1h' with slide '5m',
// and computes the average and standard deviation of temperature per window.
const anomalies = await schobase.sensorReadings
.query()
.where('sensorId', '==', 'temp-sensor-01')
.timeRange({
start: new Date(Date.now() - 24 * 60 * 60 * 1000),
end: new Date(),
})
.window({
size: '1h',
slide: '5m',
})
.aggregate({
avg: { field: 'temperature', fn: 'avg' },
stddev: { field: 'temperature', fn: 'stddev' },
})
.having((row) => {
// Flag readings > 3 standard deviations from mean
// NOTE(review): aggregate() above declares only `avg` and `stddev`, yet this
// predicate reads row.temperature — confirm the row still exposes the raw
// reading; if it is undefined the comparison below is always false.
// NOTE(review): only the upper bound (avg + 3 * stddev) is tested, so readings
// far below the mean are not flagged.
const threshold = row.avg + 3 * row.stddev
return row.temperature > threshold
})
.execute()
console.log('Detected anomalies:', anomalies)

Calculate equipment health scores and predict failures.
import { schobase } from './client'
/**
 * Calculates an equipment health score (0-100) for one robot from its last
 * 24 hours of telemetry.
 *
 * Scoring starts at 100, subtracts the percentage of samples in which any
 * joint torque exceeds 80 in magnitude (potential bearing wear), and subtracts
 * a flat 20 points when the mean power consumption is above 5000. The result
 * is clamped so it never drops below 0.
 *
 * @param robotId - Value matched against the `robotId` field of the telemetry table.
 * @returns The robot id, the clamped health score and a recommendation string.
 *   When the query returns no samples, the score is 0 and the recommendation
 *   states that no telemetry is available.
 */
async function calculateHealthScore(robotId: string) {
  const DAY_MS = 24 * 60 * 60 * 1000
  const TORQUE_SPIKE_LIMIT = 80
  const HIGH_POWER_LIMIT = 5000
  const HIGH_POWER_PENALTY = 20
  const MAINTENANCE_THRESHOLD = 50

  const last24h = await schobase.robotTelemetry
    .query()
    .where('robotId', '==', robotId)
    .timeRange({
      start: new Date(Date.now() - DAY_MS),
      end: new Date(),
    })
    .execute()

  // With no samples both ratios below would be 0 / 0 = NaN, and a NaN score
  // would fall through to 'Normal operation'. Report the missing data instead.
  if (last24h.length === 0) {
    return {
      robotId,
      healthScore: 0,
      recommendation: 'No telemetry in the last 24 hours',
    }
  }

  // Analyze torque spikes (potential bearing wear)
  const torqueSpikes = last24h.filter((r) =>
    r.jointTorques.some((t) => Math.abs(t) > TORQUE_SPIKE_LIMIT)
  ).length

  // Analyze power consumption trends
  const avgPower = last24h.reduce((sum, r) => sum + r.powerConsumption, 0) / last24h.length

  // Calculate health score (0-100); the raw value can go negative, so clamp it
  const rawScore =
    100 -
    (torqueSpikes / last24h.length) * 100 -
    (avgPower > HIGH_POWER_LIMIT ? HIGH_POWER_PENALTY : 0)
  const healthScore = Math.max(0, rawScore)

  return {
    robotId,
    healthScore,
    recommendation:
      healthScore < MAINTENANCE_THRESHOLD ? 'Schedule maintenance' : 'Normal operation',
  }
}
const health = await calculateHealthScore('robot-01')
console.log(health)

Install SchoBase and start ingesting time series data in minutes.
npm install schobase