# Control plane

Function-level reference for `main.go`, the API server, scheduler, alert manager, and usage recorder.

Files: `main.go`, `internal/api/*`, `internal/scheduler/scheduler.go`, `internal/alerts/*`, `internal/usage/recorder.go`.
## main.go

Application entry point. Wires every subsystem in dependency order: loads configuration, opens and configures the database, applies optional plan overrides, constructs the scheduler, log bus, webhook dispatcher, alert manager + threshold poller, usage recorder, API server, and web handler, then blocks on an OS signal before performing an orderly shutdown.

### Functions

- `main()` — Bootstraps and runs the entire platform; starts the scheduler, worker-health monitor, API server, alert threshold poller, and usage recorder as goroutines, then waits for `SIGINT`/`SIGTERM`. Side effects: spawns goroutines for `sched.Start`, `sched.MonitorWorkerHealth`, `server.Run`, `thresholdPoller.Start`, and `usageRecorder.Start`, plus a 6-hour ticker that calls `db.PruneRevokedSessions(30d)` and `db.PrunePendingTOTPs()`; registers the `Hub.PushJob` callback with the scheduler; wires the webhook publisher into the threshold poller; defers `Stop()` on each background subsystem and the database close.
## internal/api/server.go

Defines the HTTP API server, its Gin router setup, authentication middleware, and the REST endpoints (`/health`, `/v1/workers/register`, `/v1/stats`). Also exposes `GenerateAPIKey`.

### Type

- `Server` — Holds the Gin router, DB, scheduler, config, log bus, and the WS `Hub`.

### Functions

- `NewServer(db, sched, cfg, bus, publisher) *Server` — Constructs a server, sets Gin to release mode, registers a logger that skips `/v1/workers/ws` and `/health`, creates the Hub (passing `cfg.APIServerSecret()` so it can mint/verify worker JWTs), and registers routes.
- `(s *Server) Hub() *Hub` — Returns the WS hub.
- `(s *Server) setupRoutes()` — Registers `GET /health`, `POST /v1/workers/register` (controlAuth), `POST /v1/workers/refresh-token` (controlAuth — Bearer raw API key), `GET /v1/workers/ws`, `GET /v1/traefik/config` (workerAuth — per-worker legacy endpoint), `GET /v1/traefik/proxy-config` (proxy bearer-token auth — new endpoint for the proxy server), and `GET /v1/stats` (controlAuth).
- `(s *Server) Run(addr string) error` — Starts an HTTP or TLS 1.3 listener.
- `(s *Server) Router() *gin.Engine` — Returns the engine for external mounting (used by `web.Handler`).
- `extractBearer(header) string` — Strips the `"Bearer "` prefix.
- `(s *Server) controlAuth(c *gin.Context)` — Constant-time compare of the bearer token against `api_server.secret`; aborts with 401 on mismatch.
- `(s *Server) healthCheck(c)` — Returns `{"status":"ok","time":...}`.
- `(s *Server) registerWorker(c)` — Validates `api_key`, calls `db.UpsertWorker` (stores the hashed key only), then mints a 15-minute worker JWT via `signWorkerJWT`. Response: `{worker, token, token_expires_at}`.
- `(s *Server) refreshWorkerToken(c)` — The bearer is the raw long-lived API key; resolves the worker via the hashed lookup and returns a fresh JWT + expiry. Workers call this ~2 minutes before their current token expires.
- `(s *Server) getStats(c)` — Returns the `scheduler.GetWorkerStats()` aggregate.
- `GenerateAPIKey(byteLen) (string, error)` — Hex-encoded `crypto/rand` bytes.
## internal/api/ws.go

WebSocket hub and per-connection lifecycle for worker nodes. Handles upgrade/auth, maintains the active `workerConn` map, and provides read/write loops, sequence dedup, log rate-limiting, and outbound job delivery.

### Types

- `WebhookPublisher` — Interface with `Publish(e webhooks.Event)`.
- `Hub` — Registry of live worker connections (`conns map[int]*workerConn`) under a `sync.RWMutex`; holds a `*statsStore`.
- `workerConn` — A single authenticated WS connection: `send` channel, `done` channel, `atomic.Bool` closed flag, outbound `outSeq`, inbound dedup state, log-token-bucket state.

