fix entry parse

This commit is contained in:
thloyi
2026-01-28 14:55:47 +08:00
parent dab77c0b61
commit c3536761af
7 changed files with 147 additions and 52 deletions

View File

@@ -78,7 +78,7 @@ func main() {
// async read from shreder
txCh := make(chan shreder.TxSignal, 1000)
go func() {
err := shrederClient.ReadSync(ctx, txCh)
err := shrederClient.ReadEntriesSync(ctx, txCh)
if err != nil {
if !errors.Is(err, context.Canceled) {
panic(err)

View File

@@ -141,10 +141,8 @@ func (c *Client) ReadEntriesSync(ctx context.Context, txCh chan<- TxSignal) erro
}
}
entries := response.Entries
err = c.pool.Submit(func() {
ParseTransactionForEntries(ctx, slot, bytes.NewReader(entries), c.tableLoader, txCh)
ParseTransactionForEntries(ctx, slot, bytes.NewReader(response.Entries), c.tableLoader, txCh)
})
if err != nil && errors.Is(err, ants.ErrPoolOverload) {
logger.Warn("task pool is full")
@@ -192,7 +190,7 @@ func (c *Client) ReadSync(ctx context.Context, txCh chan<- TxSignal) error {
txData := response.Transaction
err := c.pool.Submit(func() {
ParseTransactionForSubscribe(ctx, txData, c.tableLoader, txCh)
ParseTransactionForSubscribe(ctx, txData, c.tableLoader, txCh, nil)
})
if err != nil && errors.Is(err, ants.ErrPoolOverload) {
logger.Warn("task pool is full")

View File

@@ -29,9 +29,8 @@ func (wr *wrapperReader) Skip(n int) error {
func (wr *wrapperReader) ReadCompactU16() (uint16, error) {
ln := 0
size := 0
for i := 0; i < 3; i++ {
var buf [1]byte
for i := 0; i < 3; i++ {
_, err := io.ReadFull(wr, buf[:])
if err != nil {
return 0, fmt.Errorf("unable to decode compact u16 at %d: %w", i, err)
@@ -61,11 +60,11 @@ func (wr *wrapperReader) ReadByte() (uint8, error) {
return buf[0], nil
}
func ResizeSlice[T any](slice []T, newSize int) {
func ResizeSlice[T any](slice []T, newSize int) []T {
if cap(slice) < newSize {
slice = append(slice, make([]T, newSize-len(slice))...)
}
slice = slice[:newSize]
return slice[:newSize]
}
// entriesToVersionedTransaction converts raw entry bytes to versioned transactions.
@@ -84,7 +83,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
// return nil, nil
//}
if entriesNum > 2048 {
return fmt.Errorf("entries num is too large: %d > %d", entriesNum, 1024)
return fmt.Errorf("entries num is too large: %d > %d", entriesNum, 2048)
}
for i := uint64(0); i < entriesNum; i++ {
@@ -113,7 +112,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
versioned := VersionedTransaction{}
versioned.Block = slot
versioned.Time = time.Now()
ResizeSlice(versioned.Signatures, int(numSignatures))
versioned.Signatures = ResizeSlice(versioned.Signatures, int(numSignatures))
for k := 0; k < int(numSignatures); k++ {
_, err = io.ReadFull(b, versioned.Signatures[k][:])
if err != nil {
@@ -151,7 +150,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
return fmt.Errorf("numAccountKeys %d exceeds maximum in entry %d, txn %d", numAccountKeys, i, j)
}
ResizeSlice(versioned.StaticAccountKeys, int(numAccountKeys))
versioned.StaticAccountKeys = ResizeSlice(versioned.StaticAccountKeys, int(numAccountKeys))
for k := 0; k < int(numAccountKeys); k++ {
_, err = io.ReadFull(b, versioned.StaticAccountKeys[k][:])
@@ -176,7 +175,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
if numInstructions >= 256 {
return fmt.Errorf("numInstructions %d exceeds maximum in entry %d, txn %d, txHash: %s", numInstructions, i, j, versioned.GetSignature())
}
ResizeSlice(versioned.Instructions, int(numInstructions))
versioned.Instructions = ResizeSlice(versioned.Instructions, int(numInstructions))
for k := 0; k < int(numInstructions); k++ {
versioned.Instructions[k].ProgramIDIndex, err = b.ReadByte()
if err != nil {
@@ -191,7 +190,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
if numAccounts >= 256 {
return fmt.Errorf("numAccounts %d exceeds maximum for ix[%d] in entry %d, txn %d", numAccounts, k, i, j)
}
ResizeSlice(versioned.Instructions[k].Accounts, int(numAccounts))
versioned.Instructions[k].Accounts = ResizeSlice(versioned.Instructions[k].Accounts, int(numAccounts))
//.AccountsLen = int(numAccounts)
if numAccounts != 0 {
@@ -208,7 +207,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
if dataLen > 2048 {
return fmt.Errorf("mx.Instructions[%d].Data length %d exceeds maximum in entry %d, txn %d, txHash: %s", k, dataLen, i, j, versioned.GetSignature())
}
ResizeSlice(versioned.Instructions[k].Accounts, int(numAccounts))
versioned.Instructions[k].Data = ResizeSlice(versioned.Instructions[k].Data, int(dataLen))
if dataLen > 0 {
_, err = io.ReadFull(b, versioned.Instructions[k].Data)
if err != nil {
@@ -226,7 +225,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
if numLookups >= 32 {
return fmt.Errorf("numLookups %d exceeds maximum in entry %d, txn %d", numLookups, i, j)
}
ResizeSlice(versioned.AddressTableLookups, int(numLookups))
versioned.AddressTableLookups = ResizeSlice(versioned.AddressTableLookups, int(numLookups))
for k := uint8(0); k < numLookups; k++ {
_, err = io.ReadFull(b, versioned.AddressTableLookups[k].AccountKey[:])
if err != nil {
@@ -240,7 +239,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
if numWritable >= 256 {
return fmt.Errorf("numWritableIndexes %d exceeds maximum for lookup[%d] in entry %d, txn %d", numWritable, k, i, j)
}
ResizeSlice(versioned.AddressTableLookups[k].WritableIndexes, int(numWritable))
versioned.AddressTableLookups[k].WritableIndexes = ResizeSlice(versioned.AddressTableLookups[k].WritableIndexes, int(numWritable))
if numWritable > 0 {
_, err = io.ReadFull(b, versioned.AddressTableLookups[k].WritableIndexes)
if err != nil {
@@ -256,7 +255,7 @@ func entriesToVersionedTransaction(slot uint64, data io.Reader, callback func(tx
if numReadonly > 256 {
return fmt.Errorf("numReadonlyIndexes %d exceeds maximum for lookup[%d] in entry %d, txn %d", numReadonly, k, i, j)
}
ResizeSlice(versioned.AddressTableLookups[k].ReadonlyIndexes, int(numReadonly))
versioned.AddressTableLookups[k].ReadonlyIndexes = ResizeSlice(versioned.AddressTableLookups[k].ReadonlyIndexes, int(numReadonly))
if numReadonly > 0 {
_, err = io.ReadFull(b, versioned.AddressTableLookups[k].ReadonlyIndexes)
if err != nil {

30
pkg/shreder/entry_test.go Normal file

File diff suppressed because one or more lines are too long

View File

@@ -104,7 +104,7 @@ func parseDlmmInstruction(tx VersionedTransaction, instructionIndex int) (TxSign
return nil, fmt.Errorf("data is empty")
}
if len(instruction.Accounts) < 13 {
return nil, fmt.Errorf("accounts too short")
return nil, nil // fmt.Errorf("accounts too short")
}
disc := instruction.Data[:8]

View File

@@ -68,13 +68,15 @@ var (
}
)
func ParseTransactionForSubscribe(ctx context.Context, update *SubscribeUpdateTransaction, loader *AddressTables, parsed chan<- TxSignal) {
func ParseTransactionForSubscribe(ctx context.Context, update *SubscribeUpdateTransaction, loader *AddressTables, parsed chan<- TxSignal, done chan<- struct{}) {
versioned, err := toVersionedTransaction(update)
if err != nil {
logger.Debug("txparser: failed to convert to versioned transaction", "error", err)
close(done)
return
}
go ParseTransaction(ctx, versioned, loader, parsed)
ParseTransaction(ctx, versioned, loader, parsed)
close(done)
}
var VoteProgram = solana.MustPublicKeyFromBase58("Vote111111111111111111111111111111111111111")
@@ -147,7 +149,7 @@ func ParseTransaction(ctx context.Context, versioned VersionedTransaction, loade
txRes, err := handler.Func(versioned, i)
if err != nil {
if !strings.HasPrefix(err.Error(), "account index") {
logger.Debug("txparser: failed to parse", "label", handler.Label, "instruction", err, "tx_hash", versioned.Signatures[0].String())
logger.Debug("txparser: failed to parse", "label", handler.Label, "err", err, "tx_hash", versioned.Signatures[0].String())
}
continue
}

View File

@@ -149,11 +149,21 @@ func TestParseTermBuy(t *testing.T) {
}
client := rpc.New(rpcUrl)
signals := ParseTransaction(
ch := make(chan TxSignal)
closed := make(chan struct{})
go func() {
ParseTransactionForSubscribe(
context.Background(),
getTransaction(t, client, "5Gz1fa4Qhb35bkg9QCMXpxCX5uuNr7WcjcmrwajGZA7kXsvNS9pDnYe12ggWeSqf1nwZbVPob6DkX6fcwbE9ofBR"),
nil,
false,
ch,
closed,
)
}()
signals := make([]TxSignal, 0)
for signal := range ch {
signals = append(signals, signal)
}
if len(signals) != 1 {
t.Fatalf("expected 1 signal, got %d", len(signals))
}
@@ -186,11 +196,23 @@ func TestParseBonkBuy(t *testing.T) {
}
client := rpc.New(rpcUrl)
signals := ParseTransaction(
ch := make(chan TxSignal)
closed := make(chan struct{})
go func() {
ParseTransactionForSubscribe(
context.Background(),
getTransaction(t, client, "3gHF3TA2aA8rpjdmoEs2vA89vrq9J9NnTTUSXHfE6uXcaYP9cJgLtEUjCmsK9EWAyHEg7cEiepehQf4GFv1272jW"),
nil,
false,
ch,
closed,
)
}()
signals := make([]TxSignal, 0)
for signal := range ch {
signals = append(signals, signal)
}
if len(signals) != 1 {
t.Fatalf("expected 1 signal, got %d", len(signals))
}
@@ -223,11 +245,22 @@ func TestParseBonkSell(t *testing.T) {
}
client := rpc.New(rpcUrl)
signals := ParseTransaction(
ch := make(chan TxSignal)
closed := make(chan struct{})
go func() {
ParseTransactionForSubscribe(
context.Background(),
getTransaction(t, client, "3XNi6b3j69SSStqLLRQVH5BNGVfEoFxGCzmpdd5FvrY4kmC8T644WGdEhCH9fAdrxWuR2Mtzgywq8K7qetu5MGyb"),
nil,
false,
ch,
closed,
)
}()
signals := make([]TxSignal, 0)
for signal := range ch {
signals = append(signals, signal)
}
if len(signals) != 1 {
t.Fatalf("expected 1 signal, got %d", len(signals))
}
@@ -260,11 +293,23 @@ func TestParsePhotonBuy(t *testing.T) {
}
client := rpc.New(rpcUrl)
signals := ParseTransaction(
ch := make(chan TxSignal)
closed := make(chan struct{})
go func() {
ParseTransactionForSubscribe(
context.Background(),
getTransaction(t, client, "4DCEcXAWBxagXoUNGhWsJ7qfxq5SuE5BG2cBDBqAY7sCHkBopaMJu33ZnXnFHqzPMmWxVxq6666KRF4hMHVB33Ux"),
nil,
false,
ch,
closed,
)
}()
signals := make([]TxSignal, 0)
for signal := range ch {
signals = append(signals, signal)
}
if len(signals) != 1 {
t.Fatalf("expected 1 signal, got %d", len(signals))
}
@@ -297,11 +342,22 @@ func TestParseJupiterV6PumpFunBuy(t *testing.T) {
}
client := rpc.New(rpcUrl)
signals := ParseTransaction(
ch := make(chan TxSignal)
closed := make(chan struct{})
go func() {
ParseTransactionForSubscribe(
context.Background(),
getTransaction(t, client, "4QF5whXwjx234fMXeH3HrJCy5knFJmKPtgbXys8xKGz1pZypqPvXBr4BoAqXfYn8jLL4HXPY1pcvxCCW1XREFNxd"),
nil,
false,
ch,
closed,
)
}()
signals := make([]TxSignal, 0)
for signal := range ch {
signals = append(signals, signal)
}
if len(signals) != 1 {
t.Fatalf("expected 1 signal, got %d", len(signals))
}
@@ -337,11 +393,21 @@ func TestParseJupiterV6PumpFunSell(t *testing.T) {
}
client := rpc.New(rpcUrl)
signals := ParseTransaction(
ch := make(chan TxSignal)
closed := make(chan struct{})
go func() {
ParseTransactionForSubscribe(
context.Background(),
getTransaction(t, client, "yCnE7ZA8dqB5iAZtwpSN2ar5HXh3gBjgaG2xtnwXDPFyHAm5XFU8642uTZTH5A2iPQ6G9hrj5eEPAJiWrfe38gM"),
nil,
false,
ch,
closed,
)
}()
signals := make([]TxSignal, 0)
for signal := range ch {
signals = append(signals, signal)
}
if len(signals) != 1 {
t.Fatalf("expected 1 signal, got %d", len(signals))
}