package shreder import ( "context" "fmt" "runtime" "time" "github.com/gagliardetto/solana-go/rpc" "github.com/panjf2000/ants/v2" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type Client struct { enableBlockStats bool enableParseStats bool conn *grpc.ClientConn client ShrederServiceClient tableLoader *AddressTables subscription map[string]*SubscribeRequestFilterTransactions pool *ants.Pool lastSlot uint64 lastSlotTime time.Time } type ClientOpts struct { blockStats bool showTableLoaded bool logParseStats bool } type ClientOption func(*ClientOpts) func ShowTableLoaded(enable bool) ClientOption { return func(opts *ClientOpts) { opts.showTableLoaded = enable } } func BlocksStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.blockStats = enable } } func LogParsedStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.logParseStats = enable } } func NewShrederClient( url string, rpcClient *rpc.Client, subscription map[string]*SubscribeRequestFilterTransactions, options ...ClientOption, ) (*Client, func(), error) { if rpcClient == nil { return nil, func() {}, fmt.Errorf("rpc client is nil") } conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, func() {}, err } poolSize := runtime.NumCPU()*2 + 2 logger.Info("creating shreder client", "url", url, "pool_size", poolSize) pool, err := ants.NewPool(poolSize, ants.WithNonblocking(false)) if err != nil { return nil, func() {}, err } o := &ClientOpts{ blockStats: false, showTableLoaded: true, logParseStats: false, } for _, option := range options { option(o) } s := &Client{ conn: conn, client: NewShrederServiceClient(conn), subscription: subscription, tableLoader: NewAddressTables(rpcClient, o.showTableLoaded), pool: pool, enableBlockStats: o.blockStats, enableParseStats: o.logParseStats, } return s, func() { s.Wait() }, nil } func (c *Client) Wait() { logger.Debug("waiting for shreder client to stop") if c.pool != nil { c.pool.Release() } err := c.conn.Close() if err != nil { logger.Error("failed to close connection: ", "err", err) } logger.Debug("shreder client stopped") } func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error { stream, err := c.client.SubscribeTransactions(ctx) if err != nil { return err } logger.Debug("subscribing to transactions") err = stream.Send(&SubscribeTransactionsRequest{ Transactions: c.subscription, }) if err != nil { return err } // reboot the pool c.pool.Reboot() for { var response *SubscribeTransactionsResponse response, err = stream.Recv() if err != nil { break } if c.enableBlockStats { slot := response.Transaction.Slot now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { if !c.lastSlotTime.IsZero() { logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) } c.lastSlot = slot c.lastSlotTime = now } } txData := response.Transaction err = c.pool.Submit(func() { txBatch := ParseTransaction(txData, c.tableLoader, c.enableParseStats) if len(txBatch) == 0 { return } for _, tx := range txBatch { tx.Source = "shreder" } select { case <-ctx.Done(): return case txCh <- txBatch: } }) if err != nil { break } } // sync waiting for all tasks to complete c.pool.Release() return err }