
Worker & WSS protocol

Function-level reference for the worker agent, transport layer, and signed WSS envelope format.

Files: cmd/worker-agent/main.go, internal/worker/*, internal/wsproto/*, internal/logbus/bus.go, internal/httpx/tls.go.

cmd/worker-agent/main.go

Binary entry point. Loads config, constructs the agent, installs SIGINT/SIGTERM handler, then blocks on agent.Start().

  • main() — Bootstraps; spawns one goroutine for signal handling.
  • version package var — "dev" by default; overridden at link time via -ldflags "-X main.version=<tag>".
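The startup and shutdown flow described above can be illustrated with a minimal sketch. It is not the real main.go: agent and newAgent are hypothetical stand-ins whose Start simply blocks until Stop cancels a context, where the actual Agent would first launch its loops.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// version stays "dev" unless overridden, e.g. go build -ldflags "-X main.version=v1.2.3".
var version = "dev"

// agent is a hypothetical stand-in for worker.Agent.
type agent struct {
	ctx    context.Context
	cancel context.CancelFunc
}

func newAgent() *agent {
	ctx, cancel := context.WithCancel(context.Background())
	return &agent{ctx: ctx, cancel: cancel}
}

// Start blocks until the context is cancelled; the real agent starts its loops first.
func (a *agent) Start() error {
	<-a.ctx.Done()
	return nil
}

// Stop cancels the shared context, which unwinds everything waiting on it.
func (a *agent) Stop() { a.cancel() }

func main() {
	fmt.Println("worker-agent", version)
	a := newAgent()

	// One goroutine turns SIGINT/SIGTERM into a context cancellation.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		a.Stop()
	}()

	if err := a.Start(); err != nil {
		fmt.Fprintln(os.Stderr, "agent:", err)
		os.Exit(1)
	}
}
```

Because the signal goroutine only calls Stop, every shutdown goes through the same context-cancellation path as any other caller of Stop.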

internal/worker/agent.go

Core worker logic: registers with the control server over HTTPS, manages the WSS connection, executes deployment jobs by orchestrating git, podman, ss, nsenter/tc, and findmnt. Background loops cover heartbeats, container health, runtime log collection, and per-app stats.

Functions

  • NewAgent(cfg) *Agent — Allocates with derived ctx, secure HTTP client, 32-deep job channel.
  • (a) Start() error — Probes disk-quota support, registers, creates WSS client, launches heartbeatLoop / monitorLoop / jobLoop / jwtRefreshLoop / ws.Run (5 goroutines), blocks until ctx done.
  • (a) Stop() — Cancels context.
  • validateControlPlaneURL(base) error — Accepts https:// unconditionally. Rejects http:// unless the host parses as a loopback or RFC 1918 IP (for example, a WireGuard tunnel address). This keeps API keys from crossing the public internet in cleartext while still allowing plain HTTP over a private mesh that already encrypts traffic end to end.
  • (a) register() error — POSTs worker metadata to /v1/workers/register, stores the returned numeric worker ID, and passes the short-lived JWT and its token_expires_at to setJWT. The raw API key is included in the body so the control plane can check sha256(rawKey) against the stored api_key_hash before issuing the JWT.
  • (a) setJWT(token, expires) — Stores a freshly minted token under tokenMu. Read by authToken on every WSS reconnect.
  • (a) authToken() string — Returns the current JWT if non-expired; otherwise falls back to the raw API key. Passed to wsClient as tokenFn so reconnects always pick up the latest credential without restarting the agent.
  • (a) refreshJWT() error — POSTs to /v1/workers/refresh-token with Authorization: Bearer <rawKey>, decodes {token, token_expires_at}, calls setJWT. The raw API key (not the expiring JWT) is the renewal credential — only the long-lived secret can mint new JWTs.
  • (a) jwtRefreshLoop() — Refreshes 2 minutes before jwtExpiresAt; retries every minute on failure or while waiting for the first JWT. Errors are logged but never fatal because authToken fails open to the raw API key.
  • readAll(resp) (string, error) — Drains response body.
  • (a) handleInbound(env) — Dispatches inbound envelopes; enqueues TypeJob to a.jobs (drops if full).
  • (a) jobLoop() — Drains a.jobs sequentially, calls runJob.
  • (a) runJob(job) — Validates the app name, emits a processing status, dispatches to the matching execute method, then emits the final result. Holds jobMu throughout.
  • (a) heartbeatLoop() — Ticks every 30 s, calls sendHeartbeat.
  • (a) sendHeartbeat() error — Collects resource usage, emits TypeHeartbeat.
  • (a) getResourceUsage() WorkerResources — podman ps -q + podman stats --no-stream --format json.
  • (a) executeDeploy(jobID, deploymentID, payload) JobResult — Full pipeline: temp dir, git clone, optional commit checkout, generate a Dockerfile or use the existing one, then podman build --network=wisehosting-build --no-cache (default OCI isolation: no --isolation=chroot, no --network=host). Reads back the built image ID via podman image inspect --format {{.Id}} and passes that bare ID to podman run instead of the tag, so a tag re-pointed between build and run cannot substitute a different image, and a removed image makes the run fail with an unresolvable reference. Then: remove old container, allocate port, podman run with all flags, wait for running state, optional bandwidth limit, prune old images.
  • (a) waitContainerRunning(id, timeout) bool — Polls podman inspect every 500 ms.
  • probeDiskQuotaSupport() bool — Inspects graph driver + filesystem to determine --storage-opt size= support.
  • authenticatedCloneURL(repoURL, token) (string, string) — Injects creds; returns authed + redacted.
  • scrubToken(b, token) []byte — Replaces token bytes with ***.
  • (a) monitorLoop() — Three tickers: health 30 s, runtime logs 3 s, app stats 10 s.
  • (a) collectAppStats() — Lists wisehosting-labelled containers, runs podman stats --no-stream, emits TypeAppStats.
  • lastLine(s) string — Last non-empty line of podman run stdout.
  • parseCPUPercent(s) float64, parseMemUsage(s) (int64, int64), parseHumanBytes(s) int64 — Parsers.
  • (a) checkContainers(tracker) — podman ps -a + checkOne per entry.
  • (a) checkOne(name, tracker) — Inspects one container; enforces OOM/disk-quota/crash-loop policies; on an offence emits a violation and runs podman stop; reports the app as stopped once it has been down for 30 s.
  • (a) reportAppStatus(appID, status) — Emits TypeAppStatus.
  • (a) reportViolation(v) — Emits TypeViolation.
  • (a) applyBandwidthLimit(containerID, rate) error — Resolves PID via podman inspect, uses nsenter + tc qdisc replace ... tbf inside container netns.
  • (a) pruneOldImages(appName, keepTag) — Removes app-labelled images whose tag is not keepTag.
  • (a) allocatePort() (int, error) — ss -tlnH to enumerate listeners; random free port in configured range with wraparound.
  • (a) updateJobStatus(jobID, status, message) / updateJobProgress(...) — Emits TypeJobStatus.
  • (a) completeJob(jobID, result) — Emits TypeJobResult.
  • (a) executeRestart(...) — podman stop + rm, then executeDeploy.
  • (a) executeStop(jobID, payload) JobResult — podman stop -t 5; tolerates "not found".
  • (a) executeDelete(jobID, payload) JobResult — podman stop + force rm + pruneOldImages.
  • (a) collectRuntimeLogs(cursors) — podman logs --timestamps per container since cursor; emits TypeRuntimeLog batch.
  • splitPodmanLogLine(line) — RFC3339Nano timestamp + body.
  • (l) Write(p) (int, error) — localhostStripper removes localhost/ prefixes from build output.
  • newSafeBuf() *safeBuf — Mutex-wrapped bytes.Buffer.
  • (s) WriteString / Write / String / Snapshot — Thread-safe wrappers; Snapshot returns string + length atomically.
  • (a) streamBuildLogsLoop(ctx, deploymentID, logs) — Ticks every 2 s; emits TypeBuildLog when buffer has grown.
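A minimal sketch of the scheme check described for validateControlPlaneURL follows. It assumes the http:// exemption applies only to IP literals, since the bullet says the host must parse as an IP, so a hostname such as localhost is rejected. net.IP.IsPrivate (Go 1.17+) also matches IPv6 unique-local addresses, so the sketch narrows it to IPv4 to stay within RFC 1918; the error messages are illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// validateControlPlaneURL allows https always, and http only for a loopback or
// RFC 1918 IP literal such as a WireGuard tunnel address.
func validateControlPlaneURL(base string) error {
	u, err := url.Parse(base)
	if err != nil {
		return fmt.Errorf("control plane URL: %w", err)
	}
	switch u.Scheme {
	case "https":
		return nil
	case "http":
		ip := net.ParseIP(u.Hostname())
		if ip == nil {
			return errors.New("http:// requires an IP literal host")
		}
		if ip.IsLoopback() {
			return nil
		}
		// IsPrivate also matches IPv6 fc00::/7, so restrict it to IPv4 (RFC 1918).
		if ip4 := ip.To4(); ip4 != nil && ip4.IsPrivate() {
			return nil
		}
		return errors.New("http:// is only allowed for loopback or RFC 1918 addresses")
	default:
		return fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
}

func main() {
	for _, u := range []string{
		"https://control.example.com",
		"http://10.8.0.1:8080",
		"http://203.0.113.5",
	} {
		fmt.Println(u, validateControlPlaneURL(u))
	}
}
```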
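Credential injection and log scrubbing for private clones might look like the sketch below. The x-access-token username is an assumption (it is a username GitHub accepts for token-based HTTPS auth), and the redacted form uses the standard library's url.URL.Redacted, which masks the password as xxxxx rather than the *** that scrubToken writes.

```go
package main

import (
	"bytes"
	"fmt"
	"net/url"
)

// authenticatedCloneURL returns the URL with token injected as HTTPS credentials,
// plus a redacted form that is safe to log. The username is an assumption.
func authenticatedCloneURL(repoURL, token string) (string, string) {
	u, err := url.Parse(repoURL)
	if err != nil {
		return repoURL, repoURL // unparseable: hand back the input untouched
	}
	if token != "" && u.Scheme == "https" {
		u.User = url.UserPassword("x-access-token", token)
	}
	return u.String(), u.Redacted() // Redacted replaces any password with "xxxxx"
}

// scrubToken masks every occurrence of token in b, e.g. in git's stderr.
func scrubToken(b []byte, token string) []byte {
	if token == "" {
		// An empty old value would match at every position and insert "***" throughout.
		return b
	}
	return bytes.ReplaceAll(b, []byte(token), []byte("***"))
}

func main() {
	authed, redacted := authenticatedCloneURL("https://github.com/acme/app.git", "ghp_example")
	fmt.Println(redacted)
	stderr := []byte("fatal: unable to access '" + authed + "'")
	fmt.Println(string(scrubToken(stderr, "ghp_example")))
}
```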

Types

  • Agent — config, HTTP client, worker ID, ctx, port mutex, job mutex, disk-quota flag, token mutex (tokenMu sync.RWMutex), currentJWT string, jwtExpiresAt time.Time, WSS client, jobs channel.
  • appHealth — restart-count baseline, last violation time, stopped-since, reported flag.
  • safeBuf, localhostStripper — log buffer helpers.

internal/worker/transport.go

Worker's outbound WSS transport: connection lifecycle with exponential backoff, dedicated write loop with keep-alive pings, read loop with HMAC + clock-skew checks, sliding-window dedup.

  • newWSClient(baseURL, apiKey, tokenFn, handler) (*wsClient, error) — Parses URL, rewrites scheme to ws/wss, appends /v1/workers/ws; 64-deep send channel, 256-slot dedup map. Stores tokenFn for the Authorization header and derives the HMAC key from sha256(rawKey) so the worker and control plane independently arrive at the same signing key without ever transmitting it.
  • (c) Run() — Reconnect loop: connectAndServe with 1 s to 30 s backoff + jitter until ctx cancel.
  • (c) Stop() — Cancels ctx, force-closes current conn.
  • (c) connectAndServe() error — Dials with Authorization: Bearer <tokenFn()> (JWT preferred, raw key fallback) over TLS 1.3 with NextProtos: ["http/1.1"], so ALPN cannot negotiate HTTP/2. The Upgrade-based WebSocket handshake exists only in HTTP/1.1, and a CDN that negotiates h2 for wss:// silently breaks it. Sets read limit, resets sequence, spawns writeLoop, runs readLoop synchronously.
  • (c) writeLoop(conn, done) — Dequeues envelopes, writes JSON; pings every wsPingPeriod; exits on ctx or write error.
  • (c) readLoop(conn) error — Reads, unmarshals, verifies HMAC + clock + dedup, calls handler.
  • (c) acceptSeq(seq) bool — Sliding window (capacity 1024, prune at gap > 256).
  • (c) Emit(msgType, payload) error — Increments the outbound seq, builds a signed envelope, and enqueues it on the send channel, giving up after 5 s if the channel stays full.
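One reading of acceptSeq is sketched below. It assumes "prune at gap > 256" means entries more than 256 sequence numbers behind the highest one seen are both pruned and rejected; rejecting them is what stops an already-pruned number from being accepted again as a replay. seqWindow is a hypothetical type standing in for the dedup fields on wsClient.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	dedupCapacity = 1024 // prune once the map holds more entries than this
	dedupGap      = 256  // entries further than this behind the highest seq are stale
)

// seqWindow is a hypothetical stand-in for wsClient's dedup state.
type seqWindow struct {
	mu      sync.Mutex
	seen    map[uint64]struct{}
	highest uint64
}

func newSeqWindow() *seqWindow {
	return &seqWindow{seen: make(map[uint64]struct{}, 256)}
}

// acceptSeq reports whether seq is new and inside the window, recording it if so.
func (w *seqWindow) acceptSeq(seq uint64) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.highest > dedupGap && seq < w.highest-dedupGap {
		return false // too old: it may already have been pruned, so treat it as a replay
	}
	if _, dup := w.seen[seq]; dup {
		return false
	}
	w.seen[seq] = struct{}{}
	if seq > w.highest {
		w.highest = seq
	}
	if len(w.seen) > dedupCapacity {
		for s := range w.seen { // deleting during range is safe in Go
			if w.highest-s > dedupGap {
				delete(w.seen, s)
			}
		}
	}
	return true
}

func main() {
	w := newSeqWindow()
	for _, s := range []uint64{1, 2, 2, 400, 3} {
		fmt.Println(s, w.acceptSeq(s))
	}
}
```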

Type

  • wsClient — URL, tokenFn func() string (returns JWT or raw API key per connect), apiKey []byte (32-byte sha256(rawKey) HMAC key), live *Conn under mutex, send channel, atomic outbound seq, dedup map, handler, ctx.

Constants: handshake 10 s, write 10 s, pong 90 s, ping 72 s.


internal/wsproto/envelope.go

Signed, timestamped WSS envelope format and its crypto helpers.

  • NewID() string — 16 random bytes hex-encoded.
  • canonical(e) []byte — type|id|seq|ts|payload.
  • Sign(e, key) — HMAC-SHA256 over canonical(e), stored hex-encoded in e.HMAC.
  • Verify(e, key) error — Constant-time HMAC compare.
  • ValidateClock(e, now) error — Reject if skew > 5 min.
  • Build(msgType, seq, payload, key) (*Envelope, error) — Marshals payload, fills envelope, signs.
  • (e) Decode(out) error — JSON-decodes via LimitReader with DisallowUnknownFields.

Envelope { t, i, s, ts, p, h } (type, ID, seq, timestamp, payload, HMAC). Constants: MaxMessageBytes = 2 MiB, MaxClockSkew = 5 min. Type constants: TypeRegister, TypeRegistered, TypeHeartbeat, TypeJob, TypeJobAck, TypeJobStatus, TypeJobResult, TypeAppStatus, TypeViolation, TypeRuntimeLog, TypeBuildLog, TypeAppStats, TypeError.


internal/wsproto/payloads.go

Concrete payload structs:

  • RegisterPayload, RegisteredPayload
  • HeartbeatPayload (status + WorkerResources)
  • JobPayload (job ID, type, app ID, deployment ID, full DeployPayload)
  • JobAckPayload, JobStatusPayload, JobResultPayload
  • AppStatusPayload (alias of models.AppStatusReport)
  • ViolationPayload (alias of models.ViolationReport)
  • RuntimeLogLine, RuntimeLogPayload, BuildLogPayload
  • AppStatLine, AppStatsPayload
  • ErrorPayload

internal/wsproto/limit_reader.go

  • newLimitReader(b, max) — bytes.Reader wrapped in io.LimitReader; used only by Envelope.Decode.

internal/logbus/bus.go

In-process per-app log ring buffer. Eviction by count and by age.

  • New(maxKeepPerApp, maxAge) — Defaults to 1000 lines / 1 hour when zero.
  • (b) Push(appID, stream, msg, at) — Appends to ring; truncates if over maxKeep. Holds write lock.
  • (b) Tail(appID, sinceSeq, limit) []Line — Lines newer than sinceSeq and within maxAge, capped to last limit.
  • (b) TailMulti(appIDs, sinceSeq, limit) []Line — Multi-app merge sorted by sequence.
  • (b) Drop(appID) — Removes entire ring.

Types: Bus (RWMutex map to ring), ring, Line { Seq, AppID, Stream, Msg, At }.


internal/httpx/tls.go

  • NewSecureClient(timeout) *http.Client — TLS 1.3 minimum, HTTP/2 forced, 50/10 conn pool, 90 s idle, 10 s TLS handshake, 30 s response header, 10 s dial. Used by the worker for register/refresh and any HTTPS clone fallback.
  • NewWebhookClient(timeout) *http.Client — Same TLS posture plus a DialContext that resolves the target URL up-front and refuses to dial loopback / link-local / RFC 1918 / unique-local / multicast addresses. Returns the opaque error "webhook URL resolves to a private or reserved IP address" so attackers can't fingerprint internal topology by URL probing. Used by the control plane's outbound webhook dispatcher; mirrored here because the worker's HTTP client construction lives in this package.
  • NewSecureTLSConfig() *tls.Config — Bare MinVersion: VersionTLS13. Used standalone (e.g. by the WSS hub).
  • isPrivateIP(ip) bool — Internal helper backing the SSRF guard.
