1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
| package stream
import (
"context"
"errors"
"io"
"strings"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type StreamValueReceiver[Recv any] interface {
Recv() (*Recv, error)
}
// stream to chan discussion: https://github.com/grpc/grpc-go/issues/847
type StreamValueListener[Recv any] struct {
log *zap.Logger
recv chan *Recv
err chan error
running bool
label string
}
func CreateStreamValueListener[Recv any](log *zap.Logger, r StreamValueReceiver[Recv], label string) *StreamValueListener[Recv] {
o := &StreamValueListener[Recv]{
log: log,
recv: make(chan *Recv),
err: make(chan error),
running: false,
label: label,
}
o.start(r)
return o
}
func (o *StreamValueListener[Recv]) start(r StreamValueReceiver[Recv]) {
if r == nil {
o.err <- errors.New("output receiver is nil")
return
}
if o.running {
return
}
go func() {
o.running = true
for {
o.log.Debug("Waiting for message from stream.")
resp, err := r.Recv()
if err != nil {
if err == io.EOF {
o.log.Debug("End of stream.")
o.running = false
return
}
st, ok := status.FromError(err)
if (ok && st.Code() == codes.Canceled) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
o.log.Debug("Stream disconnected by client.")
o.running = false
return
}
if strings.Contains(err.Error(), "EOF") && strings.Contains(err.Error(), "NO_ERROR") {
o.log.Warn("Stream disconnected by server.")
o.running = false
return
}
o.log.Debug("Failed to receive message from stream.")
o.err <- err
o.running = false
return
}
o.log.Debug("Received message from stream.")
o.recv <- resp
}
}()
}
func (o StreamValueListener[Recv]) ListenRecv() chan *Recv {
return o.recv
}
func (o StreamValueListener[Recv]) ListenErr() chan error {
return o.err
}
|