diff --git a/cmd/collect_yellowstone_rawtx_binary/main.go b/cmd/collect_yellowstone_rawtx_binary/main.go new file mode 100644 index 0000000..a72bdd6 --- /dev/null +++ b/cmd/collect_yellowstone_rawtx_binary/main.go @@ -0,0 +1,302 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io" + "os" + "os/signal" + "path/filepath" + "sort" + "syscall" + "time" + + "github.com/gagliardetto/solana-go" + pump_parser "github.com/thloyi/pump-parser" + pb "go.onsig.ai/onsig/yellowstone-proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" +) + +type collector struct { + endpoint string + xToken string + plaintext bool + + blocks map[uint64][]pump_parser.RawTx + seen map[string]struct{} + + totalUpdates uint64 + txUpdates uint64 + savedNonVote uint64 + duplicates uint64 + voteFiltered uint64 + convertErrs uint64 + reconnects uint64 + + firstSlot uint64 + lastSlot uint64 +} + +func main() { + var ( + endpoint = flag.String("endpoint", "ams.rpc.orbitflare.com:10000", "Yellowstone gRPC endpoint") + xToken = flag.String("x-token", os.Getenv("YELLOWSTONE_X_TOKEN"), "Yellowstone x-token; defaults to YELLOWSTONE_X_TOKEN") + duration = flag.Duration("duration", 5*time.Minute, "collection duration") + output = flag.String("output", "", "output .prbs file path") + plaintext = flag.Bool("plaintext", true, "use plaintext gRPC instead of TLS") + ) + flag.Parse() + + if *xToken == "" { + exitf("missing -x-token or YELLOWSTONE_X_TOKEN") + } + if *duration <= 0 { + exitf("-duration must be positive") + } + if *output == "" { + *output = filepath.Join("testdata", "rawtx-binary", fmt.Sprintf("rawtx-yellowstone-%s.prbs", time.Now().Format("20060102-150405"))) + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + ctx, cancel := context.WithTimeout(ctx, *duration) + defer cancel() + + c := &collector{ + endpoint: *endpoint, + xToken: *xToken, + plaintext: *plaintext, + blocks: make(map[uint64][]pump_parser.RawTx), + seen: make(map[string]struct{}), + } + + started := time.Now() + done := make(chan error, 1) + go func() { + done <- c.run(ctx) + }() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + +loop: + for { + select { + case err := <-done: + if err != nil { + exitf("%v", err) + } + break loop + case <-ctx.Done(): + if err := <-done; err != nil { + exitf("%v", err) + } + break loop + case <-ticker.C: + c.printProgress(started) + } + } + + encoded, decodedCount, err := encodeAndVerify(c.blocks) + if err != nil { + exitf("raw tx binary encode/decode: %v", err) + } + if decodedCount != int(c.savedNonVote) { + exitf("decoded tx count mismatch: got=%d want=%d", decodedCount, c.savedNonVote) + } + if err := os.MkdirAll(filepath.Dir(*output), 0o755); err != nil { + exitf("mkdir output dir: %v", err) + } + if err := os.WriteFile(*output, encoded, 0o644); err != nil { + exitf("write output: %v", err) + } + + fmt.Printf("output=%s\n", *output) + fmt.Printf("duration=%s elapsed=%s\n", *duration, time.Since(started).Truncate(time.Second)) + fmt.Printf("updates=%d tx_updates=%d converted_nonvote=%d duplicate=%d vote_filtered=%d convert_err=%d reconnects=%d\n", + c.totalUpdates, c.txUpdates, c.savedNonVote, c.duplicates, c.voteFiltered, c.convertErrs, c.reconnects) + fmt.Printf("slots=%d first_slot=%d last_slot=%d decoded=%d\n", len(c.blocks), c.firstSlot, c.lastSlot, decodedCount) + fmt.Printf("bytes=%d bytes_per_tx=%.2f\n", len(encoded), float64(len(encoded))/float64(max(int(c.savedNonVote), 1))) +} + +func (c *collector) run(ctx context.Context) error { + for ctx.Err() == nil { + if err := c.recvOnce(ctx); err != nil && ctx.Err() == nil { + c.reconnects++ + fmt.Fprintf(os.Stderr, "stream_err reconnect=%d err=%v\n", c.reconnects, err) + time.Sleep(time.Second) + continue + } + if ctx.Err() == nil { + c.reconnects++ + time.Sleep(time.Second) + } + } + return nil +} + +func (c *collector) recvOnce(ctx context.Context) error { + conn, err := grpc.NewClient( + c.endpoint, + c.transportOption(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: time.Second, + PermitWithoutStream: true, + }), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64*1024*1024)), + ) + if err != nil { + return err + } + defer conn.Close() + + ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"x-token": c.xToken})) + stream, err := pb.NewGeyserClient(conn).Subscribe(ctx) + if err != nil { + return err + } + + vote := false + subscription := &pb.SubscribeRequest{ + Transactions: map[string]*pb.SubscribeRequestFilterTransactions{ + "nonvote": {Vote: &vote}, + }, + } + if err := stream.Send(subscription); err != nil { + return err + } + + for { + resp, err := stream.Recv() + if err != nil { + if err == io.EOF || ctx.Err() != nil { + return nil + } + return err + } + c.totalUpdates++ + txn := resp.GetTransaction() + if txn == nil { + continue + } + c.txUpdates++ + + created := time.Now().Unix() + if resp.GetCreatedAt() != nil { + created = resp.GetCreatedAt().Seconds + } + rawTx, err := pump_parser.ConvertYellowstoneGrpcTransactionToSolanaTransaction(txn, created) + if err != nil { + c.convertErrs++ + continue + } + txHash := rawTx.TxHash() + if txHash != "" { + if _, exists := c.seen[txHash]; exists { + c.duplicates++ + continue + } + c.seen[txHash] = struct{}{} + } + if c.firstSlot == 0 || rawTx.Slot < c.firstSlot { + c.firstSlot = rawTx.Slot + } + if rawTx.Slot > c.lastSlot { + c.lastSlot = rawTx.Slot + } + if isVoteTx(rawTx) { + c.voteFiltered++ + continue + } + c.blocks[rawTx.Slot] = append(c.blocks[rawTx.Slot], *rawTx) + c.savedNonVote++ + } +} + +func (c *collector) transportOption() grpc.DialOption { + if c.plaintext { + return grpc.WithTransportCredentials(insecure.NewCredentials()) + } + return grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")) +} + +func (c *collector) printProgress(started time.Time) { + fmt.Fprintf(os.Stderr, + "progress elapsed=%s updates=%d tx_updates=%d saved_nonvote=%d slots=%d duplicate=%d vote_filtered=%d convert_err=%d reconnects=%d\n", + time.Since(started).Truncate(time.Second), + c.totalUpdates, + c.txUpdates, + c.savedNonVote, + len(c.blocks), + c.duplicates, + c.voteFiltered, + c.convertErrs, + c.reconnects, + ) +} + +func encodeAndVerify(blocks map[uint64][]pump_parser.RawTx) ([]byte, int, error) { + slots := make([]uint64, 0, len(blocks)) + for slot := range blocks { + slots = append(slots, slot) + } + sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] }) + + ordered := make([][]pump_parser.RawTx, 0, len(slots)) + for _, slot := range slots { + txs := blocks[slot] + blockTime := int64(0) + if len(txs) > 0 { + blockTime = txs[0].BlockTime + } + for i := range txs { + txs[i].BlockTime = blockTime + } + ordered = append(ordered, txs) + } + + encoded, err := pump_parser.EncodeRawTxBlocksBinary(ordered) + if err != nil { + return nil, 0, err + } + decoded, err := pump_parser.DecodeRawTxBlocksBinary(encoded) + if err != nil { + return nil, 0, err + } + decodedCount := 0 + for _, block := range decoded { + decodedCount += len(block) + } + return encoded, decodedCount, nil +} + +func isVoteTx(tx *pump_parser.RawTx) bool { + if tx == nil { + return false + } + accountList := tx.GetAccountList() + for _, instr := range tx.Transaction.Message.Instructions { + if instr.ProgramIDIndex >= 0 && instr.ProgramIDIndex < len(accountList) && accountList[instr.ProgramIDIndex] == solana.VoteProgramID { + return true + } + } + return false +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func exitf(format string, args ...any) { + fmt.Fprintf(os.Stderr, format+"\n", args...) + os.Exit(1) +}