loading address tables

This commit is contained in:
thloyi
2026-01-05 12:45:32 +08:00
parent 4afa412231
commit e6922e4561
7 changed files with 178 additions and 20 deletions

View File

@@ -0,0 +1,100 @@
package shreder
import (
"context"
"fmt"
"sync"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"github.com/panjf2000/ants/v2"
)
type AddressTables struct {
rpcClient *rpc.Client
mux sync.RWMutex
tables map[solana.PublicKey][]solana.PublicKey
loading map[solana.PublicKey]struct{}
pool *ants.Pool
}
func NewAddressTables(rpcClient *rpc.Client) *AddressTables {
pool, _ := ants.NewPool(10, ants.WithPreAlloc(true), ants.WithNonblocking(true))
return &AddressTables{
rpcClient: rpcClient,
tables: make(map[solana.PublicKey][]solana.PublicKey),
loading: make(map[solana.PublicKey]struct{}),
pool: pool,
}
}
func (at *AddressTables) loadAddressTable(tablePubkey solana.PublicKey) ([]solana.PublicKey, error) {
// decode acc
acc, err := at.rpcClient.GetAccountInfoWithOpts(context.Background(), tablePubkey, &rpc.GetAccountInfoOpts{
Encoding: solana.EncodingBase64,
})
if err != nil {
return nil, err
}
data := acc.GetBinary()
if len(data) <= 56 {
return nil, fmt.Errorf("account data too short")
}
offset := 56
var addresses solana.PublicKeySlice = make([]solana.PublicKey, 0, (len(data)-offset)/32)
for offset+32 <= len(data) {
addresses = append(addresses, solana.PublicKeyFromBytes(data[offset:offset+32]))
offset += 32
}
// addresses = append(addresses, solana.PublicKeyFromBytes(data[start:start+32]))
return addresses, nil
}
func (at *AddressTables) GetAddressTable(tablePubkey solana.PublicKey, idx []uint8) []solana.PublicKey {
at.mux.RLock()
addresses, ok := at.tables[tablePubkey]
if !ok {
at.mux.RUnlock()
_ = at.pool.Submit(func() {
at.mux.RLock()
_, loading := at.loading[tablePubkey]
if loading {
at.mux.RUnlock()
return
}
at.mux.RUnlock()
at.mux.Lock()
at.loading[tablePubkey] = struct{}{}
at.mux.Unlock()
table, err := at.loadAddressTable(tablePubkey)
if err != nil {
logger.Error("loadAddressTable failed", "err", err, "table", tablePubkey)
at.mux.Lock()
delete(at.loading, tablePubkey)
at.mux.Unlock()
return
}
at.mux.Lock()
at.tables[tablePubkey] = table
total := len(at.tables)
delete(at.loading, tablePubkey)
at.mux.Unlock()
logger.Info("loadAddressTable", "table", tablePubkey.String(), "table count:", total)
})
return nil
}
at.mux.RUnlock()
var result solana.PublicKeySlice = make([]solana.PublicKey, 0, len(idx))
for i := range idx {
if int(idx[i]) >= len(addresses) {
continue
}
result = append(result, addresses[i])
}
return result
}

View File

