package shreder import ( "context" "fmt" "runtime" "time" "github.com/gagliardetto/solana-go/rpc" "github.com/panjf2000/ants/v2" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type Client struct { enableBlockStats bool enableParseStats bool conn *grpc.ClientConn client ShrederServiceClient tableLoader *AddressTables subscription map[string]*SubscribeRequestFilterTransactions pool *ants.Pool lastSlot uint64 lastSlotTime time.Time } type ClientOpts struct { blockStats bool showTableLoaded bool logParseStats bool } type ClientOption func(*ClientOpts) func ShowTableLoaded(enable bool) ClientOption { return func(opts *ClientOpts) { opts.showTableLoaded = enable } } func BlocksStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.blockStats = enable } } func LogParsedStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.logParseStats = enable } } func NewShrederClient( url string, rpcClient *rpc.Client, subscription map[string]*SubscribeRequestFilterTransactions, options ...ClientOption, ) (*Client, func(), error) { if rpcClient == nil { return nil, func() {}, fmt.Errorf("rpc client is nil") } conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, func() {}, err } poolSize := runtime.NumCPU()*2 + 2 logger.Info("creating shreder client", "url", url, "pool_size", poolSize) pool, err := ants.NewPool(poolSize, ants.WithNonblocking(true)) if err != nil { return nil, func() {}, err } o := &ClientOpts{ blockStats: false, showTableLoaded: true, logParseStats: false, } for _, option := range options { option(o) } s := &Client{ conn: conn, client: NewShrederServiceClient(conn), subscription: subscription, tableLoader: NewAddressTables(rpcClient, o.showTableLoaded), pool: pool, enableBlockStats: o.blockStats, enableParseStats: o.logParseStats, } return s, func() { s.Wait() }, nil } func (c *Client) Wait() { logger.Debug("waiting for shreder client to stop") if c.pool != nil { c.pool.Release() } err := c.conn.Close() if err != nil { logger.Error("failed to close connection: ", "err", err) } logger.Debug("shreder client stopped") } func (c *Client) ReadEntriesSync(ctx context.Context, txCh chan<- TxSignal) error { stream, err := c.client.SubscribeEntries(ctx, &SubscribeEntriesRequest{}) if err != nil { return err } logger.Debug("reading entries from shreder client") for { response, err := stream.Recv() if err != nil { return err } slot := response.Slot if c.enableBlockStats { now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { if !c.lastSlotTime.IsZero() { logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) } c.lastSlot = slot c.lastSlotTime = now } } entries := response.Entries err = c.pool.Submit(func() { ParseEntries(slot, entries, c.tableLoader, txCh, c.enableParseStats) }) if err != nil { return err } } } func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error { stream, err := c.client.SubscribeTransactions(ctx) if err != nil { return err } logger.Debug("subscribing to transactions") err = stream.Send(&SubscribeTransactionsRequest{ Transactions: c.subscription, }) if err != nil { return err } for { response, err := stream.Recv() if err != nil { return err } if c.enableBlockStats { slot := response.Transaction.Slot now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { if !c.lastSlotTime.IsZero() { logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) } c.lastSlot = slot c.lastSlotTime = now } } txData := response.Transaction err = c.pool.Submit(func() { ParseTransaction(txData, c.tableLoader, txCh, c.enableParseStats) }) if err != nil { logger.Error("failed to submit transaction: ", "err", err) } } }