Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51f1511c8f | ||
|
|
7dfe003e5b | ||
|
|
fe94888b14 | ||
|
|
1dd843c393 |
@@ -68,7 +68,7 @@ Interpretation:
|
||||
- Positive: execution is better than the user limit
|
||||
- Zero: execution lands exactly on the user limit
|
||||
- `10000`: user limit is effectively unbounded on the constrained side (for example `min_out = 0`)
|
||||
- Negative: this usually indicates an incorrect parser-side mapping or inconsistent source data
|
||||
- Negative raw headroom is clamped to `0` because successful-swap storage uses a non-negative bounded metric
|
||||
|
||||
This definition makes `SlippageBps` a bounded "remaining headroom to the user's limit" metric for successful swaps:
|
||||
|
||||
|
||||
13
enum.go
13
enum.go
@@ -126,4 +126,17 @@ const (
|
||||
TxEventBuyFailed = "buy_failed"
|
||||
TxEventSellFailed = "sell_failed"
|
||||
TxEventBurn = "burn"
|
||||
TxEventCreate = "create"
|
||||
TxEventComplete = "complete"
|
||||
TxEventMigrate = "migrate"
|
||||
TxEventDeposit = "deposit"
|
||||
TxEventWithdraw = "withdraw"
|
||||
TxEventOpen = "open"
|
||||
TxEventClose = "close"
|
||||
TxEventClaimFee = "claim_fee"
|
||||
|
||||
TxEventAddLiquidity = "add_liquidity"
|
||||
TxEventAddLiquidityOneSide = "add_liquidity_one_side"
|
||||
TxEventRemoveLiquidity = "remove_liquidity"
|
||||
TxEventRemoveLiquidityOneSide = "remove_liquidity_one_side"
|
||||
)
|
||||
|
||||
@@ -25,7 +25,7 @@ func main() {
|
||||
// laserstream-mainnet-slc.helius-rpc.com:80
|
||||
|
||||
ch := make(chan example.SubscriptionMessage, 1)
|
||||
go example.RunLoopWithReConnect(context.Background(), "127.0.0.1:10001", parser.SolProgramPump, ch)
|
||||
go example.RunLoopWithReConnect(context.Background(), "", "", parser.SolProgramPump, ch)
|
||||
// var tokenTxs = make(map[string]*types.Tx)
|
||||
// currentBlock := uint64(0)
|
||||
for msg := range ch {
|
||||
@@ -51,9 +51,24 @@ func main() {
|
||||
//}
|
||||
|
||||
// 处理交易
|
||||
if len(ptx.Swaps) > 0 && (ptx.Swaps[0].Program == parser.SolProgramPump || ptx.Swaps[0].Program == parser.SolProgramPumpAMM) {
|
||||
fmt.Printf("success tx : %s, program: %s, event: %s, block: %d, tx: %s, base: %s, quote: %s \n", time.Now().Format("2006-01-02 15:04:05"), ptx.Swaps[0].Program, ptx.Swaps[0].Event, ptx.Block, ptx.GetTxHash(),
|
||||
ptx.Swaps[0].BaseAmount.Div(decimal.NewFromInt(1e6)), ptx.Swaps[0].QuoteAmount.Div(decimal.NewFromInt(1e9)))
|
||||
if len(ptx.Swaps) > 0 {
|
||||
for _, swap := range ptx.Swaps {
|
||||
if swap.SlippageBps.LessThan(decimal.Zero) || swap.SlippageBps.GreaterThan(decimal.NewFromInt(10000)) {
|
||||
fmt.Printf("success tx : %s, program: %s, event: %s, block: %d, tx: %s, base: %s, quote: %s \n", time.Now().Format("2006-01-02 15:04:05"), swap.Program, swap.Event, ptx.Block, ptx.GetTxHash(),
|
||||
swap.BaseAmount.Div(decimal.NewFromInt(1e6)), swap.QuoteAmount.Div(decimal.NewFromInt(1e9)))
|
||||
}
|
||||
if swap.SlippageBps.Equal(decimal.Zero) && (swap.Event == "buy" || swap.Event == "sell") {
|
||||
fmt.Printf("zero success tx : %s, program: %s, event: %s, block: %d, tx: %s, base: %s, quote: %s, fix: %s, limit: %s, \n", time.Now().Format("2006-01-02 15:04:05"), swap.Program, swap.Event, ptx.Block, ptx.GetTxHash(),
|
||||
swap.BaseAmount.Div(decimal.NewFromInt(1e6)), swap.QuoteAmount.Div(decimal.NewFromInt(1e9)), swap.FixedAmount.String(), swap.LimitAmount.String())
|
||||
}
|
||||
}
|
||||
if len(ptx.Swaps) > 0 {
|
||||
_, err := parser.EncodeTxBinary(ptx)
|
||||
if err != nil {
|
||||
fmt.Printf("success tx : %s, , block: %d, tx: %s, err: %s \n", time.Now().Format("2006-01-02 15:04:05"), ptx.Block, ptx.GetTxHash(), err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
// currentBlock = ptx.Block
|
||||
//
|
||||
|
||||
@@ -45,9 +45,11 @@ type Client struct {
|
||||
firstMessage bool
|
||||
|
||||
handler Handler
|
||||
|
||||
xToken string
|
||||
}
|
||||
|
||||
func NewClientWithPumpSwap(endpoint string, ch chan SubscriptionMessage) *Client {
|
||||
func NewClientWithPumpSwap(endpoint string, xtoken string, ch chan SubscriptionMessage) *Client {
|
||||
var subscription pb.SubscribeRequest
|
||||
|
||||
//var failed = true
|
||||
@@ -58,10 +60,10 @@ func NewClientWithPumpSwap(endpoint string, ch chan SubscriptionMessage) *Client
|
||||
Vote: &vote,
|
||||
}
|
||||
|
||||
subscription.Transactions["transactions_sub"].AccountInclude = []string{
|
||||
"pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA", //Pump AMM
|
||||
"6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P", //Pump
|
||||
}
|
||||
//subscription.Transactions["transactions_sub"].AccountInclude = []string{
|
||||
// "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA", //Pump AMM
|
||||
// "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P", //Pump
|
||||
//}
|
||||
subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta)
|
||||
subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{}
|
||||
|
||||
@@ -72,6 +74,7 @@ func NewClientWithPumpSwap(endpoint string, ch chan SubscriptionMessage) *Client
|
||||
lastReceiveTime: time.Now(),
|
||||
subStatus: false,
|
||||
subscription: &subscription,
|
||||
xToken: xtoken,
|
||||
}
|
||||
c.handler = NewPumpHandler(func(tx *types.Tx) {
|
||||
c.sendTx(tx)
|
||||
@@ -112,12 +115,12 @@ func NewClientWithLaunchLab(endpoint string, ch chan SubscriptionMessage) *Clien
|
||||
return c
|
||||
}
|
||||
|
||||
func RunLoopWithReConnect(ctx context.Context, endpoint, program string, ch chan SubscriptionMessage) {
|
||||
func RunLoopWithReConnect(ctx context.Context, endpoint, token, program string, ch chan SubscriptionMessage) {
|
||||
var client *Client
|
||||
if program == types.SolProgramRaydiumLaunchLab {
|
||||
client = NewClientWithLaunchLab(endpoint, ch)
|
||||
} else {
|
||||
client = NewClientWithPumpSwap(endpoint, ch)
|
||||
client = NewClientWithPumpSwap(endpoint, token, ch)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
@@ -206,12 +209,13 @@ func (c *Client) grpcSubscribe(ctx context.Context, conn *grpc.ClientConn) error
|
||||
log.Printf("Subscription request: %s", string(subscriptionJson))
|
||||
|
||||
// Set up the subscription request
|
||||
//if *token != "" {
|
||||
// md := metadata.New(map[string]string{"x-token": *token})
|
||||
// ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
//}
|
||||
md := metadata.New(map[string]string{"x-token": "5adcf1f9-5719-43d1-bf3f-c2d4e1e5f94d"})
|
||||
if c.xToken != "" {
|
||||
fmt.Println("xtoken", c.xToken)
|
||||
md := metadata.New(map[string]string{"x-token": c.xToken})
|
||||
ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
//md := metadata.New(map[string]string{"x-token": "5adcf1f9-5719-43d1-bf3f-c2d4e1e5f94d"})
|
||||
//ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
|
||||
stream, err := client.Subscribe(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -193,6 +193,7 @@ func meteoraDammSwapAmountInfo(event string, params *struct {
|
||||
Amount1 uint64
|
||||
SwapMode uint8
|
||||
}) (swapMode SwapMode, fixedAmount decimal.Decimal, limitAmount decimal.Decimal, ok bool) {
|
||||
_ = event
|
||||
if params == nil {
|
||||
return SwapModeUnknown, decimal.Zero, decimal.Zero, false
|
||||
}
|
||||
@@ -203,21 +204,14 @@ func meteoraDammSwapAmountInfo(event string, params *struct {
|
||||
// - ExactIn / PartialFill: amount0=amount_in, amount1=minimum_amount_out
|
||||
// - ExactOut: amount0=amount_out, amount1=maximum_amount_in
|
||||
//
|
||||
// The emitted event is normalized as token A <-> token B:
|
||||
// - `sell` means A -> B, so A is the input side and B is the output side
|
||||
// - `buy` means B -> A, so B is the input side and A is the output side
|
||||
// `SetSwapAmountInfo` derives sides from the normalized buy/sell event, so
|
||||
// the instruction parameters should stay in raw IDL order here.
|
||||
switch params.SwapMode {
|
||||
case 0, 1: // ExactIn / PartialFill
|
||||
swapMode = SwapModeExactIn
|
||||
if event == TxEventSell {
|
||||
return swapMode, decimal.NewFromUint64(params.Amount0), decimal.NewFromUint64(params.Amount1), true
|
||||
}
|
||||
return swapMode, decimal.NewFromUint64(params.Amount1), decimal.NewFromUint64(params.Amount0), true
|
||||
case 2: // ExactOut
|
||||
swapMode = SwapModeExactOut
|
||||
if event == TxEventSell {
|
||||
return swapMode, decimal.NewFromUint64(params.Amount1), decimal.NewFromUint64(params.Amount0), true
|
||||
}
|
||||
return swapMode, decimal.NewFromUint64(params.Amount0), decimal.NewFromUint64(params.Amount1), true
|
||||
default:
|
||||
return SwapModeUnknown, decimal.Zero, decimal.Zero, false
|
||||
|
||||
@@ -9,6 +9,16 @@ import (
|
||||
|
||||
var maxSlippageBps = decimal.NewFromInt(10000)
|
||||
|
||||
func normalizeSlippageBps(value decimal.Decimal) decimal.Decimal {
|
||||
//if value.IsNegative() {
|
||||
// return decimal.Zero
|
||||
//}
|
||||
//if value.GreaterThan(maxSlippageBps) {
|
||||
// return maxSlippageBps
|
||||
//}
|
||||
return value
|
||||
}
|
||||
|
||||
type SwapMode uint8
|
||||
type SwapAmountSide uint8
|
||||
type SwapLimitType uint8
|
||||
@@ -141,29 +151,36 @@ func limitSwapAmountType(swapMode SwapMode) SwapLimitType {
|
||||
}
|
||||
|
||||
func calculateLimitSlippageBps(limitType SwapLimitType, limitAmount, actualAmount decimal.Decimal) decimal.Decimal {
|
||||
var value decimal.Decimal
|
||||
switch limitType {
|
||||
case SwapLimitTypeMinOut:
|
||||
if !actualAmount.IsPositive() {
|
||||
if !limitAmount.IsPositive() {
|
||||
return maxSlippageBps
|
||||
value = maxSlippageBps
|
||||
break
|
||||
}
|
||||
return maxSlippageBps.Neg()
|
||||
value = maxSlippageBps.Neg()
|
||||
break
|
||||
}
|
||||
if !limitAmount.IsPositive() {
|
||||
return maxSlippageBps
|
||||
value = maxSlippageBps
|
||||
break
|
||||
}
|
||||
return actualAmount.Sub(limitAmount).Mul(maxSlippageBps).Div(actualAmount)
|
||||
value = actualAmount.Sub(limitAmount).Mul(maxSlippageBps).Div(actualAmount)
|
||||
case SwapLimitTypeMaxIn:
|
||||
if !limitAmount.IsPositive() {
|
||||
if !actualAmount.IsPositive() {
|
||||
return maxSlippageBps
|
||||
value = maxSlippageBps
|
||||
break
|
||||
}
|
||||
return maxSlippageBps.Neg()
|
||||
value = maxSlippageBps.Neg()
|
||||
break
|
||||
}
|
||||
return limitAmount.Sub(actualAmount).Mul(maxSlippageBps).Div(limitAmount)
|
||||
value = limitAmount.Sub(actualAmount).Mul(maxSlippageBps).Div(limitAmount)
|
||||
default:
|
||||
return decimal.Zero
|
||||
value = decimal.Zero
|
||||
}
|
||||
return normalizeSlippageBps(value)
|
||||
}
|
||||
|
||||
func (s *Swap) SetSwapAmountInfoDetailed(
|
||||
|
||||
@@ -79,6 +79,38 @@ func TestSetSwapAmountInfoExactInZeroLimitUsesMaxSlippage(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetSwapAmountInfoExactInNegativeHeadroomClampsToZero(t *testing.T) {
|
||||
swap := Swap{
|
||||
Event: TxEventBuy,
|
||||
BaseMint: solana.MustPublicKeyFromBase58("11111111111111111111111111111111"),
|
||||
QuoteMint: solana.MustPublicKeyFromBase58("So11111111111111111111111111111111111111112"),
|
||||
BaseAmount: decimal.NewFromInt(90),
|
||||
QuoteAmount: decimal.NewFromInt(100),
|
||||
}
|
||||
|
||||
swap.SetSwapAmountInfo(SwapModeExactIn, decimal.NewFromInt(100), decimal.NewFromInt(110))
|
||||
|
||||
if got := swap.SlippageBps.String(); got != "0" {
|
||||
t.Fatalf("slippage bps = %s, want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetSwapAmountInfoExactOutNegativeHeadroomClampsToZero(t *testing.T) {
|
||||
swap := Swap{
|
||||
Event: TxEventSell,
|
||||
BaseMint: solana.MustPublicKeyFromBase58("11111111111111111111111111111111"),
|
||||
QuoteMint: solana.MustPublicKeyFromBase58("So11111111111111111111111111111111111111112"),
|
||||
BaseAmount: decimal.NewFromInt(120),
|
||||
QuoteAmount: decimal.NewFromInt(100),
|
||||
}
|
||||
|
||||
swap.SetSwapAmountInfo(SwapModeExactOut, decimal.NewFromInt(100), decimal.NewFromInt(105))
|
||||
|
||||
if got := swap.SlippageBps.String(); got != "0" {
|
||||
t.Fatalf("slippage bps = %s, want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeteoraDammSwapAmountInfo(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
@@ -116,6 +148,18 @@ func TestMeteoraDammSwapAmountInfo(t *testing.T) {
|
||||
wantFixed: 101,
|
||||
wantLimit: 96,
|
||||
},
|
||||
{
|
||||
name: "buy exact in keeps amount0 as input and amount1 as min out",
|
||||
event: TxEventBuy,
|
||||
params: &struct {
|
||||
Amount0 uint64
|
||||
Amount1 uint64
|
||||
SwapMode uint8
|
||||
}{Amount0: 130, Amount1: 120, SwapMode: 0},
|
||||
wantMode: SwapModeExactIn,
|
||||
wantFixed: 130,
|
||||
wantLimit: 120,
|
||||
},
|
||||
{
|
||||
name: "buy exact out uses amount0 as target output and amount1 as max input",
|
||||
event: TxEventBuy,
|
||||
@@ -128,6 +172,18 @@ func TestMeteoraDammSwapAmountInfo(t *testing.T) {
|
||||
wantFixed: 120,
|
||||
wantLimit: 130,
|
||||
},
|
||||
{
|
||||
name: "sell exact out keeps amount0 as target output and amount1 as max input",
|
||||
event: TxEventSell,
|
||||
params: &struct {
|
||||
Amount0 uint64
|
||||
Amount1 uint64
|
||||
SwapMode uint8
|
||||
}{Amount0: 140, Amount1: 150, SwapMode: 2},
|
||||
wantMode: SwapModeExactOut,
|
||||
wantFixed: 140,
|
||||
wantLimit: 150,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
||||
124
tx_binary.go
124
tx_binary.go
@@ -1,6 +1,7 @@
|
||||
package pump_parser
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
@@ -79,8 +80,8 @@ type SwapBinary struct {
|
||||
ActualLimitAmountSide SwapAmountSide
|
||||
SlippageBps uint64
|
||||
|
||||
BaseReserve uint64
|
||||
QuoteReserve uint64
|
||||
BaseReserve float64
|
||||
QuoteReserve float64
|
||||
Mayhem bool
|
||||
Cashback bool
|
||||
|
||||
@@ -107,6 +108,18 @@ type TxsBinaryReaderSource interface {
|
||||
OpenTxsBinaryReader() (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
type TxsBinaryBatchHeaderContext struct {
|
||||
SourceIndex int
|
||||
BatchIndex int
|
||||
Reader *bufio.Reader
|
||||
}
|
||||
|
||||
type TxsBinaryBatchHeaderFunc func(ctx *TxsBinaryBatchHeaderContext) (skip bool, err error)
|
||||
|
||||
type TxsBinaryMergeOptions struct {
|
||||
BatchHeaderFunc TxsBinaryBatchHeaderFunc
|
||||
}
|
||||
|
||||
type PlatformBinary struct {
|
||||
Platform string
|
||||
PlatformFee uint64
|
||||
@@ -307,24 +320,32 @@ func DecodeTxsBinaryReader(r io.Reader) iter.Seq2[*Tx, error] {
|
||||
}
|
||||
|
||||
func MergeTxsBinaryBytes(encodedBatches [][]byte) ([]byte, error) {
|
||||
return MergeTxsBinaryBytesWithOptions(encodedBatches, TxsBinaryMergeOptions{})
|
||||
}
|
||||
|
||||
func MergeTxsBinaryBytesWithOptions(encodedBatches [][]byte, opts TxsBinaryMergeOptions) ([]byte, error) {
|
||||
sources := make([]TxsBinaryReaderSource, 0, len(encodedBatches))
|
||||
for _, encoded := range encodedBatches {
|
||||
sources = append(sources, txBinaryBytesSource{data: encoded})
|
||||
}
|
||||
|
||||
var out bytes.Buffer
|
||||
if err := MergeTxsBinarySourcesToWriter(sources, &out); err != nil {
|
||||
if err := MergeTxsBinarySourcesToWriterWithOptions(sources, &out, opts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out.Bytes(), nil
|
||||
}
|
||||
|
||||
func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer) error {
|
||||
return MergeTxsBinarySourcesToWriterWithOptions(sources, w, TxsBinaryMergeOptions{})
|
||||
}
|
||||
|
||||
func MergeTxsBinarySourcesToWriterWithOptions(sources []TxsBinaryReaderSource, w io.Writer, opts TxsBinaryMergeOptions) error {
|
||||
if w == nil {
|
||||
return fmt.Errorf("txs binary writer is nil")
|
||||
}
|
||||
|
||||
plan, err := txBinaryBuildMergePlan(sources)
|
||||
plan, err := txBinaryBuildMergePlan(sources, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -343,9 +364,22 @@ func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer)
|
||||
return fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err)
|
||||
}
|
||||
|
||||
dec := txBinaryStreamDecoder{reader: reader}
|
||||
bufferedReader := bufio.NewReader(reader)
|
||||
dec := txBinaryStreamDecoder{reader: bufferedReader}
|
||||
batchIndex := 0
|
||||
for {
|
||||
skipBatch, err := txBinaryApplyMergeBatchHeader(bufferedReader, opts, sourceIndex, batchIndex)
|
||||
if err != nil {
|
||||
closeErr := reader.Close()
|
||||
if err == io.EOF {
|
||||
if closeErr != nil {
|
||||
return fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr)
|
||||
}
|
||||
break
|
||||
}
|
||||
return fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err)
|
||||
}
|
||||
|
||||
header, err := dec.readTxsBinaryHeaderOrEOF()
|
||||
if err != nil {
|
||||
closeErr := reader.Close()
|
||||
@@ -368,6 +402,9 @@ func MergeTxsBinarySourcesToWriter(sources []TxsBinaryReaderSource, w io.Writer)
|
||||
reader.Close()
|
||||
return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err)
|
||||
}
|
||||
if skipBatch {
|
||||
continue
|
||||
}
|
||||
if err := txBinaryRemapTxAddressTable(&tx, header.addressTable, plan.addressTable, plan.addressIndex); err != nil {
|
||||
reader.Close()
|
||||
return fmt.Errorf("source[%d].batch[%d].tx[%d]: %w", sourceIndex, batchIndex, txIndex, err)
|
||||
@@ -740,10 +777,10 @@ func newSwapBinary(swap Swap, index int, addressIndex *txBinaryAddressIndex) (Sw
|
||||
if out.SlippageBps, err = txBinaryRoundedDecimalToUint64(swap.SlippageBps, fmt.Sprintf("swap[%d].slippage_bps", index)); err != nil {
|
||||
return SwapBinary{}, err
|
||||
}
|
||||
if out.BaseReserve, err = txBinaryDecimalToUint64(swap.BaseReserve, fmt.Sprintf("swap[%d].base_reserve", index)); err != nil {
|
||||
if out.BaseReserve, err = txBinaryDecimalToFloat64Raw(swap.BaseReserve, fmt.Sprintf("swap[%d].base_reserve", index)); err != nil {
|
||||
return SwapBinary{}, err
|
||||
}
|
||||
if out.QuoteReserve, err = txBinaryDecimalToUint64(swap.QuoteReserve, fmt.Sprintf("swap[%d].quote_reserve", index)); err != nil {
|
||||
if out.QuoteReserve, err = txBinaryDecimalToFloat64Raw(swap.QuoteReserve, fmt.Sprintf("swap[%d].quote_reserve", index)); err != nil {
|
||||
return SwapBinary{}, err
|
||||
}
|
||||
if out.UserBaseBalance, err = txBinaryDecimalToUint64(swap.UserBaseBalance, fmt.Sprintf("swap[%d].user_base_balance", index)); err != nil {
|
||||
@@ -841,8 +878,8 @@ func (swap SwapBinary) toSwap(addressTable []solana.PublicKey, index int) (Swap,
|
||||
ActualLimitAmount: decimal.NewFromUint64(swap.ActualLimitAmount),
|
||||
ActualLimitAmountSide: swap.ActualLimitAmountSide,
|
||||
SlippageBps: decimal.NewFromUint64(swap.SlippageBps),
|
||||
BaseReserve: decimal.NewFromUint64(swap.BaseReserve),
|
||||
QuoteReserve: decimal.NewFromUint64(swap.QuoteReserve),
|
||||
BaseReserve: txBinaryFloat64ToDecimalRaw(swap.BaseReserve),
|
||||
QuoteReserve: txBinaryFloat64ToDecimalRaw(swap.QuoteReserve),
|
||||
Mayhem: swap.Mayhem,
|
||||
Cashback: swap.Cashback,
|
||||
UserBaseBalance: decimal.NewFromUint64(swap.UserBaseBalance),
|
||||
@@ -1026,6 +1063,14 @@ func txBinaryDecimalToFloat64(value decimal.Decimal, scale int32, field string)
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func txBinaryDecimalToFloat64Raw(value decimal.Decimal, field string) (float64, error) {
|
||||
f, exact := value.Float64()
|
||||
if !exact && math.IsInf(f, 0) {
|
||||
return 0, fmt.Errorf("%s cannot be represented as float64: %s", field, value.String())
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func txBinaryFloat64ToDecimal(value float64, scale int32) decimal.Decimal {
|
||||
formatted := strconv.FormatFloat(value, 'f', int(scale), 64)
|
||||
out, err := decimal.NewFromString(formatted)
|
||||
@@ -1035,6 +1080,15 @@ func txBinaryFloat64ToDecimal(value float64, scale int32) decimal.Decimal {
|
||||
return out
|
||||
}
|
||||
|
||||
func txBinaryFloat64ToDecimalRaw(value float64) decimal.Decimal {
|
||||
formatted := strconv.FormatFloat(value, 'f', -1, 64)
|
||||
out, err := decimal.NewFromString(formatted)
|
||||
if err != nil {
|
||||
return decimal.Zero
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
type txBinaryEncoder struct {
|
||||
buf bytes.Buffer
|
||||
}
|
||||
@@ -1187,8 +1241,8 @@ func (enc *txBinaryEncoder) writeSwaps(swaps []SwapBinary, enumTable *txBinaryEn
|
||||
enc.writeUint64(swap.ActualLimitAmount)
|
||||
enc.writeUint8(uint8(swap.ActualLimitAmountSide))
|
||||
enc.writeUint64(swap.SlippageBps)
|
||||
enc.writeUint64(swap.BaseReserve)
|
||||
enc.writeUint64(swap.QuoteReserve)
|
||||
enc.writeFloat64(swap.BaseReserve)
|
||||
enc.writeFloat64(swap.QuoteReserve)
|
||||
enc.writeBool(swap.Mayhem)
|
||||
enc.writeBool(swap.Cashback)
|
||||
enc.writeUint64(swap.UserBaseBalance)
|
||||
@@ -1683,10 +1737,10 @@ func txBinaryReadSwaps(dec txBinaryBodyReader, enumTable *txBinaryEnumTable) ([]
|
||||
if swap.SlippageBps, err = dec.readUint64(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if swap.BaseReserve, err = dec.readUint64(); err != nil {
|
||||
if swap.BaseReserve, err = dec.readFloat64(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if swap.QuoteReserve, err = dec.readUint64(); err != nil {
|
||||
if swap.QuoteReserve, err = dec.readFloat64(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if swap.Mayhem, err = dec.readBool(); err != nil {
|
||||
@@ -1780,7 +1834,7 @@ func txBinaryReadTxBody(dec txBinaryBodyReader, tx *TxBinary, enumTable *txBinar
|
||||
return nil
|
||||
}
|
||||
|
||||
func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource) (*txsBinaryMergePlan, error) {
|
||||
func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource, opts TxsBinaryMergeOptions) (*txsBinaryMergePlan, error) {
|
||||
if len(sources) == 0 {
|
||||
return nil, fmt.Errorf("txs binary sources are empty")
|
||||
}
|
||||
@@ -1801,9 +1855,22 @@ func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource) (*txsBinaryMergePla
|
||||
return nil, fmt.Errorf("source[%d]: open reader: %w", sourceIndex, err)
|
||||
}
|
||||
|
||||
dec := txBinaryStreamDecoder{reader: reader}
|
||||
bufferedReader := bufio.NewReader(reader)
|
||||
dec := txBinaryStreamDecoder{reader: bufferedReader}
|
||||
batchIndex := 0
|
||||
for {
|
||||
skipBatch, err := txBinaryApplyMergeBatchHeader(bufferedReader, opts, sourceIndex, batchIndex)
|
||||
if err != nil {
|
||||
closeErr := reader.Close()
|
||||
if err == io.EOF {
|
||||
if closeErr != nil {
|
||||
return nil, fmt.Errorf("source[%d]: close reader: %w", sourceIndex, closeErr)
|
||||
}
|
||||
break
|
||||
}
|
||||
return nil, fmt.Errorf("source[%d].batch[%d]: %w", sourceIndex, batchIndex, err)
|
||||
}
|
||||
|
||||
header, err := dec.readTxsBinaryHeaderOrEOF()
|
||||
if err != nil {
|
||||
closeErr := reader.Close()
|
||||
@@ -1833,17 +1900,21 @@ func txBinaryBuildMergePlan(sources []TxsBinaryReaderSource) (*txsBinaryMergePla
|
||||
}
|
||||
|
||||
for addressIndex, address := range header.addressTable {
|
||||
if !skipBatch {
|
||||
if err := builder.add(address); err != nil {
|
||||
reader.Close()
|
||||
return nil, fmt.Errorf("source[%d].batch[%d].address[%d]: %w", sourceIndex, batchIndex, addressIndex, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !skipBatch {
|
||||
if uint64(plan.txCount)+uint64(header.count) > uint64(math.MaxUint32) {
|
||||
reader.Close()
|
||||
return nil, fmt.Errorf("merged tx count exceeds uint32 capacity")
|
||||
}
|
||||
plan.txCount += header.count
|
||||
}
|
||||
|
||||
for txIndex := uint32(0); txIndex < header.count; txIndex++ {
|
||||
tx := TxBinary{
|
||||
@@ -1947,6 +2018,17 @@ func txBinaryWriteAll(w io.Writer, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func txBinaryApplyMergeBatchHeader(reader *bufio.Reader, opts TxsBinaryMergeOptions, sourceIndex int, batchIndex int) (bool, error) {
|
||||
if opts.BatchHeaderFunc == nil {
|
||||
return false, nil
|
||||
}
|
||||
return opts.BatchHeaderFunc(&TxsBinaryBatchHeaderContext{
|
||||
SourceIndex: sourceIndex,
|
||||
BatchIndex: batchIndex,
|
||||
Reader: reader,
|
||||
})
|
||||
}
|
||||
|
||||
type txBinaryEnumTable struct {
|
||||
version uint16
|
||||
programs txBinaryEnumSet
|
||||
@@ -1990,6 +2072,18 @@ var txBinaryEnumTables = map[uint16]*txBinaryEnumTable{
|
||||
TxEventBuyFailed,
|
||||
TxEventSellFailed,
|
||||
TxEventBurn,
|
||||
TxEventCreate,
|
||||
TxEventComplete,
|
||||
TxEventMigrate,
|
||||
TxEventDeposit,
|
||||
TxEventWithdraw,
|
||||
TxEventOpen,
|
||||
TxEventClose,
|
||||
TxEventClaimFee,
|
||||
TxEventAddLiquidity,
|
||||
TxEventAddLiquidityOneSide,
|
||||
TxEventRemoveLiquidity,
|
||||
TxEventRemoveLiquidityOneSide,
|
||||
},
|
||||
"platform",
|
||||
[]string{
|
||||
|
||||
@@ -225,6 +225,135 @@ func TestTxBinaryRejectsUnknownProgramEnum(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxBinaryAcceptsKnownEventEnums(t *testing.T) {
|
||||
events := []string{
|
||||
TxEventAddLP,
|
||||
TxEventRemoveLP,
|
||||
TxEventBuy,
|
||||
TxEventSell,
|
||||
TxEventBuyFailed,
|
||||
TxEventSellFailed,
|
||||
TxEventBurn,
|
||||
TxEventCreate,
|
||||
TxEventComplete,
|
||||
TxEventMigrate,
|
||||
TxEventDeposit,
|
||||
TxEventWithdraw,
|
||||
TxEventOpen,
|
||||
TxEventClose,
|
||||
TxEventClaimFee,
|
||||
TxEventAddLiquidity,
|
||||
TxEventAddLiquidityOneSide,
|
||||
TxEventRemoveLiquidity,
|
||||
TxEventRemoveLiquidityOneSide,
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
t.Run(event, func(t *testing.T) {
|
||||
txBinary := &TxBinary{
|
||||
SchemaVersion: txBinarySchemaVersionCurrent,
|
||||
EnumVersion: txBinaryEnumVersionV1,
|
||||
AddressTable: []solana.PublicKey{
|
||||
mustPubKey("11111111111111111111111111111111"),
|
||||
mustPubKey("So11111111111111111111111111111111111111112"),
|
||||
solana.TokenProgramID,
|
||||
mustPubKey("BPFLoader1111111111111111111111111111111111"),
|
||||
mustPubKey("SysvarRent111111111111111111111111111111111"),
|
||||
},
|
||||
Swaps: []SwapBinary{
|
||||
{
|
||||
Program: SolProgramPump,
|
||||
Event: event,
|
||||
Pool: 0,
|
||||
BaseMint: 1,
|
||||
QuoteMint: 1,
|
||||
BaseTokenProgram: 2,
|
||||
QuoteTokenProgram: 2,
|
||||
Creator: 3,
|
||||
User: 4,
|
||||
FixedMint: 1,
|
||||
LimitMint: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
encoded, err := txBinary.MarshalBinary()
|
||||
if err != nil {
|
||||
t.Fatalf("MarshalBinary() error = %v", err)
|
||||
}
|
||||
|
||||
var decoded TxBinary
|
||||
if err := decoded.UnmarshalBinary(encoded); err != nil {
|
||||
t.Fatalf("UnmarshalBinary() error = %v", err)
|
||||
}
|
||||
if got := decoded.Swaps[0].Event; got != event {
|
||||
t.Fatalf("decoded event = %q, want %q", got, event)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxBinaryPreservesFractionalReserves(t *testing.T) {
|
||||
tx := &Tx{
|
||||
Signer: mustPubKey("So11111111111111111111111111111111111111112"),
|
||||
Block: 1,
|
||||
BlockIndex: 1,
|
||||
CuFee: decimal.NewFromInt(1),
|
||||
CUPrice: decimal.RequireFromString("0.000001"),
|
||||
BeforeSolBalance: decimal.RequireFromString("1.000000000"),
|
||||
AfterSOLBalance: decimal.RequireFromString("0.900000000"),
|
||||
ComputeUnitsConsumed: 1,
|
||||
CuLimit: 1,
|
||||
Swaps: []Swap{
|
||||
{
|
||||
Program: SolProgramMeteoraPools,
|
||||
Event: TxEventAddLiquidity,
|
||||
Pool: mustPubKey("11111111111111111111111111111111"),
|
||||
BaseMint: mustPubKey("3wyAj7RtG72wM1Wv9DkYfL7RAx9X3Jx1sC6E6mN4jWeL"),
|
||||
QuoteMint: solana.WrappedSol,
|
||||
BaseTokenProgram: solana.TokenProgramID,
|
||||
QuoteTokenProgram: solana.TokenProgramID,
|
||||
Creator: mustPubKey("BPFLoader1111111111111111111111111111111111"),
|
||||
BaseMintDecimals: 6,
|
||||
QuoteMintDecimals: 9,
|
||||
User: mustPubKey("SysvarRent111111111111111111111111111111111"),
|
||||
BaseAmount: decimal.NewFromInt(10),
|
||||
QuoteAmount: decimal.NewFromInt(20),
|
||||
SwapMode: SwapModeExactIn,
|
||||
FixedAmount: decimal.NewFromInt(20),
|
||||
FixedAmountSide: SwapAmountSideQuote,
|
||||
FixedMint: solana.WrappedSol,
|
||||
LimitAmountType: SwapLimitTypeMinOut,
|
||||
LimitAmount: decimal.NewFromInt(9),
|
||||
LimitAmountSide: SwapAmountSideBase,
|
||||
LimitMint: mustPubKey("3wyAj7RtG72wM1Wv9DkYfL7RAx9X3Jx1sC6E6mN4jWeL"),
|
||||
ActualLimitAmount: decimal.NewFromInt(10),
|
||||
ActualLimitAmountSide: SwapAmountSideBase,
|
||||
BaseReserve: decimal.RequireFromString("123.4"),
|
||||
QuoteReserve: decimal.RequireFromString("710079483.625409498"),
|
||||
AfterSOLBalance: decimal.RequireFromString("0.800000000"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
encoded, err := EncodeTxBinary(tx)
|
||||
if err != nil {
|
||||
t.Fatalf("EncodeTxBinary() error = %v", err)
|
||||
}
|
||||
|
||||
decoded, err := DecodeTxBinary(encoded)
|
||||
if err != nil {
|
||||
t.Fatalf("DecodeTxBinary() error = %v", err)
|
||||
}
|
||||
if got := decoded.Swaps[0].BaseReserve.String(); got != "123.4" {
|
||||
t.Fatalf("BaseReserve = %s, want 123.4", got)
|
||||
}
|
||||
diff := decoded.Swaps[0].QuoteReserve.Sub(decimal.RequireFromString("710079483.625409498")).Abs()
|
||||
if diff.GreaterThan(decimal.RequireFromString("0.0000001")) {
|
||||
t.Fatalf("QuoteReserve diff = %s, want <= 0.0000001", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxsBinaryRoundTripWithSharedAddressTable(t *testing.T) {
|
||||
tx1 := Tx{
|
||||
Signer: mustPubKey("So11111111111111111111111111111111111111112"),
|
||||
@@ -602,6 +731,89 @@ func TestMergeTxsBinarySourcesToWriterWithConcatenatedBatches(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMergeTxsBinarySourcesToWriterWithBatchHeaderFuncSkip(t *testing.T) {
|
||||
tx1 := Tx{
|
||||
Signer: mustPubKey("So11111111111111111111111111111111111111112"),
|
||||
Block: 31,
|
||||
BlockIndex: 1,
|
||||
CuFee: decimal.NewFromInt(1),
|
||||
CUPrice: decimal.RequireFromString("0.000001"),
|
||||
BeforeSolBalance: decimal.RequireFromString("1.000000000"),
|
||||
AfterSOLBalance: decimal.RequireFromString("0.900000000"),
|
||||
ComputeUnitsConsumed: 11,
|
||||
CuLimit: 111,
|
||||
}
|
||||
tx2 := tx1
|
||||
tx2.Block = 32
|
||||
tx2.BlockIndex = 2
|
||||
tx2.Signer = mustPubKey("SysvarRent111111111111111111111111111111111")
|
||||
tx3 := tx1
|
||||
tx3.Block = 33
|
||||
tx3.BlockIndex = 3
|
||||
tx3.Signer = mustPubKey("ComputeBudget111111111111111111111111111111")
|
||||
|
||||
batch1, err := EncodeTxsBinary([]Tx{tx1})
|
||||
if err != nil {
|
||||
t.Fatalf("EncodeTxsBinary(batch1) error = %v", err)
|
||||
}
|
||||
batch2, err := EncodeTxsBinary([]Tx{tx2})
|
||||
if err != nil {
|
||||
t.Fatalf("EncodeTxsBinary(batch2) error = %v", err)
|
||||
}
|
||||
batch3, err := EncodeTxsBinary([]Tx{tx3})
|
||||
if err != nil {
|
||||
t.Fatalf("EncodeTxsBinary(batch3) error = %v", err)
|
||||
}
|
||||
|
||||
source := &testTxsBinarySource{
|
||||
data: append(
|
||||
append(
|
||||
append([]byte{}, testBatchHeader(false)...),
|
||||
batch1...,
|
||||
),
|
||||
append(
|
||||
append(testBatchHeader(true), batch2...),
|
||||
append(testBatchHeader(false), batch3...)...,
|
||||
)...,
|
||||
),
|
||||
}
|
||||
|
||||
var out bytes.Buffer
|
||||
err = MergeTxsBinarySourcesToWriterWithOptions(
|
||||
[]TxsBinaryReaderSource{source},
|
||||
&out,
|
||||
TxsBinaryMergeOptions{
|
||||
BatchHeaderFunc: func(ctx *TxsBinaryBatchHeaderContext) (bool, error) {
|
||||
header := make([]byte, 5)
|
||||
if _, err := io.ReadFull(ctx.Reader, header); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !bytes.Equal(header[:4], []byte("BHDR")) {
|
||||
return false, io.ErrUnexpectedEOF
|
||||
}
|
||||
return header[4] == 1, nil
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("MergeTxsBinarySourcesToWriterWithOptions() error = %v", err)
|
||||
}
|
||||
|
||||
decoded, err := DecodeTxsBinary(out.Bytes())
|
||||
if err != nil {
|
||||
t.Fatalf("DecodeTxsBinary(merged) error = %v", err)
|
||||
}
|
||||
if len(decoded) != 2 {
|
||||
t.Fatalf("decoded len = %d, want 2", len(decoded))
|
||||
}
|
||||
if decoded[0].Block != tx1.Block || decoded[1].Block != tx3.Block {
|
||||
t.Fatalf("decoded block order mismatch after skip")
|
||||
}
|
||||
if source.opens != 2 {
|
||||
t.Fatalf("source.opens = %d, want 2", source.opens)
|
||||
}
|
||||
}
|
||||
|
||||
func mustPubKey(value string) solana.PublicKey {
|
||||
return solana.MustPublicKeyFromBase58(value)
|
||||
}
|
||||
@@ -625,3 +837,11 @@ func (s *testTxsBinarySource) OpenTxsBinaryReader() (io.ReadCloser, error) {
|
||||
s.opens++
|
||||
return io.NopCloser(bytes.NewReader(s.data)), nil
|
||||
}
|
||||
|
||||
func testBatchHeader(skip bool) []byte {
|
||||
header := []byte("BHDR\x00")
|
||||
if skip {
|
||||
header[4] = 1
|
||||
}
|
||||
return header
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user