0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

采用Go开发语言实现海量日志收集系统的开发

马哥Linux运维 来源:IT大咖说  作者:IT大咖说  2021-07-05 14:18 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

再次整理了一下这个日志收集系统的框,如下图

这次要实现的代码的整体逻辑为:

完整代码地址为: https://github.com/pythonsite/logagent

etcd介绍

高可用的分布式key-value存储,可以用于配置共享和服务发现

类似的项目:zookeeper和consul

开发语言:go

接口:提供restful的接口,使用简单

实现算法:基于raft算法的强一致性,高可用的服务存储目录

etcd的应用场景:

服务发现和服务注册

配置中心(我们实现的日志收集客户端需要用到)

分布式锁

master选举

官网对etcd的有一个非常简明的介绍:

etcd搭建:

下载地址:https://github.com/coreos/etcd/releases/

根据自己的环境下载对应的版本然后启动起来就可以了

启动之后可以通过如下命令验证一下:

[root@localhost etcd-v3.2.18-linux-amd64]# 。/etcdctl set name zhaofan

zhaofan

[root@localhost etcd-v3.2.18-linux-amd64]# 。/etcdctl get name

zhaofan

[root@localhost etcd-v3.2.18-linux-amd64]#

context 介绍和使用

其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:

控制goroutine的超时

保存上下文数据

通过下面一个简单的例子进行理解:

package main