### Constants

- `wsWriteWait` 10 s, `wsPongWait` 90 s, `wsPingPeriod` 72 s
- Log ingress: 256 KiB/s sustained, 1 MiB burst; max line 8 KiB

### Functions

- `NewHub(db, sched, bus, publisher, alertMgr, masterSecret) *Hub` — Allocates a Hub with a 45-second-TTL stats store. `masterSecret` is `api_server.secret`; the hub uses it to verify worker JWTs presented at WSS upgrade.
- `(h *Hub) AppStats(appID) (cpuPct, memBytes, memLimit, updatedAt, ok)` — Reads the cached snapshot.
- `(h *Hub) AppStatsLive(appID) (LiveAppStats, ok)` — Rich snapshot (CPU %, mem, mem limit, net Mbps, net total bytes, disk Mbps).
- `(h *Hub) PushJob(workerID, jobID)` — Looks up the live conn under RLock, calls `sendJob`.
- `(h *Hub) resolveAuthForUpgrade(token) (*Worker, []byte hmacKey, ok bool)` — Two-path auth. If `token` has the JWT shape (two dots): `verifyWorkerJWT` over `masterSecret`, then `FindWorkerByID(claims.Sub)`, then a constant-time check that `claims.KeyHash == w.APIKeyHash` so a leaked JWT becomes useless after key rotation; the HMAC signing key for envelopes is recovered by hex-decoding `APIKeyHash` (the raw 32-byte SHA-256 output). Otherwise falls back to the raw-API-key path: `FindOnlineWorkerByAPIKey` + `HashAPIKeyBytes(token)` for the HMAC key.
- `(h *Hub) handleUpgrade(c)` — Reads the bearer token from `Authorization` and calls `resolveAuthForUpgrade` (accepts a JWT or a raw key). Marks the worker online if it was offline, upgrades HTTP to WS, replaces any existing conn for that worker, and launches `writeLoop`, `readLoop`, and `replayPendingJobs`.
- `(h *Hub) deregister(wc)` — Removes the conn from `h.conns` if it is still current.
- `(wc *workerConn) close(reason)` — Idempotent shutdown via `atomic.Bool`; cancels the ctx, closes `done`, closes the WS, deregisters.
- `(wc *workerConn) writeLoop()` — Drains `send`, sends periodic pings.
- `(wc *workerConn) readLoop()` — Reads frames, verifies the HMAC, validates the clock, checks the sequence, calls `dispatch`.
- `(wc *workerConn) acceptSeq(seq) bool` — Sliding-window dedup (256 entries, prune at 1024).
- `(wc *workerConn) writeEnvelope(env) error` — Non-blocking enqueue with a 5 s timeout.
- `(wc *workerConn) emit(msgType, payload) error` — Builds a signed envelope, enqueues it.
- `(wc *workerConn) sendError(inReplyTo, code, msg)` — Emits a `TypeError`.
- `(wc *workerConn) sendJob(jobID) error` — Loads the job from the DB, decodes `DeployPayload`, emits `TypeJob`.
- `(wc *workerConn) replayPendingJobs()` — Queries `NextAssignedJob` for the worker and calls `sendJob` if any.
- `stripANSI(s) string` — Removes ANSI escapes and null bytes.
- `capLine(s) string` — Truncates a string to 8 KiB.
- `(wc *workerConn) allowLogBytes(n) bool` — Token-bucket rate limiter.
## internal/api/ws_dispatch.go

Message-type dispatcher and inbound handler methods for the worker WSS protocol.

### Functions

