// Package shreder_client subscribes to a Shreder gRPC transaction stream and
// forwards the parsed transaction signals to a caller-supplied channel.
package shreder_client

import (
	"context"

	"github.com/samlior/libsam/pkg/logger"
	"github.com/samlior/libsam/pkg/txparser"
	"github.com/samlior/libsam/pkg/types"
	"github.com/samlior/libsam/third_party/shreder_protos"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// ShrederClient bundles a gRPC connection with the ShrederService client
// created on top of it.
type ShrederClient struct {
	log    logger.Logger
	conn   *grpc.ClientConn
	client shreder_protos.ShrederServiceClient
}

// NewShrederClient creates a client for the Shreder service at url.
//
// The connection is created with insecure transport credentials (no TLS).
//
// It returns the client, a cleanup function that closes the connection, and
// an error. When the error is non-nil the client is nil and the cleanup
// function is a no-op, so callers may invoke it unconditionally.
func NewShrederClient(logger logger.Logger, url string) (*ShrederClient, func(), error) {
	conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, func() {}, err
	}

	s := &ShrederClient{
		log:    logger,
		conn:   conn,
		client: shreder_protos.NewShrederServiceClient(conn),
	}

	return s, s.Wait, nil
}

// Wait closes the underlying gRPC connection.
//
// Despite its name it does not wait on anything other than the Close call
// itself. A failure to close is logged rather than returned.
func (c *ShrederClient) Wait() {
	c.log.Debug("waiting for shreder client to stop")

	if err := c.conn.Close(); err != nil {
		c.log.Errorf("failed to close connection: %v", err)
	}

	c.log.Debug("shreder client stopped")
}

// shrederSubscribeRequest builds the fixed subscription request sent at the
// start of every ReadSync stream: one named filter per required account.
//
// NOTE(review): the filter names look like labels for the programs/services
// owning each account, and AccountRequired presumably restricts delivery to
// transactions that reference the listed account -- confirm against the
// Shreder protocol documentation.
func shrederSubscribeRequest() *shreder_protos.SubscribeTransactionsRequest {
	return &shreder_protos.SubscribeTransactionsRequest{
		Transactions: map[string]*shreder_protos.SubscribeRequestFilterTransactions{
			"pumpfun": {
				AccountRequired: []string{
					"6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",
				},
			},
			"axiom": {
				AccountRequired: []string{
					"F5tfvbLog9VdGUPqBDTT8rgXvTTcq7e5UiGnupL1zvBq",
				},
			},
			"photon": {
				AccountRequired: []string{
					"BSfD6SHZigAfDWSjzD5Q41jw8LmKwtmjskPH9XW1mrRW",
				},
			},
			"fjsz": {
				AccountRequired: []string{
					"FJsZbftBqRLfF7uqUKpm4s2goDr6xsQ5Q3mN7AFJB6hK",
				},
			},
		},
	}
}

// ReadSync opens a SubscribeTransactions stream, sends the fixed subscription
// request, and then blocks, forwarding every non-empty parsed batch to txCh
// with each signal's Source set to "shreder".
//
// It only returns with a non-nil error: the error from opening the stream,
// from sending the request, from receiving a response, or ctx.Err() when ctx
// is done while a batch is waiting to be delivered on txCh. Errors are
// returned unwrapped so callers can compare them directly.
func (c *ShrederClient) ReadSync(ctx context.Context, txCh chan<- types.TxSignalBatch) error {
	stream, err := c.client.SubscribeTransactions(ctx)
	if err != nil {
		return err
	}

	if err := stream.Send(shrederSubscribeRequest()); err != nil {
		return err
	}

	for {
		response, err := stream.Recv()
		if err != nil {
			return err
		}

		txBatch := txparser.ParseTransaction(response.Transaction)
		if len(txBatch) == 0 {
			continue
		}

		// Set the fixed source for tx signals. Assign through the index so
		// the write lands on the batch's own element whether the elements
		// are pointers or plain values; writing to a range copy would be
		// lost for value elements.
		for i := range txBatch {
			txBatch[i].Source = "shreder"
		}

		// Do not block forever on a slow consumer: give up as soon as the
		// context is done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case txCh <- txBatch:
		}
	}
}