- High availability based on Kafka
- Supports flow control and overload protection
- Quickly build multiple applications and multiple services
- Supports delayed message queues
- Point-to-point consumption based on command points
alpaca-mq is a message queue for service decoupling. You can use it to decouple services, but note that you need to ensure that your services are idempotent.
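One way to make a service idempotent is to remember which messages it has already handled and skip repeats. The sketch below is only an illustration: `Deduper` and its methods are hypothetical names, not part of alpaca-mq, and it keeps state in memory, whereas a real service would more likely use a persistent store keyed by something like the message's LogId.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers which message IDs have already been processed.
// Hypothetical helper for illustration; not part of alpaca-mq.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// Do runs fn only the first time a given id is seen and reports whether fn ran.
func (d *Deduper) Do(id string, fn func()) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false // duplicate delivery: skip
	}
	d.seen[id] = struct{}{}
	fn()
	return true
}

func main() {
	d := NewDeduper()
	balance := 0
	credit := func() { balance += 10 }

	d.Do("msg-1", credit)
	d.Do("msg-1", credit) // same message delivered again, ignored
	fmt.Println(balance)  // prints 10
}
```

With this guard in place, handling the same message twice leaves the service in the same state as handling it once.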
Install alpaca-mq using the "go get" command:
- go get github.com/SheepGardener/alpaca-mq
The puller is the consumer of messages: it passes each message on to the service application. Starting a puller is very simple, but you need to configure it first.
package main

import (
	alpaca "github.com/SheepGardener/alpaca-mq"
)

func main() {
	// Arguments: log file, puller config file, directory of application configs.
	puller := alpaca.InitPuller("./log/puller.log", "./config/puller.yml", "./config/apps/")
	puller.Pull()
}
The pusher is the producer of messages: it builds the message and pushes it to the queue. The following is only a simple example; you can customize a pusher service more flexibly.
package main

import (
	"encoding/json"
	"log"
	"net/http"

	alpaca "github.com/SheepGardener/alpaca-mq"
)

// Response is the JSON body returned to the HTTP caller.
type Response struct {
	Errno  int8   `json:"errno"`
	Errmsg string `json:"errmsg"`
	Logid  string `json:"log_id"`
}

var Pusher *alpaca.Pusher

func init() {
	// Arguments: log file and pusher config file.
	Pusher = alpaca.InitPusher("./log/pusher.log", "./config/pusher.yml")
}

// sendMsg reads the message fields from the request form and pushes them to the queue.
func sendMsg(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()

	resp := Response{}
	resp.Errno = 0
	resp.Errmsg = "success"

	Logid := r.Form.Get("logid")
	Cmd := r.Form.Get("cmd")
	Hashkey := r.Form.Get("hash_key")
	Data := r.Form.Get("data")

	if Logid == "" {
		Logid = "test" //alpaca.GetLogId()
	}

	if Cmd == "" {
		w.Write([]byte(`{"errno":-1,"errmsg":"Command cannot be empty"}`))
		return
	}

	resp.Logid = Logid

	kmsg := &alpaca.Kmessage{
		Cmd:     Cmd,
		Data:    Data,
		LogId:   Logid,
		HashKey: Hashkey,
		Delay:   12,
	}

	err := Pusher.Push(kmsg)
	if err != nil {
		resp.Errno = -1
		resp.Errmsg = err.Error()
	}

	respJson, err := json.Marshal(resp)
	if err != nil {
		w.Write([]byte(`{"errno":-1,"errmsg":"Response JSON marshal failed"}`))
		return
	}
	w.Write(respJson)
}

func main() {
	http.HandleFunc("/sendmsg", sendMsg)
	log.Fatal(http.ListenAndServe(":8009", nil))
}
Messages are transmitted in the form of alpaca.Kmessage:
alpaca.Kmessage{
	Cmd:     Cmd,     // Command point
	Data:    Data,    // Data to transfer
	LogId:   Logid,   // Log ID
	HashKey: Hashkey, // Messages with the same hash key are assigned to the same queue for sequential processing
	Delay:   12,      // Message delay time
}
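To illustrate why messages with the same hash key end up in the same queue, here is a minimal sketch of key-based partitioning. It is an assumption for illustration only: `partitionFor` is a hypothetical function, and FNV-1a is simply a convenient standard-library hash, not necessarily what alpaca-mq or its Kafka client uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a hash key to one of n partitions (n must be > 0).
// Hypothetical helper; the real partitioner may use a different hash.
func partitionFor(hashKey string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(hashKey)) // Write on a hash.Hash never returns an error
	return h.Sum32() % n
}

func main() {
	// "user-1" appears twice and maps to the same partition both times.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 4))
	}
}
```

Because the mapping depends only on the key and the partition count, all messages for one key are consumed in order from a single partition, as long as the partition count does not change.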
Alpaca provides a default message-processing and service load-balancing strategy. If you want to use your own message processing or load-balancing strategy, you can implement and register it as follows:
// Custom message processing implementation
type customizeHandleMessage struct {
	// ... implement your own logic
}

func (c *customizeHandleMessage) MessageHandle(url string, msg *alpaca.Kmessage) {
	// ... implement your own logic
}

// Register the message handler
handlemsg := &customizeHandleMessage{}
puller.SetMessageHandle(handlemsg)
**********************************************************************************
// Implement a custom service load balancing strategy
type customizeServerSelector struct {
	// ... implement your own logic
}

func (c *customizeServerSelector) GetAppUrl(ap App) {
	// ... implement your own logic
}

// Register the service load balancing selector
selector := &customizeServerSelector{}
puller.SetServerSelect(selector)
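As one possible strategy for a custom selector, the sketch below shows plain round-robin selection over a list of service URLs. It is deliberately standalone: the `roundRobin` type, its `Pick` method, and the example URLs are hypothetical and do not use alpaca-mq's own types, since the exact selector interface is not shown here. The logic would need to be adapted to the interface the library expects.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin cycles through a fixed, non-empty list of service URLs.
// Hypothetical type for illustration; not part of alpaca-mq.
type roundRobin struct {
	next uint64 // kept first so 64-bit atomic access is aligned on 32-bit platforms
	urls []string
}

// Pick returns the next URL in rotation; it is safe for concurrent use.
func (r *roundRobin) Pick() string {
	n := atomic.AddUint64(&r.next, 1)
	return r.urls[(n-1)%uint64(len(r.urls))]
}

func main() {
	rr := &roundRobin{urls: []string{"http://10.0.0.1:8080", "http://10.0.0.2:8080"}}
	for i := 0; i < 3; i++ {
		fmt.Println(rr.Pick()) // alternates between the two URLs, starting with the first
	}
}
```

Round-robin spreads requests evenly when all service instances have similar capacity; a weighted or least-loaded strategy may fit better otherwise.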
- Successful service requests. The puller treats a request as successful only if the requested service returns HTTP status 200 and the response contains an errno field whose value is 0.
- Puller deployment recommendations. If you want to deploy multiple pullers, it is recommended that you set the number of Kafka partitions equal to the number of pullers. If you can, ensure that each puller handles only one partition, which makes full use of the performance advantages of alpaca-mq.
- A puller or pusher handles only one topic, so don’t expect it to handle multiple topics. If you need multiple topics, it is recommended that you assign different topics to different businesses.
- To keep the entire message link traceable, be sure to carry the LogId. It brings great benefits that are not limited to locating problems.
- If you have more suggestions or ideas, feel free to contact me.
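The success condition in the first note above can be sketched as a small check. This is only an illustration, assuming the service replies with a JSON body; `isSuccess` is a hypothetical function and not the puller's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// isSuccess reports whether a service reply meets the condition described above:
// HTTP status 200 and a JSON body that contains an "errno" field equal to 0.
// Hypothetical helper for illustration.
func isSuccess(status int, body []byte) bool {
	if status != http.StatusOK {
		return false
	}
	var reply struct {
		Errno *int `json:"errno"` // pointer, so a missing field can be told apart from 0
	}
	if err := json.Unmarshal(body, &reply); err != nil {
		return false
	}
	return reply.Errno != nil && *reply.Errno == 0
}

func main() {
	fmt.Println(isSuccess(200, []byte(`{"errno":0,"errmsg":"success"}`))) // true
	fmt.Println(isSuccess(200, []byte(`{"errmsg":"success"}`)))           // false: errno missing
	fmt.Println(isSuccess(500, []byte(`{"errno":0}`)))                    // false: status is not 200
}
```

A service consumed through the puller should therefore always include errno in its response, even on success.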
If you want to learn more about how to use it, refer to the examples, which provide more help.