Deep Dive: Protocol Implementations
Crankfire supports four protocols: HTTP, WebSocket, SSE (Server-Sent Events), and gRPC. Each protocol has a dedicated client implementation that abstracts protocol-specific behavior while conforming to the common Requester interface.
Overview
graph TB
subgraph "Requester Interface"
req[Requester Do ctx error]
end
subgraph "Protocol Implementations"
http[HTTP Requester]
ws[WebSocket Requester]
sse[SSE Requester]
grpc[gRPC Requester]
end
subgraph "Client Libraries"
httpClient[net/http]
wsClient[gorilla/websocket]
sseClient[SSE Client]
grpcClient[google.golang.org/grpc]
end
req --> http
req --> ws
req --> sse
req --> grpc
http --> httpClient
ws --> wsClient
sse --> sseClient
grpc --> grpcClient
HTTP Protocol
Architecture
The HTTP implementation uses Go’s standard net/http package with connection pooling:
classDiagram
class httpRequester {
-Client client
-RequestBuilder builder
-Collector collector
-baseRequesterHelper helper
+Do(ctx) error
}
class Client {
-http.Client httpClient
-Duration timeout
+Execute(req) (Response, Duration, error)
}
class RequestBuilder {
-string method
-string url
-map headers
-string body
-AuthProvider auth
-Feeder feeder
+Build(ctx) (*http.Request, error)
}
httpRequester --> Client
httpRequester --> RequestBuilder
Request Execution
// cmd/crankfire/http_requester.go
// httpRequester is the HTTP implementation of the Requester interface: its
// Do method builds one request, executes it and records the outcome.
type httpRequester struct {
// client executes the built request and reports its latency.
client *httpclient.Client
// builder produces a fresh *http.Request for every Do call.
builder *httpclient.RequestBuilder
// collector receives the latency, error and metadata of each request.
collector *metrics.Collector
// helper is not referenced by the Do method shown in this document.
// NOTE(review): presumably shared requester plumbing — confirm.
helper baseRequesterHelper
}
// Do performs a single HTTP request and records its outcome.
//
// The request is built by the RequestBuilder, executed by the client, and the
// measured latency is handed to the collector together with the error (if
// any). When the error is an *httpclient.StatusError, its status code is
// attached to the request metadata. The error from Execute is returned
// unchanged; a build failure is returned without being recorded.
func (r *httpRequester) Do(ctx context.Context) error {
	// Build request with placeholders resolved.
	req, err := r.builder.Build(ctx)
	if err != nil {
		return err
	}

	// Execute with timing. The response value is not needed here, so it is
	// discarded: binding it to an unused local does not compile in Go.
	_, latency, err := r.client.Execute(req)

	// Record metrics.
	meta := &metrics.RequestMetadata{
		Protocol: "http",
	}
	if err != nil {
		var statusErr *httpclient.StatusError
		if errors.As(err, &statusErr) {
			meta.StatusCode = strconv.Itoa(statusErr.Code)
		}
	}
	r.collector.RecordRequest(latency, err, meta)
	return err
}
Request Builder
The builder handles placeholder substitution and authentication:
// internal/httpclient/builder.go
// RequestBuilder holds the request template from which Build produces a
// concrete *http.Request.
type RequestBuilder struct {
// method is the HTTP method passed to http.NewRequestWithContext.
method string
// url is the URL template; placeholders are resolved on every Build.
url string
// headers maps header names to value templates; placeholders in the values
// are resolved on every Build.
headers map[string]string
// body is the body template; placeholders are resolved on every Build.
body string
// bodyFile is not read by the Build method shown in this document.
// NOTE(review): presumably a path to a file holding the body — confirm.
bodyFile string
// auth, when non-nil, is asked to inject its header into each request.
auth auth.Provider
// feeder, when non-nil, supplies one record per Build call.
feeder feeder.Feeder
}
// Build assembles the *http.Request for one iteration.
//
// It takes the next feeder record (when a feeder is configured), merges it
// with the variable store found in ctx (when present), resolves placeholders
// in the URL, body and header values against the result, and finally lets the
// auth provider (when configured) inject its header.
//
// It returns an error only when http.NewRequestWithContext rejects the
// method or the resolved URL.
func (b *RequestBuilder) Build(ctx context.Context) (*http.Request, error) {
	// Start from the next feeder row, if there is a feeder.
	var data feeder.Record
	if b.feeder != nil {
		// NOTE(review): the feeder error is discarded, so a failed Next
		// leaves placeholders unresolved — confirm this is intentional.
		data, _ = b.feeder.Next(ctx)
	}

	// Combine with the variables carried by the context, if any.
	if vars := variables.FromContext(ctx); vars != nil {
		data = vars.Merge(data)
	}

	// Resolve the URL and body templates, then create the request.
	target := placeholders.Replace(b.url, data)
	payload := strings.NewReader(placeholders.Replace(b.body, data))
	req, err := http.NewRequestWithContext(ctx, b.method, target, payload)
	if err != nil {
		return nil, err
	}

	// Header values are templates too.
	for name, tmpl := range b.headers {
		req.Header.Set(name, placeholders.Replace(tmpl, data))
	}

	// Authentication goes last.
	if b.auth != nil {
		b.auth.InjectHeader(ctx, req)
	}
	return req, nil
}
Connection Pooling
The HTTP client uses Go’s built-in connection pool:
// internal/httpclient/client.go
// NewClient returns a Client whose underlying http.Client uses the given
// overall request timeout and a dedicated transport that keeps up to 100 idle
// connections (in total and per host) for at most 90 seconds each.
func NewClient(timeout time.Duration) *Client {
	// Connection pool settings.
	pool := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     90 * time.Second,
	}

	inner := &http.Client{
		Timeout:   timeout,
		Transport: pool,
	}
	return &Client{httpClient: inner}
}
// Execute sends req and returns the buffered response together with the
// measured latency.
//
// The latency is measured around httpClient.Do only; reading the body is not
// included. The body is always read in full so that it is available for
// extraction.
//
// Errors:
//   - a transport error from httpClient.Do is returned as-is;
//   - a status code of 400 or above yields a *StatusError carrying the code
//     and whatever part of the body could be read;
//   - a body read failure on an otherwise successful response is returned
//     instead of silently handing back a truncated body.
//
// The *Response is nil whenever the error is non-nil.
func (c *Client) Execute(req *http.Request) (*Response, time.Duration, error) {
	start := time.Now()
	resp, err := c.httpClient.Do(req)
	latency := time.Since(start)
	if err != nil {
		return nil, latency, err
	}
	defer resp.Body.Close()

	// Read body for extraction.
	body, readErr := io.ReadAll(resp.Body)

	// The status check comes first so that error responses keep being
	// reported by status code even when their body could not be read fully.
	if resp.StatusCode >= 400 {
		return nil, latency, &StatusError{Code: resp.StatusCode, Body: body}
	}
	if readErr != nil {
		return nil, latency, readErr
	}
	return &Response{StatusCode: resp.StatusCode, Body: body}, latency, nil
}
WebSocket Protocol
Architecture
The WebSocket implementation uses gorilla/websocket for full-duplex messaging:
classDiagram
class wsRequester {
-Config config
-Collector collector
+Do(ctx) error
}
class Client {
-string url
-http.Header headers
-Dialer dialer
-Conn conn
-ClientMetrics metrics
+Connect(ctx) error
+SendMessage(ctx, msg) error
+ReceiveMessage(ctx) (Message, error)
+Close() error
+Metrics() Metrics
}
class Message {
+int Type
+[]byte Data
}
wsRequester --> Client
Client --> Message
Request Execution
Each Do() call represents a full WebSocket session:
// cmd/crankfire/websocket_requester.go
// Do runs one complete WebSocket session: connect, then for every configured
// message send it and wait for one reply, optionally pausing between
// messages, and finally close the connection.
//
// Metrics are recorded exactly once per call, with the time elapsed since the
// start of the session and the error that ended it (nil on success). The
// pause between messages is interrupted by context cancellation, in which
// case ctx.Err() is recorded and returned.
func (r *wsRequester) Do(ctx context.Context) error {
	start := time.Now()

	// Create client with resolved URL.
	url := r.resolveURL(ctx)
	client := websocket.NewClient(websocket.Config{
		URL:              url,
		Headers:          r.headers,
		HandshakeTimeout: r.config.HandshakeTimeout,
	})

	// Connect.
	if err := client.Connect(ctx); err != nil {
		r.recordMetrics(time.Since(start), err, client.Metrics())
		return err
	}
	defer client.Close()

	// Send messages.
	for _, msgData := range r.config.Messages {
		msg := websocket.Message{
			Type: websocket.TextMessage,
			Data: []byte(r.resolvePlaceholders(msgData, ctx)),
		}
		if err := client.SendMessage(ctx, msg); err != nil {
			r.recordMetrics(time.Since(start), err, client.Metrics())
			return err
		}

		// Wait for response.
		if _, err := client.ReceiveMessage(ctx); err != nil {
			r.recordMetrics(time.Since(start), err, client.Metrics())
			return err
		}

		// Optional interval between messages. A timer plus select is used
		// instead of time.Sleep so that a cancelled context ends the session
		// immediately rather than after the full interval.
		if r.config.MessageInterval > 0 {
			timer := time.NewTimer(r.config.MessageInterval)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				err := ctx.Err()
				r.recordMetrics(time.Since(start), err, client.Metrics())
				return err
			}
		}
	}

	r.recordMetrics(time.Since(start), nil, client.Metrics())
	return nil
}
WebSocket Client
// internal/websocket/websocket.go
// Client wraps a single gorilla/websocket connection and tracks its metrics.
type Client struct {
// url is the endpoint dialed by Connect.
url string
// headers are sent with the dial request.
headers http.Header
// dialer performs the dial in Connect.
dialer *websocket.Dialer
// conn is set by Connect and used by SendMessage and ReceiveMessage.
conn *websocket.Conn
// metrics accumulates connection, message and error counters.
metrics *clientmetrics.ClientMetrics
// mu is held by Connect and SendMessage; ReceiveMessage does not take it.
mu sync.Mutex
}
// Connect dials the configured URL with the configured headers and stores the
// resulting connection on the client.
//
// On failure the error counter is incremented and the dial error is returned
// wrapped as "dial failed: ...". On success the connection start time is
// marked in the metrics.
func (c *Client) Connect(ctx context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// The handshake *http.Response is not used, so it is discarded: binding it
	// to an unused local does not compile in Go. Per the gorilla/websocket
	// Dialer documentation its body does not need to be closed by the caller.
	conn, _, err := c.dialer.DialContext(ctx, c.url, c.headers)
	if err != nil {
		c.metrics.IncrementErrors()
		return fmt.Errorf("dial failed: %w", err)
	}

	c.conn = conn
	c.metrics.MarkConnected()
	return nil
}
// SendMessage writes msg to the connection while holding the client mutex.
//
// When ctx carries a deadline it is applied as the write deadline, mirroring
// how ReceiveMessage applies it as the read deadline, so a stalled write
// cannot outlive the context. On a write failure the error counter is
// incremented and the error returned; on success the sent counters are
// advanced by one message and len(msg.Data) bytes.
func (c *Client) SendMessage(ctx context.Context, msg Message) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Set write deadline from context.
	if deadline, ok := ctx.Deadline(); ok {
		c.conn.SetWriteDeadline(deadline)
	}

	if err := c.conn.WriteMessage(msg.Type, msg.Data); err != nil {
		c.metrics.IncrementErrors()
		return err
	}
	c.metrics.IncrementSent(int64(len(msg.Data)))
	return nil
}
// ReceiveMessage blocks until the next message arrives on the connection.
//
// A deadline carried by ctx is applied as the connection's read deadline
// before reading. A read failure increments the error counter and is returned
// with a zero Message; a successful read advances the received counters by
// one message and the payload length in bytes.
func (c *Client) ReceiveMessage(ctx context.Context) (Message, error) {
	// Bound the read by the context deadline, when there is one.
	if limit, bounded := ctx.Deadline(); bounded {
		c.conn.SetReadDeadline(limit)
	}

	kind, payload, readErr := c.conn.ReadMessage()
	if readErr != nil {
		c.metrics.IncrementErrors()
		return Message{}, readErr
	}

	c.metrics.IncrementReceived(int64(len(payload)))
	received := Message{Type: kind, Data: payload}
	return received, nil
}
WebSocket Metrics
// Metrics is the set of per-connection counters reported for a WebSocket
// client.
// NOTE(review): the fields match clientmetrics.Snapshot one-to-one —
// presumably this is populated from (or an alias of) that type; confirm.
type Metrics struct {
// ConnectionDuration is how long the connection has been established.
ConnectionDuration time.Duration
// MessagesSent and MessagesReceived count messages in each direction.
MessagesSent int64
MessagesReceived int64
// BytesSent and BytesReceived count payload bytes in each direction.
BytesSent int64
BytesReceived int64
// Errors counts failed operations.
Errors int64
}
SSE Protocol
Architecture
SSE (Server-Sent Events) is a one-way streaming protocol:
classDiagram
class sseRequester {
-Config config
-Collector collector
+Do(ctx) error
}
class Client {
-string url
-http.Header headers
-http.Client httpClient
-http.Response resp
-bufio.Reader reader
-ClientMetrics metrics
+Connect(ctx) error
+ReadEvent(ctx) (Event, error)
+Close() error
+Metrics() Metrics
}
class Event {
+string ID
+string Event
+string Data
}
sseRequester --> Client
Client --> Event
Request Execution
Each Do() call connects and reads events:
// cmd/crankfire/sse_requester.go
// Do runs one SSE session: connect to the stream and read events until the
// configured maximum is reached, the context ends, or reading fails.
//
// When ReadTimeout is positive it bounds the whole session, connect included,
// because it is applied to ctx before the client is created. A read error
// that coincides with the context being done is treated as the normal end of
// the session; any other read error is recorded and returned. Metrics are
// recorded once per call with the time elapsed since the start.
func (r *sseRequester) Do(ctx context.Context) error {
	start := time.Now()

	// Apply timeout context.
	if r.config.ReadTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.config.ReadTimeout)
		defer cancel()
	}

	// Create client.
	client := sse.NewClient(sse.Config{
		URL:     r.resolveURL(ctx),
		Headers: r.headers,
	})

	// Connect.
	if err := client.Connect(ctx); err != nil {
		r.recordMetrics(time.Since(start), err, client.Metrics())
		return err
	}
	defer client.Close()

	// Read events. Only the number of events matters here, so the event value
	// is discarded: binding it to an unused local does not compile in Go.
	eventCount := 0
	for {
		if _, err := client.ReadEvent(ctx); err != nil {
			// A timeout or cancellation is the expected way for a session
			// to end.
			if ctx.Err() != nil {
				break
			}
			r.recordMetrics(time.Since(start), err, client.Metrics())
			return err
		}
		eventCount++

		// Check max events limit.
		if r.config.MaxEvents > 0 && eventCount >= r.config.MaxEvents {
			break
		}
	}

	r.recordMetrics(time.Since(start), nil, client.Metrics())
	return nil
}
SSE Client
// internal/sse/sse.go
// Client reads Server-Sent Events from a single HTTP response stream.
type Client struct {
// url is the endpoint requested by Connect.
url string
// headers are added to the request after the SSE-specific headers.
headers http.Header
// httpClient performs the GET request in Connect.
httpClient *http.Client
// resp is the open streaming response, set by Connect.
resp *http.Response
// reader buffers resp.Body; ReadEvent reads the stream line by line from it.
reader *bufio.Reader
// metrics accumulates connection and byte counters.
metrics *clientmetrics.ClientMetrics
// eventsRecv is incremented by ReadEvent for every event it returns.
eventsRecv int64
// mu is not taken by the Connect and ReadEvent methods shown here.
// NOTE(review): presumably guards Close or other state — confirm.
mu sync.Mutex
}
// Connect issues the GET request that opens the event stream.
//
// The request carries the SSE headers (Accept: text/event-stream,
// Cache-Control: no-cache, Connection: keep-alive) followed by the client's
// custom headers. Any status other than 200 OK closes the response body and
// yields a *StatusError with the received code. On success the response and a
// buffered reader over its body are stored on the client and the connection
// start time is marked in the metrics.
//
// An invalid URL is reported as an error instead of causing a nil-pointer
// panic on the request.
func (c *Client) Connect(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
	if err != nil {
		return err
	}

	// SSE-specific headers.
	req.Header.Set("Accept", "text/event-stream")
	req.Header.Set("Cache-Control", "no-cache")
	req.Header.Set("Connection", "keep-alive")

	// Copy custom headers.
	for key, values := range c.headers {
		for _, value := range values {
			req.Header.Add(key, value)
		}
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return &StatusError{Code: resp.StatusCode}
	}

	c.resp = resp
	c.reader = bufio.NewReader(resp.Body)
	c.metrics.MarkConnected()
	return nil
}
// ReadEvent reads lines from the stream until a complete event has been
// received and returns it.
//
// Parsing follows the event-stream rules of the Server-Sent Events
// specification:
//   - a line starting with ":" is a comment and is skipped;
//   - "field: value" sets the field, with one leading space stripped from
//     the value; a line without a colon is a field with an empty value;
//   - only the "id", "event" and "data" fields are kept;
//   - a blank line ends the event; multiple data lines are joined with "\n";
//   - a blank line with no data collected dispatches nothing and resets the
//     event type, so it cannot leak into the following event.
//
// Every line read is counted in the received metrics by its raw length. The
// context is checked before each line is read. A read error (including
// io.EOF) or a done context returns a zero Event and the error; a partially
// read event is discarded.
func (c *Client) ReadEvent(ctx context.Context) (Event, error) {
	event := Event{}
	var dataLines []string

	for {
		// Check context cancellation.
		select {
		case <-ctx.Done():
			return Event{}, ctx.Err()
		default:
		}

		line, err := c.reader.ReadString('\n')
		if err != nil {
			return Event{}, err
		}
		c.metrics.IncrementReceived(int64(len(line)))
		line = strings.TrimRight(line, "\r\n")

		// Empty line marks end of event.
		if line == "" {
			if len(dataLines) > 0 {
				event.Data = strings.Join(dataLines, "\n")
				c.eventsRecv++
				return event, nil
			}
			// Nothing to dispatch: drop a pending event type, keep the id.
			event.Event = ""
			continue
		}

		// Comment line.
		if strings.HasPrefix(line, ":") {
			continue
		}

		// Split on the first colon. Without a colon the whole line is the
		// field name and the value is empty.
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ")

		switch field {
		case "id":
			event.ID = value
		case "event":
			event.Event = value
		case "data":
			dataLines = append(dataLines, value)
		}
	}
}
gRPC Protocol
Architecture
The gRPC implementation uses dynamic proto compilation:
classDiagram
class grpcRequester {
-Config config
-Collector collector
-Client client
-MethodDescriptor method
+Do(ctx) error
}
class Client {
-string target
-grpc.ClientConn conn
-string service
-string method
-metadata.MD md
-ClientMetrics metrics
+Connect(ctx) error
+Invoke(ctx, req, resp) error
+Close() error
+Metrics() Metrics
}
grpcRequester --> Client
Request Execution
// cmd/crankfire/grpc_requester.go
// Do performs one unary gRPC call.
//
// The client is connected on the first call only. The configured JSON message
// has its placeholders resolved and is decoded into a dynamic request message
// for the method's input type; the reply is received into a dynamic message
// of the output type. A positive configured Timeout bounds the invocation
// only, not the connect or the decoding. Every exit path records metrics once
// with the time elapsed since the start of the call.
//
// NOTE(review): r.connected is read and written without synchronization —
// confirm each requester is used by a single goroutine.
func (r *grpcRequester) Do(ctx context.Context) error {
	began := time.Now()

	// finish records the outcome of the call and hands the error back, so
	// every exit below is a single return statement.
	finish := func(outcome error) error {
		r.recordMetrics(time.Since(began), outcome, r.client.Metrics())
		return outcome
	}

	// Lazily connect on first use.
	if !r.connected {
		if err := r.client.Connect(ctx); err != nil {
			return finish(err)
		}
		r.connected = true
	}

	// Resolve placeholders, then decode the JSON into the request message.
	payload := r.resolvePlaceholders(r.config.Message, ctx)
	in := dynamicpb.NewMessage(r.method.Input())
	if err := protojson.Unmarshal([]byte(payload), in); err != nil {
		return finish(err)
	}
	out := dynamicpb.NewMessage(r.method.Output())

	// Bound the invocation when a timeout is configured.
	if r.config.Timeout > 0 {
		bounded, cancel := context.WithTimeout(ctx, r.config.Timeout)
		defer cancel()
		ctx = bounded
	}

	if err := r.client.Invoke(ctx, in, out); err != nil {
		return finish(err)
	}
	return finish(nil)
}
gRPC Client
// internal/grpcclient/client.go
// Client invokes a single gRPC method on one target over one connection.
type Client struct {
// target is the address handed to grpc.NewClient.
target string
// conn is set by Connect and used by Invoke.
conn *grpc.ClientConn
// service and method form the full method path "/service/method".
service string
method string
// md, when non-empty, is attached to every call as outgoing metadata.
md metadata.MD
// useTLS selects TLS transport credentials; otherwise the connection uses
// insecure (plaintext) credentials.
useTLS bool
// insecure, together with useTLS, disables certificate verification.
insecure bool
// metrics accumulates message, byte and error counters.
metrics *clientmetrics.ClientMetrics
// mu is not taken by the Connect and Invoke methods shown here.
// NOTE(review): presumably guards Close or other state — confirm.
mu sync.Mutex
}
// Connect creates the gRPC client connection for the configured target and
// stores it on the client.
//
// Transport security is chosen from the client settings:
//   - useTLS and insecure: TLS with certificate verification disabled;
//   - useTLS only: TLS with the default certificate pool (nil pool passed);
//   - otherwise: plaintext credentials.
//
// On success the connection start time is marked in the metrics, as the
// WebSocket and SSE clients do, so that the snapshot's ConnectionDuration is
// not permanently zero for gRPC.
// NOTE(review): grpc.NewClient does not take ctx, so ctx is unused here; the
// mark presumably records client creation rather than an established
// transport — confirm against the grpc.NewClient documentation.
func (c *Client) Connect(ctx context.Context) error {
	var opts []grpc.DialOption
	if c.useTLS {
		if c.insecure {
			// TLS without verification.
			creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
			opts = append(opts, grpc.WithTransportCredentials(creds))
		} else {
			// TLS with system CA.
			creds := credentials.NewClientTLSFromCert(nil, "")
			opts = append(opts, grpc.WithTransportCredentials(creds))
		}
	} else {
		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}

	conn, err := grpc.NewClient(c.target, opts...)
	if err != nil {
		return err
	}

	c.conn = conn
	c.metrics.MarkConnected()
	return nil
}
// Invoke performs a unary call of "/service/method" with req, storing the
// reply in resp.
//
// Outgoing metadata is attached to ctx when configured. The sent counters are
// advanced by the wire size of req whether or not the call succeeds. On
// failure the error counter is incremented and the error returned; on success
// the received counters are advanced by the wire size of resp.
//
// Sizes are obtained with proto.Size instead of marshalling each message a
// second time purely to measure it, which avoids an extra encode and
// allocation per call and has no error to ignore.
func (c *Client) Invoke(ctx context.Context, req, resp proto.Message) error {
	// Add metadata to context.
	if len(c.md) > 0 {
		ctx = metadata.NewOutgoingContext(ctx, c.md)
	}

	// Measure the request for metrics.
	reqSize := int64(proto.Size(req))

	// Build full method path.
	fullMethod := fmt.Sprintf("/%s/%s", c.service, c.method)

	// Invoke RPC.
	err := c.conn.Invoke(ctx, fullMethod, req, resp)

	// Record metrics.
	c.metrics.IncrementSent(reqSize)
	if err != nil {
		c.metrics.IncrementErrors()
		return err
	}
	c.metrics.IncrementReceived(int64(proto.Size(resp)))
	return nil
}
Dynamic Proto Compilation
// Using jhump/protoreflect for runtime proto parsing
// compileProto parses the .proto file at protoFile and returns the descriptor
// of that file. The file's directory is used as the only import path, and the
// file itself is addressed by its base name relative to it.
//
// NOTE(review): the declared return type is a pointer to protoreflect's
// FileDescriptor while the parser comes from jhump/protoreflect — confirm the
// element type returned by ParseFiles matches.
func compileProto(protoFile string) (*protoreflect.FileDescriptor, error) {
	dir, name := filepath.Dir(protoFile), filepath.Base(protoFile)

	p := protoparse.Parser{ImportPaths: []string{dir}}
	parsed, err := p.ParseFiles(name)
	if err != nil {
		return nil, err
	}

	// Exactly one file name was passed, so the first descriptor is the one
	// for protoFile.
	return parsed[0], nil
}
// findMethod looks up the named method of the named service in fd.
//
// It returns a `service "..." not found` or `method "..." not found` error
// when the corresponding lookup yields nil.
func findMethod(fd protoreflect.FileDescriptor, service, method string) (protoreflect.MethodDescriptor, error) {
	svcName, mtdName := protoreflect.Name(service), protoreflect.Name(method)

	svcDesc := fd.Services().ByName(svcName)
	if svcDesc == nil {
		return nil, fmt.Errorf("service %q not found", service)
	}

	mtdDesc := svcDesc.Methods().ByName(mtdName)
	if mtdDesc == nil {
		return nil, fmt.Errorf("method %q not found", method)
	}
	return mtdDesc, nil
}
Shared Client Metrics
The WebSocket, SSE, and gRPC clients share a common metrics tracking structure (the HTTP requester reports directly to the collector):
// internal/clientmetrics/metrics.go
// ClientMetrics accumulates per-connection counters. All methods take the
// internal mutex, so a *ClientMetrics may be shared between goroutines; the
// zero value is ready to use. It must not be copied after first use because
// it contains a sync.Mutex.
type ClientMetrics struct {
	connectedAt      time.Time // zero until MarkConnected is called
	messagesSent     int64
	messagesReceived int64
	bytesSent        int64
	bytesReceived    int64
	errors           int64
	mu               sync.Mutex
}