- `(wc) dispatch(env)` — Routes the envelope by `Type`; sends an `unknown_type` error otherwise.
- `(wc) handleAppStats(env)` — Decodes `AppStatsPayload`; verifies the app belongs to this worker; stores the snapshot in `hub.stats`.
- `(wc) handleHeartbeat(env)` — Decodes the payload; calls `db.UpdateWorkerHeartbeat`.
- `(wc) handleJobStatus(env)` — Decodes the payload; calls `db.UpdateJobStatus`.
- `(wc) handleJobResult(env)` — Decodes, sanitises logs, calls `db.CompleteJob`, releases worker resources via the scheduler, invalidates the app cache, emits a webhook event, spawns `replayPendingJobs`.
- `(wc) emitJobEvent(job, p, deploy)` — Publishes `EventDeploymentSucceeded`/`Failed` for `deploy`/`restart` only.
- `(wc) handleAppStatus(env)` — On `"stopped"`, marks the app stopped and publishes `EventAppStopped`.
- `(wc) handleViolation(env)` — Marks the app violated, invalidates the cache, publishes `EventAppViolation`.
- `(wc) handleRuntimeLog(env)` — Rate-checks, decodes, caps lines at 5000, verifies app ownership per line, sanitises, pushes to the log bus.
- `(wc) handleBuildLog(env)` — Rate-checks, decodes, calls `db.SetDeploymentBuildLogs`.
- `capLogs(s, max) string` — When `len(s) > max`, keeps the first `max/2` and last `max/2` bytes joined with a `...[truncated]...` marker — preserving both the early failure context and the end of the run rather than only the tail.
## internal/api/traefik.go

Traefik HTTP-provider endpoints. Two endpoints exist: `traefikConfig` (legacy, per-worker) and `proxyConfig` (new, for the proxy server).

### Types

- `traefikDynamicConfig`, `traefikHTTP`, `traefikRouter`, `traefikService`, `traefikLB`, `traefikServer` — JSON shape Traefik consumes on each poll. Only emitted fields are typed.

### Functions

