diff --git a/cmd/shreder/main.go b/cmd/shreder/main.go index c1a18cb..9307b8f 100644 --- a/cmd/shreder/main.go +++ b/cmd/shreder/main.go @@ -25,6 +25,7 @@ func main() { } rpcClient := rpc.New(rpcUrl) shreder.SetLogLevel(slog.LevelDebug) + //handlers := shreder.GetRegisteredHandlers() shrederClient, cleanup, err := shreder.NewShrederClient( url, rpcClient, @@ -55,13 +56,14 @@ func main() { "proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u", }, }, - "dflow": { - AccountRequired: []string{ - "DF1ow4tspfHX9JwWJsAb9epbkA8hmpSEAtxXy1V27QBH", - }, - }, + // TODO: axiom, gmgn, etc. - }, shreder.BlocksStats(false), shreder.LogParsedStats(true), shreder.ShowTableLoaded(false)) + }, + //shreder.WithCustomParsers(map[solana.PublicKey]shreder.Handler{ + // solana.MustPublicKeyFromBase58("JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4"): handlers[solana.MustPublicKeyFromBase58("JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4")], + // solana.MustPublicKeyFromBase58("proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u"): handlers[solana.MustPublicKeyFromBase58("proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u")], + //}), + shreder.BlocksStats(false), shreder.LogParsedStats(true), shreder.ShowTableLoaded(false)) if err != nil { panic(err) } @@ -91,7 +93,7 @@ func main() { case <-ctx.Done(): return case tx := <-txCh: - if tx.Label == "okxdexroutev2" || tx.Label == "jupiterv6" || tx.Label == "dflow" { + if tx.Label == "photon" || tx.Label == "jupiterv6" || tx.Label == "okxdexroutev2" { fmt.Println("===============", tx.TxHash, tx.Label, tx.Event, tx.Token0Address, "token:", tx.Token0Amount, "parse time:", tx.ParseEnd.Sub(tx.ParseStart)) } } diff --git a/pkg/shreder/client.go b/pkg/shreder/client.go index e25fd1f..c61ac71 100644 --- a/pkg/shreder/client.go +++ b/pkg/shreder/client.go @@ -8,6 +8,7 @@ import ( "runtime" "time" + "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "github.com/panjf2000/ants/v2" "google.golang.org/grpc" @@ -23,6 +24,10 @@ type Client struct { tableLoader *AddressTables subscription map[string]*SubscribeRequestFilterTransactions + entriesFilter map[string]FilterParams + + parser map[solana.PublicKey]Handler + pool *ants.Pool lastSlot uint64 @@ -33,6 +38,8 @@ type ClientOpts struct { blockStats bool showTableLoaded bool logParseStats bool + + parser map[solana.PublicKey]Handler } type ClientOption func(*ClientOpts) @@ -43,6 +50,12 @@ func ShowTableLoaded(enable bool) ClientOption { } } +func WithCustomParsers(parsers map[solana.PublicKey]Handler) ClientOption { + return func(opts *ClientOpts) { + opts.parser = parsers + } +} + func BlocksStats(enable bool) ClientOption { return func(opts *ClientOpts) { opts.blockStats = enable @@ -82,16 +95,33 @@ func NewShrederClient( blockStats: false, showTableLoaded: true, logParseStats: false, + parser: registered, } for _, option := range options { option(o) } + filterParams := make(map[string]FilterParams) + for name, params := range subscription { + filterParams[name] = FilterParams{ + Exclude: parseAccountArray(params.AccountExclude), + Require: parseAccountArray(params.AccountRequired), + Include: parseAccountArray(params.AccountInclude), + } + } + if len(filterParams) == 0 { + filterParams["default"] = FilterParams{ + Include: defaultFilterAccount, + } + } + s := &Client{ - conn: conn, - client: NewShrederServiceClient(conn), - subscription: subscription, - tableLoader: NewAddressTables(rpcClient, o.showTableLoaded), - pool: pool, + conn: conn, + client: NewShrederServiceClient(conn), + subscription: subscription, + entriesFilter: filterParams, + parser: o.parser, + tableLoader: NewAddressTables(rpcClient, o.showTableLoaded), + pool: pool, enableBlockStats: o.blockStats, enableParseStats: o.logParseStats, @@ -142,7 +172,16 @@ func (c *Client) ReadEntriesSync(ctx context.Context, txCh chan<- TxSignal) erro } err = c.pool.Submit(func() { - ParseTransactionForEntries(ctx, slot, bytes.NewReader(response.Entries), c.tableLoader, txCh) + err := entriesToVersionedTransaction(slot, bytes.NewReader(response.Entries), func(versioned VersionedTransaction) { + // filter out vote transactions + if FilterTransactionForEntriesWithFilter(versioned, c.entriesFilter) { + return + } + go ParseTransactionWithHandler(ctx, versioned, c.tableLoader, txCh, c.parser) + }) + if err != nil { + logger.Debug("txparser: failed to parse entries", "error", err) + } }) if err != nil && errors.Is(err, ants.ErrPoolOverload) { logger.Warn("task pool is full") @@ -187,10 +226,15 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error { } } - txData := response.Transaction + // txData := response.Transaction err := c.pool.Submit(func() { - ParseTransactionForSubscribe(ctx, txData, c.tableLoader, txCh, nil) + versioned, err := toVersionedTransaction(response.Transaction) + if err != nil { + logger.Debug("txparser: failed to convert to versioned transaction", "error", err) + return + } + ParseTransactionWithHandler(ctx, versioned, c.tableLoader, txCh, c.parser) }) if err != nil && errors.Is(err, ants.ErrPoolOverload) { logger.Warn("task pool is full") @@ -202,3 +246,11 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error { return err } + +func parseAccountArray(accountArray []string) []solana.PublicKey { + var result []solana.PublicKey + for _, acc := range accountArray { + result = append(result, solana.MustPublicKeyFromBase58(acc)) + } + return result +} diff --git a/pkg/shreder/txparser.go b/pkg/shreder/txparser.go index 1dd00b3..db4f3b8 100644 --- a/pkg/shreder/txparser.go +++ b/pkg/shreder/txparser.go @@ -28,26 +28,32 @@ type FillAccount interface { } func init() { - for account := range parsedMap { - parseProgram = append(parseProgram, account) + for account := range registered { + defaultFilterAccount = append(defaultFilterAccount, account) } //"GS4CU59F31iL7aR2Q8zVS8DRrcRnXX1yjQ66TqNVQnaR", //Event Authority //"5PHirr8joyTMp9JMm6nW7hNDVyEYdkzDqazxPD7RaTjx", // Fee Config //"pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ", // pump fee program - parseProgram = append(parseProgram, + defaultFilterAccount = append(defaultFilterAccount, solana.MustPublicKeyFromBase58("GS4CU59F31iL7aR2Q8zVS8DRrcRnXX1yjQ66TqNVQnaR"), solana.MustPublicKeyFromBase58("5PHirr8joyTMp9JMm6nW7hNDVyEYdkzDqazxPD7RaTjx"), solana.MustPublicKeyFromBase58("pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ"), ) - slices.SortFunc(parseProgram, func(a, b solana.PublicKey) int { + slices.SortFunc(defaultFilterAccount, func(a, b solana.PublicKey) int { return bytes.Compare(a[:], b[:]) }) } -var ( - parseProgram []solana.PublicKey +type FilterParams struct { + Require []solana.PublicKey + Include []solana.PublicKey + Exclude []solana.PublicKey +} - parsedMap = map[solana.PublicKey]Handler{ +var ( + defaultFilterAccount []solana.PublicKey + + registered = map[solana.PublicKey]Handler{ pumpProgramID: {parsePumpInstruction, "pump"}, azczProgramID: {parseAzczInstruction, "azcz"}, f5tfProgramID: {parseF5tfInstruction, "f5tf"}, @@ -97,7 +103,7 @@ func FilterTransactionForEntries(versioned VersionedTransaction) bool { // accounts filter? include := false for _, key := range versioned.StaticAccountKeys { - _, include = slices.BinarySearchFunc(parseProgram, key, func(key solana.PublicKey, key2 solana.PublicKey) int { + _, include = slices.BinarySearchFunc(defaultFilterAccount, key, func(key solana.PublicKey, key2 solana.PublicKey) int { return bytes.Compare(key[:], key2[:]) }) if include { @@ -107,6 +113,53 @@ func FilterTransactionForEntries(versioned VersionedTransaction) bool { return !include } +func GetRegisteredHandlers() map[solana.PublicKey]Handler { + return registered +} + +func FilterTransactionForEntriesWithFilter(versioned VersionedTransaction, filter map[string]FilterParams) bool { + if len(versioned.Instructions) >= 1 { + programKey, _ := versioned.GetAccount(int(versioned.Instructions[0].ProgramIDIndex)) + if programKey.Equals(VoteProgram) && len(versioned.AddressTableLookups) == 0 { + return true + } + } + + for _, params := range filter { + excludePass := true + // exclude first + for _, key := range params.Exclude { + if slices.Contains(versioned.StaticAccountKeys, key) { + excludePass = false + break + } + } + requirePass := true + if excludePass { + for _, key := range params.Require { + if !slices.Contains(versioned.StaticAccountKeys, key) { + requirePass = false + break + } + } + } + + include := len(params.Include) == 0 + if excludePass && requirePass { + for _, key := range params.Include { + if slices.Contains(versioned.StaticAccountKeys, key) { + include = true + break + } + } + } + if excludePass && requirePass && include { + return false + } + } + return true +} + func ParseTransactionForEntries(ctx context.Context, slot uint64, entriesReader io.Reader, loader *AddressTables, parsed chan<- TxSignal) { err := entriesToVersionedTransaction(slot, entriesReader, func(versioned VersionedTransaction) { // filter out vote transactions @@ -121,8 +174,7 @@ func ParseTransactionForEntries(ctx context.Context, slot uint64, entriesReader } } -func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loader *AddressTables, parsed chan<- TxSignal) { - // staticKeys := versioned.Message.StaticAccountKeys +func ParseTransactionWithHandler(ctx context.Context, versioned VersionedTransaction, loader *AddressTables, parsed chan<- TxSignal, handlers map[solana.PublicKey]Handler) { if loader != nil && len(versioned.AddressTableLookups) > 0 { lookupTableOk := true for _, lookups := range versioned.AddressTableLookups { @@ -147,7 +199,7 @@ func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loade if err != nil { continue } - handler, ok := parsedMap[program] + handler, ok := handlers[program] if !ok { continue } @@ -178,6 +230,11 @@ func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loade return } +func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loader *AddressTables, parsed chan<- TxSignal) { + // staticKeys := versioned.Message.StaticAccountKeys + ParseTransactionWithHandler(ctx, versioned, loader, parsed, registered) +} + func toVersionedTransaction(update *SubscribeUpdateTransaction) (VersionedTransaction, error) { if update == nil || update.Transaction == nil || update.Transaction.Message == nil { return VersionedTransaction{}, fmt.Errorf("transaction is nil")