// Snapshot is a point-in-time copy of the counters held by a ClientMetrics.
type Snapshot struct {
	// ConnectionDuration is the time elapsed since MarkConnected, or zero
	// when MarkConnected has not been called.
	ConnectionDuration time.Duration
	MessagesSent       int64
	MessagesReceived   int64
	BytesSent          int64
	BytesReceived      int64
	Errors             int64
}

// MarkConnected records the current time as the start of the connection.
func (m *ClientMetrics) MarkConnected() {
	m.mu.Lock()
	m.connectedAt = time.Now()
	m.mu.Unlock()
}

// IncrementSent counts one sent message of the given size in bytes.
func (m *ClientMetrics) IncrementSent(bytes int64) {
	m.mu.Lock()
	m.messagesSent++
	m.bytesSent += bytes
	m.mu.Unlock()
}

// IncrementReceived counts one received message of the given size in bytes.
func (m *ClientMetrics) IncrementReceived(bytes int64) {
	m.mu.Lock()
	m.messagesReceived++
	m.bytesReceived += bytes
	m.mu.Unlock()
}

// IncrementErrors counts one failed operation. The protocol clients call it
// on dial, read, write and invoke failures; without it the errors counter
// reported by Snapshot would never change.
func (m *ClientMetrics) IncrementErrors() {
	m.mu.Lock()
	m.errors++
	m.mu.Unlock()
}

