This commit is contained in:
thloyi
2026-01-08 11:57:57 +08:00
parent d9aea3e8d7
commit 4c0abc5c34
4 changed files with 85 additions and 30 deletions

View File

@@ -61,7 +61,7 @@ func main() {
}, },
}, },
// TODO: axiom, gmgn, etc. // TODO: axiom, gmgn, etc.
}) }, shreder.BlocksStats(false), shreder.LogParsedStats(true), shreder.ShowTableLoaded(false))
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -75,7 +75,6 @@ func main() {
<-exitSignal <-exitSignal
cancel() cancel()
}() }()
shrederClient.EnableClientStats = true
// async read from shreder // async read from shreder
txCh := make(chan shreder.TxSignalBatch, 1000) txCh := make(chan shreder.TxSignalBatch, 1000)
go func() { go func() {
@@ -94,7 +93,7 @@ func main() {
case txBatch := <-txCh: case txBatch := <-txCh:
//jsonData, _ := json.MarshalIndent(txBatch, "", " ") //jsonData, _ := json.MarshalIndent(txBatch, "", " ")
for _, tx := range txBatch { for _, tx := range txBatch {
if tx.Label == "okxdexroutev2" { if tx.Label == "okxdexroutev2" || tx.Label == "jupiterv6" || tx.Label == "dflow" {
fmt.Println("===============", tx.TxHash, tx.Label, tx.Event, tx.Token0Address, "token:", tx.Token0Amount, "parse time:", tx.ParseEnd.Sub(tx.ParseStart)) fmt.Println("===============", tx.TxHash, tx.Label, tx.Event, tx.Token0Address, "token:", tx.Token0Amount, "parse time:", tx.ParseEnd.Sub(tx.ParseStart))
} }
} }

View File

@@ -18,8 +18,9 @@ type TableInfo struct {
} }
type AddressTables struct { type AddressTables struct {
showTableLoaded bool
rpcClient *rpc.Client rpcClient *rpc.Client
mux sync.RWMutex
loadMux sync.Mutex loadMux sync.Mutex
tables *lru.Cache[solana.PublicKey, *TableInfo] tables *lru.Cache[solana.PublicKey, *TableInfo]
loading map[solana.PublicKey]struct{} loading map[solana.PublicKey]struct{}
@@ -27,7 +28,7 @@ type AddressTables struct {
pool *ants.Pool pool *ants.Pool
} }
func NewAddressTables(rpcClient *rpc.Client) *AddressTables { func NewAddressTables(rpcClient *rpc.Client, showTableLoaded bool) *AddressTables {
pool, _ := ants.NewPool(5, ants.WithPreAlloc(true), ants.WithNonblocking(true)) pool, _ := ants.NewPool(5, ants.WithPreAlloc(true), ants.WithNonblocking(true))
cache, _ := lru.New[solana.PublicKey, *TableInfo](10000) cache, _ := lru.New[solana.PublicKey, *TableInfo](10000)
return &AddressTables{ return &AddressTables{
@@ -35,6 +36,8 @@ func NewAddressTables(rpcClient *rpc.Client) *AddressTables {
tables: cache, tables: cache,
loading: make(map[solana.PublicKey]struct{}), loading: make(map[solana.PublicKey]struct{}),
pool: pool, pool: pool,
showTableLoaded: showTableLoaded,
} }
} }
@@ -83,25 +86,43 @@ func (at *AddressTables) load(tablePubkey solana.PublicKey) {
delete(at.loading, tablePubkey) delete(at.loading, tablePubkey)
at.loadMux.Unlock() at.loadMux.Unlock()
at.mux.Lock()
at.tables.Add(tablePubkey, &TableInfo{ at.tables.Add(tablePubkey, &TableInfo{
addresses: table, addresses: table,
}) })
if at.showTableLoaded {
total := at.tables.Len() total := at.tables.Len()
at.mux.Unlock()
logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total) logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total)
}
}) })
} }
func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey { func (at *AddressTables) FillToTx(tx *versionedTransaction, tablePubkey solana.PublicKey, idx []uint8) bool {
at.mux.RLock() addresses, ok := at.tables.Get(tablePubkey)
if !ok {
at.load(tablePubkey)
return false
}
for _, i := range idx {
if int(i) >= len(addresses.addresses) {
logger.Error("over loadAddressTable failed", "idx", i, "table", tablePubkey)
addresses.overErrCount++
if addresses.overErrCount > 10 {
at.load(tablePubkey)
}
return false
}
tx.Message.StaticAccountKeys = append(tx.Message.StaticAccountKeys, addresses.addresses[i])
}
return true
}
func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey {
addresses, ok := at.tables.Get(tablePubkey) addresses, ok := at.tables.Get(tablePubkey)
if !ok { if !ok {
at.mux.RUnlock()
at.load(tablePubkey) at.load(tablePubkey)
return nil return nil
} }
at.mux.RUnlock()
var result solana.PublicKeySlice = make([]solana.PublicKey, 0, len(idx)) var result solana.PublicKeySlice = make([]solana.PublicKey, 0, len(idx))
for _, i := range idx { for _, i := range idx {

View File

@@ -13,7 +13,9 @@ import (
) )
type Client struct { type Client struct {
EnableClientStats bool enableBlockStats bool
enableParseStats bool
conn *grpc.ClientConn conn *grpc.ClientConn
client ShrederServiceClient client ShrederServiceClient
tableLoader *AddressTables tableLoader *AddressTables
@@ -25,10 +27,37 @@ type Client struct {
lastSlotTime time.Time lastSlotTime time.Time
} }
type ClientOpts struct {
blockStats bool
showTableLoaded bool
logParseStats bool
}
type ClientOption func(*ClientOpts)
func ShowTableLoaded(enable bool) ClientOption {
return func(opts *ClientOpts) {
opts.showTableLoaded = enable
}
}
func BlocksStats(enable bool) ClientOption {
return func(opts *ClientOpts) {
opts.blockStats = enable
}
}
func LogParsedStats(enable bool) ClientOption {
return func(opts *ClientOpts) {
opts.logParseStats = enable
}
}
func NewShrederClient( func NewShrederClient(
url string, url string,
rpcClient *rpc.Client, rpcClient *rpc.Client,
subscription map[string]*SubscribeRequestFilterTransactions, subscription map[string]*SubscribeRequestFilterTransactions,
options ...ClientOption,
) (*Client, func(), error) { ) (*Client, func(), error) {
if rpcClient == nil { if rpcClient == nil {
return nil, func() {}, fmt.Errorf("rpc client is nil") return nil, func() {}, fmt.Errorf("rpc client is nil")
@@ -45,13 +74,23 @@ func NewShrederClient(
if err != nil { if err != nil {
return nil, func() {}, err return nil, func() {}, err
} }
o := &ClientOpts{
blockStats: false,
showTableLoaded: true,
logParseStats: false,
}
for _, option := range options {
option(o)
}
s := &Client{ s := &Client{
conn: conn, conn: conn,
client: NewShrederServiceClient(conn), client: NewShrederServiceClient(conn),
subscription: subscription, subscription: subscription,
tableLoader: NewAddressTables(rpcClient), tableLoader: NewAddressTables(rpcClient, o.showTableLoaded),
pool: pool, pool: pool,
enableBlockStats: o.blockStats,
enableParseStats: o.logParseStats,
} }
return s, func() { return s, func() {
@@ -95,7 +134,7 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error
return err return err
} }
if c.EnableClientStats { if c.enableBlockStats {
slot := response.Transaction.Slot slot := response.Transaction.Slot
now := time.Now() now := time.Now()
if c.lastSlotTime.IsZero() || slot > c.lastSlot { if c.lastSlotTime.IsZero() || slot > c.lastSlot {
@@ -110,7 +149,7 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error
txData := response.Transaction txData := response.Transaction
err = c.pool.Submit(func() { err = c.pool.Submit(func() {
txBatch := ParseTransaction(txData, c.tableLoader, c.EnableClientStats) txBatch := ParseTransaction(txData, c.tableLoader, c.enableParseStats)
if len(txBatch) == 0 { if len(txBatch) == 0 {
return return
} }

View File

@@ -278,24 +278,20 @@ func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables,
if len(lookup.WritableIndexes) == 0 { if len(lookup.WritableIndexes) == 0 {
continue continue
} }
accounts := loader.GetAddressTable(lookup.AccountKey, lookup.WritableIndexes) lookupTableOk = loader.FillToTx(versioned, lookup.AccountKey, lookup.WritableIndexes)
if len(accounts) != len(lookup.WritableIndexes) { if !lookupTableOk {
lookupTableOk = false
break break
} }
versioned.Message.StaticAccountKeys = append(versioned.Message.StaticAccountKeys, accounts...)
} }
if lookupTableOk { if lookupTableOk {
for _, lookup := range versioned.Message.AddressTableLookups { for _, lookup := range versioned.Message.AddressTableLookups {
if len(lookup.ReadonlyIndexes) == 0 { if len(lookup.ReadonlyIndexes) == 0 {
continue continue
} }
accounts := loader.GetAddressTable(lookup.AccountKey, lookup.ReadonlyIndexes) lookupTableOk = loader.FillToTx(versioned, lookup.AccountKey, lookup.ReadonlyIndexes)
if len(accounts) != len(lookup.ReadonlyIndexes) { if !lookupTableOk {
break break
} }
versioned.Message.StaticAccountKeys = append(versioned.Message.StaticAccountKeys, accounts...)
} }
} }
// versioned.Message.StaticAccountKeys = staticKeys // versioned.Message.StaticAccountKeys = staticKeys