diff --git a/cmd/shreder/main.go b/cmd/shreder/main.go index c9f4aca..04623ed 100644 --- a/cmd/shreder/main.go +++ b/cmd/shreder/main.go @@ -61,7 +61,7 @@ func main() { }, }, // TODO: axiom, gmgn, etc. - }) + }, shreder.BlocksStats(false), shreder.LogParsedStats(true), shreder.ShowTableLoaded(false)) if err != nil { panic(err) } @@ -75,7 +75,6 @@ func main() { <-exitSignal cancel() }() - shrederClient.EnableClientStats = true // async read from shreder txCh := make(chan shreder.TxSignalBatch, 1000) go func() { @@ -94,7 +93,7 @@ func main() { case txBatch := <-txCh: //jsonData, _ := json.MarshalIndent(txBatch, "", " ") for _, tx := range txBatch { - if tx.Label == "okxdexroutev2" { + if tx.Label == "okxdexroutev2" || tx.Label == "jupiterv6" || tx.Label == "dflow" { fmt.Println("===============", tx.TxHash, tx.Label, tx.Event, tx.Token0Address, "token:", tx.Token0Amount, "parse time:", tx.ParseEnd.Sub(tx.ParseStart)) } } diff --git a/pkg/shreder/addresstables.go b/pkg/shreder/addresstables.go index ea08bfa..3c42b77 100644 --- a/pkg/shreder/addresstables.go +++ b/pkg/shreder/addresstables.go @@ -18,8 +18,9 @@ type TableInfo struct { } type AddressTables struct { + showTableLoaded bool + rpcClient *rpc.Client - mux sync.RWMutex loadMux sync.Mutex tables *lru.Cache[solana.PublicKey, *TableInfo] loading map[solana.PublicKey]struct{} @@ -27,7 +28,7 @@ type AddressTables struct { pool *ants.Pool } -func NewAddressTables(rpcClient *rpc.Client) *AddressTables { +func NewAddressTables(rpcClient *rpc.Client, showTableLoaded bool) *AddressTables { pool, _ := ants.NewPool(5, ants.WithPreAlloc(true), ants.WithNonblocking(true)) cache, _ := lru.New[solana.PublicKey, *TableInfo](10000) return &AddressTables{ @@ -35,6 +36,8 @@ func NewAddressTables(rpcClient *rpc.Client) *AddressTables { tables: cache, loading: make(map[solana.PublicKey]struct{}), pool: pool, + + showTableLoaded: showTableLoaded, } } @@ -83,25 +86,43 @@ func (at *AddressTables) load(tablePubkey solana.PublicKey) { delete(at.loading, tablePubkey) at.loadMux.Unlock() - at.mux.Lock() at.tables.Add(tablePubkey, &TableInfo{ addresses: table, }) - total := at.tables.Len() - at.mux.Unlock() - logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total) + if at.showTableLoaded { + total := at.tables.Len() + logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total) + } }) } -func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey { - at.mux.RLock() +func (at *AddressTables) FillToTx(tx *versionedTransaction, tablePubkey solana.PublicKey, idx []uint8) bool { + addresses, ok := at.tables.Get(tablePubkey) + if !ok { + at.load(tablePubkey) + return false + } + + for _, i := range idx { + if int(i) >= len(addresses.addresses) { + logger.Error("over loadAddressTable failed", "idx", i, "table", tablePubkey) + addresses.overErrCount++ + if addresses.overErrCount > 10 { + at.load(tablePubkey) + } + return false + } + tx.Message.StaticAccountKeys = append(tx.Message.StaticAccountKeys, addresses.addresses[i]) + } + return true +} + +func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey { addresses, ok := at.tables.Get(tablePubkey) if !ok { - at.mux.RUnlock() at.load(tablePubkey) return nil } - at.mux.RUnlock() var result solana.PublicKeySlice = make([]solana.PublicKey, 0, len(idx)) for _, i := range idx { diff --git a/pkg/shreder/client.go b/pkg/shreder/client.go index 14e1934..d4fcb80 100644 --- a/pkg/shreder/client.go +++ b/pkg/shreder/client.go @@ -13,11 +13,13 @@ import ( ) type Client struct { - EnableClientStats bool - conn *grpc.ClientConn - client ShrederServiceClient - tableLoader *AddressTables - subscription map[string]*SubscribeRequestFilterTransactions + enableBlockStats bool + enableParseStats bool + + conn *grpc.ClientConn + client ShrederServiceClient + tableLoader *AddressTables + subscription map[string]*SubscribeRequestFilterTransactions pool *ants.Pool @@ -25,10 +27,37 @@ type Client struct { lastSlotTime time.Time } +type ClientOpts struct { + blockStats bool + showTableLoaded bool + logParseStats bool +} + +type ClientOption func(*ClientOpts) + +func ShowTableLoaded(enable bool) ClientOption { + return func(opts *ClientOpts) { + opts.showTableLoaded = enable + } +} + +func BlocksStats(enable bool) ClientOption { + return func(opts *ClientOpts) { + opts.blockStats = enable + } +} + +func LogParsedStats(enable bool) ClientOption { + return func(opts *ClientOpts) { + opts.logParseStats = enable + } +} + func NewShrederClient( url string, rpcClient *rpc.Client, subscription map[string]*SubscribeRequestFilterTransactions, + options ...ClientOption, ) (*Client, func(), error) { if rpcClient == nil { return nil, func() {}, fmt.Errorf("rpc client is nil") @@ -45,13 +74,23 @@ func NewShrederClient( if err != nil { return nil, func() {}, err } - + o := &ClientOpts{ + blockStats: false, + showTableLoaded: true, + logParseStats: false, + } + for _, option := range options { + option(o) + } s := &Client{ conn: conn, client: NewShrederServiceClient(conn), subscription: subscription, - tableLoader: NewAddressTables(rpcClient), + tableLoader: NewAddressTables(rpcClient, o.showTableLoaded), pool: pool, + + enableBlockStats: o.blockStats, + enableParseStats: o.logParseStats, } return s, func() { @@ -95,7 +134,7 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error return err } - if c.EnableClientStats { + if c.enableBlockStats { slot := response.Transaction.Slot now := time.Now() if c.lastSlotTime.IsZero() || slot > c.lastSlot { @@ -110,7 +149,7 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error txData := response.Transaction err = c.pool.Submit(func() { - txBatch := ParseTransaction(txData, c.tableLoader, c.EnableClientStats) + txBatch := ParseTransaction(txData, c.tableLoader, c.enableParseStats) if len(txBatch) == 0 { return } diff --git a/pkg/shreder/txparser.go b/pkg/shreder/txparser.go index 9116e45..1854047 100644 --- a/pkg/shreder/txparser.go +++ b/pkg/shreder/txparser.go @@ -278,24 +278,20 @@ func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables, if len(lookup.WritableIndexes) == 0 { continue } - accounts := loader.GetAddressTable(lookup.AccountKey, lookup.WritableIndexes) - if len(accounts) != len(lookup.WritableIndexes) { - lookupTableOk = false + lookupTableOk = loader.FillToTx(versioned, lookup.AccountKey, lookup.WritableIndexes) + if !lookupTableOk { break } - versioned.Message.StaticAccountKeys = append(versioned.Message.StaticAccountKeys, accounts...) - } if lookupTableOk { for _, lookup := range versioned.Message.AddressTableLookups { if len(lookup.ReadonlyIndexes) == 0 { continue } - accounts := loader.GetAddressTable(lookup.AccountKey, lookup.ReadonlyIndexes) - if len(accounts) != len(lookup.ReadonlyIndexes) { + lookupTableOk = loader.FillToTx(versioned, lookup.AccountKey, lookup.ReadonlyIndexes) + if !lookupTableOk { break } - versioned.Message.StaticAccountKeys = append(versioned.Message.StaticAccountKeys, accounts...) } } // versioned.Message.StaticAccountKeys = staticKeys