package parser import ( "context" "crypto/x509" "encoding/json" "fmt" "io" "log" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" types "github.com/thloyi/pump-parser" pb "go.onsig.ai/onsig/yellowstone-proto" ) type Handler interface { HandleMessage(rawTx *types.RawTx) } var kacp = keepalive.ClientParameters{ Time: 10 * time.Second, // send pings every 10 seconds if there is no activity Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead PermitWithoutStream: true, // send pings even without active streams } type Client struct { ch chan SubscriptionMessage endpoint string conn *grpc.ClientConn ctx context.Context lastReceiveTime time.Time backoffFactor float64 subscription *pb.SubscribeRequest subStatus bool leastBlock BlockInfo firstMessage bool handler Handler xToken string } func NewClientWithPumpSwap(endpoint string, xtoken string, ch chan SubscriptionMessage) *Client { var subscription pb.SubscribeRequest //var failed = true var vote = false subscription.Transactions = make(map[string]*pb.SubscribeRequestFilterTransactions) subscription.Transactions["transactions_sub"] = &pb.SubscribeRequestFilterTransactions{ //Failed: &failed, Vote: &vote, } //subscription.Transactions["transactions_sub"].AccountInclude = []string{ // "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA", //Pump AMM // "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P", //Pump //} subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta) subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{} c := &Client{ backoffFactor: 1.5, ch: ch, endpoint: endpoint, lastReceiveTime: time.Now(), subStatus: false, subscription: &subscription, xToken: xtoken, } c.handler = NewPumpHandler(func(tx *types.Tx) { c.sendTx(tx) }) return c } func NewClientWithLaunchLab(endpoint string, ch chan SubscriptionMessage) *Client { var subscription pb.SubscribeRequest //var failed = false var vote = false subscription.Transactions = make(map[string]*pb.SubscribeRequestFilterTransactions) subscription.Transactions["transactions_sub"] = &pb.SubscribeRequestFilterTransactions{ //Failed: &failed, Vote: &vote, } subscription.Transactions["transactions_sub"].AccountInclude = []string{ "LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj", //LaunchLab "CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C", //CPMM //"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8", //V4 } subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta) subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{} c := &Client{ backoffFactor: 1.5, ch: ch, endpoint: endpoint, lastReceiveTime: time.Now(), subStatus: false, subscription: &subscription, } c.handler = NewPumpHandler(func(tx *types.Tx) { c.sendTx(tx) }) return c } func RunLoopWithReConnect(ctx context.Context, endpoint, token, program string, ch chan SubscriptionMessage) { var client *Client if program == types.SolProgramRaydiumLaunchLab { client = NewClientWithLaunchLab(endpoint, ch) } else { client = NewClientWithPumpSwap(endpoint, token, ch) } for { select { case <-ctx.Done(): log.Println("Context done, exiting loop") return default: } err := client.Connect(ctx) if err != nil { log.Printf("Failed to connect: %v", err) time.Sleep(5 * time.Second) continue } // should not reach here, because Connect will block panic("geyser already connected, waiting for messages...") } } func (c *Client) SetSubscribe(subscription *pb.SubscribeRequest) { c.subscription = subscription } func (c *Client) Connect(ctx context.Context) error { c.ctx = ctx if c.conn == nil { // 连接到 geyser conn, err := c.grpcConnect(c.endpoint, true) if err != nil { return err } c.conn = conn } if c.subStatus { return nil // 已经订阅了 } // 订阅交易 err := c.grpcSubscribe(ctx, c.conn) if err != nil { if c.conn != nil { c.conn.Close() } c.conn = nil c.subStatus = false log.Printf("Failed to subscribe: %v", err) return err } return nil } func (c *Client) grpcConnect(address string, plaintext bool) (*grpc.ClientConn, error) { var opts []grpc.DialOption if plaintext { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } else { pool, _ := x509.SystemCertPool() creds := credentials.NewClientTLSFromCert(pool, "") opts = append(opts, grpc.WithTransportCredentials(creds)) } opts = append(opts, grpc.WithKeepaliveParams(kacp)) log.Println("Starting grpc client, connecting to", address) conn, err := grpc.NewClient(address, opts...) if err != nil { return nil, fmt.Errorf("fail to dial: %v", err) } return conn, nil } func (c *Client) grpcSubscribe(ctx context.Context, conn *grpc.ClientConn) error { var err error client := pb.NewGeyserClient(conn) //subscription.Transactions["transactions_sub"].AccountExclude = transactionsAccountsExclude subscriptionJson, err := json.Marshal(c.subscription) if err != nil { log.Printf("Failed to marshal subscription request: %v", subscriptionJson) return err } log.Printf("Subscription request: %s", string(subscriptionJson)) // Set up the subscription request if c.xToken != "" { fmt.Println("xtoken", c.xToken) md := metadata.New(map[string]string{"x-token": c.xToken}) ctx = metadata.NewOutgoingContext(ctx, md) } //md := metadata.New(map[string]string{"x-token": "5adcf1f9-5719-43d1-bf3f-c2d4e1e5f94d"}) //ctx = metadata.NewOutgoingContext(ctx, md) stream, err := client.Subscribe(ctx) if err != nil { return err } err = stream.Send(c.subscription) if err != nil { return err } c.subStatus = true c.firstMessage = true for { resp, err := stream.Recv() if err == io.EOF { return err } if err != nil { return fmt.Errorf("error occurred in receiving update: %s", err) } txn := resp.GetTransaction() if txn == nil { blockMeta := resp.GetBlockMeta() if blockMeta != nil && c.ch != nil { c.sendBlock(blockMeta) } continue } rawTx, err := types.ConvertYellowstoneGrpcTransactionToSolanaTransaction(txn, resp.GetCreatedAt().Seconds) if err != nil { log.Printf("Failed to convert transaction: %v", err) continue } c.handler.HandleMessage(rawTx) } } func (c *Client) computeDelay(slot uint64) int64 { if c.leastBlock.Slot == 0 { return 0 } if slot < c.leastBlock.Slot { return 0 } delay := time.Now().Unix() - c.leastBlock.BlockTime - (4 * int64(slot-c.leastBlock.Slot) / 10) return delay } func (c *Client) sendTx(t *types.Tx) { c.ch <- SubscriptionMessage{ Reconnect: c.firstMessage, EstimateDelaySecond: c.computeDelay(t.Block), Block: nil, Tx: t, } c.firstMessage = false } func (c *Client) sendBlock(blockMeta *pb.SubscribeUpdateBlockMeta) { c.leastBlock.Slot = blockMeta.GetSlot() c.leastBlock.BlockTime = blockMeta.GetBlockTime().Timestamp c.leastBlock.BlockHash = blockMeta.Blockhash c.leastBlock.Height = blockMeta.BlockHeight.BlockHeight c.ch <- SubscriptionMessage{ EstimateDelaySecond: time.Now().Unix() - blockMeta.GetBlockTime().Timestamp, Reconnect: c.firstMessage, Block: &BlockInfo{ Slot: c.leastBlock.Slot, BlockTime: c.leastBlock.BlockTime, BlockHash: c.leastBlock.BlockHash, Height: c.leastBlock.Height, }, Tx: nil, } c.firstMessage = false }