// Snapshot returns a consistent copy of all counters together with the time
// elapsed since MarkConnected (zero when it was never called).
func (m *ClientMetrics) Snapshot() Snapshot {
	m.mu.Lock()
	defer m.mu.Unlock()

	var duration time.Duration
	if !m.connectedAt.IsZero() {
		duration = time.Since(m.connectedAt)
	}
	return Snapshot{
		ConnectionDuration: duration,
		MessagesSent:       m.messagesSent,
		MessagesReceived:   m.messagesReceived,
		BytesSent:          m.bytesSent,
		BytesReceived:      m.bytesReceived,
		Errors:             m.errors,
	}
}
Protocol Comparison
| Feature | HTTP | WebSocket | SSE | gRPC |
|---|---|---|---|---|
| Direction | Request/Response | Bidirectional | Server→Client | Request/Response |
| Connection | Per-request (pooled) | Persistent | Persistent | Persistent |
| Message Format | Text/Binary | Text/Binary | Text | Protobuf |
| Retries | ✅ Supported | ❌ Not supported | ❌ Not supported | ❌ Not supported |
| Request Chaining | ✅ JSONPath/Regex | ❌ Not supported | ❌ Not supported | ❌ Not supported |
| Authentication | ✅ All types | ✅ All types | ✅ All types | ✅ Metadata |
Error Categorization
HTTP Errors
// Status code in status buckets
meta.Protocol = "http"
meta.StatusCode = "500" // "503", "429", etc.
WebSocket Errors
// Close code in status buckets
meta.Protocol = "websocket"
meta.StatusCode = "1006" // Abnormal closure
SSE Errors
// HTTP status for connection errors
meta.Protocol = "sse"
meta.StatusCode = "502" // Bad gateway
gRPC Errors
// gRPC status code
meta.Protocol = "grpc"
meta.StatusCode = "UNAVAILABLE" // "DEADLINE_EXCEEDED", etc.
Testing
Protocol-Specific Tests
Each protocol has dedicated integration tests:
// cmd/crankfire/protocol_integration_test.go
func TestHTTP_BasicRequest(t *testing.T)
func TestHTTP_WithAuthentication(t *testing.T)
func TestHTTP_WithFeeder(t *testing.T)
func TestHTTP_RequestChaining(t *testing.T)
func TestWebSocket_BasicSession(t *testing.T)
func TestWebSocket_MultipleMessages(t *testing.T)
func TestSSE_BasicStream(t *testing.T)
func TestSSE_MaxEvents(t *testing.T)
func TestGRPC_BasicCall(t *testing.T)
func TestGRPC_WithMetadata(t *testing.T)
Mock Servers
Test servers are provided in scripts/testservers/:
// HTTP mock server
func NewMockHTTPServer() *httptest.Server
// WebSocket mock server
func NewMockWebSocketServer() *httptest.Server
// SSE mock server
func NewMockSSEServer() *httptest.Server
// gRPC mock server
func NewMockGRPCServer() *grpc.Server
Potential Improvements
- HTTP/3 Support - QUIC-based HTTP
- Streaming gRPC - Server/client/bidirectional streaming
- WebSocket Subprotocols - Custom protocol negotiation
- SSE Reconnection - Automatic reconnect with Last-Event-ID
- Protocol Plugins - Dynamic protocol registration