package main import ( "context" "flag" "fmt" "io" "os" "os/signal" "path/filepath" "sort" "syscall" "time" "github.com/gagliardetto/solana-go" pump_parser "github.com/thloyi/pump-parser" pb "go.onsig.ai/onsig/yellowstone-proto" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" ) type collector struct { endpoint string xToken string plaintext bool blocks map[uint64][]pump_parser.RawTx seen map[string]struct{} totalUpdates uint64 txUpdates uint64 savedNonVote uint64 duplicates uint64 voteFiltered uint64 convertErrs uint64 reconnects uint64 firstSlot uint64 lastSlot uint64 } func main() { var ( endpoint = flag.String("endpoint", "ams.rpc.orbitflare.com:10000", "Yellowstone gRPC endpoint") xToken = flag.String("x-token", os.Getenv("YELLOWSTONE_X_TOKEN"), "Yellowstone x-token; defaults to YELLOWSTONE_X_TOKEN") duration = flag.Duration("duration", 5*time.Minute, "collection duration") output = flag.String("output", "", "output .prbs file path") plaintext = flag.Bool("plaintext", true, "use plaintext gRPC instead of TLS") ) flag.Parse() if *xToken == "" { exitf("missing -x-token or YELLOWSTONE_X_TOKEN") } if *duration <= 0 { exitf("-duration must be positive") } if *output == "" { *output = filepath.Join("testdata", "rawtx-binary", fmt.Sprintf("rawtx-yellowstone-%s.prbs", time.Now().Format("20060102-150405"))) } ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() ctx, cancel := context.WithTimeout(ctx, *duration) defer cancel() c := &collector{ endpoint: *endpoint, xToken: *xToken, plaintext: *plaintext, blocks: make(map[uint64][]pump_parser.RawTx), seen: make(map[string]struct{}), } started := time.Now() done := make(chan error, 1) go func() { done <- c.run(ctx) }() ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() loop: for { select { case err := <-done: if err != nil { exitf("%v", err) } break loop case <-ctx.Done(): if err := <-done; err != nil { exitf("%v", err) } break loop case <-ticker.C: c.printProgress(started) } } encoded, decodedCount, err := encodeAndVerify(c.blocks) if err != nil { exitf("raw tx binary encode/decode: %v", err) } if decodedCount != int(c.savedNonVote) { exitf("decoded tx count mismatch: got=%d want=%d", decodedCount, c.savedNonVote) } if err := os.MkdirAll(filepath.Dir(*output), 0o755); err != nil { exitf("mkdir output dir: %v", err) } if err := os.WriteFile(*output, encoded, 0o644); err != nil { exitf("write output: %v", err) } fmt.Printf("output=%s\n", *output) fmt.Printf("duration=%s elapsed=%s\n", *duration, time.Since(started).Truncate(time.Second)) fmt.Printf("updates=%d tx_updates=%d converted_nonvote=%d duplicate=%d vote_filtered=%d convert_err=%d reconnects=%d\n", c.totalUpdates, c.txUpdates, c.savedNonVote, c.duplicates, c.voteFiltered, c.convertErrs, c.reconnects) fmt.Printf("slots=%d first_slot=%d last_slot=%d decoded=%d\n", len(c.blocks), c.firstSlot, c.lastSlot, decodedCount) fmt.Printf("bytes=%d bytes_per_tx=%.2f\n", len(encoded), float64(len(encoded))/float64(max(int(c.savedNonVote), 1))) } func (c *collector) run(ctx context.Context) error { for ctx.Err() == nil { if err := c.recvOnce(ctx); err != nil && ctx.Err() == nil { c.reconnects++ fmt.Fprintf(os.Stderr, "stream_err reconnect=%d err=%v\n", c.reconnects, err) time.Sleep(time.Second) continue } if ctx.Err() == nil { c.reconnects++ time.Sleep(time.Second) } } return nil } func (c *collector) recvOnce(ctx context.Context) error { conn, err := grpc.NewClient( c.endpoint, c.transportOption(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 10 * time.Second, Timeout: time.Second, PermitWithoutStream: true, }), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64*1024*1024)), ) if err != nil { return err } defer conn.Close() ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{"x-token": c.xToken})) stream, err := pb.NewGeyserClient(conn).Subscribe(ctx) if err != nil { return err } vote := false subscription := &pb.SubscribeRequest{ Transactions: map[string]*pb.SubscribeRequestFilterTransactions{ "nonvote": {Vote: &vote}, }, } if err := stream.Send(subscription); err != nil { return err } for { resp, err := stream.Recv() if err != nil { if err == io.EOF || ctx.Err() != nil { return nil } return err } c.totalUpdates++ txn := resp.GetTransaction() if txn == nil { continue } c.txUpdates++ created := time.Now().Unix() if resp.GetCreatedAt() != nil { created = resp.GetCreatedAt().Seconds } rawTx, err := pump_parser.ConvertYellowstoneGrpcTransactionToSolanaTransaction(txn, created) if err != nil { c.convertErrs++ continue } txHash := rawTx.TxHash() if txHash != "" { if _, exists := c.seen[txHash]; exists { c.duplicates++ continue } c.seen[txHash] = struct{}{} } if c.firstSlot == 0 || rawTx.Slot < c.firstSlot { c.firstSlot = rawTx.Slot } if rawTx.Slot > c.lastSlot { c.lastSlot = rawTx.Slot } if isVoteTx(rawTx) { c.voteFiltered++ continue } c.blocks[rawTx.Slot] = append(c.blocks[rawTx.Slot], *rawTx) c.savedNonVote++ } } func (c *collector) transportOption() grpc.DialOption { if c.plaintext { return grpc.WithTransportCredentials(insecure.NewCredentials()) } return grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")) } func (c *collector) printProgress(started time.Time) { fmt.Fprintf(os.Stderr, "progress elapsed=%s updates=%d tx_updates=%d saved_nonvote=%d slots=%d duplicate=%d vote_filtered=%d convert_err=%d reconnects=%d\n", time.Since(started).Truncate(time.Second), c.totalUpdates, c.txUpdates, c.savedNonVote, len(c.blocks), c.duplicates, c.voteFiltered, c.convertErrs, c.reconnects, ) } func encodeAndVerify(blocks map[uint64][]pump_parser.RawTx) ([]byte, int, error) { slots := make([]uint64, 0, len(blocks)) for slot := range blocks { slots = append(slots, slot) } sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] }) ordered := make([][]pump_parser.RawTx, 0, len(slots)) for _, slot := range slots { txs := blocks[slot] blockTime := int64(0) if len(txs) > 0 { blockTime = txs[0].BlockTime } for i := range txs { txs[i].BlockTime = blockTime } ordered = append(ordered, txs) } encoded, err := pump_parser.EncodeRawTxBlocksBinary(ordered) if err != nil { return nil, 0, err } decoded, err := pump_parser.DecodeRawTxBlocksBinary(encoded) if err != nil { return nil, 0, err } decodedCount := 0 for _, block := range decoded { decodedCount += len(block) } return encoded, decodedCount, nil } func isVoteTx(tx *pump_parser.RawTx) bool { if tx == nil { return false } accountList := tx.GetAccountList() for _, instr := range tx.Transaction.Message.Instructions { if instr.ProgramIDIndex >= 0 && instr.ProgramIDIndex < len(accountList) && accountList[instr.ProgramIDIndex] == solana.VoteProgramID { return true } } return false } func max(a, b int) int { if a > b { return a } return b } func exitf(format string, args ...any) { fmt.Fprintf(os.Stderr, format+"\n", args...) os.Exit(1) }