go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。

【一】.简单队列.生产者将消息发送到队列,消费者从队列中获取消息。

1.0.connection code

func NewRabbitMQ() *amqp.Channel {
	// 获取connection
	amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"
	connection, err := amqp.Dial(amqUrl)
	if err != nil {
		panic(fmt.Sprintf("获取connection异常:%s\n", err))
	}

	// 获取channel
	channel, err := connection.Channel()
	if err != nil {
		panic(fmt.Sprintf("获取channel异常:%s\n", err))
	}

	return channel
}

1.1.client code:

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明队列(不存在自动创建)
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 生产_发送消息到队列
message := "ic元器件活动来新单啦"
err = channel.Publish(
    // 交换机
    "",
    // 队列名称
    queueName,
    // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
    false,
    // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
    false,
    amqp.Publishing{
        ContentType: "text/plain",
        // 队列和消息同时设置持久化
        DeliveryMode: 2,
        Body:         []byte(message),
    },
)
if err != nil {
    fmt.Printf("发送消息到队列异常:%s", err)
    return
}

1.2.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "ic订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}

// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}

【二】.Work模式.一个生产者,多个消费者,一个消息只能被一个消费者获取到

2.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明队列(不存在自动创建)
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 生产_发送消息到队列
message := "ic元器件活动来新单啦,订单id"
messageSize := 10
for i := 0; i < messageSize; i++ {
    // 方便观察消费者
    time.Sleep(time.Second * 1)
    err = channel.Publish(
        // 交换机
        "",
        // 队列名称
        queueName,
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        false,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message + strconv.Itoa(i)),
        },
    )
    if err != nil {
        fmt.Printf("发送消息到队列异常:%s", err)
        return
    }
}

2.1.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "ic_order_active"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 设置同一时间服务器只会发送一条消息给消费者
channel.Qos(
    // 每次获取多少条
    10,
    // 预加载数量(rabbitMq不支持)
    0,
    // false->对当前队列可用 true->对channel可用(rabbitMq不支持)
    false,
)

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "ic订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}

// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}

【三】.订阅模式(fanout).

一个生产者,多个消费者

每个消费者拥有自己的队列

生产者将消息发送到交换机

每个队列自己去绑定交换机

(交换机没有储存能力,发送到没有任何队列绑定的交换机则消息丢失)

3.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明交换机
exchangeName := "notice"
err := channel.ExchangeDeclare(
    // 交换机名称
    exchangeName,
    // 交换机类型
    "fanout",
    // 持久化
    true,
    // true->当所有绑定都与交换器解绑后,会自动删除此交换器
    false,
    // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器
    false,
    // 是否非阻塞
    false,
    // 其他参数
    nil,
)
if err != nil {
    fmt.Printf("声明交换机异常:%s", err)
    return
}

// 生产_发送消息到交换机
message := "最新消息,华秋全场元器件3折起"
messageSize := 10
for i := 0; i < messageSize; i++ {
    // 方便观察消费者
    time.Sleep(time.Second * 1)
    err = channel.Publish(
        // 交换机
        exchangeName,
        // 路由key
        "",
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        false,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message + strconv.Itoa(i)),
        },
    )
    if err != nil {
        fmt.Printf("发送消息到队列异常:%s", err)
        return
    }
}

【四】.直接匹配(direct)

4.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明交换机
exchangeName := "pcb_layout_order"
err := channel.ExchangeDeclare(
    // 交换机名称
    exchangeName,
    // 交换机类型
    "direct",
    // 持久化
    true,
    // true->当所有绑定都与交换器解绑后,会自动删除此交换器
    false,
    // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器
    false,
    // 是否非阻塞
    false,
    // 其他参数
    nil,
)
if err != nil {
    fmt.Printf("声明交换机异常:%s", err)
    return
}

// 生产_发送消息到交换机
allRouteKey := []string{
    "order_insert", // 新增订单
    "order_delete", // 删除订单
}

