蹒跚学Go-Http编程+爬虫+redis+RabbitMQ+etcd+mongdb

HTTP

HTTP(Hyper Text Transfer Protocol,超文本传输协议)是互联网上应用最为广泛的一种网络协议,定义了客户端和服务器端之间请求与响应的传输标准。
Go 语言标准库内建提供了net/http包,涵盖了HTTP客户端和服务器端的具体实现。
使用net/htp包,可以很方便地编写HTTP客户端或服务器端的程序。

  • HTTP协议通常承载于TCP协议之上,有时也承载于TLS或SSL协议层之上,这个时候,就成了我们常说的HTTPS。

地址

URL全称为Unique Resource Location,用来表示网络资源,可以理解为网络文件路径。
基本URL的结构包含模式(协议)、服务器名称(IP地址)、路径和文件名。常见的协议/模式如http、https、ftp等。服务器的名称或IP地址后面有时还跟一个冒号和一个端口号。再后面是到达这个文件的路径和文件本身的名称

  • URL的长度有限制,不同的服务器的限制值不太相同,但是不能无限长

  • http://主机名:端口号/path?表单

正常的相应包的例子

类型 解析
HTTP/1.1 200 OK 状态行
Server: nginx 服务器使用的WEB软件名及版本
Content-Type: text/html; charset=UTF-8 服务器发送信息的类型
Connection: keep-alive 保持连接状态
Set-Cookie: PHPSESSID=mjup58ggbefu7ni9jea7908kub; path=/; HttpOnly
Cache-Control: no-cache
Date: Wed, 14 Nov 2018 08:27:32 GMT 发送时间
Content-Length: 99324 主体内容长度
空行用来分割消息头和主体
消息体**
func main() {
    //创建监听socket
    listener,err := net.Listen("tcp",":8000")
    if err != nil {
        fmt.Println("Listener err",err)
        return
    }
    defer listener.Close()
    //阻塞等待客户端连接
    conn,err := listener.Accept()
    if err != nil {
        fmt.Println("Accept err :",err)
        return
    }
    defer conn.Close()
    fmt.Println(conn.RemoteAddr().String(),"连接成功")
    buf := make([]byte,2048)
    n,err:=conn.Read(buf)
    result := buf[:n]
    fmt.Printf("#\n%s#",string(result))
}
//在浏览器中输出127.0.0.1:8000
----------------------------------------
127.0.0.1:64665 连接成功
#
GET / HTTP/1.1
Host: 127.0.0.1:8000
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9
Cookie: __guid=96992031.2518852697064897000.1530511322665.4133

#

请求行

  • GET
    • 当客户端要从服务器中读取某个资源时,使用GET 方法。GET 方法要求服务器将URL 定位的资源放在响应报文的数据部分,回送给客户端,即向服务器请求某个资源。

    • 使用GET方法时,请求参数和对应的值附加在 URL 后面,利用一个问号(“?”)代表URL 的结尾与请求参数的开始,传递参数长度受限制,因此GET方法不适合用于上传数据。

    • 通过GET方法来获取网页时,参数会显示在浏览器地址栏上,因此保密性很差。

  • POST

    • 当客户端给服务器提供信息较多时可以使用POST 方法,POST 方法向服务器提交数据,比如完成表单数据的提交,将数据提交给服务器处理。

    • GET 一般用于获取/查询资源信息,POST 会附带用户数据,一般用于更新资源信息。POST 方法将请求参数封装在HTTP 请求数据中,而且长度没有限制,因为POST携带的数据,在HTTP的请求正文中,以名称/值的形式出现,可以传输大量数据。

请求头

请求头部为请求报文添加了一些附加信息,由“名/值”对组成,每行一对,名和值之间使用冒号分隔。请求头部通知服务器有关于客户端请求的信息

请求头 含义
User-Agent 请求的浏览器类型
Accept 客户端可识别的响应内容类型列表,星号“ * ”用于按范围将类型分组,用“ / ”指示可接受全部类型,用“ type/* ”指示可接受 type 类型的所有子类型
Accept-Language 客户端可接受的自然语言
Accept-Encoding 客户端可接受的编码压缩格式
Accept-Charset 可接受的应答的字符集
Host 请求的主机名,允许多个域名同处一个IP 地址,即虚拟主机
connection 连接方式(close或keepalive)
Cookie 存储于客户端扩展字段,向同一域名的服务端发送属于该域的cookie

空行

  • 最后一个请求头之后是一个空行,发送回车符和换行符,通知服务器以下不再有请求头。

请求包体

  • 请求包体不在GET方法中使用,而在POST方法中使用。POST方法适用于需要客户填写表单的场合。与请求包体相关的最常使用的是包体类型Content-Type和包体长度Content-Length。

推荐学习地址

  • https://segmentfault.com/a/1190000018129846#articleHeader0 -> Get/Post区别

  • https://github.com/guyan0319/golang_development_notes/blob/master/zh/7.2.md -> net/http源码分析

HTTP 服务端

Web服务器的工作原理

  • 客户机通过TCP/IP协议建立到服务器的TCP连接

  • 客户端向服务器发送HTTP协议请求包,请求服务器里的资源文档

  • 服务器向客户机发送HTTP协议应答包,如果请求的资源包含有动态语言的内容,那么服务器会调用动态语言的解释引擎负责处理“动态内容”,并将处理得到的数据返回给客户端

  • 客户机与服务器断开。由客户端解释HTML文档,在客户端屏幕上渲染图形结果

Go中的web

  • Request:用户请求的信息,用来解析用户的请求信息,包括post、get、cookie、url等信息

  • Response:服务器需要反馈给客户端的信息

  • Conn:用户的每次请求链接

  • Handler:处理请求和生成返回信息的处理逻辑

  • 理解go中的http服务,最重要的就是要理解Multiplexer和hander,Golang中的Multiplexer基于ServerMux结构,同时也实现了Handler接口

  • 接收request的过程中,最重要的莫过于路由(router),即实现一个Multiplexer器。Go中既可以使用内置的mutilplexer–DefaultServeMux,也可以自定义。Multiplexer路由的目的就是为了找到处理器函数(hander),后者将对request进行处理,同时构建response。

回调函数

  • 本质:函数指针。通过地址,在某一特定位置,调用函数。

  • 在程序中,定义一个函数,但不显示调用。当某一条件满足时,该函数由操作系统自动调用。

实现简单的http服务

func sayHello(w http.ResponseWriter,r *http.Request) {
    fmt.Println("________________")
    r.ParseForm()   //解析参数,默认不会给解析
    fmt.Println("path ->",r.URL.Path)
    fmt.Println("scheme ->",r.URL.Scheme)
    fmt.Println(r.Form["url_long"])
    for k,v := range r.Form {
        fmt.Println("key ->",k)
        fmt.Println("value ->",v)
    }
    fmt.Fprintf(w,"Hello My Web")
}

func main() {
    //参数一:pattern string 监听的路径
    //参数二:handler func(ResponseWriter,*Request)
    http.HandleFunc("/",sayHello)   //设置访问的路由
    /*
    参数一:addr 监听地址
    参数二:handler 通常为空,意味服务端调用http.DefaultServerMux进行处理
    而服务端编写的业务逻辑处理程序http.Handle()或http.HandleFunc()
    默认注入http.DefaultServeMux中
    */
    err:=http.ListenAndServe(":8000",nil)
    if err != nil {
        log.Fatal("ListenAndServer:",err)
    }
}

