package shreder

import (
	"context"
	"log/slog"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// Client wraps a gRPC connection to a shreder service together with the
// transaction subscription filters that are sent when a stream is opened.
type Client struct {
	log          *slog.Logger
	conn         *grpc.ClientConn
	client       ShrederServiceClient
	subscription map[string]*SubscribeRequestFilterTransactions
}

// NewShrederClient builds a Client for the service at url using insecure
// (non-TLS) transport credentials and the default slog logger.
//
// It returns the client, a cleanup function, and an error. On success the
// cleanup function closes the underlying connection (via Wait); on failure
// the client is nil and the cleanup function is a no-op, so it is always
// safe to call or defer.
func NewShrederClient(
	url string,
	subscription map[string]*SubscribeRequestFilterTransactions,
) (*Client, func(), error) {
	conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, func() {}, err
	}

	client := &Client{
		log:          slog.Default(),
		conn:         conn,
		client:       NewShrederServiceClient(conn),
		subscription: subscription,
	}
	return client, client.Wait, nil
}

// Wait closes the client's gRPC connection. A close failure is logged at
// error level rather than returned.
func (c *Client) Wait() {
	c.log.Debug("waiting for shreder client to stop")
	if err := c.conn.Close(); err != nil {
		c.log.Error("failed to close connection: ", "err", err)
	}
	c.log.Debug("shreder client stopped")
}

// ReadSync opens a SubscribeTransactions stream, sends the client's
// subscription filters, and then blocks forwarding parsed transaction
// batches to txCh.
//
// Each received message is converted with ParseTransaction; empty batches
// are skipped, and every element of a non-empty batch has its Source set to
// "shreder" before the batch is sent on txCh.
//
// ReadSync never returns nil: it returns the stream error if opening,
// sending on, or receiving from the stream fails, or ctx.Err() if ctx is
// cancelled while waiting to deliver a batch on txCh.
func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error {
	stream, err := c.client.SubscribeTransactions(ctx)
	if err != nil {
		return err
	}

	req := &SubscribeTransactionsRequest{Transactions: c.subscription}
	if err := stream.Send(req); err != nil {
		return err
	}

	for {
		response, err := stream.Recv()
		if err != nil {
			return err
		}

		txBatch := ParseTransaction(response.Transaction)
		if len(txBatch) == 0 {
			continue
		}

		// Set a fixed source for tx signals. Index into the batch instead of
		// assigning through the range value: the range variable is a copy of
		// the element, so if the batch stores values (not pointers) a write
		// through it would be lost. Indexing is correct in both cases.
		for i := range txBatch {
			txBatch[i].Source = "shreder"
		}

		// Do not block forever on a slow or absent consumer; give up as soon
		// as the caller cancels.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case txCh <- txBatch:
		}
	}
}