import (

“fmt”

“time”

“net/http”

“context”

“io/ioutil”

type Result struct{

r *http.Response

err error

}

func process(){

ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)

defer cancel()

tr := &http.Transport{}

client := &http.Client{Transport:tr}

c := make(chan Result,1)

req,err := http.NewRequest(“GET”,“http://www.google.com”,nil)

if err != nil{

fmt.Println(“http request failed,err:”,err)

return

}

// 如果请求成功了会将数据存入到管道中

go func(){

resp,err := client.Do(req)

pack := Result{resp,err}

c 《- pack

}()

select{

case 《- ctx.Done():

tr.CancelRequest(req)

fmt.Println(“timeout!”)

case res := 《-c:

defer res.r.Body.Close()

out,_:= ioutil.ReadAll(res.r.Body)

fmt.Printf(“server response:%s”,out)

}

return

}

func main() {

process()

}

写一个通过context保存上下文,代码例子如:

package main

import (

“github.com/Go-zh/net/context”

“fmt”

func add(ctx context.Context,a,b int) int {

traceId := ctx.Value(“trace_id”)。(string)

fmt.Printf(“trace_id:%v

”,traceId)

return a+b

}

func calc(ctx context.Context,a, b int) int{

traceId := ctx.Value(“trace_id”)。(string)

fmt.Printf(“trace_id:%v

”,traceId)

//再将ctx传入到add中

return add(ctx,a,b)

}

func main() {

//将ctx传递到calc中

ctx := context.WithValue(context.Background(),“trace_id”,“123456”)

calc(ctx,20,30)

}

结合etcd和context使用

关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:

。/etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381

package main

import (

etcd_client “github.com/coreos/etcd/clientv3”

“time”

“fmt”

func main() {

cli, err := etcd_client.New(etcd_client.Config{

Endpoints:[]string{“192.168.0.118:2371”},

DialTimeout:5*time.Second,

})

if err != nil{

fmt.Println(“connect failed,err:”,err)

return

}

fmt.Println(“connect success”)

defer cli.Close()

}

下面一个例子是通过连接etcd,存值并取值

package main

import (

“github.com/coreos/etcd/clientv3”

“time”

“fmt”

“context”

func main() {

cli,err := clientv3.New(clientv3.Config{

Endpoints:[]string{“192.168.0.118:2371”},

DialTimeout:5*time.Second,

})

if err != nil{

fmt.Println(“connect failed,err:”,err)

return

}

fmt.Println(“connect succ”)

defer cli.Close()

ctx,cancel := context.WithTimeout(context.Background(),time.Second)

_,err = cli.Put(ctx,“logagent/conf/”,“sample_value”)

cancel()

if err != nil{

fmt.Println(“put failed,err”,err)

return

}

ctx, cancel = context.WithTimeout(context.Background(),time.Second)

resp,err := cli.Get(ctx,“logagent/conf/”)

cancel()

if err != nil{

fmt.Println(“get failed,err:”,err)

return

}

for _,ev := range resp.Kvs{

fmt.Printf(“%s:%s

”,ev.Key,ev.Value)

}

}

关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:

package main

import (

“context”

“fmt”

func main() {

// gen generates integers in a separate goroutine and

// sends them to the returned channel.

// The callers of gen need to cancel the context once

// they are done consuming generated integers not to leak

// the internal goroutine started by gen.

gen := func(ctx context.Context) 《-chan int {

dst := make(chan int)

n := 1

go func() {

for {

select {

case 《-ctx.Done():

return // returning not to leak the goroutine

case dst 《- n:

n++

}

}

}()

return dst

}

ctx, cancel := context.WithCancel(context.Background())

defer cancel() // cancel when we are finished consuming integers

for n := range gen(ctx) {

fmt.Println(n)

if n == 5 {

break

}

}

}

关于官网文档中的WithDeadline演示的代码例子:

package main

import (

“context”

“fmt”

“time”

func main() {

d := time.Now().Add(50 * time.Millisecond)

ctx, cancel := context.WithDeadline(context.Background(), d)

// Even though ctx will be expired, it is good practice to call its

// cancelation function in any case. Failure to do so may keep the

// context and its parent alive longer than necessary.

defer cancel()

select {

case 《-time.After(1 * time.Second):

fmt.Println(“overslept”)

case 《-ctx.Done():

fmt.Println(ctx.Err())

}

}

通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:

package main

import (

“github.com/coreos/etcd/clientv3”

“time”

“fmt”

“context”

func main() {

cli,err := clientv3.New(clientv3.Config{

Endpoints:[]string{“192.168.0.118:2371”},

DialTimeout:5*time.Second,

})

if err != nil {

fmt.Println(“connect failed,err:”,err)

return

}

defer cli.Close()

// 这里会阻塞

rch := cli.Watch(context.Background(),“logagent/conf/”)

for wresp := range rch{

for _,ev := range wresp.Events{

fmt.Printf(“%s %q : %q

”, ev.Type, ev.Kv.Key, ev.Kv.Value)

}

}

}

实现一个kafka的消费者代码的简单例子:

package main

import (

“github.com/Shopify/sarama”

“strings”

“fmt”

“time”

func main() {

consumer,err := sarama.NewConsumer(strings.Split(“192.168.0.118:9092”,“,”),nil)

if err != nil{

fmt.Println(“failed to start consumer:”,err)

return

}

partitionList,err := consumer.Partitions(“nginx_log”)

if err != nil {

fmt.Println(“Failed to get the list of partitions:”,err)

return

}

fmt.Println(partitionList)

for partition := range partitionList{

pc,err := consumer.ConsumePartition(“nginx_log”,int32(partition),sarama.OffsetNewest)

if err != nil {

fmt.Printf(“failed to start consumer for partition %d:%s

”,partition,err)

return

}

defer pc.AsyncClose()

go func(partitionConsumer sarama.PartitionConsumer){

for msg := range pc.Messages(){

fmt.Printf(“partition:%d Offset:%d Key:%s Value:%s”,msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))

}

}(pc)

}

time.Sleep(time.Hour)

consumer.Close()

}

但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现

package main

import (

“github.com/Shopify/sarama”

“strings”

“fmt”

“sync”

var (

wg sync.WaitGroup

func main() {

consumer,err := sarama.NewConsumer(strings.Split(“192.168.0.118:9092”,“,”),nil)

if err != nil{

fmt.Println(“failed to start consumer:”,err)

return

}

partitionList,err := consumer.Partitions(“nginx_log”)

if err != nil {

fmt.Println(“Failed to get the list of partitions:”,err)

return

}

fmt.Println(partitionList)

for partition := range partitionList{

pc,err := consumer.ConsumePartition(“nginx_log”,int32(partition),sarama.OffsetNewest)

if err != nil {

fmt.Printf(“failed to start consumer for partition %d:%s

”,partition,err)

return

}

defer pc.AsyncClose()

go func(partitionConsumer sarama.PartitionConsumer){

wg.Add(1)

for msg := range partitionConsumer.Messages(){

fmt.Printf(“partition:%d Offset:%d Key:%s Value:%s”,msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))

}

wg.Done()

}(pc)

}

//time.Sleep(time.Hour)

wg.Wait()

consumer.Close()

}

将客户端需要收集的日志信息放到etcd中

关于etcd处理的代码为:

package main

import (

“github.com/coreos/etcd/clientv3”

“time”

“github.com/astaxie/beego/logs”

“context”

“fmt”

var Client *clientv3.Client

var logConfChan chan string

// 初始化etcd

func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

var keys []string

for _,ip := range ipArrays{

//keyfmt = /logagent/%s/log_config

keys = append(keys,fmt.Sprintf(keyfmt,ip))

}

logConfChan = make(chan string,10)

logs.Debug(“etcd watch key:%v timeout:%v”, keys, timeout)

Client,err = clientv3.New(clientv3.Config{

Endpoints:addr,

DialTimeout: timeout,

})

if err != nil{

logs.Error(“connect failed,err:%v”,err)

return

}

logs.Debug(“init etcd success”)

waitGroup.Add(1)

for _, key := range keys{

ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)

// 从etcd中获取要收集日志的信息

resp,err := Client.Get(ctx,key)

cancel()

if err != nil {

logs.Warn(“get key %s failed,err:%v”,key,err)

continue

}

for _, ev := range resp.Kvs{

logs.Debug(“%q : %q

”, ev.Key, ev.Value)

logConfChan 《- string(ev.Value)

}

}

go WatchEtcd(keys)

return

}

func WatchEtcd(keys []string){

// 这里用于检测当需要收集的日志信息更改时及时更新

var watchChans []clientv3.WatchChan

for _,key := range keys{

rch := Client.Watch(context.Background(),key)

watchChans = append(watchChans,rch)

}

for {

for _,watchC := range watchChans{

select{

case wresp := 《-watchC:

for _,ev:= range wresp.Events{

logs.Debug(“%s %q : %q

”, ev.Type, ev.Kv.Key, ev.Kv.Value)

logConfChan 《- string(ev.Kv.Value)

}

default:

}

}

time.Sleep(time.Second)

}

waitGroup.Done()

}

func GetLogConf()chan string{

return logConfChan

}

同样的这里增加对了限速的处理,毕竟日志收集程序不能影响了当前业务的性能,所以增加了limit.go用于限制速度:

package main

import (

“time”

“sync/atomic”

“github.com/astaxie/beego/logs”

type SecondLimit struct {

unixSecond int64

curCount int32

limit int32

}

func NewSecondLimit(limit int32) *SecondLimit {

secLimit := &SecondLimit{

unixSecond:time.Now().Unix(),

curCount:0,

limit:limit,

}

return secLimit

}

func (s *SecondLimit) Add(count int) {

sec := time.Now().Unix()

if sec == s.unixSecond {

atomic.AddInt32(&s.curCount,int32(count))

return

}

atomic.StoreInt64(&s.unixSecond,sec)

atomic.StoreInt32(&s.curCount, int32(count))

}

func (s *SecondLimit) Wait()bool {

for {

sec := time.Now().Unix()

if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {

time.Sleep(time.Microsecond)

logs.Debug(“limit is running,limit:%d s.curCount:%d”,s.limit,s.curCount)

continue

}

if sec != atomic.LoadInt64(&s.unixSecond) {

atomic.StoreInt64(&s.unixSecond,sec)

atomic.StoreInt32(&s.curCount,0)

}

logs.Debug(“limit is exited”)

return false

}

}

小结

这次基本实现了日志收集的前半段的处理,后面将把日志扔到es中,并最终在页面上呈现

来源:IT大咖说

责任编辑:gt

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 开发
    +关注

    关注

    0

    文章

    382

    浏览量

    42272
  • 代码
    +关注

    关注

    30

    文章

    4985

    浏览量

    74575

原文标题:Go实现海量日志收集系统

文章出处:【微信号:magedu-Linux,微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    EnOcean Developer Kit EDK 350:开启能量收集无线传感器技术开发之旅

    EnOcean Developer Kit EDK 350:开启能量收集无线传感器技术开发之旅 在电子工程师的世界里,不断探索和创新是永恒的主题。今天,我们要深入了解一款强大的开发
    的头像 发表于 05-14 16:30 123次阅读

    Linux系统日志的分析方法和排查技巧

    日志是运维工程师排查问题的第一手资料。当服务器出现异常时,系统日志、应用日志、安全日志中往往隐藏着问题的答案。然而,很多工程师面对
    的头像 发表于 05-14 10:44 439次阅读

    零代码软件开发平台ATECLOUD如何实现仪器程控软件开发

    、MATLAB等主流编程语言,通过编写脚本实现对电子测试仪器的精准控制。不可否认,这些传统开发方式在短期内能够满足基础的测试需求,但深入实践后便会发现,其暗藏的诸多限制与隐患,正成为企业测试效率提升、项目迭代推进的阻碍。 传统编
    的头像 发表于 03-02 15:59 1772次阅读
    零代码软件<b class='flag-5'>开发</b>平台ATECLOUD如何<b class='flag-5'>实现</b>仪器程控软件<b class='flag-5'>开发</b>?

    讲解C语言代码的实现过程

    重点讲解C语言代码的实现过程,算法的C语言实现过程具有一般性,通过PID算法的C语言实现,可以以此类推,设计其它算法的C语言实现。 第一步:
    发表于 01-21 07:58

    模组日志功能技术概览

    模组日志功能技术方案以低侵入、高可用为原则,提供统一的日志API、多级日志分类与条件输出机制。通过集成该技术,开发者可在不干扰业务逻辑的前提下,全面掌握模组的执行状态与异常行为。 一、
    的头像 发表于 01-14 15:32 309次阅读
    模组<b class='flag-5'>日志</b>功能技术概览

    什么是嵌入式应用开发

    实现和部署,还包括硬件选择、软件设计、测试、集成和维护等流程‌。 定义和背景 嵌入式应用开发是指将软件部署到嵌入式系统中,这些系统广泛应用于消费电子和工业自动化中。嵌入式
    发表于 01-12 16:13

    从0到1搭建实时日志监控系统:基于WebSocket + Elasticsearch的实战方案

    1. 背景与痛点 在开发分布式系统时,日志分散在多个服务节点中,传统轮询查询方式存在延迟高、资源浪费的问题。某次线上故障中,因未能实时发现错误日志,导致问题排查时间延长2小时。因此,决
    发表于 01-09 16:43

    KIT_XMC14_2GO开发板:功能特性与硬件解析

    KIT_XMC14_2GO开发板:功能特性与硬件解析 在电子开发领域,一款性能出色且功能丰富的开发板对于工程师来说至关重要。今天,我们就来详细探讨一下KIT_XMC14_2
    的头像 发表于 12-19 10:20 716次阅读

    Linux内核日志玩明白了吗?printk调试神器全解析

    前言:做Linux驱动开发或内核调试的朋友,一定对printk不陌生,但你真的会用它吗?为什么同样是调试RK3588内核,别人能精准捕捉关键错误,你却被海量日志淹没?今天就带大家吃透printk
    的头像 发表于 12-19 08:32 1195次阅读
    Linux内核<b class='flag-5'>日志</b>玩明白了吗?printk调试神器全解析

    RT-Thread ULOG: 创建多个文件后端并保存不同日志方法 | 技术集结

    目录前言使用场景实现功能具体操作1前言在项目开发中需要使用到日志功能来调试和查看问题。有些问题并不会在我们实时查看的时候发生,而是在你上个厕所的功夫可能就发生了。如果上位机的缓冲区不够大,可能错误
    的头像 发表于 12-15 19:22 5295次阅读
    RT-Thread ULOG: 创建多个文件后端并保存不同<b class='flag-5'>日志</b>方法 | 技术集结

    C语言在嵌入式开发中的应用

    稳定性控制系统(VSC)等关键部件的开发,C 语言都发挥着至关重要的作用。 以工业自动化生产线中的运动控制系统为例,C 语言可以编写高效可靠
    发表于 11-21 08:09

    电商API日志分析的实用工具

      在当今数字化电商时代,API(应用程序编程接口)已成为平台与外部系统交互的核心通道。电商API日志记录了每一次请求的详细信息,包括用户行为、交易状态、错误响应等。分析这些日志能帮助企业监控性能
    的头像 发表于 07-23 15:50 870次阅读
    电商API<b class='flag-5'>日志</b>分析的实用工具

    UI开发概述

    的渲染效果。开发者可以将系统内置组件组合为自定义组件,通过这种方式将页面组件化为一个个独立的UI单元,实现页面不同单元的独立创建、开发和复用,具有更强的工程性。 页面路由和组件导航 应
    发表于 06-24 06:36

    芯科科技xG22E能量收集开发套件的应用示例

    EFR32xG22E(xG22E)能量收集(Energy Harvesting)开发套件是设计节能物联网应用的一个理想起点,可用于探索和评估Silicon Labs(芯科科技)多协议无线系统单芯片(SoC)支持的多种能量
    的头像 发表于 06-23 14:04 2387次阅读
    芯科科技xG22E能量<b class='flag-5'>收集</b><b class='flag-5'>开发</b>套件的应用示例

    详解journalctl日志管理

    systemd 提供了自己的日志系统(logging system),称为 journal。使用 systemd 日志,无需额外安装日志服务(syslog)。
    的头像 发表于 06-05 17:22 2088次阅读
    详解journalctl<b class='flag-5'>日志</b>管理