http-go源码实现

func (srv *Server) Serve(l net.Listener) error {
    defer l.Close()
    if fn := testHookServerServe; fn != nil {
        fn(srv, l)
    }
    var tempDelay time.Duration // how long to sleep on accept failure

    if err := srv.setupHTTP2_Serve(); err != nil {
        return err
    }

    srv.trackListener(l, true)
    defer srv.trackListener(l, false)

    baseCtx := context.Background() // base is always background, per Issue 16220
    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        rw, e := l.Accept()
        if e != nil {
            select {
            case <-srv.getDoneChan():
                return ErrServerClosed
            default:
            }
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay > max {
                    tempDelay = max
                }
                srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
                time.Sleep(tempDelay)
                continue
            }
            return e
        }
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) // before Serve can return
        go c.serve(ctx)
    }
}

HTTP 客户端

//解析天气的例子

func main() {
    Url := "http://www.webxml.com.cn/WebServices/WeatherWebService.asmx/getWeatherbyCityName?theCityName=58367"
    ResPonse,_ := http.Get(Url)
    defer ResPonse.Body.Close()
    if ResPonse.StatusCode == 200 {
        body,_ := ioutil.ReadAll(ResPonse.Body)
        fmt.Println(string(body))
    }
    fmt.Println("___________天气预报_________")
    fmt.Printf("ResPonse: %v\n",ResPonse)
    fmt.Println("-----------------------------------------")
    fmt.Printf("response.Body:%+v\n", ResPonse.Body)
    fmt.Printf("response.Header:%+v\n", ResPonse.Header)
    fmt.Printf("response.StatusCode:%+v\n", ResPonse.StatusCode)
    fmt.Printf("response.Status:%+v\n", ResPonse.Status)
    fmt.Printf("response.Request:%+v\n", ResPonse.Request)
    fmt.Printf("response.Cookies:%+v\n", ResPonse.Cookies())
}
结果输出:
<?xml version="1.0" encoding="utf-8"?>
<ArrayOfString xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://WebXml.com.cn/">
  <string>直辖市</string>
  <string>上海</string>
  <string>58367</string>
  <string>58367.jpg</string>
  <string>2019/3/13 14:19:03</string>
  <string>9℃/16℃</string>
  <string>3月13日 多云转小雨</string>
  <string>东南风3-4级</string>
  <string>1.gif</string>
  <string>7.gif</string>
  <string>今日天气实况:气温:15℃;风向/风力:西北风 1级;湿度:33%;紫外线强度:最弱。空气质量:良。</string>
  <string>紫外线指数:最弱,辐射弱,涂擦SPF8-12防晒护肤品。
健臻·血糖指数:不易波动,天气条件好,血糖不易波动,可适时进行户外锻炼。
穿衣指数:较冷,建议着厚外套加毛衣等服装。
洗车指数:不宜,有雨,雨水和泥水会弄脏爱车。
空气污染指数:良,气象条件有利于空气污染物扩散。
</string>
  <string>7℃/13℃</string>
  <string>3月14日 小雨转晴</string>
  <string>南风3-4级</string>
  <string>7.gif</string>
  <string>0.gif</string>
  <string>7℃/18℃</string>
  <string>3月15日 多云</string>
  <string>南风3-4级转小于3级</string>
  <string>1.gif</string>
  <string>1.gif</string>
  <string>上海简称:沪,位置:上海地处长江三角洲前缘,东濒东海,南临杭州湾,西接江苏,浙江两省,北界长江入海,正当我国南北岸线的中部,北纬31°14′,东经121°29′。面积:总面积7823.5平方公里。人口:人口1000多万。上海丰富的人文资源、迷人的城市风貌、繁华的商业街市和欢乐的节庆活动形成了独特的都市景观。游览上海,不仅能体验到大都市中西合壁、商儒交融、八方来风的氛围,而且能感受到这个城市人流熙攘、车水马龙、灯火璀璨的活力。上海在中国现代史上占有着十分重要的地位,她是中国共产党的诞生地。许多震动中外的历史事件在这里发生,留下了众多的革命遗迹,处处为您讲述着一个个使人永不忘怀的可歌可泣的故事,成为包含民俗的人文景观和纪念地。在上海,每到秋祭,纷至沓来的人们在这里祭祀先烈、缅怀革命历史,已成为了一种风俗。大上海在中国近代历史中,曾是风起云涌可歌可泣的地方。在这里荟萃多少风云人物,散落在上海各处的不同住宅建筑,由于其主人的非同寻常,蕴含了耐人寻味的历史意义。这里曾留下许多革命先烈的足迹。瞻仰孙中山、宋庆龄、鲁迅等故居,会使您产生抚今追昔的深沉遐思,这里还有无数个达官贵人的住宅,探访一下李鸿章、蒋介石等人的公馆,可以联想起主人那段显赫的发迹史。</string>
