entries custom filter and parse

This commit is contained in:
thloyi
2026-01-30 11:45:57 +08:00
parent 1223b34117
commit eb394c5650
3 changed files with 137 additions and 26 deletions

View File

@@ -25,6 +25,7 @@ func main() {
}
rpcClient := rpc.New(rpcUrl)
shreder.SetLogLevel(slog.LevelDebug)
//handlers := shreder.GetRegisteredHandlers()
shrederClient, cleanup, err := shreder.NewShrederClient(
url,
rpcClient,
@@ -55,13 +56,14 @@ func main() {
"proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u",
},
},
"dflow": {
AccountRequired: []string{
"DF1ow4tspfHX9JwWJsAb9epbkA8hmpSEAtxXy1V27QBH",
},
},
// TODO: axiom, gmgn, etc.
}, shreder.BlocksStats(false), shreder.LogParsedStats(true), shreder.ShowTableLoaded(false))
},
//shreder.WithCustomParsers(map[solana.PublicKey]shreder.Handler{
// solana.MustPublicKeyFromBase58("JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4"): handlers[solana.MustPublicKeyFromBase58("JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4")],
// solana.MustPublicKeyFromBase58("proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u"): handlers[solana.MustPublicKeyFromBase58("proVF4pMXVaYqmy4NjniPh4pqKNfMmsihgd4wdkCX3u")],
//}),
shreder.BlocksStats(false), shreder.LogParsedStats(true), shreder.ShowTableLoaded(false))
if err != nil {
panic(err)
}
@@ -91,7 +93,7 @@ func main() {
case <-ctx.Done():
return
case tx := <-txCh:
if tx.Label == "okxdexroutev2" || tx.Label == "jupiterv6" || tx.Label == "dflow" {
if tx.Label == "photon" || tx.Label == "jupiterv6" || tx.Label == "okxdexroutev2" {
fmt.Println("===============", tx.TxHash, tx.Label, tx.Event, tx.Token0Address, "token:", tx.Token0Amount, "parse time:", tx.ParseEnd.Sub(tx.ParseStart))
}
}

View File

@@ -8,6 +8,7 @@ import (
"runtime"
"time"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/panjf2000/ants/v2"
"google.golang.org/grpc"
@@ -23,6 +24,10 @@ type Client struct {
tableLoader *AddressTables
subscription map[string]*SubscribeRequestFilterTransactions
entriesFilter map[string]FilterParams
parser map[solana.PublicKey]Handler
pool *ants.Pool
lastSlot uint64
@@ -33,6 +38,8 @@ type ClientOpts struct {
blockStats bool
showTableLoaded bool
logParseStats bool
parser map[solana.PublicKey]Handler
}
type ClientOption func(*ClientOpts)
@@ -43,6 +50,12 @@ func ShowTableLoaded(enable bool) ClientOption {
}
}
func WithCustomParsers(parsers map[solana.PublicKey]Handler) ClientOption {
return func(opts *ClientOpts) {
opts.parser = parsers
}
}
func BlocksStats(enable bool) ClientOption {
return func(opts *ClientOpts) {
opts.blockStats = enable
@@ -82,14 +95,31 @@ func NewShrederClient(
blockStats: false,
showTableLoaded: true,
logParseStats: false,
parser: registered,
}
for _, option := range options {
option(o)
}
filterParams := make(map[string]FilterParams)
for name, params := range subscription {
filterParams[name] = FilterParams{
Exclude: parseAccountArray(params.AccountExclude),
Require: parseAccountArray(params.AccountRequired),
Include: parseAccountArray(params.AccountInclude),
}
}
if len(filterParams) == 0 {
filterParams["default"] = FilterParams{
Include: defaultFilterAccount,
}
}
s := &Client{
conn: conn,
client: NewShrederServiceClient(conn),
subscription: subscription,
entriesFilter: filterParams,
parser: o.parser,
tableLoader: NewAddressTables(rpcClient, o.showTableLoaded),
pool: pool,
@@ -142,7 +172,16 @@ func (c *Client) ReadEntriesSync(ctx context.Context, txCh chan<- TxSignal) erro
}
err = c.pool.Submit(func() {
ParseTransactionForEntries(ctx, slot, bytes.NewReader(response.Entries), c.tableLoader, txCh)
err := entriesToVersionedTransaction(slot, bytes.NewReader(response.Entries), func(versioned VersionedTransaction) {
// filter out vote transactions
if FilterTransactionForEntriesWithFilter(versioned, c.entriesFilter) {
return
}
go ParseTransactionWithHandler(ctx, versioned, c.tableLoader, txCh, c.parser)
})
if err != nil {
logger.Debug("txparser: failed to parse entries", "error", err)
}
})
if err != nil && errors.Is(err, ants.ErrPoolOverload) {
logger.Warn("task pool is full")
@@ -187,10 +226,15 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error {
}
}
txData := response.Transaction
// txData := response.Transaction
err := c.pool.Submit(func() {
ParseTransactionForSubscribe(ctx, txData, c.tableLoader, txCh, nil)
versioned, err := toVersionedTransaction(response.Transaction)
if err != nil {
logger.Debug("txparser: failed to convert to versioned transaction", "error", err)
return
}
ParseTransactionWithHandler(ctx, versioned, c.tableLoader, txCh, c.parser)
})
if err != nil && errors.Is(err, ants.ErrPoolOverload) {
logger.Warn("task pool is full")
@@ -202,3 +246,11 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error {
return err
}
func parseAccountArray(accountArray []string) []solana.PublicKey {
var result []solana.PublicKey
for _, acc := range accountArray {
result = append(result, solana.MustPublicKeyFromBase58(acc))
}
return result
}

View File

@@ -28,26 +28,32 @@ type FillAccount interface {
}
func init() {
for account := range parsedMap {
parseProgram = append(parseProgram, account)
for account := range registered {
defaultFilterAccount = append(defaultFilterAccount, account)
}
//"GS4CU59F31iL7aR2Q8zVS8DRrcRnXX1yjQ66TqNVQnaR", //Event Authority
//"5PHirr8joyTMp9JMm6nW7hNDVyEYdkzDqazxPD7RaTjx", // Fee Config
//"pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ", // pump fee program
parseProgram = append(parseProgram,
defaultFilterAccount = append(defaultFilterAccount,
solana.MustPublicKeyFromBase58("GS4CU59F31iL7aR2Q8zVS8DRrcRnXX1yjQ66TqNVQnaR"),
solana.MustPublicKeyFromBase58("5PHirr8joyTMp9JMm6nW7hNDVyEYdkzDqazxPD7RaTjx"),
solana.MustPublicKeyFromBase58("pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ"),
)
slices.SortFunc(parseProgram, func(a, b solana.PublicKey) int {
slices.SortFunc(defaultFilterAccount, func(a, b solana.PublicKey) int {
return bytes.Compare(a[:], b[:])
})
}
var (
parseProgram []solana.PublicKey
type FilterParams struct {
Require []solana.PublicKey
Include []solana.PublicKey
Exclude []solana.PublicKey
}
parsedMap = map[solana.PublicKey]Handler{
var (
defaultFilterAccount []solana.PublicKey
registered = map[solana.PublicKey]Handler{
pumpProgramID: {parsePumpInstruction, "pump"},
azczProgramID: {parseAzczInstruction, "azcz"},
f5tfProgramID: {parseF5tfInstruction, "f5tf"},
@@ -97,7 +103,7 @@ func FilterTransactionForEntries(versioned VersionedTransaction) bool {
// accounts filter?
include := false
for _, key := range versioned.StaticAccountKeys {
_, include = slices.BinarySearchFunc(parseProgram, key, func(key solana.PublicKey, key2 solana.PublicKey) int {
_, include = slices.BinarySearchFunc(defaultFilterAccount, key, func(key solana.PublicKey, key2 solana.PublicKey) int {
return bytes.Compare(key[:], key2[:])
})
if include {
@@ -107,6 +113,53 @@ func FilterTransactionForEntries(versioned VersionedTransaction) bool {
return !include
}
func GetRegisteredHandlers() map[solana.PublicKey]Handler {
return registered
}
func FilterTransactionForEntriesWithFilter(versioned VersionedTransaction, filter map[string]FilterParams) bool {
if len(versioned.Instructions) >= 1 {
programKey, _ := versioned.GetAccount(int(versioned.Instructions[0].ProgramIDIndex))
if programKey.Equals(VoteProgram) && len(versioned.AddressTableLookups) == 0 {
return true
}
}
for _, params := range filter {
excludePass := true
// exclude first
for _, key := range params.Exclude {
if slices.Contains(versioned.StaticAccountKeys, key) {
excludePass = false
break
}
}
requirePass := true
if excludePass {
for _, key := range params.Require {
if !slices.Contains(versioned.StaticAccountKeys, key) {
requirePass = false
break
}
}
}
include := len(params.Include) == 0
if excludePass && requirePass {
for _, key := range params.Include {
if slices.Contains(versioned.StaticAccountKeys, key) {
include = true
break
}
}
}
if excludePass && requirePass && include {
return false
}
}
return true
}
func ParseTransactionForEntries(ctx context.Context, slot uint64, entriesReader io.Reader, loader *AddressTables, parsed chan<- TxSignal) {
err := entriesToVersionedTransaction(slot, entriesReader, func(versioned VersionedTransaction) {
// filter out vote transactions
@@ -121,8 +174,7 @@ func ParseTransactionForEntries(ctx context.Context, slot uint64, entriesReader
}
}
func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loader *AddressTables, parsed chan<- TxSignal) {
// staticKeys := versioned.Message.StaticAccountKeys
func ParseTransactionWithHandler(ctx context.Context, versioned VersionedTransaction, loader *AddressTables, parsed chan<- TxSignal, handlers map[solana.PublicKey]Handler) {
if loader != nil && len(versioned.AddressTableLookups) > 0 {
lookupTableOk := true
for _, lookups := range versioned.AddressTableLookups {
@@ -147,7 +199,7 @@ func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loade
if err != nil {
continue
}
handler, ok := parsedMap[program]
handler, ok := handlers[program]
if !ok {
continue
}
@@ -178,6 +230,11 @@ func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loade
return
}
func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loader *AddressTables, parsed chan<- TxSignal) {
// staticKeys := versioned.Message.StaticAccountKeys
ParseTransactionWithHandler(ctx, versioned, loader, parsed, registered)
}
func toVersionedTransaction(update *SubscribeUpdateTransaction) (VersionedTransaction, error) {
if update == nil || update.Transaction == nil || update.Transaction.Message == nil {
return VersionedTransaction{}, fmt.Errorf("transaction is nil")