package shreder import ( "bytes" "context" "errors" "fmt" "runtime" "time" "github.com/gagliardetto/solana-go/rpc" "github.com/panjf2000/ants/v2" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type Client struct { enableBlockStats bool enableParseStats bool conn *grpc.ClientConn client ShrederServiceClient tableLoader *AddressTables subscription map[string]*SubscribeRequestFilterTransactions pool *ants.Pool lastSlot uint64 lastSlotTime time.Time } type ClientOpts struct { blockStats bool showTableLoaded bool logParseStats bool } type ClientOption func(*ClientOpts) func ShowTableLoaded(enable bool) ClientOption { return func(opts *ClientOpts) { opts.showTableLoaded = enable } } func BlocksStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.blockStats = enable } } // LogParsedStats enables logging of parsed transaction statistics. // Deprecated: do not use. func LogParsedStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.logParseStats = enable } } func NewShrederClient( url string, rpcClient *rpc.Client, subscription map[string]*SubscribeRequestFilterTransactions, options ...ClientOption, ) (*Client, func(), error) { if rpcClient == nil { return nil, func() {}, fmt.Errorf("rpc client is nil") } conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, func() {}, err } poolSize := runtime.NumCPU()*2 + 2 logger.Info("creating shreder client", "url", url, "pool_size", poolSize) pool, err := ants.NewPool(poolSize, ants.WithNonblocking(false)) if err != nil { return nil, func() {}, err } o := &ClientOpts{ blockStats: false, showTableLoaded: true, logParseStats: false, } for _, option := range options { option(o) } s := &Client{ conn: conn, client: NewShrederServiceClient(conn), subscription: subscription, tableLoader: NewAddressTables(rpcClient, o.showTableLoaded), pool: pool, enableBlockStats: o.blockStats, enableParseStats: o.logParseStats, } return s, func() { s.Wait() }, nil } func (c *Client) Wait() { logger.Debug("waiting for shreder client to stop") if c.pool != nil { c.pool.Release() } err := c.conn.Close() if err != nil { logger.Error("failed to close connection: ", "err", err) } logger.Debug("shreder client stopped") } func (c *Client) ReadEntriesSync(ctx context.Context, txCh chan<- TxSignal) error { stream, err := c.client.SubscribeEntries(ctx, &SubscribeEntriesRequest{}) if err != nil { return err } logger.Debug("reading entries from shreder client") for { response, err := stream.Recv() if err != nil { return err } slot := response.Slot if c.enableBlockStats { now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { if !c.lastSlotTime.IsZero() { logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) } c.lastSlot = slot c.lastSlotTime = now } } err = c.pool.Submit(func() { ParseTransactionForEntries(ctx, slot, bytes.NewReader(response.Entries), c.tableLoader, txCh) }) if err != nil && errors.Is(err, ants.ErrPoolOverload) { logger.Warn("task pool is full") } } } func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error { stream, err := c.client.SubscribeTransactions(ctx) if err != nil { return err } logger.Debug("subscribing to transactions") err = stream.Send(&SubscribeTransactionsRequest{ Transactions: c.subscription, }) if err != nil { return err } // reboot the pool c.pool.Reboot() for { var response *SubscribeTransactionsResponse response, err = stream.Recv() if err != nil { break } if c.enableBlockStats { slot := response.Transaction.Slot now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { if !c.lastSlotTime.IsZero() { logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) } c.lastSlot = slot c.lastSlotTime = now } } txData := response.Transaction err := c.pool.Submit(func() { ParseTransactionForSubscribe(ctx, txData, c.tableLoader, txCh, nil) }) if err != nil && errors.Is(err, ants.ErrPoolOverload) { logger.Warn("task pool is full") } } // sync waiting for all tasks to complete c.pool.Release() return err }