brett

Gists

gRPC Stream to Channel

Put gRPC stream values in a channel for a `select` statement

stream.go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package stream

import (
	"context"
	"errors"
	"io"
	"strings"

	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type StreamValueReceiver[Recv any] interface {
	Recv() (*Recv, error)
}

// stream to chan discussion: https://github.com/grpc/grpc-go/issues/847
type StreamValueListener[Recv any] struct {
	log     *zap.Logger
	recv    chan *Recv
	err     chan error
	running bool
	label   string
}

func CreateStreamValueListener[Recv any](log *zap.Logger, r StreamValueReceiver[Recv], label string) *StreamValueListener[Recv] {
	o := &StreamValueListener[Recv]{
		log:     log,
		recv:    make(chan *Recv),
		err:     make(chan error),
		running: false,
		label:   label,
	}
	o.start(r)
	return o
}

func (o *StreamValueListener[Recv]) start(r StreamValueReceiver[Recv]) {
	if r == nil {
		o.err <- errors.New("output receiver is nil")
		return
	}
	if o.running {
		return
	}
	go func() {
		o.running = true
		for {
			o.log.Debug("Waiting for message from stream.")
			resp, err := r.Recv()
			if err != nil {
				if err == io.EOF {
					o.log.Debug("End of stream.")
					o.running = false
					return
				}

				st, ok := status.FromError(err)
				if (ok && st.Code() == codes.Canceled) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
					o.log.Debug("Stream disconnected by client.")
					o.running = false
					return
				}

				if strings.Contains(err.Error(), "EOF") && strings.Contains(err.Error(), "NO_ERROR") {
					o.log.Warn("Stream disconnected by server.")
					o.running = false
					return
				}

				o.log.Debug("Failed to receive message from stream.")
				o.err <- err
				o.running = false
				return
			}
			o.log.Debug("Received message from stream.")
			o.recv <- resp
		}
	}()
}

func (o StreamValueListener[Recv]) ListenRecv() chan *Recv {
	return o.recv
}

func (o StreamValueListener[Recv]) ListenErr() chan error {
	return o.err
}
watch.go
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package stream

import (
	"context"

	"go.uber.org/zap"
	"google.golang.org/grpc"
)

func watchStream(ctx context.Context, log *zap.Logger, stream pb.MyRPCStream, handle func(context.Context, *pb.Response) error) {
	o := CreateStreamValueListener(log, stream)
	go func() {
		for {
			select {
			case <-ctx.Done():
				log.Debug("Context for stream cancelled.", zap.Error(ctx.Err()))
				return
			case <-sctx.Done():
				log.Debug("Stream context cancelled.", zap.Error(sctx.Err()))
				return
			case err := <-o.ListenErr():
				log.Error("Failed to receive message from stream.", zap.Error(err))
				return
			case resp := <-o.ListenRecv():
				if resp != nil {
					log.Debug("Received response from stream.")
					if err := handle(ctx, resp); err != nil {
						log.Error("Failed to handle response from stream.", zap.Error(err))
					}
				} else {
					log.Warn("Received nil response from stream.")
				}
			}
		}
	}()
}