From d9aea3e8d7fa0682e45f7895c1456414774e43b8 Mon Sep 17 00:00:00 2001 From: thloyi Date: Wed, 7 Jan 2026 21:15:54 +0800 Subject: [PATCH] parallel parsing --- cmd/shreder/main.go | 6 +- pkg/shreder/addresstables.go | 83 ++++++++------ pkg/shreder/client.go | 71 +++++++++--- pkg/shreder/dflow.go | 19 ++-- pkg/shreder/juptierv6.go | 43 ++++--- pkg/shreder/okxonchainlab.go | 19 ++-- pkg/shreder/tx.go | 3 + pkg/shreder/txparser.go | 209 +++++++++++++++++++++++------------ 8 files changed, 290 insertions(+), 163 deletions(-) diff --git a/cmd/shreder/main.go b/cmd/shreder/main.go index b39d6e2..c9f4aca 100644 --- a/cmd/shreder/main.go +++ b/cmd/shreder/main.go @@ -75,7 +75,7 @@ func main() { <-exitSignal cancel() }() - + shrederClient.EnableClientStats = true // async read from shreder txCh := make(chan shreder.TxSignalBatch, 1000) go func() { @@ -94,8 +94,8 @@ func main() { case txBatch := <-txCh: //jsonData, _ := json.MarshalIndent(txBatch, "", " ") for _, tx := range txBatch { - if tx.Label == "dflow" { - fmt.Println("===============", tx.TxHash, tx.Event, tx.Token0Address, "token:", tx.Token0Amount) + if tx.Label == "okxdexroutev2" { + fmt.Println("===============", tx.TxHash, tx.Label, tx.Event, tx.Token0Address, "token:", tx.Token0Amount, "parse time:", tx.ParseEnd.Sub(tx.ParseStart)) } } //fmt.Println(txBatch[0].TxHash) diff --git a/pkg/shreder/addresstables.go b/pkg/shreder/addresstables.go index 741f07b..ea08bfa 100644 --- a/pkg/shreder/addresstables.go +++ b/pkg/shreder/addresstables.go @@ -11,11 +11,17 @@ import ( "github.com/panjf2000/ants/v2" ) +type TableInfo struct { + overErrCount int + + addresses []solana.PublicKey +} + type AddressTables struct { rpcClient *rpc.Client mux sync.RWMutex loadMux sync.Mutex - tables *lru.Cache[solana.PublicKey, []solana.PublicKey] + tables *lru.Cache[solana.PublicKey, *TableInfo] loading map[solana.PublicKey]struct{} pool *ants.Pool @@ -23,7 +29,7 @@ type AddressTables struct { func NewAddressTables(rpcClient *rpc.Client) *AddressTables { pool, _ := ants.NewPool(5, ants.WithPreAlloc(true), ants.WithNonblocking(true)) - cache, _ := lru.New[solana.PublicKey, []solana.PublicKey](10000) + cache, _ := lru.New[solana.PublicKey, *TableInfo](10000) return &AddressTables{ rpcClient: rpcClient, tables: cache, @@ -54,53 +60,60 @@ func (at *AddressTables) loadAddressTable(tablePubkey solana.PublicKey) ([]solan return addresses, nil } +func (at *AddressTables) load(tablePubkey solana.PublicKey) { + _ = at.pool.Submit(func() { + at.loadMux.Lock() + _, loading := at.loading[tablePubkey] + if loading { + at.loadMux.Unlock() + return + } + at.loading[tablePubkey] = struct{}{} + at.loadMux.Unlock() + + table, err := at.loadAddressTable(tablePubkey) + if err != nil { + logger.Error("loadAddressTable failed", "err", err, "table", tablePubkey) + at.loadMux.Lock() + delete(at.loading, tablePubkey) + at.loadMux.Unlock() + return + } + at.loadMux.Lock() + delete(at.loading, tablePubkey) + at.loadMux.Unlock() + + at.mux.Lock() + at.tables.Add(tablePubkey, &TableInfo{ + addresses: table, + }) + total := at.tables.Len() + at.mux.Unlock() + logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total) + }) +} func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey { at.mux.RLock() addresses, ok := at.tables.Get(tablePubkey) if !ok { at.mux.RUnlock() - _ = at.pool.Submit(func() { - at.loadMux.Lock() - _, loading := at.loading[tablePubkey] - if loading { - at.loadMux.Unlock() - return - } - at.loading[tablePubkey] = struct{}{} - at.loadMux.Unlock() - - table, err := at.loadAddressTable(tablePubkey) - if err != nil { - logger.Error("loadAddressTable failed", "err", err, "table", tablePubkey) - at.loadMux.Lock() - delete(at.loading, tablePubkey) - at.loadMux.Unlock() - return - } - at.loadMux.Lock() - delete(at.loading, tablePubkey) - at.loadMux.Unlock() - - at.mux.Lock() - at.tables.Add(tablePubkey, table) - total := at.tables.Len() - at.mux.Unlock() - logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total) - }) - + at.load(tablePubkey) return nil } at.mux.RUnlock() var result solana.PublicKeySlice = make([]solana.PublicKey, 0, len(idx)) for _, i := range idx { - if int(i) >= len(addresses) { + if int(i) >= len(addresses.addresses) { logger.Error("over loadAddressTable failed", "idx", i, "table", tablePubkey) - //todo... update table? - continue + addresses.overErrCount++ + if addresses.overErrCount > 10 { + at.load(tablePubkey) + } + break } - result = append(result, addresses[i]) + result = append(result, addresses.addresses[i]) } return result } diff --git a/pkg/shreder/client.go b/pkg/shreder/client.go index 87a0a9a..14e1934 100644 --- a/pkg/shreder/client.go +++ b/pkg/shreder/client.go @@ -3,17 +3,26 @@ package shreder import ( "context" "fmt" + "runtime" + "time" "github.com/gagliardetto/solana-go/rpc" + "github.com/panjf2000/ants/v2" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type Client struct { - conn *grpc.ClientConn - client ShrederServiceClient - tableLoader *AddressTables - subscription map[string]*SubscribeRequestFilterTransactions + EnableClientStats bool + conn *grpc.ClientConn + client ShrederServiceClient + tableLoader *AddressTables + subscription map[string]*SubscribeRequestFilterTransactions + + pool *ants.Pool + + lastSlot uint64 + lastSlotTime time.Time } func NewShrederClient( @@ -30,11 +39,19 @@ func NewShrederClient( return nil, func() {}, err } + poolSize := runtime.NumCPU()*2 + 2 + logger.Info("creating shreder client", "url", url, "pool_size", poolSize) + pool, err := ants.NewPool(poolSize, ants.WithNonblocking(false)) + if err != nil { + return nil, func() {}, err + } + s := &Client{ conn: conn, client: NewShrederServiceClient(conn), subscription: subscription, tableLoader: NewAddressTables(rpcClient), + pool: pool, } return s, func() { @@ -45,6 +62,10 @@ func NewShrederClient( func (c *Client) Wait() { logger.Debug("waiting for shreder client to stop") + if c.pool != nil { + c.pool.Release() + } + err := c.conn.Close() if err != nil { logger.Error("failed to close connection: ", "err", err) @@ -59,6 +80,8 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error return err } + logger.Debug("subscribing to transactions") + err = stream.Send(&SubscribeTransactionsRequest{ Transactions: c.subscription, }) @@ -72,20 +95,38 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error return err } - txBatch := ParseTransaction(response.Transaction, c.tableLoader) - if len(txBatch) == 0 { - continue + if c.EnableClientStats { + slot := response.Transaction.Slot + now := time.Now() + if c.lastSlotTime.IsZero() || slot > c.lastSlot { + if !c.lastSlotTime.IsZero() { + logger.Info("block processed", "running", c.pool.Running(), "slot", slot, "prev_slot", c.lastSlot, "delta_ms", now.Sub(c.lastSlotTime).Milliseconds()) + } + c.lastSlot = slot + c.lastSlotTime = now + } } - // set fixed source for tx signals - for _, tx := range txBatch { - tx.Source = "shreder" - } + txData := response.Transaction - select { - case <-ctx.Done(): - return ctx.Err() - case txCh <- txBatch: + err = c.pool.Submit(func() { + txBatch := ParseTransaction(txData, c.tableLoader, c.EnableClientStats) + if len(txBatch) == 0 { + return + } + + for _, tx := range txBatch { + tx.Source = "shreder" + } + + select { + case <-ctx.Done(): + return + case txCh <- txBatch: + } + }) + if err != nil { + return err } } } diff --git a/pkg/shreder/dflow.go b/pkg/shreder/dflow.go index 2bfbc47..7e39295 100644 --- a/pkg/shreder/dflow.go +++ b/pkg/shreder/dflow.go @@ -11,7 +11,8 @@ import ( ) var ( - dflowProgramID = solana.MustPublicKeyFromBase58("DF1ow4tspfHX9JwWJsAb9epbkA8hmpSEAtxXy1V27QBH") + dflowProgramID = solana.MustPublicKeyFromBase58("DF1ow4tspfHX9JwWJsAb9epbkA8hmpSEAtxXy1V27QBH") + dflowProgramString = dflowProgramID.String() dflowSwapDisc = []byte{248, 198, 158, 145, 225, 117, 135, 200} dflowSwap2Disc = []byte{65, 75, 63, 76, 235, 91, 91, 136} @@ -276,10 +277,11 @@ func parseDFlowInstruction(tx *versionedTransaction, instructionIndex int) (*TxS var ( srcIdx uint8 ) - for i, acctIdx := range ix.Accounts { - if i < 6 { - continue - } + if len(ix.Accounts) <= 6 { + return nil, nil + } + accounts := ix.Accounts[5:] + for i, acctIdx := range accounts { key, err := getStaticKey(tx.Message.StaticAccountKeys, int(acctIdx)) if err != nil { return nil, err @@ -289,15 +291,15 @@ func parseDFlowInstruction(tx *versionedTransaction, instructionIndex int) (*TxS break } } - if srcIdx == 0 || srcIdx+1 >= uint8(len(ix.Accounts)) { + if srcIdx == 0 || srcIdx+1 >= uint8(len(accounts)) { return nil, nil } - baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(ix.Accounts[srcIdx])) + baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx])) if err != nil { return nil, err } - quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(ix.Accounts[srcIdx+1])) + quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx+1])) if err != nil { return nil, err } @@ -307,7 +309,6 @@ func parseDFlowInstruction(tx *versionedTransaction, instructionIndex int) (*TxS // Build TxSignal sig := &TxSignal{ - Label: "dflow", TxHash: tx.Signatures[0].String(), Maker: tx.Message.StaticAccountKeys[0].String(), Program: "PumpAMM", diff --git a/pkg/shreder/juptierv6.go b/pkg/shreder/juptierv6.go index 2ae3af1..8f02204 100644 --- a/pkg/shreder/juptierv6.go +++ b/pkg/shreder/juptierv6.go @@ -935,10 +935,11 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( var ( srcIdx uint8 ) - for i, acctIdx := range instruction.Accounts { - if i < 9 { - continue - } + if len(instruction.Accounts) <= 9 { + return nil, nil + } + accounts := instruction.Accounts[8:] + for i, acctIdx := range accounts { key, err := getStaticKey(tx.Message.StaticAccountKeys, int(acctIdx)) if err != nil { return nil, err @@ -948,18 +949,18 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( break } } - if srcIdx == 0 || srcIdx+1 >= uint8(len(instruction.Accounts)) { + if srcIdx == 0 || srcIdx+1 >= uint8(len(accounts)) { return nil, nil } - baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(instruction.Accounts[srcIdx])) + baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx])) if err != nil { return nil, err } if !sourceMint.Equals(baseMint) { return nil, nil } - quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(instruction.Accounts[srcIdx+1])) + quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx+1])) if err != nil { return nil, err } @@ -978,10 +979,11 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( var ( srcIdx uint8 ) - for i, acctIdx := range instruction.Accounts { - if i < 12 { - continue - } + if len(instruction.Accounts) <= 12 { + return nil, nil + } + accounts := instruction.Accounts[11:] + for i, acctIdx := range accounts { key, err := getStaticKey(tx.Message.StaticAccountKeys, int(acctIdx)) if err != nil { return nil, err @@ -991,11 +993,11 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( break } } - if srcIdx == 0 || srcIdx+1 >= uint8(len(instruction.Accounts)) { + if srcIdx == 0 || srcIdx+1 >= uint8(len(accounts)) { return nil, nil } - baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(instruction.Accounts[srcIdx])) + baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx])) if err != nil { return nil, err } @@ -1003,7 +1005,7 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( return nil, nil } - quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(instruction.Accounts[srcIdx+1])) + quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx+1])) if err != nil { return nil, err } @@ -1018,10 +1020,8 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( srcIdx uint8 ) - for i, acctIdx := range instruction.Accounts { - if i < 9 { - continue - } + accounts := instruction.Accounts[9:] + for i, acctIdx := range accounts { key, err := getStaticKey(tx.Message.StaticAccountKeys, int(acctIdx)) if err != nil { return nil, err @@ -1031,15 +1031,15 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( break } } - if srcIdx == 0 || srcIdx+1 >= uint8(len(instruction.Accounts)) { + if srcIdx == 0 || srcIdx+1 >= uint8(len(accounts)) { return nil, nil } - sourceMint, err = getStaticKey(tx.Message.StaticAccountKeys, int(instruction.Accounts[srcIdx])) + sourceMint, err = getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx])) if err != nil { return nil, err } - quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(instruction.Accounts[srcIdx+1])) + quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx+1])) if err != nil { return nil, err } @@ -1049,7 +1049,6 @@ func parseJupiterV6Instruction(tx *versionedTransaction, instructionIndex int) ( } signal := &TxSignal{ - Label: "jupiterV6", TxHash: tx.Signatures[0].String(), Maker: tx.Message.StaticAccountKeys[0].String(), Token0Address: sourceMint.String(), diff --git a/pkg/shreder/okxonchainlab.go b/pkg/shreder/okxonchainlab.go index 6fcb961..b1c546b 100644 --- a/pkg/shreder/okxonchainlab.go +++ b/pkg/shreder/okxonchainlab.go @@ -11,7 +11,8 @@ import ( ) var ( - okxDexRouteV2ProgramID = solana.MustPublicKeyFromBase58("proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u") + okxDexRouteV2ProgramID = solana.MustPublicKeyFromBase58("proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u") + okxDexRouteV2ProgramIDString = okxDexRouteV2ProgramID.String() okxSwapTobDisc = []byte{170, 41, 85, 177, 132, 80, 31, 53} okxSwapTobWithReceiverDisc = []byte{223, 170, 216, 234, 204, 6, 241, 25} @@ -314,10 +315,11 @@ func parseOkxDexRouteV2Instruction(tx *versionedTransaction, instructionIndex in var ( srcIdx uint8 ) - for i, acctIdx := range ix.Accounts { - if i < 15 { - continue - } + if len(ix.Accounts) <= 15 { + return nil, nil + } + accounts := ix.Accounts[14:] + for i, acctIdx := range accounts { key, err := getStaticKey(tx.Message.StaticAccountKeys, int(acctIdx)) if err != nil { return nil, err @@ -327,11 +329,11 @@ func parseOkxDexRouteV2Instruction(tx *versionedTransaction, instructionIndex in break } } - if srcIdx == 0 || int(srcIdx+1) >= len(ix.Accounts) { + if srcIdx == 0 || int(srcIdx+1) >= len(accounts) { return nil, nil } - baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(ix.Accounts[srcIdx])) + baseMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx])) if err != nil { return nil, err } @@ -339,7 +341,7 @@ func parseOkxDexRouteV2Instruction(tx *versionedTransaction, instructionIndex in return nil, nil } - quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(ix.Accounts[srcIdx+1])) + quoteMint, err := getStaticKey(tx.Message.StaticAccountKeys, int(accounts[srcIdx+1])) if err != nil { return nil, err } @@ -348,7 +350,6 @@ func parseOkxDexRouteV2Instruction(tx *versionedTransaction, instructionIndex in } return &TxSignal{ - Label: "okxdexroutev2", TxHash: tx.Signatures[0].String(), Maker: tx.Message.StaticAccountKeys[0].String(), Token0Address: baseMint.String(), diff --git a/pkg/shreder/tx.go b/pkg/shreder/tx.go index b263f72..cd534ed 100644 --- a/pkg/shreder/tx.go +++ b/pkg/shreder/tx.go @@ -52,6 +52,9 @@ type TxSignal struct { // parsed values Token0AmountUint64 uint64 `json:"-"` Token1AmountUint64 uint64 `json:"-"` + + ParseStart time.Time `json:"parse_start"` + ParseEnd time.Time `json:"parse_end"` } func (t *TxSignal) Parse() *TxSignal { diff --git a/pkg/shreder/txparser.go b/pkg/shreder/txparser.go index bc4e1dd..9116e45 100644 --- a/pkg/shreder/txparser.go +++ b/pkg/shreder/txparser.go @@ -6,6 +6,8 @@ import ( "fmt" "math/big" "strings" + "sync" + "time" "github.com/gagliardetto/solana-go" "github.com/mr-tron/base58" @@ -20,31 +22,40 @@ const ( // program ids var ( - pumpProgramID = solana.MustPublicKeyFromBase58("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P") - + pumpProgramID = solana.MustPublicKeyFromBase58("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P") + pumpProgramIDString = pumpProgramID.String() // has no sell function with pump and pump.amm program - azczProgramID = solana.MustPublicKeyFromBase58("AzcZqCRUQgKEg5FTAgY7JacATABEYCEfMbjXEzspLYFB") + azczProgramID = solana.MustPublicKeyFromBase58("AzcZqCRUQgKEg5FTAgY7JacATABEYCEfMbjXEzspLYFB") + azczProgramIDString = azczProgramID.String() // only buy function with pump program - f5tfProgramID = solana.MustPublicKeyFromBase58("F5tfvbLog9VdGUPqBDTT8rgXvTTcq7e5UiGnupL1zvBq") - + f5tfProgramID = solana.MustPublicKeyFromBase58("F5tfvbLog9VdGUPqBDTT8rgXvTTcq7e5UiGnupL1zvBq") + f5tfProgramIDString = f5tfProgramID.String() // only pump.fun function - photonProgramID = solana.MustPublicKeyFromBase58("BSfD6SHZigAfDWSjzD5Q41jw8LmKwtmjskPH9XW1mrRW") + photonProgramID = solana.MustPublicKeyFromBase58("BSfD6SHZigAfDWSjzD5Q41jw8LmKwtmjskPH9XW1mrRW") + photonProgramIDString = photonProgramID.String() - pumpAmmProgramID = solana.MustPublicKeyFromBase58("pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA") + pumpAmmProgramID = solana.MustPublicKeyFromBase58("pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA") + pumpAmmProgramIDString = pumpAmmProgramID.String() - boboProgramID = solana.MustPublicKeyFromBase58("BobogA5N2KN2GG4XN3E3rNNRw3L8H1QPXp7QLxGrNHGM") + boboProgramID = solana.MustPublicKeyFromBase58("BobogA5N2KN2GG4XN3E3rNNRw3L8H1QPXp7QLxGrNHGM") + boboProgramIDString = boboProgramID.String() - qtkvProgramID = solana.MustPublicKeyFromBase58("qtkvapJEvRWWrB7i5K6RaA1kvq5x3qmMKZ98ad71XQ7") + qtkvProgramID = solana.MustPublicKeyFromBase58("qtkvapJEvRWWrB7i5K6RaA1kvq5x3qmMKZ98ad71XQ7") + qtkvProgramIDString = qtkvProgramID.String() // only buy function with pump program - fjszProgramID = solana.MustPublicKeyFromBase58("FJsZbftBqRLfF7uqUKpm4s2goDr6xsQ5Q3mN7AFJB6hK") + fjszProgramID = solana.MustPublicKeyFromBase58("FJsZbftBqRLfF7uqUKpm4s2goDr6xsQ5Q3mN7AFJB6hK") + fjszProgramIDString = fjszProgramID.String() - flasProgramID = solana.MustPublicKeyFromBase58("FLASHX8DrLbgeR8FcfNV1F5krxYcYMUdBkrP1EPBtxB9") + flasProgramID = solana.MustPublicKeyFromBase58("FLASHX8DrLbgeR8FcfNV1F5krxYcYMUdBkrP1EPBtxB9") + flasProgramIDString = flasProgramID.String() - terminalProgramID = solana.MustPublicKeyFromBase58("term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3") + terminalProgramID = solana.MustPublicKeyFromBase58("term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3") + terminalProgramIDString = terminalProgramID.String() - jupiterV6ProgramID = solana.MustPublicKeyFromBase58("JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4") + jupiterV6ProgramID = solana.MustPublicKeyFromBase58("JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4") + jupiterV6ProgramIDString = jupiterV6ProgramID.String() ) type AccountNotFoundError struct { @@ -118,6 +129,7 @@ type versionedTransaction struct { Signatures []solana.Signature Message versionedMessage Block uint64 + Time time.Time } type pumpExtendedSellArgs struct { @@ -192,15 +204,72 @@ type fjszBuyArgs struct { TokenAmount uint64 } +var ( + versionedPool = sync.Pool{} + + accIdxPool = sync.Pool{} +) + +func requireAccIdxSlice() []uint8 { + v := accIdxPool.Get() + if v == nil { + return make([]uint8, 0, 16) + } + return v.([]uint8) +} + +func releaseAccIdxSlice(s []uint8) { + if s == nil { + return + } + s = s[:0] + accIdxPool.Put(s) +} + +func requireVersionedPool() *versionedTransaction { + v := versionedPool.Get() + if v == nil { + return &versionedTransaction{ + Signatures: make([]solana.Signature, 0, 10), + Message: versionedMessage{ + StaticAccountKeys: make([]solana.PublicKey, 0, 256), + Instructions: make([]compiledInstruction, 0, 16), + AddressTableLookups: make([]addressTableLookup, 0, 10), + }, + } + } + return v.(*versionedTransaction) +} + +func releaseVersionedPool(v *versionedTransaction) { + if v == nil { + return + } + for i := range v.Message.Instructions { + releaseAccIdxSlice(v.Message.Instructions[i].Accounts) + } + for i := range v.Message.AddressTableLookups { + releaseAccIdxSlice(v.Message.AddressTableLookups[i].WritableIndexes) + releaseAccIdxSlice(v.Message.AddressTableLookups[i].ReadonlyIndexes) + } + versionedPool.Put(v) +} + // ParseTransaction mirrors the Rust parse_transaction entry point. -func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables) []*TxSignal { +func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables, stats bool) []*TxSignal { + var now time.Time + if stats { + now = time.Now() + } versioned, err := toVersionedTransaction(update) if err != nil || versioned == nil || len(versioned.Signatures) == 0 { return nil } - + defer func() { + releaseVersionedPool(versioned) + }() txHash := versioned.Signatures[0] - staticKeys := versioned.Message.StaticAccountKeys + // staticKeys := versioned.Message.StaticAccountKeys instructions := versioned.Message.Instructions if loader != nil && len(versioned.Message.AddressTableLookups) > 0 { @@ -214,7 +283,7 @@ func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables) lookupTableOk = false break } - staticKeys = append(staticKeys, accounts...) + versioned.Message.StaticAccountKeys = append(versioned.Message.StaticAccountKeys, accounts...) } if lookupTableOk { @@ -226,68 +295,68 @@ func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables) if len(accounts) != len(lookup.ReadonlyIndexes) { break } - staticKeys = append(staticKeys, accounts...) + versioned.Message.StaticAccountKeys = append(versioned.Message.StaticAccountKeys, accounts...) } } - versioned.Message.StaticAccountKeys = staticKeys + // versioned.Message.StaticAccountKeys = staticKeys } - var parsed []*TxSignal + var parsed []*TxSignal = make([]*TxSignal, 0, 3) for i := range instructions { inst := instructions[i] - if int(inst.ProgramIDIndex) >= len(staticKeys) { + if int(inst.ProgramIDIndex) >= len(versioned.Message.StaticAccountKeys) { continue } - programID := staticKeys[inst.ProgramIDIndex] + programID := versioned.Message.StaticAccountKeys[inst.ProgramIDIndex] switch programID { case pumpProgramID: txRes, err := parsePumpInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "pump", pumpProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "pump", pumpProgramIDString) case azczProgramID: txRes, err := parseAzczInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "azcz", azczProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "azcz", azczProgramIDString) case f5tfProgramID: txRes, err := parseF5tfInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "f5tf", f5tfProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "f5tf", f5tfProgramIDString) case flasProgramID: txRes, err := parseFlasInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "flas", flasProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "flas", flasProgramIDString) case photonProgramID: txRes, err := parsePhotonInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "photon", photonProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "photon", photonProgramIDString) case pumpAmmProgramID: txRes, err := parsePumpAmmInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "pumpamm", pumpAmmProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "pumpamm", pumpAmmProgramIDString) case boboProgramID: txRes, err := parseBoboInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "bobo", boboProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "bobo", boboProgramIDString) case qtkvProgramID: txRes, err := parseQtkvInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "qtkv", qtkvProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "qtkv", qtkvProgramIDString) case fjszProgramID: txRes, err := parseFjszInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "fjsz", fjszProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "fjsz", fjszProgramIDString) case terminalProgramID: txRes, err := parseTermInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "terminal", terminalProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "terminal", terminalProgramIDString) case jupiterV6ProgramID: txRes, err := parseJupiterV6Instruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "jupiterv6", jupiterV6ProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "jupiterv6", jupiterV6ProgramIDString) case okxDexRouteV2ProgramID: txRes, err := parseOkxDexRouteV2Instruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "okxdexroutev2", okxDexRouteV2ProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "okxdexroutev2", okxDexRouteV2ProgramIDString) case dflowProgramID: txRes, err := parseDFlowInstruction(versioned, i) - parsed = appendParsed(parsed, txRes, err, txHash, "dflow", dflowProgramID.String()) + parsed = appendParsed(now, parsed, txRes, err, txHash, "dflow", dflowProgramString) } } return parsed } -func appendParsed(list []*TxSignal, parsed *TxSignal, err error, txHash [64]byte, label string, entryContract string) []*TxSignal { +func appendParsed(start time.Time, list []*TxSignal, parsed *TxSignal, err error, txHash [64]byte, label string, entryContract string) []*TxSignal { if err != nil { if !strings.HasPrefix(err.Error(), "account index") { logger.Debug("txparser: failed to parse", "label", label, "instruction", err, "tx_hash", base58.Encode(txHash[:])) @@ -296,6 +365,11 @@ func appendParsed(list []*TxSignal, parsed *TxSignal, err error, txHash [64]byte } if parsed != nil { parsed.EntryContract = entryContract + parsed.Label = label + if !start.IsZero() { + parsed.ParseEnd = time.Now() + parsed.ParseStart = start + } list = append(list, parsed) } return list @@ -308,47 +382,42 @@ func toVersionedTransaction(update *SubscribeUpdateTransaction) (*versionedTrans protoTx := update.Transaction msg := protoTx.Message - - signatures := make([]solana.Signature, len(protoTx.Signatures)) - for i, rawSig := range protoTx.Signatures { - signatures[i] = solana.SignatureFromBytes(rawSig) + versioned := requireVersionedPool() + versioned.Signatures = versioned.Signatures[:0] + for _, rawSig := range protoTx.Signatures { + versioned.Signatures = append(versioned.Signatures, solana.SignatureFromBytes(rawSig)) + } + versioned.Message.StaticAccountKeys = versioned.Message.StaticAccountKeys[:0] + for _, key := range msg.AccountKeys { + versioned.Message.StaticAccountKeys = append(versioned.Message.StaticAccountKeys, solana.PublicKeyFromBytes(key)) + } + versioned.Message.Instructions = versioned.Message.Instructions[:0] + for _, instr := range msg.Instructions { + accounts := requireAccIdxSlice() + accounts = append(accounts, instr.Accounts...) + versioned.Message.Instructions = append(versioned.Message.Instructions, + compiledInstruction{ + ProgramIDIndex: uint8(instr.ProgramIdIndex), + Accounts: accounts, + Data: instr.Data, + }) } - staticKeys := make([]solana.PublicKey, len(msg.AccountKeys)) - for i, key := range msg.AccountKeys { - staticKeys[i] = solana.PublicKeyFromBytes(key) - } - - instructions := make([]compiledInstruction, len(msg.Instructions)) - for i, instr := range msg.Instructions { - accounts := append([]uint8(nil), instr.Accounts...) - instructions[i] = compiledInstruction{ - ProgramIDIndex: uint8(instr.ProgramIdIndex), - Accounts: accounts, - Data: instr.Data, - } - } - - lookups := make([]addressTableLookup, len(msg.AddressTableLookups)) - for i, lookup := range msg.AddressTableLookups { - writable := append([]uint8(nil), lookup.WritableIndexes...) - readonly := append([]uint8(nil), lookup.ReadonlyIndexes...) - lookups[i] = addressTableLookup{ + versioned.Message.AddressTableLookups = versioned.Message.AddressTableLookups[:0] + for _, lookup := range msg.AddressTableLookups { + writable := requireAccIdxSlice() + writable = append(writable, lookup.WritableIndexes...) + readonly := requireAccIdxSlice() + readonly = append(readonly, lookup.ReadonlyIndexes...) + versioned.Message.AddressTableLookups = append(versioned.Message.AddressTableLookups, addressTableLookup{ AccountKey: solana.PublicKeyFromBytes(lookup.AccountKey), WritableIndexes: writable, ReadonlyIndexes: readonly, - } + }) } - return &versionedTransaction{ - Signatures: signatures, - Message: versionedMessage{ - StaticAccountKeys: staticKeys, - Instructions: instructions, - AddressTableLookups: lookups, - }, - Block: update.GetSlot(), - }, nil + versioned.Block = update.GetSlot() + return versioned, nil } func formatTokenAmount(amount uint64) decimal.Decimal {