diff --git a/cmd/shreder/main.go b/cmd/shreder/main.go index 14973e3..8b54673 100644 --- a/cmd/shreder/main.go +++ b/cmd/shreder/main.go @@ -2,13 +2,15 @@ package main import ( "context" - "encoding/json" "errors" "fmt" + "log/slog" "os" "os/signal" "syscall" + "github.com/gagliardetto/solana-go/rpc" + "github.com/samlior/libsam/pkg/shreder" ) @@ -17,13 +19,24 @@ func main() { if url == "" { panic("URL is not set") } - + rpcUrl := os.Getenv("RPC_URL") + if rpcUrl == "" { + panic("RPC_URL is not set") + } + rpcClient := rpc.New(rpcUrl) + shreder.SetLogLevel(slog.LevelDebug) shrederClient, cleanup, err := shreder.NewShrederClient( url, + rpcClient, map[string]*shreder.SubscribeRequestFilterTransactions{ "pumpfunamm": { - AccountRequired: []string{ + //AccountRequired: []string{ + // "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA", + //}, + AccountInclude: []string{ "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA", + "GS4CU59F31iL7aR2Q8zVS8DRrcRnXX1yjQ66TqNVQnaR", //Event Authority + "5PHirr8joyTMp9JMm6nW7hNDVyEYdkzDqazxPD7RaTjx", // Fee Config }, }, "photon": { @@ -63,8 +76,8 @@ func main() { case <-ctx.Done(): return case txBatch := <-txCh: - jsonData, _ := json.MarshalIndent(txBatch, "", " ") - fmt.Println(string(jsonData)) + //jsonData, _ := json.MarshalIndent(txBatch, "", " ") + fmt.Println(txBatch[0].TxHash) } } } diff --git a/go.mod b/go.mod index 037ebc3..745fb57 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/gagliardetto/solana-go v1.12.0 github.com/mr-tron/base58 v1.2.0 github.com/near/borsh-go v0.3.2-0.20220516180422-1ff87d108454 + github.com/panjf2000/ants/v2 v2.11.4 github.com/shopspring/decimal v1.4.0 google.golang.org/grpc v1.75.0 google.golang.org/protobuf v1.36.10 @@ -38,6 +39,7 @@ require ( go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.44.0 // indirect golang.org/x/net v0.47.0 // indirect + golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/term v0.37.0 // indirect golang.org/x/text v0.31.0 // indirect diff --git a/go.sum b/go.sum index 0e80975..478afe3 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/near/borsh-go v0.3.2-0.20220516180422-1ff87d108454 h1:lFN7TVecCMbCHVN github.com/near/borsh-go v0.3.2-0.20220516180422-1ff87d108454/go.mod h1:NeMochZp7jN/pYFuxLkrZtmLqbADmnp/y1+/dL+AsyQ= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/panjf2000/ants/v2 v2.11.4 h1:UJQbtN1jIcI5CYNocTj0fuAUYvsLjPoYi0YuhqV/Y48= +github.com/panjf2000/ants/v2 v2.11.4/go.mod h1:8u92CYMUc6gyvTIw8Ru7Mt7+/ESnJahz5EVtqfrilek= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -81,8 +83,9 @@ github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091/go.mod h1:Vl github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= @@ -140,6 +143,8 @@ golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/shreder/addresstables.go b/pkg/shreder/addresstables.go new file mode 100644 index 0000000..7bded4a --- /dev/null +++ b/pkg/shreder/addresstables.go @@ -0,0 +1,100 @@ +package shreder + +import ( + "context" + "fmt" + "sync" + + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/rpc" + "github.com/panjf2000/ants/v2" +) + +type AddressTables struct { + rpcClient *rpc.Client + mux sync.RWMutex + tables map[solana.PublicKey][]solana.PublicKey + loading map[solana.PublicKey]struct{} + + pool *ants.Pool +} + +func NewAddressTables(rpcClient *rpc.Client) *AddressTables { + pool, _ := ants.NewPool(10, ants.WithPreAlloc(true), ants.WithNonblocking(true)) + return &AddressTables{ + rpcClient: rpcClient, + tables: make(map[solana.PublicKey][]solana.PublicKey), + loading: make(map[solana.PublicKey]struct{}), + pool: pool, + } +} + +func (at *AddressTables) loadAddressTable(tablePubkey solana.PublicKey) ([]solana.PublicKey, error) { + // decode acc + acc, err := at.rpcClient.GetAccountInfoWithOpts(context.Background(), tablePubkey, &rpc.GetAccountInfoOpts{ + Encoding: solana.EncodingBase64, + }) + if err != nil { + return nil, err + } + data := acc.GetBinary() + if len(data) <= 56 { + return nil, fmt.Errorf("account data too short") + } + offset := 56 + var addresses solana.PublicKeySlice = make([]solana.PublicKey, 0, (len(data)-offset)/32) + for offset+32 <= len(data) { + addresses = append(addresses, solana.PublicKeyFromBytes(data[offset:offset+32])) + offset += 32 + } + // addresses = append(addresses, solana.PublicKeyFromBytes(data[start:start+32])) + return addresses, nil + +} + +func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey { + + at.mux.RLock() + addresses, ok := at.tables[tablePubkey] + if !ok { + at.mux.RUnlock() + _ = at.pool.Submit(func() { + at.mux.RLock() + _, loading := at.loading[tablePubkey] + if loading { + at.mux.RUnlock() + return + } + at.mux.RUnlock() + at.mux.Lock() + at.loading[tablePubkey] = struct{}{} + at.mux.Unlock() + + table, err := at.loadAddressTable(tablePubkey) + if err != nil { + logger.Error("loadAddressTable failed", "err", err, "table", tablePubkey) + at.mux.Lock() + delete(at.loading, tablePubkey) + at.mux.Unlock() + return + } + at.mux.Lock() + at.tables[tablePubkey] = table + total := len(at.tables) + delete(at.loading, tablePubkey) + at.mux.Unlock() + logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total) + }) + return nil + } + at.mux.RUnlock() + + var result solana.PublicKeySlice = make([]solana.PublicKey, 0, len(idx)) + for i := range idx { + if int(idx[i]) >= len(addresses) { + continue + } + result = append(result, addresses[i]) + } + return result +} diff --git a/pkg/shreder/client.go b/pkg/shreder/client.go index e0cbd8c..87a0a9a 100644 --- a/pkg/shreder/client.go +++ b/pkg/shreder/client.go @@ -2,35 +2,39 @@ package shreder import ( "context" - "log/slog" + "fmt" + "github.com/gagliardetto/solana-go/rpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type Client struct { - log *slog.Logger - conn *grpc.ClientConn client ShrederServiceClient + tableLoader *AddressTables subscription map[string]*SubscribeRequestFilterTransactions } func NewShrederClient( url string, + rpcClient *rpc.Client, subscription map[string]*SubscribeRequestFilterTransactions, ) (*Client, func(), error) { + if rpcClient == nil { + return nil, func() {}, fmt.Errorf("rpc client is nil") + } + conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, func() {}, err } - logger := slog.Default() s := &Client{ - log: logger, conn: conn, client: NewShrederServiceClient(conn), subscription: subscription, + tableLoader: NewAddressTables(rpcClient), } return s, func() { @@ -39,14 +43,14 @@ func NewShrederClient( } func (c *Client) Wait() { - c.log.Debug("waiting for shreder client to stop") + logger.Debug("waiting for shreder client to stop") err := c.conn.Close() if err != nil { - c.log.Error("failed to close connection: ", "err", err) + logger.Error("failed to close connection: ", "err", err) } - c.log.Debug("shreder client stopped") + logger.Debug("shreder client stopped") } func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error { @@ -68,7 +72,7 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error return err } - txBatch := ParseTransaction(response.Transaction) + txBatch := ParseTransaction(response.Transaction, c.tableLoader) if len(txBatch) == 0 { continue } diff --git a/pkg/shreder/tx.go b/pkg/shreder/tx.go index ffc5c4b..8b04290 100644 --- a/pkg/shreder/tx.go +++ b/pkg/shreder/tx.go @@ -1,6 +1,8 @@ package shreder import ( + "log/slog" + "os" "time" "github.com/shopspring/decimal" @@ -11,6 +13,20 @@ const ( SolDecimals = 9 ) +var ( + logger *slog.Logger +) + +func init() { + handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}) + logger = slog.New(handler) +} + +func SetLogLevel(level slog.Level) { + handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level}) + logger = slog.New(handler) +} + type TxSignal struct { Source string `json:"source"` TxHash string `json:"tx_hash"` diff --git a/pkg/shreder/txparser.go b/pkg/shreder/txparser.go index f835b55..0a0b1d5 100644 --- a/pkg/shreder/txparser.go +++ b/pkg/shreder/txparser.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/binary" "fmt" - "log/slog" "math/big" "github.com/gagliardetto/solana-go" @@ -193,7 +192,7 @@ type fjszBuyArgs struct { } // ParseTransaction mirrors the Rust parse_transaction entry point. -func ParseTransaction(update *SubscribeUpdateTransaction) []*TxSignal { +func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables) []*TxSignal { versioned, err := toVersionedTransaction(update) if err != nil || versioned == nil || len(versioned.Signatures) == 0 { return nil @@ -203,6 +202,22 @@ func ParseTransaction(update *SubscribeUpdateTransaction) []*TxSignal { staticKeys := versioned.Message.StaticAccountKeys instructions := versioned.Message.Instructions + if loader != nil && len(versioned.Message.AddressTableLookups) > 0 { + // currently we only care about photon table lookup + for _, lookup := range versioned.Message.AddressTableLookups { + accounts := loader.GetAddressTable(lookup.AccountKey, lookup.WritableIndexes) + if len(accounts) != len(lookup.WritableIndexes) { + break + } + staticKeys = append(staticKeys, accounts...) + accounts2 := loader.GetAddressTable(lookup.AccountKey, lookup.ReadonlyIndexes) + if len(accounts2) != len(lookup.ReadonlyIndexes) { + break + } + staticKeys = append(staticKeys, accounts2...) + } + } + var parsed []*TxSignal for i := range instructions { @@ -251,7 +266,7 @@ func ParseTransaction(update *SubscribeUpdateTransaction) []*TxSignal { func appendParsed(list []*TxSignal, parsed *TxSignal, err error, txHash [64]byte, label string) []*TxSignal { if err != nil { - slog.Error("txparser: failed to parse", "label", label, "instruction", err, "tx_hash", base58.Encode(txHash[:])) + logger.Debug("txparser: failed to parse", "label", label, "instruction", err, "tx_hash", base58.Encode(txHash[:])) return list } if parsed != nil { @@ -321,8 +336,11 @@ func formatSolAmount(lamports uint64) decimal.Decimal { } func getStaticKey(static []solana.PublicKey, index int) (solana.PublicKey, error) { - if index < 0 || index >= len(static) { - return solana.PublicKey{}, fmt.Errorf("account index %d out of bounds", index) + if index < 0 { + return solana.PublicKey{}, fmt.Errorf("account index %d less then 0", index) + } + if index >= len(static) { + return solana.PublicKey{}, fmt.Errorf("account index %d out of range", index) } return static[index], nil }