</ArrayOfString>
___________天气预报_________
ResPonse: &{200 OK 200 HTTP/1.1 1 1 map[Cache-Control:[private, max-age=0] X-Powered-By:[ASP.NET] Date:[Wed, 13 Mar 2019 06:29:31 GMT] Content-Type:[text/xml; charset=utf-8] Vary:[Accept-Encoding] Server:[Microsoft-IIS/7.5] X-Aspnet-Version:[2.0.50727]] 0xc0420ee080 -1 [] false true map[] 0xc0420d8000 <nil>}
-----------------------------------------
response.Body:&{body:0xc0420e8140 zr:0xc0420f82c0 zerr:<nil>}
response.Header:map[X-Aspnet-Version:[2.0.50727] Content-Type:[text/xml; charset=utf-8] Vary:[Accept-Encoding] Server:[Microsoft-IIS/7.5] Date:[Wed, 13 Mar 2019 06:29:31 GMT] Cache-Control:[private, max-age=0] X-Powered-By:[ASP.NET]]
response.StatusCode:200
response.Status:200 OK
response.Request:&{Method:GET URL:http://www.webxml.com.cn/WebServices/WeatherWebService.asmx/getWeatherbyCityName?theCityName=58367 Proto:HTTP/1.1 ProtoMajor:1 ProtoMinor:1 Header:map[] Body:<nil> GetBody:<nil> ContentLength:0 TransferEncoding:[] Close:false Host:www.webxml.com.cn Form:map[] PostForm:map[] MultipartForm:<nil> Trailer:map[] RemoteAddr: RequestURI: TLS:<nil> Cancel:<nil> Response:<nil> ctx:<nil>}
response.Cookies:[]

实现简单的文件服务器

package main

import (
    "log"
    "net/http"
)

func main() {
    err := http.ListenAndServe(":5000",http.FileServer(http.Dir("C://")))
    if err != nil {
        log.Fatal(err)
    }
}

实战云盘项目

目录结构

data
└── go
    └── work
        ├── bin
        ├── pkg
        └── src
            ├── filestore_server
            │   ├── db
            │   │   ├── dbfile.go
            │   │   ├── mysql
            │   │   │   └── db.go
            │   │   ├── userfile.go
            │   │   ├── userfileupload.go
            │   │   └── user.go
            │   ├── handler
            │   │   ├── handler.go
            │   │   └── user.go
            │   ├── main.go
            │   ├── meta
            │   │   └── meta.go
            │   ├── static
            │   │   ├── img
            │   │   │   └── avatar.jpeg
            │   │   ├── js
            │   │   │   └── auth.js
            │   │   └── view
            │   │       ├── home.html
            │   │       ├── index.html
            │   │       ├── signin.html
            │   │       └── signup.html
            │   └── util
            │       ├── resp.go
            │       └── util.go
            ├── github.com
            └── golang.org

项目API规划

接口描述 接口Url
文件上传接口 Post /file/upload
文件查询接口 Get /file/query
文件下载接口 Get /file/download
文件删除接口 POST /file/delete
文件修改(重命名) POST /file/update
用户注册 POST /user/signup
用户登入 POST /user/signin
用户信息 POST /user/info

文件的校验值计算

校验算法类型              各方面的比较
CRC(32/64)              1.校验值长度
                        2.校验值类别
MD5                     3.安全级别
                        4.计算效率
SHA1                    5.应用场景

CRC  ->  文件传输情况,传输的两端都使用CRC进行检查检验,保证传输过程中文件没有被篡改
MD5  ->  常用文件签名
SHA1 ->  文件唯一的标志

文件秒传的原理

  • 1.用户上传 -> 有相同的文件已经被上传则不需要二次上传直接可以使用覆盖
  • 2.离线下载 -> 大文件秒下载
  • 3.好友分享

关键点

  • 文件HASH(MD5,SHA1)的计算执行成功

  • 用户文件关联(文件的隔离可以给多个用户使用)

Go调用Redis池

//定义连接redis的信息
var (
    pool *redis.Pool
    redisHost="127.0.0.1:6379"
    redisPass="testupload"
)

func init(){
    pool = NewRedisPool()
}

//创建redis连接池->返回对象指针
func NewRedisPool() *redis.Pool{
    return &redis.Pool{
        MaxIdle:50,
        MaxActive:30,
        IdleTimeout:300*time.Second,
        //创建连接的方法
        Dial: func() (conn redis.Conn, e error) {
            //打开连接
            conn,err :=redis.Dial("tcp",redisHost)
            if err != nil{
                log.Println("Redis Conn is Failed ->",err)
                return nil,err
            }
            //访问认证
            if _,err:=conn.Do("AUTH",redisPass);err !=nil{
                conn.Close()
                return nil,err
            }
            return conn,nil
        },
        //定时的去检查Redis-server连接是否是可用的方法
        TestOnBorrow: func(conn redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute{
                return nil
            }
            //ping不通则返回错误信息
            _,err := conn.Do("PING")
            return err
        },
    }
}

func RedisPool() *redis.Pool{
    return pool
}

Go调用Rabbitmq

常用的库

go get github.com/streadway/amqp

MQ-Simple方式调用

MQ环境搭建

#ubuntu/centos对号入座
sudo apt-get install docker.io
yum install docker-ce

#查看docker是否ok
docker info

#docker快速部署
docker run -d --name rabbitmq --publish 5671:5671  --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management

自己实现调用方式

package rabbitmq

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)

//定义常量 -> amqp://用户名:密码@地址:端口/VirtualHost
const MQURL = "amqp://guest:guest@127.0.0.1:5672/ddy-test"

//定义MQ的结构体
type RabbitMQ struct {
    conn *amqp.Connection
    channel *amqp.Channel
    //队列名称
    QueueName string
    //交换机
    Exchange string
    //key信息
    key string
    //连接信息
    Myurl string
}

//创建Rabbitmq结构体实例
func NewRabbitMQ(queueName string,EXchange string,key string) *RabbitMQ {
    mq := &RabbitMQ{QueueName:queueName,Exchange:EXchange,key:key,Myurl:MQURL}
    var err error
    //创建连接
    mq.conn,err = amqp.Dial(MQURL)
    mq.FailErr(err,"创建RabbitMQ连接错误!")
    mq.channel,err = mq.conn.Channel()
    mq.FailErr(err,"获取RabbitMQ的channel失败!")
    return mq
}

//断开channel和connection
func (r *RabbitMQ) DestoryMQ(){
    r.channel.Close()
    r.conn.Close()
}

