89 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			89 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
| package pulsar
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 
 | |
| 	"git.apinb.com/bsm-sdk/core/types"
 | |
| 	"git.apinb.com/bsm-sdk/core/vars"
 | |
| 	pulsargo "github.com/apache/pulsar-client-go/pulsar"
 | |
| )
 | |
| 
 | |
| type Pulsar struct {
 | |
| 	Client pulsargo.Client
 | |
| }
 | |
| 
 | |
| func NewPulsar(cfg *types.PulsarConf) (*Pulsar, error) {
 | |
| 	client, err := pulsargo.NewClient(pulsargo.ClientOptions{
 | |
| 		URL:               cfg.Endpoints, //TODO: 更换为接入点地址(控制台集群管理页完整复制)
 | |
| 		Authentication:    pulsargo.NewAuthenticationToken(cfg.Token),
 | |
| 		OperationTimeout:  vars.OperationTimeout,
 | |
| 		ConnectionTimeout: vars.ConnectionTimeout,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &Pulsar{
 | |
| 		Client: client,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // push to pulsar server, return messageid.
 | |
| func (mq *Pulsar) Producer(topic, msg string) (MessageID string, err error) {
 | |
| 	if msg == "" {
 | |
| 		return "", errors.New("Message is nil.")
 | |
| 	}
 | |
| 
 | |
| 	producer, err := mq.Client.CreateProducer(pulsargo.ProducerOptions{
 | |
| 		Topic:           topic,
 | |
| 		CompressionType: pulsargo.ZSTD,
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	msgID, err := producer.Send(context.Background(), &pulsargo.ProducerMessage{
 | |
| 		Payload: []byte(msg),
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	return msgID.String(), nil
 | |
| }
 | |
| 
 | |
| func (mq *Pulsar) Subscribe(topic, subscription string, subType pulsargo.SubscriptionType, do func([]byte)) error {
 | |
| 	// we can listen this channel
 | |
| 	channel := make(chan pulsargo.ConsumerMessage, 100)
 | |
| 
 | |
| 	options := pulsargo.ConsumerOptions{
 | |
| 		Topic:            topic,
 | |
| 		SubscriptionName: subscription,
 | |
| 		Type:             subType,
 | |
| 		// fill `MessageChannel` field will create a listener
 | |
| 		MessageChannel: channel,
 | |
| 	}
 | |
| 
 | |
| 	consumer, err := mq.Client.Subscribe(options)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	defer consumer.Close()
 | |
| 
 | |
| 	// Receive messages from channel. The channel returns a struct `ConsumerMessage` which contains message and the consumer from where
 | |
| 	// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
 | |
| 	// shared across multiple consumers as well
 | |
| 	for cm := range channel {
 | |
| 		consumer := cm.Consumer
 | |
| 		msg := cm.Message
 | |
| 
 | |
| 		do(msg.Payload())
 | |
| 		consumer.Ack(msg)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |