Service Registry Design - Extending pkg/registry
Overview
This document proposes extending the existing pkg/registry package to handle service registration in addition to device registration. The goal is to create a unified, authoritative service discovery and registration system that tracks all pollers, agents, and checkers across the ServiceRadar deployment.
Background
Current State
Device Registry (Existing):
- Located at pkg/registry
- Provides Manager interface for processing device updates
- Handles device identity resolution, canonicalization, and persistence
- Publishes to device_updates stream for materialized view
- Well-tested, production-ready
Service Tracking (Current - Implicit):
- Services tracked via services stream (3-day TTL)
- No persistent registry - services disappear after TTL expires
- Agents/checkers derived from service heartbeats
- No pre-registration support
- Cannot distinguish "never existed" from "stopped reporting"
Problem: Per the onboarding review (docs/onboarding-review-2025.md), the lack of a centralized service registry creates these gaps:
- No pre-registration of agents/checkers before first report
- No historical record of registered services
- Cannot track service lifecycle (pending → active → inactive)
- Difficult to implement proper service discovery
Design Goals
- Unified Registry Package: Extend pkg/registry to handle both devices AND services
- Consistent Patterns: Mirror the device registry architecture for services
- Explicit Registration: Support pre-registration via edge onboarding packages
- Lifecycle Tracking: Track services from creation through activation to retirement
- Minimal Disruption: Work alongside existing implicit service tracking
- Performance: Batch operations, efficient lookups, caching where appropriate
Architecture
Registry Package Structure
pkg/registry/
├── interfaces.go # Existing Manager interface + new ServiceManager
├── registry.go # Device registry (existing)
├── service_registry.go # NEW: Service registry implementation
├── service_models.go # NEW: Service registration models
├── service_lifecycle.go # NEW: Service lifecycle management
├── identity_resolver.go # Existing, may extend for services
├── identity_publisher.go # Existing, may extend for services
└── ...
Service Registry Interface
ServiceManager Interface
package registry
import (
"context"
"time"
)
// ServiceManager manages the lifecycle and registration of all services
// (pollers, agents, checkers) in the ServiceRadar system.
type ServiceManager interface {
// RegisterPoller explicitly registers a new poller.
// Used during edge package creation, K8s ClusterSPIFFEID creation, etc.
RegisterPoller(ctx context.Context, reg *PollerRegistration) error
// RegisterAgent explicitly registers a new agent under a poller.
RegisterAgent(ctx context.Context, reg *AgentRegistration) error
// RegisterChecker explicitly registers a new checker under an agent.
RegisterChecker(ctx context.Context, reg *CheckerRegistration) error
// RecordHeartbeat records a service heartbeat from status reports.
// This updates last_seen and activates pending services.
RecordHeartbeat(ctx context.Context, heartbeat *ServiceHeartbeat) error
// RecordBatchHeartbeats handles batch heartbeat updates efficiently.
RecordBatchHeartbeats(ctx context.Context, heartbeats []*ServiceHeartbeat) error
// GetPoller retrieves a poller by ID.
GetPoller(ctx context.Context, pollerID string) (*RegisteredPoller, error)
// GetAgent retrieves an agent by ID.
GetAgent(ctx context.Context, agentID string) (*RegisteredAgent, error)
// GetChecker retrieves a checker by ID.
GetChecker(ctx context.Context, checkerID string) (*RegisteredChecker, error)
// ListPollers retrieves all pollers matching filter.
ListPollers(ctx context.Context, filter *ServiceFilter) ([]*RegisteredPoller, error)
// ListAgentsByPoller retrieves all agents under a poller.
ListAgentsByPoller(ctx context.Context, pollerID string) ([]*RegisteredAgent, error)
// ListCheckersByAgent retrieves all checkers under an agent.
ListCheckersByAgent(ctx context.Context, agentID string) ([]*RegisteredChecker, error)
// UpdateServiceStatus updates the status of a service.
UpdateServiceStatus(ctx context.Context, serviceID string, status ServiceStatus) error
// MarkInactive marks services as inactive if they haven't reported within threshold.
// This is typically called by a background job.
MarkInactive(ctx context.Context, threshold time.Duration) (int, error)
// DeleteService permanently deletes a service from the registry.
// This should only be called for services that are no longer needed (status: inactive, revoked, or deleted).
// Returns error if service is still active.
DeleteService(ctx context.Context, serviceType, serviceID string) error
// PurgeInactive permanently deletes services that have been inactive or revoked
// for longer than the retention period. This is typically called by a background job.
PurgeInactive(ctx context.Context, retentionPeriod time.Duration) (int, error)
// IsKnownPoller checks if a poller is registered and active.
// Replaces the logic currently in pkg/core/pollers.go:701
IsKnownPoller(ctx context.Context, pollerID string) (bool, error)
}
Data Models
Service Registration Types
package registry
import (
"time"
)
// ServiceStatus represents the lifecycle status of a service.
type ServiceStatus string
const (
ServiceStatusPending ServiceStatus = "pending" // Registered, waiting for first report
ServiceStatusActive ServiceStatus = "active" // Currently reporting
ServiceStatusInactive ServiceStatus = "inactive" // Stopped reporting
ServiceStatusRevoked ServiceStatus = "revoked" // Registration revoked
ServiceStatusDeleted ServiceStatus = "deleted" // Marked for deletion (soft delete)
)
// RegistrationSource indicates how a service was registered.
type RegistrationSource string
const (
RegistrationSourceEdgeOnboarding RegistrationSource = "edge_onboarding"
RegistrationSourceK8sSpiffe RegistrationSource = "k8s_spiffe"
RegistrationSourceConfig RegistrationSource = "config" // Static config file
RegistrationSourceImplicit RegistrationSource = "implicit" // From heartbeat
)
// PollerRegistration represents a poller registration request.
type PollerRegistration struct {
PollerID string
ComponentID string // From edge onboarding package
RegistrationSource RegistrationSource
Metadata map[string]string
SPIFFEIdentity string // Optional SPIFFE ID
CreatedBy string // Admin user ID or system
}
// AgentRegistration represents an agent registration request.
type AgentRegistration struct {
AgentID string
PollerID string // Parent poller (required)
ComponentID string
RegistrationSource RegistrationSource
Metadata map[string]string
SPIFFEIdentity string
CreatedBy string
}
// CheckerRegistration represents a checker registration request.
type CheckerRegistration struct {
CheckerID string
AgentID string // Parent agent (required)
PollerID string // Grandparent poller (denormalized for queries)
CheckerKind string // snmp, sysmon, rperf, etc.
ComponentID string
RegistrationSource RegistrationSource
Metadata map[string]string
SPIFFEIdentity string
CreatedBy string
}
// ServiceHeartbeat represents a service status report.
type ServiceHeartbeat struct {
ServiceID string
ServiceType string // "poller", "agent", "checker"
PollerID string
AgentID string // Empty for pollers
CheckerID string // Empty for agents/pollers
Timestamp time.Time
SourceIP string
Healthy bool
Metadata map[string]string
}
// RegisteredPoller represents a registered poller in the system.
type RegisteredPoller struct {
PollerID string
ComponentID string
Status ServiceStatus
RegistrationSource RegistrationSource
FirstRegistered time.Time
FirstSeen *time.Time // Nil if never reported
LastSeen *time.Time
Metadata map[string]string
SPIFFEIdentity string
CreatedBy string
// Derived stats
AgentCount int
CheckerCount int
}
// RegisteredAgent represents a registered agent in the system.
type RegisteredAgent struct {
AgentID string
PollerID string
ComponentID string
Status ServiceStatus
RegistrationSource RegistrationSource
FirstRegistered time.Time
FirstSeen *time.Time
LastSeen *time.Time
Metadata map[string]string
SPIFFEIdentity string
CreatedBy string
// Derived stats
CheckerCount int
}
// RegisteredChecker represents a registered checker in the system.
type RegisteredChecker struct {
CheckerID string
AgentID string
PollerID string
CheckerKind string
ComponentID string
Status ServiceStatus
RegistrationSource RegistrationSource
FirstRegistered time.Time
FirstSeen *time.Time
LastSeen *time.Time
Metadata map[string]string
SPIFFEIdentity string
CreatedBy string
}
// ServiceFilter filters service queries.
type ServiceFilter struct {
Statuses []ServiceStatus
Sources []RegistrationSource
Limit int
Offset int
}
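How ListPollers should interpret ServiceFilter is not spelled out. One plausible reading, sketched here, is that an empty slice means "no constraint" and that `Limit <= 0` means no pagination. `buildFilterClause` is a hypothetical helper that produces `?` placeholders in the same style as the queries elsewhere in this document.

```go
package main

import (
	"fmt"
	"strings"
)

type ServiceStatus string
type RegistrationSource string

// ServiceFilter is copied from the data models above.
type ServiceFilter struct {
	Statuses []ServiceStatus
	Sources  []RegistrationSource
	Limit    int
	Offset   int
}

// placeholders returns "?, ?, ..." with n entries.
func placeholders(n int) string {
	return strings.TrimSuffix(strings.Repeat("?, ", n), ", ")
}

// buildFilterClause (hypothetical) turns a ServiceFilter into a WHERE/LIMIT
// suffix plus positional arguments. A nil filter matches everything.
func buildFilterClause(f *ServiceFilter) (string, []any) {
	if f == nil {
		return "", nil
	}
	var conds []string
	var args []any
	if len(f.Statuses) > 0 {
		conds = append(conds, fmt.Sprintf("status IN (%s)", placeholders(len(f.Statuses))))
		for _, s := range f.Statuses {
			args = append(args, string(s))
		}
	}
	if len(f.Sources) > 0 {
		conds = append(conds, fmt.Sprintf("registration_source IN (%s)", placeholders(len(f.Sources))))
		for _, s := range f.Sources {
			args = append(args, string(s))
		}
	}
	clause := ""
	if len(conds) > 0 {
		clause = " WHERE " + strings.Join(conds, " AND ")
	}
	if f.Limit > 0 {
		clause += fmt.Sprintf(" LIMIT %d OFFSET %d", f.Limit, f.Offset)
	}
	return clause, args
}
```

The clause would be appended after `FROM pollers_registry FINAL` (or the agent/checker equivalent) so that the filter applies to the latest version of each row.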
Database Schema
Service Registry Tables
-- Poller registry
CREATE TABLE pollers_registry (
poller_id string,
component_id string,
status string,
registration_source string,
first_registered DateTime64(3),
first_seen Nullable(DateTime64(3)),
last_seen Nullable(DateTime64(3)),
metadata string, -- JSON
spiffe_identity string,
created_by string,
updated_at DateTime64(3) DEFAULT now64()
) ENGINE = ReplacingMergeTree(updated_at)
PRIMARY KEY (poller_id)
ORDER BY (poller_id) -- dedup key only; updated_at is the version column and must not be in the sorting key
SETTINGS index_granularity = 8192;
-- Agent registry
CREATE TABLE agents_registry (
agent_id string,
poller_id string,
component_id string,
status string,
registration_source string,
first_registered DateTime64(3),
first_seen Nullable(DateTime64(3)),
last_seen Nullable(DateTime64(3)),
metadata string,
spiffe_identity string,
created_by string,
updated_at DateTime64(3) DEFAULT now64()
) ENGINE = ReplacingMergeTree(updated_at)
PRIMARY KEY (agent_id)
ORDER BY (agent_id) -- dedup key only; updated_at is the version column and must not be in the sorting key
SETTINGS index_granularity = 8192;
-- Checker registry
CREATE TABLE checkers_registry (
checker_id string,
agent_id string,
poller_id string,
checker_kind string,
component_id string,
status string,
registration_source string,
first_registered DateTime64(3),
first_seen Nullable(DateTime64(3)),
last_seen Nullable(DateTime64(3)),
metadata string,
spiffe_identity string,
created_by string,
updated_at DateTime64(3) DEFAULT now64()
) ENGINE = ReplacingMergeTree(updated_at)
PRIMARY KEY (checker_id)
ORDER BY (checker_id) -- dedup key only; updated_at is the version column and must not be in the sorting key
SETTINGS index_granularity = 8192;
-- Service registration events (audit trail)
CREATE STREAM IF NOT EXISTS service_registration_events (
event_id string,
event_type string, -- 'registered', 'activated', 'deactivated', 'revoked', 'deleted'
service_id string,
service_type string, -- 'poller', 'agent', 'checker'
parent_id string, -- For agents: poller_id, for checkers: agent_id
registration_source string,
actor string, -- Who performed the action
timestamp DateTime64(3),
metadata string -- JSON
) ENGINE = Stream(1, 1, rand())
PARTITION BY int_div(to_unix_timestamp(timestamp), 86400)
ORDER BY (timestamp, service_type, service_id)
TTL to_start_of_day(coalesce(timestamp, _tp_time)) + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;
Design Notes:
- ReplacingMergeTree for registry tables - supports updates via inserts, with updated_at as the version column
- No automatic TTL on registry tables - persistent record, but manual deletion supported
- Nullable first_seen/last_seen - distinguish "never reported" from "has reported at least once"
- Denormalized poller_id in checkers - easier queries, acceptable redundancy
- Audit stream - 90-day retention for compliance/debugging
- Deletion Strategy - See "Deletion and Retention Policy" section below
Deletion and Retention Policy
Problem: Unbounded Growth
Without a deletion mechanism, the registry tables will grow indefinitely. Even with status-based filtering (e.g., only querying 'active' services), the underlying tables continue to accumulate records.
Solution: Multi-Tier Deletion Strategy
1. Soft Delete (Status: deleted)
When a service is no longer needed but you want to retain an audit trail:
// Mark service as deleted (soft delete)
err := serviceRegistry.UpdateServiceStatus(ctx, serviceID, registry.ServiceStatusDeleted)
- Service moves to deleted status
- No longer appears in active queries
- Still in database for audit/historical purposes
- Can be hard deleted later by background job
2. Hard Delete (Permanent Removal)
For immediate permanent deletion:
// Permanently delete a service
err := serviceRegistry.DeleteService(ctx, "poller", pollerID)
Implementation:
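func (r *ServiceRegistry) DeleteService(ctx context.Context, serviceType, serviceID string) error {
	// Map the service type to its registry table and key column.
	var table, idColumn string
	switch serviceType {
	case "poller":
		table, idColumn = "pollers_registry", "poller_id"
	case "agent":
		table, idColumn = "agents_registry", "agent_id"
	case "checker":
		table, idColumn = "checkers_registry", "checker_id"
	default:
		return fmt.Errorf("unknown service type: %s", serviceType)
	}
	// Verify service is not active or pending. FINAL reads the latest
	// version of the row rather than an arbitrary older one.
	var status string
	query := fmt.Sprintf(`SELECT status FROM %s FINAL WHERE %s = ? LIMIT 1`, table, idColumn)
	if err := r.db.QueryRow(ctx, query, serviceID).Scan(&status); err != nil {
		return fmt.Errorf("service not found: %w", err)
	}
	if status == string(ServiceStatusActive) || status == string(ServiceStatusPending) {
		return fmt.Errorf("cannot delete active or pending service: %s", serviceID)
	}
	// Emit deletion event BEFORE deleting
	r.emitRegistrationEvent(ctx, "deleted", serviceType, serviceID, "", "manual", getUserFromContext(ctx))
	// Hard delete from table using ALTER TABLE DELETE.
	// Note: In ClickHouse/Timeplus, deletes are asynchronous and may take time.
	// table and idColumn come from the fixed switch above, never from user input.
	deleteQuery := fmt.Sprintf(`ALTER TABLE %s DELETE WHERE %s = ?`, table, idColumn)
	if err := r.db.Exec(ctx, deleteQuery, serviceID); err != nil {
		return fmt.Errorf("failed to delete service: %w", err)
	}
	// Only pollers are cached (see IsKnownPoller).
	if serviceType == "poller" {
		r.invalidatePollerCache()
	}
	return nil
}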
Important: In ClickHouse/Timeplus Proton:
- ALTER TABLE DELETE is asynchronous - rows are marked for deletion but not immediately removed
- Deleted rows still count toward table size until a merge happens
- Use OPTIMIZE TABLE FINAL to force a merge (expensive operation)
3. Automated Purge (Background Job)
For automatic cleanup of old inactive/revoked/deleted services:
// Background job runs daily
func (r *ServiceRegistry) PurgeInactive(ctx context.Context, retentionPeriod time.Duration) (int, error) {
	cutoff := time.Now().UTC().Add(-retentionPeriod)
	// Find services to purge: inactive/revoked/deleted for > retention period.
	// FINAL judges each service by its latest row version, so an old
	// 'inactive' version of a service that is now active is not selected.
	query := `SELECT service_type, service_id
	FROM (
		SELECT 'poller' AS service_type, poller_id AS service_id
		FROM pollers_registry FINAL
		WHERE status IN ('inactive', 'revoked', 'deleted')
		AND updated_at < ?
		UNION ALL
		SELECT 'agent', agent_id
		FROM agents_registry FINAL
		WHERE status IN ('inactive', 'revoked', 'deleted')
		AND updated_at < ?
		UNION ALL
		SELECT 'checker', checker_id
		FROM checkers_registry FINAL
		WHERE status IN ('inactive', 'revoked', 'deleted')
		AND updated_at < ?
	)`
	rows, err := r.db.Query(ctx, query, cutoff, cutoff, cutoff)
	if err != nil {
		return 0, fmt.Errorf("failed to query stale services: %w", err)
	}
	// Collect candidates first and close the result set before DeleteService
	// issues its own queries on the same connection pool.
	type candidate struct{ serviceType, serviceID string }
	var candidates []candidate
	for rows.Next() {
		var c candidate
		if err := rows.Scan(&c.serviceType, &c.serviceID); err != nil {
			r.logger.Warn().Err(err).Msg("Failed to scan stale service")
			continue
		}
		candidates = append(candidates, c)
	}
	iterErr := rows.Err()
	rows.Close()
	if iterErr != nil {
		return 0, fmt.Errorf("failed to read stale services: %w", iterErr)
	}
	count := 0
	for _, c := range candidates {
		if err := r.DeleteService(ctx, c.serviceType, c.serviceID); err != nil {
			r.logger.Warn().Err(err).Str("service_id", c.serviceID).Msg("Failed to purge service")
			continue
		}
		count++
	}
	return count, nil
}
Retention Recommendations
Default Retention Periods:
- Active services: Never deleted automatically
- Pending services: 30 days (if never activated)
- Inactive services: 90 days after last heartbeat
- Revoked services: 90 days after revocation
- Deleted services: 7 days (grace period for recovery)
Configuration:
service_registry:
retention:
pending_days: 30 # Delete pending services that never activated
inactive_days: 90 # Delete inactive services after this period
revoked_days: 90 # Delete revoked services after this period
deleted_days: 7 # Hard delete soft-deleted services after grace period
purge_schedule: "0 2 * * *" # Daily at 2 AM
Service Lifecycle with Deletion
pending → active → inactive → [soft delete] → [hard delete]
↓ ↓ ↓ ↓
revoked → [soft delete] → [hard delete]
States:
- pending: Registered but never reported (auto-purge after 30 days)
- active: Currently reporting (never auto-deleted)
- inactive: Stopped reporting (auto-purge after 90 days)
- revoked: Admin revoked (auto-purge after 90 days)
- deleted: Soft deleted (auto-purge after 7 days)
- [removed]: Hard deleted via ALTER TABLE DELETE
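The state list implies a transition check that UpdateServiceStatus could enforce. The sketch below encodes one reading of the lifecycle diagram, assuming the downward arrows mean that pending, active, and inactive services can all be revoked, and adding an inactive → active edge for services that resume reporting, which the diagram does not show. Hard deletion removes the row rather than changing its status, so it is not modeled; `canTransition` is a hypothetical helper.

```go
package main

type ServiceStatus string

const (
	Pending  ServiceStatus = "pending"
	Active   ServiceStatus = "active"
	Inactive ServiceStatus = "inactive"
	Revoked  ServiceStatus = "revoked"
	Deleted  ServiceStatus = "deleted"
)

// allowedTransitions is one reading of the lifecycle diagram.
// The Inactive -> Active edge is an assumption (service resumes reporting).
var allowedTransitions = map[ServiceStatus][]ServiceStatus{
	Pending:  {Active, Revoked},
	Active:   {Inactive, Revoked},
	Inactive: {Active, Revoked, Deleted},
	Revoked:  {Deleted},
	Deleted:  {},
}

// canTransition reports whether a status change from -> to is permitted.
func canTransition(from, to ServiceStatus) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```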
API Endpoints for Deletion
// Admin endpoints
DELETE /api/admin/services/pollers/{id} // Hard delete
DELETE /api/admin/services/agents/{id}
DELETE /api/admin/services/checkers/{id}
PUT /api/admin/services/{id}/status // Soft delete (status: deleted)
{
"status": "deleted"
}
POST /api/admin/services/purge // Manual purge trigger
{
"retention_days": 90,
"dry_run": true // Preview what would be deleted
}
Audit Trail Preservation
Even after hard deletion from registry tables, the audit stream retains events for 90 days:
- service_registration_events stream has a 90-day TTL
- All deletion events (event_type: 'deleted') are logged
- Provides a historical record of what was deleted and when
- Enables forensics and compliance reporting
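emitRegistrationEvent is called throughout this document but not defined. As a sketch, assuming the emitter appends one row to service_registration_events and that event_id is a random hex string (the schema does not fix its format), the row could be assembled like this before insertion. `RegistrationEvent` and `newRegistrationEvent` are hypothetical.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"time"
)

// RegistrationEvent mirrors one row of the service_registration_events stream.
type RegistrationEvent struct {
	EventID            string
	EventType          string // registered, activated, deactivated, revoked, deleted
	ServiceID          string
	ServiceType        string // poller, agent, checker
	ParentID           string
	RegistrationSource string
	Actor              string
	Timestamp          time.Time
	Metadata           string // JSON-encoded
}

// newRegistrationEvent fills in the generated columns: a random 128-bit
// hex event_id, a UTC timestamp, and JSON-encoded metadata.
func newRegistrationEvent(eventType, serviceType, serviceID, parentID, source, actor string, meta map[string]string) (RegistrationEvent, error) {
	idBytes := make([]byte, 16)
	if _, err := rand.Read(idBytes); err != nil {
		return RegistrationEvent{}, err
	}
	if meta == nil {
		meta = map[string]string{} // encode as {} rather than null
	}
	metaJSON, err := json.Marshal(meta)
	if err != nil {
		return RegistrationEvent{}, err
	}
	return RegistrationEvent{
		EventID:            hex.EncodeToString(idBytes),
		EventType:          eventType,
		ServiceID:          serviceID,
		ServiceType:        serviceType,
		ParentID:           parentID,
		RegistrationSource: source,
		Actor:              actor,
		Timestamp:          time.Now().UTC(),
		Metadata:           string(metaJSON),
	}, nil
}
```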
Implementation
ServiceRegistry Struct
package registry
import (
"context"
"sync"
"time"
"github.com/carverauto/serviceradar/pkg/db"
"github.com/carverauto/serviceradar/pkg/logger"
)
type ServiceRegistry struct {
db db.Service
logger logger.Logger
// Cache for IsKnownPoller() - invalidated on registration changes
pollerCacheMu sync.RWMutex
pollerCache map[string]bool
cacheExpiry time.Time
}
func NewServiceRegistry(database db.Service, log logger.Logger) *ServiceRegistry {
return &ServiceRegistry{
db: database,
logger: log,
pollerCache: make(map[string]bool),
}
}
Key Methods
RegisterPoller
func (r *ServiceRegistry) RegisterPoller(ctx context.Context, reg *PollerRegistration) error {
now := time.Now().UTC()
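// Check if already exists. Note: check-then-insert is not atomic, so two concurrent registrations of the same ID can both pass this check.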
existing, err := r.GetPoller(ctx, reg.PollerID)
if err == nil && existing != nil {
// Already registered - return error or update?
return fmt.Errorf("poller %s already registered", reg.PollerID)
}
// Insert into pollers_registry
query := `INSERT INTO pollers_registry (
poller_id, component_id, status, registration_source,
first_registered, metadata, spiffe_identity, created_by, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
metadataJSON, _ := json.Marshal(reg.Metadata)
err = r.db.Exec(ctx, query,
reg.PollerID,
reg.ComponentID,
ServiceStatusPending,
reg.RegistrationSource,
now,
string(metadataJSON),
reg.SPIFFEIdentity,
reg.CreatedBy,
now,
)
if err != nil {
return fmt.Errorf("failed to register poller: %w", err)
}
// Emit registration event
r.emitRegistrationEvent(ctx, "registered", "poller", reg.PollerID, "", reg.RegistrationSource, reg.CreatedBy)
// Invalidate cache
r.invalidatePollerCache()
return nil
}
RecordHeartbeat
func (r *ServiceRegistry) RecordHeartbeat(ctx context.Context, heartbeat *ServiceHeartbeat) error {
now := heartbeat.Timestamp
if now.IsZero() {
now = time.Now().UTC()
}
switch heartbeat.ServiceType {
case "poller":
return r.recordPollerHeartbeat(ctx, heartbeat.PollerID, now, heartbeat.SourceIP)
case "agent":
return r.recordAgentHeartbeat(ctx, heartbeat.AgentID, heartbeat.PollerID, now, heartbeat.SourceIP)
case "checker":
return r.recordCheckerHeartbeat(ctx, heartbeat.CheckerID, heartbeat.AgentID, heartbeat.PollerID, now)
default:
return fmt.Errorf("unknown service type: %s", heartbeat.ServiceType)
}
}
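func (r *ServiceRegistry) recordPollerHeartbeat(ctx context.Context, pollerID string, timestamp time.Time, sourceIP string) error {
	// Snapshot the current state first so the status transition can be
	// detected without re-reading a row the insert may not yet reflect.
	prev, _ := r.GetPoller(ctx, pollerID)
	if prev == nil {
		// Unregistered poller: nothing to update. Implicit registration
		// (RegistrationSourceImplicit) could hook in here.
		return nil
	}
	// ReplacingMergeTree keeps only the newest row per key, so the new version
	// must carry forward every column; omitted columns would reset to defaults.
	// FINAL makes the SELECT read the latest version, not an older one.
	// Pending pollers activate on first report and inactive pollers that
	// resume reporting are reactivated; revoked/deleted statuses are kept.
	query := `INSERT INTO pollers_registry (
		poller_id, component_id, status, registration_source, first_registered,
		first_seen, last_seen, metadata, spiffe_identity, created_by, updated_at
	) SELECT
		poller_id, component_id,
		if(status IN ('pending', 'inactive'), 'active', status),
		registration_source, first_registered,
		coalesce(first_seen, ?), ?,
		metadata, spiffe_identity, created_by, ?
	FROM pollers_registry FINAL
	WHERE poller_id = ?
	LIMIT 1`
	if err := r.db.Exec(ctx, query, timestamp, timestamp, timestamp, pollerID); err != nil {
		return fmt.Errorf("failed to record poller heartbeat: %w", err)
	}
	// Emit the activation event based on the pre-update status.
	if prev.Status == ServiceStatusPending || prev.Status == ServiceStatusInactive {
		r.emitRegistrationEvent(ctx, "activated", "poller", pollerID, "", prev.RegistrationSource, "system")
	}
	return nil
}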
IsKnownPoller (Replaces core logic)
const pollerCacheTTL = 5 * time.Minute
func (r *ServiceRegistry) IsKnownPoller(ctx context.Context, pollerID string) (bool, error) {
	// Check cache first. The read lock is released exactly once on every path.
	r.pollerCacheMu.RLock()
	fresh := time.Now().Before(r.cacheExpiry)
	known, cached := r.pollerCache[pollerID]
	r.pollerCacheMu.RUnlock()
	if fresh && cached {
		return known, nil
	}
	// Query database. FINAL evaluates the latest row version, so a poller that
	// was revoked is not counted through an older 'active' row.
	// COUNT(*) returns UInt64, hence the uint64 destination.
	query := `SELECT COUNT(*) FROM pollers_registry FINAL
		WHERE poller_id = ? AND status IN ('pending', 'active')`
	var count uint64
	if err := r.db.QueryRow(ctx, query, pollerID).Scan(&count); err != nil {
		return false, fmt.Errorf("failed to check poller: %w", err)
	}
	known = count > 0
	// Refresh entire cache when expired. The refresh queries the database
	// without holding the lock and swaps the new map in afterwards.
	if !fresh {
		r.refreshPollerCache(ctx)
	}
	// Record this answer too, including negative results.
	r.pollerCacheMu.Lock()
	r.pollerCache[pollerID] = known
	r.pollerCacheMu.Unlock()
	return known, nil
}
func (r *ServiceRegistry) refreshPollerCache(ctx context.Context) {
	query := `SELECT poller_id FROM pollers_registry FINAL WHERE status IN ('pending', 'active')`
	rows, err := r.db.Query(ctx, query)
	if err != nil {
		r.logger.Warn().Err(err).Msg("Failed to refresh poller cache")
		return
	}
	defer rows.Close()
	newCache := make(map[string]bool)
	for rows.Next() {
		var pollerID string
		if err := rows.Scan(&pollerID); err != nil {
			continue
		}
		newCache[pollerID] = true
	}
	if err := rows.Err(); err != nil {
		r.logger.Warn().Err(err).Msg("Failed to refresh poller cache")
		return
	}
	// Swap under the write lock; callers must not already hold pollerCacheMu.
	r.pollerCacheMu.Lock()
	r.pollerCache = newCache
	r.cacheExpiry = time.Now().Add(pollerCacheTTL)
	r.pollerCacheMu.Unlock()
}
Integration Points
1. Edge Onboarding Integration
In pkg/core/edge_onboarding.go:
func (s *edgeOnboardingService) CreatePackage(ctx context.Context, req *models.CreateEdgeOnboardingPackageRequest) (*models.EdgeOnboardingPackage, error) {
// ... existing package creation logic ...
// NEW: Register service in service registry
switch req.ComponentType {
case models.EdgeOnboardingComponentTypePoller:
err := s.serviceRegistry.RegisterPoller(ctx, &registry.PollerRegistration{
PollerID: pollerID,
ComponentID: pkg.ComponentID,
RegistrationSource: registry.RegistrationSourceEdgeOnboarding,
Metadata: req.Metadata,
SPIFFEIdentity: req.DownstreamSPIFFEID,
CreatedBy: getUserFromContext(ctx),
})
if err != nil {
return nil, fmt.Errorf("failed to register poller: %w", err)
}
case models.EdgeOnboardingComponentTypeAgent:
err := s.serviceRegistry.RegisterAgent(ctx, &registry.AgentRegistration{
AgentID: req.ComponentID,
PollerID: req.ParentID,
ComponentID: pkg.ComponentID,
RegistrationSource: registry.RegistrationSourceEdgeOnboarding,
Metadata: req.Metadata,
CreatedBy: getUserFromContext(ctx),
})
if err != nil {
return nil, fmt.Errorf("failed to register agent: %w", err)
}
case models.EdgeOnboardingComponentTypeChecker:
err := s.serviceRegistry.RegisterChecker(ctx, &registry.CheckerRegistration{
CheckerID: req.ComponentID,
AgentID: req.ParentID,
CheckerKind: req.CheckerKind,
ComponentID: pkg.ComponentID,
RegistrationSource: registry.RegistrationSourceEdgeOnboarding,
Metadata: req.Metadata,
CreatedBy: getUserFromContext(ctx),
})
if err != nil {
return nil, fmt.Errorf("failed to register checker: %w", err)
}
}
return pkg, nil
}
2. Core Service Integration
In pkg/core/pollers.go:
// OLD:
func (s *Server) isKnownPoller(ctx context.Context, pollerID string) bool {
for _, known := range s.config.KnownPollers {
if known == pollerID {
return true
}
}
if s.edgeOnboarding != nil {
if s.edgeOnboarding.isPollerAllowed(ctx, pollerID) {
return true
}
}
return false
}
// NEW:
func (s *Server) isKnownPoller(ctx context.Context, pollerID string) bool {
// Backwards compatibility: check static config first
for _, known := range s.config.KnownPollers {
if known == pollerID {
return true
}
}
// Primary path: check service registry
if s.serviceRegistry != nil {
known, err := s.serviceRegistry.IsKnownPoller(ctx, pollerID)
if err != nil {
s.logger.Warn().Err(err).Msg("Failed to check service registry")
}
if known {
return true
}
}
return false
}
In pkg/core/services.go:
func (s *Server) registerServiceDevice(ctx context.Context, /* ... */) error {
// ... existing device registration logic ...
// NEW: Record service heartbeat
if s.serviceRegistry != nil {
heartbeat := &registry.ServiceHeartbeat{
ServiceType: determineServiceType(agentID, serviceType),
PollerID: pollerID,
AgentID: agentID,
Timestamp: timestamp,
SourceIP: sourceIP,
Healthy: true,
}
if err := s.serviceRegistry.RecordHeartbeat(ctx, heartbeat); err != nil {
s.logger.Warn().Err(err).Msg("Failed to record service heartbeat")
}
}
// ... rest of existing logic ...
}
3. K8s SPIFFE Controller Integration
Add webhook to SPIRE Controller Manager:
When a ClusterSPIFFEID is created, call the ServiceRadar API to register the service:
// In SPIRE controller reconciliation loop
func (r *ClusterSPIFFEIDReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// ... existing SPIRE entry creation ...
// NEW: Register with ServiceRadar service registry
if isServiceRadarWorkload(spiffeID) {
err := r.serviceRadarClient.RegisterService(ctx, &RegisterServiceRequest{
ServiceID: extractServiceID(spiffeID),
ServiceType: extractServiceType(spiffeID),
RegistrationSource: "k8s_spiffe",
SPIFFEIdentity: spiffeID.String(),
Metadata: extractMetadata(instance),
})
if err != nil {
// Log but don't fail reconciliation
logger.Error(err, "Failed to register with ServiceRadar")
}
}
return ctrl.Result{}, nil
}
Migration Plan
Phase 1: Core Infrastructure (Week 1)
- Create pkg/registry/service_registry.go
- Define ServiceManager interface
- Implement data models
- Create database migration for registry tables
- Unit tests for ServiceRegistry
Phase 2: Basic Integration (Week 1-2)
- Integrate with edge onboarding package creation
- Add heartbeat recording from ReportStatus RPC
- Replace isKnownPoller() to use service registry
- Integration tests
Phase 3: API & UI (Week 2)
- Add REST API endpoints:
  - GET /api/admin/services/pollers
  - GET /api/admin/services/agents
  - GET /api/admin/services/checkers
  - GET /api/admin/services/{id}
- Add service registry dashboard to UI
- Show parent-child relationships
Phase 4: K8s Integration (Week 3)
- Add SPIRE Controller webhook for service registration
- Register K8s services automatically on ClusterSPIFFEID creation
- Test with demo namespace
Phase 5: Background Jobs & Deletion (Week 3)
- Implement MarkInactive() background job
- Implement PurgeInactive() background job with configurable retention
- Implement DeleteService() for manual hard deletion
- Add DELETE API endpoints for service removal
- Add alerting for services stuck in 'pending' state
- Add metrics collection (including purge stats)
Phase 6: Migration & Cleanup (Week 4)
- Backfill existing services from services stream
- Update all documentation
- Remove legacy poller tracking code from edge onboarding
- Performance tuning and optimization
Benefits
1. Centralized Service Discovery
- Single source of truth for all services in the system
- Clear answer to "what services are registered?"
- Historical audit trail
2. Pre-Registration Support
- Register agents/checkers before they start reporting
- Track deployment progress (pending → active)
- Validate configuration before installation
3. Lifecycle Management
- Track services from creation to retirement
- Automatic activation on first heartbeat
- Detect and alert on inactive services
4. Consistent Patterns
- Mirror proven device registry architecture
- Reuse testing/observability patterns
- Developer familiarity
5. Scalability
- Caching for high-frequency queries (IsKnownPoller)
- Batch operations for efficiency
- Background jobs for maintenance
Security Considerations
- Authorization:
  - Only admins can register services explicitly
  - Heartbeats from authenticated connections only
  - Audit all registration events
- Data Validation:
  - Validate parent references (agent → poller, checker → agent)
  - Prevent duplicate registrations
  - Sanitize metadata
- SPIFFE Integration:
  - Store SPIFFE IDs in registry
  - Cross-reference with SPIRE server state
  - Alert on mismatches
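The data-validation bullets can be sketched as a pre-insert check for a checker registration. The limits below (64 metadata entries, 256-byte keys, 4 KiB values) and the choice to reject rather than strip control characters are assumptions for illustration; `validateCheckerRegistration` is hypothetical, and confirming that the parent agent actually exists would still need a registry lookup such as GetAgent.

```go
package main

import (
	"errors"
	"fmt"
	"unicode"
)

// CheckerRegistration is trimmed to the fields validated here.
type CheckerRegistration struct {
	CheckerID   string
	AgentID     string
	CheckerKind string
	Metadata    map[string]string
}

const (
	maxMetadataEntries = 64   // assumed limit
	maxKeyLen          = 256  // assumed limit, bytes
	maxValueLen        = 4096 // assumed limit, bytes
)

// hasControlChars reports whether s contains any Unicode control character.
func hasControlChars(s string) bool {
	for _, r := range s {
		if unicode.IsControl(r) {
			return true
		}
	}
	return false
}

// validateCheckerRegistration performs the local checks only; whether AgentID
// refers to a registered agent must be verified against the registry.
func validateCheckerRegistration(reg *CheckerRegistration) error {
	if reg.CheckerID == "" || reg.CheckerKind == "" {
		return errors.New("checker_id and checker_kind are required")
	}
	if reg.AgentID == "" {
		return errors.New("checker must reference a parent agent")
	}
	if len(reg.Metadata) > maxMetadataEntries {
		return fmt.Errorf("too many metadata entries: %d", len(reg.Metadata))
	}
	for k, v := range reg.Metadata {
		if k == "" || len(k) > maxKeyLen || len(v) > maxValueLen {
			return fmt.Errorf("metadata entry %q exceeds size limits", k)
		}
		if hasControlChars(k) || hasControlChars(v) {
			return fmt.Errorf("metadata entry %q contains control characters", k)
		}
	}
	return nil
}
```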
Future Enhancements
Service Dependencies
Track dependencies between services (e.g., agent depends on poller being healthy).
Service Mesh Integration
Export service registry to service mesh control plane.
Auto-Decommission
Automatically revoke services that have been inactive for extended periods.
Multi-Tenancy
Extend registry to support multi-tenant deployments with namespace isolation.
Conclusion
Extending pkg/registry to handle service registration provides a unified, authoritative system for tracking all pollers, agents, and checkers. This design:
- ✅ Mirrors proven device registry patterns
- ✅ Solves all gaps identified in onboarding review
- ✅ Minimal disruption to existing code
- ✅ Enables rich service discovery and lifecycle management
- ✅ Foundation for future enhancements
By treating services as first-class citizens alongside devices, we create a robust foundation for scaling ServiceRadar deployments.
References
- docs/onboarding-review-2025.md - Gap analysis
- pkg/registry/ - Existing device registry
- GH-1909: Edge onboarding: support agents and checkers
- GH-1915 / serviceradar-57: Create common onboarding library
- GH-1891: Implement zero-touch onboarding
Document created: November 1, 2025
Author: ServiceRadar Core Team
Status: Design Proposal