package pump_parser import ( "bufio" "bytes" "encoding/binary" "fmt" "io" "iter" "math" "sort" "strconv" "github.com/gagliardetto/solana-go" "github.com/shopspring/decimal" ) const ( txBinarySchemaVersionCurrent uint16 = 3 txBinaryEnumVersionV1 uint16 = 1 txBinarySOLScale int32 = 9 txBinaryCUPriceScale int32 = 6 ) var txBinaryMagic = [4]byte{'P', 'T', 'X', 'B'} var txsBinaryMagic = [4]byte{'P', 'T', 'X', 'S'} type TxBinary struct { SchemaVersion uint16 EnumVersion uint16 AddressTable []solana.PublicKey Signer uint32 Block uint64 BlockIndex uint64 TxHash *[64]byte CuFee uint64 Swaps []SwapBinary Platform []PlatformBinary MevAgent []MevAgentBinary CUPrice uint64 BeforeSolBalance float64 AfterSOLBalance float64 ComputeUnitsConsumed uint64 CuLimit uint32 } type SwapBinary struct { Program string Event string TxIndex int32 InstrIdx uint8 InnerIdx uint8 Pool uint32 BaseMint uint32 QuoteMint uint32 BaseTokenProgram uint32 QuoteTokenProgram uint32 Creator uint32 BaseMintDecimals uint8 QuoteMintDecimals uint8 User uint32 BaseAmount uint64 QuoteAmount uint64 SwapMode SwapMode FixedAmount uint64 FixedAmountSide SwapAmountSide FixedMint uint32 LimitAmountType SwapLimitType LimitAmount uint64 LimitAmountSide SwapAmountSide LimitMint uint32 ActualLimitAmount uint64 ActualLimitAmountSide SwapAmountSide SlippageBps uint64 BaseReserve float64 QuoteReserve float64 Mayhem bool Cashback bool UserBaseBalance uint64 UserQuoteBalance uint64 EntryContract uint32 MigrateToPool uint32 MigrateTopProgram uint32 LpMint uint32 AfterSOLBalance float64 } type TxsBinary struct { SchemaVersion uint16 EnumVersion uint16 AddressTable []solana.PublicKey Txs []TxBinary } type TxsBinaryReaderSource interface { OpenTxsBinaryReader() (io.ReadCloser, error) } type TxsBinaryBatchHeaderContext struct { SourceIndex int BatchIndex int Reader *bufio.Reader } type TxsBinaryBatchHeaderFunc func(ctx *TxsBinaryBatchHeaderContext) (skip bool, err error) type TxsBinaryMergeOptions struct { BatchHeaderFunc TxsBinaryBatchHeaderFunc } type PlatformBinary struct { Platform string PlatformFee uint64 } type MevAgentBinary struct { MevAgent string MevAgentFee uint64 } type txBinaryBytesSource struct { data []byte } type txsBinaryMergePlan struct { schemaVersion uint16 enumVersion uint16 enumTable *txBinaryEnumTable addressTable []solana.PublicKey addressIndex *txBinaryAddressIndex txCount uint32 } func (s txBinaryBytesSource) OpenTxsBinaryReader() (io.ReadCloser, error) { return io.NopCloser(bytes.NewReader(s.data)), nil } func NewTxBinary(tx *Tx) (*TxBinary, error) { if tx == nil { return nil, fmt.Errorf("tx is nil") } addressTable, err := txBinaryBuildAddressTable([]*Tx{tx}) if err != nil { return nil, err } addressIndex, err := newTxBinaryAddressIndex(addressTable) if err != nil { return nil, err } return newTxBinaryWithAddressTable(tx, addressTable, addressIndex) } func NewTxsBinary(txs []Tx) (*TxsBinary, error) { txPtrs := make([]*Tx, 0, len(txs)) for i := range txs { txPtrs = append(txPtrs, &txs[i]) } addressTable, err := txBinaryBuildAddressTable(txPtrs) if err != nil { return nil, err } addressIndex, err := newTxBinaryAddressIndex(addressTable) if err != nil { return nil, err } out := &TxsBinary{ SchemaVersion: txBinarySchemaVersionCurrent, EnumVersion: txBinaryEnumVersionV1, AddressTable: addressTable, Txs: make([]TxBinary, 0, len(txPtrs)), } for i, tx := range txPtrs { binaryTx, err := newTxBinaryWithAddressTable(tx, addressTable, addressIndex) if err != nil { return nil, fmt.Errorf("tx[%d]: %w", i, err) } out.Txs = append(out.Txs, *binaryTx) } return out, nil } func newTxBinaryWithAddressTable(tx *Tx, addressTable []solana.PublicKey, addressIndex *txBinaryAddressIndex) (*TxBinary, error) { if tx == nil { return nil, fmt.Errorf("tx is nil") } out := &TxBinary{ SchemaVersion: txBinarySchemaVersionCurrent, EnumVersion: txBinaryEnumVersionV1, Block: tx.Block, BlockIndex: tx.BlockIndex, CuLimit: tx.CuLimit, ComputeUnitsConsumed: tx.ComputeUnitsConsumed, } if tx.TxHash != nil { txHash := *tx.TxHash out.TxHash = &txHash } var err error if out.CuFee, err = txBinaryDecimalToUint64(tx.CuFee, "tx.cu_fee"); err != nil { return nil, err } if out.CUPrice, err = txBinaryScaledDecimalToUint64(tx.CUPrice, txBinaryCUPriceScale, "tx.cu_price"); err != nil { return nil, err } if out.BeforeSolBalance, err = txBinaryDecimalToFloat64(tx.BeforeSolBalance, txBinarySOLScale, "tx.before_sol_balance"); err != nil { return nil, err } if out.AfterSOLBalance, err = txBinaryDecimalToFloat64(tx.AfterSOLBalance, txBinarySOLScale, "tx.after_sol_balance"); err != nil { return nil, err } out.Platform, err = txBinaryPlatformsFromTx(tx.Platform) if err != nil { return nil, err } out.MevAgent, err = txBinaryMevAgentsFromTx(tx.MevAgent) if err != nil { return nil, err } out.AddressTable = addressTable if out.Signer, err = addressIndex.id(tx.Signer); err != nil { return nil, fmt.Errorf("tx.signer: %w", err) } out.Swaps = make([]SwapBinary, 0, len(tx.Swaps)) for i, swap := range tx.Swaps { encodedSwap, err := newSwapBinary(swap, i, addressIndex) if err != nil { return nil, err } out.Swaps = append(out.Swaps, encodedSwap) } return out, nil } func EncodeTxBinary(tx *Tx) ([]byte, error) { binaryTx, err := NewTxBinary(tx) if err != nil { return nil, err } return binaryTx.MarshalBinary() } func EncodeTxsBinary(txs []Tx) ([]byte, error) { binaryTxs, err := NewTxsBinary(txs) if err != nil { return nil, err } return binaryTxs.MarshalBinary() } func DecodeTxBinary(data []byte) (*Tx, error) { var binaryTx TxBinary if err := binaryTx.UnmarshalBinary(data); err != nil { return nil, err } return binaryTx.ToTx() } func DecodeTxsBinary(data []byte) ([]*Tx, error) { var binaryTxs TxsBinary if err := binaryTxs.UnmarshalBinary(data); err != nil { return nil, err } return binaryTxs.ToTxs() } func DecodeTxsBinaryReader(r io.Reader) iter.Seq2[*Tx, error] { return func(yield func(*Tx, error) bool) { if r == nil { yield(nil, fmt.Errorf("txs binary reader is nil")) return } dec := txBinaryStreamDecoder{reader: r} header, err := dec.readTxsBinaryHeader() if err != nil { yield(nil, err) return } for i := uint32(0); i < header.count; i++ { tx := TxBinary{ SchemaVersion: header.schemaVersion, EnumVersion: header.enumVersion, AddressTable: header.addressTable, } if err := txBinaryReadTxBody(&dec, &tx, header.enumTable, header.addressTable); err != nil { yield(nil, fmt.Errorf("tx[%d]: %w", i, err)) return } decodedTx, err := tx.ToTx() if err != nil { yield(nil, fmt.Errorf("tx[%d]: %w", i, err)) return } if !yield(decodedTx, nil) { return } } } } func MergeTxsBinaryBytes(encodedBatches [][]byte) ([]byte, error) { return MergeTxsBinaryBytesWithOptions(encodedBatches, TxsBinaryMergeOptions{}) } func MergeTxsBinaryBytesWithOptions(encodedBatches [][]byte, opts TxsBinaryMergeOptions) ([]byte, error) { sources := make([]TxsBinaryReaderSource, 0, len(encodedBatches)) for _, encoded := range encodedBatches { sources = append(sources, txBinaryBytesSource{data: encoded}) } var out bytes.Buffer if err := MergeTxsBinarySourcesToWriterWithOptions(sources, &out, opts); err != nil { return nil, err } return out.Bytes(), nil } func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer) error { return MergeTxsBinarySourcesToWriterWithOptions(sources, w, TxsBinaryMergeOptions{}) } func MergeTxsBinarySourcesToWriterWithOptions(sources []TxsBinaryReaderSource, w io.Writer, opts TxsBinaryMergeOptions) error { if w == nil { return fmt.Errorf("txs binary writer is nil") } plan, err := txBinaryBuildMergePlan(sources, opts) if err != nil { return err } headerBytes, err := txBinaryMarshalTxsHeader(plan.schemaVersion, plan.enumVersion, plan.addressTable, plan.txCount) if err != nil { return err } if err := txBinaryWriteAll(w, headerBytes); err != nil { return err } for sourceIndex, source := range sources { reader, err := source.OpenTxsBinaryReader() if err != nil { return fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err) } bufferedReader := bufio.NewReader(reader) dec := txBinaryStreamDecoder{reader: bufferedReader} batchIndex := 0 for { skipBatch, err := txBinaryApplyMergeBatchHeader(bufferedReader, opts, sourceIndex, batchIndex) if err != nil { closeErr := reader.Close() if err == io.EOF { if closeErr != nil { return fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr) } break } return fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err) } header, err := dec.readTxsBinaryHeaderOrEOF() if err != nil { closeErr := reader.Close() if err == io.EOF { if closeErr != nil { return fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr) } break } return fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err) } for txIndex := uint32(0); txIndex < header.count; txIndex++ { tx := TxBinary{ SchemaVersion: header.schemaVersion, EnumVersion: header.enumVersion, AddressTable: header.addressTable, } if err := txBinaryReadTxBody(&dec, &tx, header.enumTable, header.addressTable); err != nil { reader.Close() return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err) } if skipBatch { continue } if err := txBinaryRemapTxAddressTable(&tx, header.addressTable, plan.addressTable, plan.addressIndex); err != nil { reader.Close() return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err) } bodyBytes, err := txBinaryMarshalTxBody(&tx, plan.enumTable) if err != nil { reader.Close() return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err) } if err := txBinaryWriteAll(w, bodyBytes); err != nil { reader.Close() return fmt.Errorf("source[%d].batch[%d].tx[%d]: write merged body: %w", sourceIndex, batchIndex, txIndex, err) } } batchIndex++ } } return nil } func (tx *TxBinary) MarshalBinary() ([]byte, error) { if tx == nil { return nil, fmt.Errorf("tx binary is nil") } if tx.SchemaVersion != txBinarySchemaVersionCurrent { return nil, fmt.Errorf("unsupported tx binary schema version: %d", tx.SchemaVersion) } enumTable, err := txBinaryEnumTableByVersion(tx.EnumVersion) if err != nil { return nil, err } addressTable := tx.AddressTable enc := txBinaryEncoder{} enc.writeBytes(txBinaryMagic[:]) enc.writeUint16(tx.SchemaVersion) enc.writeUint16(tx.EnumVersion) if err := enc.writeAddressTable(addressTable); err != nil { return nil, err } if err := enc.writeTxBinaryBody(tx, enumTable); err != nil { return nil, err } return enc.bytes(), nil } func (txs *TxsBinary) MarshalBinary() ([]byte, error) { if txs == nil { return nil, fmt.Errorf("txs binary is nil") } if txs.SchemaVersion != txBinarySchemaVersionCurrent { return nil, fmt.Errorf("unsupported tx binary schema version: %d", txs.SchemaVersion) } enumTable, err := txBinaryEnumTableByVersion(txs.EnumVersion) if err != nil { return nil, err } enc := txBinaryEncoder{} enc.writeBytes(txsBinaryMagic[:]) enc.writeUint16(txs.SchemaVersion) enc.writeUint16(txs.EnumVersion) if err := enc.writeAddressTable(txs.AddressTable); err != nil { return nil, err } enc.writeUint32(uint32(len(txs.Txs))) for i := range txs.Txs { if err := enc.writeTxBinaryBody(&txs.Txs[i], enumTable); err != nil { return nil, fmt.Errorf("tx[%d]: %w", i, err) } } return enc.bytes(), nil } func txBinaryMarshalTxsHeader(schemaVersion uint16, enumVersion uint16, addressTable []solana.PublicKey, txCount uint32) ([]byte, error) { enc := txBinaryEncoder{} enc.writeBytes(txsBinaryMagic[:]) enc.writeUint16(schemaVersion) enc.writeUint16(enumVersion) if err := enc.writeAddressTable(addressTable); err != nil { return nil, err } enc.writeUint32(txCount) return enc.bytes(), nil } func txBinaryMarshalTxBody(tx *TxBinary, enumTable *txBinaryEnumTable) ([]byte, error) { enc := txBinaryEncoder{} if err := enc.writeTxBinaryBody(tx, enumTable); err != nil { return nil, err } return enc.bytes(), nil } func (tx *TxBinary) UnmarshalBinary(data []byte) error { dec := txBinaryDecoder{reader: bytes.NewReader(data)} magic, err := dec.readN(len(txBinaryMagic)) if err != nil { return err } if !bytes.Equal(magic, txBinaryMagic[:]) { return fmt.Errorf("invalid tx binary magic") } tx.SchemaVersion, err = dec.readUint16() if err != nil { return err } if tx.SchemaVersion != txBinarySchemaVersionCurrent { return fmt.Errorf("unsupported tx binary schema version: %d", tx.SchemaVersion) } tx.EnumVersion, err = dec.readUint16() if err != nil { return err } enumTable, err := txBinaryEnumTableByVersion(tx.EnumVersion) if err != nil { return err } tx.AddressTable, err = dec.readAddressTable() if err != nil { return err } if err := txBinaryReadTxBody(&dec, tx, enumTable, tx.AddressTable); err != nil { return err } if dec.reader.Len() != 0 { return fmt.Errorf("unexpected trailing tx binary data: %d bytes", dec.reader.Len()) } return nil } func (txs *TxsBinary) UnmarshalBinary(data []byte) error { dec := txBinaryDecoder{reader: bytes.NewReader(data)} magic, err := dec.readN(len(txsBinaryMagic)) if err != nil { return err } if !bytes.Equal(magic, txsBinaryMagic[:]) { return fmt.Errorf("invalid txs binary magic") } txs.SchemaVersion, err = dec.readUint16() if err != nil { return err } if txs.SchemaVersion != txBinarySchemaVersionCurrent { return fmt.Errorf("unsupported tx binary schema version: %d", txs.SchemaVersion) } txs.EnumVersion, err = dec.readUint16() if err != nil { return err } enumTable, err := txBinaryEnumTableByVersion(txs.EnumVersion) if err != nil { return err } txs.AddressTable, err = dec.readAddressTable() if err != nil { return err } count, err := dec.readUint32() if err != nil { return err } txs.Txs = make([]TxBinary, 0, count) for i := uint32(0); i < count; i++ { tx := TxBinary{ SchemaVersion: txs.SchemaVersion, EnumVersion: txs.EnumVersion, AddressTable: txs.AddressTable, } if err := txBinaryReadTxBody(&dec, &tx, enumTable, txs.AddressTable); err != nil { return fmt.Errorf("tx[%d]: %w", i, err) } txs.Txs = append(txs.Txs, tx) } if dec.reader.Len() != 0 { return fmt.Errorf("unexpected trailing txs binary data: %d bytes", dec.reader.Len()) } return nil } func (tx *TxBinary) ToTx() (*Tx, error) { if tx == nil { return nil, nil } signer, err := txBinaryAddressAt(tx.AddressTable, tx.Signer, "tx.signer") if err != nil { return nil, err } out := &Tx{ Signer: signer, Block: tx.Block, BlockIndex: tx.BlockIndex, CuFee: decimal.NewFromUint64(tx.CuFee), CUPrice: decimal.NewFromUint64(tx.CUPrice).Shift(-txBinaryCUPriceScale), BeforeSolBalance: txBinaryFloat64ToDecimal(tx.BeforeSolBalance, txBinarySOLScale), AfterSOLBalance: txBinaryFloat64ToDecimal(tx.AfterSOLBalance, txBinarySOLScale), ComputeUnitsConsumed: tx.ComputeUnitsConsumed, CuLimit: tx.CuLimit, } if tx.TxHash != nil { txHash := *tx.TxHash out.TxHash = &txHash } if len(tx.Platform) > 0 { out.Platform = make(map[string]platformInfo, len(tx.Platform)) for _, platform := range tx.Platform { out.Platform[platform.Platform] = platformInfo{ Platform: platform.Platform, PlatformFee: decimal.NewFromUint64(platform.PlatformFee).Shift(-txBinarySOLScale), } } } if len(tx.MevAgent) > 0 { out.MevAgent = make(map[string]mevInfo, len(tx.MevAgent)) for _, mevAgent := range tx.MevAgent { out.MevAgent[mevAgent.MevAgent] = mevInfo{ MevAgent: mevAgent.MevAgent, MevAgentFee: decimal.NewFromUint64(mevAgent.MevAgentFee).Shift(-txBinarySOLScale), } } } if len(tx.Swaps) > 0 { out.Swaps = make([]Swap, 0, len(tx.Swaps)) for i, swap := range tx.Swaps { decodedSwap, err := swap.toSwap(tx.AddressTable, i) if err != nil { return nil, err } out.Swaps = append(out.Swaps, decodedSwap) } } return out, nil } func (txs *TxsBinary) ToTxs() ([]*Tx, error) { if txs == nil { return nil, nil } out := make([]*Tx, 0, len(txs.Txs)) for i := range txs.Txs { txs.Txs[i].AddressTable = txs.AddressTable tx, err := txs.Txs[i].ToTx() if err != nil { return nil, fmt.Errorf("tx[%d]: %w", i, err) } out = append(out, tx) } return out, nil } func newSwapBinary(swap Swap, index int, addressIndex *txBinaryAddressIndex) (SwapBinary, error) { pool, err := addressIndex.id(swap.Pool) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].pool: %w", index, err) } baseMint, err := addressIndex.id(swap.BaseMint) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].base_mint: %w", index, err) } quoteMint, err := addressIndex.id(swap.QuoteMint) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].quote_mint: %w", index, err) } baseTokenProgram, err := addressIndex.id(swap.BaseTokenProgram) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].base_token_program: %w", index, err) } quoteTokenProgram, err := addressIndex.id(swap.QuoteTokenProgram) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].quote_token_program: %w", index, err) } creator, err := addressIndex.id(swap.Creator) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].creator: %w", index, err) } user, err := addressIndex.id(swap.User) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].user: %w", index, err) } fixedMint, err := addressIndex.id(swap.FixedMint) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].fixed_mint: %w", index, err) } limitMint, err := addressIndex.id(swap.LimitMint) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].limit_mint: %w", index, err) } entryContract, err := addressIndex.id(swap.EntryContract) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].entry_contract: %w", index, err) } migrateToPool, err := addressIndex.id(swap.MigrateToPool) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].migrate_to_pool: %w", index, err) } migrateTopProgram, err := addressIndex.id(swap.MigrateTopProgram) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].migrate_top_program: %w", index, err) } lpMint, err := addressIndex.id(swap.LpMint) if err != nil { return SwapBinary{}, fmt.Errorf("swap[%d].lp_mint: %w", index, err) } out := SwapBinary{ Program: swap.Program, Event: txBinaryCanonicalEvent(swap.Event), TxIndex: int32(swap.TxIndex), InstrIdx: swap.InstrIdx, InnerIdx: swap.InnerIdx, Pool: pool, BaseMint: baseMint, QuoteMint: quoteMint, BaseTokenProgram: baseTokenProgram, QuoteTokenProgram: quoteTokenProgram, Creator: creator, BaseMintDecimals: swap.BaseMintDecimals, QuoteMintDecimals: swap.QuoteMintDecimals, User: user, SwapMode: swap.SwapMode, FixedAmountSide: swap.FixedAmountSide, FixedMint: fixedMint, LimitAmountType: swap.LimitAmountType, LimitAmountSide: swap.LimitAmountSide, LimitMint: limitMint, ActualLimitAmountSide: swap.ActualLimitAmountSide, Mayhem: swap.Mayhem, Cashback: swap.Cashback, EntryContract: entryContract, MigrateToPool: migrateToPool, MigrateTopProgram: migrateTopProgram, LpMint: lpMint, } if swap.TxIndex > math.MaxInt32 || swap.TxIndex < math.MinInt32 { return SwapBinary{}, fmt.Errorf("swap[%d].tx_index overflows int32: %d", index, swap.TxIndex) } if out.BaseAmount, err = txBinaryDecimalToUint64(swap.BaseAmount, fmt.Sprintf("swap[%d].base_amount", index)); err != nil { return SwapBinary{}, err } if out.QuoteAmount, err = txBinaryDecimalToUint64(swap.QuoteAmount, fmt.Sprintf("swap[%d].quote_amount", index)); err != nil { return SwapBinary{}, err } if out.FixedAmount, err = txBinaryDecimalToUint64(swap.FixedAmount, fmt.Sprintf("swap[%d].fixed_amount", index)); err != nil { return SwapBinary{}, err } if out.LimitAmount, err = txBinaryDecimalToUint64(swap.LimitAmount, fmt.Sprintf("swap[%d].limit_amount", index)); err != nil { return SwapBinary{}, err } if out.ActualLimitAmount, err = txBinaryDecimalToUint64(swap.ActualLimitAmount, fmt.Sprintf("swap[%d].actual_limit_amount", index)); err != nil { return SwapBinary{}, err } if out.SlippageBps, err = txBinaryRoundedDecimalToUint64(swap.SlippageBps, fmt.Sprintf("swap[%d].slippage_bps", index)); err != nil { return SwapBinary{}, err } if out.BaseReserve, err = txBinaryDecimalToFloat64Raw(swap.BaseReserve, fmt.Sprintf("swap[%d].base_reserve", index)); err != nil { return SwapBinary{}, err } if out.QuoteReserve, err = txBinaryDecimalToFloat64Raw(swap.QuoteReserve, fmt.Sprintf("swap[%d].quote_reserve", index)); err != nil { return SwapBinary{}, err } if out.UserBaseBalance, err = txBinaryDecimalToUint64(swap.UserBaseBalance, fmt.Sprintf("swap[%d].user_base_balance", index)); err != nil { return SwapBinary{}, err } if out.UserQuoteBalance, err = txBinaryDecimalToUint64(swap.UserQuoteBalance, fmt.Sprintf("swap[%d].user_quote_balance", index)); err != nil { return SwapBinary{}, err } if out.AfterSOLBalance, err = txBinaryDecimalToFloat64(swap.AfterSOLBalance, txBinarySOLScale, fmt.Sprintf("swap[%d].after_sol_balance", index)); err != nil { return SwapBinary{}, err } return out, nil } func (swap SwapBinary) toSwap(addressTable []solana.PublicKey, index int) (Swap, error) { pool, err := txBinaryAddressAt(addressTable, swap.Pool, fmt.Sprintf("swap[%d].pool", index)) if err != nil { return Swap{}, err } baseMint, err := txBinaryAddressAt(addressTable, swap.BaseMint, fmt.Sprintf("swap[%d].base_mint", index)) if err != nil { return Swap{}, err } quoteMint, err := txBinaryAddressAt(addressTable, swap.QuoteMint, fmt.Sprintf("swap[%d].quote_mint", index)) if err != nil { return Swap{}, err } baseTokenProgram, err := txBinaryAddressAt(addressTable, swap.BaseTokenProgram, fmt.Sprintf("swap[%d].base_token_program", index)) if err != nil { return Swap{}, err } quoteTokenProgram, err := txBinaryAddressAt(addressTable, swap.QuoteTokenProgram, fmt.Sprintf("swap[%d].quote_token_program", index)) if err != nil { return Swap{}, err } creator, err := txBinaryAddressAt(addressTable, swap.Creator, fmt.Sprintf("swap[%d].creator", index)) if err != nil { return Swap{}, err } user, err := txBinaryAddressAt(addressTable, swap.User, fmt.Sprintf("swap[%d].user", index)) if err != nil { return Swap{}, err } fixedMint, err := txBinaryAddressAt(addressTable, swap.FixedMint, fmt.Sprintf("swap[%d].fixed_mint", index)) if err != nil { return Swap{}, err } limitMint, err := txBinaryAddressAt(addressTable, swap.LimitMint, fmt.Sprintf("swap[%d].limit_mint", index)) if err != nil { return Swap{}, err } entryContract, err := txBinaryAddressAt(addressTable, swap.EntryContract, fmt.Sprintf("swap[%d].entry_contract", index)) if err != nil { return Swap{}, err } migrateToPool, err := txBinaryAddressAt(addressTable, swap.MigrateToPool, fmt.Sprintf("swap[%d].migrate_to_pool", index)) if err != nil { return Swap{}, err } migrateTopProgram, err := txBinaryAddressAt(addressTable, swap.MigrateTopProgram, fmt.Sprintf("swap[%d].migrate_top_program", index)) if err != nil { return Swap{}, err } lpMint, err := txBinaryAddressAt(addressTable, swap.LpMint, fmt.Sprintf("swap[%d].lp_mint", index)) if err != nil { return Swap{}, err } return Swap{ Program: swap.Program, Event: swap.Event, TxIndex: int(swap.TxIndex), InstrIdx: swap.InstrIdx, InnerIdx: swap.InnerIdx, Pool: pool, BaseMint: baseMint, QuoteMint: quoteMint, BaseTokenProgram: baseTokenProgram, QuoteTokenProgram: quoteTokenProgram, Creator: creator, BaseMintDecimals: swap.BaseMintDecimals, QuoteMintDecimals: swap.QuoteMintDecimals, User: user, BaseAmount: decimal.NewFromUint64(swap.BaseAmount), QuoteAmount: decimal.NewFromUint64(swap.QuoteAmount), SwapMode: swap.SwapMode, FixedAmount: decimal.NewFromUint64(swap.FixedAmount), FixedAmountSide: swap.FixedAmountSide, FixedMint: fixedMint, LimitAmountType: swap.LimitAmountType, LimitAmount: decimal.NewFromUint64(swap.LimitAmount), LimitAmountSide: swap.LimitAmountSide, LimitMint: limitMint, ActualLimitAmount: decimal.NewFromUint64(swap.ActualLimitAmount), ActualLimitAmountSide: swap.ActualLimitAmountSide, SlippageBps: decimal.NewFromUint64(swap.SlippageBps), BaseReserve: txBinaryFloat64ToDecimalRaw(swap.BaseReserve), QuoteReserve: txBinaryFloat64ToDecimalRaw(swap.QuoteReserve), Mayhem: swap.Mayhem, Cashback: swap.Cashback, UserBaseBalance: decimal.NewFromUint64(swap.UserBaseBalance), UserQuoteBalance: decimal.NewFromUint64(swap.UserQuoteBalance), EntryContract: entryContract, MigrateToPool: migrateToPool, MigrateTopProgram: migrateTopProgram, LpMint: lpMint, AfterSOLBalance: txBinaryFloat64ToDecimal(swap.AfterSOLBalance, txBinarySOLScale), }, nil } func txBinaryPlatformsFromTx(platforms map[string]platformInfo) ([]PlatformBinary, error) { if len(platforms) == 0 { return nil, nil } keys := make([]string, 0, len(platforms)) for key := range platforms { keys = append(keys, key) } sort.Strings(keys) out := make([]PlatformBinary, 0, len(keys)) for _, key := range keys { platform := platforms[key] platformFee, err := txBinaryScaledDecimalToUint64(platform.PlatformFee, txBinarySOLScale, fmt.Sprintf("platform[%s].fee", key)) if err != nil { return nil, err } out = append(out, PlatformBinary{ Platform: key, PlatformFee: platformFee, }) } return out, nil } func txBinaryCanonicalEvent(event string) string { switch event { case "add_liquidity_on_side": return TxEventAddLiquidityOneSide case "remove_liquidity_on_side": return TxEventRemoveLiquidityOneSide default: return event } } func txBinaryMevAgentsFromTx(mevAgents map[string]mevInfo) ([]MevAgentBinary, error) { if len(mevAgents) == 0 { return nil, nil } keys := make([]string, 0, len(mevAgents)) for key := range mevAgents { keys = append(keys, key) } sort.Strings(keys) out := make([]MevAgentBinary, 0, len(keys)) for _, key := range keys { mevAgent := mevAgents[key] mevFee, err := txBinaryScaledDecimalToUint64(mevAgent.MevAgentFee, txBinarySOLScale, fmt.Sprintf("mev_agent[%s].fee", key)) if err != nil { return nil, err } out = append(out, MevAgentBinary{ MevAgent: key, MevAgentFee: mevFee, }) } return out, nil } func txBinaryBuildAddressTable(txs []*Tx) ([]solana.PublicKey, error) { builder := txBinaryAddressTableBuilder{ index: make(map[solana.PublicKey]struct{}), } for txIndex, tx := range txs { if tx == nil { return nil, fmt.Errorf("tx[%d] is nil", txIndex) } if err := builder.add(tx.Signer); err != nil { return nil, fmt.Errorf("tx[%d].signer: %w", txIndex, err) } for swapIndex, swap := range tx.Swaps { for _, address := range []solana.PublicKey{ swap.Pool, swap.BaseMint, swap.QuoteMint, swap.BaseTokenProgram, swap.QuoteTokenProgram, swap.Creator, swap.User, swap.FixedMint, swap.LimitMint, swap.EntryContract, swap.MigrateToPool, swap.MigrateTopProgram, swap.LpMint, } { if err := builder.add(address); err != nil { return nil, fmt.Errorf("tx[%d].swap[%d] address table: %w", txIndex, swapIndex, err) } } } } return builder.addresses, nil } type txBinaryAddressTableBuilder struct { addresses []solana.PublicKey index map[solana.PublicKey]struct{} } func (b *txBinaryAddressTableBuilder) add(address solana.PublicKey) error { if _, ok := b.index[address]; ok { return nil } if uint64(len(b.addresses)) >= uint64(math.MaxUint32) { return fmt.Errorf("address table exceeds uint32 capacity") } b.addresses = append(b.addresses, address) b.index[address] = struct{}{} return nil } type txBinaryAddressIndex struct { index map[solana.PublicKey]uint32 } func newTxBinaryAddressIndex(addresses []solana.PublicKey) (*txBinaryAddressIndex, error) { if uint64(len(addresses)) > uint64(math.MaxUint32) { return nil, fmt.Errorf("address table exceeds uint32 capacity") } index := make(map[solana.PublicKey]uint32, len(addresses)) for i, address := range addresses { if _, exists := index[address]; exists { return nil, fmt.Errorf("duplicate address table entry: %s", address.String()) } index[address] = uint32(i) } return &txBinaryAddressIndex{index: index}, nil } func (idx *txBinaryAddressIndex) id(address solana.PublicKey) (uint32, error) { id, ok := idx.index[address] if !ok { return 0, fmt.Errorf("address not found in address table: %s", address.String()) } return id, nil } func txBinaryAddressAt(addressTable []solana.PublicKey, index uint32, field string) (solana.PublicKey, error) { if int(index) >= len(addressTable) { return solana.PublicKey{}, fmt.Errorf("%s address index out of range: %d", field, index) } return addressTable[index], nil } func txBinaryDecimalToUint64(value decimal.Decimal, field string) (uint64, error) { if value.IsNegative() { return 0, fmt.Errorf("%s must be >= 0, got %s", field, value.String()) } if !value.Equal(value.Truncate(0)) { return 0, fmt.Errorf("%s must be an integer, got %s", field, value.String()) } bigInt := value.BigInt() if !bigInt.IsUint64() { return 0, fmt.Errorf("%s overflows uint64: %s", field, value.String()) } return bigInt.Uint64(), nil } func txBinaryScaledDecimalToUint64(value decimal.Decimal, scale int32, field string) (uint64, error) { return txBinaryDecimalToUint64(value.Shift(scale), field) } func txBinaryRoundedDecimalToUint64(value decimal.Decimal, field string) (uint64, error) { return txBinaryDecimalToUint64(value.Round(0), field) } func txBinaryDecimalToFloat64(value decimal.Decimal, scale int32, field string) (float64, error) { rounded := value.Round(scale) f, exact := rounded.Float64() if !exact && math.IsInf(f, 0) { return 0, fmt.Errorf("%s cannot be represented as float64: %s", field, value.String()) } return f, nil } func txBinaryDecimalToFloat64Raw(value decimal.Decimal, field string) (float64, error) { f, exact := value.Float64() if !exact && math.IsInf(f, 0) { return 0, fmt.Errorf("%s cannot be represented as float64: %s", field, value.String()) } return f, nil } func txBinaryFloat64ToDecimal(value float64, scale int32) decimal.Decimal { formatted := strconv.FormatFloat(value, 'f', int(scale), 64) out, err := decimal.NewFromString(formatted) if err != nil { return decimal.Zero } return out } func txBinaryFloat64ToDecimalRaw(value float64) decimal.Decimal { formatted := strconv.FormatFloat(value, 'f', -1, 64) out, err := decimal.NewFromString(formatted) if err != nil { return decimal.Zero } return out } type txBinaryEncoder struct { buf bytes.Buffer } func (enc *txBinaryEncoder) bytes() []byte { return enc.buf.Bytes() } func (enc *txBinaryEncoder) writeBool(value bool) { if value { enc.writeUint8(1) return } enc.writeUint8(0) } func (enc *txBinaryEncoder) writeUint8(value uint8) { enc.buf.WriteByte(value) } func (enc *txBinaryEncoder) writeUint16(value uint16) { var raw [2]byte binary.LittleEndian.PutUint16(raw[:], value) enc.buf.Write(raw[:]) } func (enc *txBinaryEncoder) writeUint32(value uint32) { var raw [4]byte binary.LittleEndian.PutUint32(raw[:], value) enc.buf.Write(raw[:]) } func (enc *txBinaryEncoder) writeUint64(value uint64) { var raw [8]byte binary.LittleEndian.PutUint64(raw[:], value) enc.buf.Write(raw[:]) } func (enc *txBinaryEncoder) writeFloat64(value float64) { enc.writeUint64(math.Float64bits(value)) } func (enc *txBinaryEncoder) writeInt32(value int32) { enc.writeUint32(uint32(value)) } func (enc *txBinaryEncoder) writeBytes(value []byte) { enc.buf.Write(value) } func (enc *txBinaryEncoder) writeAddressTable(addresses []solana.PublicKey) error { if uint64(len(addresses)) > uint64(math.MaxUint32) { return fmt.Errorf("address table exceeds uint32 capacity") } enc.writeUint32(uint32(len(addresses))) for _, address := range addresses { enc.writeBytes(address[:]) } return nil } func (enc *txBinaryEncoder) writeTxBinaryBody(tx *TxBinary, enumTable *txBinaryEnumTable) error { enc.writeUint32(tx.Signer) enc.writeUint64(tx.Block) enc.writeUint64(tx.BlockIndex) enc.writeBool(tx.TxHash != nil) if tx.TxHash != nil { enc.writeBytes(tx.TxHash[:]) } enc.writeUint64(tx.CuFee) enc.writeUint64(tx.CUPrice) enc.writeFloat64(tx.BeforeSolBalance) enc.writeFloat64(tx.AfterSOLBalance) enc.writeUint64(tx.ComputeUnitsConsumed) enc.writeUint32(tx.CuLimit) if err := enc.writePlatformEntries(tx.Platform, enumTable); err != nil { return err } if err := enc.writeMevAgentEntries(tx.MevAgent, enumTable); err != nil { return err } if err := enc.writeSwaps(tx.Swaps, enumTable); err != nil { return err } return nil } func (enc *txBinaryEncoder) writePlatformEntries(entries []PlatformBinary, enumTable *txBinaryEnumTable) error { enc.writeUint32(uint32(len(entries))) for i, entry := range entries { enumID, err := enumTable.platforms.id(entry.Platform) if err != nil { return fmt.Errorf("platform[%d]: %w", i, err) } enc.writeUint16(enumID) enc.writeUint64(entry.PlatformFee) } return nil } func (enc *txBinaryEncoder) writeMevAgentEntries(entries []MevAgentBinary, enumTable *txBinaryEnumTable) error { enc.writeUint32(uint32(len(entries))) for i, entry := range entries { enumID, err := enumTable.mevAgents.id(entry.MevAgent) if err != nil { return fmt.Errorf("mev_agent[%d]: %w", i, err) } enc.writeUint16(enumID) enc.writeUint64(entry.MevAgentFee) } return nil } func (enc *txBinaryEncoder) writeSwaps(swaps []SwapBinary, enumTable *txBinaryEnumTable) error { enc.writeUint32(uint32(len(swaps))) for i, swap := range swaps { programID, err := enumTable.programs.id(swap.Program) if err != nil { return fmt.Errorf("swap[%d].program: %w", i, err) } eventID, err := enumTable.events.id(swap.Event) if err != nil { return fmt.Errorf("swap[%d].event: %w", i, err) } enc.writeUint16(programID) enc.writeUint16(eventID) enc.writeInt32(swap.TxIndex) enc.writeUint8(swap.InstrIdx) enc.writeUint8(swap.InnerIdx) enc.writeUint32(swap.Pool) enc.writeUint32(swap.BaseMint) enc.writeUint32(swap.QuoteMint) enc.writeUint32(swap.BaseTokenProgram) enc.writeUint32(swap.QuoteTokenProgram) enc.writeUint32(swap.Creator) enc.writeUint8(swap.BaseMintDecimals) enc.writeUint8(swap.QuoteMintDecimals) enc.writeUint32(swap.User) enc.writeUint64(swap.BaseAmount) enc.writeUint64(swap.QuoteAmount) enc.writeUint8(uint8(swap.SwapMode)) enc.writeUint64(swap.FixedAmount) enc.writeUint8(uint8(swap.FixedAmountSide)) enc.writeUint32(swap.FixedMint) enc.writeUint8(uint8(swap.LimitAmountType)) enc.writeUint64(swap.LimitAmount) enc.writeUint8(uint8(swap.LimitAmountSide)) enc.writeUint32(swap.LimitMint) enc.writeUint64(swap.ActualLimitAmount) enc.writeUint8(uint8(swap.ActualLimitAmountSide)) enc.writeUint64(swap.SlippageBps) enc.writeFloat64(swap.BaseReserve) enc.writeFloat64(swap.QuoteReserve) enc.writeBool(swap.Mayhem) enc.writeBool(swap.Cashback) enc.writeUint64(swap.UserBaseBalance) enc.writeUint64(swap.UserQuoteBalance) enc.writeUint32(swap.EntryContract) enc.writeUint32(swap.MigrateToPool) enc.writeUint32(swap.MigrateTopProgram) enc.writeUint32(swap.LpMint) enc.writeFloat64(swap.AfterSOLBalance) } return nil } type txBinaryDecoder struct { reader *bytes.Reader } type txBinaryStreamDecoder struct { reader io.Reader } type txBinaryBodyReader interface { readBool() (bool, error) readUint8() (uint8, error) readUint16() (uint16, error) readUint32() (uint32, error) readUint64() (uint64, error) readFloat64() (float64, error) readInt32() (int32, error) readN(int) ([]byte, error) } type txsBinaryHeader struct { schemaVersion uint16 enumVersion uint16 addressTable []solana.PublicKey enumTable *txBinaryEnumTable count uint32 } func (dec *txBinaryDecoder) readBool() (bool, error) { value, err := dec.readUint8() if err != nil { return false, err } switch value { case 0: return false, nil case 1: return true, nil default: return false, fmt.Errorf("invalid bool value: %d", value) } } func (dec *txBinaryDecoder) readUint8() (uint8, error) { raw, err := dec.readN(1) if err != nil { return 0, err } return raw[0], nil } func (dec *txBinaryDecoder) readUint16() (uint16, error) { raw, err := dec.readN(2) if err != nil { return 0, err } return binary.LittleEndian.Uint16(raw), nil } func (dec *txBinaryDecoder) readUint32() (uint32, error) { raw, err := dec.readN(4) if err != nil { return 0, err } return binary.LittleEndian.Uint32(raw), nil } func (dec *txBinaryDecoder) readUint64() (uint64, error) { raw, err := dec.readN(8) if err != nil { return 0, err } return binary.LittleEndian.Uint64(raw), nil } func (dec *txBinaryDecoder) readFloat64() (float64, error) { value, err := dec.readUint64() if err != nil { return 0, err } return math.Float64frombits(value), nil } func (dec *txBinaryDecoder) readInt32() (int32, error) { value, err := dec.readUint32() if err != nil { return 0, err } return int32(value), nil } func (dec *txBinaryDecoder) readAddressTable() ([]solana.PublicKey, error) { return txBinaryReadAddressTable(dec) } func (dec *txBinaryDecoder) readN(n int) ([]byte, error) { out := make([]byte, n) if _, err := io.ReadFull(dec.reader, out); err != nil { return nil, err } return out, nil } func (dec *txBinaryDecoder) readPlatformEntries(enumTable *txBinaryEnumTable) ([]PlatformBinary, error) { return txBinaryReadPlatformEntries(dec, enumTable) } func (dec *txBinaryDecoder) readMevAgentEntries(enumTable *txBinaryEnumTable) ([]MevAgentBinary, error) { return txBinaryReadMevAgentEntries(dec, enumTable) } func (dec *txBinaryDecoder) readSwaps(enumTable *txBinaryEnumTable, _ []solana.PublicKey) ([]SwapBinary, error) { return txBinaryReadSwaps(dec, enumTable) } func (dec *txBinaryDecoder) readTxBinaryBody(tx *TxBinary, enumTable *txBinaryEnumTable, addressTable []solana.PublicKey) error { return txBinaryReadTxBody(dec, tx, enumTable, addressTable) } func (dec *txBinaryStreamDecoder) readBool() (bool, error) { value, err := dec.readUint8() if err != nil { return false, err } switch value { case 0: return false, nil case 1: return true, nil default: return false, fmt.Errorf("invalid bool value: %d", value) } } func (dec *txBinaryStreamDecoder) readUint8() (uint8, error) { raw, err := dec.readN(1) if err != nil { return 0, err } return raw[0], nil } func (dec *txBinaryStreamDecoder) readUint16() (uint16, error) { raw, err := dec.readN(2) if err != nil { return 0, err } return binary.LittleEndian.Uint16(raw), nil } func (dec *txBinaryStreamDecoder) readUint32() (uint32, error) { raw, err := dec.readN(4) if err != nil { return 0, err } return binary.LittleEndian.Uint32(raw), nil } func (dec *txBinaryStreamDecoder) readUint64() (uint64, error) { raw, err := dec.readN(8) if err != nil { return 0, err } return binary.LittleEndian.Uint64(raw), nil } func (dec *txBinaryStreamDecoder) readFloat64() (float64, error) { value, err := dec.readUint64() if err != nil { return 0, err } return math.Float64frombits(value), nil } func (dec *txBinaryStreamDecoder) readInt32() (int32, error) { value, err := dec.readUint32() if err != nil { return 0, err } return int32(value), nil } func (dec *txBinaryStreamDecoder) readAddressTable() ([]solana.PublicKey, error) { return txBinaryReadAddressTable(dec) } func (dec *txBinaryStreamDecoder) readN(n int) ([]byte, error) { out := make([]byte, n) if _, err := io.ReadFull(dec.reader, out); err != nil { return nil, err } return out, nil } func (dec *txBinaryStreamDecoder) readTxsBinaryHeader() (*txsBinaryHeader, error) { magic, err := dec.readN(len(txsBinaryMagic)) if err != nil { return nil, err } if !bytes.Equal(magic, txsBinaryMagic[:]) { return nil, fmt.Errorf("invalid txs binary magic") } schemaVersion, err := dec.readUint16() if err != nil { return nil, err } if schemaVersion != txBinarySchemaVersionCurrent { return nil, fmt.Errorf("unsupported tx binary schema version: %d", schemaVersion) } enumVersion, err := dec.readUint16() if err != nil { return nil, err } enumTable, err := txBinaryEnumTableByVersion(enumVersion) if err != nil { return nil, err } addressTable, err := dec.readAddressTable() if err != nil { return nil, err } count, err := dec.readUint32() if err != nil { return nil, err } return &txsBinaryHeader{ schemaVersion: schemaVersion, enumVersion: enumVersion, addressTable: addressTable, enumTable: enumTable, count: count, }, nil } func (dec *txBinaryStreamDecoder) readNOrEOF(n int) ([]byte, error) { out := make([]byte, n) readN, err := io.ReadFull(dec.reader, out) if err != nil { if err == io.EOF && readN == 0 { return nil, io.EOF } return nil, err } return out, nil } func (dec *txBinaryStreamDecoder) readTxsBinaryHeaderOrEOF() (*txsBinaryHeader, error) { magic, err := dec.readNOrEOF(len(txsBinaryMagic)) if err != nil { return nil, err } if !bytes.Equal(magic, txsBinaryMagic[:]) { return nil, fmt.Errorf("invalid txs binary magic") } schemaVersion, err := dec.readUint16() if err != nil { return nil, err } if schemaVersion != txBinarySchemaVersionCurrent { return nil, fmt.Errorf("unsupported tx binary schema version: %d", schemaVersion) } enumVersion, err := dec.readUint16() if err != nil { return nil, err } enumTable, err := txBinaryEnumTableByVersion(enumVersion) if err != nil { return nil, err } addressTable, err := dec.readAddressTable() if err != nil { return nil, err } count, err := dec.readUint32() if err != nil { return nil, err } return &txsBinaryHeader{ schemaVersion: schemaVersion, enumVersion: enumVersion, addressTable: addressTable, enumTable: enumTable, count: count, }, nil } func txBinaryReadAddressTable(dec txBinaryBodyReader) ([]solana.PublicKey, error) { count, err := dec.readUint32() if err != nil { return nil, err } addresses := make([]solana.PublicKey, 0, count) for i := uint32(0); i < count; i++ { raw, err := dec.readN(solana.PublicKeyLength) if err != nil { return nil, err } var publicKey solana.PublicKey copy(publicKey[:], raw) addresses = append(addresses, publicKey) } return addresses, nil } func txBinaryReadPlatformEntries(dec txBinaryBodyReader, enumTable *txBinaryEnumTable) ([]PlatformBinary, error) { count, err := dec.readUint32() if err != nil { return nil, err } out := make([]PlatformBinary, 0, count) for i := uint32(0); i < count; i++ { enumID, err := dec.readUint16() if err != nil { return nil, err } platform, err := enumTable.platforms.value(enumID) if err != nil { return nil, fmt.Errorf("platform[%d]: %w", i, err) } fee, err := dec.readUint64() if err != nil { return nil, err } out = append(out, PlatformBinary{ Platform: platform, PlatformFee: fee, }) } return out, nil } func txBinaryReadMevAgentEntries(dec txBinaryBodyReader, enumTable *txBinaryEnumTable) ([]MevAgentBinary, error) { count, err := dec.readUint32() if err != nil { return nil, err } out := make([]MevAgentBinary, 0, count) for i := uint32(0); i < count; i++ { enumID, err := dec.readUint16() if err != nil { return nil, err } mevAgent, err := enumTable.mevAgents.value(enumID) if err != nil { return nil, fmt.Errorf("mev_agent[%d]: %w", i, err) } fee, err := dec.readUint64() if err != nil { return nil, err } out = append(out, MevAgentBinary{ MevAgent: mevAgent, MevAgentFee: fee, }) } return out, nil } func txBinaryReadSwaps(dec txBinaryBodyReader, enumTable *txBinaryEnumTable) ([]SwapBinary, error) { count, err := dec.readUint32() if err != nil { return nil, err } out := make([]SwapBinary, 0, count) for i := uint32(0); i < count; i++ { programID, err := dec.readUint16() if err != nil { return nil, err } program, err := enumTable.programs.value(programID) if err != nil { return nil, fmt.Errorf("swap[%d].program: %w", i, err) } eventID, err := dec.readUint16() if err != nil { return nil, err } event, err := enumTable.events.value(eventID) if err != nil { return nil, fmt.Errorf("swap[%d].event: %w", i, err) } swap := SwapBinary{ Program: program, Event: event, } if swap.TxIndex, err = dec.readInt32(); err != nil { return nil, err } if swap.InstrIdx, err = dec.readUint8(); err != nil { return nil, err } if swap.InnerIdx, err = dec.readUint8(); err != nil { return nil, err } if swap.Pool, err = dec.readUint32(); err != nil { return nil, err } if swap.BaseMint, err = dec.readUint32(); err != nil { return nil, err } if swap.QuoteMint, err = dec.readUint32(); err != nil { return nil, err } if swap.BaseTokenProgram, err = dec.readUint32(); err != nil { return nil, err } if swap.QuoteTokenProgram, err = dec.readUint32(); err != nil { return nil, err } if swap.Creator, err = dec.readUint32(); err != nil { return nil, err } if swap.BaseMintDecimals, err = dec.readUint8(); err != nil { return nil, err } if swap.QuoteMintDecimals, err = dec.readUint8(); err != nil { return nil, err } if swap.User, err = dec.readUint32(); err != nil { return nil, err } if swap.BaseAmount, err = dec.readUint64(); err != nil { return nil, err } if swap.QuoteAmount, err = dec.readUint64(); err != nil { return nil, err } swapMode, err := dec.readUint8() if err != nil { return nil, err } swap.SwapMode = SwapMode(swapMode) if swap.FixedAmount, err = dec.readUint64(); err != nil { return nil, err } fixedAmountSide, err := dec.readUint8() if err != nil { return nil, err } swap.FixedAmountSide = SwapAmountSide(fixedAmountSide) if swap.FixedMint, err = dec.readUint32(); err != nil { return nil, err } limitType, err := dec.readUint8() if err != nil { return nil, err } swap.LimitAmountType = SwapLimitType(limitType) if swap.LimitAmount, err = dec.readUint64(); err != nil { return nil, err } limitAmountSide, err := dec.readUint8() if err != nil { return nil, err } swap.LimitAmountSide = SwapAmountSide(limitAmountSide) if swap.LimitMint, err = dec.readUint32(); err != nil { return nil, err } if swap.ActualLimitAmount, err = dec.readUint64(); err != nil { return nil, err } actualLimitAmountSide, err := dec.readUint8() if err != nil { return nil, err } swap.ActualLimitAmountSide = SwapAmountSide(actualLimitAmountSide) if swap.SlippageBps, err = dec.readUint64(); err != nil { return nil, err } if swap.BaseReserve, err = dec.readFloat64(); err != nil { return nil, err } if swap.QuoteReserve, err = dec.readFloat64(); err != nil { return nil, err } if swap.Mayhem, err = dec.readBool(); err != nil { return nil, err } if swap.Cashback, err = dec.readBool(); err != nil { return nil, err } if swap.UserBaseBalance, err = dec.readUint64(); err != nil { return nil, err } if swap.UserQuoteBalance, err = dec.readUint64(); err != nil { return nil, err } if swap.EntryContract, err = dec.readUint32(); err != nil { return nil, err } if swap.MigrateToPool, err = dec.readUint32(); err != nil { return nil, err } if swap.MigrateTopProgram, err = dec.readUint32(); err != nil { return nil, err } if swap.LpMint, err = dec.readUint32(); err != nil { return nil, err } if swap.AfterSOLBalance, err = dec.readFloat64(); err != nil { return nil, err } out = append(out, swap) } return out, nil } func txBinaryReadTxBody(dec txBinaryBodyReader, tx *TxBinary, enumTable *txBinaryEnumTable, addressTable []solana.PublicKey) error { var err error tx.AddressTable = addressTable if tx.Signer, err = dec.readUint32(); err != nil { return err } if tx.Block, err = dec.readUint64(); err != nil { return err } if tx.BlockIndex, err = dec.readUint64(); err != nil { return err } hasTxHash, err := dec.readBool() if err != nil { return err } if hasTxHash { rawHash, err := dec.readN(64) if err != nil { return err } var txHash [64]byte copy(txHash[:], rawHash) tx.TxHash = &txHash } else { tx.TxHash = nil } if tx.CuFee, err = dec.readUint64(); err != nil { return err } if tx.CUPrice, err = dec.readUint64(); err != nil { return err } if tx.BeforeSolBalance, err = dec.readFloat64(); err != nil { return err } if tx.AfterSOLBalance, err = dec.readFloat64(); err != nil { return err } if tx.ComputeUnitsConsumed, err = dec.readUint64(); err != nil { return err } if tx.CuLimit, err = dec.readUint32(); err != nil { return err } if tx.Platform, err = txBinaryReadPlatformEntries(dec, enumTable); err != nil { return err } if tx.MevAgent, err = txBinaryReadMevAgentEntries(dec, enumTable); err != nil { return err } if tx.Swaps, err = txBinaryReadSwaps(dec, enumTable); err != nil { return err } return nil } func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource, opts TxsBinaryMergeOptions) (*txsBinaryMergePlan, error) { if len(sources) == 0 { return nil, fmt.Errorf("txs binary sources are empty") } builder := txBinaryAddressTableBuilder{ index: make(map[solana.PublicKey]struct{}), } plan := &txsBinaryMergePlan{} hasBatch := false for sourceIndex, source := range sources { if source == nil { return nil, fmt.Errorf("source[%d] is nil", sourceIndex) } reader, err := source.OpenTxsBinaryReader() if err != nil { return nil, fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err) } bufferedReader := bufio.NewReader(reader) dec := txBinaryStreamDecoder{reader: bufferedReader} batchIndex := 0 for { skipBatch, err := txBinaryApplyMergeBatchHeader(bufferedReader, opts, sourceIndex, batchIndex) if err != nil { closeErr := reader.Close() if err == io.EOF { if closeErr != nil { return nil, fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr) } break } return nil, fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err) } header, err := dec.readTxsBinaryHeaderOrEOF() if err != nil { closeErr := reader.Close() if err == io.EOF { if closeErr != nil { return nil, fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr) } break } return nil, fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err) } if !hasBatch { plan.schemaVersion = header.schemaVersion plan.enumVersion = header.enumVersion plan.enumTable = header.enumTable hasBatch = true } else { if header.schemaVersion != plan.schemaVersion { reader.Close() return nil, fmt.Errorf("source[%d].batch[%d]: schema version mismatch: got %d want %d", sourceIndex, batchIndex, header.schemaVersion, plan.schemaVersion) } if header.enumVersion != plan.enumVersion { reader.Close() return nil, fmt.Errorf("source[%d].batch[%d]: enum version mismatch: got %d want %d", sourceIndex, batchIndex, header.enumVersion, plan.enumVersion) } } for addressIndex, address := range header.addressTable { if !skipBatch { if err := builder.add(address); err != nil { reader.Close() return nil, fmt.Errorf("source[%d].batch[%d].address[%d]: %w", sourceIndex, batchIndex, addressIndex, err) } } } if !skipBatch { if uint64(plan.txCount)+uint64(header.count) > uint64(math.MaxUint32) { reader.Close() return nil, fmt.Errorf("merged tx count exceeds uint32 capacity") } plan.txCount += header.count } for txIndex := uint32(0); txIndex < header.count; txIndex++ { tx := TxBinary{ SchemaVersion: header.schemaVersion, EnumVersion: header.enumVersion, AddressTable: header.addressTable, } if err := txBinaryReadTxBody(&dec, &tx, header.enumTable, header.addressTable); err != nil { reader.Close() return nil, fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err) } } batchIndex++ } } if !hasBatch { return nil, fmt.Errorf("no txs binary batches found") } addressIndex, err := newTxBinaryAddressIndex(builder.addresses) if err != nil { return nil, err } plan.addressTable = builder.addresses plan.addressIndex = addressIndex return plan, nil } func txBinaryRemapTxAddressTable(tx *TxBinary, fromAddressTable []solana.PublicKey, toAddressTable []solana.PublicKey, toAddressIndex *txBinaryAddressIndex) error { var err error if tx.Signer, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Signer, "tx.signer"); err != nil { return err } for i := range tx.Swaps { if tx.Swaps[i].Pool, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].Pool, fmt.Sprintf("swap[%d].pool", i)); err != nil { return err } if tx.Swaps[i].BaseMint, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].BaseMint, fmt.Sprintf("swap[%d].base_mint", i)); err != nil { return err } if tx.Swaps[i].QuoteMint, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].QuoteMint, fmt.Sprintf("swap[%d].quote_mint", i)); err != nil { return err } if tx.Swaps[i].BaseTokenProgram, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].BaseTokenProgram, fmt.Sprintf("swap[%d].base_token_program", i)); err != nil { return err } if tx.Swaps[i].QuoteTokenProgram, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].QuoteTokenProgram, fmt.Sprintf("swap[%d].quote_token_program", i)); err != nil { return err } if tx.Swaps[i].Creator, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].Creator, fmt.Sprintf("swap[%d].creator", i)); err != nil { return err } if tx.Swaps[i].User, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].User, fmt.Sprintf("swap[%d].user", i)); err != nil { return err } if tx.Swaps[i].FixedMint, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].FixedMint, fmt.Sprintf("swap[%d].fixed_mint", i)); err != nil { return err } if tx.Swaps[i].LimitMint, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].LimitMint, fmt.Sprintf("swap[%d].limit_mint", i)); err != nil { return err } if tx.Swaps[i].EntryContract, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].EntryContract, fmt.Sprintf("swap[%d].entry_contract", i)); err != nil { return err } if tx.Swaps[i].MigrateToPool, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].MigrateToPool, fmt.Sprintf("swap[%d].migrate_to_pool", i)); err != nil { return err } if tx.Swaps[i].MigrateTopProgram, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].MigrateTopProgram, fmt.Sprintf("swap[%d].migrate_top_program", i)); err != nil { return err } if tx.Swaps[i].LpMint, err = txBinaryRemapAddressRef(fromAddressTable, toAddressIndex, tx.Swaps[i].LpMint, fmt.Sprintf("swap[%d].lp_mint", i)); err != nil { return err } } tx.AddressTable = toAddressTable return nil } func txBinaryRemapAddressRef(fromAddressTable []solana.PublicKey, toAddressIndex *txBinaryAddressIndex, fromRef uint32, field string) (uint32, error) { address, err := txBinaryAddressAt(fromAddressTable, fromRef, field) if err != nil { return 0, err } return toAddressIndex.id(address) } func txBinaryWriteAll(w io.Writer, data []byte) error { for len(data) > 0 { written, err := w.Write(data) if err != nil { return err } if written == 0 { return io.ErrShortWrite } data = data[written:] } return nil } func txBinaryApplyMergeBatchHeader(reader *bufio.Reader, opts TxsBinaryMergeOptions, sourceIndex int, batchIndex int) (bool, error) { if opts.BatchHeaderFunc == nil { return false, nil } return opts.BatchHeaderFunc(&TxsBinaryBatchHeaderContext{ SourceIndex: sourceIndex, BatchIndex: batchIndex, Reader: reader, }) } type txBinaryEnumTable struct { version uint16 programs txBinaryEnumSet events txBinaryEnumSet platforms txBinaryEnumSet mevAgents txBinaryEnumSet } type txBinaryEnumSet struct { name string values []string ids map[string]uint16 } var txBinaryEnumTables = map[uint16]*txBinaryEnumTable{ txBinaryEnumVersionV1: newTxBinaryEnumTable( txBinaryEnumVersionV1, "program", []string{ "", SolProgramPump, SolProgramRaydiumV4, SolProgramRaydiumCLMM, SolProgramRaydiumCPMM, SolProgramMeteoraDLMM, SolProgramOrcaWhirPool, SolProgramPumpAMM, SolProgramMeteoraAmmV2, SolProgramMeteoraBondingCurve, SolProgramMeteoraPools, SolProgramRaydiumLaunchLab, SolProgramRaydiumLaunchLabBonk, }, "event", []string{ "", TxEventAddLP, TxEventRemoveLP, TxEventBuy, TxEventSell, TxEventBuyFailed, TxEventSellFailed, TxEventBurn, TxEventCreate, TxEventComplete, TxEventMigrate, TxEventDeposit, TxEventWithdraw, TxEventOpen, TxEventClose, TxEventClaimFee, TxEventAddLiquidity, TxEventAddLiquidityOneSide, TxEventRemoveLiquidity, TxEventRemoveLiquidityOneSide, }, "platform", []string{ "", PlatformGMGN, PlatformPhoton, PlatformAxiom, PlatformPepe, PlatformBullX, PlatformBanana, PlatformTrojan, PlatformRaybot, PlatformMoonshot, PlatformMEVX, PlatformTradeWiz, PlatformSolTradingBot, PlatformMoonshotMoney, PlatformMaestro, PlatformBonkBot, PlatformPadre, PlatformDexScreener, PlatformFake, PlatformNone, }, "mev_agent", []string{ "", MevAgentJito, MevAgent0slot, MevAgentBlocxRoute, MevAgentNozomi, MevAgentNextBlock, MevAgentHelius, MevAgentNode1, MevAgentFlashBlock, MevAgentUnknown, MevAgentBlockRazor, MevAgentFast, MevAgentSoyas, MevAgentStellium, MevAgentAstralane, MevagentFa1con, MevagentBlocksprint, MevAgentMoon, MevAgentSpeedlanding, MevAgentAllenhark, MevAgentRaiden, }, ), } func txBinaryEnumTableByVersion(version uint16) (*txBinaryEnumTable, error) { table, ok := txBinaryEnumTables[version] if !ok { return nil, fmt.Errorf("unsupported tx binary enum version: %d", version) } return table, nil } func newTxBinaryEnumTable( version uint16, programName string, programs []string, eventName string, events []string, platformName string, platforms []string, mevAgentName string, mevAgents []string, ) *txBinaryEnumTable { return &txBinaryEnumTable{ version: version, programs: newTxBinaryEnumSet(programName, programs), events: newTxBinaryEnumSet(eventName, events), platforms: newTxBinaryEnumSet(platformName, platforms), mevAgents: newTxBinaryEnumSet(mevAgentName, mevAgents), } } func newTxBinaryEnumSet(name string, values []string) txBinaryEnumSet { ids := make(map[string]uint16, len(values)) for i, value := range values { if _, exists := ids[value]; exists { panic(fmt.Sprintf("duplicate %s enum value: %q", name, value)) } ids[value] = uint16(i) } return txBinaryEnumSet{ name: name, values: values, ids: ids, } } func (set txBinaryEnumSet) id(value string) (uint16, error) { id, ok := set.ids[value] if !ok { return 0, fmt.Errorf("unsupported %s enum value %q for versioned tx binary", set.name, value) } return id, nil } func (set txBinaryEnumSet) value(id uint16) (string, error) { if int(id) >= len(set.values) { return "", fmt.Errorf("unknown %s enum id %d", set.name, id) } return set.values[id], nil }