//自定义错误处理函数
func (r *RabbitMQ) FailErr(err error,message string) {
    if err != nil {
        log.Fatalf("%s%s", message, err)
        panic(fmt.Sprintf("%s%s", message, err))
    }
}

//创建Simple模式上的MQ实例
func NewRabbitmqSimple(queueName string) *RabbitMQ {
    //MQ simple模式下默认EXchange为defaults/key为空
    return NewRabbitMQ(queueName,"","")
}

//生产者
func (r *RabbitMQ) PubilshSimple(message string) {
    //1.申请队列,如果队列不存在会自动的创建,如果存在则跳过
    _,err := r.channel.QueueDeclare(
        r.QueueName,
        //是否持久化
        false,
        //是否自动删除
        false,
        //是否具有排他性
        false,
        //是否阻塞
        false,
        //额外的熟悉参数传入
        nil)
    if err != nil {
        r.FailErr(err,"申请RabbitMQ队列错误!")
    }
    //2.发送消息到队列中去
    err = r.channel.Publish(
            r.Exchange,
            r.QueueName,
            //为真根据exchange规则和routekey如果找不到符合的队列会把消息返回给发送者(回退)
            false,
            false,
            amqp.Publishing{
                ContentType:    "test/plain",
                Body:           []byte(message),
            })
}

func (r RabbitMQ) ConsumeSimple() {
    //1.申请队列,如果队列不存在会自动的创建,如果存在则跳过
    _,err := r.channel.QueueDeclare(
        r.QueueName,
        //是否持久化
        false,
        //是否自动删除
        false,
        //是否具有排他性
        false,
        //是否阻塞
        false,
        //额外的熟悉参数传入
        nil)
    if err != nil {
        r.FailErr(err,"申请RabbitMQ队列错误!")
    }
    //接受消息
    message,err := r.channel.Consume(
        r.QueueName,
        //区分多个消费者
        "",
        //是否自动应答
        true,
        //是否具有排他性
        false,
        //为真表示不能将同一个connection中发送的消息传递给消费者
        false,
        //队列消费是否阻塞
        false,
        nil)
    if err != nil {
        r.FailErr(err,"接受消息出错!")
    }
    //阻塞的channel
    forever := make(chan bool)
    //启动协程处理消息
    go func() {
        for a := range message {
            //实现简单的逻辑处理
            log.Printf("received a message -> %s",a.Body)
        }
    }()
    log.Println("[*] Waiting for Message")
    <- forever
}

生产及其消费

  • 简单的本地生产及其消费模式
    package main

import (
    "../rabbitmq"
    "log"
    "time"
)

func main() {
    rabbit := rabbitmq.NewRabbitmqSimple("ddy-test")
    var a int16 = 0
    go func() {
        time.Sleep(10*time.Second)
        rabbit.ConsumeSimple()
    }()

    for {
        a++
        time.Sleep(10*time.Millisecond)
        rabbit.PubilshSimple("第"+string(a)+"条消息")
        log.Println(a)
    }

}

MQ-subpub模式

  • work模式:一个消息只能被一个消费者消费
//订阅模式的下Rabbitmq实例
func NewRabbitmqPubSub(exchanage string) *RabbitMQ{
    //创建实例
    mq := NewRabbitMQ("",exchanage,"")
    var err error
    //获取连接
    mq.conn,err = amqp.Dial(MQURL)
    mq.FailErr(err,"无法连接RabbitMQ!")
    //获取channel
    mq.channel,err = mq.conn.Channel()
    mq.FailErr(err,"获取MQ的channel失败!")
    return mq
}

//订阅模式下的生产者
func (r *RabbitMQ) PublishPub(message string) {
    //创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //订阅模式下设置为广播类型
        "fanout",
        true,
        false,
        false,
        false,
        nil,)
    r.FailErr(err,"订阅模式创建失败!")
    //消息的发送
    err = r.channel.Publish(
        r.Exchange,
        "",
        false,
        false,
        amqp.Publishing{
            ContentType:    "test/plain",
            Body:           []byte(message),
        })
}

//订阅模式下的消费者
func (r *RabbitMQ) RecieveSub(name string) {
    //创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //订阅模式下设置为广播类型
        "fanout",
        true,
        false,
        false,
        false,
        nil,)
    r.FailErr(err,"订阅模式创建失败!")
    //创建队列,不指定队列
    queue,err := r.channel.QueueDeclare(
        //随机生产队列名称
        "",
        false,
        false,
        true,
        false,
        nil)
    r.FailErr(err,"订阅模式创建队列失败!")
    //绑定队列到 exchange
    err = r.channel.QueueBind(
        //已经声明成功的队列名称
        queue.Name,
        //在pub/sub模式下,key值必须为空
        "",
        r.Exchange,
        false,
        nil)
    r.FailErr(err,"订阅模式绑定队列失败!")
    //消费消息
    messages,err := r.channel.Consume(
        queue.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
        )
    forever := make(chan bool)

    go func() {
        for d :=range messages{
            log.Println("当前的消费端 ->",name,"订阅模式接受到 -> ",d)
        }
    }()
    <- forever
}

简单生产及其消费

func main() {
    rabbit := rabbitmq.NewRabbitmqPubSub(""+"test-PROduct")
    var a = 0
    var cha = make(chan bool)
    go func() {
        rabbit.RecieveSub("Client"+strconv.Itoa(1))
    }()

    go func() {
        rabbit.RecieveSub("Client"+strconv.Itoa(2))
    }()

    for a<5000{
        a++
        time.Sleep(1000*time.Millisecond)
        rabbit.PublishPub("当前第"+strconv.Itoa(a)+"条消息")
        log.Println("当前生产第"+strconv.Itoa(a)+"条消息")
    }
    <- cha
}
输出结果:
2019/06/05 14:47:42 当前生产第1条消息
2019/06/05 14:47:42 当前的消费端 -> Client2 订阅模式接收到数据 ->  当前第1条消息
2019/06/05 14:47:42 当前的消费端 -> Client1 订阅模式接收到数据 ->  当前第1条消息
2019/06/05 14:47:43 当前生产第2条消息
2019/06/05 14:47:43 当前的消费端 -> Client2 订阅模式接收到数据 ->  当前第2条消息
2019/06/05 14:47:43 当前的消费端 -> Client1 订阅模式接收到数据 ->  当前第2条消息
2019/06/05 14:47:44 当前生产第3条消息
2019/06/05 14:47:44 当前的消费端 -> Client2 订阅模式接收到数据 ->  当前第3条消息
2019/06/05 14:47:44 当前的消费端 -> Client1 订阅模式接收到数据 ->  当前第3条消息
2019/06/05 14:47:45 当前生产第4条消息
2019/06/05 14:47:45 当前的消费端 -> Client2 订阅模式接收到数据 ->  当前第4条消息
2019/06/05 14:47:45 当前的消费端 -> Client1 订阅模式接收到数据 ->  当前第4条消息

