首页 技术 正文
技术 2022年11月20日
0 收藏 895 点赞 2,409 浏览 3991 个字

//service.go

package discoveryimport (
"context"
"errors"
"sync"
"time" "github.com/coreos/etcd/clientv3"
l4g "github.com/alecthomas/log4go"
)type Service struct {
closeChan chan struct{} //关闭通道
client *clientv3.Client //etcd v3 client
leaseID clientv3.LeaseID //etcd 租约id
key string //键
val string //值
wg sync.WaitGroup
}// NewService 构造一个注册服务
func NewService(etcdEndpoints []string, key string, val string) (*Service, error) { cli, err := clientv3.New(clientv3.Config{
Endpoints: etcdEndpoints,
DialTimeout: * time.Second,
}) if nil != err {
return nil, err
} s := &Service{
client: cli,
closeChan: make(chan struct{}),
key: key,
val: val,
} return s, nil
}// Start 开启注册
// @param - ttlSecond 租期(秒)
func (s *Service) Start(ttlSecond int64) error { // minimum lease TTL is 5-second
resp, err := s.client.Grant(context.TODO(), ttlSecond)
if err != nil {
panic(err)
} s.leaseID = resp.ID
_, err = s.client.Put(context.TODO(), s.key, s.val, clientv3.WithLease(s.leaseID))
if err != nil {
panic(err)
} ch, err1 := s.client.KeepAlive(context.TODO(), s.leaseID)
if nil != err1 {
panic(err)
} l4g.Info("[discovery] Service Start leaseID:[%d] key:[%s], value:[%s]", s.leaseID, s.key, s.val) s.wg.Add()
defer s.wg.Done() for {
select {
case <-s.closeChan:
return s.revoke()
case <-s.client.Ctx().Done():
return errors.New("server closed")
case ka, ok := <-ch:
if !ok {
l4g.Warn("[discovery] Service Start keep alive channel closed")
return s.revoke()
} else {
l4g.Fine("[discovery] Service Start recv reply from Service: %s, ttl:%d", s.key, ka.TTL)
}
}
} return nil
}// Stop 停止
func (s *Service) Stop() {
close(s.closeChan)
s.wg.Wait()
s.client.Close()
}func (s *Service) revoke() error { _, err := s.client.Revoke(context.TODO(), s.leaseID) if err != nil {
l4g.Error("[discovery] Service revoke key:[%s] error:[%s]", s.key, err.Error())
} else {
l4g.Info("[discovery] Service revoke successfully key:[%s]", s.key)
} return err
}

//watch.go

package discoveryimport (
"context"
"os"
"time" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
l4g "github.com/alecthomas/log4go"
"google.golang.org/grpc/grpclog")type GroupManager struct {
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
once sync.Once
}func NewGroupManager() *GroupManager {
ret := new(GroupManager)
ret.ctx, ret.cancel = context.WithCancel(context.Background())
return ret
}func (this *GroupManager) Close() {
this.once.Do(this.cancel)
}func (this *GroupManager) Wait() {
this.wg.Wait()
}func (this *GroupManager) Add(delta int) {
this.wg.Add(delta)
}func (this *GroupManager) Done() {
this.wg.Done()
}func (this *GroupManager) Chan() <-chan struct{} {
return this.ctx.Done()
}type Target interface {
Set(string, string)
Create(string, string)
Modify(string, string)
Delete(string)
}type Config struct {
Servers []string
DailTimeout int64
RequestTimeout int64
Prefix bool
Target string
}func Watch(gm *GroupManager, cfg *Config, target Target) {
defer gm.Done() cli, err := clientv3.New(clientv3.Config{
Endpoints: cfg.Servers,
DialTimeout: time.Duration(cfg.DailTimeout) * time.Second,
})
if err != nil {
panic(err.Error())
return
}
defer cli.Close() // make sure to close the client l4g.Info("[discovery] start watch %s", cfg.Target) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.RequestTimeout)*time.Second)
var resp *clientv3.GetResponse
if cfg.Prefix {
resp, err = cli.Get(ctx, cfg.Target, clientv3.WithPrefix())
} else {
resp, err = cli.Get(ctx, cfg.Target)
}
cancel()
if err != nil {
panic(err.Error())
}
for _, ev := range resp.Kvs {
target.Set(string(string(ev.Key)), string(ev.Value))
} var rch clientv3.WatchChan
if cfg.Prefix {
rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+))
} else {
rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithRev(resp.Header.Revision+))
}
for {
select {
case <-gm.Chan():
l4g.Info("[discovery] watch %s close", cfg.Target)
return
case wresp := <-rch:
err := wresp.Err()
if err != nil {
l4g.Info("[discovery] watch %s response error: %s ", cfg.Target, err.Error())
gm.Close()
return
}
l4g.Debug("[discovery] watch %s response %+v", cfg.Target, wresp)
for _, ev := range wresp.Events {
if ev.IsCreate() {
target.Create(string(ev.Kv.Key), string(ev.Kv.Value))
} else if ev.IsModify() {
target.Modify(string(ev.Kv.Key), string(ev.Kv.Value))
} else if ev.Type == mvccpb.DELETE {
target.Delete(string(ev.Kv.Key))
} else {
l4g.Error("[discovery] no found watch type: %s %q", ev.Type, ev.Kv.Key)
}
}
}
}
}
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,991
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,505
下载Ubuntu 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,349
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,134
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,766
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,844