Hike News
Hike News

Golang項目-Kafka,tailf,config,log

introduction

  • Kafka為一分布式的系統

    • 一個kafka的集群可能有三台以上
  • 使用第三方基礎庫來操作Kafka

    • import "github.com/Shopify/sarama"
    • 往kafka放東西的,稱為生產者(Producer)
    • 客戶端連上Kafka,從Kafka取(消費)數據,稱為消費者
  • tailf庫
    • 其為golang內部的基礎庫
    • 其可以從一個不斷寫入的文件,持續的拿出數據
    • import "github.com/hpcloud/tail"

Sarama 往kafka發送數據實例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"fmt"
//導入操作Kafka的第三方庫
"github.com/Shopify/sarama"
)


func main(){
//實例化一個新的配置
config := sarama.NewConfig()
//config.Producer為一個結構體
config.Producer.RequiredAcks = sarama.WaitForAll //[1]
//Partitioner為進行分區(負載均衡)配置
//NewRandomPartitioner其為隨機分區,其會分區到最大的配置主機數目上
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true

//因此須建立與kafka連接的實例(NewSyncProducer為同步的客戶端)
//若為異步客戶端 則內部會創建一個channel
client,err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"},config)
if err != nil {
fmt.Println("producer.close, err:",err)
return
}
defer client.Close()

//構造欲寫入Kafka的消息(為一struct)
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a test message")

//向kafka發送消息
//異步客戶端發送消息時,會往內部channel發送消息,由後台發送給kafka
//同步客戶端則直接向kafka發送消息
//會返回寫到哪個分區的id號(pid),及訊息在kafka分區的偏移量(offset)
pid,offset,err := client.SendMessage(msg)

if err != nil {
fmt.Println("send message failed:",err)
return
}
fmt.Printf("pid %v offset %v\n",pid,offset)

}
  1. 透過第三方庫發消息給kafka時,sarma會等待接收ack包,當kafka收到消息並寫入文件時,會回發ack包給sarma,藉以確認消息未丟失且已經存入kafka中
    • 可評估自己所需的業務邏輯是否允許消息丟失

使用kafka-cli測試消費數據

  • 使用kafka-console-consumer測試從終端消費數據
    1
    kafka-console-consumer --bootstrap-server localhost:9092 --topic nginx_log --from-beginning

result

成功消費數據
Imgur


tailf

  • 其可在程序中去調用tailf,而非傳統Linux的開啟一個tail命令的進程
  • 使用tail.TailFile(filename,config)
    • filename: 欲被tail的文件
    • config:其為tail.Config的實例,為一個struct
      • Location:記下文件最後的位置
      • Poll:是否不斷地進行查詢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"github.com/hpcloud/tail"
"time"
)

func main(){
fileName := "./my.log"
// 生成一個tail的實例
tails,err := tail.TailFile(fileName,tail.Config{
ReOpen:true, //類似Linux下的tail -F
Follow:true,
Location: &tail.SeekInfo{Offset:0, Whence:2},
MustExist:false,
Poll:true,
})
if err != nil {
fmt.Println("tail file err",err)
return
}
//一行日誌數據
var msg *tail.Line
var ok bool
// 無間斷的從配置的tail.Lines中拿取數據 //[1]
for {
msg,ok = <- tails.Lines //tails.Lines為一個管道
if !ok {
fmt.Println("tail file close reopen,filename:%s\n",tails.Filename)
time.Sleep(100* time.Millisecond)
continue
}
fmt.Println("msg:",msg)
}
}
```
1. 真實情況下須處理信號問題

************************
# 配置文件(config)庫
* 其可支持多種格式的配置文件,例如yaml,ini等
* `go get github.com/astaxie/beego/config`
* 其為beego web框架其中一個基礎庫,可單獨使用
* `import "github.com/astaxie/beego/config"`
* 暫時使用config庫來將配置寫到文件中,之後可以使用**etcd**在線web介面管理配,並實時生效

## ini格式初識
* ini格式的配置文件分為**節、配置項**`(配置項名 = 配置項內容)`

[server]
listen_ip = “0.0.0.0”
listen_port = 8080

[logs]
log_level= debug
log_path = ./logs/logagent.log

[collect]
log_path = /home/work/logs/nginx/access.log
topic = nginx_log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

## 讀取配置文件

```go
package main

import (
"fmt"
"github.com/astaxie/beego/config"
)

func main(){
//新增一個讀取配置文件實例
conf, err := config.NewConfig("ini","./logagent.conf") //[1]
if err != nil{
fmt.Println("new config failed:",err)
return
}
port,err := conf.Int("server::listen_port")
fmt.Println("port :",port)

log_level := conf.String("logs::log_level")
fmt.Println("log_Level :",log_level)
}

  1. NewConfig(adapterName,fileName)
    • adapterName:欲讀取配置文件的格式
    • fileName:欲讀取配置文件名

日誌(log)庫

  • 配置日誌庫logs.SetLogger(AdapterFile,config)的config參數為一json串
    • 通常會使用json Marshal函數進行配置(較安全),而非自己構造json字串
  • go get github.com/astaxie/beego/logs
  • import "github.com/astaxie/beego/logs"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"encoding/json"
"fmt"
//導入日誌庫
"github.com/astaxie/beego/logs"
)

func main(){
//Log的配置(config)放入map中
config := make(map[string]interface{})
config["filename"] = "./my.log" //寫日誌的路徑及文件名
config["level"] = logs.LevelDebug //日誌級別

//將log配置Marshal成json字串
configStr,err := json.Marshal(config)
if err != nil{
fmt.Println("Config Marshal Failed:",err)
return
}

//實例化一個logger對象
err = logs.SetLogger(logs.AdapterFile,string(configStr)) //log.AdapterFile為一常量 ---> "file"
if err != nil {
fmt.Println("SetLogger Failed:",err)
}

//格式化日誌並記錄到日誌文件中
logs.Debug("this is a test my name is %s","stu01")
logs.Trace("this is a trace my name is %s","stu02")
logs.Warn("this is a warn, my name is %s","stu03")
}