MQ-routing模式

  • 一个消息能被多个消费者获取,并且小心的目标队列可以被生产者指定

  • 优势:可以在生产端指定消费端进行消费

//路由模式创建MQ实例
func NewRabbitMQRouting(exchangeName string,routingKey string) *RabbitMQ{
    //创建RabbitMQ实例
    rabbitmq := NewRabbitMQ("",exchangeName,routingKey)
    var err error
    //获取连接Connection
    rabbitmq.conn,err = amqp.Dial(rabbitmq.Myurl)
    rabbitmq.FailErr(err,"建立Routing模式失败!")
    //获取channel
    rabbitmq.channel,err = rabbitmq.conn.Channel()
    rabbitmq.FailErr(err,"Routing模式取回channel失败!")
    return rabbitmq
}

//路由模式生产者
func (r *RabbitMQ) PublishRouting(message string) {
    //创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //修改成direct
        "direct",
        true,
        false,
        false,
        false,
        nil)
    r.FailErr(err,"Routing创建交换机失败!")
    //发送消息
    err = r.channel.Publish(
        r.Exchange,
        r.key,
        false,
        false,
        amqp.Publishing{
            ContentType:    "test/plain",
            Body:           []byte(message),
        })
    r.FailErr(err,"Routing生产消息推送失败!")
}

//Routing模式消费者
func (r *RabbitMQ) RecieveRouting() {
    //创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //交换机类型
        "direct",
        true,
        false,
        false,
        false,
        nil)
    r.FailErr(err,"Routing交换机创建失败!")
    //创建队列,队列名称不要填写
    queue,err := r.channel.QueueDeclare(
        //随机队列名称
        "",
        false,
        false,
        true,
        false,
        nil)
    r.FailErr(err,"Routing创建队列失败!")
    //绑定队列到exchange中
    err = r.channel.QueueBind(
        queue.Name,
        r.key,
        r.Exchange,
        false,
        nil)
    r.FailErr(err,"Routing绑定队列失败!")
    //开始消费消息
    message,err :=r.channel.Consume(
        queue.Name,
        "",
        true,
        false,
        false,
        false,
        nil)
    r.FailErr(err,"Routing消费消息失败!")
    forever := make(chan bool)
    go func() {
        for d :=range message{
            log.Printf("Routine Received message -> %s",d.Body)
        }
    }()
    <-forever
}

生产消费

func main() {
    rabbitone := rabbitmq.NewRabbitMQRouting(""+"Routing-test","test-one")
    rabbittwo := rabbitmq.NewRabbitMQRouting(""+"Routing-test","test-two")
    var a = 0
    var cha = make(chan bool)
    go func() {
        rabbitone.RecieveRouting()
    }()

    go func() {
        rabbittwo.RecieveRouting()
    }()

    for a<5000{
        a++
        time.Sleep(1000*time.Millisecond)
        rabbitone.PublishRouting("这是one的消息:当前第"+strconv.Itoa(a)+"条消息")
        rabbittwo.PublishRouting("这是two的消息:当前第"+strconv.Itoa(a)+"条消息")
        log.Println("当前生产第"+strconv.Itoa(a)+"条消息")
    }
    <- cha
}
输出信息:
2019/06/05 16:37:03 Routine Received message -> 这是one的消息:当前第1条消息
2019/06/05 16:37:03 当前生产第1条消息
2019/06/05 16:37:03 Routine Received message -> 这是two的消息:当前第1条消息
2019/06/05 16:37:04 Routine Received message -> 这是one的消息:当前第2条消息
2019/06/05 16:37:04 当前生产第2条消息
2019/06/05 16:37:04 Routine Received message -> 这是two的消息:当前第2条消息
2019/06/05 16:37:05 Routine Received message -> 这是one的消息:当前第3条消息
2019/06/05 16:37:05 当前生产第3条消息
2019/06/05 16:37:05 Routine Received message -> 这是two的消息:当前第3条消息
2019/06/05 16:37:06 Routine Received message -> 这是one的消息:当前第4条消息
2019/06/05 16:37:06 当前生产第4条消息
2019/06/05 16:37:06 Routine Received message -> 这是two的消息:当前第4条消息

MQ-topic模式

  • 一个消息被多个消费者获取。消息的目标queue可用BindingKey以通配符,(#:一个或多个词,*:一个词)的方式指定。

topic实现

//话题模式
//Key规则:其中"*"用于匹配一个单词,"#"用于匹配多个单词(可以是0个)
//例子:匹配docker.* 代表 docker.com,但是docker.aa.com需要docker.#才能匹配到
func NewRabbitMQTopic(exchangeName string,routingKey string) *RabbitMQ {
    //创建RabbitMQ实例
    rabbitmq := NewRabbitMQ("",exchangeName,routingKey)
    var err error
    //获取Connection
    rabbitmq.conn,err = amqp.Dial(rabbitmq.Myurl)
    rabbitmq.FailErr(err,"Topic获取Connection失败")
    //获取Channel
    rabbitmq.channel,err = rabbitmq.conn.Channel()
    rabbitmq.FailErr(err,"Topic获取Channel失败")
    return rabbitmq
}

//Topic生产模式
func (r *RabbitMQ) PublishTopic(message string) {
    //创建交换机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        //要改成topic
        "topic",
        true,
        false,
        false,
        false,
        nil,)
    r.FailErr(err,"Topic创建交换机失败!")
    //发送消息
    err = r.channel.Publish(
        r.Exchange,
        r.key,
        false,
        false,
        amqp.Publishing{
            ContentType:    "test-plain",
            Body:           []byte(message),
        })
    r.FailErr(err,"Topic发送消息失败")
}

