TypedChrono

Code Examples

Learn by example. Copy-paste-ready code snippets for common SchoBase use cases, from basic queries to industrial protocol integration.

Schema Definition

Define a Type-Safe Time Series Schema

Use Zod validators for compile-time and runtime type safety.

import { z } from 'zod'
import { defineSchema } from 'schobase' // assumed export location; adjust to your setup

// Define your time series measurement schema
const TemperatureSensorSchema = z.object({
  sensorId: z.string(),
  timestamp: z.date(),
  temperature: z.number().min(-40).max(125), // °C
  humidity: z.number().min(0).max(100), // %
  location: z.string(),
  deviceStatus: z.enum(['online', 'offline', 'warning']),
})

type TemperatureSensor = z.infer<typeof TemperatureSensorSchema>

// Create a time series table
const schema = defineSchema({
  tables: {
    sensorReadings: {
      validator: TemperatureSensorSchema,
      timeIndex: 'timestamp',
      indexes: ['sensorId', 'location'],
    },
  },
})

Robot Telemetry Schema (6-Axis Industrial Robot)

Capture joint positions, velocities, torques, and Cartesian coordinates.

import { z } from 'zod'

const RobotTelemetrySchema = z.object({
  robotId: z.string(),
  timestamp: z.date(),

  // Joint space (6 axes)
  jointPositions: z.array(z.number()).length(6), // radians
  jointVelocities: z.array(z.number()).length(6), // rad/s
  jointTorques: z.array(z.number()).length(6), // N·m

  // Cartesian space
  position: z.object({
    x: z.number(), // mm
    y: z.number(),
    z: z.number(),
  }),
  orientation: z.object({
    roll: z.number(), // radians
    pitch: z.number(),
    yaw: z.number(),
  }),

  // Status
  operationMode: z.enum(['auto', 'manual', 'maintenance']),
  payload: z.number().min(0).max(50), // kg
  tcpSpeed: z.number().min(0), // mm/s
  powerConsumption: z.number(), // watts
})

type RobotTelemetry = z.infer<typeof RobotTelemetrySchema>

Data Ingestion

Write a Single Data Point

Type-safe write with automatic validation.

import { schobase } from './client'

// Write validated data
await schobase.sensorReadings.write({
  sensorId: 'temp-sensor-01',
  timestamp: new Date(),
  temperature: 23.5,
  humidity: 45.2,
  location: 'warehouse-a',
  deviceStatus: 'online',
})

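// TypeScript rejects structurally invalid writes at compile time
// (for example, a missing field or a wrong type):
// await schobase.sensorReadings.write({
//   temperature: '23.5', // ❌ Type error: string is not assignable to number
// })
//
// Range constraints such as .max(125) are not visible to the type system.
// An out-of-range value like temperature: 200 compiles and is expected to be
// rejected by the Zod validator at runtime instead.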

Batch Write (High Volume)

Optimized for ingesting thousands of data points.

import { schobase } from './client'

// Batch insert for high-throughput scenarios
const readings = Array.from({ length: 1000 }, (_, i) => ({
  sensorId: `sensor-${i % 10}`,
  timestamp: new Date(Date.now() - i * 1000),
  temperature: 20 + Math.random() * 10,
  humidity: 40 + Math.random() * 20,
  location: 'factory-floor',
  deviceStatus: 'online' as const,
}))

await schobase.sensorReadings.writeBatch(readings)

// SchoBase handles batching, compression, and deduplication
console.log('Ingested 1,000 readings successfully')

Time Series Queries

Query Recent Data with Filters

Type-safe filtering and time range selection.

import { schobase } from './client'

// Query last 24 hours of data
const recentReadings = await schobase.sensorReadings
  .query()
  .where('sensorId', '==', 'temp-sensor-01')
  .timeRange({
    start: new Date(Date.now() - 24 * 60 * 60 * 1000),
    end: new Date(),
  })
  .orderBy('timestamp', 'desc')
  .limit(100)
  .execute()

// Results are fully typed
recentReadings.forEach((reading) => {
  console.log(`${reading.timestamp}: ${reading.temperature}°C`)
})

Aggregations and Downsampling

Calculate averages, min/max over time windows.

import { schobase } from './client'

// Calculate hourly averages for the last week
const hourlyAvgs = await schobase.sensorReadings
  .query()
  .where('location', '==', 'warehouse-a')
  .timeRange({
    start: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
    end: new Date(),
  })
  .groupBy('1h') // 1-hour buckets
  .aggregate({
    avgTemp: { field: 'temperature', fn: 'avg' },
    maxTemp: { field: 'temperature', fn: 'max' },
    minTemp: { field: 'temperature', fn: 'min' },
    count: { fn: 'count' },
  })
  .execute()

// Result: Array<{ bucket: Date; avgTemp: number; maxTemp: number; ... }>
console.log(hourlyAvgs)
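
To make the bucketing step concrete, here is a minimal client-side sketch of what a 1-hour `groupBy` followed by `avg`/`max`/`min`/`count` computes. It is not SchoBase's implementation: the names `aggregateByBucket`, `Reading`, and `BucketStats` are hypothetical, and the sketch assumes buckets are aligned to the Unix epoch in UTC, which the query API above may or may not do.

```typescript
interface Reading {
  timestamp: Date
  temperature: number
}

interface BucketStats {
  bucket: Date // start of the time bucket
  avgTemp: number
  maxTemp: number
  minTemp: number
  count: number
}

const HOUR_MS = 60 * 60 * 1000

// Group readings into fixed-width time buckets and summarize each bucket.
// Assumption: buckets are aligned to the Unix epoch (UTC).
function aggregateByBucket(
  readings: readonly Reading[],
  bucketMs: number = HOUR_MS
): BucketStats[] {
  const acc = new Map<number, { sum: number; max: number; min: number; count: number }>()

  for (const r of readings) {
    // Floor the timestamp to the start of its bucket
    const key = Math.floor(r.timestamp.getTime() / bucketMs) * bucketMs
    const entry = acc.get(key)
    if (entry === undefined) {
      acc.set(key, { sum: r.temperature, max: r.temperature, min: r.temperature, count: 1 })
    } else {
      entry.sum += r.temperature
      entry.max = Math.max(entry.max, r.temperature)
      entry.min = Math.min(entry.min, r.temperature)
      entry.count += 1
    }
  }

  // Emit buckets in chronological order
  return Array.from(acc.entries())
    .sort(([a], [b]) => a - b)
    .map(([key, e]) => ({
      bucket: new Date(key),
      avgTemp: e.sum / e.count,
      maxTemp: e.max,
      minTemp: e.min,
      count: e.count,
    }))
}
```

Hours with no readings produce no bucket in this sketch. Whether the server-side query fills such gaps is not stated here.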

Real-Time Data Subscription

Stream live data updates with automatic reconnection.

import { schobase } from './client'

// Subscribe to real-time updates
const unsubscribe = schobase.sensorReadings
  .subscribe()
  .where('sensorId', '==', 'temp-sensor-01')
  .onUpdate((reading) => {
    console.log('New reading:', reading)

    // Alert if temperature exceeds threshold
    if (reading.temperature > 30) {
      alert(`High temperature: ${reading.temperature}°C`)
    }
  })
  .onError((error) => {
    console.error('Subscription error:', error)
  })

// Cleanup when done
// unsubscribe()

Industrial Protocol Integration

OPC UA Client

Connect to OPC UA servers and ingest data automatically.

import { OpcUaAdapter, schobase } from 'schobase'

// Configure OPC UA connection
const opcua = new OpcUaAdapter({
  endpoint: 'opc.tcp://plc-01.factory.local:4840',
  securityMode: 'SignAndEncrypt',
  securityPolicy: 'Basic256Sha256',
  credentials: {
    username: process.env.OPCUA_USER,
    password: process.env.OPCUA_PASSWORD,
  },
})

// Subscribe to OPC UA nodes
await opcua.subscribe([
  {
    nodeId: 'ns=2;s=Temperature',
    samplingInterval: 1000, // 1 second
    onChange: async (value) => {
      await schobase.processData.write({
        nodeId: 'ns=2;s=Temperature',
        timestamp: new Date(),
        value: value.value,
        quality: value.statusCode.isGood,
      })
    },
  },
])

console.log('OPC UA adapter connected and streaming data')
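
The `nodeId` strings used above follow the standard OPC UA text form: an optional `ns=<namespace index>;` part, then a type prefix (`i` numeric, `s` string, `g` GUID, `b` opaque) and the identifier. The sketch below parses that form so node IDs can be checked before subscribing. `parseNodeId` and its types are hypothetical helpers, not part of the SchoBase adapter.

```typescript
type NodeIdType = 'numeric' | 'string' | 'guid' | 'opaque'

interface ParsedNodeId {
  namespaceIndex: number
  type: NodeIdType
  identifier: string | number
}

// Parse the string form of an OPC UA NodeId, e.g. 'ns=2;s=Temperature' or 'i=2258'.
function parseNodeId(text: string): ParsedNodeId {
  const match = /^(?:ns=(\d+);)?([isgb])=(.+)$/.exec(text)
  if (match === null) {
    throw new Error(`Invalid NodeId: ${text}`)
  }

  // An omitted "ns=" part means namespace 0
  const namespaceIndex = match[1] === undefined ? 0 : Number(match[1])
  const prefix = match[2]
  const raw = match[3] ?? ''

  switch (prefix) {
    case 'i':
      if (!/^\d+$/.test(raw)) {
        throw new Error(`Numeric NodeId needs an integer identifier: ${text}`)
      }
      return { namespaceIndex, type: 'numeric', identifier: Number(raw) }
    case 's':
      return { namespaceIndex, type: 'string', identifier: raw }
    case 'g':
      return { namespaceIndex, type: 'guid', identifier: raw }
    default:
      // 'b': opaque identifier, kept as its base64 text
      return { namespaceIndex, type: 'opaque', identifier: raw }
  }
}
```

For example, `parseNodeId('ns=2;s=Temperature')` yields namespace index 2 with the string identifier `Temperature`.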

Modbus TCP/IP

Poll Modbus registers and store time series data.

import { ModbusAdapter, schobase } from 'schobase'

// Configure Modbus TCP connection
const modbus = new ModbusAdapter({
  host: '192.168.1.100',
  port: 502,
  unitId: 1,
})

// Poll holding registers every 5 seconds
modbus.poll({
  interval: 5000,
  registers: [
    { address: 40001, type: 'FLOAT32', name: 'pressure' },
    { address: 40003, type: 'FLOAT32', name: 'flowRate' },
    { address: 40005, type: 'UINT16', name: 'valvePosition' },
  ],
  onData: async (data) => {
    await schobase.modbusReadings.write({
      deviceId: 'modbus-device-01',
      timestamp: new Date(),
      ...data,
    })
  },
})

console.log('Modbus adapter polling every 5 seconds')
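
A Modbus register holds 16 bits, so each `FLOAT32` value above spans two consecutive registers, which is why the addresses step by two (40001, 40003). The sketch below shows how two raw register values combine into one IEEE 754 float, and how a 4xxxx-style reference maps to the zero-based address sent on the wire. Both helpers are hypothetical. Whether `ModbusAdapter` expects references or offsets, and which word order a given device uses, are assumptions to verify against the device manual.

```typescript
type WordOrder = 'high-first' | 'low-first'

// Combine two 16-bit register values into one IEEE 754 single-precision float.
// Word order varies between devices, so it is a parameter here.
function registersToFloat32(
  first: number,
  second: number,
  order: WordOrder = 'high-first'
): number {
  const high = order === 'high-first' ? first : second
  const low = order === 'high-first' ? second : first

  const view = new DataView(new ArrayBuffer(4))
  view.setUint16(0, high) // DataView reads and writes big-endian by default
  view.setUint16(2, low)
  return view.getFloat32(0)
}

// Convert a 4xxxx-style holding register reference (40001-based)
// to the zero-based address used in the Modbus protocol frame.
function holdingRegisterOffset(reference: number): number {
  if (!Number.isInteger(reference) || reference < 40001 || reference > 49999) {
    throw new RangeError(`Not a 4xxxx holding register reference: ${reference}`)
  }
  return reference - 40001
}
```

With these helpers, `registersToFloat32(0x3f80, 0x0000)` evaluates to `1`, and `holdingRegisterOffset(40003)` to `2`.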

MQTT Broker Integration

Subscribe to MQTT topics from IoT sensors.

import { MqttAdapter, schobase } from 'schobase'

// Connect to MQTT broker
const mqtt = new MqttAdapter({
  brokerUrl: 'mqtt://mqtt.factory.local:1883',
  username: process.env.MQTT_USER,
  password: process.env.MQTT_PASSWORD,
  clientId: 'schobase-collector',
})

// Subscribe to sensor topics
await mqtt.subscribe('sensors/+/temperature', async (topic, payload) => {
  const sensorId = topic.split('/')[1]
  const data = JSON.parse(payload.toString())

  await schobase.sensorReadings.write({
    sensorId,
    timestamp: new Date(data.timestamp),
    temperature: data.value,
    humidity: data.humidity ?? 0, // fall back to 0 only when the field is missing
    location: data.location,
    deviceStatus: 'online',
  })
})

console.log('MQTT adapter subscribed to sensors/+/temperature')
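
The `+` in `sensors/+/temperature` is the MQTT single-level wildcard: it matches exactly one topic level, which is why the handler can read the sensor ID from the second level. The sketch below implements the standard matching rules for `+` and the multi-level wildcard `#`, which can be handy for unit-testing topic layouts without a broker. `topicMatches` is a hypothetical helper, and the special handling of topics starting with `$` is left out.

```typescript
// Check whether an MQTT topic name matches a topic filter.
// '+' matches exactly one level; '#' matches the parent level and everything below it.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/')
  const topicLevels = topic.split('/')

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i]
    if (level === '#') {
      // The multi-level wildcard is only valid as the last filter level
      return i === filterLevels.length - 1
    }
    if (i >= topicLevels.length) {
      return false // topic is shorter than the filter
    }
    if (level !== '+' && level !== topicLevels[i]) {
      return false // literal level differs
    }
  }

  // Without '#', the topic must not have extra levels
  return filterLevels.length === topicLevels.length
}
```

For the subscription above, `topicMatches('sensors/+/temperature', 'sensors/temp-sensor-01/temperature')` is `true`, while a topic with an extra level such as `sensors/a/b/temperature` does not match.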

Real-Time Analytics

Anomaly Detection

Detect anomalies using statistical methods or ML models.

import { schobase } from './client'

// Calculate moving average and detect anomalies
const anomalies = await schobase.sensorReadings
  .query()
  .where('sensorId', '==', 'temp-sensor-01')
  .timeRange({
    start: new Date(Date.now() - 24 * 60 * 60 * 1000),
    end: new Date(),
  })
  .window({
    size: '1h',
    slide: '5m',
  })
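  .aggregate({
    avg: { field: 'temperature', fn: 'avg' },
    stddev: { field: 'temperature', fn: 'stddev' },
    max: { field: 'temperature', fn: 'max' },
  })
  .having((row) => {
    // Aggregated rows carry only the fields defined above, so compare the
    // window maximum: flag windows whose peak exceeds mean + 3 standard deviations
    const threshold = row.avg + 3 * row.stddev
    return row.max > threshold
  })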
  .execute()

console.log('Detected anomalies:', anomalies)
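
The same three-sigma idea can be applied per reading on the client, which is useful for checking thresholds against exported data. The sketch below compares each reading with the mean and population standard deviation of the readings that precede it within a trailing window. It is a plain statistical illustration, not SchoBase's `window` implementation. The names `detectAnomalies` and `Sample` and the `minSamples` default of 5 are assumptions.

```typescript
interface Sample {
  timestamp: Date
  temperature: number
}

// Flag samples that deviate more than k standard deviations from the mean
// of the samples preceding them within the trailing window.
function detectAnomalies(
  samples: readonly Sample[],
  windowMs: number,
  k: number = 3,
  minSamples: number = 5
): Sample[] {
  const sorted = [...samples].sort(
    (a, b) => a.timestamp.getTime() - b.timestamp.getTime()
  )

  return sorted.filter((current, i) => {
    const t = current.timestamp.getTime()
    // Earlier samples that fall inside the trailing window
    const history = sorted.filter(
      (s, j) => j < i && t - s.timestamp.getTime() <= windowMs
    )
    if (history.length < minSamples) {
      return false // too little context to judge
    }

    const mean = history.reduce((sum, s) => sum + s.temperature, 0) / history.length
    const variance =
      history.reduce((sum, s) => sum + (s.temperature - mean) ** 2, 0) / history.length
    const stddev = Math.sqrt(variance) // population standard deviation

    return Math.abs(current.temperature - mean) > k * stddev
  })
}
```

Unlike a one-sided threshold, this flags readings that are unusually low as well as unusually high. The nested filter is quadratic in the number of samples, which is fine for a day of data from one sensor but not for bulk analysis.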

Predictive Maintenance

Calculate equipment health scores and predict failures.

import { schobase } from './client'

// Calculate equipment health score
async function calculateHealthScore(robotId: string) {
  const last24h = await schobase.robotTelemetry
    .query()
    .where('robotId', '==', robotId)
    .timeRange({
      start: new Date(Date.now() - 24 * 60 * 60 * 1000),
      end: new Date(),
    })
    .execute()

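  // No telemetry in the window: return early instead of dividing by zero below
  if (last24h.length === 0) {
    return { robotId, healthScore: null, recommendation: 'No telemetry in the last 24 hours' }
  }

  // Analyze torque spikes (potential bearing wear)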
  const torqueSpikes = last24h.filter((r) =>
    r.jointTorques.some((t) => Math.abs(t) > 80)
  ).length

  // Analyze power consumption trends
  const avgPower = last24h.reduce((sum, r) => sum + r.powerConsumption, 0) / last24h.length

  // Calculate health score (0-100)
  const healthScore = 100 - (torqueSpikes / last24h.length) * 100 - (avgPower > 5000 ? 20 : 0)

  return {
    robotId,
    healthScore: Math.max(0, healthScore),
    recommendation:
      healthScore < 50 ? 'Schedule maintenance' : 'Normal operation',
  }
}

const health = await calculateHealthScore('robot-01')
console.log(health)
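
The health score above uses only the average power draw. If the direction of the trend matters, a least-squares slope over the same window is one simple way to estimate it. The sketch below is an illustration under that assumption. `powerTrendPerHour` and `PowerSample` are hypothetical names, and a rising slope is only a hint, not a failure prediction.

```typescript
interface PowerSample {
  timestamp: Date
  powerConsumption: number // watts
}

const MS_PER_HOUR = 60 * 60 * 1000

// Least-squares slope of power consumption over time, in watts per hour.
// Returns 0 when there are fewer than two samples or all timestamps are equal.
function powerTrendPerHour(samples: readonly PowerSample[]): number {
  const n = samples.length
  if (n < 2) {
    return 0
  }

  const meanX =
    samples.reduce((sum, s) => sum + s.timestamp.getTime() / MS_PER_HOUR, 0) / n
  const meanY = samples.reduce((sum, s) => sum + s.powerConsumption, 0) / n

  let sxy = 0 // sum of (x - meanX) * (y - meanY)
  let sxx = 0 // sum of (x - meanX)^2
  for (const s of samples) {
    const dx = s.timestamp.getTime() / MS_PER_HOUR - meanX
    sxy += dx * (s.powerConsumption - meanY)
    sxx += dx * dx
  }

  return sxx === 0 ? 0 : sxy / sxx
}
```

A positive result means consumption is rising across the window. Any threshold for acting on it would need to come from the equipment's own baseline.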

Ready to Start Building?

Install SchoBase and start ingesting time series data in minutes.

npm install schobase