Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dd843c393 |
75
tx_binary.go
75
tx_binary.go
@@ -1,6 +1,7 @@
|
|||||||
package pump_parser
|
package pump_parser
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -107,6 +108,18 @@ type TxsBinaryReaderSource interface {
|
|||||||
OpenTxsBinaryReader() (io.ReadCloser, error)
|
OpenTxsBinaryReader() (io.ReadCloser, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TxsBinaryBatchHeaderContext struct {
|
||||||
|
SourceIndex int
|
||||||
|
BatchIndex int
|
||||||
|
Reader *bufio.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
type TxsBinaryBatchHeaderFunc func(ctx *TxsBinaryBatchHeaderContext) (skip bool, err error)
|
||||||
|
|
||||||
|
type TxsBinaryMergeOptions struct {
|
||||||
|
BatchHeaderFunc TxsBinaryBatchHeaderFunc
|
||||||
|
}
|
||||||
|
|
||||||
type PlatformBinary struct {
|
type PlatformBinary struct {
|
||||||
Platform string
|
Platform string
|
||||||
PlatformFee uint64
|
PlatformFee uint64
|
||||||
@@ -307,24 +320,32 @@ func DecodeTxsBinaryReader(r io.Reader) iter.Seq2[*Tx, error] {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func MergeTxsBinaryBytes(encodedBatches [][]byte) ([]byte, error) {
|
func MergeTxsBinaryBytes(encodedBatches [][]byte) ([]byte, error) {
|
||||||
|
return MergeTxsBinaryBytesWithOptions(encodedBatches, TxsBinaryMergeOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func MergeTxsBinaryBytesWithOptions(encodedBatches [][]byte, opts TxsBinaryMergeOptions) ([]byte, error) {
|
||||||
sources := make([]TxsBinaryReaderSource, 0, len(encodedBatches))
|
sources := make([]TxsBinaryReaderSource, 0, len(encodedBatches))
|
||||||
for _, encoded := range encodedBatches {
|
for _, encoded := range encodedBatches {
|
||||||
sources = append(sources, txBinaryBytesSource{data: encoded})
|
sources = append(sources, txBinaryBytesSource{data: encoded})
|
||||||
}
|
}
|
||||||
|
|
||||||
var out bytes.Buffer
|
var out bytes.Buffer
|
||||||
if err := MergeTxsBinarySourcesToWriter(sources, &out); err != nil {
|
if err := MergeTxsBinarySourcesToWriterWithOptions(sources, &out, opts); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return out.Bytes(), nil
|
return out.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer) error {
|
func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer) error {
|
||||||
|
return MergeTxsBinarySourcesToWriterWithOptions(sources, w, TxsBinaryMergeOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func MergeTxsBinarySourcesToWriterWithOptions(sources []TxsBinaryReaderSource, w io.Writer, opts TxsBinaryMergeOptions) error {
|
||||||
if w == nil {
|
if w == nil {
|
||||||
return fmt.Errorf("txs binary writer is nil")
|
return fmt.Errorf("txs binary writer is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
plan, err := txBinaryBuildMergePlan(sources)
|
plan, err := txBinaryBuildMergePlan(sources, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -343,9 +364,22 @@ func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer)
|
|||||||
return fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err)
|
return fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dec := txBinaryStreamDecoder{reader: reader}
|
bufferedReader := bufio.NewReader(reader)
|
||||||
|
dec := txBinaryStreamDecoder{reader: bufferedReader}
|
||||||
batchIndex := 0
|
batchIndex := 0
|
||||||
for {
|
for {
|
||||||
|
skipBatch, err := txBinaryApplyMergeBatchHeader(bufferedReader, opts, sourceIndex, batchIndex)
|
||||||
|
if err != nil {
|
||||||
|
closeErr := reader.Close()
|
||||||
|
if err == io.EOF {
|
||||||
|
if closeErr != nil {
|
||||||
|
return fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err)
|
||||||
|
}
|
||||||
|
|
||||||
header, err := dec.readTxsBinaryHeaderOrEOF()
|
header, err := dec.readTxsBinaryHeaderOrEOF()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
closeErr := reader.Close()
|
closeErr := reader.Close()
|
||||||
@@ -368,6 +402,9 @@ func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer)
|
|||||||
reader.Close()
|
reader.Close()
|
||||||
return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err)
|
return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err)
|
||||||
}
|
}
|
||||||
|
if skipBatch {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if err := txBinaryRemapTxAddressTable(&tx, header.addressTable, plan.addressTable, plan.addressIndex); err != nil {
|
if err := txBinaryRemapTxAddressTable(&tx, header.addressTable, plan.addressTable, plan.addressIndex); err != nil {
|
||||||
reader.Close()
|
reader.Close()
|
||||||
return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err)
|
return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err)
|
||||||
@@ -1780,7 +1817,7 @@ func txBinaryReadTxBody(dec txBinaryBodyReader, tx *TxBinary, enumTable *txBinar
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource) (*txsBinaryMergePlan, error) {
|
func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource, opts TxsBinaryMergeOptions) (*txsBinaryMergePlan, error) {
|
||||||
if len(sources) == 0 {
|
if len(sources) == 0 {
|
||||||
return nil, fmt.Errorf("txs binary sources are empty")
|
return nil, fmt.Errorf("txs binary sources are empty")
|
||||||
}
|
}
|
||||||
@@ -1801,9 +1838,22 @@ func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource) (*txsBinaryMergePla
|
|||||||
return nil, fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err)
|
return nil, fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
dec := txBinaryStreamDecoder{reader: reader}
|
bufferedReader := bufio.NewReader(reader)
|
||||||
|
dec := txBinaryStreamDecoder{reader: bufferedReader}
|
||||||
batchIndex := 0
|
batchIndex := 0
|
||||||
for {
|
for {
|
||||||
|
skipBatch, err := txBinaryApplyMergeBatchHeader(bufferedReader, opts, sourceIndex, batchIndex)
|
||||||
|
if err != nil {
|
||||||
|
closeErr := reader.Close()
|
||||||
|
if err == io.EOF {
|
||||||
|
if closeErr != nil {
|
||||||
|
return nil, fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err)
|
||||||
|
}
|
||||||
|
|
||||||
header, err := dec.readTxsBinaryHeaderOrEOF()
|
header, err := dec.readTxsBinaryHeaderOrEOF()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
closeErr := reader.Close()
|
closeErr := reader.Close()
|
||||||
@@ -1833,17 +1883,21 @@ func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource) (*txsBinaryMergePla
|
|||||||
}
|
}
|
||||||
|
|
||||||
for addressIndex, address := range header.addressTable {
|
for addressIndex, address := range header.addressTable {
|
||||||
|
if !skipBatch {
|
||||||
if err := builder.add(address); err != nil {
|
if err := builder.add(address); err != nil {
|
||||||
reader.Close()
|
reader.Close()
|
||||||
return nil, fmt.Errorf("source[%d].batch[%d].address[%d]: %w", sourceIndex, batchIndex, addressIndex, err)
|
return nil, fmt.Errorf("source[%d].batch[%d].address[%d]: %w", sourceIndex, batchIndex, addressIndex, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !skipBatch {
|
||||||
if uint64(plan.txCount)+uint64(header.count) > uint64(math.MaxUint32) {
|
if uint64(plan.txCount)+uint64(header.count) > uint64(math.MaxUint32) {
|
||||||
reader.Close()
|
reader.Close()
|
||||||
return nil, fmt.Errorf("merged tx count exceeds uint32 capacity")
|
return nil, fmt.Errorf("merged tx count exceeds uint32 capacity")
|
||||||
}
|
}
|
||||||
plan.txCount += header.count
|
plan.txCount += header.count
|
||||||
|
}
|
||||||
|
|
||||||
for txIndex := uint32(0); txIndex < header.count; txIndex++ {
|
for txIndex := uint32(0); txIndex < header.count; txIndex++ {
|
||||||
tx := TxBinary{
|
tx := TxBinary{
|
||||||
@@ -1947,6 +2001,17 @@ func txBinaryWriteAll(w io.Writer, data []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func txBinaryApplyMergeBatchHeader(reader *bufio.Reader, opts TxsBinaryMergeOptions, sourceIndex int, batchIndex int) (bool, error) {
|
||||||
|
if opts.BatchHeaderFunc == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return opts.BatchHeaderFunc(&TxsBinaryBatchHeaderContext{
|
||||||
|
SourceIndex: sourceIndex,
|
||||||
|
BatchIndex: batchIndex,
|
||||||
|
Reader: reader,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
type txBinaryEnumTable struct {
|
type txBinaryEnumTable struct {
|
||||||
version uint16
|
version uint16
|
||||||
programs txBinaryEnumSet
|
programs txBinaryEnumSet
|
||||||
|
|||||||
@@ -602,6 +602,89 @@ func TestMergeTxsBinarySourcesToWriterWithConcatenatedBatches(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMergeTxsBinarySourcesToWriterWithBatchHeaderFuncSkip(t *testing.T) {
|
||||||
|
tx1 := Tx{
|
||||||
|
Signer: mustPubKey("So11111111111111111111111111111111111111112"),
|
||||||
|
Block: 31,
|
||||||
|
BlockIndex: 1,
|
||||||
|
CuFee: decimal.NewFromInt(1),
|
||||||
|
CUPrice: decimal.RequireFromString("0.000001"),
|
||||||
|
BeforeSolBalance: decimal.RequireFromString("1.000000000"),
|
||||||
|
AfterSOLBalance: decimal.RequireFromString("0.900000000"),
|
||||||
|
ComputeUnitsConsumed: 11,
|
||||||
|
CuLimit: 111,
|
||||||
|
}
|
||||||
|
tx2 := tx1
|
||||||
|
tx2.Block = 32
|
||||||
|
tx2.BlockIndex = 2
|
||||||
|
tx2.Signer = mustPubKey("SysvarRent111111111111111111111111111111111")
|
||||||
|
tx3 := tx1
|
||||||
|
tx3.Block = 33
|
||||||
|
tx3.BlockIndex = 3
|
||||||
|
tx3.Signer = mustPubKey("ComputeBudget111111111111111111111111111111")
|
||||||
|
|
||||||
|
batch1, err := EncodeTxsBinary([]Tx{tx1})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("EncodeTxsBinary(batch1) error = %v", err)
|
||||||
|
}
|
||||||
|
batch2, err := EncodeTxsBinary([]Tx{tx2})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("EncodeTxsBinary(batch2) error = %v", err)
|
||||||
|
}
|
||||||
|
batch3, err := EncodeTxsBinary([]Tx{tx3})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("EncodeTxsBinary(batch3) error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
source := &testTxsBinarySource{
|
||||||
|
data: append(
|
||||||
|
append(
|
||||||
|
append([]byte{}, testBatchHeader(false)...),
|
||||||
|
batch1...,
|
||||||
|
),
|
||||||
|
append(
|
||||||
|
append(testBatchHeader(true), batch2...),
|
||||||
|
append(testBatchHeader(false), batch3...)...,
|
||||||
|
)...,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
var out bytes.Buffer
|
||||||
|
err = MergeTxsBinarySourcesToWriterWithOptions(
|
||||||
|
[]TxsBinaryReaderSource{source},
|
||||||
|
&out,
|
||||||
|
TxsBinaryMergeOptions{
|
||||||
|
BatchHeaderFunc: func(ctx *TxsBinaryBatchHeaderContext) (bool, error) {
|
||||||
|
header := make([]byte, 5)
|
||||||
|
if _, err := io.ReadFull(ctx.Reader, header); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if !bytes.Equal(header[:4], []byte("BHDR")) {
|
||||||
|
return false, io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
return header[4] == 1, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("MergeTxsBinarySourcesToWriterWithOptions() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
decoded, err := DecodeTxsBinary(out.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("DecodeTxsBinary(merged) error = %v", err)
|
||||||
|
}
|
||||||
|
if len(decoded) != 2 {
|
||||||
|
t.Fatalf("decoded len = %d, want 2", len(decoded))
|
||||||
|
}
|
||||||
|
if decoded[0].Block != tx1.Block || decoded[1].Block != tx3.Block {
|
||||||
|
t.Fatalf("decoded block order mismatch after skip")
|
||||||
|
}
|
||||||
|
if source.opens != 2 {
|
||||||
|
t.Fatalf("source.opens = %d, want 2", source.opens)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func mustPubKey(value string) solana.PublicKey {
|
func mustPubKey(value string) solana.PublicKey {
|
||||||
return solana.MustPublicKeyFromBase58(value)
|
return solana.MustPublicKeyFromBase58(value)
|
||||||
}
|
}
|
||||||
@@ -625,3 +708,11 @@ func (s *testTxsBinarySource) OpenTxsBinaryReader() (io.ReadCloser, error) {
|
|||||||
s.opens++
|
s.opens++
|
||||||
return io.NopCloser(bytes.NewReader(s.data)), nil
|
return io.NopCloser(bytes.NewReader(s.data)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testBatchHeader(skip bool) []byte {
|
||||||
|
header := []byte("BHDR\x00")
|
||||||
|
if skip {
|
||||||
|
header[4] = 1
|
||||||
|
}
|
||||||
|
return header
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user