// 循环发送到两个路由key
message := "订单id1事件"
for _, routeKey := range allRouteKey {
    err = channel.Publish(
        // 交换机
        exchangeName,
        // 路由key
        routeKey,
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        false,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message),
        },
    )
}

4.1.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "notice_queue"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 队列绑定交换机+绑定订单新增key
exchangeName := "pcb_layout_order"
allRouteKey := []string{
    "order_insert", // 新增订单
    "order_delete", // 删除订单
}
for _, routeKey := range allRouteKey {
    channel.QueueBind(
        // 队列名称
        queueName,
        // 绑定的键
        routeKey,
        // 交换机名称
        exchangeName,
        // 是否阻塞处理
        false,
        // 其他参数
        nil,
    )
}

// 设置同一时间服务器只会发送一条消息给消费者
channel.Qos(
    // 每次获取多少条
    10,
    // 预加载数量(rabbitMq不支持)
    0,
    // false->对当前队列可用 true->对channel可用(rabbitMq不支持)
    false,
)

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "ic订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}
// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}

【五】.直接匹配(topic)

topic同样根据key匹配到队列,#匹配一个或者多个,*匹配一个.(切记:发往topic交换器的routing_key它必须是.分隔的几个词)

5.0.client code

// 生产_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 生产_声明交换机
exchangeName := "smt_steel_order"
err := channel.ExchangeDeclare(
    // 交换机名称
    exchangeName,
    // 交换机类型
    "topic",
    // 持久化
    true,
    // true->当所有绑定都与交换器解绑后,会自动删除此交换器
    false,
    // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器
    false,
    // 是否非阻塞
    false,
    // 其他参数
    nil,
)
if err != nil {
    fmt.Printf("声明交换机异常:%s", err)
    return
}

// 生产_发送消息到交换机
allRouteKey := []string{
    "order.insert", // 新增订单
    "order.delete", // 删除订单
}
for _, routeKey := range allRouteKey {
    //fmt.Print(routeKey)
    message := "来自" + routeKey + "的消息"
    err = channel.Publish(
        // 交换机
        exchangeName,
        // 路由key
        routeKey,
        // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃
        true,
        // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 队列和消息同时设置持久化
            DeliveryMode: 2,
            Body:         []byte(message),
        },
    )
}

5.1.service code

// 消费_获取connection的channel
channel := Connecttion.NewRabbitMQ()

// 消费_声明队列
queueName := "notice_queue"
_, err := channel.QueueDeclare(
    // 队列名称
    queueName,
    // 是否持久化
    false,
    // 是否自动删除
    false,
    // 是否具有排他性
    false,
    // 是否阻塞处理
    false,
    // 额外属性
    nil,
)
if err != nil {
    fmt.Printf("声明队列异常:%s", err)
    return
}

// 队列绑定交换机+绑定订单新增key
exchangeName := "smt_steel_order"
routeKey := "order.#"
channel.QueueBind(
    // 队列名称
    queueName,
    // 绑定的路由
    routeKey,
    // 交换机名称
    exchangeName,
    // 是否阻塞处理
    false,
    // 其他参数
    nil,
)

// 设置同一时间服务器只会发送一条消息给消费者
channel.Qos(
    // 每次获取多少条
    10,
    // 预加载数量(rabbitMq不支持)
    0,
    // false->对当前队列可用 true->对channel可用(rabbitMq不支持)
    false,
)

// 消费_获取队列中的消息
message, err := channel.Consume(
    // 队列名称
    queueName,
    // 消费者名称
    "smt订单消费者",
    // 是否自动ack
    false,
    // 是否排他性队列标识
    false,
    false,
    false,
    nil,
)
if err != nil {
    return
}

// 输出消息
for msg := range message {
    // 打印消息内容
    fmt.Printf("收到队列消息%s \n", msg.Body)
    // 确认收到消息
    msg.Ack(true)
}
访客
邮箱
网址

Top