//消费者模式
func (r *RabbitMQ) RecieveTopic(name string) {
    //创建虚拟机
    err := r.channel.ExchangeDeclare(
        r.Exchange,
        "topic",
        true,
        false,
        false,
        false,
        nil)
    r.FailErr(err,"Topic创建虚拟机失败")
    //创建队列,这里注意队列名称不要写
    queue,err := r.channel.QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil)
    r.FailErr(err,"Topic创建队列失败")
    //绑定队列到Exchange中
    err = r.channel.QueueBind(
        queue.Name,
        r.key,
        r.Exchange,
        false,
        nil)
    r.FailErr(err,"Topic绑定队列到Exchange失败")
    //消费信息
    message,err := r.channel.Consume(
        queue.Name,
        "",
        true,
        false,
        false,
        false,
        nil)
    r.FailErr(err,"Topic消费信息失败")
    go func() {
        for d := range message{
            log.Printf("[%s]Topic Received message -> %s",name,d.Body)
        }
    }()
}

生产消费

func main() {
    rabbitone := rabbitmq.NewRabbitMQTopic("Topic-test","docker.red.com")
    rabbittwo := rabbitmq.NewRabbitMQRouting("Topic-test","docker.yellow.two")
    var a = 0
    var cha = make(chan bool)
    go func() {
        rabbittest1 := rabbitmq.NewRabbitMQTopic("Topic-test","#")
        rabbittest1.RecieveTopic("消费端一")
    }()

    go func() {
        rabbittest2 := rabbitmq.NewRabbitMQTopic("Topic-test","docker.*.com")
        rabbittest2.RecieveTopic("消费端二")
    }()

    for a<5000{
        a++
        time.Sleep(1000*time.Millisecond)
        rabbitone.PublishTopic("这是[docker.red.com]的消息:当前第"+strconv.Itoa(a)+"条消息")
        rabbittwo.PublishTopic("这是[docker.yellow.two]的消息:当前第"+strconv.Itoa(a)+"条消息")
        log.Println("当前生产第"+strconv.Itoa(a)+"条消息")
    }
    <- cha
}
输出结果:
2019/06/05 18:26:07 [消费端二]Topic Received message -> 这是[docker.red.com]的消息:当前第1条消息
2019/06/05 18:26:07 [消费端一]Topic Received message -> 这是[docker.red.com]的消息:当前第1条消息
2019/06/05 18:26:07 当前生产第1条消息
2019/06/05 18:26:07 [消费端一]Topic Received message -> 这是[docker.yellow.two]的消息:当前第1条消息
2019/06/05 18:26:08 [消费端二]Topic Received message -> 这是[docker.red.com]的消息:当前第2条消息
2019/06/05 18:26:08 当前生产第2条消息
2019/06/05 18:26:08 [消费端一]Topic Received message -> 这是[docker.red.com]的消息:当前第2条消息
2019/06/05 18:26:08 [消费端一]Topic Received message -> 这是[docker.yellow.two]的消息:当前第2条消息
2019/06/05 18:26:09 [消费端二]Topic Received message -> 这是[docker.red.com]的消息:当前第3条消息
2019/06/05 18:26:09 [消费端一]Topic Received message -> 这是[docker.red.com]的消息:当前第3条消息
2019/06/05 18:26:09 当前生产第3条消息
2019/06/05 18:26:09 [消费端一]Topic Received message -> 这是[docker.yellow.two]的消息:当前第3条消息 

Go调用etcd用法

  • put/get/delete用法

  • 租约方式

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/mvcc/mvccpb"
    "log"
    "time"
)

func main() {
    //配置客户端
    conf := clientv3.Config{
        Endpoints:[]string{"192.168.42.128:2379"},
        DialTimeout:5*time.Second,
    }
    //建立新的连接
    client,err := clientv3.New(conf)
    if err !=nil {
        log.Println(err)
        return
    }
    //用于读写etcd的键值对
    kv := clientv3.NewKV(client)
    //context.TODO站位的永远不会取消
    putRespon,err := kv.Put(context.TODO(),"/cron/jobs/job1","hello",clientv3.WithPrevKV())
    _,err = kv.Put(context.TODO(),"/cron/jobs/job2","world")
    if err != nil{
        log.Println(err)
        return
    }else {
        fmt.Println("Revision",putRespon.Header.Revision)
        if putRespon.PrevKv != nil {
            fmt.Println("KV Value:",string(putRespon.PrevKv.Value))
        }
    }
    //get方法例子
    getrespon,err := kv.Get(context.TODO(),"/cron/jobs/job1",clientv3.WithCountOnly())
    if err != nil {
        log.Println(err)
        return
    }else {
        fmt.Println(getrespon.Kvs,getrespon.Count)
    }
    //delect方法
    delrespon,err := client.Delete(context.TODO(),"/cron/jobs/job1",clientv3.WithPrevKV())
    if err != nil {
        log.Println(err)
        return
    }else {
        //删除之前的KV值
        fmt.Println(delrespon.PrevKvs)
        if len(delrespon.PrevKvs) !=0 {
            for _,kepair := range delrespon.PrevKvs{
                fmt.Println("delete ->",string(kepair.Key),string(kepair.Value))
            }
        }
    }
    //租约使用方法,申请一个lease
    lease := clientv3.NewLease(client)
    //申请3秒的租约
    leaseRes,_ := lease.Grant(context.TODO(),3)
    //获取租约的ID号
    leaseID := leaseRes.ID
    leaseID = leaseID
    //续租的姿势
    //context,5s后切换
    ctx,_ := context.WithTimeout(context.TODO(),5*time.Second)
    keepalived,err := lease.KeepAlive(ctx,leaseID)
    if err !=nil {
        return
    }
    //启动协程去消费他
    go func() {
        select {
        case keepalivedRes := <- keepalived:
            if keepalived == nil {
                fmt.Println("租约到期")
                goto END
            }else {//每秒续租一次就会受到一次应答
                fmt.Println("收到自动续租应答",keepalivedRes.ID)
            }
        }
        END:
    }()

    //put一个kv实现3秒过期
    petre,_ := kv.Put(context.TODO(),"/cron/lock/job","xiongdai",clientv3.WithLease(leaseID))
    fmt.Println(petre)
    //查询租约情况是否过期
    for {
        getre,_ :=kv.Get(context.TODO(),"/cron/lock/job")
        if getre.Count==0 {
            fmt.Println("过期了")
            break
        }
        fmt.Println("有效期中:",getre.Kvs)
        time.Sleep(1*time.Second)
    }

    //启动协程不停的创建修改删除
    go func() {
        time.Sleep(1*time.Second)
        _,_ = kv.Put(context.TODO(),"/cron/jobs/job6","test-watch",clientv3.WithPrevKV())
        _,_ = kv.Delete(context.TODO(),"/cron/jobs/job6")
    }()

    //当前的etcd集群事务ID递增
    watchStartRevision := getrespon.Header.Revision + 1

    //用watch监听kv值的变化
    watcher := clientv3.NewWatcher(client)

    //启动监听
    fmt.Println("启动当前版本向后监听->",watchStartRevision)
    watchan := watcher.Watch(context.TODO(),"/cron/jobs/job6",clientv3.WithRev(watchStartRevision))

    //处理KV变化事件
    for watchRen := range watchan {
        for _,event := range watchRen.Events {
            switch event.Type {
            case mvccpb.PUT:
                fmt.Println("修改为",string(event.Kv.Value),"Revision:",event.Kv.CreateRevision,event.Kv.ModRevision)
            case mvccpb.DELETE:
                fmt.Println("删除了","Resvsion",event.Kv.ModRevision)
            }
        }
    }
}
输出结果:
Revision 25
[] 1
[key:"/cron/jobs/job1" create_revision:25 mod_revision:25 version:1 value:"hello" ]
delete -> /cron/jobs/job1 hello
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:28 raft_term:4  <nil>}
收到自动续租应答 7587838939770889794
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
有效期中: [key:"/cron/lock/job" create_revision:28 mod_revision:28 version:1 value:"xiongdai" lease:7587838939770889794 ]
过期了
<watcher>启动当前版本向后监听-> 41
<watcher>[修改为] test-watch Revision: 44 44
<watcher>[删除了] Resvsion 45
  • 使用OP替代get/put/delete方法