- `(s) workerAuth(c)` — Bearer-token middleware that resolves either a JWT or the raw long-lived API key (the Traefik HTTP provider can't refresh, so it presents the raw key). Stashes the resolved `*Worker` in `c`.
- `(s) resolveWorkerToken(token) *Worker` — The JWT path (three dot-separated segments) verifies via `verifyWorkerJWT` and binds against `APIKeyHash`; the raw-key path hashes once and constant-time-compares. Returns nil on any failure.
- `(s) traefikConfig(c)` — Legacy per-worker endpoint. For each app on this worker that has at least one verified custom domain, emits one router (``Host(`a`) || Host(`b`) …``) and one service pointing at the app's container hostname on the podman network (``http://APP_NAME:INTERNAL_PORT``).
- `(s) proxyConfig(c)` — Proxy-server endpoint (`GET /v1/traefik/proxy-config`). Authenticates via a static bearer token compared against `cfg.ProxyToken()` (constant-time). Calls `db.ListRunningAppsWithWorkers()` to enumerate all running apps that have a worker with a non-empty `WireGuardIP`. For each app, emits one router with rule ``Host(`slug.base`) || Host(`custom.domain`)`` (wildcard subdomain + any verified custom domains), one service pointing at ``http://WORKER_WG_IP:CONTAINER_PORT`` (direct WireGuard upstream), `entryPoints: ["web", "websecure"]`, and `tls.certResolver = "letsencrypt"`. Responses are cached in memory for 5 seconds per worker to reduce DB load during the 5-second Traefik poll interval.
- `internalPort(env) int` — Mirrors `worker/agent.go:appPort`; honours the `PORT` env var if it parses, otherwise 8080.
- `buildHostRule(hosts) string` — Renders ``Host(`a`) || Host(`b`)`` from a slice of validated hostnames (already passed `validateHostname` upstream, so backtick injection is impossible).
- `safeRouterName(name) string` — Belt and suspenders: keeps router/service names ASCII-safe. App slugs already meet the rule.
## internal/api/worker_jwt.go

Short-lived bearer tokens scoped to a specific worker. Reduces the blast radius of a leaked Authorization header by bounding its useful lifetime to ~15 minutes — far shorter than the long-lived API key that mints them. The signing key is HKDF-derived from the master secret, so it rotates whenever the master secret rotates.

### Constants

- `workerJWTTTL = 15 * time.Minute`
- `workerJWTRefreshSkew = 2 * time.Minute` — workers refresh this far before expiry
- `workerJWTKeyLen = 32`
- `workerJWTHKDFPurpose = "wisehosting-worker-jwt-v1"`

### Type

- `workerJWTClaims { Sub int (worker ID), KeyHash string (sha256 hex of the API key — binds the JWT to the current key), Iat, Exp int64, Jti string }`

### Functions

- `hkdfKey(masterSecret, purpose, n) []byte` — Local HKDF-SHA256 deriver with a purpose-specific info label so the JWT-signing, AES, and OAuth-state keys never collide.
- `signWorkerJWT(masterSecret, workerID, apiKeyHash) (token, exp, err)` — HS256, header `{"alg":"HS256","typ":"JWT"}`, base64url-no-pad segments, random 12-byte JTI.
- `verifyWorkerJWT(masterSecret, token) (*workerJWTClaims, error)` — Splits on `.`, checks the segment count, HMAC-compares the signature in constant time, parses the claims, rejects when `now ≥ exp`.
## internal/api/stats_store.go

Thread-safe in-memory cache for per-app CPU/memory snapshots. Lazy GC on Put.

### Types

- `AppStatSnapshot { CPUPct, MemBytes, MemLimit, UpdatedAt }`
- `statsStore` — RWMutex-guarded map.

### Functions

- `newStatsStore(ttl) *statsStore` — Pre-sized map.
- `(s) Put(appID, snap)` — Writes; sweeps stale entries when the TTL has passed since the last GC.
- `(s) Get(appID) (snap, ok)` — Returns false if missing or stale.
- `(s) DeleteApp(appID)` — Removes the entry.
## internal/scheduler/scheduler.go

Job scheduling engine: picks pending jobs, assigns them inside a serialisable transaction to the lowest-utilisation eligible worker, recovers stuck jobs, and monitors worker heartbeats. Failed jobs stay failed — there is no automatic retry path; users redeploy from the UI.

### Type

- `Scheduler` — `*GormDB`, polling `interval`, cancellable ctx, `onJobAssigned` callback.

### Functions

- `NewScheduler(db, interval) *Scheduler` — Allocates with a derived ctx.
- `(s) SetJobAssignedCallback(fn)` — Used by `main` to wire `hub.PushJob`.
- `(s) SelectAvailableWorker(resources) (workerID, zone, err)` — Read-only weighted-score query (CPU %, memory %, running-app count); no DB writes.
- `(s) Start()` — Ticks at `interval`: `processPendingJobs`, `recoverStuckJobs`.
- `(s) Stop()` — Cancels the context.
- `(s) processPendingJobs() error` — Opens a `SELECT ... FOR UPDATE SKIP LOCKED` tx, picks up to 100 pending jobs ordered by priority/age, assigns them or honours a pre-set `WorkerID`. After commit, fires `onJobAssigned` for each.
- `(s) selectWorkerTx(tx, required) (id, err)` — Same scoring inside an existing tx.
- `(s) assignJobTx(tx, jobID, workerID, resources) error` — Atomically increments `used_cpu`/`used_memory` (with a capacity guard), sets the job `assigned`.
- `(s) ReleaseWorkerResources(workerID, resources) error` — Decrements the counters with `GREATEST(..., 0)` guards.
- `(s) recoverStuckJobs() error` — Marks `assigned`/`processing` jobs older than 15 min as `failed`. They stay failed — no retry.
- `(s) MonitorWorkerHealth()` — Ticks every minute, calls `checkWorkerHealth`.
- `(s) checkWorkerHealth() error` — Marks online workers whose `last_heartbeat` is older than 2 min as offline; bulk-stops their running apps.
- `(s) GetWorkerStats() (map, err)` — Aggregates total/online/offline workers plus total and used CPU/memory.
- `IsRecordNotFound(err) bool` — Wraps `errors.Is(err, gorm.ErrRecordNotFound)`.
## internal/alerts/manager.go

Alert manager: emits new alert rows, deduplicates against an active alert with the same (user_id, source_type, source_id), and resolves them when the underlying condition clears.

### Types

- `Spec { UserID, AppID, Kind, Severity, Source, SourceID, Title, Message }` — input for `Emit`.
- `Manager` — wraps `*GormDB`.

### Functions

- `NewManager(db) *Manager`.
- `(m) Emit(s Spec)` — UPSERT-like: returns the existing active alert if `(user, source_type, source_id)` is already firing; otherwise inserts a new row.
- `(m) ResolveBySource(userID int, sourceType, sourceID string)` — Flips matching active alerts to `resolved` and stamps `resolved_at`.
## internal/alerts/threshold.go

Per-app threshold poller. Runs every 30 s, evaluates each enabled `AppAlertRule` against the live stats snapshot from the WSS hub, and fires/resolves with a configurable sustain window.

### Types

- `Snapshot { CPUPct, MemBytes, MemLimit, NetMBps, NetTotalBytes, DiskMBps, UpdatedAt }` — what the poller needs from each running app.
- `StatsReader` — `AppStatsLive(appID) (Snapshot, ok)`.
- `ThresholdPoller` — wraps the DB, stats source, manager, and optional webhook publisher.

### Functions

- `NewThresholdPoller(db, stats, mgr) *ThresholdPoller`.
- `(p) SetPublisher(pub)` — Wires webhook dispatch (optional; alerts still emit rows without it).
- `(p) Start()` / `(p) Stop()` — Goroutine lifecycle.
- `(p) loop()` — 30 s ticker.
- `(p) tick()` — Loads running apps plus their rules, evaluates each.
- `(p) evaluateMetric(app, rule, val, now)` — Handles `cpu`/`memory`/`network`/`disk` with hysteresis (sustained over → fire; sustained under → resolve).
- `(p) evaluateOffline(app, rule, snap, snapOK, now)` — Same hysteresis, but driven off heartbeat staleness (no metric value).
- `metricValue(kind, snap, ok)` / `metricEventName(kind)` / `metricTitle(kind, app)` / `metricMessage(kind, val, threshold, sustainMin)` — Helpers shared by the fire paths.

The fire path inserts an alerts row via `Manager.Emit` and (if a publisher is wired) calls `Publish` with one of `app.cpu`, `app.memory`, `app.network`, `app.disk`, or `app.offline`. Event-driven kinds (`crashloop`, `deployment_failed`) come from elsewhere — the dispatcher and worker — and bypass this poller.
## internal/usage/recorder.go

Background sampler that writes per-app live stats into 5-minute `usage_samples` buckets, then prunes rows older than 90 days once a day.

### Constants

- `BucketSize = 5 * time.Minute`
- `retention = 90 * 24 * time.Hour`

### Types

- `LiveStats` — `AppStatsLive(appID) (LiveSnapshot, ok)` interface.
- `LiveSnapshot { CPUPct, MemBytes, NetMBps, NetTotalBytes }` — per-tick reading.
- `Recorder` — owns the in-memory accumulator, ticker, prune loop.
- `acc` — running totals for the current bucket: avg/max for cpu/mem/net, last-seen `NetTotalBytes` for delta computation.

### Functions

- `New(db, stats) *Recorder`.
- `(r) Start()` — Kicks off the 60 s sample loop and the daily pruner.
- `(r) Stop()` — Cancels both.
- `(r) sample(now)` — Pulls every running app from the stats source, updates the accumulator, and on bucket rollover calls `commit`.
- `(r) commit(appID, bucket, a)` — UPSERTs one `usage_samples` row.
- `buildSample(appID, userID, bucket, a)` — Derives `net_bytes_delta` from the `NetTotalBytes` change, fills the `*_avg`/`*_max` columns.
- `bucketStart(t)` — UTC-truncates to 5 minutes.
- `(r) pruneLoop()` / `(r) prune()` — Daily `DELETE WHERE bucket_start < now-90d`.
## internal/webhooks/formatters.go

Renders compact `{emoji} {title} — {app}\nhttps://{subdomain}` strings for Shoutrrr-style targets where signed JSON is the wrong fit (Discord, Slack, ntfy, …).

### Functions

- `formatShoutrrrMessage(ev) string` — Entry point used by the dispatcher when the URL is a Shoutrrr scheme.
- `eventEmoji(name)` / `eventTitle(name)` — Closed lookup tables.
- `short(sha)` — 7-char SHA prefix.
- `firstLine(s)` — Strips multi-line commit messages down to a chat-friendly title.