@@ -2,35 +2,39 @@ package shreder
import (
"context"
"log/slog"
"fmt"
"github.com/gagliardetto/solana-go/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type Client struct {
log *slog.Logger
conn *grpc.ClientConn
client ShrederServiceClient
tableLoader *AddressTables
subscription map[string]*SubscribeRequestFilterTransactions
}
func NewShrederClient(
url string,
rpcClient *rpc.Client,
subscription map[string]*SubscribeRequestFilterTransactions,
) (*Client, func(), error) {
if rpcClient == nil {
return nil, func() {}, fmt.Errorf("rpc client is nil")
}
conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, func() {}, err
}
logger := slog.Default()
s := &Client{
log: logger,
conn: conn,
client: NewShrederServiceClient(conn),
subscription: subscription,
tableLoader: NewAddressTables(rpcClient),
}
return s, func() {
@@ -39,14 +43,14 @@ func NewShrederClient(
}
func (c *Client) Wait() {
c.log.Debug("waiting for shreder client to stop")
logger.Debug("waiting for shreder client to stop")
err := c.conn.Close()
if err != nil {
c.log.Error("failed to close connection: ", "err", err)
logger.Error("failed to close connection: ", "err", err)
}
c.log.Debug("shreder client stopped")
logger.Debug("shreder client stopped")
}
func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error {
@@ -68,7 +72,7 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignalBatch) error
return err
}
txBatch := ParseTransaction(response.Transaction)
txBatch := ParseTransaction(response.Transaction, c.tableLoader)
if len(txBatch) == 0 {
continue
}

View File

@@ -1,6 +1,8 @@
package shreder
import (
"log/slog"
"os"
"time"
"github.com/shopspring/decimal"
@@ -11,6 +13,20 @@ const (
SolDecimals = 9
)
var (
logger *slog.Logger
)
func init() {
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})
logger = slog.New(handler)
}
func SetLogLevel(level slog.Level) {
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
logger = slog.New(handler)
}
type TxSignal struct {
Source string `json:"source"`
TxHash string `json:"tx_hash"`

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/binary"
"fmt"
"log/slog"
"math/big"
"github.com/gagliardetto/solana-go"
@@ -193,7 +192,7 @@ type fjszBuyArgs struct {
}
// ParseTransaction mirrors the Rust parse_transaction entry point.
func ParseTransaction(update *SubscribeUpdateTransaction) []*TxSignal {
func ParseTransaction(update *SubscribeUpdateTransaction, loader *AddressTables) []*TxSignal {
versioned, err := toVersionedTransaction(update)
if err != nil || versioned == nil || len(versioned.Signatures) == 0 {
return nil
@@ -203,6 +202,22 @@ func ParseTransaction(update *SubscribeUpdateTransaction) []*TxSignal {
staticKeys := versioned.Message.StaticAccountKeys
instructions := versioned.Message.Instructions
if loader != nil && len(versioned.Message.AddressTableLookups) > 0 {
// currently we only care about photon table lookup
for _, lookup := range versioned.Message.AddressTableLookups {
accounts := loader.GetAddressTable(lookup.AccountKey, lookup.WritableIndexes)
if len(accounts) != len(lookup.WritableIndexes) {
break
}
staticKeys = append(staticKeys, accounts...)
accounts2 := loader.GetAddressTable(lookup.AccountKey, lookup.ReadonlyIndexes)
if len(accounts2) != len(lookup.ReadonlyIndexes) {
break
}
staticKeys = append(staticKeys, accounts2...)
}
}
var parsed []*TxSignal
for i := range instructions {
@@ -251,7 +266,7 @@ func ParseTransaction(update *SubscribeUpdateTransaction) []*TxSignal {
func appendParsed(list []*TxSignal, parsed *TxSignal, err error, txHash [64]byte, label string) []*TxSignal {
if err != nil {
slog.Error("txparser: failed to parse", "label", label, "instruction", err, "tx_hash", base58.Encode(txHash[:]))
logger.Debug("txparser: failed to parse", "label", label, "instruction", err, "tx_hash", base58.Encode(txHash[:]))
return list
}
if parsed != nil {
@@ -321,8 +336,11 @@ func formatSolAmount(lamports uint64) decimal.Decimal {
}
func getStaticKey(static []solana.PublicKey, index int) (solana.PublicKey, error) {
if index < 0 || index >= len(static) {
return solana.PublicKey{}, fmt.Errorf("account index %d out of bounds", index)
if index < 0 {
return solana.PublicKey{}, fmt.Errorf("account index %d less then 0", index)
}
if index >= len(static) {
return solana.PublicKey{}, fmt.Errorf("account index %d out of range", index)
}
return static[index], nil
}