import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "log"
    "time"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        kv clientv3.KV
        putop clientv3.Op
        opRespon clientv3.OpResponse
        opget clientv3.Op
    )

    //客户端配置
    config = clientv3.Config{
        Endpoints:[]string{"192.168.42.128:2379"},
        DialTimeout:5*time.Second,
    }
    //建立连接
    client,err = clientv3.New(config)
    if err != nil {
        fmt.Println(err)
        return
    }
    //设置KV的接收
    kv = clientv3.KV(client)
    //使用OP,替换之前的get,put,delete
    putop = clientv3.OpPut("/cron/jobs/job7","123")
    //执行OP
    if opRespon,err = kv.Do(context.TODO(),putop);err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("写入Revision版本:",opRespon.Put().Header.Revision)
    //创建OP
    opget = clientv3.OpGet("/cron/jobs/job7")
    //执行OP
    if opRespon,err = kv.Do(context.TODO(),opget);err != nil {
        log.Println(err)
        return
    }
    //打印数据
    fmt.Println("数据的Revision:",opRespon.Get().Kvs[0].ModRevision)
    fmt.Println("数据:",opRespon.Get().Kvs[0].Value)
}
输出结果:
写入Revision版本: 47
数据的Revision: 47
数据: [49 50 51]

分布式场景下实现乐观锁

  • 实现方式租约lease
func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        kv clientv3.KV
        lease clientv3.Lease    //通过传递Client获得API的子集
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseID clientv3.LeaseID
        keepRespChan <-chan *clientv3.LeaseKeepAliveResponse    //只读的指针chan
        keepRespon *clientv3.LeaseKeepAliveResponse
        ctx context.Context
        cancelFunc context.CancelFunc
        txn clientv3.Txn
        txnRsp *clientv3.TxnResponse
    )

    //客户端配置
    config = clientv3.Config{
        Endpoints:[]string{"192.168.42.128:2379"},
        DialTimeout:5*time.Second,
    }
    //建立连接
    client,err = clientv3.New(config)
    if err != nil {
        fmt.Println(err)
        return
    }

    //1.上锁(创建租约,自动续租,拿着租约抢占一个key)
    lease = clientv3.NewLease(client)

    //申请一个10秒的租约
    if leaseGrantResp,err = lease.Grant(context.TODO(),5);err != nil {
        fmt.Println(err)
        return
    }

    //获取租约的ID信息
    leaseID = leaseGrantResp.ID

    //设置一个取消续租的Context
    ctx,cancelFunc = context.WithCancel(context.TODO())

    //确保函数退出后自动结束续租,关联的KV删除
    defer cancelFunc()
    defer lease.Revoke(context.TODO(),leaseID)  //etcd销毁租约

    //拿到ID后保持锁持久化续租
    if keepRespChan,err = lease.KeepAlive(ctx,leaseID);err != nil {
        fmt.Println(err)
        return
    }

    //处理续约应答的协程
    go func() {
        for {
            select {
            case keepRespon = <- keepRespChan:
                if keepRespChan == nil {
                    fmt.Println("[info] 租约失效了")
                    goto END
                }else {//每秒自动续租一次
                    fmt.Println("[info] 收到续租应答",keepRespon.ID)
                }
            }
        }
        END:
    }()

    //争抢锁的姿势,如果不存在,then设置它else抢锁失败,设置KV的接收
    kv = clientv3.NewKV(client)

    //创建事物
    txn = kv.Txn(context.TODO())

    //定义事物要做的事情
    txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/jobs/txn"),"=",0)).
        Then(clientv3.OpPut("/cron/jobs/txn","666",clientv3.WithLease(leaseID))).
        Else(clientv3.OpGet("/cron/jobs/txn"))

    //提交事物
    if txnRsp,err = txn.Commit();err != nil {
        fmt.Println(err)
        return
    }

    //判断抢占锁是否成功
    if !txnRsp.Succeeded{
        fmt.Println("锁被占用",string(txnRsp.Responses[0].GetResponseRange().Kvs[0].Value))
        return
    }

    //2.处理业务
    log.Println("[info] 处理任务中")
    time.Sleep(5 * time.Second)
    //3.释放锁(取消自动续租/释放租约)
}
输出结果:
2019-06-17 17:58:55.097000 I | [info] 处理任务中
[info] 收到续租应答 7587838939770889976
[info] 收到续租应答 7587838939770889976
[info] 收到续租应答 7587838939770889976

