package geyser import ( "context" "crypto/x509" "encoding/json" "fmt" "io" "log" "time" solana2 "github.com/gagliardetto/solana-go" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" types "github.com/thloyi/pump-parser" pb "github.com/thloyi/pump-parser/example/geyser/proto" ) type Handler interface { HandleMessage(rawTx *types.RawTx) } var kacp = keepalive.ClientParameters{ Time: 10 * time.Second, // send pings every 10 seconds if there is no activity Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead PermitWithoutStream: true, // send pings even without active streams } type Client struct { ch chan SubscriptionMessage endpoint string conn *grpc.ClientConn ctx context.Context lastReceiveTime time.Time backoffFactor float64 subscription *pb.SubscribeRequest subStatus bool leastBlock BlockInfo firstMessage bool handler Handler } func NewClientWithPumpSwap(endpoint string, ch chan SubscriptionMessage) *Client { var subscription pb.SubscribeRequest var failed = false var vote = false subscription.Transactions = make(map[string]*pb.SubscribeRequestFilterTransactions) subscription.Transactions["transactions_sub"] = &pb.SubscribeRequestFilterTransactions{ Failed: &failed, Vote: &vote, } subscription.Transactions["transactions_sub"].AccountInclude = []string{ "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA", //Pump AMM "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P", //Pump } subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta) subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{} c := &Client{ backoffFactor: 1.5, ch: ch, endpoint: endpoint, lastReceiveTime: time.Now(), subStatus: false, subscription: &subscription, } c.handler = NewPumpHandler(func(tx *types.Tx, tx2 *types.RawTx) { c.sendTx(tx, tx2) }) return c } func NewClientWithLaunchLab(endpoint string, ch chan SubscriptionMessage) *Client { var subscription pb.SubscribeRequest var failed = false var vote = false subscription.Transactions = make(map[string]*pb.SubscribeRequestFilterTransactions) subscription.Transactions["transactions_sub"] = &pb.SubscribeRequestFilterTransactions{ Failed: &failed, Vote: &vote, } subscription.Transactions["transactions_sub"].AccountInclude = []string{ "LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj", //LaunchLab "CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C", //CPMM //"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8", //V4 } subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta) subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{} c := &Client{ backoffFactor: 1.5, ch: ch, endpoint: endpoint, lastReceiveTime: time.Now(), subStatus: false, subscription: &subscription, } c.handler = NewPumpHandler(func(tx *types.Tx, tx2 *types.RawTx) { c.sendTx(tx, tx2) }) return c } func RunLoopWithReConnect(ctx context.Context, endpoint, program string, ch chan SubscriptionMessage) { var client *Client if program == types.SolProgramRaydiumLaunchLab { client = NewClientWithLaunchLab(endpoint, ch) } else { client = NewClientWithPumpSwap(endpoint, ch) } for { select { case <-ctx.Done(): log.Println("Context done, exiting loop") return default: } err := client.Connect(ctx) if err != nil { log.Printf("Failed to connect: %v", err) time.Sleep(5 * time.Second) continue } // should not reach here, because Connect will block panic("geyser already connected, waiting for messages...") } } func (c *Client) SetSubscribe(subscription *pb.SubscribeRequest) { c.subscription = subscription } func (c *Client) Connect(ctx context.Context) error { c.ctx = ctx if c.conn == nil { // 连接到 geyser conn, err := c.grpcConnect(c.endpoint, true) if err != nil { return err } c.conn = conn } if c.subStatus { return nil // 已经订阅了 } // 订阅交易 err := c.grpcSubscribe(ctx, c.conn) if err != nil { if c.conn != nil { c.conn.Close() } c.conn = nil c.subStatus = false log.Printf("Failed to subscribe: %v", err) return err } return nil } func (c *Client) grpcConnect(address string, plaintext bool) (*grpc.ClientConn, error) { var opts []grpc.DialOption if plaintext { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } else { pool, _ := x509.SystemCertPool() creds := credentials.NewClientTLSFromCert(pool, "") opts = append(opts, grpc.WithTransportCredentials(creds)) } opts = append(opts, grpc.WithKeepaliveParams(kacp)) log.Println("Starting grpc client, connecting to", address) conn, err := grpc.NewClient(address, opts...) if err != nil { return nil, fmt.Errorf("fail to dial: %v", err) } return conn, nil } func (c *Client) grpcSubscribe(ctx context.Context, conn *grpc.ClientConn) error { var err error client := pb.NewGeyserClient(conn) //subscription.Transactions["transactions_sub"].AccountExclude = transactionsAccountsExclude subscriptionJson, err := json.Marshal(c.subscription) if err != nil { log.Printf("Failed to marshal subscription request: %v", subscriptionJson) return err } log.Printf("Subscription request: %s", string(subscriptionJson)) // Set up the subscription request //if *token != "" { // md := metadata.New(map[string]string{"x-token": *token}) // ctx = metadata.NewOutgoingContext(ctx, md) //} md := metadata.New(map[string]string{"x-token": "5adcf1f9-5719-43d1-bf3f-c2d4e1e5f94d"}) ctx = metadata.NewOutgoingContext(ctx, md) stream, err := client.Subscribe(ctx) if err != nil { return err } err = stream.Send(c.subscription) if err != nil { return err } c.subStatus = true c.firstMessage = true for { resp, err := stream.Recv() if err == io.EOF { return err } if err != nil { return fmt.Errorf("error occurred in receiving update: %s", err) } txn := resp.GetTransaction() if txn == nil { blockMeta := resp.GetBlockMeta() if blockMeta != nil && c.ch != nil { c.sendBlock(blockMeta) } continue } rawTx, err := ConvertYellowstoneGrpcTransactionToSolanaTransaction(txn, resp.GetCreatedAt().Seconds) if err != nil { log.Printf("Failed to convert transaction: %v", err) continue } c.handler.HandleMessage(rawTx) } } func (c *Client) computeDelay(slot uint64) int64 { if c.leastBlock.Slot == 0 { return 0 } if slot < c.leastBlock.Slot { return 0 } delay := time.Now().Unix() - c.leastBlock.BlockTime - (4 * int64(slot-c.leastBlock.Slot) / 10) return delay } func (c *Client) sendTx(t *types.Tx, tx *types.RawTx) { c.ch <- SubscriptionMessage{ Reconnect: c.firstMessage, EstimateDelaySecond: c.computeDelay(tx.Slot), Block: nil, Tx: t, RawTx: tx, } c.firstMessage = false } func (c *Client) sendBlock(blockMeta *pb.SubscribeUpdateBlockMeta) { c.leastBlock.Slot = blockMeta.GetSlot() c.leastBlock.BlockTime = blockMeta.GetBlockTime().Timestamp c.leastBlock.BlockHash = blockMeta.Blockhash c.leastBlock.Height = blockMeta.BlockHeight.BlockHeight c.ch <- SubscriptionMessage{ EstimateDelaySecond: time.Now().Unix() - blockMeta.GetBlockTime().Timestamp, Reconnect: c.firstMessage, Block: &BlockInfo{ Slot: c.leastBlock.Slot, BlockTime: c.leastBlock.BlockTime, BlockHash: c.leastBlock.BlockHash, Height: c.leastBlock.Height, }, Tx: nil, RawTx: nil, } c.firstMessage = false } func ConvertYellowstoneGrpcTransactionToSolanaTransaction(y *pb.SubscribeUpdateTransaction, created int64) (*types.RawTx, error) { sTx := &types.RawTx{ BlockTime: created, Slot: y.Slot, IndexWithinBlock: int64(y.Transaction.Index), Meta: types.Meta{ Err: nil, Fee: 0, InnerInstructions: nil, LoadedAddresses: types.LoadedAddresses{}, LogMessages: nil, PostBalances: nil, PostTokenBalances: nil, PreBalances: nil, PreTokenBalances: nil, Rewards: nil, }, //Transaction: types.Transaction{ // Message: types.Message{ // AccountKeys: nil, // AddressTableLookups: nil, // Header: types.Header{}, // Instructions: nil, // RecentBlockHash: "", // }, // Signatures: nil, //}, //Version: nil, } meta := y.Transaction.GetMeta() yTx := y.Transaction.Transaction if meta.Err != nil && len(meta.Err.GetErr()) > 0 { // If the transaction has an error, we set the error in the Meta transError, err := DecodeTransactionError(meta.Err.GetErr()) if err != nil { sTx.Meta.Err = err } else { sTx.Meta.Err = transError } // sTx.Meta.Err = meta.Err.GetErr() } sTx.Meta.Fee = meta.Fee //sTx.Meta.InnerInstructions = meta.InnerInstructions for _, innerInstr := range meta.InnerInstructions { var instrs []types.Instruction for _, instr := range innerInstr.Instructions { instrs = append(instrs, types.Instruction{ ProgramIDIndex: int(instr.ProgramIdIndex), Accounts: func() []int { var out []int for i := range instr.Accounts { out = append(out, int(instr.Accounts[i])) } return out }(), Data: instr.Data, }) } sTx.Meta.InnerInstructions = append(sTx.Meta.InnerInstructions, types.InnerInstructions{ Index: int(innerInstr.Index), Instructions: instrs, }) } sTx.Meta.LogMessages = meta.LogMessages sTx.Meta.PostBalances = meta.PostBalances sTx.Meta.PostTokenBalances = grpcTokenBalance(meta.PostTokenBalances) sTx.Meta.PreBalances = meta.PreBalances sTx.Meta.PreTokenBalances = grpcTokenBalance(meta.PreTokenBalances) sTx.Meta.Rewards = nil sTx.Meta.LoadedAddresses.Readonly = byteSlicesToKeySlices(meta.LoadedReadonlyAddresses) sTx.Meta.LoadedAddresses.Writable = byteSlicesToKeySlices(meta.LoadedWritableAddresses) // copy signatures for i := range yTx.Signatures { sTx.Transaction.Signatures = append(sTx.Transaction.Signatures, solana2.SignatureFromBytes(yTx.Signatures[i])) } // copy message sTx.Transaction.Message = types.Message{ RecentBlockHash: solana2.HashFromBytes(yTx.Message.RecentBlockhash).String(), } // copy message.AccountKeys //stopAt := len(yTx.Message.AccountKeys) - sTx.Message.NumLookups() stopAt := len(yTx.Message.AccountKeys) for accIndex, acc := range yTx.Message.AccountKeys { sTx.Transaction.Message.AccountKeys = append(sTx.Transaction.Message.AccountKeys, solana2.PublicKeyFromBytes(acc)) if accIndex == stopAt-1 { break } } // copy message.Header sTx.Transaction.Message.Header = types.Header{ NumRequiredSignatures: int(yTx.Message.Header.NumRequiredSignatures), NumReadonlySignedAccounts: int(yTx.Message.Header.NumReadonlySignedAccounts), NumReadonlyUnsignedAccounts: int(yTx.Message.Header.NumReadonlyUnsignedAccounts), } // copy message.versioned if yTx.Message.Versioned { sTx.Version = solana2.MessageVersionV0 } else { sTx.Version = solana2.MessageVersionLegacy } // copy address table lookups { tables := map[solana2.PublicKey]solana2.PublicKeySlice{} writable := byteSlicesToKeySlices(meta.LoadedWritableAddresses) readonly := byteSlicesToKeySlices(meta.LoadedReadonlyAddresses) for _, addr := range yTx.Message.AddressTableLookups { sTx.Transaction.Message.AddressTableLookups = append(sTx.Transaction.Message.AddressTableLookups, solana2.MessageAddressTableLookup{ AccountKey: solana2.PublicKeyFromBytes(addr.AccountKey), WritableIndexes: addr.WritableIndexes, ReadonlyIndexes: addr.ReadonlyIndexes, }) numTakeWritable := len(addr.WritableIndexes) numTakeReadonly := len(addr.ReadonlyIndexes) tableKey := solana2.PublicKeyFromBytes(addr.AccountKey) { // now need to rebuild the address table taking into account the indexes, and put the keys into the tables maxIndex := 0 for _, indexB := range addr.WritableIndexes { index := int(indexB) if index > maxIndex { maxIndex = index } } for _, indexB := range addr.ReadonlyIndexes { index := int(indexB) if index > maxIndex { maxIndex = index } } tables[tableKey] = make([]solana2.PublicKey, maxIndex+1) } if numTakeWritable > 0 { writableForTable := writable[:numTakeWritable] for i, indexB := range addr.WritableIndexes { index := int(indexB) tables[tableKey][index] = writableForTable[i] } writable = writable[numTakeWritable:] } if numTakeReadonly > 0 { readableForTable := readonly[:numTakeReadonly] for i, indexB := range addr.ReadonlyIndexes { index := int(indexB) tables[tableKey][index] = readableForTable[i] } readonly = readonly[numTakeReadonly:] } } } // copy instructions for _, instr := range yTx.Message.Instructions { sTx.Transaction.Message.Instructions = append(sTx.Transaction.Message.Instructions, types.Instruction{ ProgramIDIndex: int(instr.ProgramIdIndex), Accounts: func() []int { var out []int for i := range instr.Accounts { out = append(out, int(instr.Accounts[i])) } return out }(), Data: instr.Data, }) } // resolve the lookups //{ // if sTx.Transaction.Message.IsVersioned() { // // only versioned transactions have address table lookups // err := sTx.Transaction.Message.ResolveLookups() // if err != nil { // return sTx, fmt.Errorf("failed to resolve lookups: %w", err) // } // } //} return sTx, nil } func byteSlicesToKeySlices(keys [][]byte) []solana2.PublicKey { var out []solana2.PublicKey for _, key := range keys { var k solana2.PublicKey copy(k[:], key) out = append(out, k) } return out } func grpcTokenBalance(src []*pb.TokenBalance) []types.TokenBalance { out := make([]types.TokenBalance, len(src)) for i, tb := range src { var ( mintAccount solana2.PublicKey ownerAccount solana2.PublicKey programIDAccount solana2.PublicKey ) if tb.Mint != "" { mintAccount, _ = solana2.PublicKeyFromBase58(tb.Mint) } if tb.Owner != "" { ownerAccount, _ = solana2.PublicKeyFromBase58(tb.Owner) } if tb.ProgramId != "" { programIDAccount, _ = solana2.PublicKeyFromBase58(tb.ProgramId) } out[i] = types.TokenBalance{ AccountIndex: int(tb.AccountIndex), MintAccount: mintAccount, OwnerAccount: &ownerAccount, ProgramIDAccount: programIDAccount, Mint: tb.Mint, Owner: tb.Owner, ProgramID: tb.ProgramId, UITokenAmount: types.UITokenAmount{ Amount: tb.UiTokenAmount.Amount, Decimals: uint64(tb.UiTokenAmount.Decimals), UIAmount: tb.UiTokenAmount.UiAmount, UIAmountString: tb.UiTokenAmount.UiAmountString, }, } } return out }