package shreder import ( "bytes" "context" "errors" "fmt" "runtime" "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "github.com/panjf2000/ants/v2" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type Client struct { enableBlockStats bool enableParseStats bool conn *grpc.ClientConn client ShrederServiceClient tableLoader *AddressTables subscription map[string]*SubscribeRequestFilterTransactions entriesFilter map[string]FilterParams parser map[solana.PublicKey]Handler pool *ants.Pool lastSlot uint64 lastSlotTime time.Time } type ClientOpts struct { blockStats bool showTableLoaded bool logParseStats bool parser map[solana.PublicKey]Handler } type ClientOption func(*ClientOpts) func ShowTableLoaded(enable bool) ClientOption { return func(opts *ClientOpts) { opts.showTableLoaded = enable } } func WithCustomParsers(parsers map[solana.PublicKey]Handler) ClientOption { return func(opts *ClientOpts) { opts.parser = parsers } } func BlocksStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.blockStats = enable } } // LogParsedStats enables logging of parsed transaction statistics. // Deprecated: do not use. func LogParsedStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.logParseStats = enable } } func NewShrederClient( url string, rpcClient *rpc.Client, subscription map[string]*SubscribeRequestFilterTransactions, options ...ClientOption, ) (*Client, func(), error) { if rpcClient == nil { return nil, func() {}, fmt.Errorf("rpc client is nil") } conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, func() {}, err } poolSize := runtime.NumCPU()*2 + 2 logger.Info("creating shreder client", "url", url, "pool_size", poolSize) pool, err := ants.NewPool(poolSize, ants.WithNonblocking(false)) if err != nil { return nil, func() {}, err } o := &ClientOpts{ blockStats: false, showTableLoaded: true, logParseStats: false, parser: registered, } for _, option := range options { option(o) } filterParams := make(map[string]FilterParams) for name, params := range subscription { filterParams[name] = FilterParams{ Exclude: parseAccountArray(params.AccountExclude), Require: parseAccountArray(params.AccountRequired), Include: parseAccountArray(params.AccountInclude), } } if len(filterParams) == 0 { filterParams["default"] = FilterParams{ Include: defaultFilterAccount, } } s := &Client{ conn: conn, client: NewShrederServiceClient(conn), subscription: subscription, entriesFilter: filterParams, parser: o.parser, tableLoader: NewAddressTables(rpcClient, o.showTableLoaded), pool: pool, enableBlockStats: o.blockStats, enableParseStats: o.logParseStats, } return s, func() { s.Wait() }, nil } func (c *Client) Wait() { logger.Debug("waiting for shreder client to stop") if c.pool != nil { c.pool.Release() } err := c.conn.Close() if err != nil { logger.Error("failed to close connection: ", "err", err) } logger.Debug("shreder client stopped") } func (c *Client) ReadEntriesSync(ctx context.Context, txCh chan<- TxSignal) error { stream, err := c.client.SubscribeEntries(ctx, &SubscribeEntriesRequest{}) if err != nil { return err } logger.Debug("reading entries from shreder client") for { response, err := stream.Recv() if err != nil { return err } slot := response.Slot if c.enableBlockStats { now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { if !c.lastSlotTime.IsZero() { logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) } c.lastSlot = slot c.lastSlotTime = now } } err = c.pool.Submit(func() { err := entriesToVersionedTransaction(slot, bytes.NewReader(response.Entries), func(versioned VersionedTransaction) { // filter out vote transactions if FilterTransactionForEntriesWithFilter(versioned, c.entriesFilter) { return } go ParseTransactionWithHandler(ctx, versioned, c.tableLoader, txCh, c.parser) }) if err != nil { logger.Debug("txparser: failed to parse entries", "error", err) } }) if err != nil && errors.Is(err, ants.ErrPoolOverload) { logger.Warn("task pool is full") } } } func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error { stream, err := c.client.SubscribeTransactions(ctx) if err != nil { return err } logger.Debug("subscribing to transactions") err = stream.Send(&SubscribeTransactionsRequest{ Transactions: c.subscription, }) if err != nil { return err } // reboot the pool c.pool.Reboot() for { var response *SubscribeTransactionsResponse response, err = stream.Recv() if err != nil { break } if c.enableBlockStats { slot := response.Transaction.Slot now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { if !c.lastSlotTime.IsZero() { logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) } c.lastSlot = slot c.lastSlotTime = now } } // txData := response.Transaction err := c.pool.Submit(func() { versioned, err := toVersionedTransaction(response.Transaction) if err != nil { logger.Debug("txparser: failed to convert to versioned transaction", "error", err) return } ParseTransactionWithHandler(ctx, versioned, c.tableLoader, txCh, c.parser) }) if err != nil && errors.Is(err, ants.ErrPoolOverload) { logger.Warn("task pool is full") } } // sync waiting for all tasks to complete c.pool.Release() return err } func parseAccountArray(accountArray []string) []solana.PublicKey { var result []solana.PublicKey for _, acc := range accountArray { result = append(result, solana.MustPublicKeyFromBase58(acc)) } return result }