golang调用mongo用法

  • 步骤:

  • docker 构建环境

  • docker run -d -p 27017:27017 -v mongo_configdb:/data/configdb -v mongo_db:/data/db –name mongo docker.io/mongo

  • 创建连接(auth认证,没有不填) -> 选择库 -> 选择表 -> 增删改查

package main

import (
    "context"
    "fmt"
    "github.com/mongodb/mongo-go-driver/bson/objectid"
    "github.com/mongodb/mongo-go-driver/mongo"
    "github.com/mongodb/mongo-go-driver/mongo/clientopt"
    "go.etcd.io/etcd/clientv3"
    "log"
    "time"
)

//任务执行的时间段
type TimePoint struct {
    StarTime int64      `bson:"startTime"`
    EndTime int64       `bson:"endTime"`
}

//一条日志
type LogRecord struct {
    JobName string          `bson:"job_name"`   //任务名字
    Command string          `bson:"command"`    //shell命令
    Err string              `bson:"err"`    //捕获错误
    Content string          `bson:"content"`    //捕获脚本输出
    TimePoint TimePoint     `bson:"timePoint"`  //任务执行的时间点
}

//过滤条件
type FindByJobName struct {
    JobName string `bson:"job_name"`    //Jobname赋值为10字节
}

//删除的条件(timePoint.startTime:{"$lt":timestamp})
//(k:v)->(k=timePoint.startTime,v=TimeBeforeCond)
type DeleteCond struct {
    BeforeCond TimeBeforeCond   `bson:"timePoint.startTime"`
}

//小于某时间的条件({"$lt":timestamp})
type TimeBeforeCond struct {
    Before int64    `bson:"$lt"`
}

//连接mongodb-demo
func main() {
    var (
        client *mongo.Client
        err error
        databases *mongo.Database
        collection *mongo.Collection
        record  *LogRecord
        insetoneRes *mongo.InsertOneResult
        docid objectid.ObjectID
        cond *FindByJobName
        findrs mongo.Cursor
        recode *LogRecord
        deleteCond *DeleteCond
        deleteRes *mongo.DeleteResult
    )
    //定义连接的auth内容
    config := clientopt.Credential{
        Username:"go-mongo",
        Password:"123456",
    }
    //1.创建mongdb连接
    if client,err = mongo.Connect(context.TODO(),"mongodb://192.168.42.128:27017",clientopt.Auth(config),
        clientopt.ConnectTimeout(5*time.Second));err != nil {
        log.Println(err)
    }
    //2.选择数据库
    databases = client.Database("admin")
    //3.选择表
    collection = databases.Collection("my_collection")

    //4.插入记录 -> bson
    record = &LogRecord{
        JobName:"job1",
        Command:"echo job1 running is ok!",
        Err:"",
        Content:"job1 running is ok!",
        TimePoint:TimePoint{StarTime:time.Now().Unix(),EndTime:time.Now().Unix()+10},
    }
    if insetoneRes,err = collection.InsertOne(context.TODO(),record);err !=nil {
        fmt.Println(err)
        return
    }
    //5.ID默认生成一个全局唯一ID,OBjectID:12字节的二进制
    docid = insetoneRes.InsertedID.(objectid.ObjectID)
    //docid是12字节的,正常情况默认需要转换16字节
    fmt.Println("自增ID->",docid.Hex())

    //6.进行相关的查找,按照jobname为10的记录
    cond = & FindByJobName{JobName:"job1"}
    //通过添加方式来取出数据
    if findrs,err = collection.Find(context.TODO(),cond,findopt.Skip(0),findopt.Limit(3));err!=nil {
        log.Println(err)
        return
    }
    //7.遍历结果集,每次取回的结果都会返回一个游标
    for findrs.Next(context.TODO()){
        //调用游标的方法反序列化,新变量定义一个空对象
        recode = &LogRecord{}
        //释放游标
        defer findrs.Close(context.TODO())
        //进行反序列化
        if err = findrs.Decode(recode);err != nil{
            log.Println(err)
            return
        }
        //打印日志出来
        fmt.Println(*recode)
    }

    //8.清除数据的方式(删除某一个时间段之前的)
    deleteCond = &DeleteCond{
        BeforeCond:TimeBeforeCond{
            Before:time.Now().Unix(),
        },
    }
    //9.执行删除
    if deleteRes,err = collection.DeleteMany(context.TODO(),deleteCond);err != nil {
        log.Println(err)
        return
    }
    fmt.Println("当前删除了多少行->",deleteRes.DeletedCount)
}
输出结果:
自增ID-> 5d09d5380cd1cb99f1d75c84
{job1 echo job1 running is ok!  job1 running is ok! {1560913196 1560913206}}
{job1 echo job1 running is ok!  job1 running is ok! {1560924000 1560924010}}
{job1 echo job1 running is ok!  job1 running is ok! {1560924005 1560924015}}
当前删除了多少行-> 5
  • docker exec -it mongo mongo admin

蹒跚学Go-Http编程+爬虫+redis+RabbitMQ+etcd+mongdb》有10个想法

  1. Outstanding post, you have pointed out some fantastic points, I as well think this is a very great website. fafkakgdgbag

  2. Usually I do not read writeup on blogs, nevertheless I wish to say that this writeup extremely forced me to take a look at and do so! Your writing taste has been amazed me. Thanks, really wonderful post. ddkdgdcefgdcdecc

  3. I discovered your blog site on google and test a number of of your early posts. Proceed to keep up the excellent operate. I simply extra up your RSS feed to my MSN Information Reader. Looking for forward to reading extra from you in a while! ddkdkdaekekf

  4. whoah this weblog is wonderful i really like studying your posts. Keep up the great paintings! You already know, lots of individuals are searching around for this information, you can help them greatly. bkaceecedckdbkcf

  5. I like what you guys are up also. Such smart work and reporting! Carry on the excellent works guys I’ve incorporated you guys to my blogroll. I think it’ll improve the value of my website 🙂

发表评论

电子邮件地址不会被公开。 必填项已用*标注