raw tx binary

This commit is contained in:
thloyi
2026-04-30 17:56:35 +08:00
parent d46e8b651c
commit 0a4aabc67f

View File

@@ -0,0 +1,302 @@
package main
import (
"context"
"flag"
"fmt"
"io"
"os"
"os/signal"
"path/filepath"
"sort"
"syscall"
"time"
"github.com/gagliardetto/solana-go"
pump_parser "github.com/thloyi/pump-parser"
pb "go.onsig.ai/onsig/yellowstone-proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)
type collector struct {
endpoint string
xToken string
plaintext bool
blocks map[uint64][]pump_parser.RawTx
seen map[string]struct{}
totalUpdates uint64
txUpdates uint64
savedNonVote uint64
duplicates uint64
voteFiltered uint64
convertErrs uint64
reconnects uint64
firstSlot uint64
lastSlot uint64
}
func main() {
var (
endpoint = flag.String("endpoint", "ams.rpc.orbitflare.com:10000", "Yellowstone gRPC endpoint")
xToken = flag.String("x-token", os.Getenv("YELLOWSTONE_X_TOKEN"), "Yellowstone x-token; defaults to YELLOWSTONE_X_TOKEN")
duration = flag.Duration("duration", 5*time.Minute, "collection duration")
output = flag.String("output", "", "output .prbs file path")
plaintext = flag.Bool("plaintext", true, "use plaintext gRPC instead of TLS")
)
flag.Parse()
if *xToken == "" {
exitf("missing -x-token or YELLOWSTONE_X_TOKEN")
}
if *duration <= 0 {
exitf("-duration must be positive")
}
if *output == "" {
*output = filepath.Join("testdata", "rawtx-binary", fmt.Sprintf("rawtx-yellowstone-%s.prbs", time.Now().Format("20060102-150405")))
}
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
ctx, cancel := context.WithTimeout(ctx, *duration)
defer cancel()
c := &collector{
endpoint: *endpoint,
xToken: *xToken,
plaintext: *plaintext,
blocks: make(map[uint64][]pump_parser.RawTx),
seen: make(map[string]struct{}),
}
started := time.Now()
done := make(chan error, 1)
go func() {
done <- c.run(ctx)
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
loop:
for {
select {
case err := <-done:
if err != nil {
exitf("%v", err)
}
break loop
case <-ctx.Done():
if err := <-done; err != nil {
exitf("%v", err)
}
break loop
case <-ticker.C:
c.printProgress(started)
}
}
encoded, decodedCount, err := encodeAndVerify(c.blocks)
if err != nil {
exitf("raw tx binary encode/decode: %v", err)
}
if decodedCount != int(c.savedNonVote) {
exitf("decoded tx count mismatch: got=%d want=%d", decodedCount, c.savedNonVote)
}
if err := os.MkdirAll(filepath.Dir(*output), 0o755); err != nil {
exitf("mkdir output dir: %v", err)
}
if err := os.WriteFile(*output, encoded, 0o644); err != nil {
exitf("write output: %v", err)
}
fmt.Printf("output=%s\n", *output)
fmt.Printf("duration=%s elapsed=%s\n", *duration, time.Since(started).Truncate(time.Second))
fmt.Printf("updates=%d tx_updates=%d converted_nonvote=%d duplicate=%d vote_filtered=%d convert_err=%d reconnects=%d\n",
c.totalUpdates, c.txUpdates, c.savedNonVote, c.duplicates, c.voteFiltered, c.convertErrs, c.reconnects)
fmt.Printf("slots=%d first_slot=%d last_slot=%d decoded=%d\n", len(c.blocks), c.firstSlot, c.lastSlot, decodedCount)
fmt.Printf("bytes=%d bytes_per_tx=%.2f\n", len(encoded), float64(len(encoded))/float64(max(int(c.savedNonVote), 1)))
}
func (c *collector) run(ctx context.Context) error {
for ctx.Err() == nil {
if err := c.recvOnce(ctx); err != nil && ctx.Err() == nil {
c.reconnects++
fmt.Fprintf(os.Stderr, "stream_err reconnect=%d err=%v\n", c.reconnects, err)
time.Sleep(time.Second)
continue
}
if ctx.Err() == nil {
c.reconnects++
time.Sleep(time.Second)
}
}
return nil
}
func (c *collector) recvOnce(ctx context.Context) error {
conn, err := grpc.NewClient(
c.endpoint,
c.transportOption(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64*1024*1024)),
)
if err != nil {
return err
}
defer conn.Close()
ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"x-token": c.xToken}))
stream, err := pb.NewGeyserClient(conn).Subscribe(ctx)
if err != nil {
return err
}
vote := false
subscription := &pb.SubscribeRequest{
Transactions: map[string]*pb.SubscribeRequestFilterTransactions{
"nonvote": {Vote: &vote},
},
}
if err := stream.Send(subscription); err != nil {
return err
}
for {
resp, err := stream.Recv()
if err != nil {
if err == io.EOF || ctx.Err() != nil {
return nil
}
return err
}
c.totalUpdates++
txn := resp.GetTransaction()
if txn == nil {
continue
}
c.txUpdates++
created := time.Now().Unix()
if resp.GetCreatedAt() != nil {
created = resp.GetCreatedAt().Seconds
}
rawTx, err := pump_parser.ConvertYellowstoneGrpcTransactionToSolanaTransaction(txn, created)
if err != nil {
c.convertErrs++
continue
}
txHash := rawTx.TxHash()
if txHash != "" {
if _, exists := c.seen[txHash]; exists {
c.duplicates++
continue
}
c.seen[txHash] = struct{}{}
}
if c.firstSlot == 0 || rawTx.Slot < c.firstSlot {
c.firstSlot = rawTx.Slot
}
if rawTx.Slot > c.lastSlot {
c.lastSlot = rawTx.Slot
}
if isVoteTx(rawTx) {
c.voteFiltered++
continue
}
c.blocks[rawTx.Slot] = append(c.blocks[rawTx.Slot], *rawTx)
c.savedNonVote++
}
}
func (c *collector) transportOption() grpc.DialOption {
if c.plaintext {
return grpc.WithTransportCredentials(insecure.NewCredentials())
}
return grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, ""))
}
func (c *collector) printProgress(started time.Time) {
fmt.Fprintf(os.Stderr,
"progress elapsed=%s updates=%d tx_updates=%d saved_nonvote=%d slots=%d duplicate=%d vote_filtered=%d convert_err=%d reconnects=%d\n",
time.Since(started).Truncate(time.Second),
c.totalUpdates,
c.txUpdates,
c.savedNonVote,
len(c.blocks),
c.duplicates,
c.voteFiltered,
c.convertErrs,
c.reconnects,
)
}
func encodeAndVerify(blocks map[uint64][]pump_parser.RawTx) ([]byte, int, error) {
slots := make([]uint64, 0, len(blocks))
for slot := range blocks {
slots = append(slots, slot)
}
sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] })
ordered := make([][]pump_parser.RawTx, 0, len(slots))
for _, slot := range slots {
txs := blocks[slot]
blockTime := int64(0)
if len(txs) > 0 {
blockTime = txs[0].BlockTime
}
for i := range txs {
txs[i].BlockTime = blockTime
}
ordered = append(ordered, txs)
}
encoded, err := pump_parser.EncodeRawTxBlocksBinary(ordered)
if err != nil {
return nil, 0, err
}
decoded, err := pump_parser.DecodeRawTxBlocksBinary(encoded)
if err != nil {
return nil, 0, err
}
decodedCount := 0
for _, block := range decoded {
decodedCount += len(block)
}
return encoded, decodedCount, nil
}
func isVoteTx(tx *pump_parser.RawTx) bool {
if tx == nil {
return false
}
accountList := tx.GetAccountList()
for _, instr := range tx.Transaction.Message.Instructions {
if instr.ProgramIDIndex >= 0 && instr.ProgramIDIndex < len(accountList) && accountList[instr.ProgramIDIndex] == solana.VoteProgramID {
return true
}
}
return false
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
func exitf(format string, args ...any) {
fmt.Fprintf(os.Stderr, format+"\n", args...)
os.Exit(1)
}